httpz

- Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git
Log | Files | Refs | Archive | README | LICENSE

unit_test.py (9595B)

      1 #!/usr/bin/env python3
      2 # HTTPZ Web Scanner - Unit Tests
      3 # unit_test.py
      4 
      5 import asyncio
      6 import logging
      7 import sys
      8 import time
      9 
     10 try:
     11     from httpz_scanner        import HTTPZScanner
     12     from httpz_scanner.colors import Colors
     13 except ImportError:
     14     raise ImportError('missing httpz_scanner library (pip install httpz_scanner)')
     15 
     16 
     17 class ColoredFormatter(logging.Formatter):
     18     '''Custom formatter for colored log output'''
     19     
     20     def format(self, record):
     21         if record.levelno == logging.INFO:
     22             color = Colors.GREEN
     23         elif record.levelno == logging.WARNING:
     24             color = Colors.YELLOW
     25         elif record.levelno == logging.ERROR:
     26             color = Colors.RED
     27         else:
     28             color = Colors.RESET
     29             
     30         record.msg = f'{color}{record.msg}{Colors.RESET}'
     31         return super().format(record)
     32 
     33 
     34 # Configure logging with colors
     35 logger = logging.getLogger()
     36 handler = logging.StreamHandler()
     37 handler.setFormatter(ColoredFormatter('%(asctime)s - %(levelname)s - %(message)s'))
     38 logger.setLevel(logging.INFO)
     39 logger.addHandler(handler)
     40 
     41 
     42 async def get_domains_from_url() -> list:
     43     '''
     44     Fetch domains from SecLists URL
     45     
     46     :return: List of domains
     47     '''
     48     
     49     try:
     50         import aiohttp
     51     except ImportError:
     52         raise ImportError('missing aiohttp library (pip install aiohttp)')
     53 
     54     url = 'https://raw.githubusercontent.com/danielmiessler/SecLists/refs/heads/master/Fuzzing/email-top-100-domains.txt'
     55     
     56     async with aiohttp.ClientSession() as session:
     57         async with session.get(url) as response:
     58             content = await response.text()
     59             return [line.strip() for line in content.splitlines() if line.strip()]
     60 
     61 
     62 async def domain_generator(domains: list):
     63     '''
     64     Async generator that yields domains
     65     
     66     :param domains: List of domains to yield
     67     '''
     68     
     69     for domain in domains:
     70         await asyncio.sleep(0) # Allow other coroutines to run
     71         yield domain
     72 
     73 
     74 async def run_benchmark(test_type: str, domains: list, concurrency: int) -> tuple:
     75     '''Run a single benchmark test'''
     76     
     77     logging.info(f'{Colors.BOLD}Testing {test_type} input with {concurrency} concurrent connections...{Colors.RESET}')
     78     scanner = HTTPZScanner(concurrent_limit=concurrency, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
     79     
     80     count = 0
     81     got_first = False
     82     start_time = None
     83     
     84     if test_type == 'List':
     85         async for result in scanner.scan(domains):
     86             if result:
     87                 if not got_first:
     88                     got_first = True
     89                     start_time = time.time()
     90                 count += 1
     91                 
     92                 # More detailed status reporting
     93                 status_str = ''
     94                 if result['status'] < 0:
     95                     error_type = result.get('error_type', 'UNKNOWN')
     96                     error_msg = result.get('error', 'Unknown Error')
     97                     status_str = f"{Colors.RED}[{result['status']} - {error_type}: {error_msg}]{Colors.RESET}"
     98                 elif 200 <= result['status'] < 300:
     99                     status_str = f"{Colors.GREEN}[{result['status']}]{Colors.RESET}"
    100                 elif 300 <= result['status'] < 400:
    101                     status_str = f"{Colors.YELLOW}[{result['status']}]{Colors.RESET}"
    102                 else:
    103                     status_str = f"{Colors.RED}[{result['status']}]{Colors.RESET}"
    104                 
    105                 # Show protocol and response headers if available
    106                 protocol_info = f" {Colors.CYAN}({result.get('protocol', 'unknown')}){Colors.RESET}" if result.get('protocol') else ''
    107                 headers_info = ''
    108                 if result.get('response_headers'):
    109                     important_headers = ['server', 'location', 'content-type']
    110                     headers = [f"{k}: {v}" for k, v in result['response_headers'].items() if k.lower() in important_headers]
    111                     if headers:
    112                         headers_info = f" {Colors.GRAY}[{', '.join(headers)}]{Colors.RESET}"
    113                 
    114                 # Show redirect chain if present
    115                 redirect_info = ''
    116                 if result.get('redirect_chain'):
    117                     redirect_info = f" -> {Colors.YELLOW}Redirects: {' -> '.join(result['redirect_chain'])}{Colors.RESET}"
    118                 
    119                 # Show error details if present
    120                 error_info = ''
    121                 if result.get('error'):
    122                     error_info = f" {Colors.RED}Error: {result['error']}{Colors.RESET}"
    123                 
    124                 # Show final URL if different from original
    125                 url_info = ''
    126                 if result.get('url') and result['url'] != f"http(s)://{result['domain']}":
    127                     url_info = f" {Colors.CYAN}Final URL: {result['url']}{Colors.RESET}"
    128                 
    129                 logging.info(
    130                     f"{test_type}-{concurrency} Result {count}: "
    131                     f"{status_str}{protocol_info} "
    132                     f"{Colors.CYAN}{result['domain']}{Colors.RESET}"
    133                     f"{redirect_info}"
    134                     f"{url_info}"
    135                     f"{headers_info}"
    136                     f"{error_info}"
    137                 )
    138     else:
    139         # Skip generator test
    140         pass
    141 
    142     elapsed = time.time() - start_time if start_time else 0
    143     domains_per_sec = count/elapsed if elapsed > 0 else 0
    144     logging.info(f'{Colors.YELLOW}{test_type} test with {concurrency} concurrent connections completed in {elapsed:.2f} seconds ({domains_per_sec:.2f} domains/sec){Colors.RESET}')
    145     
    146     return elapsed, domains_per_sec
    147 
    148 
    149 async def test_list_input(domains: list):
    150     '''Test scanning using a list input'''
    151     
    152     logging.info(f'{Colors.BOLD}Testing list input...{Colors.RESET}')
    153     scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
    154     
    155     start_time = time.time()
    156     count = 0
    157     async for result in scanner.scan(domains):
    158         if result:
    159             count += 1
    160             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
    161             title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
    162             error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
    163             logging.info(f'List-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
    164 
    165 
    166 async def test_generator_input(domains: list):
    167     '''Test scanning using an async generator input'''
    168     
    169     logging.info(f'{Colors.BOLD}Testing generator input...{Colors.RESET}')
    170     scanner = HTTPZScanner(concurrent_limit=25, timeout=3, show_progress=True, debug_mode=True, follow_redirects=True)
    171     
    172     start_time = time.time()
    173     count = 0
    174     async for result in scanner.scan(domain_generator(domains)):
    175         if result:
    176             count += 1
    177             status_color = Colors.GREEN if 200 <= result['status'] < 300 else Colors.RED
    178             title = f" - {Colors.CYAN}{result.get('title', 'No Title')}{Colors.RESET}" if result.get('title') else ''
    179             error = f" - {Colors.RED}{result.get('error', '')}{Colors.RESET}" if result.get('error') else ''
    180             logging.info(f'Generator-25 Result {count}: {status_color}[{result["status"]}]{Colors.RESET} {Colors.CYAN}{result["domain"]}{Colors.RESET}{title}{error}')
    181 
    182 
    183 async def main() -> None:
    184     '''Main test function'''
    185     
    186     try:
    187         # Fetch domains
    188         domains = await get_domains_from_url()
    189         logging.info(f'Loaded {Colors.YELLOW}{len(domains)}{Colors.RESET} domains for testing')
    190         
    191         # Store benchmark results
    192         results = []
    193         
    194         # Run tests with different concurrency levels
    195         for concurrency in [25, 50, 100]:
    196             # Generator tests
    197             gen_result = await run_benchmark('Generator', domains, concurrency)
    198             results.append(('Generator', concurrency, *gen_result))
    199             
    200             # List tests
    201             list_result = await run_benchmark('List', domains, concurrency)
    202             results.append(('List', concurrency, *list_result))
    203         
    204         # Print benchmark comparison
    205         logging.info(f'\n{Colors.BOLD}Benchmark Results:{Colors.RESET}')
    206         logging.info('-' * 80)
    207         logging.info(f'{"Test Type":<15} {"Concurrency":<15} {"Time (s)":<15} {"Domains/sec":<15}')
    208         logging.info('-' * 80)
    209         
    210         # Sort by domains per second (fastest first)
    211         results.sort(key=lambda x: x[3], reverse=True)
    212         
    213         for test_type, concurrency, elapsed, domains_per_sec in results:
    214             logging.info(f'{test_type:<15} {concurrency:<15} {elapsed:.<15.2f} {domains_per_sec:<15.2f}')
    215         
    216         # Highlight fastest result
    217         fastest = results[0]
    218         logging.info('-' * 80)
    219         logging.info(f'{Colors.GREEN}Fastest: {fastest[0]} test with {fastest[1]} concurrent connections')
    220         logging.info(f'Time: {fastest[2]:.2f} seconds')
    221         logging.info(f'Speed: {fastest[3]:.2f} domains/sec{Colors.RESET}')
    222         
    223         logging.info(f'\n{Colors.GREEN}All tests completed successfully!{Colors.RESET}')
    224         
    225     except Exception as e:
    226         logging.error(f'Test failed: {Colors.RED}{str(e)}{Colors.RESET}')
    227         sys.exit(1)
    228 
    229 
    230 if __name__ == '__main__':
    231     try:
    232         asyncio.run(main())
    233     except KeyboardInterrupt:
    234         logging.warning(f'{Colors.YELLOW}Tests interrupted by user{Colors.RESET}')
    235         sys.exit(1)