czds

- ICANN Centralized Zone Data Service Tool
git clone git://git.acid.vegas/czds.git
Log | Files | Refs | Archive | README | LICENSE

czds.py (5631B)

      1 #!/usr/bin/env python
      2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
      3 # Reference: https://czds.icann.org
      4 
      5 import argparse
      6 import concurrent.futures
      7 import getpass
      8 import json
      9 import logging
     10 import os
     11 import time
     12 import urllib.request
     13 
     14 
     15 # Setting up logging
     16 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     17 
     18 
     19 def authenticate(username: str, password: str) -> str:
     20 	'''
     21 	Authenticate with the ICANN API and return the access token.
     22 
     23 	:param username: ICANN Username
     24 	:param password: ICANN Password
     25 	'''
     26 
     27 	data	= json.dumps({'username': username, 'password': password}).encode('utf-8')
     28 	headers = {'Content-Type': 'application/json'}
     29 	request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers)
     30 
     31 	with urllib.request.urlopen(request) as response:
     32 		response = response.read().decode('utf-8')
     33 		return json.loads(response)['accessToken']
     34 
     35 
     36 def fetch_zone_links(token: str) -> list:
     37 	'''
     38 	Fetch the list of zone files available for download.
     39 
     40 	:param token: ICANN access token
     41 	'''
     42 
     43 	headers = {'Authorization': f'Bearer {token}'}
     44 	request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=headers)
     45 
     46 	with urllib.request.urlopen(request) as response:
     47 		if response.status == 200:
     48 			return json.loads(response.read().decode('utf-8'))
     49 		else:
     50 			raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}')
     51 
     52 
     53 def download_report(token: str, output_directory: str, username: str):
     54 	'''
     55 	Downloads the zone report stats from the API and scrubs the report for privacy.
     56 
     57 	:param token: ICANN access token
     58 	:param output_directory: Directory to save the scrubbed report
     59 	:param username: Username to be redacted
     60 	'''
     61 
     62 	filepath = os.path.join(output_directory, '.stats.csv')
     63 	headers  = {'Authorization': f'Bearer {token}'}
     64 	request  = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=headers)
     65 
     66 	with urllib.request.urlopen(request) as response:
     67 		if response.status == 200:
     68 			report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name')
     69 			with open(filepath, 'w') as file:
     70 				file.write(report_data)
     71 		else:
     72 			raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}')
     73 
     74 
     75 
     76 def download_zone(url: str, token: str, output_directory: str):
     77 	'''
     78 	Download a single zone file using urllib.request.
     79 
     80 	:param url: URL to download
     81 	:param token: ICANN access token
     82 	:param output_directory: Directory to save the zone file
     83 	'''
     84 
     85 	headers = {'Authorization': f'Bearer {token}'}
     86 	request = urllib.request.Request(url, headers=headers)
     87 
     88 	with urllib.request.urlopen(request) as response:
     89 		if response.status == 200:
     90 			content_disposition = response.getheader('Content-Disposition')
     91 			if content_disposition:
     92 				filename = content_disposition.split('filename=')[-1].strip('"')
     93 			else:
     94 				raise ValueError(f'Failed to get filename from Content-Disposition header: {content_disposition}')
     95 
     96 			filepath = os.path.join(output_directory, filename)
     97 
     98 			with open(filepath, 'wb') as file:
     99 				while True:
    100 					chunk = response.read(1024)
    101 					if not chunk:
    102 						break
    103 					file.write(chunk)
    104 
    105 			return filepath
    106 		else:
    107 			raise Exception(f'Failed to download {url}: {response.status} {response.reason}')
    108 
    109 
    110 def main(username: str, password: str, concurrency: int):
    111 	'''
    112 	Main function to download all zone files.
    113 
    114 	:param username: ICANN Username
    115 	:param password: ICANN Password
    116 	:param concurrency: Number of concurrent downloads
    117 	'''
    118 
    119 	now = time.strftime('%Y-%m-%d')
    120 
    121 	logging.info(f'Authenticating with ICANN API...')
    122 	try:
    123 		token = authenticate(username, password)
    124 	except Exception as e:
    125 		raise Exception(f'Failed to authenticate with ICANN API: {e}')
    126 	#logging.info(f'Authenticated with token: {token}')
    127 	# The above line is commented out to avoid printing the token to the logs, you can uncomment it for debugging purposes
    128 
    129 	output_directory = os.path.join(os.getcwd(), 'zones', now)
    130 	os.makedirs(output_directory, exist_ok=True)
    131 
    132 	logging.info('Fetching zone stats report...')
    133 	try:
    134 		download_report(token, output_directory, username)
    135 	except Exception as e:
    136 		raise Exception(f'Failed to download zone stats report: {e}')
    137 
    138 	logging.info('Fetching zone links...')
    139 	try:
    140 		zone_links = fetch_zone_links(token)
    141 	except Exception as e:
    142 		raise Exception(f'Failed to fetch zone links: {e}')
    143 	logging.info(f'Fetched {len(zone_links)} zone links')
    144 
    145 	with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
    146 		future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links}
    147 		for future in concurrent.futures.as_completed(future_to_url):
    148 			url = future_to_url[future]
    149 			try:
    150 				filepath = future.result()
    151 				logging.info(f'Completed downloading {url} to file {filepath}')
    152 			except Exception as e:
    153 				logging.error(f'{url} generated an exception: {e}')
    154 
    155 
    156 
    157 if __name__ == '__main__':
    158 	parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service')
    159 	parser.add_argument('-u', '--username', help='ICANN Username')
    160 	parser.add_argument('-p', '--password', help='ICANN Password')
    161 	parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
    162 	args = parser.parse_args()
    163 
    164 	username = args.username or os.getenv('CZDS_USER') or input('ICANN Username: ')
    165 	password = args.password or os.getenv('CZDS_PASS') or getpass.getpass('ICANN Password: ')
    166 
    167 	main(username, password, args.concurrency)