eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
eris.py (11602B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)

import argparse
import logging
import os
import stat
import time
import sys

sys.dont_write_bytecode = True

try:
    from elasticsearch import Elasticsearch, helpers
    from elasticsearch.exceptions import NotFoundError
except ImportError:
    raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')

# Setting up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d %I:%M:%S')


class ElasticIndexer:
    def __init__(self, args: argparse.Namespace):
        '''
        Initialize the Elasticsearch indexer.

        :param args: Parsed arguments from argparse
        '''

        self.chunk_max     = args.chunk_max * 1024 * 1024 # MB
        self.chunk_size    = args.chunk_size
        self.chunk_threads = args.chunk_threads
        self.dry_run       = args.dry_run
        self.es_index      = args.index

        if not args.dry_run:
            es_config = {
                'hosts'                      : [f'{args.host}:{args.port}'],
                'verify_certs'               : args.self_signed,
                'ssl_show_warn'              : args.self_signed,
                'request_timeout'            : args.timeout,
                'max_retries'                : args.retries,
                'retry_on_timeout'           : True,
                'sniff_on_start'             : False,
                'sniff_on_node_failure'      : True,
                'min_delay_between_sniffing' : 60 # Add config option for this?
            }

            if args.api_key:
                es_config['headers'] = {'Authorization': f'ApiKey {args.api_key}'}
            else:
                es_config['basic_auth'] = (args.user, args.password)

            # Patching the Elasticsearch client to fix a bug with sniffing (https://github.com/elastic/elasticsearch-py/issues/2005#issuecomment-1645641960)
            import sniff_patch
            self.es = sniff_patch.init_elasticsearch(**es_config)

            # Remove the above and uncomment the below if the bug is fixed in the Elasticsearch client:
            #self.es = Elasticsearch(**es_config)


    def create_index(self, map_body: dict, pipeline: str = '', replicas: int = 1, shards: int = 1):
        '''
        Create the Elasticsearch index with the defined mapping.

        :param map_body: Mapping body for the index
        :param pipeline: Name of the ingest pipeline to use for the index
        :param replicas: Number of replicas for the index
        :param shards: Number of shards for the index
        '''

        if self.es.indices.exists(index=self.es_index):
            logging.info(f'Index \'{self.es_index}\' already exists.')
            return

        mapping = map_body

        mapping['settings'] = {
            'number_of_shards'   : shards,
            'number_of_replicas' : replicas
        }

        if pipeline:
            try:
                self.es.ingest.get_pipeline(id=pipeline)
                logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
                mapping['settings']['index.default_pipeline'] = pipeline
            except NotFoundError:
                raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')

        response = self.es.indices.create(index=self.es_index, body=mapping)

        if response.get('acknowledged') and response.get('shards_acknowledged'):
            logging.info(f'Index \'{self.es_index}\' successfully created.')
        else:
            raise Exception(f'Failed to create index. ({response})')


    def get_cluster_health(self) -> dict:
        '''Get the health of the Elasticsearch cluster.'''

        return self.es.cluster.health()


    def get_cluster_size(self) -> int:
        '''Get the number of nodes in the Elasticsearch cluster.'''

        cluster_stats = self.es.cluster.stats()
        number_of_nodes = cluster_stats['nodes']['count']['total']

        return number_of_nodes


    def bulk_index(self, documents: list, file_path: str, count: int):
        '''
        Index a batch of documents to Elasticsearch.

        :param documents: List of documents to index
        :param file_path: Path to the file being indexed
        :param count: Total number of records processed
        '''

        remaining_documents = documents

        parallel_bulk_config = {
            'client'          : self.es,
            'chunk_size'      : self.chunk_size,
            'max_chunk_bytes' : self.chunk_max,
            'thread_count'    : self.chunk_threads,
            'queue_size'      : 2 # Add config option for this?
        }

        while remaining_documents:
            failed_documents = []

            try:
                for success, response in helpers.parallel_bulk(actions=remaining_documents, **parallel_bulk_config):
                    if not success:
                        failed_documents.append(response)

                if not failed_documents:
                    ingested = parallel_bulk_config['chunk_size'] * parallel_bulk_config['thread_count']
                    logging.info(f'Successfully indexed {ingested:,} ({count:,} processed) records to {self.es_index} from {file_path}')
                    break

                else:
                    logging.warning(f'Failed to index {len(failed_documents):,} failed documents! Retrying...')
                    remaining_documents = failed_documents
            except Exception as e:
                logging.error(f'Failed to index documents! ({e})')
                time.sleep(30) # Should we add a config option for this?


    def process_file(self, file_path: str, batch_size: int, ingest_function: callable):
        '''
        Read and index records in batches to Elasticsearch.

        :param file_path: Path to the file
        :param batch_size: Number of records to index per batch
        :param ingest_function: Function to process the file
        '''

        count = 0
        records = []

        for processed in ingest_function(file_path):

            if not processed:
                break

            if self.dry_run:
                print(processed)
                continue

            struct = {'_index': self.es_index, '_source': processed}
            records.append(struct)
            count += 1

            if len(records) >= batch_size:
                self.bulk_index(records, file_path, count)
                records = []

        if records:
            self.bulk_index(records, file_path, count)


def main():
    '''Main function when running this script directly.'''

    parser = argparse.ArgumentParser(description='Index data into Elasticsearch.')

    # General arguments
    parser.add_argument('input_path', help='Path to the input file or directory') # Required
    parser.add_argument('--dry-run', action='store_true', help='Dry run (do not index records to Elasticsearch)')
    parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')

    # Elasticsearch arguments
    parser.add_argument('--host', default='localhost', help='Elasticsearch host')
    parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
    parser.add_argument('--user', default='elastic', help='Elasticsearch username')
    parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (if not provided, check environment variable ES_PASSWORD)')
    parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API Key for authentication (if not provided, check environment variable ES_APIKEY)')
    parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates')

    # Elasticsearch indexing arguments
    parser.add_argument('--index', help='Elasticsearch index name')
    parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
    parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
    parser.add_argument('--shards', type=int, default=3, help='Number of shards for the index')

    # Performance arguments
    parser.add_argument('--chunk-max', type=int, default=10, help='Maximum size in MB of a chunk')
    parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
    parser.add_argument('--chunk-threads', type=int, default=3, help='Number of threads to use when indexing in chunks')
    parser.add_argument('--retries', type=int, default=60, help='Number of times to retry indexing a chunk before failing')
    parser.add_argument('--timeout', type=int, default=30, help='Number of seconds to wait before retrying a chunk')

    # Ingestion arguments
    parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
    parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
    parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
    parser.add_argument('--zone', action='store_true', help='Index Zone records')

    args = parser.parse_args()

    if args.watch:
        if not os.path.exists(args.input_path):
            os.mkfifo(args.input_path)
        elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
            raise ValueError(f'Path {args.input_path} is not a FIFO')
    elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
        raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')

    edx = ElasticIndexer(args)

    if args.httpx:
        from ingestors import ingest_httpx as ingestor
    elif args.masscan:
        from ingestors import ingest_masscan as ingestor
    elif args.massdns:
        from ingestors import ingest_massdns as ingestor
    elif args.zone:
        from ingestors import ingest_zone as ingestor
    else:
        raise ValueError('Missing an ingestion argument (--httpx, --masscan, --massdns, or --zone)')

    batch_size = 0

    if not args.dry_run:
        print(edx.get_cluster_health())

        time.sleep(3) # Delay to allow time for sniffing to complete

        nodes = edx.get_cluster_size()
        logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')

        if not edx.es_index:
            edx.es_index = ingestor.default_index

        map_body = ingestor.construct_map()
        edx.create_index(map_body, args.pipeline, args.replicas, args.shards)

        batch_size = int(nodes * (args.chunk_size * args.chunk_threads))

    if os.path.isfile(args.input_path):
        logging.info(f'Processing file: {args.input_path}')
        edx.process_file(args.input_path, batch_size, ingestor.process_file)

    elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
        logging.info(f'Watching FIFO: {args.input_path}')
        edx.process_file(args.input_path, batch_size, ingestor.process_file)

    elif os.path.isdir(args.input_path):
        count = 1
        total = len(os.listdir(args.input_path))
        logging.info(f'Processing {total:,} files in directory: {args.input_path}')
        for file in sorted(os.listdir(args.input_path)):
            file_path = os.path.join(args.input_path, file)
            if os.path.isfile(file_path):
                logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
                edx.process_file(file_path, batch_size, ingestor.process_file)
                count += 1
            else:
                logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')


if __name__ == '__main__':
    main()
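
The ingestor modules themselves are not shown on this page. Judging from how main() and process_file() call them, each module in the ingestors package is expected to expose a default_index name, a construct_map() function returning the index mapping, and a process_file() generator yielding one document per record. A minimal sketch of that interface, assuming a hypothetical module name, input format, and field names:

# ingestors/ingest_example.py - hypothetical ingestor sketch; the module name,
# CSV input format, and field names below are illustrative, not part of ERIS.

default_index = 'eris-example' # Used by eris.py when --index is not supplied

def construct_map() -> dict:
    '''Return the mapping body that eris.py passes to ElasticIndexer.create_index().'''
    return {
        'mappings': {
            'properties': {
                'ip'   : {'type': 'ip'},
                'port' : {'type': 'integer'},
                'seen' : {'type': 'date'}
            }
        }
    }

def process_file(file_path: str):
    '''Yield one document (dict) per input line; eris.py wraps each into a bulk action.'''
    with open(file_path, 'r') as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            ip, port, seen = line.split(',')
            yield {'ip': ip, 'port': int(port), 'seen': seen}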
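For reference, a few example invocations built only from the arguments defined above (index names and paths are hypothetical):

python eris.py --dry-run --masscan /data/masscan/scan-01.json
ES_PASSWORD=changeme python eris.py --masscan --index masscan-logs --shards 3 --replicas 1 /data/masscan/
python eris.py --zone --watch --index zone-records /tmp/zone.fifo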