diff --git a/pylcg/__init__.py b/pylcg/__init__.py
@@ -1,5 +1,5 @@
from .core import LCG, IPRange, ip_stream
-__version__ = "1.0.0"
+__version__ = "1.0.3"
__author__ = "acidvegas"
__all__ = ["LCG", "IPRange", "ip_stream"]
\ No newline at end of file
diff --git a/pylcg/cli.py b/pylcg/cli.py
@@ -6,7 +6,8 @@ def main():
parser.add_argument('cidr', help='Target IP range in CIDR format')
parser.add_argument('--shard-num', type=int, default=1, help='Shard number (1-based)')
parser.add_argument('--total-shards', type=int, default=1, help='Total number of shards (default: 1, no sharding)')
- parser.add_argument('--seed', type=int, default=0, help='Random seed for LCG')
+ parser.add_argument('--seed', type=int, required=True, help='Random seed for LCG (required)')
+ parser.add_argument('--state', type=int, help='Resume from specific LCG state (must be used with same seed)')
args = parser.parse_args()
@@ -19,7 +20,10 @@ def main():
if args.shard_num < 1:
raise ValueError('Shard number must be at least 1')
- for ip in ip_stream(args.cidr, args.shard_num, args.total_shards, args.seed):
+ if args.state is not None and not args.seed:
+ raise ValueError('When using --state, you must provide the same --seed that was used originally')
+
+ for ip in ip_stream(args.cidr, args.shard_num, args.total_shards, args.seed, args.state):
print(ip)
if __name__ == '__main__':
diff --git a/pylcg/core.py b/pylcg/core.py
@@ -38,7 +38,7 @@ class IPRange:
return str(ipaddress.ip_address(self.start + index))
-def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int = 0):
+def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int = 0, state: int = None):
'''
Stream random IPs from the CIDR range. Optionally supports sharding.
Each IP in the range will be yielded exactly once in a pseudo-random order.
@@ -47,6 +47,7 @@ def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int =
:param shard_num: Shard number (1-based), defaults to 1
:param total_shards: Total number of shards, defaults to 1 (no sharding)
:param seed: Random seed for LCG (default: random)
+ :param state: Resume from specific LCG state (default: None)
'''
# Convert to 0-based indexing internally
@@ -61,6 +62,10 @@ def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int =
# Initialize LCG
lcg = LCG(seed + shard_index)
+
+ # Set LCG state if provided
+ if state is not None:
+ lcg.current = state
# Calculate how many IPs this shard should generate
shard_size = ip_range.total // total_shards
@@ -77,3 +82,7 @@ def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int =
if total_shards == 1 or index % total_shards == shard_index:
yield ip_range.get_ip_at_index(index)
remaining -= 1
+ # Save state every 1000 IPs
+ if remaining % 1000 == 0:
+ from .state import save_state
+ save_state(seed, cidr, shard_num, total_shards, lcg.current)
diff --git a/pylcg/state.py b/pylcg/state.py
@@ -0,0 +1,20 @@
+import os
+import tempfile
+
+def save_state(seed: int, cidr: str, shard: int, total: int, lcg_current: int):
+ '''
+ Save LCG state to temp file
+
+ :param seed: Random seed for LCG
+ :param cidr: Target IP range in CIDR format
+ :param shard: Shard number (1-based)
+ :param total: Total number of shards
+ :param lcg_current: Current LCG state
+ '''
+
+ file_name = f'pylcg_{seed}_{cidr.replace("/", "_")}_{shard}_{total}.state'
+
+ state_file = os.path.join(tempfile.gettempdir(), file_name)
+
+ with open(state_file, 'w') as f:
+ f.write(str(lcg_current))
+\ No newline at end of file
diff --git a/setup.py b/setup.py
@@ -5,7 +5,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
setup(
name="pylcg",
- version="1.0.2",
+ version="1.0.3",
author="acidvegas",
author_email="acid.vegas@acid.vegas",
description="Linear Congruential Generator for IP Sharding",
diff --git a/unit_test.py b/unit_test.py
@@ -1,9 +1,10 @@
-#!/usr/bin/env python3
import unittest
import ipaddress
import time
+
from pylcg import IPRange, ip_stream, LCG
+
class Colors:
BLUE = '\033[94m'
GREEN = '\033[92m'
@@ -17,15 +18,19 @@ def print_header(message: str) -> None:
print(f'TEST: {message}')
print(f'{"="*80}{Colors.ENDC}\n')
+
def print_success(message: str) -> None:
print(f'{Colors.GREEN}✓ {message}{Colors.ENDC}')
+
def print_info(message: str) -> None:
print(f"{Colors.CYAN}ℹ {message}{Colors.ENDC}")
+
def print_warning(message: str) -> None:
print(f"{Colors.YELLOW}! {message}{Colors.ENDC}")
+
class TestIPSharder(unittest.TestCase):
@classmethod
def setUpClass(cls):
@@ -39,6 +44,7 @@ class TestIPSharder(unittest.TestCase):
cls.all_ips = {str(ip) for ip in network}
print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")
+
def test_ip_range_initialization(self):
print_header('Testing IPRange initialization')
start_time = time.perf_counter()
@@ -54,6 +60,7 @@ class TestIPSharder(unittest.TestCase):
print_info(f'IP range spans from {first_ip} to {last_ip}')
print_info(f'Total IPs in range: {ip_range.total:,}')
+
def test_lcg_sequence(self):
print_header('Testing LCG sequence generation')
@@ -80,6 +87,7 @@ class TestIPSharder(unittest.TestCase):
print_success(f'Verified LCG determinism in {elapsed:.6f}s')
+
def test_shard_distribution(self):
print_header('Testing shard distribution and randomness')
@@ -128,6 +136,8 @@ class TestIPSharder(unittest.TestCase):
print_info(f'Sequential IP pairs in first 1000: {sequential_percentage:.2f}% (lower is more random)')
+
+
if __name__ == '__main__':
print(f"\n{Colors.CYAN}{'='*80}")
print(f"Starting IP Sharder Tests - Testing with 65,536 IPs (/16 network)")
| | | | | |