eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

eris.py (11609B)

#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# eris.py

import asyncio
import argparse
import logging
import logging.handlers
import os
import stat
import sys

sys.dont_write_bytecode = True

try:
	from elasticsearch            import AsyncElasticsearch
	from elasticsearch.exceptions import NotFoundError
	from elasticsearch.helpers    import async_streaming_bulk
except ImportError:
	raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')


class ElasticIndexer:
	def __init__(self, args: argparse.Namespace):
		'''
		Initialize the Elasticsearch indexer.

		:param args: Parsed arguments from argparse
		'''

		self.chunk_max  = args.chunk_max * 1024 * 1024 # Convert megabytes to bytes
		self.chunk_size = args.chunk_size
		self.es_index   = args.index

		# Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
		es_config = {
			#'hosts'               : [f'{args.host}:{args.port}'],
			'hosts'                : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing
			'verify_certs'         : args.self_signed,
			'ssl_show_warn'        : args.self_signed,
			'request_timeout'      : args.timeout,
			'max_retries'          : args.retries,
			'retry_on_timeout'     : True,
			'http_compress'        : True,
			'connections_per_node' : 3 # Experiment with this value
			#'sniff_on_start': True,
			#'sniff_on_node_failure': True,
			#'min_delay_between_sniffing': 60
		}

		if args.api_key:
			es_config['api_key'] = args.api_key # The 8.x client accepts the Base64-encoded API key string directly
		else:
			es_config['basic_auth'] = (args.user, args.password)

		self.es = AsyncElasticsearch(**es_config)


	async def close_connect(self):
		'''Close the Elasticsearch connection.'''

		await self.es.close()


	async def create_index(self, map_body: dict, pipeline: str = None, replicas: int = 1, shards: int = 1):
		'''
		Create the Elasticsearch index with the defined mapping.

		:param map_body: Mapping for the index
		:param pipeline: Name of the ingest pipeline to use for the index
		:param replicas: Number of replicas for the index
		:param shards: Number of shards for the index
		'''

		if await self.es.indices.exists(index=self.es_index):
			logging.info(f'Index \'{self.es_index}\' already exists.')
			return

		mapping = map_body

		mapping['settings'] = {
			'number_of_shards'   : shards,
			'number_of_replicas' : replicas
		}

		if pipeline:
			try:
				await self.es.ingest.get_pipeline(id=pipeline)
				logging.info(f'Using ingest pipeline \'{pipeline}\' for index \'{self.es_index}\'')
				mapping['settings']['index.default_pipeline'] = pipeline
			except NotFoundError:
				raise ValueError(f'Ingest pipeline \'{pipeline}\' does not exist.')

		response = await self.es.indices.create(index=self.es_index, body=mapping)

		if response.get('acknowledged') and response.get('shards_acknowledged'):
			logging.info(f'Index \'{self.es_index}\' successfully created.')
		else:
			raise Exception(f'Failed to create index. ({response})')


	async def process_data(self, file_path: str, data_generator: callable):
		'''
		Index records in chunks to Elasticsearch.

		:param file_path: Path to the file
		:param data_generator: Generator for the records to index
		'''

		count = 0
		total = 0

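		# async_streaming_bulk yields an (ok, result) tuple for every submitted
		# document, allowing failures to be logged per document while indexing continues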
		try:
			async for ok, result in async_streaming_bulk(self.es, actions=data_generator(file_path), chunk_size=self.chunk_size, max_chunk_bytes=self.chunk_max, raise_on_error=False): # raise_on_error=False so failures are yielded instead of raising BulkIndexError
				action, result = result.popitem()

				if not ok:
					logging.error(f'Failed to index document ({result["_id"]}) to {self.es_index} from {file_path} ({result})')
					continue

				count += 1
				total += 1

				if count == self.chunk_size:
					logging.info(f'Successfully indexed {self.chunk_size:,} ({total:,} processed) records to {self.es_index} from {file_path}')
					count = 0

			logging.info(f'Finished indexing {total:,} records to {self.es_index} from {file_path}')

		except Exception as e:
			raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})') from e


def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5, ecs_format: bool = False):
	'''
	Setup the global logger for the application.

	:param console_level: Minimum level to capture logs to the console.
	:param file_level: Minimum level to capture logs to the file.
	:param log_file: File to write logs to.
	:param max_file_size: Maximum size of the log file before it is rotated.
	:param backups: Number of backup log files to keep.
	:param ecs_format: Use the Elastic Common Schema (ECS) format for logs.
	'''

	# Configure the root logger
	logger = logging.getLogger()
	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs

	# Clear existing handlers
	logger.handlers = []

	# Setup console handler
	console_handler = logging.StreamHandler()
	console_handler.setLevel(console_level)
	console_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S')
	console_handler.setFormatter(console_formatter)
	logger.addHandler(console_handler)

	# Setup rotating file handler if file logging is enabled
	if file_level is not None:
		file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
		file_handler.setLevel(file_level)

		# Setup formatter to use ECS format if enabled or default format
		if ecs_format:
			try:
				from ecs_logging import StdlibFormatter
			except ImportError:
				raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
			file_formatter = StdlibFormatter() # ECS formatter
		else:
			file_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %H:%M:%S')

		file_handler.setFormatter(file_formatter)
		logger.addHandler(file_handler)


async def main():
	'''Main function when running this script directly.'''

	parser = argparse.ArgumentParser(description='Elasticsearch Recon Ingestion Scripts (ERIS)')

	# General arguments
	parser.add_argument('input_path', help='Path to the input file or directory') # Required
	parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
	parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
	parser.add_argument('--ecs', action='store_true', default=False, help='Use the Elastic Common Schema (ECS) for logging')

	# Elasticsearch arguments
	parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
	parser.add_argument('--port', type=int, default=9200, help='Elasticsearch port')
	parser.add_argument('--user', default='elastic', help='Elasticsearch username')
	parser.add_argument('--password', default=os.getenv('ES_PASSWORD'), help='Elasticsearch password (falls back to the ES_PASSWORD environment variable)')
	parser.add_argument('--api-key', default=os.getenv('ES_APIKEY'), help='Elasticsearch API key for authentication (falls back to the ES_APIKEY environment variable)')
	parser.add_argument('--self-signed', action='store_false', help='Elasticsearch is using self-signed certificates (disables certificate verification)')

	# Elasticsearch indexing arguments
	parser.add_argument('--index', help='Elasticsearch index name')
	parser.add_argument('--pipeline', help='Use an ingest pipeline for the index')
	parser.add_argument('--replicas', type=int, default=1, help='Number of replicas for the index')
	parser.add_argument('--shards', type=int, default=1, help='Number of shards for the index')

	# Performance arguments
	parser.add_argument('--chunk-size', type=int, default=50000, help='Number of records to index in a chunk')
	parser.add_argument('--chunk-max', type=int, default=100, help='Maximum size of a chunk in megabytes')
	parser.add_argument('--retries', type=int, default=30, help='Number of times to retry indexing a chunk before failing')
	parser.add_argument('--timeout', type=int, default=60, help='Elasticsearch request timeout in seconds')

	# Ingestion arguments
	parser.add_argument('--certstream', action='store_true', help='Index Certstream records')
	parser.add_argument('--httpx', action='store_true', help='Index Httpx records')
	parser.add_argument('--masscan', action='store_true', help='Index Masscan records')
	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
	parser.add_argument('--zone', action='store_true', help='Index zone file records')

	args = parser.parse_args()

	if args.log:
		levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
		setup_logger(file_level=levels[args.log], log_file='eris.log', ecs_format=args.ecs)
	else:
		setup_logger()

	if args.host.endswith('/'):
		args.host = args.host[:-1]

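	# In watch mode the input path is treated as a named pipe (FIFO): it is
	# created if missing, so records written to it are indexed as they arrive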
	if args.watch:
		if not os.path.exists(args.input_path):
			os.mkfifo(args.input_path)
		elif not stat.S_ISFIFO(os.stat(args.input_path).st_mode):
			raise ValueError(f'Path {args.input_path} is not a FIFO')
	elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
		raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')

	logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}')

	edx = ElasticIndexer(args)

	if args.certstream:
		from ingestors import ingest_certstream as ingestor
	elif args.httpx:
		from ingestors import ingest_httpx      as ingestor
	elif args.masscan:
		from ingestors import ingest_masscan    as ingestor
	elif args.massdns:
		from ingestors import ingest_massdns    as ingestor
	elif args.zone:
		from ingestors import ingest_zone       as ingestor
	else:
		raise ValueError('No ingestor specified')

	health = await edx.es.cluster.health()
	logging.info(health)

	#await asyncio.sleep(5) # Delay to allow time for sniffing to complete (Sniffer temporarily disabled)

	if not edx.es_index:
		edx.es_index = ingestor.default_index

	map_body = ingestor.construct_map()
	await edx.create_index(map_body, args.pipeline, args.replicas, args.shards)

	if os.path.isfile(args.input_path):
		logging.info(f'Processing file: {args.input_path}')
		await edx.process_data(args.input_path, ingestor.process_data)

	elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
		logging.info(f'Watching FIFO: {args.input_path}')
		await edx.process_data(args.input_path, ingestor.process_data)

	elif os.path.isdir(args.input_path):
		count = 1
		total = len(os.listdir(args.input_path))
		logging.info(f'Processing {total:,} files in directory: {args.input_path}')
		for file in sorted(os.listdir(args.input_path)):
			file_path = os.path.join(args.input_path, file)
			if os.path.isfile(file_path):
				logging.info(f'[{count:,}/{total:,}] Processing file: {file_path}')
				await edx.process_data(file_path, ingestor.process_data)
				count += 1
			else:
				logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')

	await edx.close_connect() # Close the Elasticsearch connection to stop "Unclosed client session" warnings


if __name__ == '__main__':
	print('')
	print('┏┓┳┓┳┏┓   Elasticsearch Recon Ingestion Scripts')
	print('┣ ┣┫┃┗┓        Developed by Acidvegas in Python')
	print('┗┛┛┗┻┗┛             https://git.acid.vegas/eris')
	print('')
	asyncio.run(main())
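
Each ingestor module imported in main() is expected to expose a default_index string, a construct_map() function returning the index mapping, and an async generator process_data(file_path) that yields bulk actions for async_streaming_bulk. A minimal sketch of that contract, using a hypothetical module with illustrative index and field names rather than the real ERIS ingestors:

# example_ingestor.py - hypothetical minimal ingestor (names are illustrative only)
default_index = 'eris-example'

def construct_map() -> dict:
	'''Return the index mapping consumed by ElasticIndexer.create_index().'''
	return {'mappings': {'properties': {'line': {'type': 'text'}}}}

async def process_data(file_path: str):
	'''Yield one bulk action per non-empty line of the input file.'''
	with open(file_path) as fd:
		for line in fd:
			line = line.strip()
			if line:
				yield {'_index': default_index, '_source': {'line': line}}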