eris

Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

commit 124e4b0cf34c4c37992aed0ad3516868b5203c66
parent 510f7db07e878a025b9378c34792f606b6dffc7c
Author: acidvegas <acid.vegas@acid.vegas>
Date: Sat, 23 Mar 2024 22:47:30 -0400

ECS formatting added to eris, certstream ingestor now caches to prevent duplication, elastic connections properly closed now

Diffstat:
M eris.py                        | 39 +++++++++++++++++++++++++++------------
M ingestors/ingest_certstream.py | 25 ++++++++++++++++++++++---
M ingestors/ingest_ixps.py       | 13 +++++++++----
M ingestors/ingest_masscan.py    |  2 +-
M ingestors/ingest_massdns.py    |  2 +-

5 files changed, 60 insertions(+), 21 deletions(-)
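
A note on the "connections properly closed" part: the fix amounts to awaiting AsyncElasticsearch.close() before the program exits. A minimal sketch of that lifecycle, assuming a local node (the host URL and the info() call are illustrative, not taken from this commit):

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
	es = AsyncElasticsearch('http://localhost:9200')
	try:
		print((await es.info())['version']['number']) # Indexing work would happen here
	finally:
		await es.close() # Without this, aiohttp warns about an unclosed client session

asyncio.run(main())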

diff --git a/eris.py b/eris.py
@@ -19,11 +19,6 @@ try:
 except ImportError:
 	raise ImportError('Missing required \'elasticsearch\' library. (pip install elasticsearch)')
 
-try:
-	from ecs_logging import StdlibFormatter
-except ImportError:
-	raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
-
 
 class ElasticIndexer:
 	def __init__(self, args: argparse.Namespace):
@@ -61,6 +56,12 @@ class ElasticIndexer:
 		self.es = AsyncElasticsearch(**es_config)
 
 
+	async def close_connect(self):
+		'''Close the Elasticsearch connection.'''
+
+		await self.es.close()
+
+
 	async def create_index(self, map_body: dict, pipeline: str = None, replicas: int = 1, shards: int = 1):
 		'''
 		Create the Elasticsearch index with the defined mapping.
@@ -131,7 +132,7 @@ class ElasticIndexer:
 			raise Exception(f'Failed to index records to {self.es_index} from {file_path} ({e})')
 
 
-def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5):
+def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_file: str = 'debug.json', max_file_size: int = 10*1024*1024, backups: int = 5, ecs_format: bool = False):
 	'''
 	Setup the global logger for the application.
 
@@ -140,13 +141,14 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
 	:param log_file: File to write logs to.
 	:param max_file_size: Maximum size of the log file before it is rotated.
 	:param backups: Number of backup log files to keep.
+	:param ecs_format: Use the Elastic Common Schema (ECS) format for logs.
 	'''
 
 	# Configure the root logger
 	logger = logging.getLogger()
 	logger.setLevel(logging.DEBUG) # Minimum level to capture all logs
 
 	# Clear existing handlers
 	logger.handlers = []
 
 	# Setup console handler
@@ -160,8 +162,18 @@ def setup_logger(console_level: int = logging.INFO, file_level: int = None, log_
 	if file_level is not None:
 		file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=max_file_size, backupCount=backups)
 		file_handler.setLevel(file_level)
-		ecs_formatter = StdlibFormatter() # ECS (Elastic Common Schema) formatter
-		file_handler.setFormatter(ecs_formatter)
+
+		# Setup formatter to use ECS format if enabled or default format
+		if ecs_format:
+			try:
+				from ecs_logging import StdlibFormatter
+			except ImportError:
+				raise ImportError('Missing required \'ecs-logging\' library. (pip install ecs-logging)')
+			file_formatter = StdlibFormatter() # ECS formatter
+		else:
+			file_formatter = logging.Formatter('%(asctime)s | %(levelname)9s | %(message)s', '%Y-%m-%d %H:%M:%S')
+
+		file_handler.setFormatter(file_formatter)
 		logger.addHandler(file_handler)
 
 
@@ -174,6 +186,7 @@ async def main():
 	parser.add_argument('input_path', help='Path to the input file or directory') # Required
 	parser.add_argument('--watch', action='store_true', help='Create or watch a FIFO for real-time indexing')
 	parser.add_argument('--log', choices=['debug', 'info', 'warning', 'error', 'critical'], help='Logging file level (default: disabled)')
+	parser.add_argument('--ecs', action='store_true', default=False, help='Use the Elastic Common Schema (ECS) for logging')
 
 	# Elasticsearch arguments
 	parser.add_argument('--host', default='http://localhost', help='Elasticsearch host')
@@ -202,14 +215,14 @@ async def main():
 	parser.add_argument('--massdns', action='store_true', help='Index Massdns records')
 	parser.add_argument('--zone', action='store_true', help='Index Zone records')
 
+	args = parser.parse_args()
+
 	if args.log:
 		levels = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL}
-		setup_logger(file_level=levels[args.log], log_file='eris.log')
+		setup_logger(file_level=levels[args.log], log_file='eris.log', ecs_format=args.ecs)
 	else:
 		setup_logger()
 
-	args = parser.parse_args()
-
 	if args.host.endswith('/'):
 		args.host = args.host[:-1]
 
@@ -270,6 +283,8 @@ async def main():
 			else:
 				logging.warning(f'[{count:,}/{total:,}] Skipping non-file: {file_path}')
 
+	await edx.close_connect() # Close the Elasticsearch connection to stop "Unclosed client session" warnings
+
 
 
 if __name__ == '__main__':
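
Side note on the logging change above: the ECS formatter is now opt-in and only touches the file handler; the console handler is unaffected. A sketch of driving the updated setup_logger directly, assuming eris.py is importable and ecs-logging is installed:

import logging
from eris import setup_logger # Assumes eris.py is on the import path

# Mirrors what main() now does for: --log debug --ecs
setup_logger(file_level=logging.DEBUG, log_file='eris.log', ecs_format=True)
logging.info('indexing started') # Written to eris.log as an ECS JSON line
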
diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
-# ingest_certs.py
+# ingest_certstream.py
 
 import asyncio
 import json
@@ -16,6 +16,10 @@ except ImportError:
 # Set a default elasticsearch index if one is not provided
 default_index = 'eris-certstream'
 
+# Bounded cache of recently seen domains, used to prevent indexing duplicates
+cache      = []
+cache_size = 5000
+
 
 def construct_map() -> dict:
 	'''Construct the Elasticsearch index mapping for Certstream records.'''
@@ -56,14 +60,14 @@ async def process_data(place_holder: str = None):
 					continue
 
 				# Grab the unique domains from the records
-				all_domains = record['data']['leaf_cert']['all_domains']
+				all_domains = set(record['data']['leaf_cert']['all_domains'])
 				domains     =  list()
 
 				# We only care about subdomains (excluding www. and wildcards)
 				for domain in all_domains:
 					if domain.startswith('*.'):
 						domain = domain[2:]
-					elif domain.startswith('www.') and domain.count('.') == 2:
+					if domain.startswith('www.') and domain.count('.') == 2:
 						continue
 					if domain.count('.') > 1:
 						# TODO: Add a check for PSL TLDs...domain.co.uk, domain.com.au, etc. (we want to ignore these if they are not subdomains)
@@ -72,6 +76,14 @@ async def process_data(place_holder: str = None):
 
 				# Construct the document
 				for domain in domains:
+					if domain in cache:
+						continue # Skip the domain if it is already in the cache
+
+					if len(cache) >= cache_size:
+						cache.pop(0) # Remove the oldest domain from the cache
+
+					cache.append(domain) # Add the domain to the cache
+
 					struct = {
 						'domain' : domain,
 						'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
@@ -160,6 +172,13 @@ Output:
 		"message_type": "certificate_update"
 	}
 
+
+Input:
+	{
+		"domain" : "d7zdnegbre53n.amplifyapp.com",
+		"seen"   : "2022-01-02T12:00:00Z"
+	}
+
 Notes:
 	- Fix the "no close frame received or sent" error
 '''
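
Two observations on the certstream changes: the elif-to-if swap means a wildcard like *.www.example.com is first stripped to www.example.com and now still gets filtered, and the new duplicate guard is a simple bounded FIFO list. A standalone sketch of the same caching pattern (collections.deque with maxlen would make the eviction O(1)):

cache      = []
cache_size = 5000

def is_duplicate(domain: str) -> bool:
	'''Return True if the domain was seen recently, otherwise remember it.'''
	if domain in cache:
		return True # Already indexed recently
	if len(cache) >= cache_size:
		cache.pop(0) # Evict the oldest entry (O(n) on a list)
	cache.append(domain)
	return False

assert not is_duplicate('sub.example.com') # First sighting
assert is_duplicate('sub.example.com')     # Cached, would be skipped
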
diff --git a/ingestors/ingest_ixps.py b/ingestors/ingest_ixps.py
@@ -26,10 +26,15 @@ def construct_map() -> dict:
 	mapping = {
 		'mappings': {
 			'properties': {
-				'name'           : {'type': 'keyword'},
-				'alternatenames' : {'type': 'keyword'},
-				'sources'        : {'type': 'keyword'},
-				'prefixes'       : { 'properties': { 'ipv4' : {'type': 'ip'}, 'ipv6' : {'type': 'ip_range'} } },
+				'name'           : { 'type': 'keyword' },
+				'alternatenames' : { 'type': 'keyword' },
+				'sources'        : { 'type': 'keyword' },
+				'prefixes'       : {
+					'properties': {
+						'ipv4' : { 'type': 'ip'       },
+						'ipv6' : { 'type': 'ip_range' }
+					}
+				},
 				'url'            : { 'type': 'keyword' },
 				'region'         : { 'type': 'keyword' },
 				'country'        : { 'type': 'keyword' },
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
@@ -94,7 +94,7 @@ async def process_data(input_path: str):
 			# Process each port in the record
 			for port_info in record['ports']:
 				struct = {
-					'ip'   : record['ip'],
+					'ip'    : record['ip'],
 					'port'  : port_info['port'],
 					'proto' : port_info['proto'],
 					'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -104,7 +104,7 @@ async def process_data(input_path: str):
 					'_index'   : default_index,
 					'doc'      : {
 						'ip'     : ip,
-						'record' : [record], # Consider using painless script to add to list if it exists (Use 'seen' per-record and 'last_seen' for the IP address)
+						'record' : [record],
 						'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
 					},
 					'doc_as_upsert' : True # Create the document if it does not exist
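
For reference, the massdns action above relies on Elasticsearch's doc_as_upsert: the document is created on the first sighting of an IP and updated afterwards. A sketch of one bulk action as the async helpers would consume it ('_op_type': 'update', the '_id', and the field values are assumed for illustration, since the surrounding lines fall outside this hunk):

action = {
	'_op_type'      : 'update',       # Required for doc_as_upsert with the bulk helpers
	'_index'        : 'eris-massdns', # Illustrative stand-in for the file's default_index
	'_id'           : '1.2.3.4',      # Illustrative: keying on the IP address
	'doc'           : {
		'ip'     : '1.2.3.4',
		'record' : ['example.com'],
		'seen'   : '2024-03-23T22:47:30Z'
	},
	'doc_as_upsert' : True            # Create the document if it does not exist
}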