eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_ixps.py (2975B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_ixps.py
      4 
      5 import json
      6 import ipaddress
      7 import time
      8 
      9 try:
     10 	import aiohttp
     11 except ImportError:
     12 	raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')
     13 
     14 
     15 # Set a default elasticsearch index if one is not provided
     16 default_index = 'ixp-' + time.strftime('%Y-%m-%d')
     17 
     18 
     19 def construct_map() -> dict:
     20 	'''Construct the Elasticsearch index mapping for records'''
     21 
     22 	# Match on exact value or full text search
     23 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     24 
     25 	# Construct the index mapping
     26 	mapping = {
     27 		'mappings': {
     28 			'properties': {
     29 				'name'           : { 'type': 'keyword' },
     30 				'alternatenames' : { 'type': 'keyword' },
     31 				'sources'        : { 'type': 'keyword' },
     32 				'prefixes'       : {
     33         			'properties': {
     34                			'ipv4' : { 'type': 'ip'       },
     35                   		'ipv6' : { 'type': 'ip_range' }
     36                     }
     37            		},
     38 				'url'            : { 'type': 'keyword' },
     39 				'region'         : { 'type': 'keyword' },
     40 				'country'        : { 'type': 'keyword' },
     41 				'city'           : { 'type': 'keyword' },
     42 				'state'          : { 'type': 'keyword' },
     43 				'zip'            : { 'type': 'keyword' },
     44 				'address'        : keyword_mapping,
     45 				'iata'           : { 'type': 'keyword' },
     46 				'latitude'       : { 'type': 'float'   },
     47 				'longitude'      : { 'type': 'float'   },
     48 				'geo_id'         : { 'type': 'integer' },
     49 				'ix_id'          : { 'type': 'integer' },
     50 				'org_id'         : { 'type': 'integer' },
     51 				'pdb_id'         : { 'type': 'integer' },
     52 				'pdb_org_id'     : { 'type': 'integer' },
     53 				'pch_id'         : { 'type': 'integer' }
     54 			}
     55 		}
     56 	}
     57 
     58 	return mapping
     59 
     60 
     61 async def process_data():
     62 	'''Read and process the transfers data.'''
     63 
     64 	try:
     65 		async with aiohttp.ClientSession() as session:
     66 			async with session.get('https://publicdata.caida.org/datasets/ixps/ixs_202401.jsonl') as response:
     67 				if response.status != 200:
     68 					raise Exception(f'Failed to fetch IXP data: {response.status}')
     69 
     70 				data = await response.text()
     71 
     72 				try:
     73 					json_data = json.loads(data)
     74 				except json.JSONDecodeError as e:
     75 					raise Exception(f'Failed to parse IXP data: {e}')
     76 
     77 				pass
     78 
     79 	except Exception as e:
     80 		raise Exception(f'Error processing IXP data: {e}')
     81 
     82 
     83 async def test():
     84 	'''Test the ingestion process'''
     85 
     86 	async for document in process_data():
     87 		print(document)
     88 
     89 
     90 
if __name__ == '__main__':
	# Script entry point: asyncio is imported here because only direct execution needs it
	import asyncio

	# Run the async self-test to completion on a fresh event loop
	asyncio.run(test())
     95 
     96 
     97 
     98 '''
     99 Output:
    100 {
    101 	"pch_id"         : 1848,
    102 	"name"           : "ANGONIX",
    103 	"country"        : "AO",
    104 	"region"         : "Africa",
    105 	"city"           : "Luanda",
    106 	"iata"           : "LAD",
    107 	"alternatenames" : [],
    108 	"sources"        : ["pch"],
    109 	"prefixes"       : {
    110 		"ipv4" : ["196.11.234.0/24"],
    111 		"ipv6" : ["2001:43f8:9d0::/48"]
    112 	},
    113 	"geo_id" : 2240449,
    114 	"ix_id"  : 10
    115 }
    116 '''