# piermesh/src/Transmission/transmission.py
#
# 224 lines
# 7.2 KiB
# Python

from sys import getsizeof
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
from Packets.Packets import Packets as Packets
from Packets.SinglePacket import SinglePacket
import time
from threading import Thread
from Components.daisy import Catch, Cache
import sys
import logging
# from Filters.base import Filter
import msgpack
import asyncio
import random
class Transmitter:
    """
    Send data over a Meshtastic serial node and track per-packet acknowledgements.

    Outgoing data is split with ``Packets`` and sent packet by packet through
    ``interface.sendData``; each sent packet is recorded in ``self.tasks`` and
    ``progressCheck`` resends packets whose ack came back negative.
    Incoming packets are handed to ``filter.sieve``.
    """

    # Number of resends progressCheck performs before dropping a packet
    _maxRetries = 3
    # Seconds between checks of the connection flag in __init__
    _connectPollInterval = 0.1
    # Emit the "waiting" log line once per this many polls (~5 s)
    _connectLogEvery = 50

    def __init__(self, device, filter, onodeID, cache, catch, cryptographyInfo, cLog):
        """
        Open the serial interface and block until the node reports a connection.

        Parameters
        ----------
        device
            Passed unchanged to ``meshtastic.serial_interface.SerialInterface``.
        filter
            Object whose ``sieve(packet)`` coroutine receives every incoming packet.
        onodeID
            Identifier announced as ``"onodeID"`` and used as the sender of
            packets built by this class.
        cache, catch
            Stored as ``tcache`` / ``tcatch``; ``tcache.refresh()`` is called
            after each received packet.
        cryptographyInfo
            Stored as-is; not used inside this class.
        cLog
            Logging callable invoked as ``cLog(level, message)``.
        """
        self.cLog = cLog
        self.cryptographyInfo = cryptographyInfo
        self.filter = filter
        self.tcache = cache
        self.tcatch = catch
        self.html = False
        # Cleared by onConnection once the announce has been sent
        self.notConnected = True
        # Read by awaitFullResponse; presumably filled by the filter - verify
        self.messages = {}
        # str(requestId) -> bool, written by responseCheck (False on MAX_RETRANSMIT)
        self.acks = {}
        # self.threads = {}
        self.onodeID = onodeID
        # Be careful with this: id of the most recently sent packet, shared by
        # every coroutine that calls send()
        self.cpid = 0
        # str(packet id) -> {"ob", "pid", "packet", "retry"} for sent packets
        self.tasks = {}
        # TODO: use node id to deliver directly
        pub.subscribe(self.onReceive, "meshtastic.receive")
        pub.subscribe(self.onConnection, "meshtastic.connection.established")
        self.interface = meshtastic.serial_interface.SerialInterface(device)
        # Sleep between polls instead of spinning: a tight loop burns a core
        # and competes for the GIL with the thread that runs onConnection.
        polls = 0
        while self.notConnected:
            if polls % self._connectLogEvery == 0:
                self.cLog(20, "Waiting for node initialization...")
            polls += 1
            time.sleep(self._connectPollInterval)
        self.cLog(20, "Initialized")

    # TODO: Sending packets across multiple nodes/load balancing/distributed packet transmission/reception
    def onReceive(self, packet, interface):
        """
        pubsub handler for "meshtastic.receive": run the filter on the packet,
        then refresh the cache.
        """
        # A dedicated loop is used per packet; close it so its selector and
        # self-pipe are released immediately rather than at garbage collection.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.filter.sieve(packet))
        finally:
            loop.close()
        self.tcache.refresh()

    async def sendAnnounce(self):
        """
        Send a msgpack-encoded announce carrying this node's ``onodeID`` and
        the local Meshtastic node number, without an explicit destination.
        """
        await self.addPackets(
            msgpack.dumps(
                {
                    "onodeID": self.onodeID,
                    "mnodeID": self.interface.localNode.nodeNum,
                }
            ),
            self.onodeID,
            None,
            True,
            None,
            packetsClass=0,
        )

    def onConnection(self, interface, topic=pub.AUTO_TOPIC):
        """
        pubsub handler for "meshtastic.connection.established": send the
        announce, then release the wait loop in ``__init__``.
        """
        # self.send("connect".encode("utf-8"))
        # time.sleep(3)
        # NOTE(review): asyncio.run cancels tasks still pending when it
        # returns, so the ack-wait tasks created by addPackets here do not
        # outlive this call - confirm that is intended.
        asyncio.run(self.sendAnnounce())
        self.notConnected = False

    def responseCheck(self, packet):
        """
        ``onResponse`` callback for sent packets: record in ``self.acks``
        whether the response for ``requestId`` was an error.

        Stores False when the routing ``errorReason`` is "MAX_RETRANSMIT",
        True for any other value.
        """
        rid = packet["decoded"]["requestId"]
        if packet["decoded"]["routing"]["errorReason"] == "MAX_RETRANSMIT":
            self.cLog(20, "Got ack error")
            self.acks[str(rid)] = False
        else:
            self.acks[str(rid)] = True

    # TODO: Threaded send method
    def send(self, packet, recipientNode=False):
        """
        Send one packet with ``wantAck=True`` and remember its id in ``self.cpid``.

        Parameters
        ----------
        packet
            Payload handed to ``interface.sendData``.
        recipientNode
            Destination passed as ``destinationId``; when it compares equal to
            False the packet is sent without an explicit destination.

        Returns
        -------
        bool
            Always True.
        """
        interface = self.interface
        # Equality (not identity) is kept on purpose so 0 behaves like False,
        # exactly as before.
        if recipientNode == False:  # noqa: E712
            pid = interface.sendData(
                packet, wantAck=True, onResponse=self.responseCheck
            )
        else:
            pid = interface.sendData(
                packet,
                destinationId=recipientNode,
                wantAck=True,
                onResponse=self.responseCheck,
            )
        # Can I use waitForAckNak on cpid?
        self.cpid = pid.id
        return True

    async def awaitResponse(self, pid):
        """
        Poll once per second, for at most 120 seconds, until a response for
        ``pid`` has been recorded in ``self.acks``.

        Returns
        -------
        bool
            Always True, including when the wait timed out; callers must look
            at ``self.acks`` for the actual outcome.
        """
        for _ in range(120):
            await asyncio.sleep(1)
            if str(pid) in self.acks:
                break
        return True

    async def initNodeDH(self, dhefOb, recipientNode, onodeID):
        """
        Send the Diffie-Hellman parameters and public key of ``dhefOb``
        directly to ``recipientNode``.

        Parameters
        ----------
        dhefOb
            Object providing ``getParamsBytes()`` and ``publicKey``.
        recipientNode
            Node the packets are sent to (passed as ``directID``).
        onodeID
            Passed to ``addPackets`` as its ``recipientNode`` argument.
        """
        await self.addPackets(
            msgpack.dumps(
                {"params": dhefOb.getParamsBytes(), "publicKey": dhefOb.publicKey}
            ),
            self.onodeID,
            # senderName and recipient are both the integer 0
            0,
            0,
            onodeID,
            directID=recipientNode,
            packetsClass=3,
        )

    def awaitFullResponse(self, pid):
        """
        Block, checking every 5 seconds, until ``self.messages[pid]`` exists
        and its "finished" entry is truthy.

        Returns
        -------
        bool
            Always True.
        """
        for _ in range(1_000_000_000):
            time.sleep(5)
            if pid in self.messages and self.messages[pid]["finished"]:
                break
        return True

    async def addPackets(
        self,
        data,
        sender,
        senderName,
        recipient,
        recipientNode,
        directID=False,
        packetsClass=None,
        encrypt=False,
    ):
        """
        Split ``data`` into packets, send each one and register it for ack tracking.

        Parameters
        ----------
        data, sender, senderName, recipient, recipientNode, packetsClass
            Forwarded to ``Packets`` to build the packet list.
        recipientNode
            When None every packet is sent without an explicit destination;
            otherwise it is the destination unless ``directID`` overrides it.
        directID
            When not equal to False, replaces ``recipientNode`` as destination.
        encrypt
            Accepted for interface compatibility; not used here.
        """
        tp = Packets(
            data,
            sender,
            senderName,
            recipient,
            recipientNode,
            packetsClass=packetsClass,
        )
        # print(sys.getsizeof(tp.packets[0]))
        # print(tp.packets[0])
        for p in tp.packets:
            # time.sleep(5)
            if recipientNode is None:
                self.send(p)
            else:
                self.cLog(10, "Sending target: " + str(directID))
                if directID != False:  # noqa: E712
                    recipientNode = directID
                self.send(p, recipientNode=recipientNode)
            # Capture the id before yielding: self.cpid is shared, and another
            # coroutine may send (and overwrite it) during the sleep below.
            pid = str(self.cpid)
            awaitTask = asyncio.create_task(self.awaitResponse(pid))
            self.tasks[pid] = {
                "ob": awaitTask,
                "pid": pid,
                "packet": p,
                "retry": False,
            }
            await asyncio.sleep(1)

    async def progressCheck(self):
        """
        Every 90 seconds, drop acknowledged packets from ``self.tasks`` and
        resend those whose ack was negative, up to ``_maxRetries`` resends each.

        Runs forever; intended to be scheduled as a background task.
        """
        while True:
            await asyncio.sleep(90)
            self.cLog(
                20, "Checking progress of {0} tasks".format(len(self.tasks))
            )
            # Iterate over a snapshot: entries are added and removed below
            for taskKey in list(self.tasks):
                task = self.tasks.get(taskKey)
                if task is None or not task["ob"]:
                    continue
                pid = task["pid"]
                if pid not in self.acks:
                    # No response recorded yet; check again next round
                    continue
                if self.acks[pid]:
                    # Positive ack: nothing left to track
                    self.tasks.pop(taskKey, None)
                    continue

                retry = task["retry"]
                if not retry:
                    # "retry" starts out as False: this is the first resend
                    retry = 1
                elif retry < self._maxRetries:
                    retry += 1
                else:
                    self.cLog(30, "Too many retries")
                    self.tasks.pop(taskKey, None)
                    continue

                self.cLog(20, "Doing retry")
                # TODO: Resend to specific node
                self.send(task["packet"])
                # The resend has a new packet id: wait on that id and key the
                # record by it, so the next pass checks the right ack and the
                # record can be found again when it is removed.
                newPid = str(self.cpid)
                awaitTask = asyncio.create_task(self.awaitResponse(newPid))
                self.tasks.pop(taskKey, None)
                self.tasks[newPid] = {
                    "ob": awaitTask,
                    "pid": newPid,
                    "packet": task["packet"],
                    "retry": retry,
                }
                await asyncio.sleep(1)

    async def announce(self):
        """
        Send an announce every 180 seconds, forever.
        """
        while True:
            self.cLog(10, "Announce")
            await self.sendAnnounce()
            await asyncio.sleep(180)