ptrstream- endless stream of rdns |
git clone git://git.acid.vegas/ptrstream.git |
Log | Files | Refs | Archive | README |
arpa-stream.py (3636B)
1 #/usr/bin/env python 2 # arpa stream - developed by acidvegas in python (https://git.acid.vegas/ptrstream) 3 4 ''' 5 I have no idea where we are going with this, but I'm sure it'll be fun... 6 ''' 7 8 import argparse 9 import concurrent.futures 10 import random 11 12 try: 13 import dns.resolver 14 except ImportError: 15 raise ImportError('missing required \'dnspython\' library (pip install dnspython)') 16 17 18 class colors: 19 axfr = '\033[34m' 20 error = '\033[31m' 21 success = '\033[32m' 22 ns_query = '\033[33m' 23 ns_zone = '\033[36m' 24 reset = '\033[0m' 25 26 27 def genip() -> str: 28 '''Generate a random IP address with 1 to 4 octets.''' 29 num_octets = random.randint(1, 4) 30 ip_parts = [str(random.randint(0, 255)) for _ in range(num_octets)] 31 return '.'.join(ip_parts) 32 33 34 def query_ns_records(ip: str) -> list: 35 ''' 36 Query NS records for a given IP. 37 38 :param ip: The IP address to query NS records for. 39 ''' 40 try: 41 ns_records = [str(record.target)[:-1] for record in dns.resolver.resolve(f'{ip}.in-addr.arpa', 'NS')] 42 if ns_records: 43 print(f'{colors.ns_zone}Queried NS records for {ip}: {ns_records}{colors.reset}') 44 return ns_records 45 except Exception as e: 46 print(f'{colors.error}Error querying NS records for {ip}: {e}{colors.reset}') 47 return [] 48 49 50 def resolve_ns_to_ip(ns_hostname: str) -> list: 51 ''' 52 Resolve NS hostname to IP. 53 54 :param ns_hostname: The NS hostname to resolve. 55 ''' 56 try: 57 ns_ips = [ip.address for ip in dns.resolver.resolve(ns_hostname, 'A')] 58 if ns_ips: 59 print(f'{colors.ns_query}Resolved NS hostname {ns_hostname} to IPs: {ns_ips}{colors.reset}') 60 return ns_ips 61 except Exception as e: 62 print(f'{colors.error}Error resolving NS {ns_hostname}: {e}{colors.reset}') 63 return [] 64 65 66 def axfr_check(ip: str, ns_ip: str): 67 ''' 68 Perform AXFR check on a specific nameserver IP. 69 70 :param ip: The IP address to perform the AXFR check on. 71 :param ns_ip: The nameserver IP to perform the AXFR check on. 72 ''' 73 resolver = dns.resolver.Resolver() 74 resolver.nameservers = [ns_ip] 75 try: 76 if resolver.resolve(f'{ip}.in-addr.arpa', 'AXFR'): 77 print(f'{colors.success}[SUCCESS]{colors.reset} AXFR on {ns_ip} for {ip}.in-addr.arpa') 78 return True 79 except Exception as e: 80 print(f'{colors.error}[FAIL]{colors.reset} AXFR on {ns_ip} for {ip}.in-addr.arpa - Error: {e}') 81 return False 82 83 84 def process_ip(ip: str): 85 ''' 86 Process each IP: Fetch NS records and perform AXFR check. 87 88 :param ip: The IP address to process. 89 ''' 90 for ns_hostname in query_ns_records(ip): 91 for ns_ip in resolve_ns_to_ip(ns_hostname): 92 if axfr_check(ip, ns_ip): 93 return 94 95 96 if __name__ == '__main__': 97 parser = argparse.ArgumentParser(description='DNS AXFR Check Script') 98 parser.add_argument('--concurrency', type=int, default=100, help='Number of concurrent workers') 99 args = parser.parse_args() 100 101 with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor: 102 futures = {executor.submit(process_ip, genip()): ip for ip in range(args.concurrency)} 103 while True: 104 done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) 105 for future in done: 106 future.result() # We don't need to store the result as it's already printed 107 futures[executor.submit(process_ip, genip())] = genip() 108 futures = {future: ip for future, ip in futures.items() if future not in done}