httpz

- Hyper-fast HTTP Scraping Tool
git clone git://git.acid.vegas/httpz.git
Log | Files | Refs | Archive | README | LICENSE

dns.py (4263B)

      1 #!/usr/bin/env python3
      2 # HTTPZ Web Scanner - Developed by acidvegas in Python (https://github.com/acidvegas/httpz)
      3 # httpz_scanner/dns.py
      4 
      5 import asyncio
      6 import os
      7 
      8 try:
      9     import aiohttp
     10 except ImportError:
     11     raise ImportError('missing aiohttp library (pip install aiohttp)')
     12 
     13 try:
     14     import dns.asyncresolver
     15     import dns.query
     16     import dns.resolver
     17     import dns.zone
     18 except ImportError:
     19     raise ImportError('missing dnspython library (pip install dnspython)')
     20 
     21 from .utils import debug, info, SILENT_MODE
     22 
     23 
     24 async def resolve_all_dns(domain: str, timeout: int = 5, nameserver: str = None, check_axfr: bool = False) -> tuple:
     25     '''
     26     Resolve all DNS records for a domain
     27     
     28     :param domain: Domain to resolve
     29     :param timeout: Timeout in seconds
     30     :param nameserver: Specific nameserver to use
     31     :param check_axfr: Whether to attempt zone transfer
     32     '''
     33 
     34     # Setup resolver
     35     resolver = dns.asyncresolver.Resolver()
     36     resolver.lifetime = timeout
     37     if nameserver:
     38         resolver.nameservers = [nameserver]
     39     
     40     # Resolve all DNS records
     41     results = await asyncio.gather(*[resolver.resolve(domain, rtype) for rtype in ('NS', 'A', 'AAAA', 'CNAME')], return_exceptions=True)
     42     
     43     # Parse results
     44     nameservers = [str(ns).rstrip('.') for ns in results[0]] if isinstance(results[0], dns.resolver.Answer) else []
     45     ips         = ([str(ip) for ip in results[1]] if isinstance(results[1], dns.resolver.Answer) else []) + ([str(ip) for ip in results[2]] if isinstance(results[2], dns.resolver.Answer) else [])
     46     cname       = str(results[3][0].target).rstrip('.') if isinstance(results[3], dns.resolver.Answer) else None
     47 
     48     # Get NS IPs
     49     ns_ips = {}
     50     if nameservers:
     51         ns_results = await asyncio.gather(*[resolver.resolve(ns, rtype) for ns in nameservers for rtype in ('A', 'AAAA')], return_exceptions=True)
     52         for i, ns in enumerate(nameservers):
     53             ns_ips[ns] = [str(ip) for records in ns_results[i*2:i*2+2] if isinstance(records, dns.resolver.Answer) for ip in records]
     54 
     55     # Attempt zone transfer
     56     if check_axfr:
     57         await attempt_axfr(domain, ns_ips, timeout)
     58 
     59     return sorted(set(ips)), cname, nameservers, ns_ips
     60 
     61 
     62 async def attempt_axfr(domain: str, ns_ips: dict, timeout: int = 5) -> None:
     63     '''
     64     Attempt zone transfer for a domain
     65     
     66     :param domain: Domain to attempt AXFR transfer
     67     :param ns_ips: Dictionary of nameserver hostnames to their IPs
     68     :param timeout: Timeout in seconds
     69     '''
     70 
     71     try:
     72         os.makedirs('axfrout', exist_ok=True)
     73 
     74         # Loop through each NS
     75         for ns_host, ips in ns_ips.items():
     76             # Loop through each NS IP
     77             for ns_ip in ips:
     78                 try:
     79                     # Attempt zone transfer
     80                     zone = dns.zone.from_xfr(dns.query.xfr(ns_ip, domain, lifetime=timeout))
     81 
     82                     # Write zone to file
     83                     with open(f'axfrout/{domain}_{ns_ip}.zone', 'w') as f:
     84                         zone.to_text(f)
     85 
     86                     info(f'[AXFR SUCCESS] {domain} from {ns_host} ({ns_ip})')
     87                 except Exception as e:
     88                     debug(f'AXFR failed for {domain} from {ns_ip}: {str(e)}')
     89     except Exception as e:
     90         debug(f'Failed AXFR for {domain}: {str(e)}')
     91 
     92 
     93 async def load_resolvers(resolver_file: str = None) -> list:
     94     '''
     95     Load DNS resolvers from file or default source
     96     
     97     :param resolver_file: Path to file containing resolver IPs
     98     '''
     99 
    100     # Load from file
    101     if resolver_file:
    102         try:
    103             with open(resolver_file) as f:
    104                 resolvers = [line.strip() for line in f if line.strip()]
    105             if resolvers:
    106                 return resolvers
    107         except Exception as e:
    108             debug(f'Error loading resolvers from {resolver_file}: {str(e)}')
    109 
    110     # Load from GitHub
    111     async with aiohttp.ClientSession() as session:
    112         async with session.get('https://raw.githubusercontent.com/trickest/resolvers/refs/heads/main/resolvers.txt') as response:
    113             resolvers = await response.text()
    114             if not SILENT_MODE:
    115                 info(f'Loaded {len(resolvers.splitlines()):,} resolvers.')
    116             return [resolver.strip() for resolver in resolvers.splitlines()]