Skip to content

Commit

Permalink
now client side websocket works
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 24, 2023
1 parent d9643c8 commit 94d6d3c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
28 changes: 17 additions & 11 deletions neetbox/daemon/client/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@

EVENT_TYPE_NAME_KEY = "event-type"
EVENT_ID_NAME_KEY = "event-id"
EVENT_PAYLOAD_NAME_KEY = "payload"
NAME_NAME_KEY = "name"
PAYLOAD_NAME_KEY = "payload"


@dataclass
class WsMsg:
name: str
event_type: str
payload: Any
event_id: int = -1

def json(self):
return {
NAME_NAME_KEY: self.name,
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
EVENT_PAYLOAD_NAME_KEY: self.payload,
PAYLOAD_NAME_KEY: self.payload,
}


Expand Down Expand Up @@ -72,33 +75,37 @@ def _init_ws():
# create websocket app
logger.log(f"creating websocket connection to {ClientConn.ws_server_addr}")
# todo does run_forever reconnect after close?
ClientConn.__ws_client = websocket.WebSocketApp(
ws = websocket.WebSocketApp(
ClientConn.ws_server_addr,
on_open=ClientConn.__on_ws_open,
on_message=ClientConn.__on_ws_message,
on_error=ClientConn.__on_ws_err,
on_close=ClientConn.__on_ws_close,
)

Thread(target=ClientConn.__ws_client.run_forever, kwargs={"reconnect": True}).start()
Thread(target=ws.run_forever, kwargs={"reconnect": True}).start()

# assign self to websocket log writer
from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter

_assign_connection_to_WebSocketLogWriter(ClientConn)

def __on_ws_open(ws: websocket.WebSocketApp):
logger.ok(f"client websocket connected")
_display_name = ClientConn._display_name
logger.ok(f"client websocket connected. sending handshake as '{_display_name}'...")
ws.send( # send handshake request
json.dumps(
{
"event-type": "handshake",
"payload": {"name": {ClientConn._display_name}, "who": "cli"},
"event-id": 0,
NAME_NAME_KEY: {_display_name},
EVENT_TYPE_NAME_KEY: "handshake",
PAYLOAD_NAME_KEY: {"who": "cli"},
EVENT_ID_NAME_KEY: 0, # todo how does ack work
},
default=str,
)
)
logger.ok(f"handshake succeed.")
ClientConn.__ws_client = ws

def __on_ws_err(ws: websocket.WebSocketApp, msg):
logger.err(f"client websocket encountered {msg}")
Expand All @@ -119,8 +126,6 @@ def __on_ws_message(ws: websocket.WebSocketApp, msg):
}
"""
logger.debug(f"ws received {msg}")
# ack to sender
ws.send(WsMsg(event_type="ack", payload="0"))
# message should be json
event_type_name = msg[EVENT_TYPE_NAME_KEY]
if event_type_name not in ClientConn.__ws_subscription:
Expand All @@ -142,8 +147,9 @@ def ws_send(event_type: str, payload):
ClientConn.__ws_client.send(
json.dumps(
{
NAME_NAME_KEY: ClientConn._display_name,
EVENT_TYPE_NAME_KEY: event_type,
EVENT_PAYLOAD_NAME_KEY: payload,
PAYLOAD_NAME_KEY: payload,
EVENT_ID_NAME_KEY: -1, # todo
}
)
Expand Down
18 changes: 12 additions & 6 deletions neetbox/daemon/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class WsMsg:
return {
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
EVENT_PAYLOAD_NAME_KEY: self.payload,
PAYLOAD_NAME_KEY: self.payload,
}
```

Expand Down Expand Up @@ -53,8 +53,8 @@ for instance, frontend connected to server. frontend should report connection ty
```json
{
"event-type": "handshake",
"name": "project name",
"payload": {
"name": "project name",
"who": "web"
},
"event-id": X
Expand All @@ -70,9 +70,9 @@ cli sents log(s) via websocket, server will receives and broadcast this message
```json
{
"event-type": "log",
"name": "project name",
"payload": {
"name" = "...",
"log" = {...json representing log data...}
"log" : {...json representing log data...}
},
"event-id": -1
}
Expand All @@ -87,7 +87,10 @@ frontend send action request to server, and server will forwards the message to
```json
{
"event-type" : "action",
"payload" : {...json representing action trigger...},
"name": "project name",
"payload" : {
"action" : {...json representing action trigger...}
},
"event-id" : x
}
```
Expand All @@ -101,7 +104,10 @@ cli execute action query(s) from frontend, and gives response by sending ack:
```json
{
"event-type" : "ack",
"payload" : {...json representing action result...},
"name": "project name",
"payload" : {
"action" : {...json representing action result...}
},
"event-id" : x
}
```
Expand Down
46 changes: 27 additions & 19 deletions neetbox/daemon/server/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@

EVENT_TYPE_NAME_KEY = "event-type"
EVENT_ID_NAME_KEY = "event-id"
EVENT_PAYLOAD_NAME_KEY = "payload"
PAYLOAD_NAME_KEY = "payload"
NAME_NAME_KEY = "name"


@dataclass
class WsMsg:
name: str
event_type: str
payload: Any
event_id: int = -1

def json(self):
return {
NAME_NAME_KEY: self.name,
EVENT_TYPE_NAME_KEY: self.event_type,
EVENT_ID_NAME_KEY: self.event_id,
EVENT_PAYLOAD_NAME_KEY: self.payload,
PAYLOAD_NAME_KEY: self.payload,
}


Expand Down Expand Up @@ -74,7 +77,7 @@ def ws_send(self):
# websocket server
ws_server = WebsocketServer(port=cfg["port"] + 1)
__BRIDGES = {} # manage connections
connected_clients: Dict(str, Tuple(str, str)) = {} # {sid:(name,type)} store connection only
connected_clients: Dict(int, Tuple(str, str)) = {} # {cid:(name,type)} store connection only

# ======================== WS SERVER ===========================

Expand Down Expand Up @@ -129,24 +132,28 @@ def handle_ws_connect(client, server):
print(f"client {client} connected. waiting for assigning...")

def handle_ws_disconnect(client, server):
name, conn_type = connected_clients[request.sid]
# remove sid from Client entity
if conn_type == "cli": # remove client sid from Client
__BRIDGES[name].cli_ws = None
else:
__BRIDGES[name].web_ws_list.remove(request.sid)
del connected_clients[request.sid]
_project_name, _who = connected_clients[client["id"]]
if _who == "cli": # remove client from Bridge
__BRIDGES[_project_name].cli_ws = None
else: # remove frontend from Bridge
_new_web_ws_list = [
c for c in __BRIDGES[_project_name].web_ws_list if c["id"] != client["id"]
]
__BRIDGES[_project_name].web_ws_list = _new_web_ws_list
del connected_clients[client["id"]]
print(f"a {_who} disconnected with id {client['id']}")
# logger.info(f"Websocket ({conn_type}) for {name} disconnected")

def handle_ws_message(client, server: WebsocketServer, message):
message = json.loads(message)
print(message) # debug
# handle event-type
_event_type = message["event-type"]
_payload = message["payload"]
_event_id = message["event-id"]
_event_type = message[EVENT_TYPE_NAME_KEY]
_payload = message[PAYLOAD_NAME_KEY]
_event_id = message[EVENT_ID_NAME_KEY]
_project_name = message[NAME_NAME_KEY]
if _event_type == "handshake": # handle handshake
# assign this client to a Bridge
_project_name = _payload["name"]
_who = _payload["who"]
if _who == "web":
# new connection from frontend
Expand All @@ -158,18 +165,19 @@ def handle_ws_message(client, server: WebsocketServer, message):
event_type="ack",
event_id=_event_id,
payload={"result": "404", "reason": "name not found"},
),
).json(),
)
else: # assign web to bridge
_target_bridge = __BRIDGES[_project_name]
_target_bridge.web_ws_list.append(client)
connected_clients[client["id"]] = (_project_name, "web")
server.send_message(
client=client,
msg=WsMsg(
event_type="ack",
event_id=_event_id,
payload={"result": "200", "reason": "join success"},
),
).json(),
)
elif _who == "cli":
# new connection from cli
Expand All @@ -178,26 +186,26 @@ def handle_ws_message(client, server: WebsocketServer, message):
_target_bridge = Bridge(name=_project_name) # create new bridge for this name
__BRIDGES[_project_name] = _target_bridge
__BRIDGES[_project_name].cli_ws = client # assign cli to bridge
connected_clients[client["id"]] = (_project_name, "web")
server.send_message(
client=client,
msg=WsMsg(
name="_project_name",
event_type="ack",
event_id=_event_id,
payload={"result": "200", "reason": "join success"},
),
).json(),
)

elif _event_type == "log": # handle log
# forward log to frontend
_project_name = _payload["name"]
if _project_name not in __BRIDGES:
# project name must exist
# drop anyway if not exist
return
else:
# forward to frontends
_target_bridge = __BRIDGES[_project_name]
_log_data = _payload["log"]
for web_ws in _target_bridge.web_ws_list:
server.send_message(
client=web_ws, msg=message
Expand Down

0 comments on commit 94d6d3c

Please sign in to comment.