Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #68

Merged
merged 23 commits into from
Nov 24, 2023
Merged

Dev #68

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8388c4d
fix code style
AndPuQing Nov 21, 2023
89606dd
Merge branch 'dev' of https://github.com/visualDust/neetbox
AndPuQing Nov 23, 2023
0631211
Merge branch 'dev' of https://github.com/visualDust/neetbox into to-f…
AndPuQing Nov 23, 2023
16d1b70
Merge branch 'dev' of https://github.com/visualDust/neetbox into to-f…
AndPuQing Nov 23, 2023
f6e1fca
Refactor neetbox daemon module
AndPuQing Nov 23, 2023
b75b692
Merge branch 'dev' of https://github.com/visualDust/neetbox into to-f…
AndPuQing Nov 23, 2023
82fc817
Merge branch 'dev' into master
visualDust Nov 23, 2023
83013d4
Merge pull request #63 from AndPuQing/master
visualDust Nov 23, 2023
ebee16c
to fastapi
AndPuQing Nov 23, 2023
1418f54
Merge branch 'dev' of https://github.com/visualDust/neetbox into to-f…
AndPuQing Nov 23, 2023
3878637
Merge pull request #65 from AndPuQing/to-fastapi
visualDust Nov 23, 2023
5ba0659
Update README.md
AndPuQing Nov 23, 2023
432801b
Merge pull request #66 from AndPuQing/master
visualDust Nov 23, 2023
d733d62
patch work
visualDust Nov 23, 2023
8fc4fa7
logger patchwork
visualDust Nov 23, 2023
61b9f96
patch fix
visualDust Nov 24, 2023
a5c298c
fixed logger errors for previous patch works
visualDust Nov 24, 2023
d8599ef
patch work
visualDust Nov 24, 2023
ed7f66f
client patch
visualDust Nov 24, 2023
d9643c8
daemon patch
visualDust Nov 24, 2023
94d6d3c
now client side websocket works
visualDust Nov 24, 2023
b8f9014
fixed websockt thread not exiting with main thread
visualDust Nov 24, 2023
7bcc1bd
fixed logger style varies for same identity
visualDust Nov 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 100
extend-ignore = E203
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
~~一个自产自销的仓库~~ Logging/Debugging/Tracing/Managing/Facilitating your deep learning projects

A small part of the documentation at [neetbox.550w.host](https://neetbox.550w.host). (We are not ready for the doc yet)

## Star History

[![Star History Chart](https://api.star-history.com/svg?repos=visualDust/neetbox&type=Date)](https://star-history.com/#visualDust/neetbox&Date)
17 changes: 12 additions & 5 deletions neetbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from neetbox.daemon import _try_attach_daemon
from neetbox.utils.framing import get_frame_module_traceback

module = get_frame_module_traceback(1).__name__
module = get_frame_module_traceback(1).__name__ # type: ignore
config_file_name = f"{module}.toml"


Expand All @@ -18,6 +18,11 @@ def post_init():
project_name = get_module_level_config()["name"]
setproctitle.setproctitle(project_name)

from neetbox.daemon.client._connection import connection

# post init ws
connection._init_ws()


def init(path=None, load=False, **kwargs) -> bool:
if path:
Expand Down Expand Up @@ -60,16 +65,16 @@ def init(path=None, load=False, **kwargs) -> bool:
from neetbox.logging.logger import Logger

logger = Logger("NEETBOX") # builtin standalone logger
logger.ok(f"Loaded workspace config from {config_file_path}.")
logger.ok(f"found workspace config from {config_file_path}.")
_try_attach_daemon() # try attach daemon
post_init()
logger.debug(f"running post init...")
return True
except Exception as e:
from neetbox.logging.logger import Logger

logger = Logger("NEETBOX") # builtin standalone logger
logger.err(f"Failed to load config from {config_file_path}: {e}")
return False
logger.err(f"failed to load config from {config_file_path}: {e}")
raise e


is_in_daemon_process = (
Expand All @@ -80,3 +85,5 @@ def init(path=None, load=False, **kwargs) -> bool:
success = init(load=True) # init from config file
if not success:
os._exit(255)
# run post init
post_init()
Empty file added neetbox/core/packing.py
Empty file.
6 changes: 3 additions & 3 deletions neetbox/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import subprocess
import time

import neetbox
from neetbox.config import get_module_level_config
from neetbox.daemon.client._action_agent import _NeetActionManager as NeetActionManager
from neetbox.daemon.client._connection import connection
from neetbox.daemon.client._daemon_client import connect_daemon
from neetbox.daemon.server.daemonable_process import DaemonableProcess
from neetbox.logging import logger
Expand Down Expand Up @@ -84,4 +83,5 @@ def _try_attach_daemon():


action = NeetActionManager.register
__all__ = ["watch", "listen", "action", "NeetActionManager", "_try_attach_daemon"]
ws_subscribe = connection.ws_subscribe
__all__ = ["watch", "listen", "action", "ws_subscribe", "NeetActionManager", "_try_attach_daemon"]
71 changes: 71 additions & 0 deletions neetbox/daemon/_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import functools
import inspect
from ast import literal_eval
from threading import Thread
from typing import Callable, Optional

from neetbox.core import Registry
from neetbox.logging import logger
from neetbox.utils.mvc import Singleton


class PackedAction(Callable):
def __init__(self, function: Callable, name=None, **kwargs):
super().__init__(**kwargs)
self.function = function
self.name = name if name else function.__name__
self.argspec = inspect.getfullargspec(self.function)

def __call__(self, **argv):
self.function(argv)

def eval_call(self, params: dict):
eval_params = dict((k, literal_eval(v)) for k, v in params.items())
return self.function(**eval_params)


class _NeetAction(metaclass=Singleton):
__ACTION_POOL: Registry = Registry("__NEET_ACTIONS")

def register(
self,
*,
name: Optional[str] = None,
):
return functools.partial(self._register, name=name)

def _register(self, function: Callable, name: str = None):
packed = PackedAction(function=function, name=name)
_NeetAction.__ACTION_POOL._register(what=packed, name=packed.name, force=True)
return function

def get_actions(self):
action_names = _NeetAction.__ACTION_POOL.keys()
actions = {}
for n in action_names:
actions[n] = _NeetAction.__ACTION_POOL[n].argspec
return actions

def eval_call(self, name: str, params: dict):
if name not in _NeetAction.__ACTION_POOL:
logger.err(f"Could not find action with name {name}, action stopped.")
return False
return _NeetAction.__ACTION_POOL[name].eval_call(params)


# singleton
neet_action = _NeetAction()


# example
if __name__ == "__main__":

@neet_action.register(name="some")
def some(a, b):
print(a, b)

print("registered actions:")
print(neet_action.get_actions())

print("calling 'some")
neet_action.eval_call("some", {"a": "3", "b": "4"})
1 change: 1 addition & 0 deletions neetbox/daemon/client/_action_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def run_and_callback(target_action, params, callback):
Thread(
target=run_and_callback,
kwargs={"target_action": target_action, "params": params, "callback": callback},
daemon=True,
).start()
return None
else: # blocking run
Expand Down
2 changes: 1 addition & 1 deletion neetbox/daemon/client/_client_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def get_status_of(name=None):
name = name or ""
api_addr = f"{base_addr}/status"
api_addr = f"{base_addr}/web/list"
logger.info(f"Fetching from {api_addr}")
r = connection.http.get(api_addr)
_data = r.json()
Expand Down
118 changes: 105 additions & 13 deletions neetbox/daemon/client/_connection.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,54 @@
import asyncio
import functools
import json
import logging
from typing import Callable, Optional
import time
from dataclasses import dataclass
from threading import Thread
from typing import Any, Callable, Optional

import httpx
import websocket

from neetbox.config import get_module_level_config
from neetbox.core import Registry
from neetbox.daemon.server._server import CLIENT_API_ROOT
from neetbox.logging import logger
from neetbox.utils.mvc import Singleton

httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.ERROR)

EVENT_TYPE_NAME_KEY = "event-type"
EVENT_PAYLOAD_NAME_KEY = "payload"
EVENT_ID_NAME_KEY = "event-id"
NAME_NAME_KEY = "name"
PAYLOAD_NAME_KEY = "payload"


@dataclass
class WsMsg:
name: str
event_type: str
payload: Any
event_id: int = -1

def json(self):
return {
NAME_NAME_KEY: self.name,
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
PAYLOAD_NAME_KEY: self.payload,
}


# singleton
class ClientConn(metaclass=Singleton):
http: httpx.Client = None

__ws_client: None # _websocket_client
__ws_client: websocket.WebSocketApp = None # _websocket_client
__ws_subscription = Registry("__client_ws_subscription") # { event-type-name : list(Callable)}

def __init__(self) -> None:
cfg = get_module_level_config()

def __load_http_client():
__local_http_client = httpx.Client(
proxies={
Expand All @@ -38,9 +60,68 @@ def __load_http_client():

# create htrtp client
ClientConn.http = __load_http_client()
# todo establishing socket connection

def __on_ws_message(msg):
def _init_ws():
cfg = get_module_level_config()
_root_config = get_module_level_config("@")
ClientConn._display_name = cfg["displayName"] or _root_config["name"]

# ws server url
ClientConn.ws_server_addr = f"ws://{cfg['host']}:{cfg['port'] + 1}{CLIENT_API_ROOT}"

# create websocket app
logger.log(f"creating websocket connection to {ClientConn.ws_server_addr}")

ws = websocket.WebSocketApp(
ClientConn.ws_server_addr,
on_open=ClientConn.__on_ws_open,
on_message=ClientConn.__on_ws_message,
on_error=ClientConn.__on_ws_err,
on_close=ClientConn.__on_ws_close,
)

Thread(target=ws.run_forever, kwargs={"reconnect": True}, daemon=True).start()

# assign self to websocket log writer
from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter

_assign_connection_to_WebSocketLogWriter(ClientConn)

def __on_ws_open(ws: websocket.WebSocketApp):
_display_name = ClientConn._display_name
logger.ok(f"client websocket connected. sending handshake as '{_display_name}'...")
ws.send( # send handshake request
json.dumps(
{
NAME_NAME_KEY: {_display_name},
EVENT_TYPE_NAME_KEY: "handshake",
PAYLOAD_NAME_KEY: {"who": "cli"},
EVENT_ID_NAME_KEY: 0, # todo how does ack work
},
default=str,
)
)
logger.ok(f"handshake succeed.")
ClientConn.__ws_client = ws

def __on_ws_err(ws: websocket.WebSocketApp, msg):
logger.err(f"client websocket encountered {msg}")

def __on_ws_close(ws: websocket.WebSocketApp, close_status_code, close_msg):
logger.warn(f"client websocket closed")
if close_status_code or close_msg:
logger.warn(f"ws close status code: {close_status_code}")
logger.warn("ws close message: {close_msg}")
ClientConn.__ws_client = None

def __on_ws_message(ws: websocket.WebSocketApp, msg):
"""EXAMPLE JSON
{
"event-type": "action",
"event-id": 111 (optional?)
"payload": ...
}
"""
logger.debug(f"ws received {msg}")
# message should be json
event_type_name = msg[EVENT_TYPE_NAME_KEY]
Expand All @@ -50,17 +131,28 @@ def __on_ws_message(msg):
)
for subscriber in ClientConn._ws_subscribe[event_type_name]:
try:
subscriber(msg[EVENT_PAYLOAD_NAME_KEY]) # pass payload message into subscriber
subscriber(msg) # pass payload message into subscriber
except Exception as e:
# subscriber throws error
logger.err(
f"Subscriber {subscriber} crashed on message event {event_type_name}, ignoring."
)

def ws_send(msg):
logger.debug(f"ws sending {msg}")
# send to ws if ws is connected, otherwise drop message? idk
pass
def ws_send(event_type: str, payload):
logger.debug(f"ws sending {payload}")
if ClientConn.__ws_client: # if ws client exist
ClientConn.__ws_client.send(
json.dumps(
{
NAME_NAME_KEY: ClientConn._display_name,
EVENT_TYPE_NAME_KEY: event_type,
PAYLOAD_NAME_KEY: payload,
EVENT_ID_NAME_KEY: -1, # todo how does ack work
}
)
)
else:
logger.debug("ws client not exist, message dropped.")

def ws_subscribe(event_type_name: str):
"""let a function subscribe to ws messages with event type name.
Expand All @@ -81,5 +173,5 @@ def _ws_subscribe(function: Callable, event_type_name: str):


# singleton
ClientConn() # run init
ClientConn() # __init__ setup http client only
connection = ClientConn
6 changes: 2 additions & 4 deletions neetbox/daemon/client/_daemon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
def _upload_thread(daemon_config, base_addr, display_name):
_ctr = 0
_api_name = "sync"
_api_addr = f"{base_addr}/{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_api_addr = f"{base_addr}{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_disconnect_flag = False
_disconnect_retries = 10
while True:
Expand Down Expand Up @@ -91,9 +91,7 @@ def _check_daemon_alive(_api_addr):
global __upload_thread
if __upload_thread is None or not __upload_thread.is_alive():
__upload_thread = Thread(
target=_upload_thread,
daemon=True,
args=[cfg, _base_addr, _display_name],
target=_upload_thread, args=[cfg, _base_addr, _display_name], daemon=True
)
__upload_thread.start()

Expand Down
Loading