czds- ICANN Centralized Zone Data Service Tool |
git clone git://git.acid.vegas/czds.git |
Log | Files | Refs | Archive | README | LICENSE |
czdz.py (3622B)
1 #!/usr/bin/env python 2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds) 3 4 ''' 5 References: 6 - https://czds.icann.org 7 - https://czds.icann.org/sites/default/files/czds-api-documentation.pdf 8 ''' 9 10 import argparse 11 import concurrent.futures 12 import getpass 13 import logging 14 import os 15 16 try: 17 import requests 18 except ImportError: 19 raise ImportError('Missing dependency: requests (pip install requests)') 20 21 def authenticate(username: str, password: str) -> str: 22 ''' 23 Authenticate with ICANN's API and return the access token. 24 25 :param username: ICANN Username 26 :param password: ICANN Password 27 ''' 28 response = requests.post('https://account-api.icann.org/api/authenticate', json={'username': username, 'password': password}) 29 response.raise_for_status() 30 return response.json()['accessToken'] 31 32 33 def download_zone(url: str, token: str, output_directory: str): 34 ''' 35 Download a single zone file. 36 37 :param url: URL to download 38 :param token: ICANN access token 39 :param output_directory: Directory to save the zone file 40 ''' 41 headers = {'Authorization': f'Bearer {token}'} 42 response = requests.get(url, headers=headers) 43 response.raise_for_status() 44 filename = response.headers.get('Content-Disposition').split('filename=')[-1].strip('"') 45 filepath = os.path.join(output_directory, filename) 46 with open(filepath, 'wb') as file: 47 for chunk in response.iter_content(chunk_size=1024): 48 file.write(chunk) 49 return filepath 50 51 52 def main(username: str, password: str, concurrency: int): 53 ''' 54 Main function to download all zone files. 55 56 :param username: ICANN Username 57 :param password: ICANN Password 58 :param concurrency: Number of concurrent downloads 59 ''' 60 token = authenticate(username, password) 61 headers = {'Authorization': f'Bearer {token}'} 62 63 response = requests.get('https://czds-api.icann.org/czds/downloads/links', headers=headers) 64 response.raise_for_status() 65 zone_links = response.json() 66 67 output_directory = 'zonefiles' 68 os.makedirs(output_directory, exist_ok=True) 69 70 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: 71 future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links} 72 for future in concurrent.futures.as_completed(future_to_url): 73 url = future_to_url[future] 74 try: 75 filepath = future.result() 76 logging.info(f'Completed downloading {url} to file {filepath}') 77 except Exception as e: 78 logging.error(f'{url} generated an exception: {e}') 79 80 81 82 if __name__ == '__main__': 83 parser = argparse.ArgumentParser(description="ICANN Zone Files Downloader") 84 parser.add_argument('-u', '--username', help='ICANN Username') 85 parser.add_argument('-p', '--password', help='ICANN Password') 86 parser.add_argument('-c', '--concurrency', type=int, default=5, help='Number of concurrent downloads') 87 args = parser.parse_args() 88 89 username = args.username or os.getenv('CZDS_USER') 90 password = args.password or os.getenv('CZDS_PASS') 91 92 if not username: 93 username = input('ICANN Username: ') 94 if not password: 95 password = getpass.getpass('ICANN Password: ') 96 97 try: 98 main(username, password, args.concurrency) 99 except requests.HTTPError as e: 100 logging.error(f'HTTP error occurred: {e.response.status_code} - {e.response.reason}') 101 except Exception as e: 102 logging.error(f'An error occurred: {e}')