eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

commit b1fa34f3aae83810b61e219f94e1753c19611d36
parent 00711fe8564ab453d6af953793e3cd6e51273965
Author: acidvegas <acid.vegas@acid.vegas>
Date: Tue, 12 Mar 2024 18:19:47 -0400

Added anomaly detection to RIR delegations ingestor

Diffstat:
Mingestors/ingest_certstream.py | 57+++++++++++++++++++++++++--------------------------------
Mingestors/ingest_rir_delegations.py | 42+++++++++++++++++++++++++++++++++---------

2 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py
@@ -43,41 +43,34 @@ async def process_data(place_holder: str = None):
 	:param place_holder: Placeholder parameter to match the process_data function signature of other ingestors.
 	'''
 
-	while True:
+	async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60):
 		try:
-			async with websockets.connect('wss://certstream.calidog.io') as websocket:
-				while True:
-					# Read a line from the websocket
-					line = await websocket.recv()
-
-					# Parse the JSON record
-					try:
-						record = json.loads(line)
-					except json.decoder.JSONDecodeError:
-						logging.error(f'Invalid line from the websocket: {line}')
-						continue
-
-					# Grab the unique domains from the record (excluding wildcards)
-					domains = record['data']['leaf_cert']['all_domains']
-					domains = set([domain[2:] if domain.startswith('*.') else domain for domain in domains])
-
-					# Construct the document
-					for domain in domains:
-						struct = {
-							'domain' : domain,
-							'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
-						}
-
-						yield {'_index': default_index, '_source': struct}
-
-		except websockets.ConnectionClosed:
-			logging.error('Connection to Certstream was closed. Attempting to reconnect...')
+			async for line in websocket:
+			
+				# Parse the JSON record
+				try:
+					record = json.loads(line)
+				except json.decoder.JSONDecodeError:
+					logging.error(f'Invalid line from the websocket: {line}')
+					continue
+
+				# Grab the unique domains from the record (excluding wildcards)
+				domains = record['data']['leaf_cert']['all_domains']
+				domains = set([domain[2:] if domain.startswith('*.') else domain for domain in domains])
+
+				# Construct the document
+				for domain in domains:
+					struct = {
+						'domain' : domain,
+						'seen'   : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
+					}
+
+					yield {'_index': default_index, '_source': struct}
+
+		except websockets.ConnectionClosed as e:
+			logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})')
 			await asyncio.sleep(3)
 
-		except Exception as e:
-			logging.error(f'An error occurred while processing Certstream records! ({e})')
-			break
-
 
 async def test():
 	'''Test the ingestion process.'''
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
@@ -3,6 +3,7 @@
 # ingest_rir_delegations.py
 
 import csv
+import ipaddress
 import logging
 import time
 
@@ -21,7 +22,7 @@ delegation_db = {
 	'apnic'   : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest',
 	'arin'    : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
 	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
-	'ripe'    : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
+	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
 }
 
 
@@ -36,7 +37,7 @@ def construct_map() -> dict:
 		'mappings': {
 			'properties': {
 				'registry'   : { 'type': 'keyword' },
-				'cc'         : { 'type': 'keyword' },
+				'cc'         : { 'type': 'keyword' }, # ISO 3166 2-letter code
 				'type'       : { 'type': 'keyword' },
 				'start'      : {
 					'properties': {
@@ -106,21 +107,44 @@ async def process_data():
 
 							record = {
 								'registry' : row[0],
+								'cc'       : row[1],
 								'type'     : row[2],
-								'start'    : { record_type: row[3] },
+								'start'    : row[3],
 								'value'    : row[4],
+								'date'     : row[5],
 								'status'   : row[6]
 							}
+       
+							if len(row) > 7:
 								if row[7]:
 									record['extensions'] = row[7]
 
-							if (cc := row[1]):
-								record['cc'] = cc.lower()
-
-							if (date_field := row[5]):
-								if date_field != '00000000':
-									record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(date_field, '%Y%m%d')),
+							if not record['cc']:
+								del record['cc']
+							elif len(record['cc']) != 2:
+								raise ValueError(f'Invalid country code: {record["cc"]} ({cache})')
+
+							if record['type'] == 'asn':
+								record['start'] = { record_type : int(record['start']) }
+							elif record['type'] in ('ipv4', 'ipv6'):
+								try:
+									ipaddress.ip_address(record['start'])
+									record['start'] = { record_type : record['start'] }
+								except ValueError:
+									raise ValueError(f'Invalid start IP: {record["start"]} ({cache})')
+							else:
+								raise ValueError(f'Invalid record type: {record["type"]}')
+    
+							if not record['value'].isdigit():
+								raise ValueError(f'Invalid value: {record["value"]} ({cache})')
+    
+							if not record['date'] or record['date'] == '00000000':
+								del record['date']
+							else:
+								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d'))
+         
+							if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
+								raise ValueError(f'Invalid status: {record["status"]} ({cache})')
 
 							#json_output['records'].append(record)