eris- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_ixps.py (2975B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_ixps.py

import json
import ipaddress
import time

try:
    import aiohttp
except ImportError:
    raise ImportError('Missing required \'aiohttp\' library. (pip install aiohttp)')


# Set a default elasticsearch index if one is not provided
default_index = 'ixp-' + time.strftime('%Y-%m-%d')

# Location of the CAIDA IXP dataset (JSON Lines: one JSON object per line)
IXP_DATA_URL = 'https://publicdata.caida.org/datasets/ixps/ixs_202401.jsonl'


def construct_map() -> dict:
    '''Construct the Elasticsearch index mapping for records'''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties': {
                'name'           : { 'type': 'keyword' },
                'alternatenames' : { 'type': 'keyword' },
                'sources'        : { 'type': 'keyword' },
                'prefixes'       : {
                    'properties': {
                        # Prefixes are CIDR blocks (e.g. "196.11.234.0/24"); the 'ip' type only
                        # indexes single addresses, so both families use 'ip_range'.
                        'ipv4' : { 'type': 'ip_range' },
                        'ipv6' : { 'type': 'ip_range' }
                    }
                },
                'url'            : { 'type': 'keyword' },
                'region'         : { 'type': 'keyword' },
                'country'        : { 'type': 'keyword' },
                'city'           : { 'type': 'keyword' },
                'state'          : { 'type': 'keyword' },
                'zip'            : { 'type': 'keyword' },
                'address'        : keyword_mapping,
                'iata'           : { 'type': 'keyword' },
                'latitude'       : { 'type': 'float' },
                'longitude'      : { 'type': 'float' },
                'geo_id'         : { 'type': 'integer' },
                'ix_id'          : { 'type': 'integer' },
                'org_id'         : { 'type': 'integer' },
                'pdb_id'         : { 'type': 'integer' },
                'pdb_org_id'     : { 'type': 'integer' },
                'pch_id'         : { 'type': 'integer' }
            }
        }
    }

    return mapping


def _parse_jsonl(text: str) -> list:
    '''
    Parse JSON Lines text into a list of records.

    Blank lines are skipped, as are lines starting with '#'. A valid JSON
    document can never begin with '#', so such lines can only be comments.

    :param text: Raw JSON Lines text
    :return: List of decoded JSON values, one per data line
    :raises Exception: If a data line is not valid JSON
    '''

    records = []

    for line_number, line in enumerate(text.splitlines(), start=1):
        line = line.strip()

        if not line or line.startswith('#'):
            continue

        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            raise Exception(f'Failed to parse IXP data on line {line_number}: {e}') from e

    return records


async def process_data(*, url: str = IXP_DATA_URL):
    '''
    Download the IXP dataset and yield one record per IXP.

    The full response is read and parsed before anything is yielded, so the
    HTTP connection is not held open while the consumer processes records.

    :param url: URL of the IXP JSON Lines dataset (keyword-only; defaults to IXP_DATA_URL)
    :raises Exception: If the download returns a non-200 status or the data cannot be parsed
    '''

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise Exception(f'Failed to fetch IXP data: {response.status}')

                data = await response.text()

        # The dataset is JSON Lines, so each line is decoded on its own
        records = _parse_jsonl(data)

    except Exception as e:
        raise Exception(f'Error processing IXP data: {e}') from e

    for record in records:
        yield record


async def test():
    '''Test the ingestion process'''

    async for document in process_data():
        print(document)



if __name__ == '__main__':
    import asyncio

    asyncio.run(test())



'''
Output:
    {
        "pch_id"         : 1848,
        "name"           : "ANGONIX",
        "country"        : "AO",
        "region"         : "Africa",
        "city"           : "Luanda",
        "iata"           : "LAD",
        "alternatenames" : [],
        "sources"        : ["pch"],
        "prefixes"       : {
            "ipv4" : ["196.11.234.0/24"],
            "ipv6" : ["2001:43f8:9d0::/48"]
        },
        "geo_id"         : 2240449,
        "ix_id"          : 10
    }
'''