czds

- ICANN Centralized Zone Data Service Tool
git clone git://git.acid.vegas/czds.git
Log | Files | Refs | Archive | README | LICENSE

czdz.py (3622B)

      1 #!/usr/bin/env python
      2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
      3 
      4 '''
      5 References:
      6     - https://czds.icann.org
      7     - https://czds.icann.org/sites/default/files/czds-api-documentation.pdf
      8 '''
      9 
     10 import argparse
     11 import concurrent.futures
     12 import getpass
     13 import logging
     14 import os
     15 
     16 try:
     17     import requests
     18 except ImportError:
     19     raise ImportError('Missing dependency: requests (pip install requests)')
     20 
     21 def authenticate(username: str, password: str) -> str:
     22     '''
     23     Authenticate with ICANN's API and return the access token.
     24     
     25     :param username: ICANN Username
     26     :param password: ICANN Password
     27     '''
     28     response = requests.post('https://account-api.icann.org/api/authenticate', json={'username': username, 'password': password})
     29     response.raise_for_status()
     30     return response.json()['accessToken']
     31 
     32 
     33 def download_zone(url: str, token: str, output_directory: str):
     34     '''
     35     Download a single zone file.
     36     
     37     :param url: URL to download
     38     :param token: ICANN access token
     39     :param output_directory: Directory to save the zone file
     40     '''
     41     headers = {'Authorization': f'Bearer {token}'}
     42     response = requests.get(url, headers=headers)
     43     response.raise_for_status()
     44     filename = response.headers.get('Content-Disposition').split('filename=')[-1].strip('"')
     45     filepath = os.path.join(output_directory, filename)
     46     with open(filepath, 'wb') as file:
     47         for chunk in response.iter_content(chunk_size=1024):
     48             file.write(chunk)
     49     return filepath
     50 
     51 
     52 def main(username: str, password: str, concurrency: int):
     53     '''
     54     Main function to download all zone files.
     55 
     56     :param username: ICANN Username
     57     :param password: ICANN Password
     58     :param concurrency: Number of concurrent downloads
     59     '''
     60     token = authenticate(username, password)
     61     headers = {'Authorization': f'Bearer {token}'}
     62     
     63     response = requests.get('https://czds-api.icann.org/czds/downloads/links', headers=headers)
     64     response.raise_for_status()
     65     zone_links = response.json()
     66     
     67     output_directory = 'zonefiles'
     68     os.makedirs(output_directory, exist_ok=True)
     69     
     70     with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
     71         future_to_url = {executor.submit(download_zone, url, token, output_directory): url for url in zone_links}
     72         for future in concurrent.futures.as_completed(future_to_url):
     73             url = future_to_url[future]
     74             try:
     75                 filepath = future.result()
     76                 logging.info(f'Completed downloading {url} to file {filepath}')
     77             except Exception as e:
     78                 logging.error(f'{url} generated an exception: {e}')
     79 
     80 
     81 
     82 if __name__ == '__main__':
     83     parser = argparse.ArgumentParser(description="ICANN Zone Files Downloader")
     84     parser.add_argument('-u', '--username', help='ICANN Username')
     85     parser.add_argument('-p', '--password', help='ICANN Password')
     86     parser.add_argument('-c', '--concurrency', type=int, default=5, help='Number of concurrent downloads')
     87     args = parser.parse_args()
     88 
     89     username = args.username or os.getenv('CZDS_USER')
     90     password = args.password or os.getenv('CZDS_PASS')
     91 
     92     if not username:
     93         username = input('ICANN Username: ')
     94     if not password:
     95         password = getpass.getpass('ICANN Password: ')
     96     
     97     try:
     98         main(username, password, args.concurrency)
     99     except requests.HTTPError as e:
    100         logging.error(f'HTTP error occurred: {e.response.status_code} - {e.response.reason}')
    101     except Exception as e:
    102         logging.error(f'An error occurred: {e}')