czds

- ICANN Centralized Zone Data Service Tool
git clone git://git.acid.vegas/czds.git
Log | Files | Refs | Archive | README | LICENSE

commit ab147238ce6672b9f26526b93e2affaf7969e8df
parent 8e84c3e224b3c12369f1fb6c6ec6e0903043cd37
Author: acidvegas <acid.vegas@acid.vegas>
Date: Sun, 23 Mar 2025 14:19:26 -0400

Added progress bars and better handling

Diffstat:
Mczds/__init__.py | 2+-
Mczds/__main__.py | 19++++++++++++-------
Mczds/client.py | 268++++++++++++++++++++++++++++++++++++-------------------------------------------
Aczds/utils.py | 89+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mrequirements.txt | 5+++--
Msetup.py | 2+-

6 files changed, 228 insertions(+), 157 deletions(-)

diff --git a/czds/__init__.py b/czds/__init__.py
@@ -5,7 +5,7 @@
 from .client import CZDS
 
 
-__version__ = '1.3.0'
+__version__ = '1.3.1'
 __author__  = 'acidvegas'
 __email__   = 'acid.vegas@acid.vegas'
 __github__  = 'https://github.com/acidvegas/czds'
 \ No newline at end of file
diff --git a/czds/__main__.py b/czds/__main__.py
@@ -7,7 +7,6 @@ import asyncio
 import getpass
 import logging
 import os
-import time
 
 from .client import CZDS
 
@@ -21,14 +20,12 @@ async def main():
     # Authentication
     parser.add_argument('-u', '--username', default=os.getenv('CZDS_USER'), help='ICANN Username')
     parser.add_argument('-p', '--password', default=os.getenv('CZDS_PASS'), help='ICANN Password')
-    parser.add_argument('-o', '--output',   default=os.getcwd(),            help='Output directory')
+    parser.add_argument('-o', '--output', default=os.getcwd(), help='Output directory')
     
     # Zone download options
     zone_group = parser.add_argument_group('Zone download options')
     zone_group.add_argument('-z', '--zones', action='store_true', help='Download zone files')
     zone_group.add_argument('-c', '--concurrency', type=int, default=3, help='Number of concurrent downloads')
-    zone_group.add_argument('-d', '--decompress', action='store_true', help='Decompress zone files after download')
-    zone_group.add_argument('-k', '--keep', action='store_true', help='Keep the original gzip files after decompression')
 
     # Report options
     report_group = parser.add_argument_group('Report options')
@@ -39,6 +36,7 @@ async def main():
     # Parse arguments
     args = parser.parse_args()
 
+    # Configure logging
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
     # Get username and password
@@ -46,20 +44,25 @@ async def main():
     password = args.password or getpass.getpass('ICANN Password: ')
 
     # Create output directory
-    now = time.strftime('%Y-%m-%d')
-    output_directory = os.path.join(args.output, 'zones', now)
+    output_directory = os.path.join(args.output, 'zones')
     os.makedirs(output_directory, exist_ok=True)
 
     logging.info('Authenticating with ICANN API...')
 
+    # Create the CZDS client
     async with CZDS(username, password) as client:
         # Download zone stats report if requested
         if args.report:
             logging.info('Fetching zone stats report...')
             try:
+                # Create the report directory
                 output = os.path.join(output_directory, '.report.csv')
+
+                # Download the report
                 await client.get_report(output, scrub=args.scrub, format=args.format)
+
                 logging.info(f'Zone stats report saved to {output}')
+
                 return
             except Exception as e:
                 raise Exception(f'Failed to download zone stats report: {e}')
@@ -68,13 +71,15 @@ async def main():
         if args.zones:
             logging.info('Downloading zone files...')
             try:
-                await client.download_zones(output_directory, args.concurrency, decompress=args.decompress, cleanup=not args.keep)
+                # Download the zone files
+                await client.download_zones(output_directory, args.concurrency)
             except Exception as e:
                 raise Exception(f'Failed to download zone files: {e}')
 
 
 def cli_entry():
     '''Synchronous entry point for console script'''
+
     return asyncio.run(main())
 
 
diff --git a/czds/client.py b/czds/client.py
@@ -3,10 +3,9 @@
 # czds/client.py
 
 import asyncio
-import gzip
+import json
 import logging
 import os
-import io
 
 try:
     import aiohttp
@@ -18,6 +17,13 @@ try:
 except ImportError:
     raise ImportError('missing aiofiles library (pip install aiofiles)')
 
+try:
+    from tqdm import tqdm
+except ImportError:
+    raise ImportError('missing tqdm library (pip install tqdm)')
+
+from .utils import gzip_decompress, humanize_bytes
+
 
 # Configure logging
 logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
@@ -33,29 +39,40 @@ class CZDS:
         :param username: ICANN Username
         :param password: ICANN Password
         '''
-        
+
+        # Set the username and password
         self.username = username
         self.password = password
 
-        # Configure longer timeouts and proper SSL settings
-        timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_connect=60, sock_read=60)
-        self.session = aiohttp.ClientSession(timeout=timeout)
+        # Set the session with longer timeouts
+        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None, connect=60, sock_connect=60, sock_read=60))
+
+        # Placeholder for the headers after authentication
         self.headers = None
 
         logging.info('Initialized CZDS client')
 
+
     async def __aenter__(self):
         '''Async context manager entry'''
+
+        # Authenticate with the ICANN API
         await self.authenticate()
+
         return self
 
+
     async def __aexit__(self, exc_type, exc_val, exc_tb):
         '''Async context manager exit'''
+
+        # Close the client session
         await self.close()
 
+
     async def close(self):
         '''Close the client session'''
 
+        # Close the client session if it exists
         if self.session:
             await self.session.close()
             logging.debug('Closed aiohttp session')
@@ -64,43 +81,46 @@ class CZDS:
     async def authenticate(self) -> str:
         '''Authenticate with the ICANN API and return the access token'''
 
-        try:
-            data = {'username': self.username, 'password': self.password}
-            logging.info('Authenticating with ICANN API')
+        # Set the data to be sent to the API
+        data = {'username': self.username, 'password': self.password}
+
+        logging.info('Authenticating with ICANN API...')
+
+        # Send the request to the API
+        async with self.session.post('https://account-api.icann.org/api/authenticate', json=data) as response:
+            if response.status != 200:
+                raise Exception(f'Authentication failed: {response.status} {await response.text()}')
+
+            # Get the result from the API
+            result = await response.json()
 
-            async with self.session.post('https://account-api.icann.org/api/authenticate', json=data) as response:
-                if response.status != 200:
-                    error_msg = f'Authentication failed: {response.status} {await response.text()}'
-                    logging.error(error_msg)
-                    raise Exception(error_msg)
+            logging.info('Successfully authenticated with ICANN API')
 
-                result = await response.json()
-                logging.info('Successfully authenticated with ICANN API')
-                self.headers = {'Authorization': f'Bearer {result["accessToken"]}'}
-                return result['accessToken']
+            # Set the headers for the API requests
+            self.headers = {'Authorization': f'Bearer {result["accessToken"]}'}
 
-        except Exception as e:
-            error_msg = f'Failed to authenticate with ICANN API: {e}'
-            logging.error(error_msg)
-            raise Exception(error_msg)
+            return result['accessToken']
 
 
     async def fetch_zone_links(self) -> list:
         '''Fetch the list of zone files available for download'''
 
-        logging.info('Fetching zone links')
+        logging.info('Fetching zone file links...')
+
+        # Send the request to the API
         async with self.session.get('https://czds-api.icann.org/czds/downloads/links', headers=self.headers) as response:
             if response.status != 200:
-                error_msg = f'Failed to fetch zone links: {response.status} {await response.text()}'
-                logging.error(error_msg)
-                raise Exception(error_msg)
+                raise Exception(f'Failed to fetch zone links: {response.status} {await response.text()}')
 
+            # Get the result from the API
             links = await response.json()
+
             logging.info(f'Successfully fetched {len(links):,} zone links')
+
             return links
 
 
-    async def get_report(self, filepath: str = None, scrub: bool = True, format: str = 'csv') -> str | dict:
+    async def get_report(self, filepath: str = None, format: str = 'csv') -> str | dict:
         '''
         Downloads the zone report stats from the API and scrubs the report for privacy
         
@@ -110,210 +130,165 @@ class CZDS:
         '''
 
         logging.info('Downloading zone stats report')
+
+        # Send the request to the API
         async with self.session.get('https://czds-api.icann.org/czds/requests/report', headers=self.headers) as response:
+            # Check if the request was successful
             if response.status != 200:
-                error_msg = f'Failed to download the zone stats report: {response.status} {await response.text()}'
-                logging.error(error_msg)
-                raise Exception(error_msg)
+                raise Exception(f'Failed to download the zone stats report: {response.status} {await response.text()}')
 
+            # Get the content of the report
             content = await response.text()
 
-            if scrub:
-                content = content.replace(self.username, 'nobody@no.name')
-                logging.debug('Scrubbed username from report')
+            # Scrub the username from the report
+            content = content.replace(self.username, 'nobody@no.name')
+            logging.debug('Scrubbed username from report')
 
+            # Convert the report to JSON format if requested (default is CSV)
             if format.lower() == 'json':
-                rows    = [row.split(',') for row in content.strip().split('\n')]
-                header  = rows[0]
-                content = [dict(zip(header, row)) for row in rows[1:]]
+                content = json.dumps(content, indent=4)
                 logging.debug('Converted report to JSON format')
 
+            # Save the report to a file if a filepath is provided
             if filepath:
                 async with aiofiles.open(filepath, 'w') as file:
-                    if format.lower() == 'json':
-                        import json
-                        await file.write(json.dumps(content, indent=4))
-                    else:
-                        await file.write(content)
-                    logging.info(f'Saved report to {filepath}')
+                    await file.write(content)
+                logging.info(f'Saved report to {filepath}')
 
             return content
 
 
-    async def gzip_decompress(self, filepath: str, cleanup: bool = True):
-        '''
-        Decompress a gzip file in place
-        
-        :param filepath: Path to the gzip file
-        :param cleanup: Whether to remove the original gzip file after decompressions
-        '''
-
-        logging.debug(f'Decompressing {filepath}')
-        output_path = filepath[:-3]  # Remove .gz extension
-        chunk_size = 1024 * 1024  # 1MB chunks
-        
-        try:
-            with gzip.open(filepath, 'rb') as gz:
-                async with aiofiles.open(output_path, 'wb') as f_out:
-                    while True:
-                        chunk = gz.read(chunk_size)
-                        if not chunk:
-                            break
-                        await f_out.write(chunk)
-        
-            if cleanup:
-                os.remove(filepath)
-                logging.debug(f'Removed original gzip file: {filepath}')
-            
-        except Exception as e:
-            error_msg = f'Failed to decompress {filepath}: {str(e)}'
-            logging.error(error_msg)
-            # Clean up any partial files
-            if os.path.exists(output_path):
-                os.remove(output_path)
-            raise Exception(error_msg)
-
-
-    async def download_zone(self, url: str, output_directory: str, decompress: bool = False, cleanup: bool = True, semaphore: asyncio.Semaphore = None):
+    async def download_zone(self, url: str, output_directory: str, semaphore: asyncio.Semaphore):
         '''
         Download a single zone file
         
         :param url: URL to download
         :param output_directory: Directory to save the zone file
-        :param decompress: Whether to decompress the gzip file after download
-        :param cleanup: Whether to remove the original gzip file after decompression
         :param semaphore: Optional semaphore for controlling concurrency
         '''
         
         async def _download():
-            tld = url.split('/')[-1].split('.')[0]  # Extract TLD from URL
-            max_retries = 3
-            retry_delay = 5  # seconds
+            tld_name    = url.split('/')[-1].split('.')[0] # Extract TLD from URL
+            max_retries = 10                               # Maximum number of retries for failed downloads
+            retry_delay = 5                                # Delay between retries in seconds
+            timeout     = aiohttp.ClientTimeout(total=120) # Timeout for the download
             
+            # Start the attempt loop
             for attempt in range(max_retries):
                 try:
-                    logging.info(f'Starting download of {tld} zone file{" (attempt " + str(attempt + 1) + ")" if attempt > 0 else ""}')
-                    
-                    async with self.session.get(url, headers=self.headers, timeout=aiohttp.ClientTimeout(total=3600)) as response:
+                    logging.info(f'Starting download of {tld_name} zone file{" (attempt " + str(attempt + 1) + ")" if attempt > 0 else ""}')
+
+                    # Send the request to the API
+                    async with self.session.get(url, headers=self.headers, timeout=timeout) as response:
+                        # Check if the request was successful
                         if response.status != 200:
-                            error_msg = f'Failed to download {tld}: {response.status} {await response.text()}'
-                            logging.error(error_msg)
+                            logging.error(f'Failed to download {tld_name}: {response.status} {await response.text()}')
+
+                            # Retry the download if there are more attempts
                             if attempt + 1 < max_retries:
-                                logging.info(f'Retrying {tld} in {retry_delay} seconds...')
+                                logging.info(f'Retrying {tld_name} in {retry_delay:,} seconds...')
                                 await asyncio.sleep(retry_delay)
                                 continue
-                            raise Exception(error_msg)
+
+                            raise Exception(f'Failed to download {tld_name}: {response.status} {await response.text()}')
 
                         # Get expected file size from headers
-                        expected_size = int(response.headers.get('Content-Length', 0))
-                        if not expected_size:
-                            logging.warning(f'No Content-Length header for {tld}')
+                        if not (expected_size := int(response.headers.get('Content-Length', 0))):
+                            raise ValueError(f'Missing Content-Length header for {tld_name}')
 
+                        # Check if the Content-Disposition header is present
                         if not (content_disposition := response.headers.get('Content-Disposition')):
-                            error_msg = f'Missing Content-Disposition header for {tld}'
-                            logging.error(error_msg)
-                            raise ValueError(error_msg)
+                            raise ValueError(f'Missing Content-Disposition header for {tld_name}')
 
+                        # Extract the filename from the Content-Disposition header
                         filename = content_disposition.split('filename=')[-1].strip('"')
+
+                        # Create the filepath
                         filepath = os.path.join(output_directory, filename)
 
-                        async with aiofiles.open(filepath, 'wb') as file:
-                            total_size = 0
-                            last_progress = 0
-                            try:
-                                async for chunk in response.content.iter_chunked(8192):
-                                    await file.write(chunk)
-                                    total_size += len(chunk)
-                                    if expected_size:
-                                        progress = int((total_size / expected_size) * 100)
-                                        if progress >= last_progress + 5:
-                                            logging.info(f'Downloading {tld}: {progress}% ({total_size:,}/{expected_size:,} bytes)')
-                                            last_progress = progress
-                            except (asyncio.TimeoutError, aiohttp.ClientError) as e:
-                                logging.error(f'Connection error while downloading {tld}: {str(e)}')
-                                if attempt + 1 < max_retries:
-                                    logging.info(f'Retrying {tld} in {retry_delay} seconds...')
-                                    await asyncio.sleep(retry_delay)
-                                    continue
-                                raise
+                        # Create a progress bar to track the download
+                        with tqdm(total=expected_size, unit='B', unit_scale=True, desc=f'Downloading {tld_name}', leave=False) as pbar:
+                            # Open the file for writing
+                            async with aiofiles.open(filepath, 'wb') as file:
+                                # Initialize the total size for tracking
+                                total_size = 0
+
+                                # Write the chunk to the file
+                                try:
+                                    async for chunk in response.content.iter_chunked(8192):
+                                        await file.write(chunk)
+                                        total_size += len(chunk)
+                                        pbar.update(len(chunk))
+                                except (asyncio.TimeoutError, aiohttp.ClientError) as e:
+                                    logging.error(f'Connection error while downloading {tld_name}: {str(e)}')
+                                    if attempt + 1 < max_retries:
+                                        logging.info(f'Retrying {tld_name} in {retry_delay} seconds...')
+                                        await asyncio.sleep(retry_delay)
+                                        continue
+                                    raise
 
                         # Verify file size
                         if expected_size and total_size != expected_size:
-                            error_msg = f'Incomplete download for {tld}: Got {total_size} bytes, expected {expected_size} bytes'
+                            error_msg = f'Incomplete download for {tld_name}: Got {humanize_bytes(total_size)}, expected {humanize_bytes(expected_size)}'
                             logging.error(error_msg)
                             os.remove(filepath)
                             if attempt + 1 < max_retries:
-                                logging.info(f'Retrying {tld} in {retry_delay} seconds...')
+                                logging.info(f'Retrying {tld_name} in {retry_delay} seconds...')
                                 await asyncio.sleep(retry_delay)
                                 continue
                             raise Exception(error_msg)
                         
-                        size_mb = total_size / (1024 * 1024)
-                        logging.info(f'Successfully downloaded {tld} zone file ({size_mb:.2f} MB)')
-
-                        if decompress:
-                            try:
-                                with gzip.open(filepath, 'rb') as test_gzip:
-                                    test_gzip.read(1)
-                                
-                                await self.gzip_decompress(filepath, cleanup)
-                                filepath = filepath[:-3]
-                                logging.info(f'Decompressed {tld} zone file')
-                            except (gzip.BadGzipFile, OSError) as e:
-                                error_msg = f'Failed to decompress {tld}: {str(e)}'
-                                logging.error(error_msg)
-                                os.remove(filepath)
-                                raise Exception(error_msg)
+                        logging.info(f'Successfully downloaded {tld_name} zone file ({humanize_bytes(total_size)})')
+
+                        await gzip_decompress(filepath)
+                        filepath = filepath[:-3]
+                        logging.info(f'Decompressed {tld_name} zone file')
 
                         return filepath
 
                 except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                     if attempt + 1 >= max_retries:
-                        logging.error(f'Failed to download {tld} after {max_retries} attempts: {str(e)}')
+                        logging.error(f'Failed to download {tld_name} after {max_retries} attempts: {str(e)}')
                         if 'filepath' in locals() and os.path.exists(filepath):
                             os.remove(filepath)
                         raise
-                    logging.warning(f'Download attempt {attempt + 1} failed for {tld}: {str(e)}')
+                    logging.warning(f'Download attempt {attempt + 1} failed for {tld_name}: {str(e)}')
                     await asyncio.sleep(retry_delay)
 
                 except Exception as e:
-                    logging.error(f'Error downloading {tld}: {str(e)}')
+                    logging.error(f'Error downloading {tld_name}: {str(e)}')
                     if 'filepath' in locals() and os.path.exists(filepath):
                         os.remove(filepath)
                     raise
 
-        if semaphore:
-            async with semaphore:
-                return await _download()
-        else:
+        async with semaphore:
             return await _download()
 
 
-    async def download_zones(self, output_directory: str, concurrency: int, decompress: bool = False, cleanup: bool = True):
+    async def download_zones(self, output_directory: str, concurrency: int):
         '''
         Download multiple zone files concurrently
         
         :param output_directory: Directory to save the zone files
         :param concurrency: Number of concurrent downloads
-        :param decompress: Whether to decompress the gzip files after download
-        :param cleanup: Whether to remove the original gzip files after decompression
         '''
         
         # Create the output directory if it doesn't exist
         os.makedirs(output_directory, exist_ok=True)
         
-        logging.info(f'Starting concurrent download of zones with concurrency={concurrency}')
-
         # Get the zone links
         zone_links = await self.fetch_zone_links()
+        zone_links.sort() # Sort the zone alphabetically for better tracking
 
         # Create a semaphore to limit the number of concurrent downloads
         semaphore = asyncio.Semaphore(concurrency)
 
+        logging.info(f'Downloading {len(zone_links):,} zone files...')
+
         # Create a list of tasks to download the zone files
-        tasks = [self.download_zone(url, output_directory, decompress, cleanup, semaphore) for url in zone_links]
+        tasks = [self.download_zone(url, output_directory, semaphore) for url in zone_links]
 
         # Run the tasks concurrently
         await asyncio.gather(*tasks)
 
-        logging.info('Completed downloading all zone files')
+        logging.info(f'Completed downloading {len(zone_links):,} zone files')
+\ No newline at end of file
diff --git a/czds/utils.py b/czds/utils.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
+# czds/utils.py
+
+import gzip
+import logging
+import os
+
+try:
+    import aiofiles
+except ImportError:
+    raise ImportError('missing aiofiles library (pip install aiofiles)')
+
+try:
+    from tqdm import tqdm
+except ImportError:
+    raise ImportError('missing tqdm library (pip install tqdm)')
+
+
+async def gzip_decompress(filepath: str, cleanup: bool = True):
+    '''
+    Decompress a gzip file in place
+    
+    :param filepath: Path to the gzip file
+    :param cleanup: Whether to remove the original gzip file after decompressions
+    '''
+
+    # Get the original size of the file
+    original_size = os.path.getsize(filepath)
+
+    logging.debug(f'Decompressing {filepath} ({humanize_bytes(original_size)})...')
+
+    # Remove the .gz extension
+    output_path = filepath[:-3]
+
+    # Set the chunk size to 25MB
+    chunk_size = 25 * 1024 * 1024
+
+    # Create progress bar for decompression
+    with tqdm(total=original_size, unit='B', unit_scale=True, desc=f'Decompressing {os.path.basename(filepath)}', leave=False) as pbar:
+        # Decompress the file
+        with gzip.open(filepath, 'rb') as gz:
+            async with aiofiles.open(output_path, 'wb') as f_out:
+                while True:
+                    # Read the next chunk
+                    chunk = gz.read(chunk_size)
+
+                    # If the chunk is empty, break
+                    if not chunk:
+                        break
+
+                    # Write the chunk to the output file
+                    await f_out.write(chunk)
+
+                    # Update the progress bar
+                    pbar.update(len(chunk))
+
+    # Get the decompressed size of the file
+    decompressed_size = os.path.getsize(output_path)
+
+    logging.debug(f'Decompressed {filepath} ({humanize_bytes(decompressed_size)})')
+
+    # If the cleanup flag is set, remove the original gzip file
+    if cleanup:
+        os.remove(filepath)
+        logging.debug(f'Removed original gzip file: {filepath}')
+
+
+def humanize_bytes(bytes: int) -> str:
+	'''
+	Humanize a number of bytes
+
+	:param bytes: The number of bytes to humanize
+	'''
+
+	# List of units
+	units = ('B','KB','MB','GB','TB','PB','EB','ZB','YB')
+
+	# Iterate over the units
+	for unit in units:
+		# If the bytes are less than 1024, return the bytes with the unit
+		if bytes < 1024:
+			return f'{bytes:.2f} {unit}' if unit != 'B' else f'{bytes} {unit}'
+
+		# Divide the bytes by 1024
+		bytes /= 1024
+
+	return f'{bytes:.2f} {units[-1]}'
+\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
@@ -1,2 +1,3 @@
 aiohttp
-aiofiles
-\ No newline at end of file
+aiofiles
+tqdm
+\ No newline at end of file
diff --git a/setup.py b/setup.py
@@ -11,7 +11,7 @@ with open('README.md', 'r', encoding='utf-8') as fh:
 
 setup(
 	name='czds-api',
-	version='1.3.0',
+	version='1.3.1',
 	author='acidvegas',
 	author_email='acid.vegas@acid.vegas',
 	description='ICANN API for the Centralized Zones Data Service',