# PierMesh libraries
from Sponge.base import Filter
from Siph.map import Network
from Daisy.Catch import Catch
from Daisy.Cache import Cache
from Splash.serve import Server
from Transceiver.Transceiver import Transceiver
from Cryptography.WhaleSong import DHEFern
from ui import TUI

# Generic imports
import logging
import os
import asyncio
import sys
import time
import datetime
import traceback
import threading
import random

# Process management library
import psutil

if __name__ == "__main__":
    # Global objects for the PierMesh service and the TUI so we can
    # terminate the associated processes later
    nodeOb = None
    tuiOb = None

    # Pull startup parameters (exactly five positional arguments are required)
    device, webPort, serverInfoFile, delay, nodeNickname = sys.argv[1:]

    # Set up file based logging
    logPath = "logs"
    # logging.basicConfig opens the log file immediately and fails if the
    # directory is missing, so make sure it exists first.
    os.makedirs(logPath, exist_ok=True)
    fileName = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")
    logFormat = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    logging.basicConfig(
        format=logFormat,
        level=logging.INFO,
        filename="{0}/{2}_{1}.log".format(logPath, fileName, nodeNickname),
    )


def _safeLog(priority: int, message: str):
    """
    Log through the node when it exists, otherwise straight to `logging`

    Used by the top level error handlers, which can run before `Node()` has
    finished constructing (``nodeOb`` is still None at that point).

    Parameters
    ----------
    priority: int
        Priority of message to be passed to logging

    message: str
        Message to log
    """
    node = globals().get("nodeOb")
    if node is None:
        logging.log(priority, message)
    else:
        node.cLog(priority, message)


class Node:
    """
    Class that handles most of the PierMesh data

    Attributes
    ----------
    toLog: list
        We store logs to be processed here

    actions: dict
        Dictionary mapping methods with the action prefix to the method name
        after action dynamically to be called through Sponge (`Sponge.base`)
        filtering

    todo: list[dict]
        List of actions to execute

    network: `Network`
        Network map

    catch: `Catch`
        Daisy cache for catchs, our domain analog

    cache: `Cache`
        Daisy cache for general use

    nodeInfo: Daisy
        Daisy (`Components.daisy.Daisy`) record containing some information
        about the node

    onodeID: str
        PierMesh node ID

    oTransceiver: Transceiver
        LoRa transceiver `Transceiver`

    processed: list
        List of IDs of already completed messages so that we don't reprocess
        messages

    proc: psutil.Process
        This process (`psutil.Process`), used for managing and monitoring
        PierMesh

    mTasks: dict
        Dictionary of PierMesh service tasks

    See Also
    --------
    logPassLoop: Loop to handle logging to file and TUI
    """

    def __init__(self):
        self.toLog = []
        # Map every `action_<name>` method to `<name>`. Matching on the prefix
        # (rather than on any attribute containing "action") keeps unrelated
        # attributes from breaking the mapping.
        prefix = "action_"
        self.actions = {
            name[len(prefix):]: getattr(self, name)
            for name in dir(self)
            if name.startswith(prefix)
        }
        self.todo = []
        self.cLog(20, "Past action mapping")
        self.network = Network()
        self.catch = Catch(walk=True)
        self.cache = Cache(walk=True)

        # Load the node record, creating it with a random node ID on first run.
        # NOTE(review): `== False` is kept as-is because the exact return
        # contract of `Cache.get` is not visible from this file.
        self.nodeInfo = self.cache.get(serverInfoFile)
        if self.nodeInfo == False:
            self.cache.create(serverInfoFile, {"nodeID": random.randrange(0, 1000000)})
            self.nodeInfo = self.cache.get(serverInfoFile)
        self.network.addin(self.nodeInfo.get()["nodeID"])
        self.cLog(20, "Siph network stack initialized")
        self.onodeID = str(self.nodeInfo.get()["nodeID"])
        # Server and transceiver are attached later by `main`
        self.server = None
        self.sponge = Filter(self.cache, self.onodeID, self.todo, self.cLog)
        self.cLog(20, "Filter initialized")
        self.cLog(10, "Command line arguments: " + ", ".join(sys.argv))
        self.oTransceiver = None
        self.cLog(20, "Cryptography initializing")
        self.cryptographyInfo = DHEFern(self.cache, nodeNickname, self.cLog)
        self.cLog(20, "Cryptography initialized")
        self.processed = []
        self.proc = psutil.Process(os.getpid())
        self.mTasks = {}

    def cLog(self, priority: int, message: str):
        """
        Convenience function that logs to the ui and log files

        Parameters
        ----------
        priority: int
            Priority of message to be passed to logging

        message: str
            Message to log

        Returns
        -------
        None
        """
        logging.log(priority, message)
        # Queued here, drained into the TUI by `logPassLoop`
        self.toLog.append("[{0}]:\n{1}".format(datetime.datetime.now(), message))

    async def monitor(self):
        """
        Monitor and log ram and cpu usage

        Also terminates this process once the TUI reports it is done.
        """
        while True:
            if tuiOb is not None:
                if tuiOb.done:
                    print("Terminating PierMesh service...")
                    self.proc.terminate()
                await asyncio.sleep(10)
                memmb = self.proc.memory_info().rss / (1024 * 1024)
                memmb = round(memmb, 2)
                # NOTE(review): interval=1 blocks this event loop for one second
                cpup = self.proc.cpu_percent(interval=1)
                self.cLog(
                    20,
                    " MEM: {0} mB | CPU: {1} %".format(
                        memmb,
                        cpup,
                    ),
                )
                tuiOb.do_set_cpu_percent(float(cpup))
                tuiOb.do_set_mem(memmb)
            else:
                self.cLog(20, "No TUI object, waiting 5 seconds...")
                await asyncio.sleep(5)

    async def spongeListen(self):
        """
        Loop to watch for tasks to do

        See Also
        --------
        Sponge.base.sieve: Packet filtering/parsing

        Notes
        -----
        We use a common technique here that calls the function from our
        preloaded actions via dictionary entry

        A failing or unknown action is logged and skipped so that a single bad
        item does not end the listening task.
        """
        while True:
            while len(self.todo) >= 1:
                # NOTE(review): pop() takes the newest entry first (LIFO);
                # confirm that ordering is intended.
                todoNow = self.todo.pop()
                try:
                    action = todoNow["action"]
                    self.cLog(20, "Action: " + action)
                    data = todoNow["data"]
                    await self.actions[action](data)
                except Exception:
                    self.cLog(30, traceback.format_exc())
            await asyncio.sleep(1)

    async def action_sendToPeer(self, data: dict):
        """
        Send data to a peer connected to the server

        Parameters
        ----------
        data: dict
            Data passed from the filter, this is a generic object so it's
            similar on all actions here

        See Also
        --------
        Sponge.Protocols: Protocol based packet filtering

        webui.serve.Server: Runs a light Microdot web server with http/s and
        websocket functionality

        webui.serve.Server.sendToPeer: Function to actually execute the action
        """
        self.server.sendToPeer(data["recipient"], data["res"])

    async def action_sendCatch(self, data: dict):
        """
        Get catch and return the data to a peer
        """
        res = self.catch.get(data["head"], data["body"], fins=data["fins"])
        self.server.sendToPeer(data["recipient"], res)

    async def action_map(self, data: dict):
        """
        Map new network data to internal network map

        See Also
        --------
        Siph.network.Network: Layered graph network representation
        """
        self.network.addLookup(data["onodeID"], data["mnodeID"])
        self.cLog(20, "Lookup addition done")
        self.network.addon(data["onodeID"])

    async def action_initNodeDH(self, data: dict):
        """
        Initialize diffie hellman key exchange

        Only starts the exchange when no key record exists yet for the node.

        See Also
        --------
        Cryptography.DHEFern.DHEFern: End to end encryption functionality
        """
        # NOTE(review): `== False` kept; return contract of getRecord is not
        # visible from this file.
        if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
            await self.oTransceiver.initNodeDH(
                self.cryptographyInfo, int(data["mnodeID"]), data["onodeID"]
            )

    async def action_keyDeriveDH(self, data: dict):
        """
        Derive key via diffie hellman key exchange

        Failures are logged with their traceback instead of being raised.
        """
        try:
            self.cryptographyInfo.keyDerive(
                data["publicKey"],
                self.cryptographyInfo.getSalt(),
                data["recipientNode"],
                data["params"],
            )
        except Exception:
            self.cLog(30, traceback.format_exc())


async def logPassLoop():
    """
    Loop to pass logs up to the TUI

    See Also
    --------
    ui.TUI: TUI implementation
    """
    global tuiOb, nodeOb
    while True:
        await asyncio.sleep(1)
        if tuiOb is None or nodeOb is None:
            await asyncio.sleep(1)
        elif tuiOb.done:
            print("Terminating PierMesh service...")
            nodeOb.proc.terminate()
        else:
            # Snapshot first: the service thread may append while we write.
            pending = list(nodeOb.toLog)
            for line in pending:
                tuiOb.do_write_line(line)
            # Remove exactly the entries just written (the front of the list),
            # leaving anything appended in the meantime for the next pass.
            del nodeOb.toLog[: len(pending)]


async def main():
    """
    Main method for running the PierMesh service

    Builds the node, transceiver and server, schedules the background tasks
    and then starts the web server on the port given on the command line.
    """
    global nodeOb
    try:
        nodeOb = Node()
        nodeOb.cLog(20, "Starting up")
        nodeOb.cLog(20, "Staggering {0} seconds, please wait".format(sys.argv[4]))
        await asyncio.sleep(int(sys.argv[4]))
        nodeOb.oTransceiver = Transceiver(
            sys.argv[1],
            nodeOb.sponge,
            nodeOb.onodeID,
            nodeOb.cache,
            nodeOb.catch,
            nodeOb.cryptographyInfo,
            nodeOb.cLog,
        )
        nodeOb.server = Server(
            nodeOb.oTransceiver,
            nodeOb.catch,
            nodeOb.onodeID,
            nodeOb.network,
            nodeOb.cLog,
        )
        # Start the service tasks one second apart
        nodeOb.mTasks["list"] = asyncio.create_task(nodeOb.spongeListen())
        await asyncio.sleep(1)
        nodeOb.mTasks["pct"] = asyncio.create_task(nodeOb.oTransceiver.progressCheck())
        await asyncio.sleep(1)
        nodeOb.mTasks["mon"] = asyncio.create_task(nodeOb.monitor())
        await asyncio.sleep(1)
        nodeOb.mTasks["announce"] = asyncio.create_task(nodeOb.oTransceiver.announce())
        await asyncio.sleep(1)
        await nodeOb.server.app.start_server(port=int(sys.argv[2]), debug=True)
    except KeyboardInterrupt:
        sys.exit()
    except Exception:
        # nodeOb is still None if Node() itself failed, so go through the
        # helper rather than calling nodeOb.cLog directly.
        _safeLog(40, traceback.format_exc())


if __name__ == "__main__":
    try:
        mainThread = threading.Thread(target=asyncio.run, args=(main(),))
        mainThread.start()
        lplThread = threading.Thread(target=asyncio.run, args=(logPassLoop(),))
        lplThread.start()
        tuiOb = TUI()
        # NOTE(review): the service thread has only just started, so nodeOb is
        # most likely still None here; confirm how the TUI uses this attribute.
        tuiOb.nodeOb = nodeOb
        tuiOb.run()
    except KeyboardInterrupt:
        _safeLog(30, "Terminating PierMesh service...")
    except Exception:
        _safeLog(30, traceback.format_exc())