czds

- ICANN Centralized Zone Data Service Tool
git clone git://git.acid.vegas/czds.git
Log | Files | Refs | Archive | README | LICENSE

czds.py (5908B)

      1 #!/usr/bin/env python3
      2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
      3 # Reference: https://czds.icann.org
      4 
      5 import argparse
      6 import concurrent.futures
      7 import getpass
      8 import json
      9 import logging
     10 import os
     11 import time
     12 import urllib.request
     13 
     14 
     15 class CZDS:
     16 	'''Class for the ICANN Centralized Zones Data Service'''
     17 
     18 	def __init__(self, username: str, password: str):
     19 		self.username = username
     20 		self.headers  = {'Authorization': f'Bearer {self.authenticate(username, password)}'}
     21 
     22 
     23 	def authenticate(self, username: str, password: str) -> str:
     24 		'''
     25 		Authenticate with the ICANN API and return the access token.
     26 
     27 		:param username: ICANN Username
     28 		:param password: ICANN Password
     29 		'''
     30 
     31 		try:
     32 			# Prepare the request
     33 			data	= json.dumps({'username': username, 'password': password}).encode('utf-8')
     34 			headers = {'Content-Type': 'application/json'}
     35 			request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
     36 
     37 			# Make the request
     38 			with urllib.request.urlopen(request) as response:
     39 				response = response.read().decode('utf-8')
     40 			
     41 			return json.loads(response)['accessToken']
     42 
     43 		except Exception as e:
     44 			raise Exception(f'Failed to authenticate with ICANN API: {e}')
     45 
     46 
     47 	def fetch_zone_links(self) -> list:
     48 		'''
     49 		Fetch the list of zone files available for download.
     50 
     51 		:param token: ICANN access token
     52 		'''
     53 
     54 		# Create the request
     55 		request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers)
     56 
     57 		# Make the request
     58 		with urllib.request.urlopen(request) as response:
     59 			if response.status != 200:
     60 				raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
     61 			
     62 			return json.loads(response.read().decode('utf-8'))
     63 			
     64 
     65 	def download_report(self, filepath: str):
     66 		'''
     67 		Downloads the zone report stats from the API and scrubs the report for privacy.
     68 
     69 		:param filepath: Filepath to save the scrubbed report
     70 		'''
     71 
     72 		# Create the request
     73 		request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers)
     74 		
     75 		# Make the request
     76 		with urllib.request.urlopen(request) as response:
     77 			if response.status != 200:
     78 				raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
     79 			
     80 			content = response.read().decode('utf-8')
     81 
     82 		# Write the content to the file
     83 		with open(filepath, 'w') as file:
     84 			file.write(content.replace(self.username, 'nobody@no.name')) # Wipe the email address from the report for privacy
     85 
     86 
     87 	def download_zone(self, url: str, output_directory: str):
     88 		'''
     89 		Download a single zone file using urllib.request.
     90 
     91 		:param url: URL to download
     92 		:param output_directory: Directory to save the zone file
     93 		'''
     94 
     95 		# Create the request
     96 		request = urllib.request.Request(url, headers=self.headers)
     97 
     98 		# Make the request
     99 		with urllib.request.urlopen(request) as response:
    100 			if response.status != 200:
    101 				raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
    102 
    103 			if not (content_disposition := response.getheader('Content-Disposition')):
    104 				raise ValueError('Missing Content-Disposition header')
    105 
    106 			# Extract the filename from the Content-Disposition header
    107 			filename = content_disposition.split('filename=')[-1].strip('"')
    108 			filepath = os.path.join(output_directory, filename)
    109 
    110 			# Write the content to the file
    111 			with open(filepath, 'wb') as file:
    112 				while True:
    113 					chunk = response.read(1024)
    114 					if not chunk:
    115 						break
    116 					file.write(chunk)
    117 
    118 			return filepath
    119 
    120 
    121 def main(username: str, password: str, concurrency: int):
    122 	'''
    123 	Main function to download all zone files.
    124 
    125 	:param username: ICANN Username
    126 	:param password: ICANN Password
    127 	:param concurrency: Number of concurrent downloads
    128 	'''
    129 
    130 	now = time.strftime('%Y-%m-%d')
    131 
    132 	logging.info(f'Authenticating with ICANN API...')
    133 	
    134 	CZDS_client = CZDS(username, password)
    135 
    136 	logging.debug('Created CZDS client')
    137 	
    138 	output_directory = os.path.join(os.getcwd(), 'zones', now)
    139 	os.makedirs(output_directory, exist_ok=True)
    140 
    141 	logging.info('Fetching zone stats report...')	
    142 	
    143 	try:
    144 		CZDS_client.download_report(os.path.join(output_directory, '.report.csv'))
    145 	except Exception as e:
    146 		raise Exception(f'Failed to download zone stats report: {e}')
    147 
    148 	logging.info('Fetching zone links...')
    149 
    150 	try:
    151 		zone_links = CZDS_client.fetch_zone_links()
    152 	except Exception as e:
    153 		raise Exception(f'Failed to fetch zone links: {e}')
    154 	
    155 	logging.info(f'Fetched {len(zone_links):,} zone links')
    156 
    157 	with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
    158 		future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)}
    159 		for future in concurrent.futures.as_completed(future_to_url):
    160 			url = future_to_url[future]
    161 			try:
    162 				filepath = future.result()
    163 				logging.info(f'Completed downloading {url} to file {filepath}')
    164 			except Exception as e:
    165 				logging.error(f'{url} generated an exception: {e}')
    166 
    167 
    168 
    169 if __name__ == '__main__':
    170 	# Create argument parser
    171 	parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
    172 
    173 	# Add arguments
    174 	parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
    175 	parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
    176 	parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
    177 
    178 	# Parse arguments
    179 	args = parser.parse_args()
    180 
    181 	# Setting up logging
    182 	logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    183 
    184 	# Get username and password
    185 	username = args.username or input('ICANN Username: ')
    186 	password = args.password or getpass.getpass('ICANN Password: ')
    187 
    188 	# Execute main function
    189 	main(username, password, args.concurrency)