eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

ingest_firehol.py (5191B)

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_firehol.py

import ipaddress
import logging
import os
import re
import time

try:
    import git
except ImportError:
    raise ImportError('Missing required library. (pip install gitpython)')


# Set a default Elasticsearch index if one is not provided
default_index = 'eris-firehol'

# Git repository settings
REPO_URL  = 'https://github.com/firehol/blocklist-ipsets.git'
REPO_PATH = os.path.join('data', 'firehol-blocklist') # Local path to store the repo

# File suffixes to ignore (time-windowed variants of the base ipsets)
IGNORES = ('_1d', '_7d', '_30d', '_90d', '_180d', '_365d', '_730d')

def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for Firehol records.'''

    mapping = {
        'mappings': {
            'properties': {
                'ip'         : { 'type': 'ip_range' },
                'ipsets'     : { 'type': 'keyword'  },
                'categories' : { 'type': 'keyword'  },
                'seen'       : { 'type': 'date'     },
            }
        }
    }

    return mapping

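# For reference, this mapping is presumably applied at index-creation time by
# the eris.py driver; a minimal sketch with the official elasticsearch client
# (the host URL here is an assumption, not part of this script) would be:
#
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch('http://localhost:9200')
#   es.indices.create(index=default_index, body=construct_map())
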
def update_repo():
    '''Clone the Firehol blocklist repository, or pull the latest changes if it already exists locally.'''

    # If the repository doesn't exist locally, clone it
    if not os.path.exists(REPO_PATH):
        logging.info(f'Cloning repository to {REPO_PATH}...')

        # Create the parent directory if it doesn't exist
        os.makedirs(os.path.dirname(REPO_PATH), exist_ok=True)

        # Clone the repository
        git.Repo.clone_from(REPO_URL, REPO_PATH)
    else:
        # If the repository already exists, pull the latest changes
        repo = git.Repo(REPO_PATH)
        logging.info('Updating repository...')
        repo.remotes.origin.pull()

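# Note: the blocklist-ipsets history is large. If disk usage matters, a shallow
# clone, e.g. git.Repo.clone_from(REPO_URL, REPO_PATH, depth=1), would fetch
# only the latest snapshot. This is an optional tweak, not part of the original
# script.
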
def stream_ips(file_path: str):
    '''
    Stream IPs from a file, skipping comments and validating each entry.

    :param file_path: Path to the ipset file.
    '''

    try:
        # Open the file
        with open(file_path) as f:

            # Iterate over each line
            for line in f:

                # Skip comments and empty lines
                line = line.strip()
                if line.startswith('#') or not line:
                    continue

                # Validate the IP/network (bare addresses get a single-host prefix first)
                try:
                    if '/' not in line:
                        line = f'{line}/128' if ':' in line else f'{line}/32'
                    ipaddress.ip_network(line, strict=True)
                except ValueError as e:
                    logging.warning(f'Invalid IP/network in {os.path.basename(file_path)}: {line} ({e})')
                    continue

                # Yield the valid IP/network
                yield line

    except Exception as e:
        logging.error(f'Error streaming IPs from {file_path}: {e}')

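# Example: iterating stream_ips('data/firehol-blocklist/firehol_level1.netset')
# yields validated CIDR strings such as '1.2.3.0/24' or '5.6.7.8/32', one per
# entry (the filename and values here are illustrative).
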
async def process_data(input_path=None):
    '''
    Process the Firehol ipsets and yield records for indexing.

    :param input_path: Placeholder for uniformity with the other ERIS ingestors (unused)
    '''

    # Update the repository
    update_repo()

    # Collect all ipset/netset files, skipping the time-windowed variants
    files = []
    for filename in os.listdir(REPO_PATH):
        if filename.endswith(('.ipset', '.netset')):
            if any(filename.rsplit('.', 1)[0].endswith(x) for x in IGNORES):
                logging.debug(f'Ignoring {filename} because it ends with an ignored suffix')
                continue
            files.append(os.path.join(REPO_PATH, filename))

    logging.info(f'Processing {len(files)} files...')

    # Dictionary to store unique IPs and their metadata
    ip_records = {}

    # Process each file
    for file_path in files:
        logging.info(f'Processing {os.path.basename(file_path)}...')

        # Get the ipset name from the filename
        ipset_name = os.path.splitext(os.path.basename(file_path))[0]

        # Extract the category from the file header, if present
        category = None
        with open(file_path) as f:
            for line in f:
                if match := re.search(r'^#\s*Category\s*:\s*(.+)$', line, re.IGNORECASE):
                    category = match.group(1).strip()
                    break

        # Stream IPs from the file
        for ip in stream_ips(file_path):
            # Initialize the record if the IP has not been seen before
            if ip not in ip_records:
                ip_records[ip] = {'ip': ip, 'ipsets': set(), 'categories': set()}

            # Track which ipsets and categories reference this IP
            ip_records[ip]['ipsets'].add(ipset_name)
            if category:
                ip_records[ip]['categories'].add(category)

    # Yield the unique records
    for ip, record in ip_records.items():
        # Convert the sets to lists for JSON serialization
        record['ipsets']     = list(record['ipsets'])
        record['categories'] = list(record['categories'])
        record['seen']       = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

        # Yield the document with _id set to the IP so re-ingestion updates in place
        yield {'_index': default_index, '_id': ip, '_source': record}

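# For reference, a yielded document looks roughly like this (values are
# illustrative, not taken from a real run):
#
#   {'_index': 'eris-firehol', '_id': '1.2.3.0/24',
#    '_source': {'ip': '1.2.3.0/24', 'ipsets': ['firehol_level1'],
#                'categories': ['attacks'], 'seen': '2024-06-01T00:00:00Z'}}
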
async def test():
    '''Test the ingestion process'''

    async for document in process_data():
        print(document)


if __name__ == '__main__':
    import asyncio
    logging.basicConfig(level=logging.INFO)
    asyncio.run(test())
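
Running `python ingest_firehol.py` executes the test() entry point above and prints each document. To actually index the records, the async generator can be fed to a bulk helper. Below is a minimal sketch using elasticsearch-py's async helpers rather than the eris.py driver itself; the host URL and client settings are assumptions:

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

from ingest_firehol import construct_map, default_index, process_data

async def main():
    # Connect to Elasticsearch (placeholder URL, no auth assumed)
    es = AsyncElasticsearch('http://localhost:9200')

    # Create the index with the ip_range mapping if it does not exist yet
    if not await es.indices.exists(index=default_index):
        await es.indices.create(index=default_index, body=construct_map())

    # process_data() already yields bulk-ready actions (_index/_id/_source)
    await async_bulk(es, process_data())
    await es.close()

asyncio.run(main())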