meshtastic_mqtt- Unnamed repository; edit this file 'description' to name the repository. |
git clone git://git.acid.vegas/-c.git |
Log | Files | Refs | Archive | README | LICENSE |
client.py (13704B)
#!/usr/bin/env python
# Meshtastic MQTT Interface - Developed by acidvegas in Python (https://acid.vegas/meshtastic_mqtt_json)

import argparse
import base64
import json
import time

try:
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
except ImportError:
    raise ImportError('missing the cryptography module (pip install cryptography)')

try:
    from google.protobuf.json_format import MessageToJson
except ImportError:
    raise ImportError('missing the google protobuf module (pip install protobuf)')

try:
    from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2
except ImportError:
    raise ImportError('missing the meshtastic module (pip install meshtastic)')

try:
    import paho.mqtt.client as mqtt
except ImportError:
    raise ImportError('missing the paho-mqtt module (pip install paho-mqtt)')


# Values whose str() form matches one of these are dropped by clean_json()
_EMPTY_MARKERS = ('None', 'nan', '')

# Port name -> (protobuf module, message class name) for payloads that are
# decoded and printed on their own as a single JSON object.
# The class is looked up by name only when a matching packet arrives, so a
# class missing from the installed package only affects that port type.
# NOTE(review): several of these class names may not exist in mesh_pb2 for
# the installed meshtastic package - confirm against the package in use.
_STANDALONE_PAYLOADS = {
    'ADMIN_APP':                   (mesh_pb2, 'Admin'),
    'ATAK_FORWARDER':              (mesh_pb2, 'AtakForwarder'),
    'ATAK_PLUGIN':                 (mesh_pb2, 'AtakPlugin'),
    'AUDIO_APP':                   (mesh_pb2, 'Audio'),
    'DETECTION_SENSOR_APP':        (mesh_pb2, 'DetectionSensor'),
    'IP_TUNNEL_APP':               (mesh_pb2, 'IPTunnel'),
    'PAXCOUNTER_APP':              (mesh_pb2, 'Paxcounter'),
    'PRIVATE_APP':                 (mesh_pb2, 'Private'),
    'REMOTE_HARDWARE_APP':         (mesh_pb2, 'RemoteHardware'),
    'REPLY_APP':                   (mesh_pb2, 'Reply'),
    'SERIAL_APP':                  (mesh_pb2, 'Serial'),
    'SIMULATOR_APP':               (mesh_pb2, 'Simulator'),
    'TEXT_MESSAGE_COMPRESSED_APP': (mesh_pb2, 'TextMessageCompressed'),
    'WAYPOINT_APP':                (mesh_pb2, 'Waypoint'),
    'ZPS_APP':                     (mesh_pb2, 'Zps'),
}

# Port name -> (protobuf module, message class name) for payloads that are
# decoded and embedded into the packet JSON under ['decoded']['payload'].
_EMBEDDED_PAYLOADS = {
    'MAP_REPORT_APP':   (mesh_pb2, 'MapReport'),
    'NEIGHBORINFO_APP': (mesh_pb2, 'NeighborInfo'),
    'NODEINFO_APP':     (mesh_pb2, 'User'),
    'POSITION_APP':     (mesh_pb2, 'Position'),
    'RANGE_TEST_APP':   (mesh_pb2, 'RangeTest'),
    'ROUTING_APP':      (mesh_pb2, 'Routing'),
    'TELEMETRY_APP':    (telemetry_pb2, 'Telemetry'),
    'TRACEROUTE_APP':   (mesh_pb2, 'RouteDiscovery'),
}


def clean_json(data):
    '''
    Recursively drop empty and NaN values from JSON-like data

    Objects exposing a DESCRIPTOR attribute (protobuf messages) are first
    converted to plain Python structures through MessageToJson.

    :param data: A protobuf message, dict, list or primitive value
    :return: The cleaned dict or list, or the primitive value unchanged
    '''

    # Handle protobuf messages
    if hasattr(data, 'DESCRIPTOR'):
        data = json.loads(MessageToJson(data))

    # Clean children first, then drop the ones that ended up empty / NaN
    if isinstance(data, dict):
        cleaned_items = ((key, clean_json(value)) for key, value in data.items())
        return {key: value for key, value in cleaned_items if str(value) not in _EMPTY_MARKERS}

    if isinstance(data, list):
        cleaned_values = (clean_json(value) for value in data)
        return [value for value in cleaned_values if str(value) not in _EMPTY_MARKERS]

    # Return primitive types as-is
    return data


class MeshtasticMQTT(object):
    '''MQTT client that decrypts Meshtastic packets and prints them as JSON'''

    def __init__(self):
        '''Initialize the Meshtastic MQTT client'''

        self.broadcast_id    = 4294967295 # 0xFFFFFFFF
        self.key             = None       # Base64 key string, set by connect()
        self.key_bytes       = None       # Decoded key bytes, set by connect()
        self.names           = {}         # Node number -> long name, filled from NODEINFO_APP packets
        self.filters         = None       # Optional list of port names to print
        self.subscribe_topic = None       # Topic subscribed to on connect, set by connect()


    def connect(self, broker: str, port: int, root: str, channel: str, username: str, password: str, key: str):
        '''
        Connect to the MQTT broker and process messages until interrupted

        :param broker: The MQTT broker address
        :param port: The MQTT broker port
        :param root: The root topic
        :param channel: The channel name
        :param username: The MQTT username
        :param password: The MQTT password
        :param key: The encryption key (base64, standard or URL-safe alphabet)
        :raises Exception: Re-raises whatever the base64 decoding of the key raised
        '''

        # Initialize the MQTT client
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id='', clean_session=True, userdata=None)
        client.connect_timeout = 10

        # Set the username and password for the MQTT broker
        client.username_pw_set(username=username, password=password)

        # The short key 'AQ==' is replaced by this full-length key
        self.key = '1PG7OiApB1nwvP+rz05pAQ==' if key == 'AQ==' else key

        # Prepare the key for decryption
        try:
            missing_padding = (4 - (len(self.key) % 4)) % 4
            padded_key      = self.key + ('=' * missing_padding)
            # Map the URL-safe alphabet onto the standard one before decoding
            replaced_key    = padded_key.replace('-', '+').replace('_', '/')
            self.key_bytes  = base64.b64decode(replaced_key.encode('ascii'))
        except Exception as e:
            print(f'Error decoding key: {e}')
            raise

        # Set the subscribe topic before connecting so that the connect
        # callback can never observe it unset
        self.subscribe_topic = f'{root}{channel}/#'

        # Set the MQTT callbacks
        client.on_connect    = self.event_mqtt_connect
        client.on_message    = self.event_mqtt_recv
        client.on_disconnect = self.event_mqtt_disconnect

        # Connect to the MQTT broker
        try:
            client.connect(broker, port, 60)
        except Exception as e:
            print(f'Error connecting to MQTT broker: {e}')
            # Reuse the disconnect handler, which retries until it succeeds
            self.event_mqtt_disconnect(client, '', 1, None)

        # Keep-alive loop
        client.loop_forever()


    def decrypt_message_packet(self, mp):
        '''
        Decrypt an encrypted message packet.

        :param mp: The message packet to decrypt
        :return: The same packet with its decoded field populated, or None on failure
        '''

        try:
            # The nonce is the packet id followed by the sender node number, 8 little-endian bytes each
            nonce_packet_id = mp.id.to_bytes(8, 'little')
            nonce_from_node = getattr(mp, 'from').to_bytes(8, 'little') # 'from' is a Python keyword
            nonce           = nonce_packet_id + nonce_from_node

            # Decrypt the message
            cipher          = Cipher(algorithms.AES(self.key_bytes), modes.CTR(nonce), backend=default_backend())
            decryptor       = cipher.decryptor()
            decrypted_bytes = decryptor.update(mp.encrypted) + decryptor.finalize()

            # Parse the decrypted message
            data = mesh_pb2.Data()
            try:
                data.ParseFromString(decrypted_bytes)
            except Exception:
                # The plaintext is not a valid Data message (presumably it was
                # encrypted with a different key) so skip it silently
                return None

            mp.decoded.CopyFrom(data)

            return mp

        except Exception as e:
            print(f'Error decrypting message: {e}')
            print(f'Message packet details:')
            print(f'- From: {getattr(mp, "from", "unknown")}')
            print(f'- To: {getattr(mp, "to", "unknown")}')
            print(f'- Channel: {getattr(mp, "channel", "unknown")}')
            print(f'- ID: {getattr(mp, "id", "unknown")}')
            return None


    def event_mqtt_connect(self, client, userdata, flags, rc, properties):
        '''
        Callback for when the client receives a CONNACK response from the server.

        :param client: The client instance for this callback
        :param userdata: The private user data as set in Client() or user_data_set()
        :param flags: Response flags sent by the broker
        :param rc: The connection result
        :param properties: The properties returned by the broker
        '''

        if rc == 0:
            client.subscribe(self.subscribe_topic)
        else:
            print(f'Failed to connect to MQTT broker: {rc}')


    def _decode_payload(self, mp, module, class_name: str):
        '''
        Parse the decoded payload of a packet into a protobuf message

        :param mp: The message packet holding the payload
        :param module: The protobuf module providing the message class
        :param class_name: The name of the message class inside the module
        :return: The parsed protobuf message
        :raises AttributeError: If the module has no class with that name
        '''

        message = getattr(module, class_name)()
        message.ParseFromString(mp.decoded.payload)
        return message


    def event_mqtt_recv(self, client, userdata, msg):
        '''
        Callback for when a message is received from the server.

        :param client: The client instance for this callback
        :param userdata: The private user data as set in Client() or user_data_set()
        :param msg: An instance of MQTTMessage
        '''

        # NOTE: msg is never rebound below, the error handler at the bottom needs the original object
        try:
            # Define the service envelope
            service_envelope = mqtt_pb2.ServiceEnvelope()

            try:
                # Parse the message payload
                service_envelope.ParseFromString(msg.payload)
            except Exception as e:
                print(f'Error parsing service envelope: {e}')
                print(f'Raw payload: {msg.payload}')
                return

            # Extract the message packet from the service envelope
            mp = service_envelope.packet

            # Check if the message is encrypted before decrypting it
            if mp.HasField('encrypted'):
                mp = self.decrypt_message_packet(mp)
                if mp is None:
                    return

            portnum_name = portnums_pb2.PortNum.Name(mp.decoded.portnum)

            # Skip if message type doesn't match filter
            if self.filters and portnum_name not in self.filters:
                return

            # Convert to JSON and handle NaN values in one shot
            json_packet = clean_json(mp)

            # Process the message based on its type
            if portnum_name in _STANDALONE_PAYLOADS:
                payload = self._decode_payload(mp, *_STANDALONE_PAYLOADS[portnum_name])
                print(json.dumps(clean_json(payload)))

            elif portnum_name in _EMBEDDED_PAYLOADS:
                payload = self._decode_payload(mp, *_EMBEDDED_PAYLOADS[portnum_name])
                json_packet['decoded']['payload'] = clean_json(payload)
                print(json.dumps(json_packet))

                # Remember the long name announced by each node
                if portnum_name == 'NODEINFO_APP':
                    self.names[getattr(mp, 'from')] = payload.long_name

            elif portnum_name == 'STORE_FORWARD_APP':
                # Printed as a Python dict followed by the raw payload bytes
                print(f'{json_packet}')
                print(f'{mp.decoded.payload}')

            elif portnum_name == 'TEXT_MESSAGE_APP':
                json_packet['decoded']['payload'] = mp.decoded.payload.decode('utf-8')
                print(json.dumps(json_packet))

            elif portnum_name == 'UNKNOWN_APP':
                # Printed as a Python dict
                print(f'{json_packet}')

            else:
                print(f'UNKNOWN: Received Portnum name: {portnum_name}')
                print(f'UNKNOWN: {json.dumps(json_packet)}')

        except Exception as e:
            print(f'Error processing message: {e}')
            print(f'Topic: {msg.topic}')
            print(f'Payload: {msg.payload}')


    def event_mqtt_disconnect(self, client, userdata, rc, packet_from_broker=None, properties=None, reason_code=None):
        '''
        Callback for when the client disconnects from the server.

        Blocks until a reconnect attempt succeeds, retrying every 5 seconds.

        NOTE(review): with CallbackAPIVersion.VERSION2 paho-mqtt is documented
        to call this as (client, userdata, disconnect_flags, reason_code,
        properties), so the reason code arrives in the fourth positional slot
        (packet_from_broker) and rc holds the flags - confirm against the
        installed paho-mqtt version.

        :param client: The client instance for this callback
        :param userdata: The private user data as set in Client() or user_data_set()
        :param rc: The result code when called directly from connect()
        :param packet_from_broker: Fourth positional callback argument
        :param properties: The properties returned by the broker
        :param reason_code: The reason code when passed by keyword
        '''

        # Report the most specific code that was actually provided
        result = rc
        for candidate in (reason_code, packet_from_broker):
            if candidate is not None:
                result = candidate
                break

        print(f'Disconnected with result code: {result}')

        while True:
            print('Attempting to reconnect...')
            try:
                client.reconnect()
            except Exception as e:
                print(f'Error reconnecting to MQTT broker: {e}')
                time.sleep(5)
            else:
                print('Reconnected to MQTT broker')
                break


def main():
    '''Parse the command line arguments and run the client'''

    parser = argparse.ArgumentParser(description='Meshtastic MQTT Interface')
    parser.add_argument('--broker', default='mqtt.meshtastic.org', help='MQTT broker address')
    parser.add_argument('--port', default=1883, type=int, help='MQTT broker port')
    parser.add_argument('--root', default='msh/US/2/e/', help='Root topic')
    parser.add_argument('--channel', default='LongFast', help='Channel name')
    parser.add_argument('--username', default='meshdev', help='MQTT username')
    parser.add_argument('--password', default='large4cats', help='MQTT password')
    parser.add_argument('--key', default='AQ==', help='Encryption key')
    parser.add_argument('--filter', help='Filter message types (comma-separated). Example: NODEINFO,POSITION,TEXT_MESSAGE')
    args = parser.parse_args()

    client = MeshtasticMQTT()

    if args.filter:
        # Turn 'nodeinfo, position' into ['NODEINFO_APP', 'POSITION_APP'], skipping empty entries
        names = (name.strip() for name in args.filter.upper().split(','))
        client.filters = [f'{name}_APP' for name in names if name]
    else:
        client.filters = None

    client.connect(args.broker, args.port, args.root, args.channel, args.username, args.password, args.key)



if __name__ == '__main__':
    main()