eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

eris.py (10447B)

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# eris.py

import asyncio
import argparse
import logging
import os
import stat
import sys

sys.dont_write_bytecode = True

try:
    # This is commented out because there is a bug with the elasticsearch library that requires a patch (see initialize() method below)
    #from elasticsearch import AsyncElasticsearch
    from elasticsearch.exceptions import NotFoundError
    from elasticsearch.helpers import async_streaming_bulk
except ImportError:
    raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')

# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')

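# For illustration, with the format above a call like logging.info('Connected to 1 Elasticsearch node(s)')
# would render roughly as (hypothetical timestamp):
#   03/15 01:02:03 - root - INFO - Connected to 1 Elasticsearch node(s)
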
class ElasticIndexer:
    def __init__(self, args: argparse.Namespace):
        '''
        Initialize the Elasticsearch indexer.

        :param args: Parsed arguments from argparse
        '''

        self.chunk_max  = args.chunk_max * 1024 * 1024 # MB
        self.chunk_size = args.chunk_size
        self.es = None
        self.es_index = args.index

        self.es_config = {
            'hosts': [f'{args.host}:{args.port}'],
            'verify_certs': args.self_signed,
            'ssl_show_warn': args.self_signed,
            'request_timeout': args.timeout,
            'max_retries': args.retries,
            'retry_on_timeout': True,
            'sniff_on_start': True, # Is this problematic?
            'sniff_on_node_failure': True,
            'min_delay_between_sniffing': 60 # Add config option for this?
        }

        if args.api_key:
            self.es_config['api_key'] = (args.api_key, '') # Verify this is correct
        else:
            self.es_config['basic_auth'] = (args.user, args.password)

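    # For illustration, a hypothetical invocation such as:
    #   python eris.py --host https://example.org --user elastic --password hunter2 input.txt
    # would assemble an es_config along the lines of:
    #   {'hosts': ['https://example.org:9200'], 'verify_certs': True, 'ssl_show_warn': True,
    #    'request_timeout': 60, 'max_retries': 100, 'retry_on_timeout': True, 'sniff_on_start': True,
    #    'sniff_on_node_failure': True, 'min_delay_between_sniffing': 60, 'basic_auth': ('elastic', 'hunter2')}
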
    async def initialize(self):
        '''Initialize the Elasticsearch client.'''

        # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
        import sniff_patch
        self.es = sniff_patch.init_elasticsearch(**self.es_config)

        # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
        #self.es = AsyncElasticsearch(**self.es_config)


    async def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        :param map_body: Mapping for the index
        :param pipeline: Name of the ingest pipeline to use for the index
        :param replicas: Number of replicas for the index
        :param shards: Number of shards for the index
        '''

        if await self.es.indices.exists(index=self.es_index):
            logging.info(f'Index \'{self.es_index}\' already exists.')
            return

        mapping = map_body

        mapping['settings'] = {
            'number_of_shards': shards,
            'number_of_replicas': replicas
        }

        if pipeline:
            try:
                await self.es.ingest.get_pipeline(id=pipeline)
                logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
                mapping['settings']['index.default_pipeline'] = pipeline
            except NotFoundError:
                raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')

        response = await self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')

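    # For illustration, the map_body passed in comes from an ingestor's construct_map();
    # a hypothetical minimal mapping might look like:
    #   {'mappings': {'properties': {'ip': {'type': 'ip'}, 'port': {'type': 'integer'}, 'seen': {'type': 'date'}}}}
    # create_index() then merges the shard/replica settings (and optional pipeline) into it before creation.
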
    async def get_cluster_health(self) -> dict:
        '''Get the health of the Elasticsearch cluster.'''

        return await self.es.cluster.health()


    async def get_cluster_size(self) -> int:
        '''Get the number of nodes in the Elasticsearch cluster.'''

        cluster_stats = await self.es.cluster.stats()
        number_of_nodes = cluster_stats['nodes']['count']['total']

        return number_of_nodes


    async def process_data(self, file_path: str, data_generator: callable):
        '''
        Index records in chunks to Elasticsearch.

        :param file_path: Path to the file
        :param data_generator: Generator for the records to index
        '''

        count = 0
        total = 0

        async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max):
            action, result = result.popitem()

            if not ok:
                logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
                input('Press Enter to continue...') # Debugging (will possibly remove this since we have retries enabled)
                continue

            count += 1
            total += 1

            if count == self.chunk_size:
                logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
                count = 0

        logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')

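# For illustration only: a minimal sketch (hypothetical, not one of the real
# ingestors) of the interface process_data() expects. Each ingestor exposes an
# async generator that yields one bulk action per record; the real ingestors
# presumably set '_index' themselves, since async_streaming_bulk() is not given
# a default index. The blocking open() is fine for a sketch, but a real ingestor
# would want non-blocking I/O.
async def example_data_generator(file_path: str):
    '''Yield one Elasticsearch bulk action per non-empty line. (illustrative sketch only)'''
    with open(file_path) as file:
        for line in file:
            line = line.strip()
            if line:
                yield {'_index': 'example-index', '_source': {'raw': line}}

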
async def main():
    '''Main function when running this script directly.'''

    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')

    # General arguments
    parser.add_argument('input_path', help='Path to the input file or directory') # Required
    parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')

    # Elasticsearch arguments
    parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', help='Elasticsearch index name')
    parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
    parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')

    # Performance arguments
    parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
    parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in megabytes')
    parser.add_argument('--retries', type=int, default=100, help='Number of times to retry indexing a chunk before failing')
    parser.add_argument('--timeout', type=int, default=60, help='Number of seconds to wait before retrying a chunk')

    # Ingestion arguments
    parser.add_argument('--cert', action='store_true', help='Index Certstream records')
    parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
    parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
    parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
    parser.add_argument('--zone', action='store_true', help='Index Zone records')

    args = parser.parse_args()

    if args.watch:
        if not os.path.exists(args.input_path):
            os.mkfifo(args.input_path)
        elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
            raise ValueError(f'Path {args.input_path} exists but is not a FIFO')
    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')

    edx = ElasticIndexer(args)
    await edx.initialize() # Initialize the Elasticsearch client asynchronously

    if args.cert:
        from ingestors import ingest_certs   as ingestor
    elif args.httpx:
        from ingestors import ingest_httpx   as ingestor
    elif args.masscan:
        from ingestors import ingest_masscan as ingestor
    elif args.massdns:
        from ingestors import ingest_massdns as ingestor
    elif args.zone:
        from ingestors import ingest_zone    as ingestor
    else:
        raise ValueError('No ingestor selected. (use one of --cert, --httpx, --masscan, --massdns, or --zone)')

    health = await edx.get_cluster_health()
    print(health)

    await asyncio.sleep(5) # Delay to allow time for sniffing to complete

    nodes = await edx.get_cluster_size()
    logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')

    if not edx.es_index:
        edx.es_index = ingestor.default_index

    map_body = ingestor.construct_map()
    await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        await edx.process_data(args.input_path, ingestor.process_data)

    elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
        logging.info(f'Watching FIFO: {args.input_path}')
        await edx.process_data(args.input_path, ingestor.process_data)

    elif os.path.isdir(args.input_path):
        count = 1
        total = len(os.listdir(args.input_path))
        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
                await edx.process_data(file_path, ingestor.process_data)
                count += 1
            else:
                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')


if __name__ == '__main__':
    asyncio.run(main())
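
# Example usage (hypothetical paths and index names):
#   python eris.py --masscan --index masscan-logs /path/to/masscan/output
#   python eris.py --zone --watch /tmp/zones.fifo
# Credentials may be passed via --user/--password (or ES_PASSWORD) or --api-key (or ES_APIKEY).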