czds- ICANN Centralized Zone Data Service Tool |
git clone git://git.acid.vegas/czds.git |
Log | Files | Refs | Archive | README | LICENSE |
utils.py (2379B)
#!/usr/bin/env python3
# ICANN API for the Centralized Zones Data Service - developed by acidvegas (https://git.acid.vegas/czds)
# czds/utils.py

import asyncio
import gzip
import logging
import os

try:
    import aiofiles
except ImportError:
    raise ImportError('missing aiofiles library (pip install aiofiles)')

try:
    from tqdm import tqdm
except ImportError:
    raise ImportError('missing tqdm library (pip install tqdm)')


async def gzip_decompress(filepath: str, cleanup: bool = True):
    '''
    Decompress a gzip file in place

    The decompressed data is written next to the archive, at the archive path
    with its last three characters removed (i.e. the ".gz" suffix is assumed,
    not checked).

    :param filepath: Path to the gzip file (expected to end in ".gz")
    :param cleanup: Whether to remove the original gzip file after decompression
    '''

    original_size = os.path.getsize(filepath)

    # NOTE: strips the last 3 characters unconditionally; callers must pass a ".gz" path
    output_path = filepath[:-3]

    logging.debug(f'Decompressing {filepath} ({humanize_bytes(original_size)})...')

    # Use a large chunk size (256MB) for maximum throughput
    chunk_size = 256 * 1024 * 1024

    # We are inside a coroutine, so a running loop is guaranteed to exist
    loop = asyncio.get_running_loop()

    with tqdm(total=original_size, unit='B', unit_scale=True, desc=f'Decompressing {os.path.basename(filepath)}', leave=False) as pbar:
        async with aiofiles.open(output_path, 'wb') as f_out:
            # Open the raw file ourselves so its position (compressed bytes consumed) can
            # drive the progress bar, whose total is the compressed size
            with open(filepath, 'rb') as raw, gzip.GzipFile(fileobj=raw, mode='rb') as gz:
                consumed = 0

                while True:
                    # Run the blocking, CPU-bound gzip read in the default thread pool
                    chunk = await loop.run_in_executor(None, gz.read, chunk_size)

                    # Advance the bar by the compressed bytes read since the last update
                    position = raw.tell()
                    pbar.update(position - consumed)
                    consumed = position

                    if not chunk:
                        break

                    await f_out.write(chunk)

    decompressed_size = os.path.getsize(output_path)
    logging.debug(f'Decompressed {filepath} ({humanize_bytes(decompressed_size)})')

    if cleanup:
        os.remove(filepath)
        logging.debug(f'Removed original gzip file: {filepath}')


def humanize_bytes(bytes: int) -> str:
    '''
    Humanize a number of bytes

    Values below 1024 are returned as-is with the "B" unit; larger values are
    scaled by powers of 1024 and formatted with two decimals. Values beyond the
    largest unit are still expressed in that unit (e.g. "1024.00 YB").

    :param bytes: The number of bytes to humanize
    :return: The humanized size, e.g. "512 B" or "1.50 KB"
    '''

    # List of units
    units = ('B','KB','MB','GB','TB','PB','EB','ZB','YB')

    # Iterate over every unit except the last, so the value is never divided past the largest unit
    for unit in units[:-1]:
        # If the bytes are less than 1024, return the bytes with the unit
        if bytes < 1024:
            return f'{bytes:.2f} {unit}' if unit != 'B' else f'{bytes} {unit}'

        # Divide the bytes by 1024
        bytes /= 1024

    # Whatever remains is expressed in the largest unit
    return f'{bytes:.2f} {units[-1]}'