import asyncio
import time

import meshtastic
import meshtastic.serial_interface
import msgpack
from pubsub import pub

from Packets.Message import Message


class Transceiver:
    """
    Handling LoRa transceiving

    Attributes
    ----------
    cLog
        Reference to `run.Node.cLog` for logging

    cryptographyInfo: Cryptography.WhaleSong.DHEFern
        Cryptography instance for encrypting transmissions

    filter: Sponge.base.Filter
        `Sponge.base.Filter` instance for filtering packets

    tcache: Daisy.Cache.Cache
        Data backend Daisy Cache

    tcatch: Daisy.Catch.Catch
        Daisy Catch Cache for Catch operations

    notConnected: bool
        Whether the transceiver is still waiting for the connection
        established event (True until `onConnection` has run)

    acks: dict
        Acknowledgements received per packet, keyed by the packet id as a
        string; True for success, False for an ack error

    onodeID
        PierMesh node ID

    messages: dict
        Message completion acknowledgements

    cpid
        ID of the most recently sent packet

    tasks: dict
        Per packet tracking entries keyed by packet id (as a string), each
        holding the awaiting task (`ob`), `pid`, `packet` and `retry` count

    Notes
    -----
    TODO: Check if we can remove cpid
    """

    def __init__(self, device, filter, onodeID, cache, catch, cryptographyInfo, cLog):
        """
        Subscribe to the meshtastic events, open the serial interface and
        block until the connection established event has been handled

        Parameters
        ----------
        device
            Serial device passed to `meshtastic.serial_interface.SerialInterface`

        filter
            Object whose `sieve` coroutine is run on every received packet

        onodeID
            PierMesh node ID

        cache
            Cache stored as `tcache` and refreshed after each received packet

        catch
            Catch stored as `tcatch`

        cryptographyInfo
            Cryptography instance stored for encrypting transmissions

        cLog
            Logging callable taking `(level, message)`
        """
        self.cLog = cLog
        self.cryptographyInfo = cryptographyInfo
        self.filter = filter
        self.tcache = cache
        self.tcatch = catch
        self.notConnected = True
        self.messages = {}
        self.acks = {}
        self.onodeID = onodeID
        # Be careful with this
        self.cpid = 0
        self.tasks = {}
        # TODO: use node id to deliver directly
        pub.subscribe(self.onReceive, "meshtastic.receive")
        pub.subscribe(self.onConnection, "meshtastic.connection.established")
        self.interface = meshtastic.serial_interface.SerialInterface(device)
        # Poll with a short sleep instead of spinning so the thread that
        # delivers the connection event is not starved of CPU time
        polls = 0
        while self.notConnected:
            # Log immediately, then roughly every 5 seconds
            if polls % 50 == 0:
                self.cLog(20, "Waiting for node initialization...")
            time.sleep(0.1)
            polls += 1
        self.cLog(20, "Initialized")

    # TODO: Sending packets across multiple nodes/load balancing/distributed packet transmission/reception
    def onReceive(self, packet, interface):
        """
        Run each received packet through Sponge.base.Filters sieve using a new event loop

        Parameters
        ----------
        packet
            Received packet handed to `self.filter.sieve`

        interface
            Interface the packet arrived on (unused)
        """
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.filter.sieve(packet))
        finally:
            # Close the per packet loop so its resources are released
            # instead of piling up with every received packet
            loop.close()
        self.tcache.refresh()

    async def sendAnnounce(self):
        """
        Send an announce packet (contains basic network mapping information) every so often so new nodes autoconnect
        """
        announcement = msgpack.dumps(
            {
                "onodeID": self.onodeID,
                "mnodeID": self.interface.localNode.nodeNum,
            }
        )
        await self.addPackets(
            announcement,
            self.onodeID,
            None,
            True,
            None,
            packetsClass=0,
        )

    def onConnection(self, interface, topic=pub.AUTO_TOPIC):
        """
        When the node connects send an announce and end the waiting loop

        Parameters
        ----------
        interface
            Interface that established the connection (unused, `self.interface` is used)

        topic
            pubsub topic the event was delivered on
        """
        # NOTE(review): sendAnnounce reads self.interface; if this callback can
        # fire before __init__ has assigned it the announce will fail - confirm
        try:
            asyncio.run(self.sendAnnounce())
        finally:
            # Always release the wait loop in __init__, even when the announce
            # raised; the exception itself still propagates to the caller
            self.notConnected = False

    def responseCheck(self, packet):
        """
        On acknowledgement response set acks based on response

        Parameters
        ----------
        packet
            Response packet; `decoded.requestId` identifies the packet being
            acknowledged and `decoded.routing.errorReason` carries the result
        """
        decoded = packet["decoded"]
        rid = str(decoded["requestId"])
        if decoded["routing"]["errorReason"] == "MAX_RETRANSMIT":
            self.cLog(20, "Got ack error")
            self.acks[rid] = False
        else:
            self.acks[rid] = True

    # TODO: Threaded send method
    def send(self, packet, recipientNode=False):
        """
        Send individual packet

        Parameters
        ----------
        packet
            Data handed to the interface's `sendData`

        recipientNode
            If set send to specified node, otherwise (any falsy value) send
            without a destination

        Returns
        -------
        bool
            Always True; the sent packet's id is stored in `self.cpid`
        """
        interface = self.interface
        if not recipientNode:
            pid = interface.sendData(
                packet, wantAck=True, onResponse=self.responseCheck
            )
        else:
            pid = interface.sendData(
                packet,
                destinationId=recipientNode,
                wantAck=True,
                onResponse=self.responseCheck,
            )
        # Can I use waitForAckNak on cpid?
        self.cpid = pid.id
        return True

    async def awaitResponse(self, pid):
        """
        Wait for acknowledgement response

        Polls `self.acks` once per second for up to 120 seconds

        Parameters
        ----------
        pid
            Packet id to wait for (compared as a string)

        Returns
        -------
        bool
            Always True, whether or not the acknowledgement arrived
        """
        key = str(pid)
        for _ in range(120):
            await asyncio.sleep(1)
            if key in self.acks:
                break
        return True

    async def initNodeDH(self, dhefOb, recipientNode, onodeID):
        """
        Send Diffie Hellman initialization message

        Parameters
        ----------
        dhefOb
            Object providing `getParamsBytes()` and `publicKey`

        recipientNode
            Node to send the initialization directly to

        onodeID
            Node ID passed on as the routing node of the message
        """
        payload = msgpack.dumps(
            {"params": dhefOb.getParamsBytes(), "publicKey": dhefOb.publicKey}
        )
        # Sender name and recipient are zero (previously spelled 000000,
        # which is the same integer)
        await self.addPackets(
            payload,
            self.onodeID,
            0,
            0,
            onodeID,
            directID=recipientNode,
            packetsClass=3,
        )

    def awaitFullResponse(self, pid):
        """
        TODO

        Wait for message completed response

        Blocks, checking `self.messages` every 5 seconds until the entry for
        `pid` is marked finished

        Parameters
        ----------
        pid
            Key of the message in `self.messages`

        Returns
        -------
        bool
            Always True
        """
        for _ in range(1_000_000_000):
            time.sleep(5)
            entry = self.messages.get(pid)
            if entry is not None and entry["finished"]:
                break
        return True

    async def addPackets(
        self,
        data,
        sender,
        senderName,
        recipient,
        recipientNode,
        directID=False,
        packetsClass=None,
        encrypt=False,
    ):
        """
        Convert binary data to Message and send each packet

        Parameters
        ----------
        data: bytes
            Data to send

        sender
            Peer/Node ID of sender

        senderName
            ID matching specific user title

        recipient
            Peer/Node ID of recipient

        recipientNode
            Node ID of node to route to

        directID
            If set send to this Node only

        packetsClass
            Protocol for message

        encrypt
            Currently unused
        """
        tp = Message(
            data,
            sender,
            senderName,
            recipient,
            recipientNode,
            packetsClass=packetsClass,
        )
        for p in tp.packets:
            if recipientNode is None:
                self.send(p)
            else:
                self.cLog(10, "Sending target: " + str(directID))
                if directID:
                    recipientNode = directID
                self.send(p, recipientNode=recipientNode)
            # Capture the id right after sending: another coroutine may send
            # (and overwrite self.cpid) while we sleep below
            pid = str(self.cpid)
            awaitTask = asyncio.create_task(self.awaitResponse(pid))
            await asyncio.sleep(1)
            self.tasks[pid] = {
                "ob": awaitTask,
                "pid": pid,
                "packet": p,
                "retry": False,
            }

    async def progressCheck(self):
        """
        Checks if acknowledgement was received per packet and if not resends

        Runs forever, waking every 90 seconds. Acknowledged packets are
        dropped from `self.tasks`; packets with an ack error are resent up to
        3 times and then dropped
        """
        while True:
            await asyncio.sleep(90)
            self.cLog(20, "Checking progress of {0} tasks".format(len(self.tasks)))
            # Iterate over a snapshot, entries are added and removed as we go
            for key in list(self.tasks):
                task = self.tasks.get(key)
                if task is None or not task["ob"]:
                    continue
                pid = task["pid"]
                if pid not in self.acks:
                    # No response yet, check again next round
                    continue
                if self.acks[pid]:
                    self.tasks.pop(key, None)
                    continue
                retries = task.get("retry") or 0
                if retries >= 3:
                    self.cLog(30, "Too many retries")
                    self.tasks.pop(key, None)
                    continue
                self.cLog(20, "Doing retry")
                # TODO: Resend to specific node
                self.send(task["packet"])
                # The resend gets a new packet id: wait on and track that one,
                # the old id already holds a failed ack
                newPid = str(self.cpid)
                awaitTask = asyncio.create_task(self.awaitResponse(newPid))
                await asyncio.sleep(1)
                self.tasks.pop(key, None)
                self.tasks[newPid] = {
                    "ob": awaitTask,
                    "pid": newPid,
                    "packet": task["packet"],
                    "retry": retries + 1,
                }

    async def announce(self):
        """
        Announce loop runner

        Sends an announce every 180 seconds, forever
        """
        while True:
            self.cLog(10, "Announce")
            await self.sendAnnounce()
            await asyncio.sleep(180)