diff --git a/README.md b/README.md
@@ -25,15 +25,57 @@ pip install meshtastic-mqtt-json
## usage
```bash
-python meshtastic_mqtt_json [options]
+meshtastic_mqtt_json [options]
```
```python
from meshtastic_mqtt_json import MeshtasticMQTT
+
+# Create client instance
client = MeshtasticMQTT()
-client.connect(broker='mqtt.meshtastic.org', port=1883, root='msh/US/2/e/', channel='LongFast', username='meshdev', password='large4cats', key='AQ==')
+
+# Register callbacks for specific message types
+def on_text_message(json_data):
+ print(f'Received text message: {json_data["decoded"]["payload"]}')
+
+def on_position(json_data):
+ print(f'Received position update: {json_data["decoded"]["payload"]}')
+
+client.register_callback('TEXT_MESSAGE_APP', on_text_message)
+client.register_callback('POSITION_APP', on_position)
+
+# Connect to MQTT broker
+client.connect(
+ broker='mqtt.meshtastic.org',
+ port=1883,
+ root='msh/US/2/e/',
+ channel='LongFast',
+ username='meshdev',
+ password='large4cats',
+ key='AQ=='
+)
```
+### Callback System
+The library provides a callback system that allows you to register handlers for specific message types. Each callback function receives a JSON object containing the parsed message data.
+
+```python
+# Register a callback
+client.register_callback('MESSAGE_TYPE', callback_function)
+
+# Unregister a callback
+client.unregister_callback('MESSAGE_TYPE')
+```
+
+The callback function should accept a single parameter that will receive the JSON data:
+```python
+def my_callback(json_data):
+ # json_data contains the parsed message
+ print(json_data)
+```
+
+If no callback is registered for a message type, the message will be printed to the console by default.
+
### Command Line Options
| Option | Description | Default |
| ------------ | ------------------------------|---------------------- |
diff --git a/src/meshtastic_mqtt_json/client.py b/src/meshtastic_mqtt_json/client.py
@@ -56,6 +56,47 @@ class MeshtasticMQTT(object):
self.key = None
self.names = {}
self.filters = None
+ self.callbacks = {} # Dictionary to store message type callbacks
+
+
+ def register_callback(self, message_type: str, callback: callable):
+ '''
+ Register a callback function for a specific message type
+
+ :param message_type: The message type to register for (e.g. 'TEXT_MESSAGE_APP', 'POSITION_APP')
+ :param callback: The callback function to call when a message of this type is received
+ '''
+ if not message_type.endswith('_APP'):
+ message_type = f'{message_type}_APP'
+ self.callbacks[message_type] = callback
+
+
+ def unregister_callback(self, message_type: str):
+ '''
+ Unregister a callback function for a specific message type
+
+ :param message_type: The message type to unregister
+ '''
+ if not message_type.endswith('_APP'):
+ message_type = f'{message_type}_APP'
+ if message_type in self.callbacks:
+ del self.callbacks[message_type]
+
+
+ def _handle_message(self, mp, json_packet: dict, portnum_name: str):
+ '''
+ Handle a message by calling registered callbacks
+
+ :param mp: The message packet
+ :param json_packet: The JSON representation of the packet
+ :param portnum_name: The name of the port number
+ '''
+ # Call registered callback if one exists
+ if portnum_name in self.callbacks:
+ self.callbacks[portnum_name](json_packet)
+ else:
+ # Default behavior - print to console
+ print(f'{json.dumps(json_packet)}')
def connect(self, broker: str, port: int, root: str, channel: str, username: str, password: str, key: str):
@@ -205,166 +246,163 @@ class MeshtasticMQTT(object):
# Convert to JSON and handle NaN values in one shot
json_packet = clean_json(mp)
-
- #print(f'Raw packet: {json_packet}') # Debug print
# Process the message based on its type
if mp.decoded.portnum == portnums_pb2.ADMIN_APP:
data = mesh_pb2.Admin()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.ATAK_FORWARDER:
data = mesh_pb2.AtakForwarder()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.ATAK_PLUGIN:
data = mesh_pb2.AtakPlugin()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.AUDIO_APP:
data = mesh_pb2.Audio()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.DETECTION_SENSOR_APP:
data = mesh_pb2.DetectionSensor()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.IP_TUNNEL_APP:
data = mesh_pb2.IPTunnel()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.MAP_REPORT_APP:
map_report = mesh_pb2.MapReport()
map_report.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(map_report)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.NEIGHBORINFO_APP:
neighborInfo = mesh_pb2.NeighborInfo()
neighborInfo.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(neighborInfo)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP:
from_id = getattr(mp, 'from')
node_info = mesh_pb2.User()
node_info.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(node_info)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
self.names[from_id] = node_info.long_name
elif mp.decoded.portnum == portnums_pb2.PAXCOUNTER_APP:
data = mesh_pb2.Paxcounter()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.POSITION_APP:
position = mesh_pb2.Position()
position.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(position)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.PRIVATE_APP:
data = mesh_pb2.Private()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.RANGE_TEST_APP:
data = mesh_pb2.RangeTest()
data.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(data)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.REMOTE_HARDWARE_APP:
data = mesh_pb2.RemoteHardware()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.REPLY_APP:
data = mesh_pb2.Reply()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.ROUTING_APP:
routing = mesh_pb2.Routing()
routing.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(routing)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.SERIAL_APP:
data = mesh_pb2.Serial()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.SIMULATOR_APP:
data = mesh_pb2.Simulator()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.STORE_FORWARD_APP:
- print(f'{clean_json(mp)}')
- print(f'{mp.decoded.payload}')
+ json_packet['decoded']['payload'] = mp.decoded.payload
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP:
telemetry = telemetry_pb2.Telemetry()
telemetry.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(telemetry)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP:
text_payload = mp.decoded.payload.decode('utf-8')
json_packet['decoded']['payload'] = text_payload
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_COMPRESSED_APP:
data = mesh_pb2.TextMessageCompressed()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP:
routeDiscovery = mesh_pb2.RouteDiscovery()
routeDiscovery.ParseFromString(mp.decoded.payload)
json_packet['decoded']['payload'] = clean_json(routeDiscovery)
- print(f'{json.dumps(json_packet)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.UNKNOWN_APP:
- print(f'{clean_json(mp)}')
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.WAYPOINT_APP:
data = mesh_pb2.Waypoint()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
elif mp.decoded.portnum == portnums_pb2.ZPS_APP:
data = mesh_pb2.Zps()
data.ParseFromString(mp.decoded.payload)
- msg = json.dumps(clean_json(data))
- print(f'{msg}')
+ json_packet['decoded']['payload'] = clean_json(data)
+ self._handle_message(mp, json_packet, portnum_name)
else:
print(f'UNKNOWN: Received Portnum name: {portnum_name}')
- msg = json.dumps(clean_json(mp))
- print(f'UNKNOWN: {msg}')
+ self._handle_message(mp, json_packet, portnum_name)
except Exception as e:
print(f'Error processing message: {e}')
| |