eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_rir_transfers.py (6369B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_rir_transfers.py

import json
import ipaddress
import time

try:
    import aiohttp
except ImportError:
    raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')


# Set a default elasticsearch index if one is not provided
default_index = 'rir-transfer-' + time.strftime('%Y-%m-%d')

# Transfers data sources
transfers_db = {
    'afrinic' : 'https://ftp.afrinic.net/stats/afrinic/transfers/transfers_latest.json',
    'apnic'   : 'https://ftp.apnic.net/stats/apnic/transfers/transfers_latest.json',
    'arin'    : 'https://ftp.arin.net/pub/stats/arin/transfers/transfers_latest.json',
    'lacnic'  : 'https://ftp.lacnic.net/pub/stats/lacnic/transfers/transfers_latest.json',
    'ripencc' : 'https://ftp.ripe.net/pub/stats/ripencc/transfers/transfers_latest.json'
}

# Transfer types accepted by process_data()
valid_transfer_types = ('MERGER_ACQUISITION', 'RESOURCE_TRANSFER')


def construct_map() -> dict:
    '''
    Construct the Elasticsearch index mapping for records

    :return: A dictionary holding the index mapping under the "mappings" key
    '''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Start & end address pair shared by every IPv4/IPv6 set
    ip_range = { 'properties': { 'start_address': { 'type': 'ip' }, 'end_address': { 'type': 'ip' } } }

    # ASNs are unsigned 32-bit values, which overflow the signed 32-bit "integer" type, so "long" is used
    asn_range = { 'properties': { 'start': { 'type': 'long' }, 'end': { 'type': 'long' } } }

    # Organization name & country code shared by the source and the recipient
    organization = { 'properties': { 'name': keyword_mapping, 'country_code': { 'type': 'keyword' } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties': {
                'transfer_date'          : { 'type': 'date' }, # Field name used by the records (see the sample output at the bottom of this file)
                'date'                   : { 'type': 'date' }, # Kept so the mapping stays a superset of the previous one
                'ip4nets'                : { 'properties': { 'original_set': ip_range,  'transfer_set': ip_range  } },
                'ip6nets'                : { 'properties': { 'original_set': ip_range,  'transfer_set': ip_range  } },
                'asns'                   : { 'properties': { 'original_set': asn_range, 'transfer_set': asn_range } },
                'type'                   : { 'type': 'keyword' },
                'source_organization'    : organization,
                'recipient_organization' : organization,
                'source_rir'             : { 'type': 'keyword' },
                'recipient_rir'          : { 'type': 'keyword' },
            }
        }
    }

    return mapping


def _normalize_asns(record: dict, registry: str) -> None:
    '''
    Convert string ASNs in the original & transfer sets of a record to integers (in place)

    :param record: A single transfer record
    :param registry: Name of the registry the record came from (used in error messages)
    :raises Exception: If an ASN is neither an integer nor a string of digits
    '''

    asn_sets = record.get('asns')

    if not asn_sets:
        return

    for set_type in ('original_set', 'transfer_set'):
        for set_block in asn_sets.get(set_type, ()):
            for option in ('start', 'end'):
                asn = set_block[option]

                if isinstance(asn, int):
                    continue

                if not (isinstance(asn, str) and asn.isdigit()):
                    raise Exception(f'Invalid {set_type} {option} ASN in {registry} data: {asn}')

                # set_block is the very dictionary stored in the record, so this updates the record itself
                set_block[option] = int(asn)


def _normalize_ips(record: dict, registry: str) -> None:
    '''
    Validate every start & end address in the IPv4/IPv6 sets of a record, repairing them in place when possible

    :param record: A single transfer record
    :param registry: Name of the registry the record came from (used in error messages)
    :raises Exception: If an address is still invalid after normalization
    '''

    for ip_type in ('ip4nets', 'ip6nets'):
        ip_sets = record.get(ip_type)

        if not ip_sets:
            continue

        for set_type in ('original_set', 'transfer_set'):
            for set_block in ip_sets.get(set_type, ()):
                for option in ('start_address', 'end_address'):
                    address = set_block[option]

                    try:
                        ipaddress.ip_address(address)
                        continue # Already a valid address, leave it untouched
                    except ValueError:
                        pass

                    # Re-render every dotted octet through int() to drop leading zeros
                    # (presumably aimed at zero-padded IPv4 such as 094.031.064.000 - confirm against the feeds)
                    try:
                        normalized_ip = '.'.join(str(int(octet)) for octet in address.split('.'))
                        ipaddress.ip_address(normalized_ip)
                    except ValueError as e:
                        raise Exception(f'Invalid {set_type} {option} IP in {registry} data: {e}') from e

                    set_block[option] = normalized_ip


async def process_data():
    '''
    Read and process the transfers data.

    Every registry in transfers_db is downloaded in turn, its records are normalized and
    validated, and one Elasticsearch document is yielded per record.

    :yields: A dictionary with the "_index" & "_source" keys for every transfer record
    :raises Exception: If a registry can not be fetched, parsed or validated (the original error is chained)
    '''

    headers = {'Connection': 'keep-alive'} # This is required for AIOHTTP connections to LACNIC

    for registry, url in transfers_db.items():
        try:
            async with aiohttp.ClientSession(headers=headers) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise Exception(f'Failed to fetch {registry} transfers data: {response.status}')

                    data = await response.text()

            # The body is fully read at this point, so the connection is released before any record is yielded

            try:
                json_data = json.loads(data)
            except json.JSONDecodeError as e:
                raise Exception(f'Failed to parse {registry} transfers data: {e}') from e

            if 'transfers' not in json_data:
                raise Exception(f'Invalid {registry} transfers data: {json_data}')

            for record in json_data['transfers']:
                _normalize_asns(record, registry)
                _normalize_ips(record, registry)

                if record['type'] not in valid_transfer_types:
                    raise Exception(f'Invalid transfer type in {registry} data: {record["type"]}')

                yield {'_index': default_index, '_source': record}

        except Exception as e:
            raise Exception(f'Error processing {registry} transfers data: {e}') from e


async def test():
    '''Test the ingestion process by printing every document that would be indexed'''

    async for document in process_data():
        print(document)



if __name__ == '__main__':
    import asyncio

    asyncio.run(test())



'''
Output:
    {
        "transfer_date" : "2017-09-15T19:00:00Z",
        "ip4nets"       : {
            "original_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ],
            "transfer_set" : [ { "start_address": "94.31.64.0", "end_address": "94.31.127.255" } ]
        },
        "type"                   : "MERGER_ACQUISITION",
        "source_organization"    : { "name": "Unser Ortsnetz GmbH" },
        "recipient_organization" : {
            "name"         : "Deutsche Glasfaser Wholesale GmbH",
            "country_code" : "DE"
        },
        "source_rir"    : "RIPE NCC",
        "recipient_rir" : "RIPE NCC"
    },
    {
        "transfer_date" : "2017-09-18T19:00:00Z",
        "asns"          : {
            "original_set" : [ { "start": 198257, "end": 198257 } ],
            "transfer_set" : [ { "start": 198257, "end": 198257 } ]
        },
        "type"                   : "MERGER_ACQUISITION",
        "source_organization"    : { "name": "CERT PLIX Sp. z o.o." },
        "recipient_organization" : {
            "name"         : "Equinix (Poland) Sp. z o.o.",
            "country_code" : "PL"
        },
        "source_rir"    : "RIPE NCC",
        "recipient_rir" : "RIPE NCC"
    }

Input:
    Nothing changed from the output for now...
'''