meshtastic- Experiments with Meshtastic 🛰️ |
git clone git://git.acid.vegas/meshtastic.git |
Log | Files | Refs | Archive | README | LICENSE |
meshapi.py (7714B)
#!/usr/bin/env python
# Meshtastic Serial Interface - Developed by Acidvegas in Python (https://git.acid.vegas)

import argparse
import logging
import os
import time
from typing import Optional

try:
    import meshtastic
except ImportError:
    raise ImportError('meshtastic library not found (pip install meshtastic)')

try:
    from pubsub import pub
except ImportError:
    raise ImportError('pubsub library not found (pip install pypubsub)')


# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)9s | %(funcName)s | %(message)s', datefmt='%Y-%m-%d %I:%M:%S')


def now():
    '''Returns the current date and time in a formatted string'''

    return time.strftime('%Y-%m-%d %H:%M:%S')


class MeshtasticClient(object):
    '''
    Thin wrapper around a Meshtastic serial or TCP interface.

    Tracks the local node (self.me), every node reported through the
    "meshtastic.node" topic (self.nodes, keyed by node number) and remembers
    which interface was requested so a lost connection can be re-established.
    '''

    def __init__(self, option: Optional[str] = None, value: Optional[str] = None):
        '''
        :param option: The interface option to connect to ("serial" or "tcp")
        :param value: The value of the interface option (device path or hostname)
        '''

        self.interface = None
        self.me = {}
        self.nodes = {}

        # Remembered so event_disconnect() can reconnect without relying on
        # the command line arguments parsed by the script entry point.
        self.interface_option = option
        self.interface_value = value


    def _open_interface(self, option: str, value: str):
        '''
        Create the Meshtastic interface object for the requested transport

        The meshtastic submodules are imported here, where they are used, so
        the names are actually in scope when the interface is constructed.

        :param option: The interface option to connect to ("serial" or "tcp")
        :param value: The value of the interface option
        '''

        if option == 'serial':
            from meshtastic.serial_interface import SerialInterface
            from meshtastic.util import findPorts

            devices = findPorts()
            if not devices:
                raise ConnectionError('No serial devices found')
            if not os.path.exists(value) or value not in devices:
                raise ConnectionError(f'Invalid serial port specified: {value} (Available: {devices})')
            return SerialInterface(value)

        from meshtastic.tcp_interface import TCPInterface
        return TCPInterface(value)


    def connect(self, option: Optional[str] = None, value: Optional[str] = None):
        '''
        Connect to the Meshtastic interface

        Retries every 15 seconds until the interface can be opened. When an
        argument is omitted, the value given to the constructor (or to a
        previous connect call) is used.

        :param option: The interface option to connect to
        :param value: The value of the interface option
        '''

        if option is not None:
            self.interface_option = option
        if value is not None:
            self.interface_value = value

        option = self.interface_option
        value = self.interface_value

        # An unknown option can never succeed, so fail before the retry loop
        if option not in ('serial', 'tcp'):
            raise SystemExit('Invalid interface option')

        while True:
            try:
                self.interface = self._open_interface(option, value)

            except Exception as e:
                logging.error(f'Failed to connect to the radio: {e}')
                logging.error('Retrying in 15 seconds...')
                time.sleep(15)

            else:
                # getMyNodeInfo may return nothing; keep self.me a dict
                self.me = self.interface.getMyNodeInfo() or {}
                break


    def send(self, message: str):
        '''
        Send a message to the Meshtastic interface

        :param message: The message to send
        '''

        # NOTE(review): the limit is applied to characters; the radio payload
        # limit is presumably in bytes - confirm for non-ASCII text.
        if len(message) > 255:
            logging.warning('Message exceeds 255 characters')
            message = message[:255]

        self.interface.sendText(message)

        logging.info(f'Sent broadcast message: {message}')


    def listen(self):
        '''Create the Meshtastic callback subscriptions'''

        pub.subscribe(self.event_connect,    'meshtastic.connection.established')
        pub.subscribe(self.event_data,       'meshtastic.receive.data.portnum')
        pub.subscribe(self.event_disconnect, 'meshtastic.connection.lost')
        pub.subscribe(self.event_node,       'meshtastic.node')
        pub.subscribe(self.event_position,   'meshtastic.receive.position')
        pub.subscribe(self.event_text,       'meshtastic.receive.text')
        pub.subscribe(self.event_user,       'meshtastic.receive.user')

        logging.debug('Listening for Meshtastic events...')


    def _identify(self, node_num):
        '''
        Look up the display identity of a node

        :param node_num: Node number taken from a packet
        :return: (node id, long name), with placeholders for unknown nodes
        '''

        user = self.nodes.get(node_num, {}).get('user', {})

        return user.get('id', '!unk '), user.get('longName', 'UNK')


    def event_connect(self, interface, topic=pub.AUTO_TOPIC):
        '''
        Callback function for connection established

        :param interface: Meshtastic interface
        :param topic: PubSub topic
        '''

        # self.me is only filled in once connect() returns, so ask the
        # interface that raised the event and fall back to the cached copy.
        info = interface.getMyNodeInfo() or self.me
        if info:
            self.me = info

        user = info.get('user', {})

        logging.info(f'Connected to the {user.get("longName", "UNK")} radio on {user.get("hwModel", "UNK")} hardware')
        logging.info(f'Found a total of {len(self.nodes):,} nodes')


    def event_data(self, packet: dict, interface):
        '''
        Callback function for data updates

        :param packet: Data information
        :param interface: Meshtastic interface
        '''

        logging.info(f'Data update: {packet}')


    def event_disconnect(self, interface, topic=pub.AUTO_TOPIC):
        '''
        Callback function for connection lost

        :param interface: Meshtastic interface
        :param topic: PubSub topic
        '''

        logging.warning('Lost connection to radio!')

        if self.interface_option is None or self.interface_value is None:
            logging.error('Cannot reconnect: no interface option was stored')
            return

        time.sleep(10)

        # Reconnect with the option/value remembered from the last connect
        self.connect(self.interface_option, self.interface_value)


    def event_node(self, node):
        '''
        Callback function for node updates

        :param node: Node information
        '''

        # Node ID Formula = f'!{hex(node_num)[2:]}'

        num = node.get('num')
        if num is None:
            logging.debug(f'Ignoring node update without a node number: {node}')
            return

        self.nodes[num] = node

        # The user section may be missing from a node update
        user = node.get('user', {})

        logging.info(f'Node found: {user.get("id", "!unk ")} - {user.get("shortName", "").ljust(4)} - {user.get("longName", "UNK")}')
        print(node)


    def event_position(self, packet: dict, interface):
        '''
        Callback function for position updates

        :param packet: Position information
        :param interface: Meshtastic interface
        '''

        decoded = packet.get('decoded', {})
        position = decoded.get('position', {})

        node_id, name = self._identify(packet.get('from'))
        msg = decoded.get('payload', b'').hex()

        # Coordinates arrive as integers scaled by 1e7; any of these fields
        # may be absent from a packet, so none of them is indexed directly.
        longitude = position['longitudeI'] / 1e7 if 'longitudeI' in position else None
        latitude = position['latitudeI'] / 1e7 if 'latitudeI' in position else None
        altitude = f'{position["altitude"]}m' if 'altitude' in position else 'unknown'
        snr = packet.get('rxSnr')
        rssi = packet.get('rxRssi')

        logging.info(f'{node_id} - {name}: {longitude}, {latitude}, {altitude} (SNR: {snr}, RSSI: {rssi}) - {msg}')


    def event_text(self, packet: dict, interface):
        '''
        Callback function for received packets

        :param packet: Packet received
        :param interface: Meshtastic interface
        '''

        node_id, name = self._identify(packet.get('from'))
        _, target = self._identify(packet.get('to'))

        # Malformed bytes are replaced rather than raising from the callback
        msg = packet.get('decoded', {}).get('payload', b'').decode('utf-8', errors='replace')

        logging.info(f'{node_id} {name} -> {target}: {msg}')
        print(packet)


    def event_user(self, packet: dict, interface):
        '''
        Callback function for user updates

        :param packet: User information
        :param interface: Meshtastic interface
        '''

        # Example packet:
        #
        # {
        #     'from'    : 862341900,
        #     'to'      : 4294967295,
        #     'decoded' : {
        #         'portnum'      : 'NODEINFO_APP',
        #         'payload'      : b'\n\t!33664b0c\x12\x08HELLDIVE\x1a\x04H3LL"\x06d\xe83fK\x0c(+8\x03',
        #         'wantResponse' : True,
        #         'user'         : {
        #             'id'        : '!33664b0c',
        #             'longName'  : 'HELLDIVE',
        #             'shortName' : 'H3LL',
        #             'macaddr'   : 'ZOgzZksM',
        #             'hwModel'   : 'HELTEC_V3',
        #             'role'      : 'ROUTER_CLIENT',
        #             'raw'       : 'rm this'
        #         }
        #     },
        #     'id'       : 1612906268,
        #     'rxTime'   : 1714279638,
        #     'rxSnr'    : 6.25,
        #     'hopLimit' : 3,
        #     'rxRssi'   : -38,
        #     'hopStart' : 3,
        #     'raw'      : 'rm this'
        # }

        # Not sure what to do with this yet...
        pass



def main():
    '''Parse the command line, connect to the radio and keep the process alive'''

    parser = argparse.ArgumentParser(description='Meshtastic Interfacing Tool')
    parser.add_argument('--serial', help='Use serial interface') # Typically /dev/ttyUSB0 or /dev/ttyACM0
    parser.add_argument('--tcp',    help='Use TCP interface')    # Can be an IP address or hostname (meshtastic.local)
    args = parser.parse_args()

    # Ensure one interface is specified
    if (not args.serial and not args.tcp) or (args.serial and args.tcp):
        raise SystemExit('Must specify either --serial or --tcp interface')

    option = 'serial' if args.serial else 'tcp'
    value = args.serial if args.serial else args.tcp

    # Initialize the Meshtastic client
    mesh = MeshtasticClient(option, value)

    # Listen for Meshtastic events
    mesh.listen()

    # Connect to the Meshtastic interface
    mesh.connect(option, value)

    # Keep-alive loop
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        try:
            if mesh.interface is not None:
                mesh.interface.close()
        except Exception as e:
            # Closing is best-effort during shutdown, but do not hide why it failed
            logging.debug(f'Error while closing the interface: {e}')
        finally:
            logging.info('Connection to radio lost')


if __name__ == '__main__':
    main()

'''
Notes:
    conf = self.interface.localNode.localConfig
    ok = interface.getNode('^local')
    print(ok.channels)
'''