httpz - Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git

httpz.py (10586B)
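Example usage (a sketch; domains.txt is a hypothetical input file with one domain per line, and the flags match the argparse options defined in main() below):

    python httpz.py domains.txt -c 50 -t 15 -o results.json -v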

#!/usr/bin/env python
# HTTPZ Crawler - Developed by acidvegas in Python (https://git.acid.vegas/httpz)

'''
BCUZ FUCK HTTPX PYTHON STILL GO HARD
'''

import argparse
import asyncio
import json
import logging
import random
import re
import ssl
import urllib.parse
import urllib.request

try:
    import aiodns
except ImportError:
    print('Missing required module \'aiodns\'. (pip install aiodns)')
    exit(1)

try:
    import aiohttp
except ImportError:
    print('Missing required module \'aiohttp\'. (pip install aiohttp)')
    exit(1)

# ANSI escape codes for colors
BLUE = '\033[34m'
CYAN = '\033[36m'
RED = '\033[91m'
GREEN = '\033[92m'
DARK_GREY = '\033[90m'
YELLOW = '\033[93m'
RESET = '\033[0m'

# Globals
DNS_SERVERS = None
args = None  # Global args variable, set by argparse in main()

def vlog(msg: str):
    '''
    Verbose logging only if enabled

    :param msg: Message to print to console
    '''
    if args.verbose:
        logging.info(msg)


def create_session(user_agent: str, timeout: int, proxy: str = None) -> dict:
    '''
    Create the keyword arguments for a custom aiohttp session

    :param user_agent: User agent to use for HTTP requests
    :param timeout: Timeout for HTTP requests
    :param proxy: Proxy to use for HTTP requests (accepted but not yet applied to the session)
    '''
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE # Accept self-signed & invalid certificates

    headers = {'User-Agent': user_agent}
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    session_params = {
        'connector': connector,
        'headers': headers,
        'timeout': aiohttp.ClientTimeout(total=timeout)
    }

    return session_params


def get_dns_servers() -> dict:
    '''Get a list of public DNS servers to use for lookups.'''
    with urllib.request.urlopen('https://public-dns.info/nameservers.txt') as source:
        results = [line.strip() for line in source.read().decode().split('\n') if line.strip()]

    v4_servers = [server for server in results if ':' not in server]
    v6_servers = [server for server in results if ':'     in server]

    return {'4': v4_servers, '6': v6_servers}


async def dns_lookup(domain: str, record_type: str, timeout: int, retry: int) -> list:
    '''
    Resolve DNS information from a domain

    :param domain: Domain name to resolve
    :param record_type: DNS record type to resolve
    :param timeout: Timeout for DNS request
    :param retry: Number of times to retry failed requests
    '''
    nameserver = None # Defined before the loop so the except block can always reference it
    for i in range(retry):
        try:
            version = '4' if record_type == 'A' else '6' if record_type == 'AAAA' else random.choice(['4','6'])
            nameserver = random.choice(DNS_SERVERS[version])
            resolver = aiodns.DNSResolver(nameservers=[nameserver], timeout=timeout)
            records = await resolver.query(domain, record_type)
            return records.cname if record_type == 'CNAME' else [record.host for record in records] # CNAME queries return a single name, not a list
        except Exception as e:
            vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to resolve {record_type} record using {nameserver} {DARK_GREY}({str(e)}){RESET}')
    return []
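# Illustrative only: awaiting dns_lookup('example.com', 'A', 5, 2) would return a list
# of IPv4 addresses such as ['93.184.216.34'] on success, or [] after two failed tries.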


async def get_body(source: str, preview: int) -> str:
    '''
    Get the body of a webpage

    :param source: HTML source of the webpage
    :param preview: Number of bytes to preview
    '''
    body_content = re.search(r'<body.*?>(.*?)</body>', source[:5000], re.DOTALL | re.IGNORECASE)
    processed_content = body_content.group(1) if body_content else source
    clean_content = re.sub(r'<[^>]+>', '', processed_content)
    return clean_content[:preview]
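# Illustrative only: awaiting get_body('<html><body><h1>Hi</h1></body></html>', 100)
# yields 'Hi' (tags are stripped with a regex; this is not a real HTML parser).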


async def get_title(session: aiohttp.ClientSession, domain: str, max_redirects: int = 5):
    '''
    Get the title of a webpage and its status code

    :param session: aiohttp session
    :param domain: URL to get the title of
    :param max_redirects: Maximum number of redirects to follow
    '''
    title = None
    body = None
    status_code = None

    try:
        async with session.get(domain, timeout=args.timeout, allow_redirects=False) as response:
            status_code = response.status
            if status_code in (200, 201):
                html_content = await response.text()
                match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE | re.DOTALL)
                title = match.group(1).strip() if match else None
                title = bytes(title, 'utf-8').decode('unicode_escape') if title else None
                title = re.sub(r'[\r\n]+', ' ', title)[:300] if title else None # TODO: clean up this chained ternary mess
                body = await get_body(html_content, args.preview)
                body = re.sub(r'\s+', ' ', body).strip() if body else None
            elif status_code in (301, 302, 303, 307, 308):
                redirect_url = response.headers.get('Location')
                if not redirect_url:
                    vlog(f'{RED}[ERROR]{RESET} No redirect URL found for {domain} {DARK_GREY}({status_code}){RESET}')
                elif max_redirects <= 0:
                    vlog(f'{RED}[ERROR]{RESET} {domain} - Too many redirects {DARK_GREY}({status_code}){RESET}')
                else:
                    redirect_url = urllib.parse.urljoin(domain, redirect_url) # Resolve relative redirects against the current URL
                    vlog(f'{YELLOW}[WARN]{RESET} {domain} -> {redirect_url} {DARK_GREY}({status_code}){RESET}')
                    return await get_title(session, redirect_url, max_redirects - 1)
            else:
                vlog(f'{RED}[ERROR]{RESET} {domain} - Invalid status code {DARK_GREY}{status_code}{RESET}')
    except asyncio.TimeoutError:
        vlog(f'{RED}[ERROR]{RESET} {domain} - HTTP request timed out')
    except Exception as e:
        vlog(f'{RED}[ERROR]{RESET} Failed to get title for {domain} {DARK_GREY}({e}){RESET}')
    return title, body, status_code


async def check_url(session: aiohttp.ClientSession, domain: str):
    '''
    Process a domain name: resolve its DNS records, then fetch its title and body

    :param session: aiohttp session
    :param domain: Domain name to check
    '''
    dns_records = {}

    for record_type in ('A', 'AAAA'):
        records = await dns_lookup(domain, record_type, args.timeout, args.retry)
        if records:
            dns_records[record_type] = records

    if not dns_records:
        cname_record = await dns_lookup(domain, 'CNAME', args.timeout, args.retry)
        if cname_record:
            dns_records['CNAME'] = cname_record
            domain = cname_record
        else:
            vlog(f'{RED}[ERROR]{RESET} No DNS records found for {domain}')
            return domain, None, None, None, None, None

    protocol = 'https'
    title, body, status_code = await get_title(session, f'https://{domain}')
    if not title and not body:
        protocol = 'http'
        title, body, status_code = await get_title(session, f'http://{domain}')

    if title or body:
        if status_code in (200, 201):
            colored_status = f'[{GREEN}{status_code}{RESET}]'
        elif status_code in (301, 302, 303, 307, 308):
            colored_status = f'[{YELLOW}{status_code}{RESET}]'
        else:
            colored_status = f'[{status_code}]'
        logging.info(f'{domain} {colored_status} [{CYAN}{title}{RESET}] - [{BLUE}{body}{RESET}]')
        return domain, protocol, title, body, dns_records, status_code # Plain status code here, colors are for console output only
    else:
        vlog(f'{RED}[ERROR]{RESET} {domain} - Failed to retrieve title')

    return domain, None, None, None, None, status_code


async def process_file():
    '''
    Process a list of domains from file
    '''
    session_params = create_session(args.user_agent, args.timeout, args.proxy)

    async with aiohttp.ClientSession(**session_params) as session:
        tasks = set()
        with open(args.file, 'r') as file:
            for line in file:
                domain = line.strip()
                if domain:
                    tasks.add(asyncio.create_task(check_url(session, domain)))

                    if len(tasks) >= args.concurrency: # Cap the number of in-flight tasks; a semaphore would also work here
                        done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

                        for task in done:
                            domain, protocol, title, body, dns_records, status_code = task.result()
                            if title or body or dns_records:
                                write_result_to_file(domain, protocol, title, body, dns_records, status_code)

        if tasks:
            done, _ = await asyncio.wait(tasks)
            for task in done:
                domain, protocol, title, body, dns_records, status_code = task.result()
                if title or body or dns_records:
                    write_result_to_file(domain, protocol, title, body, dns_records, status_code)


def write_result_to_file(domain, protocol, title, body, dns_records, status_code):
    '''
    Write a single domain result to file

    :param domain: Domain name
    :param protocol: Protocol used (http or https)
    :param title: Title of the domain
    :param body: Body preview of the domain
    :param dns_records: DNS records of the domain
    :param status_code: HTTP status code
    '''
    result = {
        'domain': domain,
        'protocol': protocol,
        'status_code': status_code,
        'title': title,
        'body': body,
        'dns_records': dns_records
    }
    with open(args.output, 'a') as f:
        json.dump(result, f)
        f.write('\n')
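
# Results are appended as JSON Lines, one object per domain. A hypothetical record
# (values illustrative) looks like:
# {"domain": "example.com", "protocol": "https", "status_code": 200, "title": "Example Domain", "body": "...", "dns_records": {"A": ["93.184.216.34"]}}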


def main():
    global DNS_SERVERS, args

    parser = argparse.ArgumentParser(description='Check URLs from a file asynchronously, perform DNS lookups and store results in JSON.')
    parser.add_argument('file', help='File containing list of domains')
    parser.add_argument('-c', '--concurrency', type=int, default=10, help='Number of concurrent requests')
    parser.add_argument('-m', '--memory_limit', type=int, default=1000, help='Number of results to store in memory before syncing to file (currently unused)')
    parser.add_argument('-o', '--output', default='results.json', help='Output file')
    parser.add_argument('-t', '--timeout', type=int, default=10, help='Timeout for HTTP requests')
    parser.add_argument('-u', '--user_agent', default='Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)', help='User agent to use for HTTP requests')
    parser.add_argument('-x', '--proxy', type=str, help='Proxy to use for HTTP requests')
    parser.add_argument('-r', '--retry', type=int, default=2, help='Number of times to retry failed requests')
    parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
    parser.add_argument('-p', '--preview', type=int, default=500, help='Preview size in bytes for body & title (default: 500)')
    args = parser.parse_args()

    log_level = logging.INFO
    logging.basicConfig(level=log_level, format=f'{DARK_GREY}%(asctime)s{RESET} %(message)s', datefmt='%H:%M:%S')

    logging.info('Loading DNS servers...')
    DNS_SERVERS = get_dns_servers()
    if not DNS_SERVERS:
        logging.fatal('Failed to get DNS servers.')
        exit(1) # logging.fatal() does not terminate the program on its own
    logging.info(f'Found {len(DNS_SERVERS["4"])} IPv4 and {len(DNS_SERVERS["6"])} IPv6 DNS servers.')

    asyncio.run(process_file())


if __name__ == '__main__':
    main()