meshtastic_mqtt- Unnamed repository; edit this file 'description' to name the repository. |
git clone git://git.acid.vegas/-c.git |
Log | Files | Refs | Archive | README | LICENSE |
client.py (16027B)
1 #!/usr/bin/env python 2 # Meshtastic MQTT Interface - Developed by acidvegas in Python (https://acid.vegas/meshtastic_mqtt_json) 3 4 import argparse 5 import base64 6 import json 7 import time 8 9 try: 10 from cryptography.hazmat.backends import default_backend 11 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 12 except ImportError: 13 raise ImportError('missing the cryptography module (pip install cryptography)') 14 15 try: 16 from google.protobuf.json_format import MessageToJson 17 except ImportError: 18 raise ImportError('missing the google protobuf module (pip install protobuf)') 19 20 try: 21 from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2 22 except ImportError: 23 raise ImportError('missing the meshtastic module (pip install meshtastic)') 24 25 try: 26 import paho.mqtt.client as mqtt 27 except ImportError: 28 raise ImportError('missing the paho-mqtt module (pip install paho-mqtt)') 29 30 31 def clean_json(data) -> dict: 32 ''' 33 Clean the JSON data by replacing NaN values with null 34 35 :param data: The JSON data to clean 36 ''' 37 # Handle protobuf messages 38 if hasattr(data, 'DESCRIPTOR'): 39 data = json.loads(MessageToJson(data)) 40 41 # Remove empty and NaN values from the JSON data 42 if isinstance(data, dict): 43 return {k: v for k, v in ((k, clean_json(v)) for k, v in data.items()) if str(v) not in ('None', 'nan', '')} 44 elif isinstance(data, list): 45 return [v for v in (clean_json(v) for v in data) if str(v) not in ('None', 'nan', '')] 46 47 # Return primitive types as-is 48 return data 49 50 51 class MeshtasticMQTT(object): 52 def __init__(self): 53 '''Initialize the Meshtastic MQTT client''' 54 55 self.broadcast_id = 4294967295 # Our channel ID 56 self.key = None 57 self.names = {} 58 self.filters = None 59 self.callbacks = {} # Dictionary to store message type callbacks 60 61 62 def register_callback(self, message_type: str, callback: callable): 63 ''' 64 Register a callback function for a specific message type 65 66 :param message_type: The message type to register for (e.g. 'TEXT_MESSAGE_APP', 'POSITION_APP') 67 :param callback: The callback function to call when a message of this type is received 68 ''' 69 if not message_type.endswith('_APP'): 70 message_type = f'{message_type}_APP' 71 self.callbacks[message_type] = callback 72 73 74 def unregister_callback(self, message_type: str): 75 ''' 76 Unregister a callback function for a specific message type 77 78 :param message_type: The message type to unregister 79 ''' 80 if not message_type.endswith('_APP'): 81 message_type = f'{message_type}_APP' 82 if message_type in self.callbacks: 83 del self.callbacks[message_type] 84 85 86 def _handle_message(self, mp, json_packet: dict, portnum_name: str): 87 ''' 88 Handle a message by calling registered callbacks 89 90 :param mp: The message packet 91 :param json_packet: The JSON representation of the packet 92 :param portnum_name: The name of the port number 93 ''' 94 # Call registered callback if one exists 95 if portnum_name in self.callbacks: 96 self.callbacks[portnum_name](json_packet) 97 else: 98 # Default behavior - print to console 99 print(f'{json.dumps(json_packet)}') 100 101 102 def connect(self, broker: str, port: int, root: str, channel: str, username: str, password: str, key: str): 103 ''' 104 Connect to the MQTT broker 105 106 :param broker: The MQTT broker address 107 :param port: The MQTT broker port 108 :param root: The root topic 109 :param channel: The channel name 110 :param username: The MQTT username 111 :param password: The MQTT password 112 :param key: The encryption key 113 ''' 114 115 # Initialize the MQTT client 116 client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id='', clean_session=True, userdata=None) 117 client.connect_timeout = 10 118 119 # Set the username and password for the MQTT broker 120 client.username_pw_set(username=username, password=password) 121 122 # Set the encryption key 123 self.key = '1PG7OiApB1nwvP+rz05pAQ==' if key == 'AQ==' else key 124 125 # Prepare the key for decryption 126 try: 127 padded_key = self.key.ljust(len(self.key) + ((4 - (len(self.key) % 4)) % 4), '=') 128 replaced_key = padded_key.replace('-', '+').replace('_', '/') 129 self.key_bytes = base64.b64decode(replaced_key.encode('ascii')) 130 except Exception as e: 131 print(f'Error decoding key: {e}') 132 raise 133 134 # Set the MQTT callbacks 135 client.on_connect = self.event_mqtt_connect 136 client.on_message = self.event_mqtt_recv 137 client.on_disconnect = self.event_mqtt_disconnect 138 139 # Connect to the MQTT broker 140 try: 141 client.connect(broker, port, 60) 142 except Exception as e: 143 print(f'Error connecting to MQTT broker: {e}') 144 self.event_mqtt_disconnect(client, '', 1, None) 145 146 # Set the subscribe topic 147 self.subscribe_topic = f'{root}{channel}/#' 148 149 # Keep-alive loop 150 client.loop_forever() 151 152 153 def decrypt_message_packet(self, mp): 154 ''' 155 Decrypt an encrypted message packet. 156 157 :param mp: The message packet to decrypt 158 ''' 159 try: 160 # Extract the nonce from the packet 161 nonce_packet_id = getattr(mp, 'id').to_bytes(8, 'little') 162 nonce_from_node = getattr(mp, 'from').to_bytes(8, 'little') 163 nonce = nonce_packet_id + nonce_from_node 164 165 # Decrypt the message 166 cipher = Cipher(algorithms.AES(self.key_bytes), modes.CTR(nonce), backend=default_backend()) 167 decryptor = cipher.decryptor() 168 decrypted_bytes = decryptor.update(getattr(mp, 'encrypted')) + decryptor.finalize() 169 170 # Parse the decrypted message 171 data = mesh_pb2.Data() 172 try: 173 data.ParseFromString(decrypted_bytes) 174 except: 175 # Ignore this as the message does not need to be decrypted 176 return None 177 178 mp.decoded.CopyFrom(data) 179 180 return mp 181 182 except Exception as e: 183 print(f'Error decrypting message: {e}') 184 print(f'Message packet details:') 185 print(f'- From: {getattr(mp, "from", "unknown")}') 186 print(f'- To: {getattr(mp, "to", "unknown")}') 187 print(f'- Channel: {getattr(mp, "channel", "unknown")}') 188 print(f'- ID: {getattr(mp, "id", "unknown")}') 189 return None 190 191 192 def event_mqtt_connect(self, client, userdata, flags, rc, properties): 193 ''' 194 Callback for when the client receives a CONNACK response from the server. 195 196 :param client: The client instance for this callback 197 :param userdata: The private user data as set in Client() or user_data_set() 198 :param flags: Response flags sent by the broker 199 :param rc: The connection result 200 :param properties: The properties returned by the broker 201 ''' 202 203 if rc == 0: 204 client.subscribe(self.subscribe_topic) 205 else: 206 print(f'Failed to connect to MQTT broker: {rc}') 207 208 209 def event_mqtt_recv(self, client, userdata, msg): 210 ''' 211 Callback for when a message is received from the server. 212 213 :param client: The client instance for this callback 214 :param userdata: The private user data as set in Client() or user_data_set() 215 :param msg: An instance of MQTTMessage 216 ''' 217 218 try: 219 # Define the service envelope 220 service_envelope = mqtt_pb2.ServiceEnvelope() 221 222 try: 223 # Parse the message payload 224 service_envelope.ParseFromString(msg.payload) 225 except Exception as e: 226 print(f'Error parsing service envelope: {e}') 227 print(f'Raw payload: {msg.payload}') 228 return 229 230 # Extract the message packet from the service envelope 231 mp = service_envelope.packet 232 233 # Check if the message is encrypted before decrypting it 234 if mp.HasField('encrypted'): 235 decrypted_mp = self.decrypt_message_packet(mp) 236 if decrypted_mp: 237 mp = decrypted_mp 238 else: 239 return 240 241 portnum_name = portnums_pb2.PortNum.Name(mp.decoded.portnum) 242 243 # Skip if message type doesn't match filter 244 if self.filters and portnum_name not in self.filters: 245 return 246 247 # Convert to JSON and handle NaN values in one shot 248 json_packet = clean_json(mp) 249 250 # Process the message based on its type 251 if mp.decoded.portnum == portnums_pb2.ADMIN_APP: 252 data = mesh_pb2.Admin() 253 data.ParseFromString(mp.decoded.payload) 254 json_packet['decoded']['payload'] = clean_json(data) 255 self._handle_message(mp, json_packet, portnum_name) 256 257 elif mp.decoded.portnum == portnums_pb2.ATAK_FORWARDER: 258 data = mesh_pb2.AtakForwarder() 259 data.ParseFromString(mp.decoded.payload) 260 json_packet['decoded']['payload'] = clean_json(data) 261 self._handle_message(mp, json_packet, portnum_name) 262 263 elif mp.decoded.portnum == portnums_pb2.ATAK_PLUGIN: 264 data = mesh_pb2.AtakPlugin() 265 data.ParseFromString(mp.decoded.payload) 266 json_packet['decoded']['payload'] = clean_json(data) 267 self._handle_message(mp, json_packet, portnum_name) 268 269 elif mp.decoded.portnum == portnums_pb2.AUDIO_APP: 270 data = mesh_pb2.Audio() 271 data.ParseFromString(mp.decoded.payload) 272 json_packet['decoded']['payload'] = clean_json(data) 273 self._handle_message(mp, json_packet, portnum_name) 274 275 elif mp.decoded.portnum == portnums_pb2.DETECTION_SENSOR_APP: 276 data = mesh_pb2.DetectionSensor() 277 data.ParseFromString(mp.decoded.payload) 278 json_packet['decoded']['payload'] = clean_json(data) 279 self._handle_message(mp, json_packet, portnum_name) 280 281 elif mp.decoded.portnum == portnums_pb2.IP_TUNNEL_APP: 282 data = mesh_pb2.IPTunnel() 283 data.ParseFromString(mp.decoded.payload) 284 json_packet['decoded']['payload'] = clean_json(data) 285 self._handle_message(mp, json_packet, portnum_name) 286 287 elif mp.decoded.portnum == portnums_pb2.MAP_REPORT_APP: 288 map_report = mesh_pb2.MapReport() 289 map_report.ParseFromString(mp.decoded.payload) 290 json_packet['decoded']['payload'] = clean_json(map_report) 291 self._handle_message(mp, json_packet, portnum_name) 292 293 elif mp.decoded.portnum == portnums_pb2.NEIGHBORINFO_APP: 294 neighborInfo = mesh_pb2.NeighborInfo() 295 neighborInfo.ParseFromString(mp.decoded.payload) 296 json_packet['decoded']['payload'] = clean_json(neighborInfo) 297 self._handle_message(mp, json_packet, portnum_name) 298 299 elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP: 300 from_id = getattr(mp, 'from') 301 node_info = mesh_pb2.User() 302 node_info.ParseFromString(mp.decoded.payload) 303 json_packet['decoded']['payload'] = clean_json(node_info) 304 self._handle_message(mp, json_packet, portnum_name) 305 self.names[from_id] = node_info.long_name 306 307 elif mp.decoded.portnum == portnums_pb2.PAXCOUNTER_APP: 308 data = mesh_pb2.Paxcounter() 309 data.ParseFromString(mp.decoded.payload) 310 json_packet['decoded']['payload'] = clean_json(data) 311 self._handle_message(mp, json_packet, portnum_name) 312 313 elif mp.decoded.portnum == portnums_pb2.POSITION_APP: 314 position = mesh_pb2.Position() 315 position.ParseFromString(mp.decoded.payload) 316 json_packet['decoded']['payload'] = clean_json(position) 317 self._handle_message(mp, json_packet, portnum_name) 318 319 elif mp.decoded.portnum == portnums_pb2.PRIVATE_APP: 320 data = mesh_pb2.Private() 321 data.ParseFromString(mp.decoded.payload) 322 json_packet['decoded']['payload'] = clean_json(data) 323 self._handle_message(mp, json_packet, portnum_name) 324 325 elif mp.decoded.portnum == portnums_pb2.RANGE_TEST_APP: 326 data = mesh_pb2.RangeTest() 327 data.ParseFromString(mp.decoded.payload) 328 json_packet['decoded']['payload'] = clean_json(data) 329 self._handle_message(mp, json_packet, portnum_name) 330 331 elif mp.decoded.portnum == portnums_pb2.REMOTE_HARDWARE_APP: 332 data = mesh_pb2.RemoteHardware() 333 data.ParseFromString(mp.decoded.payload) 334 json_packet['decoded']['payload'] = clean_json(data) 335 self._handle_message(mp, json_packet, portnum_name) 336 337 elif mp.decoded.portnum == portnums_pb2.REPLY_APP: 338 data = mesh_pb2.Reply() 339 data.ParseFromString(mp.decoded.payload) 340 json_packet['decoded']['payload'] = clean_json(data) 341 self._handle_message(mp, json_packet, portnum_name) 342 343 elif mp.decoded.portnum == portnums_pb2.ROUTING_APP: 344 routing = mesh_pb2.Routing() 345 routing.ParseFromString(mp.decoded.payload) 346 json_packet['decoded']['payload'] = clean_json(routing) 347 self._handle_message(mp, json_packet, portnum_name) 348 349 elif mp.decoded.portnum == portnums_pb2.SERIAL_APP: 350 data = mesh_pb2.Serial() 351 data.ParseFromString(mp.decoded.payload) 352 json_packet['decoded']['payload'] = clean_json(data) 353 self._handle_message(mp, json_packet, portnum_name) 354 355 elif mp.decoded.portnum == portnums_pb2.SIMULATOR_APP: 356 data = mesh_pb2.Simulator() 357 data.ParseFromString(mp.decoded.payload) 358 json_packet['decoded']['payload'] = clean_json(data) 359 self._handle_message(mp, json_packet, portnum_name) 360 361 elif mp.decoded.portnum == portnums_pb2.STORE_FORWARD_APP: 362 json_packet['decoded']['payload'] = mp.decoded.payload 363 self._handle_message(mp, json_packet, portnum_name) 364 365 elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP: 366 telemetry = telemetry_pb2.Telemetry() 367 telemetry.ParseFromString(mp.decoded.payload) 368 json_packet['decoded']['payload'] = clean_json(telemetry) 369 self._handle_message(mp, json_packet, portnum_name) 370 371 elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP: 372 text_payload = mp.decoded.payload.decode('utf-8') 373 json_packet['decoded']['payload'] = text_payload 374 self._handle_message(mp, json_packet, portnum_name) 375 376 elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_COMPRESSED_APP: 377 data = mesh_pb2.TextMessageCompressed() 378 data.ParseFromString(mp.decoded.payload) 379 json_packet['decoded']['payload'] = clean_json(data) 380 self._handle_message(mp, json_packet, portnum_name) 381 382 elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP: 383 routeDiscovery = mesh_pb2.RouteDiscovery() 384 routeDiscovery.ParseFromString(mp.decoded.payload) 385 json_packet['decoded']['payload'] = clean_json(routeDiscovery) 386 self._handle_message(mp, json_packet, portnum_name) 387 388 elif mp.decoded.portnum == portnums_pb2.UNKNOWN_APP: 389 self._handle_message(mp, json_packet, portnum_name) 390 391 elif mp.decoded.portnum == portnums_pb2.WAYPOINT_APP: 392 data = mesh_pb2.Waypoint() 393 data.ParseFromString(mp.decoded.payload) 394 json_packet['decoded']['payload'] = clean_json(data) 395 self._handle_message(mp, json_packet, portnum_name) 396 397 elif mp.decoded.portnum == portnums_pb2.ZPS_APP: 398 data = mesh_pb2.Zps() 399 data.ParseFromString(mp.decoded.payload) 400 json_packet['decoded']['payload'] = clean_json(data) 401 self._handle_message(mp, json_packet, portnum_name) 402 403 else: 404 print(f'UNKNOWN: Received Portnum name: {portnum_name}') 405 self._handle_message(mp, json_packet, portnum_name) 406 407 except Exception as e: 408 print(f'Error processing message: {e}') 409 print(f'Topic: {msg.topic}') 410 print(f'Payload: {msg.payload}') 411 412 413 def event_mqtt_disconnect(self, client, userdata, rc, packet_from_broker=None, properties=None, reason_code=None): 414 '''Callback for when the client disconnects from the server.''' 415 print(f'Disconnected with result code: {rc}') 416 while True: 417 print('Attempting to reconnect...') 418 try: 419 client.reconnect() 420 except Exception as e: 421 print(f'Error reconnecting to MQTT broker: {e}') 422 time.sleep(5) 423 else: 424 print('Reconnected to MQTT broker') 425 break 426 427 428 def main(): 429 parser = argparse.ArgumentParser(description='Meshtastic MQTT Interface') 430 parser.add_argument('--broker', default='mqtt.meshtastic.org', help='MQTT broker address') 431 parser.add_argument('--port', default=1883, type=int, help='MQTT broker port') 432 parser.add_argument('--root', default='msh/US/2/e/', help='Root topic') 433 parser.add_argument('--channel', default='LongFast', help='Channel name') 434 parser.add_argument('--username', default='meshdev', help='MQTT username') 435 parser.add_argument('--password', default='large4cats', help='MQTT password') 436 parser.add_argument('--key', default='AQ==', help='Encryption key') 437 parser.add_argument('--filter', help='Filter message types (comma-separated). Example: NODEINFO,POSITION,TEXT_MESSAGE') 438 args = parser.parse_args() 439 440 client = MeshtasticMQTT() 441 if args.filter: 442 client.filters = [f'{f.strip()}_APP' for f in args.filter.upper().split(',')] 443 else: 444 client.filters = None 445 client.connect(args.broker, args.port, args.root, args.channel, args.username, args.password, args.key) 446 447 448 449 if __name__ == '__main__': 450 main()