eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

commit 4dc31a50900b91c74465d6bb3b172c2ab91426ed
parent bd735ea8a7e2420ae82e2191a7f36330041e5b2f
Author: acidvegas <acid.vegas@acid.vegas>
Date: Wed, 13 Mar 2024 20:51:53 -0400

RIR Transfers ingestor added

Diffstat:
Aingestors/ingest_rir_transfers.py | 193+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

1 file changed, 193 insertions(+), 0 deletions(-)

diff --git a/ingestors/ingest_rir_transfers.py b/ingestors/ingest_rir_transfers.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
+# ingest_rir_transfers.py
+
+import json
+import ipaddress
+import logging
+import time
+
+try:
+	import aiohttp
+except ImportError:
+	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
+
+
+# Set a default elasticsearch index if one is not provided
+default_index = 'rir-transfer-' + time.strftime('%Y-%m-%d')
+
+# Transfers data sources
+transfers_db = {
+	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/transfers/transfers_latest.json',
+	'apnic'   : 'https://ftp.apnic.net/stats/apnic/transfers/transfers_latest.json',
+	'arin'    : 'https://ftp.arin.net/pub/stats/arin/transfers/transfers_latest.json',
+	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
+	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
+}
+    
+
+def construct_map() -> dict:
+	'''Construct the Elasticsearch index mapping for records'''
+
+	# Match on exact value or full text search
+	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
+
+	# Construct the index mapping
+	mapping = {
+		'mappings': {
+			'properties': {
+				'date'    : { 'type': 'date' },
+				'ip4nets' : {
+					'properties': {
+						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
+						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+					}
+				},
+				'ip6nets' : {
+					'properties': {
+						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
+						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
+					}
+				},
+				'asns' : {
+					'properties': {
+						'original_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } },
+						'transfer_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } }
+					}
+				},
+				'type'                   : { 'type': 'keyword' },
+				'source_organization'    : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
+				'recipient_organization' : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
+				'source_rir'             : { 'type': 'keyword' },
+				'recipient_rir'          : { 'type': 'keyword' },
+			}
+		}
+	}
+
+	return mapping
+
+
+async def process_data():
+	'''Read and process the transfers data.'''
+
+	for registry, url in transfers_db.items():
+		try:
+			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC
+
+			async with aiohttp.ClientSession(headers=headers) as session:
+				async with session.get(url) as response:
+					if response.status != 200:
+						logging.error(f'Failed to fetch {registry} delegation data: {response.status}')
+						raise Exception('errror') #continue
+  
+					data = await response.text()
+
+					try:
+						json_data = json.loads(data)
+					except aiohttp.ContentTypeError as e:
+						logging.error(f'Failed to parse {registry} delegation data: {e}')
+						raise Exception('errror') #continue
+  
+					if 'transfers' not in json_data:
+						logging.error(f'Invalid {registry} delegation data: {json_data}')
+      
+					for record in json_data['transfers']:
+         
+						if 'asns' in record:
+							for set_type in ('original_set', 'transfer_set'):
+								if set_type in record['asns']:
+									count = 0
+									for set_block in record['asns'][set_type]:
+										for option in ('start', 'end'):
+											asn = set_block[option]
+											if type(asn) != int:
+												if not asn.isdigit():
+													logging.error(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')
+													raise Exception('errror') #continue
+												else:
+													record['asns'][set_type][count][option] = int(asn)
+										count += 1
+													
+
+						if 'ip4nets' in record or 'ip6nets' in record:
+							for ip_type in ('ip4nets', 'ip6nets'):
+								if ip_type in record:
+									for set_type in ('original_set', 'transfer_set'):
+										if set_type in record[ip_type]:
+											count = 0
+											for set_block in record[ip_type][set_type]:
+												for option in ('start_address', 'end_address'):
+													try:
+														ipaddress.ip_address(set_block[option])
+													except ValueError as e:
+														octets = set_block[option].split('.')
+														normalized_ip = '.'.join(str(int(octet)) for octet in octets)
+														try:
+															ipaddress.ip_address(normalized_ip)
+															record[ip_type][set_type][count][option] = normalized_ip
+														except ValueError as e:
+															logging.error(f'Invalid {set_type} {option} IP in {registry} data: {e}')
+															raise Exception('errror') #continue
+												count += 1
+        
+						if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
+							logging.error(f'Invalid transfer type in {registry} data: {record["type"]}')
+							raise Exception('errror') #continue
+        
+						yield {'_index': default_index, '_source': record}
+
+		except Exception as e:
+			logging.error(f'Error processing {registry} delegation data: {e}')
+
+
+async def test():
+	'''Test the ingestion process'''
+
+	async for document in process_data():
+		print(document)
+
+
+
+if __name__ == '__main__':
+	import asyncio
+
+	asyncio.run(test())
+
+
+
+'''
+Output:
+	{
+    	"transfer_date" : "2017-09-15T19:00:00Z",
+     	"ip4nets"       : {
+        	"original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
+         	"transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
+        },
+        "type"                   : "MERGER_ACQUISITION",
+        "source_organization"    : { "name": "Unser Ortsnetz GmbH" },
+        "recipient_organization" : {
+            "name"         : "Deutsche Glasfaser Wholesale GmbH",
+            "country_code" : "DE"
+        },
+        "source_rir"    : "RIPE NCC",
+        "recipient_rir" : "RIPE NCC"
+    },
+	{
+    	"transfer_date" : "2017-09-18T19:00:00Z",
+     	"asns"          : {
+        	"original_set" : [ { "start": 198257, "end": 198257 } ],
+        	"transfer_set" : [ { "start": 198257, "end": 198257 } ]
+        },
+        "type"                   : "MERGER_ACQUISITION",
+        "source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
+        "recipient_organization" : { 
+        	"name"         : "Equinix (Poland) Sp. z o.o.",
+			"country_code" : "PL"
+         },
+        "source_rir"    : "RIPE NCC",
+        "recipient_rir" : "RIPE NCC"
+    }
+    
+Input:
+	- Nothing changed from the output for now...
+'''