# piermesh/src/run.py
# PierMesh libraries
from Sponge.base import Filter
from Siph.map import Network
from Daisy import Daisy as du
from Daisy.Catch import Catch
from Daisy.Cache import Cache
from Daisy.Index import Index
from Daisy.CryptographyUtil import SteelPetal
from Splash.serve import Server
from Transceiver.Transceiver import Transceiver
from Cryptography.WhaleSong import Transport
import Components.hopper as hopper
from Packets.Messages.Protocols.hopper.Response import HopperResponse
from Packets.Messages.Protocols.catch.Response import CatchResponse
from Packets.Messages.Protocols.cryptography.Handshake import Handshake
import tlog
from tlog import VHandler
# Generic imports
import logging
import os
import asyncio
import time
import datetime
import traceback
import threading
import random
import argparse
import configparser
import shutil
import queue
import json
# Process management library
import psutil
# Faster asyncio drop-in
import uvloop
# TODO: Switch config to toml
# TODO: Better protocol management
# TODO: Dry run
# TODO: Fix scripts/marcille
# TODO: Manually trigger announce
# TODO: Run announce at initialization
if __name__ == "__main__":
    """
    Global objects for the PierMesh service and the TUI so we can terminate
    the associated processes later, plus configuration objects for the
    command line and the config file (.piermesh)
    """
    # Placeholder; replaced by initLogger() before any logging happens
    logger = None
    passkey = input("Enter node decryption key: ")
    # TODO: Check for PSK
    # psk = input("Enter psk: ")
    nodeOb = None

    # NOTE: Command line argument parser, example of a proper configuration
    # is in scripts/falin
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d", "--device", help="Set transceiver device path (ex. /dev/ttyACM0)"
    )
    parser.add_argument("-p", "--port", help="Web UI server port, default: 5000")
    # NOTE: Determines where your node's data is stored on disk
    parser.add_argument(
        "-n", "--nickname", help="Node nickname (sets node data path)"
    )
    parser.add_argument(
        "-s", "--startupDelay", help="Startup delay (useful for testing)"
    )
    parser.add_argument(
        "-o", "--override", help="Whether to override config file", default=False
    )
    # TODO: Fix
    parser.add_argument(
        "-x", "--showTUI", help="Whether to show TUI (needs fix)", default=True
    )
    parser.add_argument(
        "-l",
        "--confList",
        help="Comma separated list of conf files to load for multi-node mode (TODO)",
        default=False,
    )
    parser.add_argument(
        "-k",
        "--PSK",
        help="Pre shared key for initializing communications",
        default=False,
    )
    argConfig = parser.parse_args()

    # NOTE: .piermesh conf parser, proper example is in .piermesh.example
    config = configparser.ConfigParser()
    if argConfig.confList != False:
        # TODO: Multi-node mode — load each conf file in the list
        pass
    if not os.path.exists(".piermesh"):
        print("No config at .piermesh, duplicating and loading example config...")
        shutil.copyfile(".piermesh.example", ".piermesh")
    config.read(".piermesh")

    # Transceiver device path: CLI wins when -o/--override is set AND a
    # device was actually supplied (argparse defaults unset options to
    # None, not False), otherwise fall back to the config file
    device = ""
    if "transceiverPort" in config["OPERATOR_REQUIRED"]:
        if argConfig.override and argConfig.device is not None:
            device = argConfig.device
        else:
            device = config["OPERATOR_REQUIRED"]["transceiverPort"]
    else:
        if argConfig.device is None:
            print("No device set exiting...")
            exit(0)
        else:
            device = argConfig.device

    # Web UI port: default section, then CLI override, then operator section
    webPort = config["DEFAULT"]["WebUIPort"]
    if argConfig.override and argConfig.port is not None:
        webPort = argConfig.port
    elif "WebUIPort" in config["OPERATOR_OVERRIDES"]:
        webPort = config["OPERATOR_OVERRIDES"]["WebUIPort"]
    webPort = int(webPort)

    # Startup stagger in seconds (useful when launching several nodes)
    delay = config["DEFAULT"]["StartupDelay"]
    if argConfig.override and argConfig.startupDelay is not None:
        delay = argConfig.startupDelay
    elif "StartupDelay" in config["OPERATOR_OVERRIDES"]:
        delay = config["OPERATOR_OVERRIDES"]["StartupDelay"]
    delay = int(delay)

    # Nickname decides the on-disk data path for this node
    nodeNickname = config["DEFAULT"]["Nickname"]
    if argConfig.override and argConfig.nickname is not None:
        nodeNickname = argConfig.nickname
    elif "Nickname" in config["OPERATOR_OVERRIDES"]:
        nodeNickname = config["OPERATOR_OVERRIDES"]["Nickname"]

    # ShowTUI arrives as the string "True"/"False" from the config file but
    # as the bool True from the argparse default; normalize through str()
    # so the comparison works for both
    showTUI = config["DEFAULT"]["ShowTUI"]
    if argConfig.override:
        showTUI = argConfig.showTUI
    elif "ShowTUI" in config["OPERATOR_OVERRIDES"]:
        showTUI = config["OPERATOR_OVERRIDES"]["ShowTUI"]
    showTUI = str(showTUI) == "True"

    # Pre-shared key used to bootstrap encrypted communications
    psk = False
    if argConfig.override:
        if not argConfig.PSK:
            print("No PSK set quitting...")
            exit(0)
        else:
            psk = argConfig.PSK
    # NOTE(review): this runs even when a CLI PSK was supplied, so a PSK in
    # the config file always wins; the original commented-out `else` suggests
    # that is intentional — confirm
    if "PSK" in config["OPERATOR_REQUIRED"]:
        psk = config["OPERATOR_REQUIRED"]["PSK"]

    if delay > 0:
        print("Staggering {0} seconds please wait".format(delay))
        time.sleep(delay)
class Node:
    """
    Class that handles most of the PierMesh data

    `🔗 Source <https://git.utopic.work/PierMesh/piermesh/src/branch/main/src/run.py>`_

    Attributes
    ----------
    actions: dict
        Dictionary mapping methods with the action prefix to the method name
        after action dynamically to be called through Sponge (`Sponge.base`)
        filtering
    todo: list[dict]
        List of actions to execute
    network: `Network`
        Network map
    catch: `Catch`
        Daisy cache for catchs, our domain analog
    cache: `Cache`
        Daisy cache for general use
    nodeInfo: Daisy
        Daisy (`Components.daisy.Daisy`) record containing some information
        about the node
    onodeID: str
        PierMesh node ID
    oTransceiver: Transceiver
        LoRa transceiver `Transceiver`
    proc: psutil.Process
        This process (`psutil.Process`), used for managing and monitoring
        PierMesh
    mTasks: dict
        Dictionary of PierMesh service tasks
    """

    # isDone = False

    def __init__(self):
        # Build the dispatch table: every method named action_<verb> is
        # registered under key <verb> so spongeListen() can call it by name
        actionsList = [f for f in dir(self) if "action" in f]
        self.actions = {}
        for a in actionsList:
            self.actions[a.split("_")[1]] = getattr(self, a)
        self.todo = []
        logger.log(20, "Past action mapping")
        self.network = Network()
        self.daisyCryptography = SteelPetal(passkey)
        # TODO: Better directory structure
        self.catch = Catch(
            self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/catch"
        )
        self.cache = Cache(
            self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/cache"
        )
        # Convert the protocol lookup table to message form and seed it
        # into this node's cache
        du._json_to_msg("mlookup")
        shutil.copy("mlookup", f"{nodeNickname}/daisy/cache/mlookup")
        logger.info(
            "Protocols:\n" + json.dumps(self.cache.get("mlookup").get(), indent=4)
        )
        self.remoteCatchIndex = Index(nodeNickname, self.daisyCryptography)
        serverInfoFile = nodeNickname + ".info"
        self.nodeInfo = self.cache.get(serverInfoFile)
        # NOTE(review): Cache.get appears to return False on a miss — confirm
        if self.nodeInfo == False:
            # First run for this nickname: mint a random node ID
            # TODO: Get/set toml
            self.cache.create(
                serverInfoFile, {"nodeID": random.randrange(0, 1000000)}
            )
            self.nodeInfo = self.cache.get(serverInfoFile)
        self.network.addin(self.nodeInfo.get()["nodeID"])
        logger.log(20, "Siph network stack initialized")
        self.onodeID = str(self.nodeInfo.get()["nodeID"])
        logger.log(20, "Cryptography initializing")
        self.cryptographyInfo = Transport(
            self.cache, nodeNickname, self.daisyCryptography, psk
        )
        logger.log(20, "Cryptography initialized")
        self.sponge = Filter(
            self.cache, self.onodeID, self.todo, self.cryptographyInfo
        )
        logger.log(20, "Filter initialized")
        # TODO: Run in thread
        self.oTransceiver = Transceiver(
            device,
            self.sponge,
            self.onodeID,
            self.cache,
            self.catch,
            self.cryptographyInfo,
            self.network,
        )
        self.server = Server(
            self.oTransceiver,
            self.catch,
            self.onodeID,
            self.network,
            self.cryptographyInfo,
            self.remoteCatchIndex,
            self.cache,
        )
        self.proc = psutil.Process(os.getpid())
        self.mTasks = {}

    async def main(self):
        """
        Main loop, sets up the message listening, system monitoring and
        server running loops
        """
        self.mTasks["list"] = asyncio.create_task(self.spongeListen())
        await asyncio.sleep(1)
        # self.mTasks["pct"] = asyncio.create_task(self.oTransceiver.progressCheck())
        # await asyncio.sleep(1)
        self.mTasks["mon"] = asyncio.create_task(self.monitor())
        await asyncio.sleep(1)
        self.mTasks["announce"] = asyncio.create_task(self.oTransceiver.announce())
        await asyncio.sleep(1)
        # Blocks here serving the web UI for the lifetime of the node
        await self.server.app.start_server(port=int(webPort))

    async def fsInit(self):
        """
        Initialize the file system for use
        """
        # TODO: Flesh out and properly link everything
        # NOTE(review): creates data/<nickname> but __init__ reads
        # <nickname>/daisy/* — confirm which layout is canonical
        if not os.path.exists("data"):
            os.makedirs("data")
        if not os.path.exists("data/" + nodeNickname):
            os.makedirs("data/" + nodeNickname)

    def _logUsage(self):
        """
        Sample this process's RSS (MB, 2 dp) and CPU percent, log them and
        return them as a (memmb, cpup) tuple
        """
        memmb = self.proc.memory_info().rss / (1024 * 1024)
        memmb = round(memmb, 2)
        cpup = self.proc.cpu_percent(interval=1)
        logger.info(
            " MEM: {0} mB | CPU: {1} %".format(
                memmb,
                cpup,
            ),
        )
        return memmb, cpup

    async def monitor(self):
        """
        Monitor and log ram and cpu usage
        """
        # NOTE(review): tuiOb is never assigned in this module before
        # monitor() runs — presumably tlog.runLogUI sets it; confirm,
        # otherwise the first check raises NameError
        global tuiOb, logger
        while True:
            if tuiOb is not None and showTUI:
                await asyncio.sleep(10)
                memmb, cpup = self._logUsage()
                # Push the fresh readings into the TUI widgets
                tuiOb.do_set_cpu_percent(float(cpup))
                tuiOb.do_set_mem(memmb)
            elif not showTUI:
                await asyncio.sleep(10)
                self._logUsage()
            else:
                logger.log(20, "No TUI object, waiting 5 seconds...")
                await asyncio.sleep(5)

    async def spongeListen(self):
        """
        Loop to watch for tasks to do

        See Also
        --------
        Sponge.base.sieve: Packet filtering/parsing

        Notes
        -----
        We use a common technique here that calls the function from our
        preloaded actions via dictionary entry
        """
        global logger
        while True:
            while len(self.todo) >= 1:
                # Take the OLDEST entry; the previous trailing
                # self.todo.pop() removed the newest entry while todo[0]
                # was being processed, losing/duplicating work
                todoNow = self.todo.pop(0)
                try:
                    logger.log(10, "In todo")
                    action = todoNow.getAction()
                    logger.log(20, "Action: " + action)
                    data = todoNow.getData()
                    logger.log(20, "Doing action")
                    await self.actions[action](data)
                    logger.log(20, "After action")
                except Exception:
                    # Log and keep draining; one bad packet must not kill
                    # the dispatch loop
                    logger.log(20, traceback.format_exc())
            await asyncio.sleep(1)

    async def action_sendToPeer(self, data: dict):
        """
        Send data to a peer connected to the server

        Parameters
        ----------
        data: dict
            Data passed from the filter, this is a generic object so it's
            similar on all actions here

        See Also
        --------
        Sponge.Protocols: Protocol based packet filtering
        Splash.serve.Server: Runs a light Microdot web server with http/s and websocket functionality
        Splash.serve.Server.sendToPeer: Function to actually execute the action
        """
        logger.info(data)
        await asyncio.create_task(
            self.server.sendToPeer(data["recipient"], data["data"], data["target"])
        )
        await asyncio.sleep(1)

    async def action_sendCatch(self, data: dict):
        """
        Get catch and return the data to a peer
        """
        # TODO: Seperators
        if "fins" in data:
            res = self.catch.get(data["head"], data["body"], fins=data["fins"])
        else:
            res = self.catch.get(data["head"], data["body"])
        logger.info(res)
        r = CatchResponse(
            self.onodeID,
            0,  # packet/message sequence placeholder (was the literal 000000)
            self.onodeID,
            data["recipient"],
            data["recipientNode"],
            self.cryptographyInfo,
            res,
            pskEncrypt=True,
        )
        await asyncio.create_task(self.oTransceiver.sendMessage(r))
        await asyncio.sleep(1)

    async def action_routeCatch(self, data: dict):
        """
        Route received catch to peer who requested it
        """
        await asyncio.create_task(
            self.server.sendToPeer(data["recipient"], data["html"], data["target"])
        )
        await asyncio.sleep(1)

    async def action_syncIndex(self, data: dict):
        """
        Add received index entries to Catch via the a remote Catch index
        """
        for entry in data["index"]:
            self.remoteCatchIndex.addEntry(entry)

    async def action_map(self, data: dict):
        # TODO: Normalize node ids to strings
        """
        Map new network data to internal network map

        See Also
        --------
        Siph.network.Network: Layered graph network representation
        """
        # Unknown node: record its lookup and public key, then add it to
        # the outer network graph
        if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
            logger.log(20, "In map")
            await asyncio.create_task(
                self.network.addLookup(data["onodeID"], data["mnodeID"])
            )
            await asyncio.sleep(1)
            logger.log(20, "After add lookup")
            logger.log(20, "Adding public key")
            self.cryptographyInfo.addPublickey(data["onodeID"], data["publicKey"])
            logger.log(20, "Public key addition done")
            self.network.omap.add_node(data["onodeID"])
            logger.log(20, "Added node to outer map")
        else:
            logger.log(20, "Node already exists skipping addition to map")
        # No session yet: initiate the cryptographic handshake unless the
        # sender asked us not to respond
        if "sessionKey" not in self.cryptographyInfo.getRecord(
            "key", data["onodeID"]
        ).keys():
            if not data["dontRespond"]:
                h = Handshake(
                    self.nodeInfo.get()["nodeID"],
                    self.nodeInfo.get()["nodeID"],
                    data["onodeID"],
                    data["onodeID"],
                    self.cryptographyInfo,
                    data["onodeID"],
                    self.onodeID,
                )
                await asyncio.create_task(self.oTransceiver.sendMessage(h))
                await asyncio.sleep(1)

    async def action_initCryptography(self, data: dict):
        """
        Initialize AES-GCM encrypted transport session

        See Also
        --------
        Cryptography.WhaleSong.Transport: End to end encryption functionality
        """
        # TODO: Expiration
        self.cryptographyInfo.sessionSetup(
            data["yctx"]["sourceNode"]["val"], data["ephemeralKey"]
        )
        logger.log(20, "Cryptography handshake complete")

    async def action_hop(self, data):
        """
        Proxy a request to the main internet (in the future cross protocol/link)
        """
        try:
            r = None
            if data["method"] == "get":
                r = hopper.get(
                    data["url"],
                    params=data["parameters"],
                    followTags=["img", "script", "link"],
                )
            elif data["method"] == "post":
                r = hopper.post(data["url"], params=data["parameters"])
            if r != None:
                r = HopperResponse(
                    self.onodeID,
                    0,  # packet/message sequence placeholder (was 000000)
                    self.onodeID,
                    data["recipient"],
                    data["recipientNode"],
                    r,
                    self.cryptographyInfo,
                )
                await asyncio.create_task(self.oTransceiver.sendMessage(r))
                await asyncio.sleep(1)
        except Exception:
            # Was a bare except: which also swallowed SystemExit and
            # KeyboardInterrupt; narrowed to Exception
            logger.log(30, traceback.format_exc())

    async def action_routeHop(self, data: dict):
        """
        Return proxy request results to requester
        """
        await asyncio.create_task(
            self.server.sendToPeer(data["recipient"], data["res"], data["target"])
        )
        await asyncio.sleep(1)

    async def action_addPSK(self, data):
        """
        Action to add PSK for specific node, currently unused
        """
        self.cryptographyInfo.createEmpty(data["nodeID"])
        self.cryptographyInfo.update(data["nodeID"], {"PSK": data["PSK"]})
async def main():
    """
    Kick on main method for running the PierMesh service
    """
    # The module docstring promises nodeOb is global so the service can be
    # terminated later; without this declaration the assignment was local
    # and the module-level nodeOb stayed None
    global nodeOb
    try:
        nodeOb = Node()
        await nodeOb.main()
        nodeOb.isDone = True
    except Exception:
        # Narrowed from a bare except: so SystemExit/KeyboardInterrupt
        # still propagate; failures are logged instead of crashing silently
        logger.log(20, traceback.format_exc())
def initLogger(nodeName, tolog, tui=True):
    """
    Initialize our logging setup, we use a custom logger when running the
    TUI to consume the logs

    Parameters
    ----------
    nodeName: str
        Node nickname, used in the timestamped log file name
    tolog: queue.Queue
        Queue the TUI handler pushes records onto (unused when tui=False)
    tui: bool
        Route records through the TUI's VHandler instead of stderr
    """
    global logger
    logger = logging.getLogger(__name__)
    logger.propagate = False
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    if tui:
        vh = VHandler(logging.DEBUG, tolog)
        vh.setFormatter(formatter)
        logger.addHandler(vh)
    else:
        sh = logging.StreamHandler()
        sh.setLevel(logging.DEBUG)
        sh.setFormatter(formatter)
        logger.addHandler(sh)
    # Always mirror to a timestamped file; create logs/ up front so the
    # FileHandler doesn't raise FileNotFoundError on a fresh checkout
    os.makedirs("logs", exist_ok=True)
    dt = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    fh = logging.FileHandler(f"logs/{nodeName}_{dt}.log")
    fh.setFormatter(formatter)
    logger.addHandler(fh)
if __name__ == "__main__":
    try:
        # Queue bridging log records from the service thread to the TUI
        tolog = queue.Queue()
        initLogger(nodeNickname, tolog, tui=showTUI)
        # Run the asyncio service on uvloop in a background thread so the
        # TUI can own the main thread
        serviceThread = threading.Thread(target=uvloop.run, args=(main(),))
        serviceThread.start()
        if not showTUI:
            # TODO: Resource logging
            print("No TUI")
        else:
            tlog.runLogUI(tolog, nodeNickname)
    except Exception:
        print(traceback.format_exc())
        os._exit(1)