Skip to content

Commit

Permalink
Merge pull request #68 from visualDust/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
visualDust authored Nov 24, 2023
2 parents 9467782 + 7bcc1bd commit b7e59bd
Show file tree
Hide file tree
Showing 23 changed files with 927 additions and 549 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 100
extend-ignore = E203
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
~~一个自产自销的仓库~~ Logging/Debugging/Tracing/Managing/Facilitating your deep learning projects

A small part of the documentation at [neetbox.550w.host](https://neetbox.550w.host). (We are not ready for the doc yet)

## Star History

[![Star History Chart](https://api.star-history.com/svg?repos=visualDust/neetbox&type=Date)](https://star-history.com/#visualDust/neetbox&Date)
17 changes: 12 additions & 5 deletions neetbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from neetbox.daemon import _try_attach_daemon
from neetbox.utils.framing import get_frame_module_traceback

module = get_frame_module_traceback(1).__name__
module = get_frame_module_traceback(1).__name__ # type: ignore
config_file_name = f"{module}.toml"


Expand All @@ -18,6 +18,11 @@ def post_init():
project_name = get_module_level_config()["name"]
setproctitle.setproctitle(project_name)

from neetbox.daemon.client._connection import connection

# post init ws
connection._init_ws()


def init(path=None, load=False, **kwargs) -> bool:
if path:
Expand Down Expand Up @@ -60,16 +65,16 @@ def init(path=None, load=False, **kwargs) -> bool:
from neetbox.logging.logger import Logger

logger = Logger("NEETBOX") # builtin standalone logger
logger.ok(f"Loaded workspace config from {config_file_path}.")
logger.ok(f"found workspace config from {config_file_path}.")
_try_attach_daemon() # try attach daemon
post_init()
logger.debug(f"running post init...")
return True
except Exception as e:
from neetbox.logging.logger import Logger

logger = Logger("NEETBOX") # builtin standalone logger
logger.err(f"Failed to load config from {config_file_path}: {e}")
return False
logger.err(f"failed to load config from {config_file_path}: {e}")
raise e


is_in_daemon_process = (
Expand All @@ -80,3 +85,5 @@ def init(path=None, load=False, **kwargs) -> bool:
success = init(load=True) # init from config file
if not success:
os._exit(255)
# run post init
post_init()
Empty file added neetbox/core/packing.py
Empty file.
6 changes: 3 additions & 3 deletions neetbox/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import subprocess
import time

import neetbox
from neetbox.config import get_module_level_config
from neetbox.daemon.client._action_agent import _NeetActionManager as NeetActionManager
from neetbox.daemon.client._connection import connection
from neetbox.daemon.client._daemon_client import connect_daemon
from neetbox.daemon.server.daemonable_process import DaemonableProcess
from neetbox.logging import logger
Expand Down Expand Up @@ -84,4 +83,5 @@ def _try_attach_daemon():


action = NeetActionManager.register
__all__ = ["watch", "listen", "action", "NeetActionManager", "_try_attach_daemon"]
ws_subscribe = connection.ws_subscribe
__all__ = ["watch", "listen", "action", "ws_subscribe", "NeetActionManager", "_try_attach_daemon"]
71 changes: 71 additions & 0 deletions neetbox/daemon/_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import functools
import inspect
from ast import literal_eval
from threading import Thread
from typing import Callable, Optional

from neetbox.core import Registry
from neetbox.logging import logger
from neetbox.utils.mvc import Singleton


class PackedAction(Callable):
def __init__(self, function: Callable, name=None, **kwargs):
super().__init__(**kwargs)
self.function = function
self.name = name if name else function.__name__
self.argspec = inspect.getfullargspec(self.function)

def __call__(self, **argv):
self.function(argv)

def eval_call(self, params: dict):
eval_params = dict((k, literal_eval(v)) for k, v in params.items())
return self.function(**eval_params)


class _NeetAction(metaclass=Singleton):
__ACTION_POOL: Registry = Registry("__NEET_ACTIONS")

def register(
self,
*,
name: Optional[str] = None,
):
return functools.partial(self._register, name=name)

def _register(self, function: Callable, name: str = None):
packed = PackedAction(function=function, name=name)
_NeetAction.__ACTION_POOL._register(what=packed, name=packed.name, force=True)
return function

def get_actions(self):
action_names = _NeetAction.__ACTION_POOL.keys()
actions = {}
for n in action_names:
actions[n] = _NeetAction.__ACTION_POOL[n].argspec
return actions

def eval_call(self, name: str, params: dict):
if name not in _NeetAction.__ACTION_POOL:
logger.err(f"Could not find action with name {name}, action stopped.")
return False
return _NeetAction.__ACTION_POOL[name].eval_call(params)


# singleton
neet_action = _NeetAction()


# example
if __name__ == "__main__":

@neet_action.register(name="some")
def some(a, b):
print(a, b)

print("registered actions:")
print(neet_action.get_actions())

print("calling 'some")
neet_action.eval_call("some", {"a": "3", "b": "4"})
1 change: 1 addition & 0 deletions neetbox/daemon/client/_action_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def run_and_callback(target_action, params, callback):
Thread(
target=run_and_callback,
kwargs={"target_action": target_action, "params": params, "callback": callback},
daemon=True,
).start()
return None
else: # blocking run
Expand Down
2 changes: 1 addition & 1 deletion neetbox/daemon/client/_client_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def get_status_of(name=None):
name = name or ""
api_addr = f"{base_addr}/status"
api_addr = f"{base_addr}/web/list"
logger.info(f"Fetching from {api_addr}")
r = connection.http.get(api_addr)
_data = r.json()
Expand Down
118 changes: 105 additions & 13 deletions neetbox/daemon/client/_connection.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,54 @@
import asyncio
import functools
import json
import logging
from typing import Callable, Optional
import time
from dataclasses import dataclass
from threading import Thread
from typing import Any, Callable, Optional

import httpx
import websocket

from neetbox.config import get_module_level_config
from neetbox.core import Registry
from neetbox.daemon.server._server import CLIENT_API_ROOT
from neetbox.logging import logger
from neetbox.utils.mvc import Singleton

httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.ERROR)

EVENT_TYPE_NAME_KEY = "event-type"
EVENT_PAYLOAD_NAME_KEY = "payload"
EVENT_ID_NAME_KEY = "event-id"
NAME_NAME_KEY = "name"
PAYLOAD_NAME_KEY = "payload"


@dataclass
class WsMsg:
name: str
event_type: str
payload: Any
event_id: int = -1

def json(self):
return {
NAME_NAME_KEY: self.name,
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
PAYLOAD_NAME_KEY: self.payload,
}


# singleton
class ClientConn(metaclass=Singleton):
http: httpx.Client = None

__ws_client: None # _websocket_client
__ws_client: websocket.WebSocketApp = None # _websocket_client
__ws_subscription = Registry("__client_ws_subscription") # { event-type-name : list(Callable)}

def __init__(self) -> None:
cfg = get_module_level_config()

def __load_http_client():
__local_http_client = httpx.Client(
proxies={
Expand All @@ -38,9 +60,68 @@ def __load_http_client():

# create htrtp client
ClientConn.http = __load_http_client()
# todo establishing socket connection

def __on_ws_message(msg):
def _init_ws():
cfg = get_module_level_config()
_root_config = get_module_level_config("@")
ClientConn._display_name = cfg["displayName"] or _root_config["name"]

# ws server url
ClientConn.ws_server_addr = f"ws://{cfg['host']}:{cfg['port'] + 1}{CLIENT_API_ROOT}"

# create websocket app
logger.log(f"creating websocket connection to {ClientConn.ws_server_addr}")

ws = websocket.WebSocketApp(
ClientConn.ws_server_addr,
on_open=ClientConn.__on_ws_open,
on_message=ClientConn.__on_ws_message,
on_error=ClientConn.__on_ws_err,
on_close=ClientConn.__on_ws_close,
)

Thread(target=ws.run_forever, kwargs={"reconnect": True}, daemon=True).start()

# assign self to websocket log writer
from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter

_assign_connection_to_WebSocketLogWriter(ClientConn)

def __on_ws_open(ws: websocket.WebSocketApp):
_display_name = ClientConn._display_name
logger.ok(f"client websocket connected. sending handshake as '{_display_name}'...")
ws.send( # send handshake request
json.dumps(
{
NAME_NAME_KEY: {_display_name},
EVENT_TYPE_NAME_KEY: "handshake",
PAYLOAD_NAME_KEY: {"who": "cli"},
EVENT_ID_NAME_KEY: 0, # todo how does ack work
},
default=str,
)
)
logger.ok(f"handshake succeed.")
ClientConn.__ws_client = ws

def __on_ws_err(ws: websocket.WebSocketApp, msg):
logger.err(f"client websocket encountered {msg}")

def __on_ws_close(ws: websocket.WebSocketApp, close_status_code, close_msg):
logger.warn(f"client websocket closed")
if close_status_code or close_msg:
logger.warn(f"ws close status code: {close_status_code}")
logger.warn("ws close message: {close_msg}")
ClientConn.__ws_client = None

def __on_ws_message(ws: websocket.WebSocketApp, msg):
"""EXAMPLE JSON
{
"event-type": "action",
"event-id": 111 (optional?)
"payload": ...
}
"""
logger.debug(f"ws received {msg}")
# message should be json
event_type_name = msg[EVENT_TYPE_NAME_KEY]
Expand All @@ -50,17 +131,28 @@ def __on_ws_message(msg):
)
for subscriber in ClientConn._ws_subscribe[event_type_name]:
try:
subscriber(msg[EVENT_PAYLOAD_NAME_KEY]) # pass payload message into subscriber
subscriber(msg) # pass payload message into subscriber
except Exception as e:
# subscriber throws error
logger.err(
f"Subscriber {subscriber} crashed on message event {event_type_name}, ignoring."
)

def ws_send(msg):
logger.debug(f"ws sending {msg}")
# send to ws if ws is connected, otherwise drop message? idk
pass
def ws_send(event_type: str, payload):
logger.debug(f"ws sending {payload}")
if ClientConn.__ws_client: # if ws client exist
ClientConn.__ws_client.send(
json.dumps(
{
NAME_NAME_KEY: ClientConn._display_name,
EVENT_TYPE_NAME_KEY: event_type,
PAYLOAD_NAME_KEY: payload,
EVENT_ID_NAME_KEY: -1, # todo how does ack work
}
)
)
else:
logger.debug("ws client not exist, message dropped.")

def ws_subscribe(event_type_name: str):
"""let a function subscribe to ws messages with event type name.
Expand All @@ -81,5 +173,5 @@ def _ws_subscribe(function: Callable, event_type_name: str):


# singleton
ClientConn() # run init
ClientConn() # __init__ setup http client only
connection = ClientConn
6 changes: 2 additions & 4 deletions neetbox/daemon/client/_daemon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
def _upload_thread(daemon_config, base_addr, display_name):
_ctr = 0
_api_name = "sync"
_api_addr = f"{base_addr}/{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_api_addr = f"{base_addr}{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_disconnect_flag = False
_disconnect_retries = 10
while True:
Expand Down Expand Up @@ -91,9 +91,7 @@ def _check_daemon_alive(_api_addr):
global __upload_thread
if __upload_thread is None or not __upload_thread.is_alive():
__upload_thread = Thread(
target=_upload_thread,
daemon=True,
args=[cfg, _base_addr, _display_name],
target=_upload_thread, args=[cfg, _base_addr, _display_name], daemon=True
)
__upload_thread.start()

Expand Down
Loading

1 comment on commit b7e59bd

@vercel
Copy link

@vercel vercel bot commented on b7e59bd Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.