meshtastic_mqtt

- Unnamed repository; edit this file 'description' to name the repository.
git clone git://git.acid.vegas/-c.git
Log | Files | Refs | Archive | README | LICENSE

client.py (16027B)

      1 #!/usr/bin/env python
      2 # Meshtastic MQTT Interface - Developed by acidvegas in Python (https://acid.vegas/meshtastic_mqtt_json)
      3 
      4 import argparse
      5 import base64
      6 import json
      7 import time
      8 
      9 try:
     10 	from cryptography.hazmat.backends           import default_backend
     11 	from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
     12 except ImportError:
     13 	raise ImportError('missing the cryptography module (pip install cryptography)')
     14 
     15 try:
     16 	from google.protobuf.json_format import MessageToJson
     17 except ImportError:
     18 	raise ImportError('missing the google protobuf module (pip install protobuf)')
     19 
     20 try:
     21 	from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2
     22 except ImportError:
     23 	raise ImportError('missing the meshtastic module (pip install meshtastic)')
     24 
     25 try:
     26 	import paho.mqtt.client as mqtt
     27 except ImportError:
     28 	raise ImportError('missing the paho-mqtt module (pip install paho-mqtt)')
     29 
     30 
     31 def clean_json(data) -> dict:
     32 	'''
     33 	Clean the JSON data by replacing NaN values with null
     34 
     35 	:param data: The JSON data to clean
     36 	'''
     37 	# Handle protobuf messages
     38 	if hasattr(data, 'DESCRIPTOR'):
     39 		data = json.loads(MessageToJson(data))
     40 
     41 	# Remove empty and NaN values from the JSON data
     42 	if isinstance(data, dict):
     43 		return {k: v for k, v in ((k, clean_json(v)) for k, v in data.items()) if str(v) not in ('None', 'nan', '')}
     44 	elif isinstance(data, list):
     45 		return [v for v in (clean_json(v) for v in data) if str(v) not in ('None', 'nan', '')]
     46 
     47 	# Return primitive types as-is
     48 	return data
     49 
     50 
     51 class MeshtasticMQTT(object):
     52 	def __init__(self):
     53 		'''Initialize the Meshtastic MQTT client'''
     54 
     55 		self.broadcast_id = 4294967295 # Our channel ID
     56 		self.key          = None
     57 		self.names        = {}
     58 		self.filters      = None
     59 		self.callbacks    = {}  # Dictionary to store message type callbacks
     60 
     61 
     62 	def register_callback(self, message_type: str, callback: callable):
     63 		'''
     64 		Register a callback function for a specific message type
     65 
     66 		:param message_type: The message type to register for (e.g. 'TEXT_MESSAGE_APP', 'POSITION_APP')
     67 		:param callback:     The callback function to call when a message of this type is received
     68 		'''
     69 		if not message_type.endswith('_APP'):
     70 			message_type = f'{message_type}_APP'
     71 		self.callbacks[message_type] = callback
     72 
     73 
     74 	def unregister_callback(self, message_type: str):
     75 		'''
     76 		Unregister a callback function for a specific message type
     77 
     78 		:param message_type: The message type to unregister
     79 		'''
     80 		if not message_type.endswith('_APP'):
     81 			message_type = f'{message_type}_APP'
     82 		if message_type in self.callbacks:
     83 			del self.callbacks[message_type]
     84 
     85 
     86 	def _handle_message(self, mp, json_packet: dict, portnum_name: str):
     87 		'''
     88 		Handle a message by calling registered callbacks
     89 
     90 		:param mp:           The message packet
     91 		:param json_packet:  The JSON representation of the packet
     92 		:param portnum_name: The name of the port number
     93 		'''
     94 		# Call registered callback if one exists
     95 		if portnum_name in self.callbacks:
     96 			self.callbacks[portnum_name](json_packet)
     97 		else:
     98 			# Default behavior - print to console
     99 			print(f'{json.dumps(json_packet)}')
    100 
    101 
    102 	def connect(self, broker: str, port: int, root: str, channel: str, username: str, password: str, key: str):
    103 		'''
    104 		Connect to the MQTT broker
    105 
    106 		:param broker:   The MQTT broker address
    107 		:param port:     The MQTT broker port
    108 		:param root:     The root topic
    109 		:param channel:  The channel name
    110 		:param username: The MQTT username
    111 		:param password: The MQTT password
    112 		:param key:      The encryption key
    113 		'''
    114 
    115 		# Initialize the MQTT client
    116 		client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id='', clean_session=True, userdata=None)
    117 		client.connect_timeout = 10
    118 
    119 		# Set the username and password for the MQTT broker
    120 		client.username_pw_set(username=username, password=password)
    121 
    122 		# Set the encryption key
    123 		self.key = '1PG7OiApB1nwvP+rz05pAQ==' if key == 'AQ==' else key
    124 
    125 		# Prepare the key for decryption
    126 		try:
    127 			padded_key = self.key.ljust(len(self.key) + ((4 - (len(self.key) % 4)) % 4), '=')
    128 			replaced_key = padded_key.replace('-', '+').replace('_', '/')
    129 			self.key_bytes = base64.b64decode(replaced_key.encode('ascii'))
    130 		except Exception as e:
    131 			print(f'Error decoding key: {e}')
    132 			raise
    133 
    134 		# Set the MQTT callbacks
    135 		client.on_connect    = self.event_mqtt_connect
    136 		client.on_message    = self.event_mqtt_recv
    137 		client.on_disconnect = self.event_mqtt_disconnect
    138 
    139 		# Connect to the MQTT broker
    140 		try:
    141 			client.connect(broker, port, 60)
    142 		except Exception as e:
    143 			print(f'Error connecting to MQTT broker: {e}')
    144 			self.event_mqtt_disconnect(client, '', 1, None)
    145 
    146 		# Set the subscribe topic
    147 		self.subscribe_topic = f'{root}{channel}/#'
    148 
    149 		# Keep-alive loop
    150 		client.loop_forever()
    151 
    152 
    153 	def decrypt_message_packet(self, mp):
    154 		'''
    155 		Decrypt an encrypted message packet.
    156 
    157 		:param mp: The message packet to decrypt
    158 		'''
    159 		try:
    160 			# Extract the nonce from the packet
    161 			nonce_packet_id = getattr(mp, 'id').to_bytes(8, 'little')
    162 			nonce_from_node = getattr(mp, 'from').to_bytes(8, 'little')
    163 			nonce = nonce_packet_id + nonce_from_node
    164 
    165 			# Decrypt the message
    166 			cipher = Cipher(algorithms.AES(self.key_bytes), modes.CTR(nonce), backend=default_backend())
    167 			decryptor = cipher.decryptor()
    168 			decrypted_bytes = decryptor.update(getattr(mp, 'encrypted')) + decryptor.finalize()
    169 
    170 			# Parse the decrypted message
    171 			data = mesh_pb2.Data()
    172 			try:
    173 				data.ParseFromString(decrypted_bytes)
    174 			except:
    175 				# Ignore this as the message does not need to be decrypted
    176 				return None
    177 
    178 			mp.decoded.CopyFrom(data)
    179 
    180 			return mp
    181 
    182 		except Exception as e:
    183 			print(f'Error decrypting message: {e}')
    184 			print(f'Message packet details:')
    185 			print(f'- From: {getattr(mp, "from", "unknown")}')
    186 			print(f'- To: {getattr(mp, "to", "unknown")}')
    187 			print(f'- Channel: {getattr(mp, "channel", "unknown")}')
    188 			print(f'- ID: {getattr(mp, "id", "unknown")}')
    189 			return None
    190 
    191 
    192 	def event_mqtt_connect(self, client, userdata, flags, rc, properties):
    193 		'''
    194 		Callback for when the client receives a CONNACK response from the server.
    195 
    196 		:param client:     The client instance for this callback
    197 		:param userdata:   The private user data as set in Client() or user_data_set()
    198 		:param flags:      Response flags sent by the broker
    199 		:param rc:         The connection result
    200 		:param properties: The properties returned by the broker
    201 		'''
    202 
    203 		if rc == 0:
    204 			client.subscribe(self.subscribe_topic)
    205 		else:
    206 			print(f'Failed to connect to MQTT broker: {rc}')
    207 
    208 
    209 	def event_mqtt_recv(self, client, userdata, msg):
    210 		'''
    211 		Callback for when a message is received from the server.
    212 
    213 		:param client:   The client instance for this callback
    214 		:param userdata: The private user data as set in Client() or user_data_set()
    215 		:param msg:      An instance of MQTTMessage
    216 		'''
    217 
    218 		try:
    219 			# Define the service envelope
    220 			service_envelope = mqtt_pb2.ServiceEnvelope()
    221 
    222 			try:
    223 				# Parse the message payload
    224 				service_envelope.ParseFromString(msg.payload)
    225 			except Exception as e:
    226 				print(f'Error parsing service envelope: {e}')
    227 				print(f'Raw payload: {msg.payload}')
    228 				return
    229 
    230 			# Extract the message packet from the service envelope
    231 			mp = service_envelope.packet
    232 
    233 			# Check if the message is encrypted before decrypting it
    234 			if mp.HasField('encrypted'):
    235 				decrypted_mp = self.decrypt_message_packet(mp)
    236 				if decrypted_mp:
    237 					mp = decrypted_mp
    238 				else:
    239 					return
    240 
    241 			portnum_name = portnums_pb2.PortNum.Name(mp.decoded.portnum)
    242 
    243 			# Skip if message type doesn't match filter
    244 			if self.filters and portnum_name not in self.filters:
    245 				return
    246 
    247 			# Convert to JSON and handle NaN values in one shot
    248 			json_packet = clean_json(mp)
    249 
    250 			# Process the message based on its type
    251 			if mp.decoded.portnum == portnums_pb2.ADMIN_APP:
    252 				data = mesh_pb2.Admin()
    253 				data.ParseFromString(mp.decoded.payload)
    254 				json_packet['decoded']['payload'] = clean_json(data)
    255 				self._handle_message(mp, json_packet, portnum_name)
    256 
    257 			elif mp.decoded.portnum == portnums_pb2.ATAK_FORWARDER:
    258 				data = mesh_pb2.AtakForwarder()
    259 				data.ParseFromString(mp.decoded.payload)
    260 				json_packet['decoded']['payload'] = clean_json(data)
    261 				self._handle_message(mp, json_packet, portnum_name)
    262 
    263 			elif mp.decoded.portnum == portnums_pb2.ATAK_PLUGIN:
    264 				data = mesh_pb2.AtakPlugin()
    265 				data.ParseFromString(mp.decoded.payload)
    266 				json_packet['decoded']['payload'] = clean_json(data)
    267 				self._handle_message(mp, json_packet, portnum_name)
    268 
    269 			elif mp.decoded.portnum == portnums_pb2.AUDIO_APP:
    270 				data = mesh_pb2.Audio()
    271 				data.ParseFromString(mp.decoded.payload)
    272 				json_packet['decoded']['payload'] = clean_json(data)
    273 				self._handle_message(mp, json_packet, portnum_name)
    274 
    275 			elif mp.decoded.portnum == portnums_pb2.DETECTION_SENSOR_APP:
    276 				data = mesh_pb2.DetectionSensor()
    277 				data.ParseFromString(mp.decoded.payload)
    278 				json_packet['decoded']['payload'] = clean_json(data)
    279 				self._handle_message(mp, json_packet, portnum_name)
    280 
    281 			elif mp.decoded.portnum == portnums_pb2.IP_TUNNEL_APP:
    282 				data = mesh_pb2.IPTunnel()
    283 				data.ParseFromString(mp.decoded.payload)
    284 				json_packet['decoded']['payload'] = clean_json(data)
    285 				self._handle_message(mp, json_packet, portnum_name)
    286 
    287 			elif mp.decoded.portnum == portnums_pb2.MAP_REPORT_APP:
    288 				map_report = mesh_pb2.MapReport()
    289 				map_report.ParseFromString(mp.decoded.payload)
    290 				json_packet['decoded']['payload'] = clean_json(map_report)
    291 				self._handle_message(mp, json_packet, portnum_name)
    292 
    293 			elif mp.decoded.portnum == portnums_pb2.NEIGHBORINFO_APP:
    294 				neighborInfo = mesh_pb2.NeighborInfo()
    295 				neighborInfo.ParseFromString(mp.decoded.payload)
    296 				json_packet['decoded']['payload'] = clean_json(neighborInfo)
    297 				self._handle_message(mp, json_packet, portnum_name)
    298 
    299 			elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP:
    300 				from_id = getattr(mp, 'from')
    301 				node_info = mesh_pb2.User()
    302 				node_info.ParseFromString(mp.decoded.payload)
    303 				json_packet['decoded']['payload'] = clean_json(node_info)
    304 				self._handle_message(mp, json_packet, portnum_name)
    305 				self.names[from_id] = node_info.long_name
    306 
    307 			elif mp.decoded.portnum == portnums_pb2.PAXCOUNTER_APP:
    308 				data = mesh_pb2.Paxcounter()
    309 				data.ParseFromString(mp.decoded.payload)
    310 				json_packet['decoded']['payload'] = clean_json(data)
    311 				self._handle_message(mp, json_packet, portnum_name)
    312 
    313 			elif mp.decoded.portnum == portnums_pb2.POSITION_APP:
    314 				position = mesh_pb2.Position()
    315 				position.ParseFromString(mp.decoded.payload)
    316 				json_packet['decoded']['payload'] = clean_json(position)
    317 				self._handle_message(mp, json_packet, portnum_name)
    318 
    319 			elif mp.decoded.portnum == portnums_pb2.PRIVATE_APP:
    320 				data = mesh_pb2.Private()
    321 				data.ParseFromString(mp.decoded.payload)
    322 				json_packet['decoded']['payload'] = clean_json(data)
    323 				self._handle_message(mp, json_packet, portnum_name)
    324 
    325 			elif mp.decoded.portnum == portnums_pb2.RANGE_TEST_APP:
    326 				data = mesh_pb2.RangeTest()
    327 				data.ParseFromString(mp.decoded.payload)
    328 				json_packet['decoded']['payload'] = clean_json(data)
    329 				self._handle_message(mp, json_packet, portnum_name)
    330 
    331 			elif mp.decoded.portnum == portnums_pb2.REMOTE_HARDWARE_APP:
    332 				data = mesh_pb2.RemoteHardware()
    333 				data.ParseFromString(mp.decoded.payload)
    334 				json_packet['decoded']['payload'] = clean_json(data)
    335 				self._handle_message(mp, json_packet, portnum_name)
    336 
    337 			elif mp.decoded.portnum == portnums_pb2.REPLY_APP:
    338 				data = mesh_pb2.Reply()
    339 				data.ParseFromString(mp.decoded.payload)
    340 				json_packet['decoded']['payload'] = clean_json(data)
    341 				self._handle_message(mp, json_packet, portnum_name)
    342 
    343 			elif mp.decoded.portnum == portnums_pb2.ROUTING_APP:
    344 				routing = mesh_pb2.Routing()
    345 				routing.ParseFromString(mp.decoded.payload)
    346 				json_packet['decoded']['payload'] = clean_json(routing)
    347 				self._handle_message(mp, json_packet, portnum_name)
    348 
    349 			elif mp.decoded.portnum == portnums_pb2.SERIAL_APP:
    350 				data = mesh_pb2.Serial()
    351 				data.ParseFromString(mp.decoded.payload)
    352 				json_packet['decoded']['payload'] = clean_json(data)
    353 				self._handle_message(mp, json_packet, portnum_name)
    354 
    355 			elif mp.decoded.portnum == portnums_pb2.SIMULATOR_APP:
    356 				data = mesh_pb2.Simulator()
    357 				data.ParseFromString(mp.decoded.payload)
    358 				json_packet['decoded']['payload'] = clean_json(data)
    359 				self._handle_message(mp, json_packet, portnum_name)
    360 
    361 			elif mp.decoded.portnum == portnums_pb2.STORE_FORWARD_APP:
    362 				json_packet['decoded']['payload'] = mp.decoded.payload
    363 				self._handle_message(mp, json_packet, portnum_name)
    364 
    365 			elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP:
    366 				telemetry = telemetry_pb2.Telemetry()
    367 				telemetry.ParseFromString(mp.decoded.payload)
    368 				json_packet['decoded']['payload'] = clean_json(telemetry)
    369 				self._handle_message(mp, json_packet, portnum_name)
    370 
    371 			elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP:
    372 				text_payload = mp.decoded.payload.decode('utf-8')
    373 				json_packet['decoded']['payload'] = text_payload
    374 				self._handle_message(mp, json_packet, portnum_name)
    375 
    376 			elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_COMPRESSED_APP:
    377 				data = mesh_pb2.TextMessageCompressed()
    378 				data.ParseFromString(mp.decoded.payload)
    379 				json_packet['decoded']['payload'] = clean_json(data)
    380 				self._handle_message(mp, json_packet, portnum_name)
    381 
    382 			elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP:
    383 				routeDiscovery = mesh_pb2.RouteDiscovery()
    384 				routeDiscovery.ParseFromString(mp.decoded.payload)
    385 				json_packet['decoded']['payload'] = clean_json(routeDiscovery)
    386 				self._handle_message(mp, json_packet, portnum_name)
    387 
    388 			elif mp.decoded.portnum == portnums_pb2.UNKNOWN_APP:
    389 				self._handle_message(mp, json_packet, portnum_name)
    390 
    391 			elif mp.decoded.portnum == portnums_pb2.WAYPOINT_APP:
    392 				data = mesh_pb2.Waypoint()
    393 				data.ParseFromString(mp.decoded.payload)
    394 				json_packet['decoded']['payload'] = clean_json(data)
    395 				self._handle_message(mp, json_packet, portnum_name)
    396 
    397 			elif mp.decoded.portnum == portnums_pb2.ZPS_APP:
    398 				data = mesh_pb2.Zps()
    399 				data.ParseFromString(mp.decoded.payload)
    400 				json_packet['decoded']['payload'] = clean_json(data)
    401 				self._handle_message(mp, json_packet, portnum_name)
    402 
    403 			else:
    404 				print(f'UNKNOWN: Received Portnum name: {portnum_name}')
    405 				self._handle_message(mp, json_packet, portnum_name)
    406 
    407 		except Exception as e:
    408 			print(f'Error processing message: {e}')
    409 			print(f'Topic: {msg.topic}')
    410 			print(f'Payload: {msg.payload}')
    411 
    412 
    413 	def event_mqtt_disconnect(self, client, userdata, rc, packet_from_broker=None, properties=None, reason_code=None):
    414 		'''Callback for when the client disconnects from the server.'''
    415 		print(f'Disconnected with result code: {rc}')
    416 		while True:
    417 			print('Attempting to reconnect...')
    418 			try:
    419 				client.reconnect()
    420 			except Exception as e:
    421 				print(f'Error reconnecting to MQTT broker: {e}')
    422 				time.sleep(5)
    423 			else:
    424 				print('Reconnected to MQTT broker')
    425 				break
    426 
    427 
    428 def main():
    429     parser = argparse.ArgumentParser(description='Meshtastic MQTT Interface')
    430     parser.add_argument('--broker', default='mqtt.meshtastic.org', help='MQTT broker address')
    431     parser.add_argument('--port', default=1883, type=int, help='MQTT broker port')
    432     parser.add_argument('--root', default='msh/US/2/e/', help='Root topic')
    433     parser.add_argument('--channel', default='LongFast', help='Channel name')
    434     parser.add_argument('--username', default='meshdev', help='MQTT username')
    435     parser.add_argument('--password', default='large4cats', help='MQTT password')
    436     parser.add_argument('--key', default='AQ==', help='Encryption key')
    437     parser.add_argument('--filter', help='Filter message types (comma-separated). Example: NODEINFO,POSITION,TEXT_MESSAGE')
    438     args = parser.parse_args()
    439 
    440     client = MeshtasticMQTT()
    441     if args.filter:
    442         client.filters = [f'{f.strip()}_APP' for f in args.filter.upper().split(',')]
    443     else:
    444         client.filters = None
    445     client.connect(args.broker, args.port, args.root, args.channel, args.username, args.password, args.key)
    446 
    447 
    448 
    449 if __name__ == '__main__':
    450     main()