pylcg

- Unnamed repository; edit this file 'description' to name the repository.
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

unit_test.py (4643B)

      1 #!/usr/bin/env python
      2 # PyLCG - Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://github.com/acidvegas/pylcg)
      3 # unit_test.py
      4 
      5 
      6 import ipaddress
      7 import time
      8 import unittest
      9 
     10 from pylcg import IPRange, ip_stream, LCG
     11 
     12 
     13 class Colors:
     14 	BLUE   = '\033[94m'
     15 	GREEN  = '\033[92m'
     16 	YELLOW = '\033[93m'
     17 	CYAN   = '\033[96m'
     18 	RED    = '\033[91m'
     19 	ENDC   = '\033[0m'
     20 
     21 def print_header(message: str) -> None:
     22 	print(f'\n\n{Colors.BLUE}{"="*80}')
     23 	print(f'TEST: {message}')
     24 	print(f'{"="*80}{Colors.ENDC}\n')
     25 
     26 
     27 def print_success(message: str) -> None:
     28 	print(f'{Colors.GREEN}✓ {message}{Colors.ENDC}')
     29 
     30 
     31 def print_info(message: str) -> None:
     32 	print(f"{Colors.CYAN}ℹ {message}{Colors.ENDC}")
     33 
     34 
     35 def print_warning(message: str) -> None:
     36 	print(f"{Colors.YELLOW}! {message}{Colors.ENDC}")
     37 
     38 
     39 class TestIPSharder(unittest.TestCase):
     40 	@classmethod
     41 	def setUpClass(cls):
     42 		print_header('Setting up test environment')
     43 		cls.test_cidr = '192.0.0.0/16'  # 65,536 IPs
     44 		cls.test_seed = 12345
     45 		cls.total_shards = 4
     46 
     47 		# Calculate expected IPs
     48 		network = ipaddress.ip_network(cls.test_cidr)
     49 		cls.all_ips = {str(ip) for ip in network}
     50 		print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")
     51 
     52 
     53 	def test_ip_range_initialization(self):
     54 		print_header('Testing IPRange initialization')
     55 		start_time = time.perf_counter()
     56 
     57 		ip_range = IPRange(self.test_cidr)
     58 		self.assertEqual(ip_range.total, 65536)
     59 
     60 		first_ip = ip_range.get_ip_at_index(0)
     61 		last_ip = ip_range.get_ip_at_index(ip_range.total - 1)
     62 
     63 		elapsed = time.perf_counter() - start_time
     64 		print_success(f'IP range initialization completed in {elapsed:.6f}s')
     65 		print_info(f'IP range spans from {first_ip} to {last_ip}')
     66 		print_info(f'Total IPs in range: {ip_range.total:,}')
     67 
     68 
     69 	def test_lcg_sequence(self):
     70 		print_header('Testing LCG sequence generation')
     71 
     72 		# Test sequence generation speed
     73 		lcg = LCG(seed=self.test_seed)
     74 		iterations = 1_000_000
     75 
     76 		start_time = time.perf_counter()
     77 		for _ in range(iterations):
     78 			lcg.next()
     79 		elapsed = time.perf_counter() - start_time
     80 
     81 		print_success(f'Generated {iterations:,} random numbers in {elapsed:.6f}s')
     82 		print_info(f'Average time per number: {(elapsed/iterations)*1000000:.2f} microseconds')
     83 
     84 		# Test deterministic behavior
     85 		lcg1 = LCG(seed=self.test_seed)
     86 		lcg2 = LCG(seed=self.test_seed)
     87 
     88 		start_time = time.perf_counter()
     89 		for _ in range(1000):
     90 			self.assertEqual(lcg1.next(), lcg2.next())
     91 		elapsed = time.perf_counter() - start_time
     92 
     93 		print_success(f'Verified LCG determinism in {elapsed:.6f}s')
     94 
     95 
     96 	def test_shard_distribution(self):
     97 		print_header('Testing shard distribution and randomness')
     98 
     99 		# Test distribution across shards
    100 		sample_size = 65_536  # Full size for /16
    101 		shard_counts = {i: 0 for i in range(1, self.total_shards + 1)}  # 1-based sharding
    102 		unique_ips = set()
    103 		duplicate_count = 0
    104 
    105 		start_time = time.perf_counter()
    106 
    107 		# Collect IPs from each shard
    108 		for shard in range(1, self.total_shards + 1):  # 1-based sharding
    109 			ip_gen = ip_stream(self.test_cidr, shard, self.total_shards, self.test_seed)
    110 			shard_unique = set()
    111 
    112 			# Get all IPs from this shard
    113 			for ip in ip_gen:
    114 				if ip in unique_ips:
    115 					duplicate_count += 1
    116 				else:
    117 					unique_ips.add(ip)
    118 					shard_unique.add(ip)
    119 
    120 			shard_counts[shard] = len(shard_unique)
    121 
    122 		elapsed = time.perf_counter() - start_time
    123 
    124 		# Print distribution statistics
    125 		print_success(f'Generated {len(unique_ips):,} IPs in {elapsed:.6f}s')
    126 		print_info(f'Average time per IP: {(elapsed/len(unique_ips))*1000000:.2f} microseconds')
    127 		print_info(f'Unique IPs generated: {len(unique_ips):,}')
    128 
    129 		if duplicate_count > 0:
    130 			print_warning(f'Duplicates found: {duplicate_count:,} ({(duplicate_count/len(unique_ips))*100:.2f}%)')
    131 
    132 		expected_per_shard = sample_size // self.total_shards
    133 		for shard, count in shard_counts.items():
    134 			deviation = abs(count - expected_per_shard) / expected_per_shard * 100
    135 			print_info(f'Shard {shard}: {count:,} unique IPs ({deviation:.2f}% deviation from expected)')
    136 
    137 		# Test randomness by checking sequential patterns
    138 		ips_list = sorted([int(ipaddress.ip_address(ip)) for ip in list(unique_ips)[:1000]])
    139 		sequential_count = sum(1 for i in range(len(ips_list)-1) if ips_list[i] + 1 == ips_list[i+1])
    140 		sequential_percentage = (sequential_count / (len(ips_list)-1)) * 100
    141 
    142 		print_info(f'Sequential IP pairs in first 1000: {sequential_percentage:.2f}% (lower is more random)')
    143 
    144 
    145 
    146 if __name__ == '__main__':
    147 	print(f"\n{Colors.CYAN}{'='*80}")
    148 	print(f"Starting IP Sharder Tests - Testing with 65,536 IPs (/16 network)")
    149 	print(f"{'='*80}{Colors.ENDC}\n")
    150 	unittest.main(verbosity=2)