eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_rir_delegations.py (6186B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_rir_delegations.py
      4 
      5 import csv
      6 import ipaddress
      7 import logging
      8 import time
      9 
     10 try:
     11 	import aiohttp
     12 except ImportError:
     13 	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
     14 
     15 
     16 # Set a default elasticsearch index if one is not provided
     17 default_index = 'rir-delegation-' + time.strftime('%Y-%m-%d')
     18 
     19 # Delegation data sources
     20 delegation_db = {
     21 	'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/delegated-afrinic-extended-latest',
     22 	'apnic'   : 'https://ftp.apnic.net/stats/apnic/delegated-apnic-extended-latest',
     23 	'arin'    : 'https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
     24 	'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
     25 	'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest'
     26 }
     27 
     28 
     29 def construct_map() -> dict:
     30 	'''Construct the Elasticsearch index mapping for records'''
     31 
     32 	# Match on exact value or full text search
     33 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     34 
     35 	# Construct the index mapping
     36 	mapping = {
     37 		'mappings': {
     38 			'properties': {
     39 				'registry'   : { 'type': 'keyword' },
     40 				'cc'         : { 'type': 'keyword' }, # ISO 3166 2-letter code
     41 				'asn'        : {
     42         			'properties': {
     43 						'start' : { 'type': 'integer' },
     44 						'end'   : { 'type': 'integer' }
     45 					}
     46 				},
     47 				'ip'         : {
     48 					'properties': {
     49 						'start' : { 'type': 'ip' },
     50 						'end'   : { 'type': 'ip' }
     51 					}
     52 				},
     53 				'date'       : { 'type': 'date'    },
     54 				'status'     : { 'type': 'keyword' },
     55 				'extensions' : keyword_mapping
     56 			}
     57 		}
     58 	}
     59 
     60 	return mapping
     61 
     62 
     63 async def process_data():
     64 	'''Read and process the delegation data.'''
     65 
     66 	for registry, url in delegation_db.items():
     67 		try:
     68 			headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC
     69 
     70 			async with aiohttp.ClientSession(headers=headers) as session:
     71 				async with session.get(url) as response:
     72 					if response.status != 200:
     73 						logging.error(f'Failed to fetch {registry} delegation data: {response.status}')
     74 						continue
     75 
     76 					csv_data   = await response.text()
     77 					rows       = [line.lower() for line in csv_data.split('\n') if line and not line.startswith('#')]
     78 					csv_reader = csv.reader(rows, delimiter='|')
     79 
     80 					del rows, csv_data # Cleanup
     81 
     82 					# Process the CSV data
     83 					for row in csv_reader:
     84 						cache = '|'.join(row) # Cache the last row for error handling
     85 
     86 						# Heuristic for the header line (not doing anything with it for now)
     87 						if len(row) == 7 and row[1] != '*':
     88 							header = {
     89 								'version'   : row[0],
     90 								'registry'  : row[1],
     91 								'serial'    : row[2],
     92 								'records'   : row[3],
     93 								'startdate' : row[4],
     94 								'enddate'   : row[5],
     95 								'UTCoffset' : row[6]
     96 							}
     97 							continue
     98 
     99 						# Heuristic for the summary lines (not doing anything with it for now)
    100 						elif row[2] != '*' and row[3] == '*':
    101 							summary = {
    102 								'registry' : row[0],
    103 								'type'     : row[2],
    104 								'count'    : row[4]
    105 							}
    106 							continue
    107 
    108 						# Record lines (this is what we want)
    109 						else:
    110 							record = {
    111 								'registry' : row[0],
    112 								'cc'       : row[1],
    113 								'type'     : row[2],
    114 								'start'    : row[3],
    115 								'value'    : row[4],
    116 								'date'     : row[5],
    117 								'status'   : row[6]
    118 							}
    119 
    120 							if len(row) == 7:
    121 								if row[7]:
    122 									record['extensions'] = row[7]
    123 
    124 							if not record['cc']:
    125 								del record['cc']
    126 							elif len(record['cc']) != 2:
    127 								raise ValueError(f'Invalid country code: {cache}')
    128 
    129 							if not record['value'].isdigit():
    130 								raise ValueError(f'Invalid value: {cache}')
    131 
    132 							if record['type'] == 'asn':
    133 								end = int(record['start']) + int(record['value']) - 1
    134 								record['asn'] = { 'start': int(record['start']), 'end': end }
    135 							elif record['type'] in ('ipv4', 'ipv6'):
    136 								try:
    137 									if record['type'] == 'ipv4':
    138 										end = ipaddress.ip_address(record['start']) + int(record['value']) - 1
    139 									elif record['type'] == 'ipv6':
    140 										end = ipaddress.ip_network(f'{record["start"]}/{record["value"]}').broadcast_address
    141 										end = end.compressed.lower()
    142 									record['ip'] = { 'start': record['start'], 'end': str(end) }
    143 								except ValueError:
    144 									raise ValueError(f'Invalid IP range: {cache}')
    145 							else:
    146 								raise ValueError(f'Invalid record type: {cache}')
    147 
    148 							del record['start'], record['value'], record['type'] # Cleanup variables no longer needed
    149 
    150 							if not record['date'] or record['date'] == '00000000':
    151 								del record['date']
    152 							else:
    153 								record['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(record['date'], '%Y%m%d')),
    154 
    155 							if record['status'] not in ('allocated', 'assigned', 'available', 'reserved', 'unallocated', 'unknown'):
    156 								raise ValueError(f'Invalid status: {cache}')
    157 
    158 							#json_output['records'].append(record)
    159 
    160 							# Let's just index the records themself (for now)
    161 							yield {'_index': default_index, '_source': record}
    162 
    163 		except Exception as e:
    164 			logging.error(f'Error processing {registry} delegation data: {e}')
    165 
    166 
    167 async def test():
    168 	'''Test the ingestion process'''
    169 
    170 	async for document in process_data():
    171 		print(document)
    172 
    173 
    174 
    175 if __name__ == '__main__':
    176 	import asyncio
    177 
    178 	asyncio.run(test())
    179 
    180 
    181 
'''
Input:
	arin|US|ipv4|76.15.132.0|1024|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.136.0|2048|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.144.0|4096|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28
	arin|US|ipv4|76.15.160.0|8192|20070502|allocated|6c065d5b54b877781f05e7d30ebfff28

Output (the record indexed for the first input line):
	{
		'registry'   : 'arin',
		'cc'         : 'us',
		'ip'         : { 'start': '76.15.132.0', 'end': '76.15.135.255' },
		'date'       : '2007-05-02T00:00:00Z',
		'status'     : 'allocated',
		'extensions' : '6c065d5b54b877781f05e7d30ebfff28'
	}

Notes:
	Should this keep the delegation database on local disk, or load it into RAM?
'''