ptrstream

- endless stream of rdns
git clone git://git.acid.vegas/ptrstream.git
Log | Files | Refs | Archive | README

arpa-stream.py (3636B)

      1 #!/usr/bin/env python
      2 # arpa stream - developed by acidvegas in python (https://git.acid.vegas/ptrstream)
      3 
      4 '''
      5 I have no idea where we are going with this, but I'm sure it'll be fun...
      6 '''
      7 
      8 import argparse
      9 import concurrent.futures
     10 import random
     11 
     12 try:
     13     import dns.resolver
     14 except ImportError:
     15     raise ImportError('missing required \'dnspython\' library (pip install dnspython)')
     16 
     17 
     18 class colors:
     19     axfr     = '\033[34m'
     20     error    = '\033[31m'
     21     success  = '\033[32m'
     22     ns_query = '\033[33m'
     23     ns_zone  = '\033[36m'
     24     reset    = '\033[0m'
     25 
     26 
     27 def genip() -> str:
     28     '''Generate a random IP address with 1 to 4 octets.'''
     29     num_octets = random.randint(1, 4)
     30     ip_parts = [str(random.randint(0, 255)) for _ in range(num_octets)]
     31     return '.'.join(ip_parts)
     32 
     33 
     34 def query_ns_records(ip: str) -> list:
     35     '''
     36     Query NS records for a given IP.
     37     
     38     :param ip: The IP address to query NS records for.
     39     '''
     40     try:
     41         ns_records = [str(record.target)[:-1] for record in dns.resolver.resolve(f'{ip}.in-addr.arpa', 'NS')]
     42         if ns_records:
     43             print(f'{colors.ns_zone}Queried NS records for {ip}: {ns_records}{colors.reset}')
     44         return ns_records
     45     except Exception as e:
     46         print(f'{colors.error}Error querying NS records for {ip}: {e}{colors.reset}')
     47         return []
     48 
     49 
     50 def resolve_ns_to_ip(ns_hostname: str) -> list:
     51     '''
     52     Resolve NS hostname to IP.
     53     
     54     :param ns_hostname: The NS hostname to resolve.
     55     '''
     56     try:
     57         ns_ips = [ip.address for ip in dns.resolver.resolve(ns_hostname, 'A')]
     58         if ns_ips:
     59             print(f'{colors.ns_query}Resolved NS hostname {ns_hostname} to IPs: {ns_ips}{colors.reset}')
     60         return ns_ips
     61     except Exception as e:
     62         print(f'{colors.error}Error resolving NS {ns_hostname}: {e}{colors.reset}')
     63         return []
     64 
     65 
     66 def axfr_check(ip: str, ns_ip: str):
     67     '''
     68     Perform AXFR check on a specific nameserver IP.
     69     
     70     :param ip: The IP address to perform the AXFR check on.
     71     :param ns_ip: The nameserver IP to perform the AXFR check on.
     72     '''
     73     resolver = dns.resolver.Resolver()
     74     resolver.nameservers = [ns_ip]
     75     try:
     76         if resolver.resolve(f'{ip}.in-addr.arpa', 'AXFR'):
     77             print(f'{colors.success}[SUCCESS]{colors.reset} AXFR on {ns_ip} for {ip}.in-addr.arpa')
     78             return True
     79     except Exception as e:
     80         print(f'{colors.error}[FAIL]{colors.reset} AXFR on {ns_ip} for {ip}.in-addr.arpa - Error: {e}')
     81         return False
     82 
     83 
     84 def process_ip(ip: str):
     85     '''
     86     Process each IP: Fetch NS records and perform AXFR check.
     87     
     88     :param ip: The IP address to process.
     89     '''
     90     for ns_hostname in query_ns_records(ip):
     91         for ns_ip in resolve_ns_to_ip(ns_hostname):
     92             if axfr_check(ip, ns_ip):
     93                 return
     94 
     95 
     96 if __name__ == '__main__':
     97     parser = argparse.ArgumentParser(description='DNS AXFR Check Script')
     98     parser.add_argument('--concurrency', type=int, default=100, help='Number of concurrent workers')
     99     args = parser.parse_args()
    100 
    101     with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
    102         futures = {executor.submit(process_ip, genip()): ip for ip in range(args.concurrency)}
    103         while True:
    104             done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
    105             for future in done:
    106                 future.result()  # We don't need to store the result as it's already printed
    107                 futures[executor.submit(process_ip, genip())] = genip()
    108             futures = {future: ip for future, ip in futures.items() if future not in done}