czds- ICANN Centralized Zone Data Service Tool |
git clone git://git.acid.vegas/czds.git |
Log | Files | Refs | Archive | README | LICENSE |
czds.py (5631B)
1 #!/usr/bin/env python 2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) 3 # Reference: https://czds.icann.org 4 5 import argparse 6 import concurrent.futures 7 import getpass 8 import json 9 import logging 10 import os 11 import time 12 import urllib.request 13 14 15 # Setting up logging 16 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') 17 18 19 def authenticate(username: str, password: str) -> str: 20 ''' 21 Authenticate with the ICANN API and return the access token. 22 23 :param username: ICANN Username 24 :param password: ICANN Password 25 ''' 26 27 data = json.dumps({'username': username, 'password': password}).encode('utf-8') 28 headers = {'Content-Type': 'application/json'} 29 request = urllib.request.Request('https://account-api.icann.org/api/authenticate', data=data, headers=headers) 30 31 with urllib.request.urlopen(request) as response: 32 response = response.read().decode('utf-8') 33 return json.loads(response)['accessToken'] 34 35 36 def fetch_zone_links(token: str) -> list: 37 ''' 38 Fetch the list of zone files available for download. 39 40 :param token: ICANN access token 41 ''' 42 43 headers = {'Authorization': f'Bearer {token}'} 44 request = urllib.request.Request('https://czds-api.icann.org/czds/downloads/links', headers=headers) 45 46 with urllib.request.urlopen(request) as response: 47 if response.status == 200: 48 return json.loads(response.read().decode('utf-8')) 49 else: 50 raise Exception(f'Failed to fetch zone links: {response.status} {response.reason}') 51 52 53 def download_report(token: str, output_directory: str, username: str): 54 ''' 55 Downloads the zone report stats from the API and scrubs the report for privacy. 56 57 :param token: ICANN access token 58 :param output_directory: Directory to save the scrubbed report 59 :param username: Username to be redacted 60 ''' 61 62 filepath = os.path.join(output_directory, '.stats.csv') 63 headers = {'Authorization': f'Bearer {token}'} 64 request = urllib.request.Request('https://czds-api.icann.org/czds/requests/report', headers=headers) 65 66 with urllib.request.urlopen(request) as response: 67 if response.status == 200: 68 report_data = response.read().decode('utf-8').replace(username, 'nobody@no.name') 69 with open(filepath, 'w') as file: 70 file.write(report_data) 71 else: 72 raise Exception(f'Failed to download the zone stats report: {response.status} {response.reason}') 73 74 75 76 def download_zone(url: str, token: str, output_directory: str): 77 ''' 78 Download a single zone file using urllib.request. 79 80 :param url: URL to download 81 :param token: ICANN access token 82 :param output_directory: Directory to save the zone file 83 ''' 84 85 headers = {'Authorization': f'Bearer {token}'} 86 request = urllib.request.Request(url, headers=headers) 87 88 with urllib.request.urlopen(request) as response: 89 if response.status == 200: 90 content_disposition = response.getheader('Content-Disposition') 91 if content_disposition: 92 filename = content_disposition.split('filename=')[-1].strip('"') 93 else: 94 raise ValueError(f'Failed to get filename from Content-Disposition header: {content_disposition}') 95 96 filepath = os.path.join(output_directory, filename) 97 98 with open(filepath, 'wb') as file: 99 while True: 100 chunk = response.read(1024) 101 if not chunk: 102 break 103 file.write(chunk) 104 105 return filepath 106 else: 107 raise Exception(f'Failed to download {url}: {response.status} {response.reason}') 108 109 110 def main(username: str, password: str, concurrency: int): 111 ''' 112 Main function to download all zone files. 113 114 :param username: ICANN Username 115 :param password: ICANN Password 116 :param concurrency: Number of concurrent downloads 117 ''' 118 119 now = time.strftime('%Y-%m-%d') 120 121 logging.info(f'Authenticating with ICANN API...') 122 try: 123 token = authenticate(username, password) 124 except Exception as e: 125 raise Exception(f'Failed to authenticate with ICANN API: {e}') 126 #logging.info(f'Authenticated with token: {token}') 127 # The above line is commented out to avoid printing the token to the logs, you can uncomment it for debugging purposes 128 129 output_directory = os.path.join(os.getcwd(), 'zones', now) 130 os.makedirs(output_directory, exist_ok=True) 131 132 logging.info('Fetching zone stats report...') 133 try: 134 download_report(token, output_directory, username) 135 except Exception as e: 136 raise Exception(f'Failed to download zone stats report: {e}') 137 138 logging.info('Fetching zone links...') 139 try: 140 zone_links = fetch_zone_links(token) 141 except Exception as e: 142 raise Exception(f'Failed to fetch zone links: {e}') 143 logging.info(f'Fetched {len(zone_links)} zone links') 144 145 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: 146 future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} 147 for future in concurrent.futures.as_completed(future_to_url): 148 url = future_to_url[future] 149 try: 150 filepath = future.result() 151 logging.info(f'Completed downloading {url} to file {filepath}') 152 except Exception as e: 153 logging.error(f'{url} generated an exception: {e}') 154 155 156 157 if __name__ == '__main__': 158 parser = argparse.ArgumentParser(description='ICANN API for the Centralized Zones Data Service') 159 parser.add_argument('-u', '--username', help='ICANN Username') 160 parser.add_argument('-p', '--password', help='ICANN Password') 161 parser.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads') 162 args = parser.parse_args() 163 164 username = args.username or os.getenv('CZDS_USER') or input('ICANN Username: ') 165 password = args.password or os.getenv('CZDS_PASS') or getpass.getpass('ICANN Password: ') 166 167 main(username, password, args.concurrency)