eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git

commit 603d005a475e4e26f0d51b7d68d5ed8858450920
parent 24850ea976281cbacea8c75dd700468873232a06
Author: acidvegas <acid.vegas@acid.vegas>
Date: Tue, 19 Mar 2024 19:00:12 -0400

Added IXP ingestor, updated all other ingestors

Diffstat:
M ingestors/ingest_certstream.py      |  15 +++++++++++----
M ingestors/ingest_httpx.py           | 286 +++++++++++++++++++++++++++++++++++++++++++------------------------------------
A ingestors/ingest_ixps.py            | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M ingestors/ingest_masscan.py         | 295 ++++++++++++++++++++++++++++++++++++++++---------------------------------
M ingestors/ingest_massdns.py         |  12 ++++++++----
M ingestors/ingest_rir_delegations.py |   7 +++----
M ingestors/ingest_rir_transfers.py   |  76 ++++++++++++++++++++++++++++++++++++++--------------------------------------

7 files changed, 476 insertions(+), 326 deletions(-)

diff --git a/ingestors/ingest_certstream.py b/ingestors/ingest_certstream.py
@@ -45,8 +45,9 @@ async def process_data(place_holder: str = None):
 
 	async for websocket in websockets.connect('wss://certstream.calidog.io', ping_interval=15, ping_timeout=60):
 		try:
+			# Read the websocket stream
 			async for line in websocket:
-			
+
 				# Parse the JSON record
 				try:
 					record = json.loads(line)
@@ -65,9 +66,10 @@ async def process_data(place_holder: str = None):
 					elif domain.startswith('www.') and domain.count('.') == 2:
 						continue
 					if domain.count('.') > 1:
+						# TODO: Add a check for PSL TLDs...domain.co.uk, domain.com.au, etc. (we want to ignore these if they are not subdomains)
 						if domain not in domains:
 							domains.append(domain)
-			
+
 				# Construct the document
 				for domain in domains:
 					struct = {
@@ -81,6 +83,10 @@ async def process_data(place_holder: str = None):
 			logging.error(f'Connection to Certstream was closed. Attempting to reconnect... ({e})')
 			await asyncio.sleep(3)
 
+		except Exception as e:
+			logging.error(f'Error processing Certstream data: {e}')
+			await asyncio.sleep(3)
+
 
 async def test():
 	'''Test the ingestion process.'''
@@ -91,8 +97,6 @@ async def test():
 
 
 if __name__ == '__main__':
-	import asyncio
-
 	asyncio.run(test())
 
 
@@ -155,4 +159,7 @@ Output:
 		},
 		"message_type": "certificate_update"
 	}
+
+Notes:
+	- Fix the "no close frame received or sent" error
 '''
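
The PSL TODO in the hunk above is straightforward to satisfy with an off-the-shelf Public Suffix List library. A minimal sketch, assuming the third-party tldextract package (pip install tldextract); the helper name is ours, not part of ERIS:

	import tldextract

	def is_bare_psl_domain(domain: str) -> bool:
		'''Return True for registrable names under a multi-label public suffix (domain.co.uk, domain.com.au, etc.)'''
		parts = tldextract.extract(domain)
		# A multi-label suffix ('co.uk') with no subdomain means the extra dot is not a real subdomain
		return '.' in parts.suffix and not parts.subdomain

Domains matching this check could then be skipped inside the domain.count('.') > 1 branch.
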
diff --git a/ingestors/ingest_httpx.py b/ingestors/ingest_httpx.py
@@ -3,151 +3,176 @@
 # ingest_httpx.py
 
 import json
+import logging
 
 try:
-    import aiofiles
+	import aiofiles
 except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+
+
+# Set a default elasticsearch index if one is not provided
+default_index = 'eris-httpx'
 
-default_index = 'httpx-logs'
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Masscan records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                "timestamp" : { 'type' : 'date' },
-                "hash"      : {
-                    "body_md5"       : { 'type': 'keyword' },
-                    "body_mmh3"      : { 'type': 'keyword' },
-                    "body_sha256"    : { 'type': 'keyword' },
-                    "body_simhash"   : { 'type': 'keyword' },
-                    "header_md5"     : { 'type': 'keyword' },
-                    "header_mmh3"    : { 'type': 'keyword' },
-                    "header_sha256"  : { 'type': 'keyword' },
-                    "header_simhash" : { 'type': 'keyword' }
-                },
-                "port"           : { 'type': 'integer' },
-                "url"            : keyword_mapping,
-                "input"          : keyword_mapping,
-                "title"          : keyword_mapping,
-                "scheme"         : { 'type': 'keyword' },
-                "webserver"      : { 'type': 'keyword' },
-                "body_preview"   : keyword_mapping,
-                "content_type"   : { 'type': 'keyword' },
-                "method"         : { 'type': 'keyword'},
-                "host"           : { 'type': 'ip'},
-                "path"           : keyword_mapping,
-                "favicon"        : { 'type': 'keyword' },
-                "favicon_path"   : keyword_mapping,
-                "a"              : { 'type': 'ip'},
-                "aaaa"           : { 'type': 'ip'},
-                "tech"           : keyword_mapping,
-                "words"          : { 'type': 'integer'},
-                "lines"          : { 'type': 'integer'},
-                "status_code"    : { 'type': 'integer'},
-                "content_length" : { 'type': 'integer'}
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process HTTPX records from the log file.
-
-    :param file_path: Path to the HTTPX log file
-    '''
-
-    async with aiofiles.open(file_path) as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if not line:
-                continue
-
-            record = json.loads(line)
-
-            record['seen']   = record.pop('timestamp').split('.')[0] + 'Z' # Hacky solution to maintain ISO 8601 format without milliseconds or offsets
-            record['domain'] = record.pop('input')
-
-            for item in ('failed', 'knowledgebase', 'time'):
-                del record[item]
-
-            yield {'_id': record['domain'], '_index': default_index, '_source': record}
+	'''Construct the Elasticsearch index mapping for HTTPX records.'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'timestamp' : { 'type' : 'date' },
+				'hash'      : {
+					'properties': {
+						'body_md5'       : { 'type': 'keyword' },
+						'body_mmh3'      : { 'type': 'keyword' },
+						'body_sha256'    : { 'type': 'keyword' },
+						'body_simhash'   : { 'type': 'keyword' },
+						'header_md5'     : { 'type': 'keyword' },
+						'header_mmh3'    : { 'type': 'keyword' },
+						'header_sha256'  : { 'type': 'keyword' },
+						'header_simhash' : { 'type': 'keyword' }
+					}
+				},
+				'port'               : { 'type': 'integer' },
+				'url'                : keyword_mapping,
+				'final_url'          : keyword_mapping,
+				'input'              : keyword_mapping,
+				'title'              : keyword_mapping,
+				'scheme'             : { 'type': 'keyword' },
+				'webserver'          : { 'type': 'keyword' },
+				'body_preview'       : keyword_mapping,
+				'content_type'       : { 'type': 'keyword' },
+				'method'             : { 'type': 'keyword' },
+				'host'               : { 'type': 'ip' },
+				'path'               : keyword_mapping,
+				'favicon'            : { 'type': 'keyword' },
+				'favicon_path'       : keyword_mapping,
+				'a'                  : { 'type': 'ip' },
+				'cname'              : keyword_mapping,
+				'aaaa'               : { 'type': 'ip' },
+				'tech'               : keyword_mapping,
+				'words'              : { 'type': 'integer' },
+				'lines'              : { 'type': 'integer' },
+				'status_code'        : { 'type': 'integer' },
+				'chain_status_codes' : { 'type': 'integer' },
+				'content_length'     : { 'type': 'integer' }
+			}
+		}
+	}
+
+	return mapping
+
+
+async def process_data(input_path: str):
+	'''
+	Read and process the input file
+
+	:param input_path: Path to the input file
+	'''
+
+	async with aiofiles.open(input_path) as input_file:
+		# Read the input file line by line
+		async for line in input_file:
+			line = line.strip()
+
+			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+			if line == '~eof':
+				break
+
+			# Skip empty lines
+			if not line:
+				continue
+
+			# Parse the JSON record
+			try:
+				record = json.loads(line)
+			except json.JSONDecodeError:
+				logging.error(f'Failed to parse JSON record: {line}')
+				continue
+
+			# Hacky solution to maintain ISO 8601 format without milliseconds or offsets
+			record['timestamp'] = record['timestamp'].split('.')[0] + 'Z'
+
+			# Remove unnecessary fields we don't care about
+			for item in ('failed', 'knowledgebase', 'time', 'csp'):
+				if item in record:
+					del record[item]
+
+			yield {'_index': default_index, '_source': record}
 
 
 async def test(input_path: str):
-    '''
-    Test the HTTPX ingestion process
-    
-    :param input_path: Path to the HTTPX log file
-    '''
-    async for document in process_data(input_path):
-        print(document)
+	'''
+	Test the ingestion process
+
+	:param input_path: Path to the input file
+	'''
+
+	async for document in process_data(input_path):
+		print(document)
 
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+	import argparse
+	import asyncio
 
-    parser = argparse.ArgumentParser(description='HTTPX Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
-    
-    asyncio.run(test(args.input_path))
+	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
+	parser.add_argument('input_path', help='Path to the input file or directory')
+	args = parser.parse_args()
 
+	asyncio.run(test(args.input_path))
 
-  
-''''
+
+
+'''
 Deploy:
-    go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest 
-    curl -s https://public-dns.info/nameservers.txt -o nameservers.txt
-    httpx -l zone.txt -t 200 -sc -location -favicon -title -bp -td -ip -cname -mc 200,201,301,302,303,307,308 -fr -r nameservers.txt -retries 2 -stream -sd -j -o httpx.json -v
-    
+	go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest 
+	curl -s https://public-dns.info/nameservers.txt -o nameservers.txt
+	httpx -l fulldomains.txt -t 200 -sc -location -favicon -title -bp -td -ip -cname -mc 200,201,301,302,303,307,308 -fr -r nameservers.txt -retries 2 -stream -sd -j -o fifo.json -v
+
 Output:
-    {
-        "timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
-        "hash": { # Do we need all of these ?
-            "body_md5"       : "4ae9394eb98233b482508cbda3b33a66",
-            "body_mmh3"      : "-4111954",
-            "body_sha256"    : "89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
-            "body_simhash"   : "9814303593401624250",
-            "header_md5"     : "980366deb2b2fb5df2ad861fc63e79ce",
-            "header_mmh3"    : "-813072798",
-            "header_sha256"  : "39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
-            "header_simhash" : "10962523587435277678"
-        },
-        "port"           : "443",
-        "url"            : "https://supernets.org", # Remove this and only use the input field as "domain" maybe
-        "input"          : "supernets.org", # rename to domain
-        "title"          : "SuperNETs",
-        "scheme"         : "https",
-        "webserver"      : "nginx",
-        "body_preview"   : "SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
-        "content_type"   : "text/html",
-        "method"         : "GET", # Remove this
-        "host"           : "51.89.151.158",
-        "path"           : "/",
-        "favicon"        : "-674048714",
-        "favicon_path"   : "/i/favicon.png",
-        "time"           : "592.907689ms", # Do we need this ?
-        "a"              : ["6.150.220.23"],
-        "tech"           : ["Bootstrap:4.0.0", "HSTS", "Nginx"],
-        "words"          : 436, # Do we need this ?
-        "lines"          : 79, # Do we need this ?
-        "status_code"    : 200, 
-        "content_length" : 4597,
-        "failed"         : false, # Do we need this ?
-        "knowledgebase"  : { # Do we need this ?
-            "PageType" : "nonerror",
-            "pHash"    : 0
-        }
-    }
-'''
-\ No newline at end of file
+	{
+		"timestamp":"2024-01-14T13:08:15.117348474-05:00", # Rename to seen and remove milliseconds and offset
+		"hash": { # Do we need all of these ?
+			"body_md5"       : "4ae9394eb98233b482508cbda3b33a66",
+			"body_mmh3"      : "-4111954",
+			"body_sha256"    : "89e06e8374353469c65adb227b158b265641b424fba7ddb2c67eef0c4c1280d3",
+			"body_simhash"   : "9814303593401624250",
+			"header_md5"     : "980366deb2b2fb5df2ad861fc63e79ce",
+			"header_mmh3"    : "-813072798",
+			"header_sha256"  : "39aea75ad548e38b635421861641ad1919ed3b103b17a33c41e7ad46516f736d",
+			"header_simhash" : "10962523587435277678"
+		},
+		"port"           : "443",
+		"url"            : "https://supernets.org", # Remove this and only use the input field as "domain" maybe
+		"input"          : "supernets.org", # rename to domain
+		"title"          : "SuperNETs",
+		"scheme"         : "https",
+		"webserver"      : "nginx",
+		"body_preview"   : "SUPERNETS Home About Contact Donate Docs Network IRC Git Invidious Jitsi LibreX Mastodon Matrix Sup",
+		"content_type"   : "text/html",
+		"method"         : "GET", # Remove this
+		"host"           : "51.89.151.158",
+		"path"           : "/",
+		"favicon"        : "-674048714",
+		"favicon_path"   : "/i/favicon.png",
+		"time"           : "592.907689ms", # Do we need this ?
+		"a"              : ["6.150.220.23"],
+		"tech"           : ["Bootstrap:4.0.0", "HSTS", "Nginx"],
+		"words"          : 436, # Do we need this ?
+		"lines"          : 79, # Do we need this ?
+		"status_code"    : 200,
+		"content_length" : 4597,
+		"failed"         : false, # Do we need this ?
+		"knowledgebase"  : { # Do we need this ?
+			"PageType" : "nonerror",
+			"pHash"    : 0
+		}
+	}
+'''
diff --git a/ingestors/ingest_ixps.py b/ingestors/ingest_ixps.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# ingest_ixps.py
+
+import json
+import ipaddress
+import time
+
+try:
+	import aiohttp
+except ImportError:
+	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
+
+
+# Set a default elasticsearch index if one is not provided
+default_index = 'ixp-' + time.strftime('%Y-%m-%d')
+
+
+def construct_map() -> dict:
+	'''Construct the Elasticsearch index mapping for records'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'name'           : {'type': 'keyword'},
+				'alternatenames' : {'type': 'keyword'},
+				'sources'        : {'type': 'keyword'},
+				'prefixes'       : { 'properties': { 'ipv4' : {'type': 'ip_range'}, 'ipv6' : {'type': 'ip_range'} } }, # CIDR prefixes require ip_range (the plain ip type only accepts single addresses)
+				'url'            : { 'type': 'keyword' },
+				'region'         : { 'type': 'keyword' },
+				'country'        : { 'type': 'keyword' },
+				'city'           : { 'type': 'keyword' },
+				'state'          : { 'type': 'keyword' },
+				'zip'            : { 'type': 'keyword' },
+				'address'        : keyword_mapping,
+				'iata'           : { 'type': 'keyword' },
+				'latitude'       : { 'type': 'float'   },
+				'longitude'      : { 'type': 'float'   },
+				'geo_id'         : { 'type': 'integer' },
+				'ix_id'          : { 'type': 'integer' },
+				'org_id'         : { 'type': 'integer' },
+				'pdb_id'         : { 'type': 'integer' },
+				'pdb_org_id'     : { 'type': 'integer' },
+				'pch_id'         : { 'type': 'integer' }
+			}
+		}
+	}
+
+	return mapping
+
+
+async def process_data():
+	'''Read and process the IXP data.'''
+
+	try:
+		async with aiohttp.ClientSession() as session:
+			async with session.get('https://publicdata.caida.org/datasets/ixps/ixs_202401.jsonl') as response:
+				if response.status != 200:
+					raise Exception(f'Failed to fetch IXP data: {response.status}')
+
+				data = await response.text()
+
+				# The dataset is JSON Lines: parse one record per line, skipping comment lines
+				for line in data.splitlines():
+					line = line.strip()
+
+					if not line or line.startswith('#'):
+						continue
+
+					try:
+						record = json.loads(line)
+					except json.JSONDecodeError as e:
+						raise Exception(f'Failed to parse IXP record: {e}')
+
+					yield {'_index': default_index, '_source': record}
+
+	except Exception as e:
+		raise Exception(f'Error processing IXP data: {e}')
+
+
+async def test():
+	'''Test the ingestion process'''
+
+	async for document in process_data():
+		print(document)
+
+
+
+if __name__ == '__main__':
+	import asyncio
+
+	asyncio.run(test())
+
+
+
+'''
+Output:
+{
+	"pch_id"         : 1848,
+	"name"           : "ANGONIX",
+	"country"        : "AO",
+	"region"         : "Africa",
+	"city"           : "Luanda",
+	"iata"           : "LAD",
+	"alternatenames" : [],
+	"sources"        : ["pch"],
+	"prefixes"       : {
+		"ipv4" : ["196.11.234.0/24"],
+		"ipv6" : ["2001:43f8:9d0::/48"]
+	},
+	"geo_id" : 2240449,
+	"ix_id"  : 10
+}
+'''
diff --git a/ingestors/ingest_masscan.py b/ingestors/ingest_masscan.py
@@ -7,171 +7,177 @@ import logging
 import time
 
 try:
-    import aiofiles
+	import aiofiles
 except ImportError:
-    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
+	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
 
 
+# Set a default elasticsearch index if one is not provided
 default_index = 'masscan-logs'
 
 
 def construct_map() -> dict:
-    '''Construct the Elasticsearch index mapping for Masscan records.'''
-
-    keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
-
-    geoip_mapping = {
-        'city_name'        : keyword_mapping,
-        'continent_name'   : keyword_mapping,
-        'country_iso_code' : keyword_mapping,
-        'country_name'     : keyword_mapping,
-        'location'         : { 'type': 'geo_point' },
-        'region_iso_code'  : keyword_mapping,
-        'region_name'      : keyword_mapping,
-    }
-
-    mapping = {
-        'mappings': {
-            'properties': {
-                'ip'      : { 'type': 'ip' },
-                'port'    : { 'type': 'integer' },
-                'data'    : {
-                    'properties': {
-                        'proto'   : { 'type': 'keyword' },
-                        'service' : { 'type': 'keyword' },
-                        'banner'  : keyword_mapping,
-                        'seen'    : { 'type': 'date' }
-                    }
-                },
-                #'geoip'    : { 'properties': geoip_mapping } # Used with the geoip pipeline to enrich the data
-                'last_seen' : { 'type': 'date' }                
-            }
-        }
-    }
-
-    return mapping
-
-
-async def process_data(file_path: str):
-    '''
-    Read and process Masscan records from the log file.
-
-    :param file_path: Path to the Masscan log file
-    '''
-
-    async with aiofiles.open(file_path) as input_file:
-        async for line in input_file:
-            line = line.strip()
-
-            if line == '~eof': # Sentinel value to indicate the end of a process (Used with --watch with FIFO)
-                break
-
-            if not line or not line.startswith('{'):
-                continue
-
-            if line.endswith(','): # Do we need this? Masscan JSON output seems with seperate records with a comma between lines for some reason...
-                line = line[:-1]
-
-            try:
-                record = json.loads(line)
-            except json.decoder.JSONDecodeError:
-                # In rare cases, the JSON record may be incomplete or malformed:
-                #   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
-                #   { "ip": "83.66.211.246", "timestamp": "1706557002"
-                logging.error(f'Failed to parse JSON record! ({line})')
-                input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
-                continue
-
-            if len(record['ports']) > 1:
-                # In rare cases, a single record may contain multiple ports, though I have yet to witness this...
-                logging.warning(f'Multiple ports found for record! ({record})')
-                input('Press Enter to continue...') # Pause for review (remove this in production)
-
-            for port_info in record['ports']:
-                struct = {
-                    'ip'   : record['ip'],
-                    'data' : {
-                        'port'  : port_info['port'],
-                        'proto' : port_info['proto'],
-                        'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                    },
-                    'last_seen' : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp']))),
-                }
-
-                if 'service' in port_info:
-                    if 'name' in port_info['service']:
-                        if (service_name := port_info['service']['name']) not in ('unknown',''):
-                            struct['service'] = service_name
-
-                    if 'banner' in port_info['service']:
-                        banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
-                        if banner:
-                            struct['banner'] = banner
-
-                id = f'{record["ip"]}:{port_info["port"]}' # Store with ip:port as the unique id to allow the record to be reindexed if it exists.
-
-                yield {'_id': id, '_index': default_index, '_source': struct}
+	'''Construct the Elasticsearch index mapping for Masscan records.'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text',  'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the geoip mapping (Used with the geoip pipeline to enrich the data)
+	geoip_mapping = {
+		'city_name'        : keyword_mapping,
+		'continent_name'   : keyword_mapping,
+		'country_iso_code' : keyword_mapping,
+		'country_name'     : keyword_mapping,
+		'location'         : { 'type': 'geo_point' },
+		'region_iso_code'  : keyword_mapping,
+		'region_name'      : keyword_mapping,
+	}
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'ip'      : { 'type': 'ip' },
+				'port'    : { 'type': 'integer' },
+				'proto'   : { 'type': 'keyword' },
+				'service' : { 'type': 'keyword' },
+				'banner'  : keyword_mapping,
+				'seen'    : { 'type': 'date' }
+				#'geoip'	: { 'properties': geoip_mapping }
+			}
+		}
+	}
+
+	return mapping
+
+
+async def process_data(input_path: str):
+	'''
+	Read and process the input file
+
+	:param input_path: Path to the input file
+	'''
+
+	async with aiofiles.open(input_path) as input_file:
+		# Read the input file line by line
+		async for line in input_file:
+			line = line.strip()
+
+			# Sentinel value to indicate the end of a process (for closing out a FIFO stream)
+			if line == '~eof':
+				break
+
+			# Skip empty lines and lines that do not start with a JSON object
+			if not line or not line.startswith('{'):
+				continue
+
+			# Do we need this? Masscan JSON output seems to separate records with a comma between lines for some reason...
+			if line.endswith(','):
+				line = line[:-1]
+
+			# Parse the JSON record
+			try:
+				record = json.loads(line)
+			except json.decoder.JSONDecodeError:
+				# In rare cases, the JSON record may be incomplete or malformed:
+				#   { "ip": "51.161.12.223", "timestamp": "1707628302", "ports": [ {"port": 22, "proto": "tcp", "service": {"name": "ssh", "banner":
+				#   { "ip": "83.66.211.246", "timestamp": "1706557002"
+				logging.error(f'Failed to parse JSON record! ({line})')
+				input('Press Enter to continue...') # Pause for review & debugging (remove this in production)
+				continue
+
+			# In rare cases, a single record may contain multiple ports, though I have yet to witness this...
+			if len(record['ports']) > 1:
+				logging.warning(f'Multiple ports found for record! ({record})')
+				input('Press Enter to continue...') # Pause for review (remove this in production)
+
+			# Process each port in the record
+			for port_info in record['ports']:
+				struct = {
+					'ip'    : record['ip'],
+					'port'  : port_info['port'],
+					'proto' : port_info['proto'],
+					'seen'  : time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(int(record['timestamp'])))
+				}
+
+				# Add the service information if available (this field is optional)
+				if 'service' in port_info:
+
+					# Add the service name if available
+					if 'name' in port_info['service']:
+						if (service_name := port_info['service']['name']) not in ('unknown',''):
+							struct['service'] = service_name
+
+					# Add the service banner if available
+					if 'banner' in port_info['service']:
+						banner = ' '.join(port_info['service']['banner'].split()) # Remove extra whitespace
+						if banner:
+							struct['banner'] = banner
+
+				# Yield the record
+				yield {'_index': default_index, '_source': struct}
 
 
 async def test(input_path: str):
-    '''
-    Test the Masscan ingestion process
-    
-    :param input_path: Path to the MassDNS log file
-    '''
-    async for document in process_data(input_path):
-        print(document)
+	'''
+	Test the ingestion process
+
+	:param input_path: Path to the input file
+	'''
+
+	async for document in process_data(input_path):
+		print(document)
 
 
 
 if __name__ == '__main__':
-    import argparse
-    import asyncio
+	import argparse
+	import asyncio
 
-    parser = argparse.ArgumentParser(description='Masscan Ingestor for ERIS')
-    parser.add_argument('input_path', help='Path to the input file or directory')
-    args = parser.parse_args()
-    
-    asyncio.run(test(args.input_path))
+	parser = argparse.ArgumentParser(description='Ingestor for ERIS')
+	parser.add_argument('input_path', help='Path to the input file or directory')
+	args = parser.parse_args()
+
+	asyncio.run(test(args.input_path))
 
 
 
 '''
 Deploy:
-    apt-get install iptables masscan libpcap-dev screen
-    setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
-    /sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
-    printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
-    screen -S scan
-    masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ output.json
-    masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
+	apt-get install iptables masscan libpcap-dev screen
+	setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /bin/masscan
+	/sbin/iptables -A INPUT -p tcp --dport 61010 -j DROP # Not persistent
+	printf "0.0.0.0/8\n10.0.0.0/8\n100.64.0.0/10\n127.0.0.0/8\n169.254.0.0/16\n172.16.0.0/12\n192.0.0.0/24\n192.0.2.0/24\n192.31.196.0/24\n192.52.193.0/24\n192.88.99.0/24\n192.168.0.0/16\n192.175.48.0/24\n198.18.0.0/15\n198.51.100.0/24\n203.0.113.0/24\n224.0.0.0/3\n255.255.255.255/32"  > exclude.conf
+	screen -S scan
+	masscan 0.0.0.0/0 -p18000 --banners --http-user-agent "USER_AGENT" --source-port 61010 --open-only --rate 30000 --excludefile exclude.conf -oJ 18000.json
+	masscan 0.0.0.0/0 -p21,22,23 --banners --http-user-agent "USER_AGENT" --source-port 61000-65503 --open-only --rate 30000 --excludefile exclude.conf -oJ output_new.json --shard $i/$TOTAL
 
 Output:
-    {
-        "ip"        : "43.134.51.142",
-        "timestamp" : "1705255468",
-        "ports"     : [
-            {
-                "port"    : 22, # We will create a record for each port opened
-                "proto"   : "tcp",
-                "service" : {
-                    "name"   : "ssh",
-                    "banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
-                }
-            }
-        ]
-    }
+	{
+		"ip"        : "43.134.51.142",
+		"timestamp" : "1705255468",
+		"ports"     : [
+			{
+				"port"    : 22, # We will create a record for each port opened
+				"proto"   : "tcp",
+				"service" : {
+					"name"   : "ssh",
+					"banner" : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4"
+				}
+			}
+		]
+	}
 
 Input:
-    {
-        "_id"     : "43.134.51.142:22"
-        "_index"  : "masscan-logs",
-        "_source" : {
-            "ip"      : "43.134.51.142",
-            "port"    : 22,
-            "proto"   : "tcp",
-            "service" : "ssh",
-            "banner"  : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
-            "seen"    : "2021-10-08T02:04:28Z"
-    }
-'''
-\ No newline at end of file
+	{
+		"_id"     : "43.134.51.142:22",
+		"_index"  : "masscan-logs",
+		"_source" : {
+			"ip"      : "43.134.51.142",
+			"port"    : 22,
+			"proto"   : "tcp",
+			"service" : "ssh",
+			"banner"  : "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4",
+			"seen"    : "2021-10-08T02:04:28Z"
+		}
+	}
+'''
diff --git a/ingestors/ingest_massdns.py b/ingestors/ingest_massdns.py
@@ -140,18 +140,21 @@ if __name__ == '__main__':
 
 '''
 Deployment:
-	sudo apt-get install build-essential gcc make
+	sudo apt-get install build-essential gcc make python3 python3-pip
+	pip install aiofiles aiohttp elasticsearch
+	git clone --depth 1 https://github.com/acidvegas/eris.git $HOME/eris
+
 	git clone --depth 1 https://github.com/blechschmidt/massdns.git $HOME/massdns && cd $HOME/massdns && make
 	curl -s https://public-dns.info/nameservers.txt | grep -v ':' > $HOME/massdns/nameservers.txt
-	python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json
-	or...
-	while true; do python ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/massdns/fifo.json; done
+	while true; do python3 ./scripts/ptr.py | ./bin/massdns -r $HOME/massdns/nameservers.txt -t PTR --filter NOERROR -o S -w $HOME/eris/FIFO; done
+
 
 Output:
 	0.6.229.47.in-addr.arpa. PTR 047-229-006-000.res.spectrum.com.
 	0.6.228.75.in-addr.arpa. PTR 0.sub-75-228-6.myvzw.com.
 	0.6.207.73.in-addr.arpa. PTR c-73-207-6-0.hsd1.ga.comcast.net.
 
+
 Input:
 	{
 		'_id'     : '47.229.6.0'
@@ -163,6 +166,7 @@ Input:
 		}
 	}
 
+
 Notes:
 	Why do some IP addresses return an A/CNAME from a PTR request?
 	What is dns-servfail.net? (Frequent CNAME response from PTR requests)
diff --git a/ingestors/ingest_rir_delegations.py b/ingestors/ingest_rir_delegations.py
@@ -191,13 +191,12 @@ Input:
 		'registry'   : 'arin',
 		'cc'         : 'us',
 		'type'       : 'ipv4',
-		'start'      : { 'ip': '76.15.132.0' },
-		'value'      : 1024,
+		'ip'         : { 'start': '76.15.132.0', 'end': '76.16.146.0' },
 		'date'       : '2007-05-02T00:00:00Z',
 		'status'     : 'allocated',
 		'extensions' : '6c065d5b54b877781f05e7d30ebfff28'
 	}
- 
- Notes:
+
+Notes:
 	Do we make this handle the database locally or load it into ram?
 '''
diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py
@@ -23,7 +23,7 @@ transfers_db = {
 	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
 	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
 }
-    
+
 
 def construct_map() -> dict:
 	'''Construct the Elasticsearch index mapping for records'''
@@ -77,19 +77,19 @@ async def process_data():
 				async with session.get(url) as response:
 					if response.status != 200:
 						raise Exception(f'Failed to fetch {registry} delegation data: {response.status}')
-  
+
 					data = await response.text()
 
 					try:
 						json_data = json.loads(data)
-					except aiohttp.ContentTypeError as e:
+					except json.JSONDecodeError as e:
 						raise Exception(f'Failed to parse {registry} delegation data: {e}')
-  
+
 					if 'transfers' not in json_data:
 						raise Exception(f'Invalid {registry} delegation data: {json_data}')
-      
+
 					for record in json_data['transfers']:
-         
+
 						if 'asns' in record:
 							for set_type in ('original_set', 'transfer_set'):
 								if set_type in record['asns']:
@@ -103,7 +103,7 @@ async def process_data():
 												else:
 													record['asns'][set_type][count][option] = int(asn)
 										count += 1
-													
+
 
 						if 'ip4nets' in record or 'ip6nets' in record:
 							for ip_type in ('ip4nets', 'ip6nets'):
@@ -124,10 +124,10 @@ async def process_data():
 														except ValueError as e:
 															raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')
 												count += 1
-        
+
 						if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
 							raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')
-        
+
 						yield {'_index': default_index, '_source': record}
 
 		except Exception as e:
@@ -152,36 +152,36 @@ if __name__ == '__main__':
 '''
 Output:
 	{
-    	"transfer_date" : "2017-09-15T19:00:00Z",
-     	"ip4nets"       : {
-        	"original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
-         	"transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
-        },
-        "type"                   : "MERGER_ACQUISITION",
-        "source_organization"    : { "name": "Unser Ortsnetz GmbH" },
-        "recipient_organization" : {
-            "name"         : "Deutsche Glasfaser Wholesale GmbH",
-            "country_code" : "DE"
-        },
-        "source_rir"    : "RIPE NCC",
-        "recipient_rir" : "RIPE NCC"
-    },
+		"transfer_date" : "2017-09-15T19:00:00Z",
+		"ip4nets"       : {
+			"original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
+			"transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
+		},
+		"type"                   : "MERGER_ACQUISITION",
+		"source_organization"    : { "name": "Unser Ortsnetz GmbH" },
+		"recipient_organization" : {
+			"name"         : "Deutsche Glasfaser Wholesale GmbH",
+			"country_code" : "DE"
+		},
+		"source_rir"    : "RIPE NCC",
+		"recipient_rir" : "RIPE NCC"
+	},
 	{
-    	"transfer_date" : "2017-09-18T19:00:00Z",
-     	"asns"          : {
-        	"original_set" : [ { "start": 198257, "end": 198257 } ],
-        	"transfer_set" : [ { "start": 198257, "end": 198257 } ]
-        },
-        "type"                   : "MERGER_ACQUISITION",
-        "source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
-        "recipient_organization" : { 
-        	"name"         : "Equinix (Poland) Sp. z o.o.",
+		"transfer_date" : "2017-09-18T19:00:00Z",
+		"asns"          : {
+			"original_set" : [ { "start": 198257, "end": 198257 } ],
+			"transfer_set" : [ { "start": 198257, "end": 198257 } ]
+		},
+		"type"                   : "MERGER_ACQUISITION",
+		"source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
+		"recipient_organization" : {
+			"name"         : "Equinix (Poland) Sp. z o.o.",
 			"country_code" : "PL"
-         },
-        "source_rir"    : "RIPE NCC",
-        "recipient_rir" : "RIPE NCC"
-    }
-    
+		},
+		"source_rir"    : "RIPE NCC",
+		"recipient_rir" : "RIPE NCC"
+	}
+
 Input:
-	- Nothing changed from the output for now...
+	Nothing changed from the output for now...
 '''
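
The ValueError handling in process_data() amounts to validating each start_address/end_address pair. The same check in isolation, as a standard-library-only sketch (the helper name is ours, not from the ERIS code):

	import ipaddress

	def validate_range(start: str, end: str) -> None:
		'''Raise ValueError if either address is malformed or the range is inverted.'''
		start_ip = ipaddress.ip_address(start)
		end_ip   = ipaddress.ip_address(end)
		if start_ip > end_ip:
			raise ValueError(f'inverted range: {start} > {end}')

	validate_range('94.31.64.0', '94.31.127.255') # From the transfer_set example above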