eris - Elasticsearch Recon Ingestion Scripts (ERIS) 🔎 |
git clone git://git.acid.vegas/eris.git |
Log | Files | Refs | Archive | README | LICENSE |
ingest_meshtastic.py (6550B)
#!/usr/bin/env python
# Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
# ingest_meshtastic.py

'''
This plugin needs the meshtastic-mqtt-json library to convert Meshtastic MQTT messages to JSON.
    pip install meshtastic-mqtt-json

Use this command to pipe Meshtastic MQTT messages to the ERIS FIFO when using the --watch flag:
    meshtastic-mqtt-json > ERIS_FIFO_PATH
'''

import asyncio
import json
import logging
import time

try:
    import aiofiles
except ImportError:
    raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')


# Set a default elasticsearch index if one is not provided
default_index = 'eris-meshtastic'


def construct_map() -> dict:
    '''
    Construct the Elasticsearch index mapping for Meshtastic records.

    :return: Index definition with the field mappings under mappings.properties
    '''

    # Match on exact value or full text search
    keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }

    # Construct the index mapping
    mapping = {
        'mappings': {
            'properties': {
                'channel' : { 'type': 'long'},
                'decoded' : {
                    'properties': {
                        'bitfield' : { 'type': 'long' },
                        'payload'  : {
                            # process_data() always stores the payload as a list of objects
                            'type'       : 'nested',
                            'dynamic'    : True,
                            'properties' : {
                                'HDOP'                      : { 'type': 'long' },
                                'PDOP'                      : { 'type': 'long' },
                                'altitude'                  : { 'type': 'long' },
                                'altitudeGeoidalSeparation' : { 'type': 'long' },
                                'altitudeHae'               : { 'type': 'long' },
                                'deviceMetrics'             : {
                                    'properties' : {
                                        'airUtilTx'          : { 'type': 'float' },
                                        'batteryLevel'       : { 'type': 'long'  },
                                        'channelUtilization' : { 'type': 'float' },
                                        'uptimeSeconds'      : { 'type': 'long'  },
                                        'voltage'            : { 'type': 'float' }
                                    }
                                },
                                'environmentMetrics'        : {
                                    'properties' : {
                                        'barometricPressure' : { 'type': 'float' },
                                        'current'            : { 'type': 'float' },
                                        'distance'           : keyword_mapping,
                                        'gasResistance'      : { 'type': 'float' },
                                        'iaq'                : { 'type': 'long'  },
                                        'lux'                : { 'type': 'float' },
                                        'relativeHumidity'   : { 'type': 'float' },
                                        'temperature'        : { 'type': 'float' },
                                        'voltage'            : { 'type': 'float' },
                                        'whiteLux'           : { 'type': 'float' },
                                        'windDirection'      : { 'type': 'long'  },
                                        'windSpeed'          : { 'type': 'float' }
                                    }
                                },
                                'errorReason'               : keyword_mapping,
                                'groundSpeed'               : { 'type': 'long' },
                                'groundTrack'               : { 'type': 'long' },
                                'hwModel'                   : keyword_mapping,
                                'id'                        : keyword_mapping,
                                'isLicensed'                : { 'type': 'boolean' },
                                'lastSentById'              : { 'type': 'long' },
                                'latitudeI'                 : { 'type': 'long' },
                                'locationSource'            : keyword_mapping,
                                'longName'                  : keyword_mapping,
                                'longitudeI'                : { 'type': 'long' },
                                'macaddr'                   : keyword_mapping,
                                'neighbors'                 : {
                                    'properties' : {
                                        'nodeId' : { 'type': 'long'  },
                                        'snr'    : { 'type': 'float' }
                                    }
                                },
                                'nodeBroadcastIntervalSecs' : { 'type': 'long' },
                                'nodeId'                    : { 'type': 'long' },
                                'powerMetrics'              : {
                                    'properties': {
                                        'ch1Current' : { 'type': 'float' },
                                        'ch1Voltage' : { 'type': 'float' },
                                        'ch2Current' : { 'type': 'float' },
                                        'ch2Voltage' : { 'type': 'float' },
                                        'ch3Current' : { 'type': 'float' },
                                        'ch3Voltage' : { 'type': 'float' }
                                    }
                                },
                                'precisionBits'             : { 'type': 'long' },
                                'publicKey'                 : keyword_mapping,
                                'role'                      : keyword_mapping,
                                'route'                     : { 'type': 'long' },
                                'routeBack'                 : { 'type': 'long' },
                                'satsInView'                : { 'type': 'long' },
                                'seqNumber'                 : { 'type': 'long' },
                                'shortName'                 : keyword_mapping,
                                'snrBack'                   : { 'type': 'long' },
                                'snrTowards'                : { 'type': 'long' },
                                'time'                      : { 'type': 'date' },
                                'timestamp'                 : { 'type': 'date' }
                            }
                        },
                        'portnum'      : keyword_mapping,
                        'requestId'    : { 'type': 'long' },
                        'wantResponse' : { 'type': 'boolean' }
                    }
                },
                'from'     : { 'type': 'long' },
                'hopLimit' : { 'type': 'long' },
                'hopStart' : { 'type': 'long' },
                'id'       : { 'type': 'long' },
                'priority' : keyword_mapping,
                'rxRssi'   : { 'type': 'long' },
                'rxSnr'    : { 'type': 'float' },
                'rxTime'   : { 'type': 'date' },
                'to'       : { 'type': 'long' },
                'viaMqtt'  : { 'type': 'boolean' },
                'wantAck'  : { 'type': 'boolean' }
            }
        }
    }

    return mapping


def _to_zulu(value):
    '''
    Convert a Unix timestamp (seconds) to a Zulu time string (YYYY-MM-DDTHH:MM:SSZ).

    Values that cannot be converted are returned unchanged so that one odd
    field never aborts the whole ingestion stream.

    :param value: Unix timestamp in seconds
    :return: Formatted UTC time string, or the original value when it is not convertible
    '''

    # time.gmtime(None) means "now" and other non-numeric types raise TypeError,
    # so only real numbers are converted
    if not isinstance(value, (int, float)):
        return value

    try:
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(value))
    except (OverflowError, OSError, ValueError):
        # Out of range for the platform (or NaN / infinity)
        logging.warning('Failed to convert timestamp! (%s)', value)
        return value


async def process_data(input_path: str):
    '''
    Read and process the input file

    Yields one Elasticsearch bulk action ({'_index': ..., '_source': ...}) per valid JSON line.

    :param input_path: Path to the input file
    '''

    async with aiofiles.open(input_path) as input_file:
        async for line in input_file:
            line = line.strip()

            # Stop reading once the end-of-stream sentinel line is seen
            if line == '~eof':
                break

            # Only JSON objects are of interest (this also skips blank lines)
            if not line.startswith('{'):
                continue

            try:
                record = json.loads(line)
            except json.decoder.JSONDecodeError:
                logging.error('Failed to parse JSON record! (%s)', line)
                continue

            # Convert Unix timestamps to Zulu time format
            if 'rxTime' in record:
                record['rxTime'] = _to_zulu(record['rxTime'])

            # Handle payload processing (only when 'decoded' really is an object)
            decoded = record.get('decoded')
            if isinstance(decoded, dict) and 'payload' in decoded:
                payload = decoded['payload']

                if isinstance(payload, dict):
                    # Process timestamps in payload object and ensure it's in an array
                    for field in ('time', 'timestamp'):
                        if field in payload:
                            payload[field] = _to_zulu(payload[field])
                    decoded['payload'] = [payload]
                else:
                    # If payload is not a dict, wrap it in a nested array with a value field
                    decoded['payload'] = [{'value': payload}]

            yield {'_index': default_index, '_source': record}


async def test(input_path: str):
    '''
    Test the ingestion process

    :param input_path: Path to the input file
    '''

    async for document in process_data(input_path):
        print(document)



if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Ingestor for ERIS')
    parser.add_argument('input_path', help='Path to the input file')
    args = parser.parse_args()

    asyncio.run(test(args.input_path))