meshtastic

- Experiments with Meshtastic 🛰️
git clone git://git.acid.vegas/meshtastic.git
Log | Files | Refs | Archive | README | LICENSE

meshmqtt.py (17508B)

      1 #!/usr/bin/env python
      2 # Meshtastic MQTT Interface - Developed by acidvegas in Python (https://acid.vegas/meshtastic)
      3 
      4 import argparse
      5 import base64
      6 import logging
      7 
      8 try:
      9 	from cryptography.hazmat.backends           import default_backend
     10 	from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
     11 except ImportError:
     12 	raise SystemExit('missing the cryptography module (pip install cryptography)')
     13 
     14 try:
     15 	from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2
     16 except ImportError:
     17 	raise SystemExit('missing the meshtastic module (pip install meshtastic)')
     18 
     19 try:
     20 	import paho.mqtt.client as mqtt
     21 except ImportError:
     22 	raise SystemExit('missing the paho-mqtt module (pip install paho-mqtt)')
     23 
     24 
     25 # Initialize the logging module
     26 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %I:%M:%S')
     27 
     28 
     29 def clean_dict(dictionary: dict) -> dict:
     30 	'''
     31 	Remove empty fields from a dictionary.
     32 
     33 	:param dictionary: The dictionary to remove empty fields from
     34 	'''
     35 	
     36 	return {key: value for key, value in dictionary.items() if value}
     37 
     38 
     39 class MeshtasticMQTT(object):
     40 	def __init__(self):
     41 		'''Initialize the Meshtastic MQTT client'''
     42 
     43 		self.broadcast_id = 4294967295 # Our channel ID
     44 		self.key = None
     45 
     46 
     47 	def connect(self, broker: str, port: int, root: str, tls: bool, username: str, password: str, key: str):
     48 		'''
     49 		Connect to the MQTT broker
     50 
     51 		:param broker:   The MQTT broker address
     52 		:param port:     The MQTT broker port
     53 		:param root:     The root topic to subscribe to
     54 		:param tls:      Enable TLS/SSL
     55 		:param username: The MQTT username
     56 		:param password: The MQTT password
     57 		:param key:      The encryption key
     58 		'''
     59 
     60 		# Initialize the MQTT client (these arguments were the only way to get it to work properly..)
     61 		client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id='', clean_session=True, userdata=None)
     62 
     63 		# Set the username and password for the MQTT broker
     64 		client.username_pw_set(username=username, password=password)
     65 
     66 		# Set the encryption key global in the client class (the default key is padded to ensure it's the correct length for AES)
     67 		self.key = '1PG7OiApB1nwvP+rz05pAQ==' if key == 'AQ==' else key
     68 
     69 		# Enable TLS/SSL if the user specified it
     70 		if tls:
     71 			client.tls_set()
     72 			#client.tls_insecure_set(False)
     73 
     74 		# Set the MQTT callbacks
     75 		client.on_connect     = self.on_connect
     76 		client.on_message     = self.on_message
     77 		client.on_subscribe   = self.on_subscribe
     78 		client.on_unsubscribe = self.on_unsubscribe
     79 
     80 		# Connect to the MQTT broker and subscribe to the root topic
     81 		client.connect(broker, port, 60)
     82 		client.subscribe(root, 0)
     83 
     84 		# Keep-alive loop
     85 		client.loop_forever()
     86 
     87 
     88 	def decrypt_message_packet(self, message_packet):
     89 		'''
     90 		Decrypt an encrypted message packet.
     91 		
     92 		:param message_packet: The message packet to decrypt
     93 		'''
     94 
     95 		# Ensure the key is formatted and padded correctly before turning it into bytes
     96 		padded_key = self.key.ljust(len(self.key) + ((4 - (len(self.key) % 4)) % 4), '=')
     97 		key        = padded_key.replace('-', '+').replace('_', '/')
     98 		key_bytes  = base64.b64decode(key.encode('ascii'))
     99 
    100 		# Extract the nonce from the packet
    101 		nonce_packet_id = getattr(message_packet, 'id').to_bytes(8, 'little')
    102 		nonce_from_node = getattr(message_packet, 'from').to_bytes(8, 'little')
    103 		nonce           = nonce_packet_id + nonce_from_node
    104 
    105 		# Decrypt the message
    106 		cipher          = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
    107 		decryptor       = cipher.decryptor()
    108 		decrypted_bytes = decryptor.update(getattr(message_packet, 'encrypted')) + decryptor.finalize()
    109 
    110 		# Parse the decrypted message
    111 		data = mesh_pb2.Data()
    112 		data.ParseFromString(decrypted_bytes)
    113 		message_packet.decoded.CopyFrom(data)
    114 
    115 		return message_packet
    116 
    117 
    118 	def on_connect(self, client, userdata, flags, rc, properties):
    119 		'''
    120 		Callback for when the client receives a CONNACK response from the server.
    121 
    122 		:param client:     The client instance for this callback
    123 		:param userdata:   The private user data as set in Client() or user_data_set()
    124 		:param flags:      Response flags sent by the broker
    125 		:param rc:         The connection result
    126 		:param properties: The properties returned by the broker
    127 		'''
    128 
    129 		if rc == 0:
    130 			logging.info('Connected to MQTT broker')
    131 		else:
    132 			logging.error(f'Failed to connect to MQTT broker: {rc}')
    133 
    134 
    135 	def on_message(self, client, userdata, msg):
    136 		'''
    137 		Callback for when a message is received from the server.
    138 
    139 		:param client:   The client instance for this callback
    140 		:param userdata: The private user data as set in Client() or user_data_set()
    141 		:param msg:      An instance of MQTTMessage. This is a
    142 		'''
    143 
    144 		# Define the service envelope
    145 		service_envelope = mqtt_pb2.ServiceEnvelope()
    146 
    147 		try:
    148 			# Parse the message payload
    149 			service_envelope.ParseFromString(msg.payload)
    150 
    151 			# Extract the message packet from the service envelope
    152 			message_packet = service_envelope.packet
    153 		except Exception as e:
    154 			#logging.error(f'Failed to parse message: {str(e)}')
    155 			return
    156 
    157 		# Check if the message is encrypted before decrypting it
    158 		if message_packet.HasField('encrypted') and not message_packet.HasField('decoded'):
    159 			message_packet = self.decrypt_message_packet(message_packet)
    160 
    161 			text = {
    162 				'from'       : getattr(message_packet, 'from'),
    163 				'to'         : getattr(message_packet, 'to'),
    164 				'channel'    : getattr(message_packet, 'channel'),
    165 				'id'         : getattr(message_packet, 'id'),
    166 				'rx_time'    : getattr(message_packet, 'rx_time'),
    167 				'hop_limit'  : getattr(message_packet, 'hop_limit'),
    168 				'priority'   : getattr(message_packet, 'priority'),
    169 				'hop_start'  : getattr(message_packet, 'hop_start')
    170 			}
    171 			logging.info(text)
    172 
    173 			if message_packet.decoded.portnum == portnums_pb2.UNKNOWN_APP:
    174 				logging.warning('Received an unknown app message:')
    175 				logging.info(message_packet)
    176 
    177 			elif message_packet.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP:
    178 				text_payload = message_packet.decoded.payload.decode('utf-8')
    179 				text = {
    180 					'message' : text_payload,
    181 					'from'    : getattr(message_packet, 'from'),
    182 					'id'      : getattr(message_packet, 'id'),
    183 					'to'      : getattr(message_packet, 'to')
    184 				}
    185 				logging.info('Received text message:')
    186 				logging.info(text)
    187 
    188 			elif message_packet.decoded.portnum == portnums_pb2.REMOTE_HARDWARE_APP:
    189 				data = mesh_pb2.RemoteHardware()
    190 				data.ParseFromString(message_packet.decoded.payload)
    191 				logging.info('Received remote hardware:')
    192 				logging.info(data)
    193 
    194 			elif message_packet.decoded.portnum == portnums_pb2.POSITION_APP:
    195 				data = mesh_pb2.Position()
    196 				data.ParseFromString(message_packet.decoded.payload)
    197 
    198 				data_dict = {key: value for key, value in data}
    199 				print(data_dict)
    200 
    201 				logging.info('Received position:')
    202 				loc = {
    203 					'lattitude'       : getattr(data, 'latitude_i') / 1e7,
    204 					'longitude'       : getattr(data, 'longitude_i') / 1e7,
    205 					'altitude'        : getattr(data, 'altitude') / 1000,
    206 					'location_source' : getattr(data, 'location_source'),
    207 					'altitude_source' : getattr(data, 'altitude_source'),
    208 					'pdop'            : getattr(data, 'PDOP'),
    209 					'hdop'            : getattr(data, 'HDOP'),
    210 					'vdop'            : getattr(data, 'VDOP'),
    211 					'gps_accuracy'    : getattr(data, 'gps_accuracy'),
    212 					'ground_speed'    : getattr(data, 'ground_speed'),
    213 					'ground_track'    : getattr(data, 'ground_track'),
    214 					'fix_quality'     : getattr(data, 'fix_quality'),
    215 					'fix_type'        : getattr(data, 'fix_type'),
    216 					'sats_in_view'    : getattr(data, 'sats_in_view'),
    217 					'sensor_id'       : getattr(data, 'sensor_id'),
    218 					'next_update'     : getattr(data, 'next_update'),
    219 					'seq_number'      : getattr(data, 'seq_number'),
    220 					'precision_bits'  : getattr(data, 'precision_bits')
    221 				}
    222 
    223 				if (loc := clean_dict(loc)):
    224 					logging.info(loc)
    225 
    226 			elif message_packet.decoded.portnum == portnums_pb2.NODEINFO_APP:
    227 				data = mesh_pb2.NodeInfo()
    228 				#data.ParseFromString(message_packet.decoded.payload)
    229 				logging.info('Received node info:')
    230 				logging.info(message_packet)
    231 
    232 			elif message_packet.decoded.portnum == portnums_pb2.ROUTING_APP:
    233 				data = mesh_pb2.Routing()
    234 				data.ParseFromString(message_packet.decoded.payload)
    235 				logging.info('Received routing:')
    236 				logging.info(data)
    237 
    238 			elif message_packet.decoded.portnum == portnums_pb2.ADMIN_APP:
    239 				data = mesh_pb2.Admin()
    240 				data.ParseFromString(message_packet.decoded.payload)
    241 				logging.info('Received admin:')
    242 				logging.info(data)
    243 
    244 			elif message_packet.decoded.portnum == portnums_pb2.TEXT_MESSAGE_COMPRESSED_APP:
    245 				data = mesh_pb2.TextMessageCompressed()
    246 				data.ParseFromString(message_packet.decoded.payload)
    247 				logging.info('Received compressed text message:')
    248 				logging.info(data)
    249 
    250 			elif message_packet.decoded.portnum == portnums_pb2.WAYPOINT_APP:
    251 				data = mesh_pb2.Waypoint()
    252 				data.ParseFromString(message_packet.decoded.payload)
    253 				logging.info('Received waypoint:')
    254 				logging.info(data)
    255 
    256 			elif message_packet.decoded.portnum == portnums_pb2.AUDIO_APP:
    257 				data = mesh_pb2.Audio()
    258 				data.ParseFromString(message_packet.decoded.payload)
    259 				logging.info('Received audio:')
    260 				logging.info(data)
    261 
    262 			elif message_packet.decoded.portnum == portnums_pb2.DETECTION_SENSOR_APP:
    263 				data = mesh_pb2.DetectionSensor()
    264 				data.ParseFromString(message_packet.decoded.payload)
    265 				logging.info('Received detection sensor:')
    266 				logging.info(data)
    267 
    268 			elif message_packet.decoded.portnum == portnums_pb2.REPLY_APP:
    269 				data = mesh_pb2.Reply()
    270 				data.ParseFromString(message_packet.decoded.payload)
    271 				logging.info('Received reply:')
    272 				logging.info(data)
    273 
    274 			elif message_packet.decoded.portnum == portnums_pb2.IP_TUNNEL_APP:
    275 				data = mesh_pb2.IPTunnel()
    276 				data.ParseFromString(message_packet.decoded.payload)
    277 				logging.info('Received IP tunnel:')
    278 				logging.info(data)
    279 
    280 			elif message_packet.decoded.portnum == portnums_pb2.PAXCOUNTER_APP:
    281 				data = mesh_pb2.Paxcounter()
    282 				data.ParseFromString(message_packet.decoded.payload)
    283 				logging.info('Received paxcounter:')
    284 				logging.info(data)
    285 
    286 			elif message_packet.decoded.portnum == portnums_pb2.SERIAL_APP:
    287 				data = mesh_pb2.Serial()
    288 				data.ParseFromString(message_packet.decoded.payload)
    289 				logging.info('Received serial:')
    290 				logging.info(data)
    291 
    292 			elif message_packet.decoded.portnum == portnums_pb2.STORE_FORWARD_APP:
    293 				logging.info('Received store and forward:')
    294 				logging.info(message_packet)
    295 				logging.info(message_packet.decoded.payload)
    296 
    297 			elif message_packet.decoded.portnum == portnums_pb2.RANGE_TEST_APP:
    298 				data = mesh_pb2.RangeTest()
    299 				data.ParseFromString(message_packet.decoded.payload)
    300 				logging.info('Received range test:')
    301 				logging.info(data)
    302 
    303 			elif message_packet.decoded.portnum == portnums_pb2.TELEMETRY_APP:
    304 				data = telemetry_pb2.Telemetry()
    305 				data.ParseFromString(message_packet.decoded.payload)
    306 				logging.info('Received telemetry:')
    307 
    308 				data_dict = {}
    309 				for field, value in data.ListFields():
    310 					if field.name == 'device_metrics':
    311 						text = clean_dict({item.name: getattr(value, item.name) for item in value.DESCRIPTOR.fields if hasattr(value, item.name)})
    312 						if text:
    313 							logging.info(text)
    314 					else:
    315 						data_dict[field.name] = value
    316 						
    317 				logging.info(data_dict)
    318 				
    319 				if getattr(data, 'device_metrics'):
    320 					text = {
    321 						'battery_level'       : getattr(data.device_metrics, 'battery_level'),
    322 						'voltage'             : getattr(data.device_metrics, 'voltage'),
    323 						'channel_utilization' : getattr(data.device_metrics, 'channel_utilization'),
    324 						'air_util_tx'         : getattr(data.device_metrics, 'air_util_tx'),
    325 						'uptime_seconds'      : getattr(data.device_metrics, 'uptime_seconds')
    326 					}
    327 					if (text := clean_dict(text)):
    328 						logging.info(text)
    329 
    330 				if getattr(data, 'environment_metrics'):
    331 					env_metrics = {
    332 						'barometric_pressure' : getattr(data.environment_metrics, 'barometric_pressure'),
    333 						'current'             : getattr(data.environment_metrics, 'current'),
    334 						'distance'            : getattr(data.environment_metrics, 'distance'),
    335 						'gas_resistance'      : getattr(data.environment_metrics, 'gas_resistance'),
    336 						'iaq'                 : getattr(data.environment_metrics, 'iaq'),
    337 						'relative_humidity'   : getattr(data.environment_metrics, 'relative_humidity'),
    338 						'temperature'         : getattr(data.environment_metrics, 'temperature'),
    339 						'voltage'             : getattr(data.environment_metrics, 'voltage')
    340 					}
    341 					if (env_metrics := clean_dict(env_metrics)):
    342 						logging.info(env_metrics)
    343 
    344 			elif message_packet.decoded.portnum == portnums_pb2.ZPS_APP:
    345 				data = mesh_pb2.Zps()
    346 				data.ParseFromString(message_packet.decoded.payload)
    347 				logging.info('Received ZPS:')
    348 				logging.info(data)
    349 
    350 			elif message_packet.decoded.portnum == portnums_pb2.SIMULATOR_APP:
    351 				data = mesh_pb2.Simulator()
    352 				data.ParseFromString(message_packet.decoded.payload)
    353 				logging.info('Received simulator:')
    354 				logging.info(data)
    355 
    356 			elif message_packet.decoded.portnum == portnums_pb2.TRACEROUTE_APP:
    357 				routeDiscovery = mesh_pb2.RouteDiscovery()
    358 				routeDiscovery.ParseFromString(message_packet.decoded.payload)
    359 				logging.info('Received traceroute:')
    360 				logging.info(routeDiscovery)
    361 
    362 			elif message_packet.decoded.portnum == portnums_pb2.NEIGHBORINFO_APP:
    363 				neighborInfo = mesh_pb2.NeighborInfo()
    364 				neighborInfo.ParseFromString(message_packet.decoded.payload)
    365 				logging.info('Received neighbor info:')
    366 				info = {
    367 					'node_id'                      : getattr(neighborInfo, 'node_id'),
    368 					'last_sent_by_id'              : getattr(neighborInfo, 'last_sent_by_id'),
    369 					'node_broadcast_interval_secs' : getattr(neighborInfo, 'node_broadcast_interval_secs'),
    370 					'neighbors'                    : getattr(neighborInfo, 'neighbors')
    371 				}
    372 				logging.info(info)
    373 
    374 			elif message_packet.decoded.portnum == portnums_pb2.ATAK_PLUGIN:
    375 				data = mesh_pb2.AtakPlugin()
    376 				data.ParseFromString(message_packet.decoded.payload)
    377 				logging.info('Received ATAK plugin:')
    378 				logging.info(data)
    379 
    380 			elif message_packet.decoded.portnum == portnums_pb2.PRIVATE_APP:
    381 				data = mesh_pb2.Private()
    382 				data.ParseFromString(message_packet.decoded.payload)
    383 				logging.info('Received private:')
    384 				logging.info(data)
    385 
    386 			elif message_packet.decoded.portnum == portnums_pb2.ATAK_FORWARDER:
    387 				data = mesh_pb2.AtakForwarder()
    388 				data.ParseFromString(message_packet.decoded.payload)
    389 				logging.info('Received ATAK forwarder:')
    390 				logging.info(data)
    391 
    392 			else:
    393 				logging.warning('Received an unknown message:')
    394 				logging.info(message_packet)
    395 
    396 		# Unencrypted messages
    397 		else:
    398 			if message_packet.decoded.portnum == portnums_pb2.MAP_REPORT_APP:
    399 				pos = mesh_pb2.Position()
    400 				pos.ParseFromString(message_packet.decoded.payload)
    401 				logging.info('Received map report:')
    402 				logging.info(pos)
    403 
    404 			else:
    405 				logging.warning('Received an unencrypted message')
    406 				logging.info(f'Payload: {message_packet}')
    407 
    408 
    409 	def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
    410 		'''
    411 		Callback for when the client receives a SUBACK response from the server.
    412 
    413 		:param client:           The client instance for this callback
    414 		:param userdata:         The private user data as set in Client() or user_data_set()
    415 		:param mid:              The message ID of the subscribe request
    416 		:param reason_code_list: A list of SUBACK reason codes
    417 		:param properties:       The properties returned by the broker
    418 		'''
    419 
    420 		# Since we subscribed only for a single channel, reason_code_list contains a single entry
    421 		if reason_code_list[0].is_failure:
    422 			logging.error(f'Broker rejected you subscription: {reason_code_list[0]}')
    423 		else:
    424 			logging.info(f'Broker granted the following QoS: {reason_code_list[0].value}')
    425 
    426 
    427 	def on_unsubscribe(self, client, userdata, mid, reason_code_list, properties):
    428 		'''
    429 		Callback for when the client receives a UNSUBACK response from the server.
    430 
    431 		:param client:           The client instance for this callback
    432 		:param userdata:         The private user data as set in Client() or user_data_set()
    433 		:param mid:              The message ID of the unsubscribe request
    434 		:param reason_code_list: A list of UNSUBACK reason codes
    435 		:param properties:       The properties returned by the broker
    436 		'''
    437 
    438 		# reason_code_list is only present in MQTTv5, it will always be empty in MQTTv3
    439 		if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
    440 			logging.info('Broker accepted the unsubscription(s)')
    441 		else:
    442 			logging.error(f'Broker replied with failure: {reason_code_list[0]}')
    443 
    444 		# Disconnect from the broker
    445 		client.disconnect()
    446 
    447 
    448 
    449 if __name__ == '__main__':
    450 	parser = argparse.ArgumentParser(description='Meshtastic MQTT Interface')
    451 	parser.add_argument('--broker', default='mqtt.meshtastic.org', help='MQTT broker address')
    452 	parser.add_argument('--port', default=1883, type=int, help='MQTT broker port')
    453 	parser.add_argument('--root', default='#', help='Root topic')
    454 	parser.add_argument('--tls', action='store_true', help='Enable TLS/SSL')
    455 	parser.add_argument('--username', default='meshdev', help='MQTT username')
    456 	parser.add_argument('--password', default='large4cats', help='MQTT password')
    457 	parser.add_argument('--key', default='AQ==', help='Encryption key')
    458 	args = parser.parse_args()
    459 
    460 	client = MeshtasticMQTT()
    461 	client.connect(args.broker, args.port, args.root, args.tls, args.username, args.password, args.key)