eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
ingest_firehol.py (5191B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_firehol.py

import ipaddress
import logging
import os
import re
import time

try:
    import git
except ImportError:
    raise ImportError('Missing required libraries. (pip install gitpython)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-firehol'

# Git repository settings
REPO_URL  = 'https://github.com/firehol/blocklist-ipsets.git'
REPO_PATH = os.path.join('data', 'firehol-blocklist') # Local path to store the repo

# File suffixes to ignore (time-windowed variants of the same lists)
IGNORES = ('_1d', '_7d', '_30d', '_90d', '_180d', '_365d', '_730d')


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for Firehol records.'''

    mapping = {
        'mappings': {
            'properties': {
                'ip'         : { 'type': 'ip_range' },
                'ipsets'     : { 'type': 'keyword'  },
                'categories' : { 'type': 'keyword'  },
                'seen'       : { 'type': 'date'     }
            }
        }
    }

    return mapping


def update_repo():
    '''Clone the blocklist repository if it is missing, otherwise pull the latest changes.'''

    # If the repository doesn't exist, clone it
    if not os.path.exists(REPO_PATH):
        logging.info(f'Cloning repository to {REPO_PATH}...')

        # Create the parent directory if it doesn't exist
        os.makedirs(os.path.dirname(REPO_PATH), exist_ok=True)

        # Clone the repository
        git.Repo.clone_from(REPO_URL, REPO_PATH)
    else:
        # If the repository already exists, update it
        repo = git.Repo(REPO_PATH)
        logging.info('Updating repository...')
        repo.remotes.origin.pull()


def stream_ips(file_path: str):
    '''
    Stream IPs from a file, skipping comments and validating each entry.

    :param file_path: Path to the ipset file.
    '''

    try:
        # Open the file
        with open(file_path) as f:

            # Iterate over each line
            for line in f:

                # Skip comments and empty lines
                line = line.strip()
                if line.startswith('#') or not line:
                    continue

                # Validate the IP/network (bare addresses are treated as /32)
                try:
                    if '/' not in line:
                        line = f'{line}/32'
                    ipaddress.ip_network(line, strict=True)
                except ValueError as e:
                    logging.warning(f'Invalid IP/network in {os.path.basename(file_path)}: {line} ({e})')
                    continue

                # Yield the valid IP/network
                yield line

    except Exception as e:
        logging.error(f'Error streaming IPs from {file_path}: {e}')


async def process_data(input_path=None):
    '''
    Process Firehol ipsets and yield records for indexing.

    :param input_path: Placeholder for uniformity with the other ingestion scripts (unused here)
    '''

    # Update the repository
    update_repo()

    # Collect the ipset/netset files, skipping the time-windowed variants
    files = []
    for filename in os.listdir(REPO_PATH):
        if filename.endswith(('.ipset', '.netset')):
            if any(filename.rsplit('.', 1)[0].endswith(x) for x in IGNORES):
                logging.debug(f'Ignoring {filename} because it matches an ignored suffix')
                continue
            files.append(os.path.join(REPO_PATH, filename))

    logging.info(f'Processing {len(files)} files...')

    # Dictionary to store unique IPs and their metadata
    ip_records = {}

    # Process each file
    for file_path in files:
        logging.info(f'Processing {os.path.basename(file_path)}...')

        # Get the ipset name
        ipset_name = os.path.splitext(os.path.basename(file_path))[0]

        # Extract the category from the header comments, if present
        category = None
        with open(file_path) as f:
            for line in f:
                if match := re.search(r'^#\s*Category\s*:\s*(.+)$', line, re.IGNORECASE):
                    category = match.group(1).strip()
                    break

        # Stream IPs from the file
        for ip in stream_ips(file_path):
            # Initialize the record if the IP has not been seen before
            if ip not in ip_records:
                ip_records[ip] = {'ip': ip, 'ipsets': set(), 'categories': set()}

            # Update the arrays
            ip_records[ip]['ipsets'].add(ipset_name)
            if category:
                ip_records[ip]['categories'].add(category)

    # Yield the unique records, with sets converted to lists
    for ip, record in ip_records.items():
        # Convert sets to lists for JSON serialization
        record['ipsets']     = list(record['ipsets'])
        record['categories'] = list(record['categories'])
        record['seen']       = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

        # Yield the document with _id set to the IP
        yield {'_index': default_index, '_id': ip, '_source': record}


async def test():
    '''Test the ingestion process.'''

    async for document in process_data():
        print(document)


if __name__ == '__main__':
    import asyncio
    logging.basicConfig(level=logging.INFO)
    asyncio.run(test())
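
The actions yielded by process_data() already carry _index, _id, and _source, which is the shape the elasticsearch-py bulk helpers expect. Below is a minimal driver sketch, not part of ERIS itself, assuming the official async client (pip install 'elasticsearch[async]', which this script does not depend on), a hypothetical localhost node, and that this file is importable as ingest_firehol:

#!/usr/bin/env python
# bulk_firehol_example.py - hypothetical driver, not part of ERIS

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

from ingest_firehol import construct_map, process_data, default_index


async def main():
    # Hypothetical cluster address; ERIS wires its real client up elsewhere
    es = AsyncElasticsearch('http://localhost:9200')

    # Create the index with the ip_range/keyword/date mapping if it is missing
    # (keyword args assume the 8.x client; the 7.x client takes body=construct_map())
    if not await es.indices.exists(index=default_index):
        await es.indices.create(index=default_index, **construct_map())

    # process_data() is an async generator of bulk actions, so it can be
    # passed straight to async_bulk without materializing the records twice
    success, errors = await async_bulk(es, process_data(), raise_on_error=False)
    print(f'Indexed {success} documents, {len(errors)} errors')

    await es.close()


if __name__ == '__main__':
    asyncio.run(main())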
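
A note on the mapping choice: because ip is stored as ip_range, a plain term query with a single address matches every stored CIDR that contains it, so "which blocklists cover this IP?" becomes a single lookup. A sketch under the same assumptions as above (async 8.x client, hypothetical localhost node):

import asyncio

from elasticsearch import AsyncElasticsearch


async def lookup(address: str):
    es = AsyncElasticsearch('http://localhost:9200')  # hypothetical cluster

    # A term query on an ip_range field matches every document whose
    # range contains the queried address
    resp = await es.search(index='eris-firehol', query={'term': {'ip': address}})
    for hit in resp['hits']['hits']:
        print(hit['_id'], hit['_source']['ipsets'])

    await es.close()


if __name__ == '__main__':
    asyncio.run(lookup('203.0.113.7'))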