httpz- Hyper-fast HTTP Scraping Tool |
git clone git://git.acid.vegas/httpz.git |
Log | Files | Refs | Archive | README | LICENSE |
unit_test.py (9595B)
1 #!/usr/bin/env python3 2 # HTTPZ Web Scanner - Unit Tests 3 # unit_test.py 4 5 import asyncio 6 import logging 7 import sys 8 import time 9 10 try: 11 from httpz_scanner import HTTPZScanner 12 from httpz_scanner.colors import Colors 13 except ImportError: 14 raise ImportError('missing httpz_scanner library (pip install httpz_scanner)') 15 16 17 class ColoredFormatter(logging.Formatter): 18 '''Custom formatter for colored log output''' 19 20 def format(self, record): 21 if record.levelno == logging.INFO: 22 color = Colors.GREEN 23 elif record.levelno == logging.WARNING: 24 color = Colors.YELLOW 25 elif record.levelno == logging.ERROR: 26 color = Colors.RED 27 else: 28 color = Colors.RESET 29 30 record.msg = f'{color}{record.msg}{Colors.RESET}' 31 return super().format(record) 32 33 34 # Configure logging with colors 35 logger = logging.getLogger() 36 handler = logging.StreamHandler() 37 handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s')) 38 logger.setLevel(logging.INFO) 39 logger.addHandler(handler) 40 41 42 async def get_domains_from_url() -> list: 43 ''' 44 Fetch domains from SecLists URL 45 46 :return: List of domains 47 ''' 48 49 try: 50 import aiohttp 51 except ImportError: 52 raise ImportError('missing aiohttp library (pip install aiohttp)') 53 54 url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt' 55 56 async with aiohttp.ClientSession() as session: 57 async with session.get(url) as response: 58 content = await response.text() 59 return [line.strip() for line in content.splitlines() if line.strip()] 60 61 62 async def domain_generator(domains: list): 63 ''' 64 Async generator that yields domains 65 66 :param domains: List of domains to yield 67 ''' 68 69 for domain in domains: 70 await asyncio.sleep(0) # Allow other coroutines to run 71 yield domain 72 73 74 async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple: 75 '''Run a single benchmark test''' 76 77 logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}') 78 scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True) 79 80 count = 0 81 got_first = False 82 start_time = None 83 84 if test_type == 'List': 85 async for result in scanner.scan(domains): 86 if result: 87 if not got_first: 88 got_first = True 89 start_time = time.time() 90 count += 1 91 92 # More detailed status reporting 93 status_str = '' 94 if result['status'] < 0: 95 error_type = result.get('error_type', 'UNKNOWN') 96 error_msg = result.get('error', 'Unknown Error') 97 status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}" 98 elif 200 <= result['status'] < 300: 99 status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}" 100 elif 300 <= result['status'] < 400: 101 status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}" 102 else: 103 status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}" 104 105 # Show protocol and response headers if available 106 protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else '' 107 headers_info = '' 108 if result.get('response_headers'): 109 important_headers = ['server', 'location', 'content-type'] 110 headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers] 111 if headers: 112 headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}" 113 114 # Show redirect chain if present 115 redirect_info = '' 116 if result.get('redirect_chain'): 117 redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}" 118 119 # Show error details if present 120 error_info = '' 121 if result.get('error'): 122 error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}" 123 124 # Show final URL if different from original 125 url_info = '' 126 if result.get('url') and result['url'] != f"http(s)://{result['domain']}": 127 url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}" 128 129 logging.info( 130 f"{test_type}-{concurrency} Result {count}: " 131 f"{status_str}{protocol_info} " 132 f"{Colors.CYAN}{result['domain']}{Colors.RESET}" 133 f"{redirect_info}" 134 f"{url_info}" 135 f"{headers_info}" 136 f"{error_info}" 137 ) 138 else: 139 # Skip generator test 140 pass 141 142 elapsed = time.time() - start_time if start_time else 0 143 domains_per_sec = count/elapsed if elapsed > 0 else 0 144 logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}') 145 146 return elapsed, domains_per_sec 147 148 149 async def test_list_input(domains: list): 150 '''Test scanning using a list input''' 151 152 logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}') 153 scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True) 154 155 start_time = time.time() 156 count = 0 157 async for result in scanner.scan(domains): 158 if result: 159 count += 1 160 status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED 161 title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else '' 162 error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else '' 163 logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}') 164 165 166 async def test_generator_input(domains: list): 167 '''Test scanning using an async generator input''' 168 169 logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}') 170 scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True) 171 172 start_time = time.time() 173 count = 0 174 async for result in scanner.scan(domain_generator(domains)): 175 if result: 176 count += 1 177 status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED 178 title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else '' 179 error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else '' 180 logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}') 181 182 183 async def main() -> None: 184 '''Main test function''' 185 186 try: 187 # Fetch domains 188 domains = await get_domains_from_url() 189 logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing') 190 191 # Store benchmark results 192 results = [] 193 194 # Run tests with different concurrency levels 195 for concurrency in [25, 50, 100]: 196 # Generator tests 197 gen_result = await run_benchmark('Generator', domains, concurrency) 198 results.append(('Generator', concurrency, *gen_result)) 199 200 # List tests 201 list_result = await run_benchmark('List', domains, concurrency) 202 results.append(('List', concurrency, *list_result)) 203 204 # Print benchmark comparison 205 logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}') 206 logging.info('-' * 80) 207 logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}') 208 logging.info('-' * 80) 209 210 # Sort by domains per second (fastest first) 211 results.sort(key=lambda x: x[3], reverse=True) 212 213 for test_type, concurrency, elapsed, domains_per_sec in results: 214 logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}') 215 216 # Highlight fastest result 217 fastest = results[0] 218 logging.info('-' * 80) 219 logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections') 220 logging.info(f'Time: {fastest[2]:.2f} seconds') 221 logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}') 222 223 logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}') 224 225 except Exception as e: 226 logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}') 227 sys.exit(1) 228 229 230 if __name__ == '__main__': 231 try: 232 asyncio.run(main()) 233 except KeyboardInterrupt: 234 logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}') 235 sys.exit(1)