czds- ICANN Centralized Zone Data Service Tool |
git clone git://git.acid.vegas/czds.git |
Log | Files | Refs | Archive | README | LICENSE |
czds.py (5908B)
1 #!/usr/bin/env python3 2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) 3 # Reference: https://czds.icann.org 4 5 import argparse 6 import concurrent.futures 7 import getpass 8 import json 9 import logging 10 import os 11 import time 12 import urllib.request 13 14 15 class CZDS: 16 '''Class for the ICANN Centralized Zones Data Service''' 17 18 def __init__(self, username: str, password: str): 19 self.username = username 20 self.headers = {'Authorization': f'Bearer {self.authenticate(username, password)}'} 21 22 23 def authenticate(self, username: str, password: str) -> str: 24 ''' 25 Authenticate with the ICANN API and return the access token. 26 27 :param username: ICANN Username 28 :param password: ICANN Password 29 ''' 30 31 try: 32 # Prepare the request 33 data = json.dumps({'username': username, 'password': password}).encode('utf-8') 34 headers = {'Content-Type': 'application/json'} 35 request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers) 36 37 # Make the request 38 with urllib.request.urlopen(request) as response: 39 response = response.read().decode('utf-8') 40 41 return json.loads(response)['accessToken'] 42 43 except Exception as e: 44 raise Exception(f'Failed to authenticate with ICANN API: {e}') 45 46 47 def fetch_zone_links(self) -> list: 48 ''' 49 Fetch the list of zone files available for download. 50 51 :param token: ICANN access token 52 ''' 53 54 # Create the request 55 request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) 56 57 # Make the request 58 with urllib.request.urlopen(request) as response: 59 if response.status != 200: 60 raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') 61 62 return json.loads(response.read().decode('utf-8')) 63 64 65 def download_report(self, filepath: str): 66 ''' 67 Downloads the zone report stats from the API and scrubs the report for privacy. 68 69 :param filepath: Filepath to save the scrubbed report 70 ''' 71 72 # Create the request 73 request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=self.headers) 74 75 # Make the request 76 with urllib.request.urlopen(request) as response: 77 if response.status != 200: 78 raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}') 79 80 content = response.read().decode('utf-8') 81 82 # Write the content to the file 83 with open(filepath, 'w') as file: 84 file.write(content.replace(self.username, 'nobody@no.name')) # Wipe the email address from the report for privacy 85 86 87 def download_zone(self, url: str, output_directory: str): 88 ''' 89 Download a single zone file using urllib.request. 90 91 :param url: URL to download 92 :param output_directory: Directory to save the zone file 93 ''' 94 95 # Create the request 96 request = urllib.request.Request(url, headers=self.headers) 97 98 # Make the request 99 with urllib.request.urlopen(request) as response: 100 if response.status != 200: 101 raise Exception(f'Failed to download {url}: {response.status} {response.reason}') 102 103 if not (content_disposition := response.getheader('Content-Disposition')): 104 raise ValueError('Missing Content-Disposition header') 105 106 # Extract the filename from the Content-Disposition header 107 filename = content_disposition.split('filename=')[-1].strip('"') 108 filepath = os.path.join(output_directory, filename) 109 110 # Write the content to the file 111 with open(filepath, 'wb') as file: 112 while True: 113 chunk = response.read(1024) 114 if not chunk: 115 break 116 file.write(chunk) 117 118 return filepath 119 120 121 def main(username: str, password: str, concurrency: int): 122 ''' 123 Main function to download all zone files. 124 125 :param username: ICANN Username 126 :param password: ICANN Password 127 :param concurrency: Number of concurrent downloads 128 ''' 129 130 now = time.strftime('%Y-%m-%d') 131 132 logging.info(f'Authenticating with ICANN API...') 133 134 CZDS_client = CZDS(username, password) 135 136 logging.debug('Created CZDS client') 137 138 output_directory = os.path.join(os.getcwd(), 'zones', now) 139 os.makedirs(output_directory, exist_ok=True) 140 141 logging.info('Fetching zone stats report...') 142 143 try: 144 CZDS_client.download_report(os.path.join(output_directory, '.report.csv')) 145 except Exception as e: 146 raise Exception(f'Failed to download zone stats report: {e}') 147 148 logging.info('Fetching zone links...') 149 150 try: 151 zone_links = CZDS_client.fetch_zone_links() 152 except Exception as e: 153 raise Exception(f'Failed to fetch zone links: {e}') 154 155 logging.info(f'Fetched {len(zone_links):,} zone links') 156 157 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: 158 future_to_url = {executor.submit(CZDS_client.download_zone, url, output_directory): url for url in sorted(zone_links)} 159 for future in concurrent.futures.as_completed(future_to_url): 160 url = future_to_url[future] 161 try: 162 filepath = future.result() 163 logging.info(f'Completed downloading {url} to file {filepath}') 164 except Exception as e: 165 logging.error(f'{url} generated an exception: {e}') 166 167 168 169 if __name__ == '__main__': 170 # Create argument parser 171 parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service') 172 173 # Add arguments 174 parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username') 175 parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password') 176 parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads') 177 178 # Parse arguments 179 args = parser.parse_args() 180 181 # Setting up logging 182 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') 183 184 # Get username and password 185 username = args.username or input('ICANN Username: ') 186 password = args.password or getpass.getpass('ICANN Password: ') 187 188 # Execute main function 189 main(username, password, args.concurrency)