eris

- Elasticsearch Recon Ingestion Scripts (ERIS) 🔎
git clone git://git.acid.vegas/eris.git
Log | Files | Refs | Archive | README | LICENSE

ingest_meshtastic.py (6550B)

      1 #!/usr/bin/env python
      2 # Elasticsearch Recon Ingestion Scripts (ERIS) - Developed by Acidvegas (https://git.acid.vegas/eris)
      3 # ingest_meshtastic.py
      4 
      5 '''
      6 This plugin needs the meshtastic-mqtt-json library to convert Meshtastic MQTT messages to JSON.
      7 	pip install meshtastic-mqtt-json
      8 
      9 Use this command to pipe Meshtastic MQTT messages to the ERIS FIFO when using the --watch flag:
     10 	meshtastic-mqtt-json > ERIS_FIFO_PATH
     11 '''
     12 
     13 import asyncio
     14 import json
     15 import logging
     16 import time
     17 
     18 try:
     19 	import aiofiles
     20 except ImportError:
     21 	raise ImportError('Missing required \'aiofiles\' library. (pip install aiofiles)')
     22 
     23 
     24 # Set a default elasticsearch index if one is not provided
     25 default_index = 'eris-meshtastic'
     26 
     27 
     28 def construct_map() -> dict:
     29 	'''Construct the Elasticsearch index mapping for Meshtastic records.'''
     30 
     31 	# Match on exact value or full text search
     32 	keyword_mapping = { 'type': 'text', 'fields': { 'keyword': { 'type': 'keyword', 'ignore_above': 256 } } }
     33 
     34 	# Construct the index mapping
     35 	mapping = {
     36 		'mappings': {
     37 			'properties': {
     38 				'channel' : { 'type': 'long'},
     39 				'decoded' : {
     40 					'properties': {
     41 						'bitfield'       : { 'type': 'long' },
     42 						'payload'        : {
     43 							'type'       : 'nested',
     44 							'dynamic'    : True,
     45 							'properties' : {
     46 								'HDOP'                      : { 'type': 'long' },
     47 								'PDOP'                      : { 'type': 'long' },
     48 								'altitude'                  : { 'type': 'long' },
     49 								'altitudeGeoidalSeparation' : { 'type': 'long' },
     50 								'altitudeHae'               : { 'type': 'long' },
     51 								'deviceMetrics' : {
     52 									'properties' : {
     53 										'airUtilTx'          : { 'type': 'float' },
     54 										'batteryLevel'       : { 'type': 'long'  },
     55 										'channelUtilization' : { 'type': 'float' },
     56 										'uptimeSeconds'      : { 'type': 'long'  },
     57 										'voltage'            : { 'type': 'float' }
     58 									}
     59 								},
     60 								'environmentMetrics' : {
     61 									'properties' : {
     62 										'barometricPressure' : { 'type': 'float' },
     63 										'current'            : { 'type': 'float' },
     64 										'distance'           : keyword_mapping,
     65 										'gasResistance'      : { 'type': 'float' },
     66 										'iaq'                : { 'type': 'long'  },
     67 										'lux'                : { 'type': 'float' },
     68 										'relativeHumidity'   : { 'type': 'float' },
     69 										'temperature'        : { 'type': 'float' },
     70 										'voltage'            : { 'type': 'float' },
     71 										'whiteLux'           : { 'type': 'float' },
     72 										'windDirection'      : { 'type': 'long'  },
     73 										'windSpeed'          : { 'type': 'float' }
     74 									}
     75 								},
     76 								'errorReason'    : keyword_mapping,
     77 								'groundSpeed'    : { 'type': 'long'    },
     78 								'groundTrack'    : { 'type': 'long'    },
     79 								'hwModel'        : keyword_mapping,
     80 								'id'             : keyword_mapping,
     81 								'isLicensed'     : { 'type': 'boolean' },
     82 								'lastSentById'   : { 'type': 'long'    },
     83 								'latitudeI'      : { 'type': 'long'    },
     84 								'locationSource' : keyword_mapping,
     85 								'longName'       : keyword_mapping,
     86 								'longitudeI'     : { 'type': 'long'    },
     87 								'macaddr'        : keyword_mapping,
     88 								'neighbors'      : {
     89 									'properties' : {
     90 										'nodeId' : { 'type': 'long'  },
     91 										'snr'    : { 'type': 'float' }
     92 									}
     93 								},
     94 								'nodeBroadcastIntervalSecs' : { 'type': 'long' },
     95 								'nodeId'                    : { 'type': 'long' },
     96 								'powerMetrics'              : {
     97 									'properties': {
     98 										'ch1Current' : { 'type': 'float' },
     99 										'ch1Voltage' : { 'type': 'float' },
    100 										'ch2Current' : { 'type': 'float' },
    101 										'ch2Voltage' : { 'type': 'float' },
    102 										'ch3Current' : { 'type': 'float' },
    103 										'ch3Voltage' : { 'type': 'float' }
    104 									}
    105 								},
    106 								'precisionBits' : { 'type': 'long' },
    107 								'publicKey'     : keyword_mapping,
    108 								'role'          : keyword_mapping,
    109 								'route'         : { 'type': 'long' },
    110 								'routeBack'     : { 'type': 'long' },
    111 								'satsInView'    : { 'type': 'long' },
    112 								'seqNumber'     : { 'type': 'long' },
    113 								'shortName'     : keyword_mapping,
    114 								'snrBack'       : { 'type': 'long' },
    115 								'snrTowards'    : { 'type': 'long' },
    116 								'time'          : { 'type': 'date' },
    117 								'timestamp'     : { 'type': 'date' }
    118 							}
    119 						},
    120 						'portnum'      : keyword_mapping,
    121 						'requestId'    : { 'type': 'long'    },
    122 						'wantResponse' : { 'type': 'boolean' }
    123 					}
    124 				},
    125 				'from'     : { 'type': 'long'    },
    126 				'hopLimit' : { 'type': 'long'    },
    127 				'hopStart' : { 'type': 'long'    },
    128 				'id'       : { 'type': 'long'    },
    129 				'priority' : keyword_mapping,
    130 				'rxRssi'   : { 'type': 'long'    },
    131 				'rxSnr'    : { 'type': 'float'   },
    132 				'rxTime'   : { 'type': 'date'    },
    133 				'to'       : { 'type': 'long'    },
    134 				'viaMqtt'  : { 'type': 'boolean' },
    135 				'wantAck'  : { 'type': 'boolean' }
    136 			}
    137 		}
    138 	}
    139 
    140 	return mapping
    141 
    142 
    143 async def process_data(input_path: str):
    144 	'''
    145 	Read and process the input file
    146 
    147 	:param input_path: Path to the input file
    148 	'''
    149 
    150 	async with aiofiles.open(input_path) as input_file:
    151 		async for line in input_file:
    152 			line = line.strip()
    153 
    154 			if line == '~eof':
    155 				break
    156 
    157 			if not line or not line.startswith('{'):
    158 				continue
    159 
    160 			try:
    161 				record = json.loads(line)
    162 			except json.decoder.JSONDecodeError:
    163 				logging.error(f'Failed to parse JSON record! ({line})')
    164 				continue
    165 
    166 			# Convert Unix timestamps to Zulu time format
    167 			if 'rxTime' in record:
    168 				record['rxTime'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(record['rxTime']))
    169 
    170 			# Handle payload processing
    171 			if 'decoded' in record and 'payload' in record['decoded']:
    172 				payload = record['decoded']['payload']
    173 				
    174 				# If payload is not a dict, wrap it in a nested array with a value field
    175 				if not isinstance(payload, dict):
    176 					record['decoded']['payload'] = [{'value': payload}]
    177 				else:
    178 					# Process timestamps in payload object and ensure it's in an array
    179 					if 'time' in payload:
    180 						payload['time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(payload['time']))
    181 					if 'timestamp' in payload:
    182 						payload['timestamp'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(payload['timestamp']))
    183 					record['decoded']['payload'] = [payload]
    184 
    185 			yield {'_index': default_index, '_source': record}
    186 
    187 
    188 async def test():
    189 	'''Test the ingestion process.'''
    190 
    191 	async for document in process_data():
    192 		print(document)
    193 
    194 
    195 
    196 if __name__ == '__main__':
    197 	asyncio.run(test())