eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_rir_transfers.py (6369B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_rir_transfers.py
      4 
      5 import json
      6 import ipaddress
      7 import time
      8 
      9 try:
     10 	import aiohttp
     11 except ImportError:
     12 	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
     13 
     14 
     15 # Set a default elasticsearch index if one is not provided
     16 default_index = 'rir-transfer-' + time.strftime('%Y-%m-%d')
     17 
     18 # Transfers data sources
     19 transfers_db = {
     20 	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/transfers/transfers_latest.json',
     21 	'apnic'   : 'https://ftp.apnic.net/stats/apnic/transfers/transfers_latest.json',
     22 	'arin'    : 'https://ftp.arin.net/pub/stats/arin/transfers/transfers_latest.json',
     23 	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
     24 	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
     25 }
     26 
     27 
     28 def construct_map() -> dict:
     29 	'''Construct the Elasticsearch index mapping for records'''
     30 
     31 	# Match on exact value or full text search
     32 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     33 
     34 	# Construct the index mapping
     35 	mapping = {
     36 		'mappings': {
     37 			'properties': {
     38 				'date'    : { 'type': 'date' },
     39 				'ip4nets' : {
     40 					'properties': {
     41 						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
     42 						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
     43 					}
     44 				},
     45 				'ip6nets' : {
     46 					'properties': {
     47 						'original_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } },
     48 						'transfer_set' : { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address' : { 'type': 'ip' } } }
     49 					}
     50 				},
     51 				'asns' : {
     52 					'properties': {
     53 						'original_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } },
     54 						'transfer_set' : { 'properties': { 'start': { 'type': 'integer' }, 'end' : { 'type': 'integer' } } }
     55 					}
     56 				},
     57 				'type'                   : { 'type': 'keyword' },
     58 				'source_organization'    : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
     59 				'recipient_organization' : { 'properties': { 'name':  keyword_mapping, 'country_code' : { 'type': 'keyword' } } },
     60 				'source_rir'             : { 'type': 'keyword' },
     61 				'recipient_rir'          : { 'type': 'keyword' },
     62 			}
     63 		}
     64 	}
     65 
     66 	return mapping
     67 
     68 
     69 async def process_data():
     70 	'''Read and process the transfers data.'''
     71 
     72 	for registry, url in transfers_db.items():
     73 		try:
     74 			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC
     75 
     76 			async with aiohttp.ClientSession(headers=headers) as session:
     77 				async with session.get(url) as response:
     78 					if response.status != 200:
     79 						raise Exception(f'Failed to fetch {registry} delegation data: {response.status}')
     80 
     81 					data = await response.text()
     82 
     83 					try:
     84 						json_data = json.loads(data)
     85 					except json.JSONDecodeError as e:
     86 						raise Exception(f'Failed to parse {registry} delegation data: {e}')
     87 
     88 					if 'transfers' not in json_data:
     89 						raise Exception(f'Invalid {registry} delegation data: {json_data}')
     90 
     91 					for record in json_data['transfers']:
     92 
     93 						if 'asns' in record:
     94 							for set_type in ('original_set', 'transfer_set'):
     95 								if set_type in record['asns']:
     96 									count = 0
     97 									for set_block in record['asns'][set_type]:
     98 										for option in ('start', 'end'):
     99 											asn = set_block[option]
    100 											if type(asn) != int:
    101 												if not asn.isdigit():
    102 													raise Exception(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')
    103 												else:
    104 													record['asns'][set_type][count][option] = int(asn)
    105 										count += 1
    106 
    107 
    108 						if 'ip4nets' in record or 'ip6nets' in record:
    109 							for ip_type in ('ip4nets', 'ip6nets'):
    110 								if ip_type in record:
    111 									for set_type in ('original_set', 'transfer_set'):
    112 										if set_type in record[ip_type]:
    113 											count = 0
    114 											for set_block in record[ip_type][set_type]:
    115 												for option in ('start_address', 'end_address'):
    116 													try:
    117 														ipaddress.ip_address(set_block[option])
    118 													except ValueError as e:
    119 														octets = set_block[option].split('.')
    120 														normalized_ip = '.'.join(str(int(octet)) for octet in octets)
    121 														try:
    122 															ipaddress.ip_address(normalized_ip)
    123 															record[ip_type][set_type][count][option] = normalized_ip
    124 														except ValueError as e:
    125 															raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}')
    126 												count += 1
    127 
    128 						if record['type'] not in ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER'):
    129 							raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')
    130 
    131 						yield {'_index': default_index, '_source': record}
    132 
    133 		except Exception as e:
    134 			raise Exception(f'Error processing {registry} delegation data: {e}')
    135 
    136 
    137 async def test():
    138 	'''Test the ingestion process'''
    139 
    140 	async for document in process_data():
    141 		print(document)
    142 
    143 
    144 
    145 if __name__ == '__main__':
    146 	import asyncio
    147 
    148 	asyncio.run(test())
    149 
    150 
    151 
    152 '''
    153 Output:
    154 	{
    155 		"transfer_date" : "2017-09-15T19:00:00Z",
    156 	 	"ip4nets"       : {
    157 			"original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
    158 		 	"transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
    159 		},
    160 		"type"                   : "MERGER_ACQUISITION",
    161 		"source_organization"    : { "name": "Unser Ortsnetz GmbH" },
    162 		"recipient_organization" : {
    163 			"name"         : "Deutsche Glasfaser Wholesale GmbH",
    164 			"country_code" : "DE"
    165 		},
    166 		"source_rir"    : "RIPE NCC",
    167 		"recipient_rir" : "RIPE NCC"
    168 	},
    169 	{
    170 		"transfer_date" : "2017-09-18T19:00:00Z",
    171 	 	"asns"          : {
    172 			"original_set" : [ { "start": 198257, "end": 198257 } ],
    173 			"transfer_set" : [ { "start": 198257, "end": 198257 } ]
    174 		},
    175 		"type"                   : "MERGER_ACQUISITION",
    176 		"source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
    177 		"recipient_organization" : {
    178 			"name"         : "Equinix (Poland) Sp. z o.o.",
    179 			"country_code" : "PL"
    180 		 },
    181 		"source_rir"    : "RIPE NCC",
    182 		"recipient_rir" : "RIPE NCC"
    183 	}
    184 
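    185 Input:
    186 	Same structure as the output. Records are passed through unchanged, except that ASN bounds given as digit strings are converted to integers and IPv4 addresses with zero-padded octets are normalized (e.g. 094.031.064.000 -> 94.31.64.0).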
    187 '''