# PierMesh libraries from Sponge.base import Filter from Siph.map import Network from Daisy import Daisy as du from Daisy.Catch import Catch from Daisy.Cache import Cache from Daisy.Index import Index from Daisy.CryptographyUtil import SteelPetal from Splash.serve import Server from Transceiver.Transceiver import Transceiver from Cryptography.WhaleSong import Transport import Components.hopper as hopper from Packets.Messages.Protocols.hopper.Response import HopperResponse from Packets.Messages.Protocols.catch.Response import CatchResponse from Packets.Messages.Protocols.cryptography.Handshake import Handshake import tlog from tlog import VHandler # Generic imports import logging import os import asyncio import time import datetime import traceback import threading import random import argparse import configparser import shutil import queue import json # Process management library import psutil # Faster asyncio drop-in import uvloop # TODO: Switch config to toml # TODO: Better protocol management # TODO: Dry run # TODO: Fix scripts/marcille # TODO: Manually trigger announce # TODO: Run announce at initialization if __name__ == "__main__": """ Global objects for the PierMesh service and the TUI so we can terminate the associated processes later and configuration objects for the command line and config (.piermesh) """ logger = "" passkey = input("Enter node decryption key: ") # TODO: Check for PSK # psk = input("Enter psk: ") nodeOb = None # NOTE: Command line argument parser, example of a proper configuration # is in scripts/falin parser = argparse.ArgumentParser() parser.add_argument("-d", "--device", help="Set transceiver device path (ex. /dev/ttyACM0)") parser.add_argument("-p", "--port", help="Web UI server port, default: 5000") # NOTE: Detetmines where your node's data is stored oj disk parser.add_argument("-n", "--nickname", help="Node nickname (sets node data path)") parser.add_argument( "-s", "--startupDelay", help="Startup delay (useful for testing)" ) parser.add_argument( "-o", "--override", help="Whether to override config file", default=False ) # TODO: Fix parser.add_argument("-x", "--showTUI", help="Whether to show TUI (needs fix)", default=True) parser.add_argument( "-l", "--confList", help="Comma separated list of conf files to load for multi-node mode (TODO)", default=False, ) parser.add_argument( "-k", "--PSK", help="Pre shared key for initializing communications", default=False, ) argConfig = parser.parse_args() # NOTE: .piermesh conf parser, proper example is in .piermesh.example config = configparser.ConfigParser() if argConfig.confList != False: pass if not os.path.exists(".piermesh"): print("No config at .piermesh, duplicating and loading example config...") shutil.copyfile(".piermesh.example", ".piermesh") config.read(".piermesh") device = "" if "transceiverPort" in config["OPERATOR_REQUIRED"]: if argConfig.override: device = argConfig.device else: device = config["OPERATOR_REQUIRED"]["transceiverPort"] else: if argConfig.device == False: print("No device set exiting...") exit(0) else: device = argConfig.device webPort = config["DEFAULT"]["WebUIPort"] if argConfig.override: webPort = argConfig.port else: if "WebUIPort" in config["OPERATOR_OVERRIDES"]: webPort = config["OPERATOR_OVERRIDES"]["WebUIPort"] webPort = int(webPort) delay = config["DEFAULT"]["StartupDelay"] if argConfig.override: delay = argConfig.startupDelay else: if "StartupDelay" in config["OPERATOR_OVERRIDES"]: delay = config["OPERATOR_OVERRIDES"]["StartupDelay"] delay = int(delay) nodeNickname = config["DEFAULT"]["Nickname"] if argConfig.override: nodeNickname = argConfig.nickname else: if "Nickname" in config["OPERATOR_OVERRIDES"]: nodeNickname = config["OPERATOR_OVERRIDES"]["Nickname"] showTUI = config["DEFAULT"]["ShowTUI"] if argConfig.override: showTUI = argConfig.showTUI else: if "ShowTUI" in config["OPERATOR_OVERRIDES"]: showTUI = config["OPERATOR_OVERRIDES"]["ShowTUI"] showTUI = showTUI == "True" psk = False if argConfig.override: if not argConfig.PSK: print("No PSK set quitting...") exit(0) else: psk = argConfig.PSK # else: if "PSK" in config["OPERATOR_REQUIRED"]: psk = config["OPERATOR_REQUIRED"]["PSK"] if delay > 0: print("Staggering {0} seconds please wait".format(delay)) time.sleep(delay) class Node: """ Class that handles most of the PierMesh data `🔗 Source `_ Attributes ---------- actions: dict Dictionary mapping methods with the action prefix to the method name after action dynamically to be called through Sponge (`Sponge.base`) filtering todo: list[dict] List of actions to execute network: `Network` Network map catch: `Catch` Daisy cache for catchs, our domain analog cache: `Cache` Daisy cache for general use nodeInfo: Daisy Daisy (`Components.daisy.Daisy`) record containing some information about the node onodeID: str PierMesh node ID oTransceiver: Transceiver LoRa transceiver `Transceiver` proc: psutil.Process This process (`psutil.Process`), used for managing and monitoring PierMesh mTasks: dict Dictionary of PierMesh service tasks """ # isDone = False def __init__(self): actionsList = [f for f in dir(self) if "action" in f] self.actions = {} for a in actionsList: self.actions[a.split("_")[1]] = getattr(self, a) self.todo = [] logger.log(20, "Past action mapping") self.network = Network() self.daisyCryptography = SteelPetal(passkey) # TODO: Better directory structure self.catch = Catch(self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/catch") self.cache = Cache(self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/cache") du._json_to_msg("mlookup") shutil.copy("mlookup", f"{nodeNickname}/daisy/cache/mlookup") logger.info("Protocols:\n" + json.dumps(self.cache.get("mlookup").get(), indent=4)) self.remoteCatchIndex = Index(nodeNickname, self.daisyCryptography) serverInfoFile = nodeNickname + ".info" self.nodeInfo = self.cache.get(serverInfoFile) if self.nodeInfo == False: # TODO: Get/set toml self.cache.create( serverInfoFile, {"nodeID": random.randrange(0, 1000000)}) self.nodeInfo = self.cache.get(serverInfoFile) self.network.addin(self.nodeInfo.get()["nodeID"]) logger.log(20, "Siph network stack initialized") self.onodeID = str(self.nodeInfo.get()["nodeID"]) logger.log(20, "Cryptography initializing") self.cryptographyInfo = Transport( self.cache, nodeNickname, self.daisyCryptography, psk ) logger.log(20, "Cryptography initialized") self.sponge = Filter(self.cache, self.onodeID, self.todo, self.cryptographyInfo) logger.log(20, "Filter initialized") # TODO: Run in thread self.oTransceiver = Transceiver( device, self.sponge, self.onodeID, self.cache, self.catch, self.cryptographyInfo, self.network ) self.server = Server( self.oTransceiver, self.catch, self.onodeID, self.network, self.cryptographyInfo, self.remoteCatchIndex, self.cache, ) self.proc = psutil.Process(os.getpid()) self.mTasks = {} async def main(self): """ Main loop, sets up the message listening, system monitoring and server running loops """ self.mTasks["list"] = asyncio.create_task(self.spongeListen()) await asyncio.sleep(1) # self.mTasks["pct"] = asyncio.create_task(self.oTransceiver.progressCheck()) # await asyncio.sleep(1) self.mTasks["mon"] = asyncio.create_task(self.monitor()) await asyncio.sleep(1) self.mTasks["announce"] = asyncio.create_task( self.oTransceiver.announce()) await asyncio.sleep(1) await self.server.app.start_server(port=int(webPort)) async def fsInit(self): """ Initialize the file system for use """ # TODO: Flesh out and properly link everything if not os.path.exists("data"): os.makedirs("data") if not os.path.exists("data/" + nodeNickname): os.makedirs("data/" + nodeNickname) async def monitor(self): """ Monitor and log ram and cpu usage """ global tuiOb, logger while True: if tuiOb is not None and showTUI: await asyncio.sleep(10) memmb = self.proc.memory_info().rss / (1024 * 1024) memmb = round(memmb, 2) cpup = self.proc.cpu_percent(interval=1) logger.info( " MEM: {0} mB | CPU: {1} %".format( memmb, cpup, ), ) tuiOb.do_set_cpu_percent(float(cpup)) tuiOb.do_set_mem(memmb) elif not showTUI: await asyncio.sleep(10) memmb = self.proc.memory_info().rss / (1024 * 1024) memmb = round(memmb, 2) cpup = self.proc.cpu_percent(interval=1) logger.info( " MEM: {0} mB | CPU: {1} %".format( memmb, cpup, ), ) else: logger.log(20, "No TUI object, waiting 5 seconds...") await asyncio.sleep(5) async def spongeListen(self): """ Loop to watch for tasks to do See Also -------- Sponge.base.sieve: Packet filtering/parsing Notes ----- We use a common technique here that calls the function from our preloaded actions via dictionary entry """ global logger while True: while (len(self.todo) >= 1): try: logger.log(10, "In todo") todoNow = self.todo[0] action = todoNow.getAction() logger.log(20, "Action: " + action) data = todoNow.getData() logger.log(20, "Doing action") await self.actions[action](data) logger.log(20, "After action") except Exception: logger.log(20, traceback.format_exc()) self.todo.pop() await asyncio.sleep(1) async def action_sendToPeer(self, data: dict): """ Send data to a peer connected to the server Parameters ---------- data: dict Data passed from the filter, this is a generic object so it's similar on all actions here See Also -------- Sponge.Protocols: Protocol based packet filtering Splash.serve.Server: Runs a light Microdot web server with http/s and websocket functionality Splash.serve.Server.sendToPeer: Function to actually execute the action """ logger.info(data) await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["data"], data["target"])) await asyncio.sleep(1) async def action_sendCatch(self, data: dict): """ Get catch and return the data to a peer """ # TODO: Seperators if "fins" in data: res = self.catch.get(data["head"], data["body"], fins=data["fins"]) else: res = self.catch.get(data["head"], data["body"]) logger.info(res) r = CatchResponse( self.onodeID, 000000, self.onodeID, data["recipient"], data["recipientNode"], self.cryptographyInfo, res, pskEncrypt=True ) await asyncio.create_task(self.oTransceiver.sendMessage(r)) await asyncio.sleep(1) async def action_routeCatch(self, data: dict): """ Route received catch to peer who requested it """ await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["html"], data["target"])) await asyncio.sleep(1) async def action_syncIndex(self, data: dict): """ Add received index entries to Catch via the a remote Catch index """ for entry in data["index"]: self.remoteCatchIndex.addEntry(entry) async def action_map(self, data: dict): # TODO: Normalize node ids to strings """ Map new network data to internal network map See Also -------- Siph.network.Network: Layered graph network representation """ if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False: logger.log(20, "In map") await asyncio.create_task(self.network.addLookup(data["onodeID"], data["mnodeID"])) await asyncio.sleep(1) logger.log(20, "After add lookup") logger.log(20, "Adding public key") self.cryptographyInfo.addPublickey(data["onodeID"], data["publicKey"]) logger.log(20, "Public key addition done") self.network.omap.add_node(data["onodeID"]) logger.log(20, "Added node to outer map") else: logger.log(20, "Node already exists skipping addition to map") if "sessionKey" not in self.cryptographyInfo.getRecord("key", data["onodeID"]).keys(): if not data["dontRespond"]: h = Handshake(self.nodeInfo.get()["nodeID"], self.nodeInfo.get()["nodeID"], data["onodeID"], data["onodeID"], self.cryptographyInfo, data["onodeID"], self.onodeID) await asyncio.create_task(self.oTransceiver.sendMessage(h)) await asyncio.sleep(1) async def action_initCryptography(self, data: dict): """ Initialize AES-GCM encrypted transport session See Also -------- Cryptography.WhaleSong.Transport: End to end encryption functionality """ # TODO: Expiration self.cryptographyInfo.sessionSetup(data["yctx"]["sourceNode"]["val"], data["ephemeralKey"]) logger.log(20, "Cryptography handshake complete") async def action_hop(self, data): """ Proxy a request to the main internet (in the future cross protocol/link) """ try: r = None if data["method"] == "get": r = hopper.get( data["url"], params=data["parameters"], followTags=["img", "script", "link"], ) elif data["method"] == "post": r = hopper.post(data["url"], params=data["parameters"]) if r != None: r = HopperResponse( self.onodeID, 000000, self.onodeID, data["recipient"], data["recipientNode"], r, self.cryptographyInfo, ) await asyncio.create_task(self.oTransceiver.sendMessage(r)) await asyncio.sleep(1) except: logger.log(30, traceback.format_exc()) async def action_routeHop(self, data: dict): """ Return proxy request results to requester """ await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["res"], data["target"])) await asyncio.sleep(1) async def action_addPSK(self, data): """ Action to add PSK for specific node, currently unused """ self.cryptographyInfo.createEmpty(data["nodeID"]) self.cryptographyInfo.update(data["nodeID"], {"PSK": data["PSK"]}) async def main(): """ Kick on main method for running the PierMesh service """ try: nodeOb = Node() await nodeOb.main() nodeOb.isDone = True except: logger.log(20, traceback.format_exc()) def initLogger(nodeName, tolog, tui=True): """ Initialize our logging setup, we use a custom logger when running the TUI to consume the logs """ global logger logger = logging.getLogger(__name__) logger.propagate = False logger.setLevel(logging.DEBUG) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') if tui: vh = VHandler(logging.DEBUG, tolog) vh.setFormatter(formatter) logger.addHandler(vh) else: sh = logging.StreamHandler() sh.setLevel(logging.DEBUG) sh.setFormatter(formatter) logger.addHandler(sh) dt = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") fh = logging.FileHandler(f"logs/{nodeName}_{dt}.log") fh.setFormatter(formatter) logger.addHandler(fh) if __name__ == "__main__": try: tolog = queue.Queue() initLogger(nodeNickname, tolog, tui=showTUI) mainThread = threading.Thread(target=uvloop.run, args=(main(),)) mainThread.start() if showTUI: tlog.runLogUI(tolog, nodeNickname) else: # TODO: Resource logging print("No TUI") except Exception: print(traceback.format_exc()) os._exit(1)