eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

commit fe49255f69049b7e55330411806795f2960a38c4
parent 1ab7199f7db2c66b08f9dba4743dbb48f15663ce
Author: acidvegas <acid.vegas@acid.vegas>
Date: Fri, 15 Mar 2024 01:25:09 -0400

Added Elastic Common Schema (ECS) logging to file for ingesting ERIS logs straight into Elasticsearch

Diffstat:
M README.md |  12 ++++++++----
M eris.py   | 103 +++++++++++++++++++++++++++++++++++++++++--------------------------------------

2 files changed, 61 insertions(+), 54 deletions(-)

diff --git a/README.md b/README.md
@@ -6,6 +6,7 @@ This is a suite of tools to aid in the ingestion of recon data from various sourc
 ## Prerequisites
 - [python](https://www.python.org/)
     - [elasticsearch](https://pypi.org/project/elasticsearch/) *(`pip install elasticsearch`)*
+    - [ecs_logging](https://pypi.org/project/ecs-logging) *(`pip install ecs-logging`)*
     - [aiofiles](https://pypi.org/project/aiofiles) *(`pip install aiofiles`)*
    - [aiohttp](https://pypi.org/project/aiohttp) *(`pip install aiohttp`)*
     - [websockets](https://pypi.org/project/websockets/) *(`pip install websockets`) (only required for `--certs` ingestion)*
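
The individual `pip install` commands listed above can be combined into a single invocation covering every prerequisite (websockets only matters if you use `--certs` ingestion):

```shell
pip install elasticsearch ecs-logging aiofiles aiohttp websockets
```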
@@ -18,10 +19,13 @@ python eris.py [options] <input>
 
 ### Options
 ###### General arguments
-| Argument     | Description                                   |
-|--------------|-----------------------------------------------|
-| `input_path` | Path to the input file or directory           |
-| `--watch`    | Create or watch a FIFO for real-time indexing |
+| Argument     | Description                                                      |
+|--------------|------------------------------------------------------------------|
+| `input_path` | Path to the input file or directory                              |
+| `--watch`    | Create or watch a FIFO for real-time indexing                    |
+| `--log`      | Logging level for file *(debug, info, warning, error, critical)* |
+
+**Note:** File logging is disabled by default. When enabled, it will log using the [Elastic Common Schema](https://www.elastic.co/guide/en/ecs-logging/python/current/intro.html) *(ECS)*.
 
 ###### Elasticsearch arguments
 | Argument        | Description                                             | Default            |
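
To illustrate the new `--log` flag and the ECS note above: a hypothetical run and the kind of line `ecs-logging` writes to the log file (`recon.json` is a made-up input file; the JSON field values are illustrative):

```shell
python eris.py --log debug recon.json
# eris.log then receives one ECS JSON object per line, e.g. (abridged):
# {"@timestamp": "2024-03-15T05:25:09.000Z", "log.level": "info", "message": "Processing file: recon.json", "ecs.version": "1.6.0"}
```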
diff --git a/eris.py b/eris.py
@@ -19,6 +19,11 @@ try:
 except ImportError:
 	raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
+try:
+	from ecs_logging import StdlibFormatter
+except ImportError:
+	raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
+
 
 class ElasticIndexer:
 	def __init__(self, args: argparse.Namespace):
@@ -34,13 +39,15 @@ class ElasticIndexer:
 
 		# Sniffing disabled due to an issue with the elasticsearch 8.x client (https://github.com/elastic/elasticsearch-py/issues/2005)
 		es_config = {
-			#'hosts'           : [f'{args.host}:{args.port}'],
-			'hosts'            : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing
-			'verify_certs'     : args.self_signed,
-			'ssl_show_warn'    : args.self_signed,
-			'request_timeout'  : args.timeout,
-			'max_retries'      : args.retries,
-			'retry_on_timeout' : True
+			#'hosts'               : [f'{args.host}:{args.port}'],
+			'hosts'                : [f'{args.host}:{port}' for port in ('9002', '9003', '9004')], # Temporary alternative to sniffing
+			'verify_certs'         : args.self_signed,
+			'ssl_show_warn'        : args.self_signed,
+			'request_timeout'      : args.timeout,
+			'max_retries'          : args.retries,
+			'retry_on_timeout'     : True,
+			'http_compress'        : True,
+			'connections_per_node' : 3 # Experiment with this value
 			#'sniff_on_start': True,
 			#'sniff_on_node_failure': True,
 			#'min_delay_between_sniffing': 60
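
A minimal sketch of how a config dict like this is typically handed to the elasticsearch-py 8.x async client; the actual wiring inside `ElasticIndexer` is not shown in this hunk, so treat the unpacking below as an assumption:

```python
from elasticsearch import AsyncElasticsearch

# Assumed wiring: unpack the options dict straight into the async client.
# 'http_compress' gzips request bodies; 'connections_per_node' sizes the
# per-node connection pool (the value 3 above is flagged as experimental).
es = AsyncElasticsearch(**es_config)
```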
@@ -91,23 +98,6 @@ class ElasticIndexer:
 			raise Exception(f'Failed to create index. ({response})')
 
 
-	async def get_cluster_health(self) -> dict:
-		'''Get the health of the Elasticsearch cluster.'''
-		
-
-
-		return await self.es.cluster.health()
-
-
-	async def get_cluster_size(self) -> int:
-		'''Get the number of nodes in the Elasticsearch cluster.'''
-
-		cluster_stats   = await self.es.cluster.stats()
-		number_of_nodes = cluster_stats['nodes']['count']['total']
-
-		return number_of_nodes
-
-
 	async def process_data(self, file_path: str, data_generator: callable):
 		'''
 		Index records in chunks to Elasticsearch.
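
The chunked indexing this docstring describes is the pattern elasticsearch-py's async bulk helper provides; a minimal sketch of that pattern, not ERIS's exact implementation (which the hunk elides):

```python
import logging
from elasticsearch.helpers import async_streaming_bulk

# 'es' is the async client; 'record_generator' is a hypothetical async
# generator yielding documents destined for the target index.
async def index_records(es, record_generator):
	async for ok, result in async_streaming_bulk(es, record_generator(), chunk_size=500):
		if not ok:
			logging.error(f'Failed to index record: {result}')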
@@ -141,33 +131,38 @@ class ElasticIndexer:
 			raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')
 
 
-def setup_logger(level: int = logging.INFO, to_file: bool = False, max_bytes: int = 250000, backups: int = 7) -> logging.Logger:
+def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5):
 	'''
-	Setup a custom logger with options for console and file logging.
+	Setup the global logger for the application.
 
-	:param level: Logging level.
-	:param to_file: Whether to log to a file.
-	:param max_bytes: Maximum size in bytes before rotating log file.
-	:param backups: Number of backup files to keep.
+	:param console_level: Minimum level to capture logs to the console.
+	:param file_level: Minimum level to capture logs to the file.
+	:param log_file: File to write logs to.
+	:param max_file_size: Maximum size of the log file before it is rotated.
+	:param backups: Number of backup log files to keep.
 	'''
 
+	# Configure the root logger
 	logger = logging.getLogger()
-	logger.setLevel(level)
-
-	logger.handlers.clear()
+	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs
 
-	formatter_console = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S')
-	formatter_file    = logging.Formatter('%(asctime)s | %(levelname)9s | %(filename)s.%(funcName)s | %(message)s', '%Y-%m-%d %I:%M:%S')
+	# Clear existing handlers
+	logger.handlers = []
 
-	sh = logging.StreamHandler()
-	sh.setFormatter(formatter_console)
-	logger.addHandler(sh)
+	# Setup console handler
+	console_handler = logging.StreamHandler()
+	console_handler.setLevel(console_level)
+	console_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%I:%M:%S')
+	console_handler.setFormatter(console_formatter)
+	logger.addHandler(console_handler)
 
-	if to_file:
-		os.makedirs('logs', exist_ok=True)
-		fh = logging.handlers.RotatingFileHandler('logs/debug.log', maxBytes=max_bytes, backupCount=backups, encoding='utf-8')
-		fh.setFormatter(formatter_file)
-		logger.addHandler(fh)
+	# Setup rotating file handler if file logging is enabled
+	if file_level is not None:
+		file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
+		file_handler.setLevel(file_level)
+		ecs_formatter = StdlibFormatter() # ECS (Elastic Common Schema) formatter
+		file_handler.setFormatter(ecs_formatter)
+		logger.addHandler(file_handler)
 
 
 async def main():
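
A quick illustration of the rewritten `setup_logger`: the console keeps the human-readable format while the optional file handler writes ECS JSON (both output lines below are illustrative):

```python
import logging

setup_logger(console_level=logging.INFO, file_level=logging.DEBUG, log_file='eris.log')
logging.info('Connecting to Elasticsearch')
# Console : 01:25:09 |      INFO | Connecting to Elasticsearch
# eris.log: {"@timestamp": "...", "log.level": "info", "message": "Connecting to Elasticsearch", ...}
```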
@@ -178,6 +173,7 @@ async def main():
 	# General arguments
 	parser.add_argument('input_path', help='Path to the input file or directory') # Required
 	parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
+	parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
 
 	# Elasticsearch arguments
 	parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
@@ -206,6 +202,12 @@ async def main():
 	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
 	parser.add_argument('--zone', action='store_true', help='Index Zone records')
 
 	args = parser.parse_args()
+
+	if args.log:
+		levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
+		setup_logger(file_level=levels[args.log], log_file='eris.log')
+	else:
+		setup_logger()
 
 	if args.host.endswith('/'):
@@ -219,6 +221,8 @@ async def main():
 	elif not os.path.isdir(args.input_path) and not os.path.isfile(args.input_path):
 		raise FileNotFoundError(f'Input path {args.input_path} does not exist or is not a file or directory')
 
+	logging.info(f'Connecting to Elasticsearch at {args.host}:{args.port}')
+
 	edx = ElasticIndexer(args)
 
 	if args.certstream:
@@ -237,10 +241,7 @@ async def main():
 	health = await edx.es.cluster.health()
 	logging.info(health)
 
-	await asyncio.sleep(5) # Delay to allow time for sniffing to complete
-
-	nodes = await edx.get_cluster_size()
-	logging.info(f'Connected to {nodes:,} Elasticsearch node(s)')
+	#await asyncio.sleep(5) # Delay to allow time for sniffing to complete (Sniffer temporarily disabled)
 
 	if not edx.es_index:
 		edx.es_index = ingestor.default_index
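
The lone `cluster.health()` call now carries what the removed helpers reported; its response dict already includes node counts alongside status, so something like the following (a sketch, not code from this commit) recovers the old log line:

```python
health = await edx.es.cluster.health()
# Response includes e.g. {'status': 'green', 'number_of_nodes': 3, ...}
logging.info(f'Connected to {health["number_of_nodes"]:,} Elasticsearch node(s)')
```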
@@ -252,6 +253,10 @@ async def main():
 		logging.info(f'Processing file: {args.input_path}')
 		await edx.process_data(args.input_path, ingestor.process_data)
 
+	elif stat.S_ISFIFO(os.stat(args.input_path).st_mode):
+		logging.info(f'Watching FIFO: {args.input_path}')
+		await edx.process_data(args.input_path, ingestor.process_data)
+
 	elif os.path.isdir(args.input_path):
 		count = 1
 		total = len(os.listdir(args.input_path))
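
For the new FIFO branch, a hedged end-to-end example (paths are hypothetical; the flag pairing follows the README's description of `--watch`):

```shell
mkfifo /tmp/eris.fifo                             # hypothetical FIFO path
python eris.py --watch --massdns /tmp/eris.fifo &
cat massdns_output.txt > /tmp/eris.fifo           # records are indexed as they arrive
```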
@@ -268,10 +273,9 @@ async def main():
 
 
 if __name__ == '__main__':
-	setup_logger(to_file=True)
 	print('')
 	print('┏┓┳┓┳┏┓   Elasticsearch Recon Ingestion Scripts')
 	print('┣ ┣┫┃┗┓        Developed by Acidvegas in Python')
 	print('┗┛┛┗┻┗┛             https://git.acid.vegas/eris')
 	print('')
-	asyncio.run(main())
-\ No newline at end of file
+	asyncio.run(main())