piermesh/src/run.py

337 lines
10 KiB
Python
Executable File

# PierMesh libraries
from Sponge.base import Filter
from Siph.map import Network
from Components.daisy import Catch
from Components.daisy import Cache
from webui.serve import Server
from Transmission.transmission import Transmitter
from Cryptography.DHEFern import DHEFern
from ui import TUI
# Generic imports
import logging
import os
import asyncio
import sys
import time
import datetime
import traceback
import threading
import random
# Process management library
import psutil
if __name__ == "__main__":
global nodeOb, tuiOb
"""
Global objects for the PierMesh service and the TUI so we can terminate the associated processes later
"""
tuiOb = None
nodeOb = None
# Pull startup parameters
device, webPort, serverInfoFile, delay, nodeNickname = sys.argv[1:]
# Set up file based logging
logPath = "logs"
fileName = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")
logFormat = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
logging.basicConfig(
format=logFormat,
level=logging.INFO,
filename="{0}/{2}_{1}.log".format(logPath, fileName, nodeNickname),
)
class Node:
"""
Node: Class that handles most of the PierMesh data
`🔗 Source <https://git.utopic.work/PierMesh/piermesh/src/branch/main/src/run.py>`_
Attributes
----------
toLog: list
We store logs to be processed here
actions: dict
Dictionary mapping methods with the action prefix to the method name after action dynamically to be called through Sponge (`Sponge.base`) filtering
todo: list[dict]
List of actions to execute
network: `Network`
Network map
catch: `Catch`
Daisy cache for catchs, our domain analog
cache: `Cache`
Daisy cache for general use
nodeInfo: Daisy
Daisy (`Components.daisy.Daisy`) record containing some information about the node
onodeID: str
PierMesh node ID
oTransmitter: Transmitter
LoRa transmitter `Transmitter`
processed: list
List of IDs of already completed messages so that we don't reprocess messages
proc: psutil.Process
This process (`psutil.Process`), used for managing and monitoring PierMesh
mTasks: dict
Dictionary of PierMesh service tasks
See Also
--------
logPassLoop: Loop to handle logging to file and TUI
"""
def __init__(self):
self.toLog = []
actionsList = [f for f in dir(self) if "action" in f]
self.actions = {}
for a in actionsList:
self.actions[a.split("_")[1]] = getattr(self, a)
self.todo = []
self.cLog(20, "Past action mapping")
self.network = Network()
self.catch = Catch(walk=True)
self.cache = Cache(walk=True)
self.nodeInfo = self.cache.get(serverInfoFile)
if self.nodeInfo == False:
self.cache.create(serverInfoFile, {"nodeID": random.randrange(0, 1000000)})
self.nodeInfo = self.cache.get(serverInfoFile)
self.network.addin(self.serverInfo.get()["nodeID"])
self.cLog(20, "Siph network stack initialized")
self.onodeID = str(self.nodeInfo.get()["nodeID"])
self.server = None
self.sponge = Filter(self.cache, self.onodeID, self.todo, self.cLog)
self.cLog(20, "Filter initialized")
self.cLog(10, "Command line arguments: " + ", ".join(sys.argv))
self.oTransmitter = None
self.cLog(20, "Cryptography initializing")
self.cryptographyInfo = DHEFern(self.cache, nodeNickname, self.cLog)
self.cLog(20, "Cryptography initialized")
self.processed = []
self.proc = psutil.Process(os.getpid())
self.mTasks = {}
def cLog(self, priority: int, message: str):
"""
Convenience function that logs to the ui and log files
Parameters
----------
priority: int
Priority of message to be passed to logging
message: str
Message to log
Returns
-------
None
"""
logging.log(priority, message)
self.toLog.append("[{0}]:\n{1}".format(datetime.datetime.now(), message))
async def monitor(self):
"""
Monitor and log ram and cpu usage
"""
while True:
if tuiOb != None:
if tuiOb.done:
print("Terminating PierMesh service...")
self.proc.terminate()
await asyncio.sleep(10)
memmb = self.proc.memory_info().rss / (1024 * 1024)
memmb = round(memmb, 2)
cpup = self.proc.cpu_percent(interval=1)
self.cLog(
20,
" MEM: {0} mB | CPU: {1} %".format(
memmb,
cpup,
),
)
# Set cpu and memory usage in the TUI
tuiOb.do_set_cpu_percent(float(cpup))
tuiOb.do_set_mem(memmb)
async def spongeListen(self):
"""
Loop to watch for tasks to do
See Also
--------
Filters.base.sieve: Packet filtering/parsing
Notes
-----
We use a common technique here that calls the function from our preloaded actions via dictionary entry
"""
while True:
while len(self.todo) >= 1:
todoNow = self.todo.pop()
action = todoNow["action"]
self.cLog(20, "Action: " + action)
data = todoNow["data"]
await self.actions[action](data)
await asyncio.sleep(1)
async def action_sendToPeer(self, data: dict):
"""
Send data to a peer connected to the server
Parameters
----------
data: dict
Data passed from the filter, this is a generic object so it's similar on all actions here
See Also
--------
Filters.Protocols: Protocol based packet filtering
webui.serve.Server: Runs a light Microdot web server with http/s and websocket functionality
webui.serve.Server.sendToPeer: Function to actually execute the action
"""
self.server.sendToPeer(data["recipient"], data["res"])
async def action_sendCatch(self, data: dict):
"""
Get catch and return the data to a peer
See Also
--------
Bubble.router.Router: Routing class
"""
res = self.catch.get(data["head"], data["body"], fins=data["fins"])
self.server.sendToPeer(data["recipient"], res)
async def action_map(self, data: dict):
"""
Map new network data to internal network map
See Also
--------
Siph.network.Network: Layered graph etwork representation
"""
self.network.addLookup(data["onodeID"], data["mnodeID"])
self.cLog(20, "Lookup addition done")
self.network.addon(data["onodeID"])
async def action_initNodeDH(self, data: dict):
"""
Initialize diffie hellman key exchange
See Also
--------
Cryptography.DHEFern.DHEFern: End to end encryption functionality
"""
if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
await self.oTransmitter.initNodeDH(
self.cryptographyInfo, int(data["mnodeID"]), data["onodeID"]
)
async def action_keyDeriveDH(self, data: dict):
"""
Derive key via diffie hellman key exchange
"""
try:
self.cryptographyInfo.keyDerive(
data["publicKey"],
self.cryptographyInfo.getSalt(),
data["recipientNode"],
data["params"],
)
except:
self.cLog(30, traceback.format_exc())
async def logPassLoop():
"""
Loop to pass logs up to the TUI
See Also
--------
ui.TUI: TUI implementation
"""
global tuiOb, nodeOb
while True:
await asyncio.sleep(1)
if tuiOb == None or nodeOb == None:
await asyncio.sleep(1)
elif tuiOb.done == True:
print("Terminating PierMesh service...")
nodeOb.proc.terminate()
else:
ctoLog = [l for l in nodeOb.toLog]
for l in ctoLog:
tuiOb.do_write_line(l)
nodeOb.toLog.pop()
async def main():
"""
Main method for running the PierMesh service
"""
global nodeOb
try:
nodeOb = Node()
nodeOb.cLog(20, "Starting up")
nodeOb.cLog(20, "Staggering {0} seconds, please wait".format(sys.argv[4]))
time.sleep(int(sys.argv[4]))
nodeOb.oTransmitter = Transmitter(
sys.argv[1],
nodeOb.sponge,
nodeOb.onodeID,
nodeOb.cache,
nodeOb.catch,
nodeOb.cryptographyInfo,
nodeOb.cLog,
)
nodeOb.server = Server(
nodeOb.oTransmitter,
nodeOb.catch,
nodeOb.onodeID,
nodeOb.network,
nodeOb.cLog,
)
nodeOb.mTasks["list"] = asyncio.create_task(nodeOb.spongeListen())
await asyncio.sleep(1)
nodeOb.mTasks["pct"] = asyncio.create_task(nodeOb.oTransmitter.progressCheck())
await asyncio.sleep(1)
nodeOb.mTasks["mon"] = asyncio.create_task(nodeOb.monitor())
await asyncio.sleep(1)
nodeOb.mTasks["announce"] = asyncio.create_task(nodeOb.oTransmitter.announce())
await asyncio.sleep(1)
await nodeOb.server.app.start_server(port=int(sys.argv[2]), debug=True)
except KeyboardInterrupt:
sys.exit()
if __name__ == "__main__":
try:
mainThread = threading.Thread(target=asyncio.run, args=(main(),))
mainThread.start()
lplThread = threading.Thread(target=asyncio.run, args=(logPassLoop(),))
lplThread.start()
tuiOb = TUI()
tuiOb.nodeOb = nodeOb
tuiOb.run()
except KeyboardInterrupt:
nodeOb.cLog(30, "Terminating PierMesh service...")
except Exception:
nodeOb.cLog(30, sys.gettrace())