Skip to content

Commit

Permalink
daemon patch
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 24, 2023
1 parent ed7f66f commit d9643c8
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 123 deletions.
8 changes: 7 additions & 1 deletion neetbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ def post_init():
project_name = get_module_level_config()["name"]
setproctitle.setproctitle(project_name)

from neetbox.daemon.client._connection import connection

# post init ws
connection._init_ws()


def init(path=None, load=False, **kwargs) -> bool:
if path:
Expand Down Expand Up @@ -63,7 +68,6 @@ def init(path=None, load=False, **kwargs) -> bool:
logger.ok(f"found workspace config from {config_file_path}.")
_try_attach_daemon() # try attach daemon
logger.debug(f"running post init...")
post_init()
return True
except Exception as e:
from neetbox.logging.logger import Logger
Expand All @@ -81,3 +85,5 @@ def init(path=None, load=False, **kwargs) -> bool:
success = init(load=True) # init from config file
if not success:
os._exit(255)
# run post init
post_init()
55 changes: 39 additions & 16 deletions neetbox/daemon/client/_connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import functools
import json
import logging
import time
from dataclasses import dataclass
from threading import Thread
from typing import Any, Callable, Optional
Expand All @@ -25,8 +27,8 @@
@dataclass
class WsMsg:
event_type: str
event_id: int = -1
payload: Any
event_id: int = -1

def json(self):
return {
Expand All @@ -45,6 +47,8 @@ class ClientConn(metaclass=Singleton):

def __init__(self) -> None:
cfg = get_module_level_config()
_root_config = get_module_level_config("@")
ClientConn._display_name = cfg["displayName"] or _root_config["name"]

def __load_http_client():
__local_http_client = httpx.Client(
Expand All @@ -62,40 +66,51 @@ def _init_ws():
cfg = get_module_level_config()

# ws server url
ClientConn.ws_server_addr = f"ws://{cfg['host']}/{CLIENT_API_ROOT}"
ClientConn.ws_server_addr = f"ws://{cfg['host']}:{cfg['port'] + 1}{CLIENT_API_ROOT}"

# todo wait for server online
# create websocket app
logger.log("creating websocket connection...")
logger.log(f"creating websocket connection to {ClientConn.ws_server_addr}")
# todo does run_forever reconnect after close?
websocket.WebSocketApp(
ClientConn.__ws_client = websocket.WebSocketApp(
ClientConn.ws_server_addr,
on_open=ClientConn.__on_ws_open,
on_message=ClientConn.__on_ws_message,
on_error=ClientConn.__on_ws_err,
on_close=ClientConn.__on_ws_close,
).run_forever(reconnect=True)
)

Thread(target=ClientConn.__ws_client.run_forever, kwargs={"reconnect": True}).start()

# assign self to websocket log writer
from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter

_assign_connection_to_WebSocketLogWriter(ClientConn)

def __on_ws_open(ws):
ClientConn.__ws_client = ws
logger.ok(f"client websocket {ws} connected")
def __on_ws_open(ws: websocket.WebSocketApp):
logger.ok(f"client websocket connected")
ws.send( # send handshake request
json.dumps(
{
"event-type": "handshake",
"payload": {"name": {ClientConn._display_name}, "who": "cli"},
"event-id": 0,
},
default=str,
)
)

def __on_ws_err(ws, msg):
logger.err(f"client websocket {ws} encountered {msg}")
def __on_ws_err(ws: websocket.WebSocketApp, msg):
logger.err(f"client websocket encountered {msg}")

def __on_ws_close(ws, close_status_code, close_msg):
logger.warn(f"client websocket {ws} closed")
def __on_ws_close(ws: websocket.WebSocketApp, close_status_code, close_msg):
logger.warn(f"client websocket closed")
if close_status_code or close_msg:
logger.warn(f"ws close status code: {close_status_code}")
logger.warn("ws close message: {close_msg}")
ClientConn.__ws_client = None

def __on_ws_message(ws, msg):
def __on_ws_message(ws: websocket.WebSocketApp, msg):
"""EXAMPLE JSON
{
"event-type": "action",
Expand All @@ -121,10 +136,18 @@ def __on_ws_message(ws, msg):
f"Subscriber {subscriber} crashed on message event {event_type_name}, ignoring."
)

def ws_send(msg):
logger.debug(f"ws sending {msg}")
def ws_send(event_type: str, payload):
logger.debug(f"ws sending {payload}")
if ClientConn.__ws_client: # if ws client exist
ClientConn.__ws_client.send(msg)
ClientConn.__ws_client.send(
json.dumps(
{
EVENT_TYPE_NAME_KEY: event_type,
EVENT_PAYLOAD_NAME_KEY: payload,
EVENT_ID_NAME_KEY: -1, # todo
}
)
)
else:
logger.debug("ws client not exist, message dropped.")

Expand Down
4 changes: 1 addition & 3 deletions neetbox/daemon/client/_daemon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
def _upload_thread(daemon_config, base_addr, display_name):
_ctr = 0
_api_name = "sync"
_api_addr = f"{base_addr}/{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_api_addr = f"{base_addr}{CLIENT_API_ROOT}/{_api_name}/{display_name}"
_disconnect_flag = False
_disconnect_retries = 10
while True:
Expand Down Expand Up @@ -83,8 +83,6 @@ def _check_daemon_alive(_api_addr):
try:
_check_daemon_alive(_base_addr)
logger.ok(f"daemon alive at {_base_addr}")
# post init ws
connection._init_ws()
except Exception as e:
logger.err(e)
return False
Expand Down
113 changes: 113 additions & 0 deletions neetbox/daemon/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# DAEMON readme

## WS message standard

websocke messages are described in json. There is a dataclass representing websocket message:

```python
@dataclass
class WsMsg:
event_type: str
payload: Any
event_id: int = -1

def json(self):
return {
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
EVENT_PAYLOAD_NAME_KEY: self.payload,
}
```

```json
{
"event-type" : ...,
"payload" : ...,
"event-id" : ...
}
```

| key | value type | description |
| :--------: | :--------: | :----------------------------------------------------: |
| event-type | string | indicate type of data in payload |
| payload | string | actual data |
| event-id | int | for events who need ack. default -1 means no event id. |

## Event types

the table is increasing. a frequent check would keep you up to date.

| event-type | accepting direction | means |
| :--------: | :---------------------------: | :----------------------------------------------------------: |
| handshake | cli <--> server <--> frontend | string in `payload` indicate connection type ('cli'/'web') |
| log | cli -> server -> frontend | `payload` contains log data |
| action | cli <- server <- frontend | `payload` contains action trigger |
| ack | cli <--> server <--> frontend | `payload` contains ack, and `event-id` should be a valid key |

## Examples of websocket data

### handshake

for instance, frontend connected to server. frontend should report connection type immediately by sending:

```json
{
"event-type": "handshake",
"payload": {
"name": "project name",
"who": "web"
},
"event-id": X
}
```

where `event-id` is used to send ack to the starter of the connection, it should be a random int value.

### cli sending log to frontend

cli sents log(s) via websocket, server will receives and broadcast this message to related frontends. cli should send:

```json
{
"event-type": "log",
"payload": {
"name" = "...",
"log" = {...json representing log data...}
},
"event-id": -1
}
```

where `event-id` is a useless segment, leave it default. it's okay if nobody receives log.

### frontend(s) querys action to cli

frontend send action request to server, and server will forwards the message to cli. frontend should send:

```json
{
"event-type" : "action",
"payload" : {...json representing action trigger...},
"event-id" : x
}
```

front may want to know the result of action. for example, whether the action was invoked successfully. therefore, `event-id` is necessary for cli to shape a ack response.

### cli acks frontend action query

cli execute action query(s) from frontend, and gives response by sending ack:

```json
{
"event-type" : "ack",
"payload" : {...json representing action result...},
"event-id" : x
}
```

where `event-id` is same as received action query.

---

Those are only examples. use them wisely.
Loading

0 comments on commit d9643c8

Please sign in to comment.