httpz - Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git
scanner.py (10022B)
#!/usr/bin/env python3
# HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
# httpz/scanner.py

import asyncio
import json
import random
import sys

try:
    import aiohttp
except ImportError:
    raise ImportError('missing aiohttp module (pip install aiohttp)')

try:
    import bs4
except ImportError:
    raise ImportError('missing bs4 module (pip install beautifulsoup4)')

from .dns import resolve_all_dns, load_resolvers
from .formatters import format_console_output
from .colors import Colors
from .parsers import parse_domain_url, get_cert_info, get_favicon_hash
from .utils import debug, info, USER_AGENTS, input_generator


class HTTPZScanner:
    '''Core scanner class for HTTP domain checking'''

    def __init__(self, concurrent_limit = 100, timeout = 5, follow_redirects = False, check_axfr = False, resolver_file = None, output_file = None, show_progress = False, debug_mode = False, jsonl_output = False, show_fields = None, match_codes = None, exclude_codes = None):
        '''
        Initialize the HTTPZScanner class

        :param concurrent_limit: Maximum number of concurrent requests
        :param timeout: Request timeout in seconds
        :param follow_redirects: Follow redirects
        :param check_axfr: Check for AXFR
        :param resolver_file: Path to resolver file
        :param output_file: Path to output file
        :param show_progress: Show progress bar
        :param debug_mode: Enable debug mode
        :param jsonl_output: Output in JSONL format
        :param show_fields: Fields to show
        :param match_codes: Status codes to match
        :param exclude_codes: Status codes to exclude
        '''

        self.concurrent_limit = concurrent_limit
        self.timeout          = timeout
        self.follow_redirects = follow_redirects
        self.check_axfr       = check_axfr
        self.resolver_file    = resolver_file
        self.output_file      = output_file
        self.show_progress    = show_progress
        self.debug_mode       = debug_mode
        self.jsonl_output     = jsonl_output

        self.show_fields = show_fields or {
            'status_code'      : True,
            'content_type'     : True,
            'content_length'   : True,
            'title'            : True,
            'body'             : True,
            'ip'               : True,
            'favicon'          : True,
            'headers'          : True,
            'follow_redirects' : True,
            'cname'            : True,
            'tls'              : True
        }

        self.match_codes       = match_codes
        self.exclude_codes     = exclude_codes
        self.resolvers         = None
        self.processed_domains = 0


    async def init(self):
        '''Initialize resolvers - must be called before scanning'''
        self.resolvers = await load_resolvers(self.resolver_file)


    async def check_domain(self, session: aiohttp.ClientSession, domain: str):
        '''Check a single domain and return results'''
        nameserver = random.choice(self.resolvers) if self.resolvers else None
        base_domain, port, protocols = parse_domain_url(domain)

        result = {
            'domain' : base_domain,
            'status' : 0,
            'url'    : protocols[0],
            'port'   : port,
        }

        # Try each protocol
        for url in protocols:
            try:
                # Set random user agent for each request
                headers = {'User-Agent': random.choice(USER_AGENTS)}

                async with session.get(url, timeout=self.timeout,
                                       allow_redirects=self.follow_redirects,
                                       max_redirects=10 if self.follow_redirects else 0,
                                       headers=headers) as response:

                    result['status'] = response.status

                    # Early exit if status code doesn't match criteria
                    if self.match_codes and result['status'] not in self.match_codes:
                        return result
                    if self.exclude_codes and result['status'] in self.exclude_codes:
                        return result

                    # Continue with full processing only if status code matches criteria
                    result['url'] = str(response.url)

                    # Add headers if requested
                    headers = dict(response.headers)
                    if headers and (self.show_fields.get('headers') or self.show_fields.get('all_flags')):
                        result['headers'] = headers
                    else:
                        # Only add content type/length if headers aren't included
                        if content_type := response.headers.get('content-type', '').split(';')[0]:
                            result['content_type'] = content_type
                        if content_length := response.headers.get('content-length'):
                            result['content_length'] = content_length

                    # Only add redirect chain if it exists
                    if self.follow_redirects and response.history:
                        result['redirect_chain'] = [str(h.url) for h in response.history] + [str(response.url)]

                    # Do DNS lookups only if we're going to use the result
                    ips, cname, nameservers, _ = await resolve_all_dns(
                        base_domain, self.timeout, nameserver, self.check_axfr
                    )

                    # Only add DNS fields if they have values
                    if ips:
                        result['ips'] = ips
                    if cname:
                        result['cname'] = cname
                    if nameservers:
                        result['nameservers'] = nameservers

                    # Only add TLS info if available
                    if response.url.scheme == 'https':
                        try:
                            # aiohttp does not expose the TLS object publicly, so reach
                            # into the transport; guarded by the AttributeError handler
                            if ssl_object := response._protocol.transport.get_extra_info('ssl_object'):
                                if tls_info := await get_cert_info(ssl_object, str(response.url)):
                                    # Only add TLS fields that have values
                                    result['tls'] = {k: v for k, v in tls_info.items() if v}
                        except AttributeError:
                            debug(f'Failed to get SSL info for {url}')

                    # Cap the body at 1 MiB before parsing
                    html = (await response.text())[:1024*1024]
                    soup = bs4.BeautifulSoup(html, 'html.parser')

                    # Only add title if it exists
                    if soup.title and soup.title.string:
                        result['title'] = ' '.join(soup.title.string.strip().split()).rstrip('.')[:300]

                    # Only add body if it exists
                    if body_text := soup.get_text():
                        result['body'] = ' '.join(body_text.split()).rstrip('.')[:500]

                    # Only add favicon hash if it exists
                    if favicon_hash := await get_favicon_hash(session, url, html):
                        result['favicon_hash'] = favicon_hash

                    break
            except Exception as e:
                debug(f'Error checking {url}: {str(e)}')
                result['status'] = -1
                continue

        return result


    async def process_result(self, result):
        '''
        Process and output a single result

        :param result: result to process
        '''

        formatted = format_console_output(result, self.debug_mode, self.show_fields, self.match_codes, self.exclude_codes)

        if formatted:
            # Write to file if specified
            if self.output_file:
                if (not self.match_codes or result['status'] in self.match_codes) and \
                   (not self.exclude_codes or result['status'] not in self.exclude_codes):
                    with open(self.output_file, 'a') as f:
                        json.dump(result, f, ensure_ascii=False)
                        f.write('\n')

            # Console output
            if self.jsonl_output:
                print(json.dumps(result))
            else:
                self.processed_domains += 1
                if self.show_progress:
                    info(f"{Colors.GRAY}[{self.processed_domains:,}]{Colors.RESET} {formatted}")
                else:
                    info(formatted)


    async def scan(self, input_source):
        '''
        Scan domains from a file or stdin

        :param input_source: Path to file or '-' for stdin
        '''
        if not self.resolvers:
            await self.init()

        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            tasks = set()

            # Process domains with concurrent limit
            for domain in input_generator(input_source):
                if len(tasks) >= self.concurrent_limit:
                    # Wait for at least one task to finish before queueing more
                    done, tasks = await asyncio.wait(
                        tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in done:
                        result = await task
                        await self.process_result(result)

                task = asyncio.create_task(self.check_domain(session, domain))
                tasks.add(task)

            # Process remaining tasks
            if tasks:
                done, _ = await asyncio.wait(tasks)
                for task in done:
                    result = await task
                    await self.process_result(result)
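
A minimal usage sketch follows. It is illustrative, not part of scanner.py: it assumes the package is importable as httpz and that a domains.txt file with one domain per line exists; the constructor arguments are the ones documented in the listing above.

    # Usage sketch (assumptions: package importable as `httpz`,
    # `domains.txt` exists with one domain per line)
    import asyncio

    from httpz.scanner import HTTPZScanner

    async def main():
        scanner = HTTPZScanner(
            concurrent_limit = 50,               # cap in-flight requests
            timeout          = 5,                # per-request timeout in seconds
            follow_redirects = True,
            match_codes      = {200, 301, 302},  # only report these status codes
            jsonl_output     = True,             # print one JSON object per line
        )
        # scan() calls init() itself if resolvers are not yet loaded
        await scanner.scan('domains.txt')        # or '-' to read from stdin

    if __name__ == '__main__':
        asyncio.run(main())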