538 lines
18 KiB
Python
Executable File
538 lines
18 KiB
Python
Executable File
# PierMesh libraries
|
|
from Sponge.base import Filter
|
|
from Siph.map import Network
|
|
from Daisy import Daisy as du
|
|
from Daisy.Catch import Catch
|
|
from Daisy.Cache import Cache
|
|
from Daisy.Index import Index
|
|
from Daisy.CryptographyUtil import SteelPetal
|
|
from Splash.serve import Server
|
|
from Transceiver.Transceiver import Transceiver
|
|
from Cryptography.WhaleSong import Transport
|
|
import Components.hopper as hopper
|
|
from Packets.Messages.Protocols.hopper.Response import HopperResponse
|
|
from Packets.Messages.Protocols.catch.Response import CatchResponse
|
|
from Packets.Messages.Protocols.cryptography.Handshake import Handshake
|
|
import tlog
|
|
from tlog import VHandler
|
|
|
|
# Generic imports
|
|
import logging
|
|
import os
|
|
import asyncio
|
|
import time
|
|
import datetime
|
|
import traceback
|
|
import threading
|
|
import random
|
|
import argparse
|
|
import configparser
|
|
import shutil
|
|
import queue
|
|
import json
|
|
|
|
# Process management library
|
|
import psutil
|
|
|
|
# Faster asyncio drop-in
|
|
import uvloop
|
|
|
|
# TODO: Switch config to toml
|
|
# TODO: Better protocol management
|
|
# TODO: Dry run
|
|
# TODO: Fix scripts/marcille
|
|
# TODO: Manually trigger announce
|
|
# TODO: Run announce at initialization
|
|
if __name__ == "__main__":
|
|
"""
|
|
Global objects for the PierMesh service and the TUI so we can terminate the associated processes later and configuration objects for the command line and config (.piermesh)
|
|
"""
|
|
logger = ""
|
|
passkey = input("Enter node decryption key: ")
|
|
# TODO: Check for PSK
|
|
# psk = input("Enter psk: ")
|
|
nodeOb = None
|
|
# NOTE: Command line argument parser, example of a proper configuration
|
|
# is in scripts/falin
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-d", "--device", help="Set transceiver device path (ex. /dev/ttyACM0)")
|
|
parser.add_argument("-p", "--port", help="Web UI server port, default: 5000")
|
|
# NOTE: Detetmines where your node's data is stored oj disk
|
|
parser.add_argument("-n", "--nickname", help="Node nickname (sets node data path)")
|
|
parser.add_argument(
|
|
"-s", "--startupDelay", help="Startup delay (useful for testing)"
|
|
)
|
|
parser.add_argument(
|
|
"-o", "--override", help="Whether to override config file", default=False
|
|
)
|
|
# TODO: Fix
|
|
parser.add_argument("-x", "--showTUI",
|
|
help="Whether to show TUI (needs fix)", default=True)
|
|
parser.add_argument(
|
|
"-l",
|
|
"--confList",
|
|
help="Comma separated list of conf files to load for multi-node mode (TODO)",
|
|
default=False,
|
|
)
|
|
parser.add_argument(
|
|
"-k",
|
|
"--PSK",
|
|
help="Pre shared key for initializing communications",
|
|
default=False,
|
|
)
|
|
|
|
argConfig = parser.parse_args()
|
|
# NOTE: .piermesh conf parser, proper example is in .piermesh.example
|
|
config = configparser.ConfigParser()
|
|
if argConfig.confList != False:
|
|
pass
|
|
if not os.path.exists(".piermesh"):
|
|
print("No config at .piermesh, duplicating and loading example config...")
|
|
shutil.copyfile(".piermesh.example", ".piermesh")
|
|
config.read(".piermesh")
|
|
|
|
device = ""
|
|
if "transceiverPort" in config["OPERATOR_REQUIRED"]:
|
|
if argConfig.override:
|
|
device = argConfig.device
|
|
else:
|
|
device = config["OPERATOR_REQUIRED"]["transceiverPort"]
|
|
else:
|
|
if argConfig.device == False:
|
|
print("No device set exiting...")
|
|
exit(0)
|
|
else:
|
|
device = argConfig.device
|
|
webPort = config["DEFAULT"]["WebUIPort"]
|
|
if argConfig.override:
|
|
webPort = argConfig.port
|
|
else:
|
|
if "WebUIPort" in config["OPERATOR_OVERRIDES"]:
|
|
webPort = config["OPERATOR_OVERRIDES"]["WebUIPort"]
|
|
webPort = int(webPort)
|
|
delay = config["DEFAULT"]["StartupDelay"]
|
|
if argConfig.override:
|
|
delay = argConfig.startupDelay
|
|
else:
|
|
if "StartupDelay" in config["OPERATOR_OVERRIDES"]:
|
|
delay = config["OPERATOR_OVERRIDES"]["StartupDelay"]
|
|
delay = int(delay)
|
|
nodeNickname = config["DEFAULT"]["Nickname"]
|
|
if argConfig.override:
|
|
nodeNickname = argConfig.nickname
|
|
else:
|
|
if "Nickname" in config["OPERATOR_OVERRIDES"]:
|
|
nodeNickname = config["OPERATOR_OVERRIDES"]["Nickname"]
|
|
showTUI = config["DEFAULT"]["ShowTUI"]
|
|
if argConfig.override:
|
|
showTUI = argConfig.showTUI
|
|
else:
|
|
if "ShowTUI" in config["OPERATOR_OVERRIDES"]:
|
|
showTUI = config["OPERATOR_OVERRIDES"]["ShowTUI"]
|
|
showTUI = showTUI == "True"
|
|
psk = False
|
|
if argConfig.override:
|
|
if not argConfig.PSK:
|
|
print("No PSK set quitting...")
|
|
exit(0)
|
|
else:
|
|
psk = argConfig.PSK
|
|
# else:
|
|
if "PSK" in config["OPERATOR_REQUIRED"]:
|
|
psk = config["OPERATOR_REQUIRED"]["PSK"]
|
|
if delay > 0:
|
|
print("Staggering {0} seconds please wait".format(delay))
|
|
time.sleep(delay)
|
|
|
|
|
|
class Node:
|
|
"""
|
|
Class that handles most of the PierMesh data
|
|
|
|
`🔗 Source <https://git.utopic.work/PierMesh/piermesh/src/branch/main/src/run.py>`_
|
|
|
|
Attributes
|
|
----------
|
|
|
|
actions: dict
|
|
Dictionary mapping methods with the action prefix to the method name after action dynamically to be called through Sponge (`Sponge.base`) filtering
|
|
|
|
todo: list[dict]
|
|
List of actions to execute
|
|
|
|
network: `Network`
|
|
Network map
|
|
|
|
catch: `Catch`
|
|
Daisy cache for catchs, our domain analog
|
|
|
|
cache: `Cache`
|
|
Daisy cache for general use
|
|
|
|
nodeInfo: Daisy
|
|
Daisy (`Components.daisy.Daisy`) record containing some information about the node
|
|
|
|
onodeID: str
|
|
PierMesh node ID
|
|
|
|
oTransceiver: Transceiver
|
|
LoRa transceiver `Transceiver`
|
|
|
|
proc: psutil.Process
|
|
This process (`psutil.Process`), used for managing and monitoring PierMesh
|
|
|
|
mTasks: dict
|
|
Dictionary of PierMesh service tasks
|
|
|
|
"""
|
|
# isDone = False
|
|
def __init__(self):
|
|
actionsList = [f for f in dir(self) if "action" in f]
|
|
self.actions = {}
|
|
for a in actionsList:
|
|
self.actions[a.split("_")[1]] = getattr(self, a)
|
|
self.todo = []
|
|
logger.log(20, "Past action mapping")
|
|
self.network = Network()
|
|
self.daisyCryptography = SteelPetal(passkey)
|
|
# TODO: Better directory structure
|
|
self.catch = Catch(self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/catch")
|
|
self.cache = Cache(self.daisyCryptography, walk=True, path=f"{nodeNickname}/daisy/cache")
|
|
du._json_to_msg("mlookup")
|
|
shutil.copy("mlookup", f"{nodeNickname}/daisy/cache/mlookup")
|
|
logger.info("Protocols:\n" + json.dumps(self.cache.get("mlookup").get(), indent=4))
|
|
self.remoteCatchIndex = Index(nodeNickname, self.daisyCryptography)
|
|
serverInfoFile = nodeNickname + ".info"
|
|
|
|
self.nodeInfo = self.cache.get(serverInfoFile)
|
|
if self.nodeInfo == False:
|
|
# TODO: Get/set toml
|
|
self.cache.create(
|
|
serverInfoFile, {"nodeID": random.randrange(0, 1000000)})
|
|
self.nodeInfo = self.cache.get(serverInfoFile)
|
|
self.network.addin(self.nodeInfo.get()["nodeID"])
|
|
logger.log(20, "Siph network stack initialized")
|
|
self.onodeID = str(self.nodeInfo.get()["nodeID"])
|
|
|
|
logger.log(20, "Cryptography initializing")
|
|
self.cryptographyInfo = Transport(
|
|
self.cache, nodeNickname, self.daisyCryptography, psk
|
|
)
|
|
logger.log(20, "Cryptography initialized")
|
|
|
|
self.sponge = Filter(self.cache, self.onodeID,
|
|
self.todo, self.cryptographyInfo)
|
|
logger.log(20, "Filter initialized")
|
|
# TODO: Run in thread
|
|
self.oTransceiver = Transceiver(
|
|
device,
|
|
self.sponge,
|
|
self.onodeID,
|
|
self.cache,
|
|
self.catch,
|
|
self.cryptographyInfo,
|
|
self.network
|
|
)
|
|
self.server = Server(
|
|
self.oTransceiver,
|
|
self.catch,
|
|
self.onodeID,
|
|
self.network,
|
|
self.cryptographyInfo,
|
|
self.remoteCatchIndex,
|
|
self.cache,
|
|
)
|
|
self.proc = psutil.Process(os.getpid())
|
|
self.mTasks = {}
|
|
|
|
async def main(self):
|
|
"""
|
|
Main loop, sets up the message listening, system monitoring and server running loops
|
|
"""
|
|
self.mTasks["list"] = asyncio.create_task(self.spongeListen())
|
|
await asyncio.sleep(1)
|
|
# self.mTasks["pct"] = asyncio.create_task(self.oTransceiver.progressCheck())
|
|
# await asyncio.sleep(1)
|
|
self.mTasks["mon"] = asyncio.create_task(self.monitor())
|
|
await asyncio.sleep(1)
|
|
self.mTasks["announce"] = asyncio.create_task(
|
|
self.oTransceiver.announce())
|
|
await asyncio.sleep(1)
|
|
await self.server.app.start_server(port=int(webPort))
|
|
|
|
async def fsInit(self):
|
|
"""
|
|
Initialize the file system for use
|
|
"""
|
|
# TODO: Flesh out and properly link everything
|
|
if not os.path.exists("data"):
|
|
os.makedirs("data")
|
|
if not os.path.exists("data/" + nodeNickname):
|
|
os.makedirs("data/" + nodeNickname)
|
|
|
|
async def monitor(self):
|
|
"""
|
|
Monitor and log ram and cpu usage
|
|
"""
|
|
global tuiOb, logger
|
|
while True:
|
|
if tuiOb is not None and showTUI:
|
|
await asyncio.sleep(10)
|
|
memmb = self.proc.memory_info().rss / (1024 * 1024)
|
|
memmb = round(memmb, 2)
|
|
cpup = self.proc.cpu_percent(interval=1)
|
|
logger.info(
|
|
" MEM: {0} mB | CPU: {1} %".format(
|
|
memmb,
|
|
cpup,
|
|
),
|
|
)
|
|
tuiOb.do_set_cpu_percent(float(cpup))
|
|
tuiOb.do_set_mem(memmb)
|
|
elif not showTUI:
|
|
await asyncio.sleep(10)
|
|
memmb = self.proc.memory_info().rss / (1024 * 1024)
|
|
memmb = round(memmb, 2)
|
|
cpup = self.proc.cpu_percent(interval=1)
|
|
logger.info(
|
|
" MEM: {0} mB | CPU: {1} %".format(
|
|
memmb,
|
|
cpup,
|
|
),
|
|
)
|
|
else:
|
|
logger.log(20, "No TUI object, waiting 5 seconds...")
|
|
await asyncio.sleep(5)
|
|
|
|
async def spongeListen(self):
|
|
"""
|
|
Loop to watch for tasks to do
|
|
|
|
See Also
|
|
--------
|
|
Sponge.base.sieve: Packet filtering/parsing
|
|
|
|
Notes
|
|
-----
|
|
We use a common technique here that calls the function from our preloaded actions via dictionary entry
|
|
"""
|
|
global logger
|
|
while True:
|
|
while (len(self.todo) >= 1):
|
|
try:
|
|
logger.log(10, "In todo")
|
|
todoNow = self.todo[0]
|
|
action = todoNow.getAction()
|
|
logger.log(20, "Action: " + action)
|
|
data = todoNow.getData()
|
|
logger.log(20, "Doing action")
|
|
await self.actions[action](data)
|
|
logger.log(20, "After action")
|
|
except Exception:
|
|
logger.log(20, traceback.format_exc())
|
|
self.todo.pop()
|
|
await asyncio.sleep(1)
|
|
|
|
async def action_sendToPeer(self, data: dict):
|
|
"""
|
|
Send data to a peer connected to the server
|
|
|
|
Parameters
|
|
----------
|
|
data: dict
|
|
Data passed from the filter, this is a generic object so it's similar on all actions here
|
|
|
|
See Also
|
|
--------
|
|
Sponge.Protocols: Protocol based packet filtering
|
|
|
|
Splash.serve.Server: Runs a light Microdot web server with http/s and websocket functionality
|
|
|
|
Splash.serve.Server.sendToPeer: Function to actually execute the action
|
|
"""
|
|
logger.info(data)
|
|
await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["data"], data["target"]))
|
|
await asyncio.sleep(1)
|
|
|
|
async def action_sendCatch(self, data: dict):
|
|
"""
|
|
Get catch and return the data to a peer
|
|
"""
|
|
# TODO: Seperators
|
|
if "fins" in data:
|
|
res = self.catch.get(data["head"], data["body"], fins=data["fins"])
|
|
else:
|
|
res = self.catch.get(data["head"], data["body"])
|
|
logger.info(res)
|
|
r = CatchResponse(
|
|
self.onodeID,
|
|
000000,
|
|
self.onodeID,
|
|
data["recipient"],
|
|
data["recipientNode"],
|
|
self.cryptographyInfo,
|
|
res,
|
|
pskEncrypt=True
|
|
)
|
|
|
|
await asyncio.create_task(self.oTransceiver.sendMessage(r))
|
|
await asyncio.sleep(1)
|
|
|
|
async def action_routeCatch(self, data: dict):
|
|
"""
|
|
Route received catch to peer who requested it
|
|
"""
|
|
await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["html"], data["target"]))
|
|
await asyncio.sleep(1)
|
|
|
|
async def action_syncIndex(self, data: dict):
|
|
"""
|
|
Add received index entries to Catch via the a remote Catch index
|
|
"""
|
|
for entry in data["index"]:
|
|
self.remoteCatchIndex.addEntry(entry)
|
|
|
|
async def action_map(self, data: dict):
|
|
# TODO: Normalize node ids to strings
|
|
"""
|
|
Map new network data to internal network map
|
|
|
|
See Also
|
|
--------
|
|
Siph.network.Network: Layered graph network representation
|
|
"""
|
|
if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
|
|
logger.log(20, "In map")
|
|
await asyncio.create_task(self.network.addLookup(data["onodeID"], data["mnodeID"]))
|
|
await asyncio.sleep(1)
|
|
logger.log(20, "After add lookup")
|
|
logger.log(20, "Adding public key")
|
|
self.cryptographyInfo.addPublickey(data["onodeID"], data["publicKey"])
|
|
logger.log(20, "Public key addition done")
|
|
self.network.omap.add_node(data["onodeID"])
|
|
logger.log(20, "Added node to outer map")
|
|
else:
|
|
logger.log(20, "Node already exists skipping addition to map")
|
|
if "sessionKey" not in self.cryptographyInfo.getRecord("key", data["onodeID"]).keys():
|
|
if not data["dontRespond"]:
|
|
h = Handshake(self.nodeInfo.get()["nodeID"], self.nodeInfo.get()["nodeID"], data["onodeID"], data["onodeID"], self.cryptographyInfo, data["onodeID"], self.onodeID)
|
|
await asyncio.create_task(self.oTransceiver.sendMessage(h))
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
async def action_initCryptography(self, data: dict):
|
|
"""
|
|
Initialize AES-GCM encrypted transport session
|
|
|
|
See Also
|
|
--------
|
|
Cryptography.WhaleSong.Transport: End to end encryption functionality
|
|
"""
|
|
# TODO: Expiration
|
|
self.cryptographyInfo.sessionSetup(data["yctx"]["sourceNode"]["val"], data["ephemeralKey"])
|
|
logger.log(20, "Cryptography handshake complete")
|
|
|
|
async def action_hop(self, data):
|
|
"""
|
|
Proxy a request to the main internet (in the future cross protocol/link)
|
|
"""
|
|
try:
|
|
r = None
|
|
if data["method"] == "get":
|
|
r = hopper.get(
|
|
data["url"],
|
|
params=data["parameters"],
|
|
followTags=["img", "script", "link"],
|
|
)
|
|
elif data["method"] == "post":
|
|
r = hopper.post(data["url"], params=data["parameters"])
|
|
if r != None:
|
|
r = HopperResponse(
|
|
self.onodeID,
|
|
000000,
|
|
self.onodeID,
|
|
data["recipient"],
|
|
data["recipientNode"],
|
|
r,
|
|
self.cryptographyInfo,
|
|
)
|
|
await asyncio.create_task(self.oTransceiver.sendMessage(r))
|
|
await asyncio.sleep(1)
|
|
|
|
except:
|
|
logger.log(30, traceback.format_exc())
|
|
|
|
async def action_routeHop(self, data: dict):
|
|
"""
|
|
Return proxy request results to requester
|
|
"""
|
|
await asyncio.create_task(self.server.sendToPeer(data["recipient"], data["res"], data["target"]))
|
|
await asyncio.sleep(1)
|
|
|
|
async def action_addPSK(self, data):
|
|
"""
|
|
Action to add PSK for specific node, currently unused
|
|
"""
|
|
self.cryptographyInfo.createEmpty(data["nodeID"])
|
|
self.cryptographyInfo.update(data["nodeID"], {"PSK": data["PSK"]})
|
|
|
|
|
|
async def main():
|
|
"""
|
|
Kick on main method for running the PierMesh service
|
|
"""
|
|
try:
|
|
nodeOb = Node()
|
|
await nodeOb.main()
|
|
nodeOb.isDone = True
|
|
except:
|
|
logger.log(20, traceback.format_exc())
|
|
|
|
|
|
def initLogger(nodeName, tolog, tui=True):
|
|
"""
|
|
Initialize our logging setup, we use a custom logger when running the TUI to consume the logs
|
|
"""
|
|
global logger
|
|
logger = logging.getLogger(__name__)
|
|
logger.propagate = False
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
|
if tui:
|
|
vh = VHandler(logging.DEBUG, tolog)
|
|
|
|
vh.setFormatter(formatter)
|
|
|
|
logger.addHandler(vh)
|
|
else:
|
|
sh = logging.StreamHandler()
|
|
sh.setLevel(logging.DEBUG)
|
|
sh.setFormatter(formatter)
|
|
logger.addHandler(sh)
|
|
|
|
dt = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
fh = logging.FileHandler(f"logs/{nodeName}_{dt}.log")
|
|
fh.setFormatter(formatter)
|
|
|
|
logger.addHandler(fh)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
tolog = queue.Queue()
|
|
initLogger(nodeNickname, tolog, tui=showTUI)
|
|
mainThread = threading.Thread(target=uvloop.run, args=(main(),))
|
|
mainThread.start()
|
|
if showTUI:
|
|
tlog.runLogUI(tolog, nodeNickname)
|
|
else:
|
|
# TODO: Resource logging
|
|
print("No TUI")
|
|
except Exception:
|
|
print(traceback.format_exc())
|
|
os._exit(1)
|