pylcg- Unnamed repository; edit this file 'description' to name the repository. |
git clone git://git.acid.vegas/-c.git |
Log | Files | Refs | Archive | README | LICENSE |
unit_test.py (4643B)
#!/usr/bin/env python
# PyLCG - Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://github.com/acidvegas/pylcg)
# unit_test.py


import ipaddress
import time
import unittest

from pylcg import IPRange, ip_stream, LCG


class Colors:
    '''ANSI escape sequences used to colorize the console output of the tests.'''

    BLUE   = '\033[94m'
    GREEN  = '\033[92m'
    YELLOW = '\033[93m'
    CYAN   = '\033[96m'
    RED    = '\033[91m'
    ENDC   = '\033[0m'  # Resets the terminal color


def print_header(message: str) -> None:
    '''
    Print a blue banner announcing a test section.

    :param message: Title shown between the two separator rules
    '''
    print(f'\n\n{Colors.BLUE}{"="*80}')
    print(f'TEST: {message}')
    print(f'{"="*80}{Colors.ENDC}\n')


def print_success(message: str) -> None:
    '''
    Print a green line prefixed with a check mark.

    :param message: Text to print
    '''
    print(f'{Colors.GREEN}✓ {message}{Colors.ENDC}')


def print_info(message: str) -> None:
    '''
    Print a cyan informational line.

    :param message: Text to print
    '''
    print(f"{Colors.CYAN}ℹ {message}{Colors.ENDC}")


def print_warning(message: str) -> None:
    '''
    Print a yellow warning line.

    :param message: Text to print
    '''
    print(f"{Colors.YELLOW}! {message}{Colors.ENDC}")


class TestIPSharder(unittest.TestCase):
    '''Tests for IPRange, LCG and ip_stream against a fixed /16 network.'''

    @classmethod
    def setUpClass(cls):
        '''Define the shared CIDR, seed and shard count and pre-compute the expected IP set.'''
        print_header('Setting up test environment')
        cls.test_cidr    = '192.0.0.0/16'  # 65,536 IPs
        cls.test_seed    = 12345
        cls.total_shards = 4

        # Calculate expected IPs
        network = ipaddress.ip_network(cls.test_cidr)
        cls.all_ips = {str(ip) for ip in network}
        print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")


    def test_ip_range_initialization(self):
        '''Check that IPRange reports the right size and the right first/last address.'''
        print_header('Testing IPRange initialization')
        start_time = time.perf_counter()

        ip_range = IPRange(self.test_cidr)
        self.assertEqual(ip_range.total, 65536)

        first_ip = ip_range.get_ip_at_index(0)
        last_ip  = ip_range.get_ip_at_index(ip_range.total - 1)

        elapsed = time.perf_counter() - start_time

        # Normalize through ipaddress so the comparison does not depend on the
        # exact type get_ip_at_index returns (assumed to be something
        # ipaddress.ip_address can parse, as ip_stream items are treated below).
        network = ipaddress.ip_network(self.test_cidr)
        self.assertEqual(ipaddress.ip_address(first_ip), network.network_address)
        self.assertEqual(ipaddress.ip_address(last_ip), network.broadcast_address)

        print_success(f'IP range initialization completed in {elapsed:.6f}s')
        print_info(f'IP range spans from {first_ip} to {last_ip}')
        print_info(f'Total IPs in range: {ip_range.total:,}')


    def test_lcg_sequence(self):
        '''Time LCG.next() and check that two generators with the same seed agree.'''
        print_header('Testing LCG sequence generation')

        # Test sequence generation speed
        lcg = LCG(seed=self.test_seed)
        iterations = 1_000_000

        start_time = time.perf_counter()
        for _ in range(iterations):
            lcg.next()
        elapsed = time.perf_counter() - start_time

        print_success(f'Generated {iterations:,} random numbers in {elapsed:.6f}s')
        print_info(f'Average time per number: {(elapsed/iterations)*1000000:.2f} microseconds')

        # Test deterministic behavior
        lcg1 = LCG(seed=self.test_seed)
        lcg2 = LCG(seed=self.test_seed)

        start_time = time.perf_counter()
        for _ in range(1000):
            self.assertEqual(lcg1.next(), lcg2.next())
        elapsed = time.perf_counter() - start_time

        print_success(f'Verified LCG determinism in {elapsed:.6f}s')


    def test_shard_distribution(self):
        '''
        Drain every shard's ip_stream and check that, taken together, the shards
        yield each address of the test network exactly once.

        Per-shard counts and a rough sequential-pair statistic are printed for
        information only; they are not asserted on.
        '''
        print_header('Testing shard distribution and randomness')

        # Test distribution across shards
        shard_numbers   = range(1, self.total_shards + 1)  # 1-based sharding
        shard_counts    = {shard: 0 for shard in shard_numbers}
        unique_ips      = set()
        duplicate_count = 0

        start_time = time.perf_counter()

        # Collect IPs from each shard
        for shard in shard_numbers:
            ip_gen = ip_stream(self.test_cidr, shard, self.total_shards, self.test_seed)
            shard_unique = set()

            # Get all IPs from this shard
            for ip in ip_gen:
                if ip in unique_ips:
                    duplicate_count += 1
                else:
                    unique_ips.add(ip)
                    shard_unique.add(ip)

            shard_counts[shard] = len(shard_unique)

        elapsed = time.perf_counter() - start_time

        # Fail cleanly here rather than with a ZeroDivisionError in the statistics below
        self.assertTrue(unique_ips, 'ip_stream produced no IPs for any shard')

        # Print distribution statistics
        print_success(f'Generated {len(unique_ips):,} IPs in {elapsed:.6f}s')
        print_info(f'Average time per IP: {(elapsed/len(unique_ips))*1000000:.2f} microseconds')
        print_info(f'Unique IPs generated: {len(unique_ips):,}')

        if duplicate_count > 0:
            print_warning(f'Duplicates found: {duplicate_count:,} ({(duplicate_count/len(unique_ips))*100:.2f}%)')

        # Derive the expectation from the network under test instead of a literal
        expected_per_shard = len(self.all_ips) // self.total_shards
        for shard, count in shard_counts.items():
            deviation = abs(count - expected_per_shard) / expected_per_shard * 100
            print_info(f'Shard {shard}: {count:,} unique IPs ({deviation:.2f}% deviation from expected)')

        # Test randomness by checking sequential patterns
        ips_list = sorted(int(ipaddress.ip_address(ip)) for ip in list(unique_ips)[:1000])
        sequential_count = sum(1 for lower, upper in zip(ips_list, ips_list[1:]) if lower + 1 == upper)
        pair_count = len(ips_list) - 1
        sequential_percentage = (sequential_count / pair_count) * 100 if pair_count else 0.0

        print_info(f'Sequential IP pairs in first 1000: {sequential_percentage:.2f}% (lower is more random)')

        # The shards must partition the network: nothing repeated, nothing
        # missing, nothing outside the CIDR. Lengths are asserted rather than
        # the sets themselves to keep failure messages readable.
        generated  = {str(ipaddress.ip_address(ip)) for ip in unique_ips}
        missing    = self.all_ips - generated
        unexpected = generated - self.all_ips

        self.assertEqual(duplicate_count, 0, f'{duplicate_count:,} IPs were yielded more than once across shards')
        self.assertEqual(len(missing), 0, f'{len(missing):,} IPs of {self.test_cidr} were never yielded by any shard')
        self.assertEqual(len(unexpected), 0, f'{len(unexpected):,} yielded IPs fall outside {self.test_cidr}')



if __name__ == '__main__':
    print(f"\n{Colors.CYAN}{'='*80}")
    print("Starting IP Sharder Tests - Testing with 65,536 IPs (/16 network)")
    print(f"{'='*80}{Colors.ENDC}\n")
    unittest.main(verbosity=2)