czds

- ICANN Centralized Zone Data Service Tool
git clone git://git.acid.vegas/czds.git
Log | Files | Refs | Archive | README | LICENSE

utils.py (2379B)

      1 #!/usr/bin/env python3
      2 # ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
      3 # czds/utils.py
      4 
      5 import asyncio
      6 import gzip
      7 import logging
      8 import os
      9 
     10 try:
     11     import aiofiles
     12 except ImportError:
     13     raise ImportError('missing aiofiles library (pip install aiofiles)')
     14 
     15 try:
     16     from tqdm import tqdm
     17 except ImportError:
     18     raise ImportError('missing tqdm library (pip install tqdm)')
     19 
     20 
     21 async def gzip_decompress(filepath: str, cleanup: bool = True):
     22     '''
     23     Decompress a gzip file in place
     24     
     25     :param filepath: Path to the gzip file
     26     :param cleanup: Whether to remove the original gzip file after decompressions
     27     '''
     28     original_size = os.path.getsize(filepath)
     29     output_path = filepath[:-3]
     30     
     31     logging.debug(f'Decompressing {filepath} ({humanize_bytes(original_size)})...')
     32 
     33     # Use a large chunk size (256MB) for maximum throughput
     34     chunk_size = 256 * 1024 * 1024
     35 
     36     # Run the actual decompression in a thread pool to prevent blocking
     37     with tqdm(total=original_size, unit='B', unit_scale=True, desc=f'Decompressing {os.path.basename(filepath)}', leave=False) as pbar:
     38         async with aiofiles.open(output_path, 'wb') as f_out:
     39             # Run gzip decompression in thread pool since it's CPU-bound
     40             loop = asyncio.get_event_loop()
     41             with gzip.open(filepath, 'rb') as gz:
     42                 while True:
     43                     chunk = await loop.run_in_executor(None, gz.read, chunk_size)
     44                     if not chunk:
     45                         break
     46                     await f_out.write(chunk)
     47                     pbar.update(len(chunk))
     48 
     49     decompressed_size = os.path.getsize(output_path)
     50     logging.debug(f'Decompressed {filepath} ({humanize_bytes(decompressed_size)})')
     51 
     52     if cleanup:
     53         os.remove(filepath)
     54         logging.debug(f'Removed original gzip file: {filepath}')
     55 
     56 
     57 def humanize_bytes(bytes: int) -> str:
     58 	'''
     59 	Humanize a number of bytes
     60 
     61 	:param bytes: The number of bytes to humanize
     62 	'''
     63 
     64 	# List of units
     65 	units = ('B','KB','MB','GB','TB','PB','EB','ZB','YB')
     66 
     67 	# Iterate over the units
     68 	for unit in units:
     69 		# If the bytes are less than 1024, return the bytes with the unit
     70 		if bytes < 1024:
     71 			return f'{bytes:.2f} {unit}' if unit != 'B' else f'{bytes} {unit}'
     72 
     73 		# Divide the bytes by 1024
     74 		bytes /= 1024
     75 
     76 	return f'{bytes:.2f} {units[-1]}'