mdaxfr

- Mass DNS AXFR
git clone git://git.acid.vegas/mdaxfr.git
Log | Files | Refs | Archive | README | LICENSE

mdaxfr.py (5830B)

      1 #!/usr/bin/env python
      2 # Mass DNS AXFR - developed by acidvegas in python (https://git.acid.vegas/mdaxfr)
      3 
      4 import logging
      5 import os
      6 import urllib.request
      7 
      8 try:
      9 	import dns.rdatatype
     10 	import dns.query
     11 	import dns.zone
     12 	import dns.resolver
     13 except ImportError:
     14 	raise SystemExit('missing required \'dnspython\' module (pip install dnspython)')
     15 
     16 
     17 def attempt_axfr(tld: str, nameserver: str, filename: str):
     18 	'''
     19 	Perform a DNS zone transfer on a target domain.
     20 
     21 	:param target: The target domain to perform the zone transfer on.
     22 	:param nameserver: The nameserver to perform the zone transfer on.
     23 	:param filename: The filename to store the zone transfer results in.
     24 	'''
     25 	temp_file = filename + '.temp'
     26 	if not (resolvers := resolve_nameserver(nameserver)):
     27 		logging.error(f'Failed to resolve nameserver {nameserver}: {ex}')
     28 	else:
     29 		for ns in resolvers: # Let's try all the IP addresses for the nameserver
     30 			try:
     31 				xfr = dns.query.xfr(ns, tld, lifetime=300)
     32 				if next(xfr, None) is not None:
     33 					if not tld:
     34 						print(f'\033[32mSUCCESS\033[0m AXFR for \033[36m.\033[0m on \033[33m{nameserver}\033[0m \033[90m({ns})\033[0m')
     35 					else:
     36 						print(f'\033[32mSUCCESS\033[0m AXFR for \033[36m{tld}\033[0m on \033[33m{nameserver}\033[0m \033[90m({ns})\033[0m')
     37 					with open(temp_file, 'w') as file:
     38 						for msg in xfr:
     39 							for rrset in msg.answer:
     40 								for rdata in rrset:
     41 									file.write(f'{rrset.name}.{tld} {rrset.ttl} {rdata}\n')
     42 					os.rename(temp_file, filename)
     43 					break
     44 			except Exception as ex:
     45 				#logging.error(f'Failed to perform zone transfer from {nameserver} ({ns}) for {tld}: {ex}')
     46 				print(f'\033[31mFAIL\033[0m AXFR for \033[36m{tld}\033[0m on \033[33m{nameserver}\033[0m \033[90m({ns})\033[0m has failed! \033[90m({ex})\033[0m')
     47 				if os.path.exists(temp_file):
     48 					os.remove(temp_file)
     49 
     50 
     51 def get_nameservers(target: str) -> list:
     52 	'''
     53 	Generate a list of the root nameservers.
     54 
     55 	:param target: The target domain to get the nameservers for.
     56 	'''
     57 	try:
     58 		ns_records = dns.resolver.resolve(target, 'NS', lifetime=60)
     59 		nameservers = [str(rr.target)[:-1] for rr in ns_records]
     60 		return nameservers
     61 	except Exception as ex:
     62 		print(f'\033[31mFAIL\033[0m Error resolving nameservers for \033[36m{target}\033[0m \033[90m({ex})\033[0m')
     63 	return []
     64 
     65 
     66 def get_root_tlds(output_dir: str) -> list:
     67 	'''
     68 	Get the root TLDs from a root nameservers.
     69 
     70 	:param output_dir: The output directory to use.
     71 	'''
     72 	rndroot = [root for root in os.listdir(output_dir) if root.endswith('.root-servers.net.txt')]
     73 	if rndroot:
     74 		rndroot_file = rndroot[0]  # Take the first file from the list
     75 		tlds = sorted(set([item.split()[0][:-1] for item in open(os.path.join(root_dir, rndroot_file)).read().split('\n') if item and 'IN' in item and 'NS' in item]))
     76 	else:
     77 		logging.warning('Failed to find root nameserver list...fallback to using IANA list')
     78 		tlds = urllib.request.urlopen('https://data.iana.org/TLD/tlds-alpha-by-domain.txt').read().decode('utf-8').lower().split('\n')[1:]
     79 	return tlds
     80 
     81 
     82 def get_psl_tlds() -> list:
     83 	'''Download the Public Suffix List and return its contents.'''
     84 	data = urllib.request.urlopen('https://publicsuffix.org/list/public_suffix_list.dat').read().decode()
     85 	domains = []
     86 	for line in data.split('\n'):
     87 		if line.startswith('//') or not line:
     88 			continue
     89 		if '*' in line or '!' in line:
     90 			continue
     91 		if '.' not in line:
     92 			continue
     93 		domains.append(line)
     94 	return domains
     95 
     96 
     97 def resolve_nameserver(nameserver: str) -> list:
     98 	'''
     99 	Resolve a nameserver to its IP address.
    100 
    101 	:param nameserver: The nameserver to resolve.
    102 	'''
    103 	data = []
    104 	for version in ('A', 'AAAA'):
    105 		try:
    106 			data += [ip.address for ip in dns.resolver.resolve(nameserver, version, lifetime=60)]
    107 		except:
    108 			pass
    109 	return data
    110 
    111 
    112 
    113 if __name__ == '__main__':
    114 	import argparse
    115 	import concurrent.futures
    116 
    117 	parser = argparse.ArgumentParser(description='Mass DNS AXFR')
    118 	parser.add_argument('-c', '--concurrency', type=int, default=30, help='maximum concurrent tasks')
    119 	parser.add_argument('-o', '--output', default='axfrout', help='output directory')
    120 	parser.add_argument('-t', '--timeout', type=int, default=15, help='DNS timeout (default: 15)')
    121 	args = parser.parse_args()
    122 
    123 	logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    124 
    125 	root_dir = os.path.join(args.output, 'root')
    126 	os.makedirs(root_dir, exist_ok=True)
    127 	os.makedirs(args.output, exist_ok=True)
    128 	dns.resolver._DEFAULT_TIMEOUT = args.timeout
    129 
    130 	logging.info('Fetching root nameservers...')
    131 	with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
    132 		futures = [executor.submit(attempt_axfr, '', root, os.path.join(args.output, f'root/{root}.txt')) for root in get_nameservers('.')]
    133 		for future in concurrent.futures.as_completed(futures):
    134 			try:
    135 				future.result()
    136 			except Exception as e:
    137 				logging.error(f'Error in TLD task: {e}')
    138 
    139 	logging.info('Fetching root TLDs...')
    140 	with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
    141 		futures = [executor.submit(attempt_axfr, tld, ns, os.path.join(args.output, tld + '.txt')) for tld in get_root_tlds(root_dir) for ns in get_nameservers(tld) if ns]
    142 		for future in concurrent.futures.as_completed(futures):
    143 			try:
    144 				future.result()
    145 			except Exception as e:
    146 				logging.error(f'Error in TLD task: {e}')
    147 
    148 	logging.info('Fetching PSL TLDs...')
    149 	os.makedirs(os.path.join(args.output, 'psl'), exist_ok=True)
    150 	with concurrent.futures.ThreadPoolExecutor(max_workers=args.concurrency) as executor:
    151 		futures = [executor.submit(attempt_axfr, tld, ns, os.path.join(args.output, f'psl/{tld}.txt')) for tld in get_psl_tlds() for ns in get_nameservers(tld) if ns]
    152 		for future in concurrent.futures.as_completed(futures):
    153 			try:
    154 				future.result()
    155 			except Exception as e:
    156 				logging.error(f'Error in TLD task: {e}')