Skip to content

Commit

Permalink
websocket server multiplexing (#275)
Browse files Browse the repository at this point in the history
* in progress

* websocket multiplexing

* better nodes
  • Loading branch information
Brendonovich authored Jan 8, 2024
1 parent 1137670 commit 92f98a5
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 51 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apps/desktop/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ base64 = "0.21.4"
async-stream = "0.3.5"
tower-http = { version = "0.4.4", features = ["cors"] }
opener = "0.6.1"
streamunordered = "0.5.3"

[features]
# this feature is used for production builds or when `devPath` points to the filesystem
Expand Down
72 changes: 54 additions & 18 deletions apps/desktop/src-tauri/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,25 @@ impl DerefMut for WebSocketShutdown {

#[derive(Default)]
pub struct Ctx {
senders: Mutex<BTreeMap<u16, mpsc::Sender<String>>>,
senders: Mutex<BTreeMap<u16, Arc<Mutex<BTreeMap<u8, mpsc::Sender<String>>>>>>,
}

fn random_id<T>(map: &BTreeMap<u8, T>) -> u8 {
let mut i = 0;

loop {
if !map.contains_key(&i) {
break;
}

if i == u8::MAX {
panic!("No more ids available");
}

i += 1;
}

i
}

pub fn router() -> AlphaRouter<super::Ctx> {
Expand All @@ -54,18 +72,20 @@ pub fn router() -> AlphaRouter<super::Ctx> {

let addr = SocketAddr::from(([127, 0, 0, 1], port));

let (sender_tx, sender_rx) = mpsc::channel(64);
let (receiver_tx, mut receiver_rx) = mpsc::channel(64);
let (receiver_tx, mut receiver_rx) = mpsc::channel::<(u8, Message)>(16);

ctx.ws.senders.lock().await.insert(port, sender_tx);
let sender_txs = {
let mut senders = ctx.ws.senders.lock().await;
senders.entry(port).or_default().clone()
};

let server = axum::Server::bind(&addr)
.serve(
axum::Router::new()
.route("/", get(ws_handler))
.with_state(WsState {
receiver_tx,
sender_rx: Arc::new(Mutex::new(sender_rx)),
sender_txs,
shutdown_rx: WebSocketShutdown(ws_shutdown_rx),
})
.into_make_service(),
Expand Down Expand Up @@ -95,34 +115,49 @@ pub fn router() -> AlphaRouter<super::Ctx> {
#[specta(inline)]
struct Args {
port: u16,
client: u8,
data: String,
}

|ctx, Args { port, data }: Args| async move {
|ctx, Args { port, client, data }: Args| async move {
let senders = ctx.ws.senders.lock().await;

if let Some(sender) = senders.get(&port) {
sender.send(data).await.ok();
}
let Some(clients) = senders.get(&port) else {
return;
};

let clients = clients.lock().await;
let Some(client) = clients.get(&client) else {
return;
};

client.send(data).await.ok();
}
}),
)
}

#[derive(Clone)]
struct WsState {
sender_rx: Arc<Mutex<mpsc::Receiver<String>>>,
receiver_tx: mpsc::Sender<Message>,
sender_txs: Arc<Mutex<BTreeMap<u8, mpsc::Sender<String>>>>,
receiver_tx: mpsc::Sender<(u8, Message)>,
shutdown_rx: WebSocketShutdown,
}

async fn ws_handler(ws: WebSocketUpgrade, State(state): State<WsState>) -> Response {
let Ok(mut sender_rx) = state.sender_rx.clone().try_lock_owned() else {
return "Connection already established".into_response();
let (id, send_rx) = {
let mut clients = state.sender_txs.lock().await;

let (send_tx, send_rx) = mpsc::channel(16);

let id = random_id(&clients);
clients.insert(id, send_tx);

(id, send_rx)
};

ws.on_upgrade(move |socket| async move {
handle_socket(socket, state, &mut sender_rx).await;
handle_socket(socket, state, id, send_rx).await;
})
}

Expand All @@ -140,9 +175,10 @@ async fn handle_socket(
mut shutdown_rx,
..
}: WsState,
sender_rx: &mut mpsc::Receiver<String>,
id: u8,
mut sender_rx: mpsc::Receiver<String>,
) {
receiver_tx.send(Message::Connected).await.ok();
receiver_tx.send((id, Message::Connected)).await.ok();

loop {
tokio::select! {
Expand All @@ -154,7 +190,7 @@ async fn handle_socket(
if let Ok(msg) = msg {
match msg {
ws::Message::Text(t) => {
receiver_tx.send(Message::Text(t)).await.ok();
receiver_tx.send((id, Message::Text(t))).await.ok();
}
ws::Message::Close(_) => {
break;
Expand All @@ -168,5 +204,5 @@ async fn handle_socket(
};
}

receiver_tx.send(Message::Disconnected).await.ok();
receiver_tx.send((id, Message::Disconnected)).await.ok();
}
8 changes: 6 additions & 2 deletions apps/desktop/src/app.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { Core, RefreshedOAuthToken, createWsProvider } from "@macrograph/runtime";
import {
Core,
RefreshedOAuthToken,
createWsProvider,
} from "@macrograph/runtime";
import { Interface } from "@macrograph/interface";
import * as pkgs from "@macrograph/packages";
import { convertFileSrc } from "@tauri-apps/api/tauri";
Expand Down Expand Up @@ -51,7 +55,7 @@ export default function () {
async sendMessage(data) {
return client.mutation([
"websocket.send",
{ port: data.port, data: data.data },
{ port: data.port, client: data.client, data: data.data },
]);
},
});
Expand Down
4 changes: 2 additions & 2 deletions apps/desktop/src/rspc/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ export type Procedures = {
queries:
{ key: "fs.list", input: string, result: Entry[] },
mutations:
{ key: "websocket.send", input: { port: number; data: string }, result: null },
{ key: "websocket.send", input: { port: number; client: number; data: string }, result: null },
subscriptions:
{ key: "oauth.authorize", input: string, result: any | null } |
{ key: "websocket.server", input: number, result: Message }
{ key: "websocket.server", input: number, result: [number, Message] }
};

export type Message = { Text: string } | "Connected" | "Disconnected"
Expand Down
1 change: 0 additions & 1 deletion apps/desktop/tsconfig.tsbuildinfo

This file was deleted.

1 change: 0 additions & 1 deletion interface/tsconfig.tsbuildinfo

This file was deleted.

1 change: 0 additions & 1 deletion packages/json/tsconfig.tsbuildinfo

This file was deleted.

20 changes: 14 additions & 6 deletions packages/packages/src/streamdeck/ctx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@ export function createCtx(ws: WsProvider<unknown>, onEvent: OnEvent<Events>) {
try {
setState({ type: "Starting" });

const [connected, setConnected] = createSignal(false);
const [connectedClient, setConnectedClient] = createSignal<null | number>(
null
);
localStorage.setItem(SDWS, port.toString());

const server = await ws.startServer(port, (msg) => {
if (msg === "Connected") setConnected(true);
else if (msg === "Disconnected") setConnected(false);
else {
const server = await ws.startServer(port, ([client, msg]) => {
if (msg === "Connected" && connectedClient() === null)
setConnectedClient(client);
else if (msg === "Disconnected" && client === connectedClient())
setConnectedClient(null);
else if (
typeof msg === "object" &&
"Text" in msg &&
client === connectedClient()
) {
const parsed = MESSAGE.parse(JSON.parse(msg.Text));

onEvent({ name: parsed.event, data: parsed.payload });
Expand All @@ -81,7 +89,7 @@ export function createCtx(ws: WsProvider<unknown>, onEvent: OnEvent<Events>) {
setState({
type: "Running",
port,
connected,
connected: () => connectedClient() !== null,
async stop() {
await ws.stopServer(server);
localStorage.removeItem(SDWS);
Expand Down
4 changes: 1 addition & 3 deletions packages/packages/src/websocketServer/Settings.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ export default ({ websockets, startServer, stopServer }: Ctx) => {
<span>{key}</span>
</td>
<td>
<span>
{value.hasConnection ? "Connected" : "Disconnected"}
</span>
<span>{value.connections.size} Connections</span>
</td>
<td>
<Button onClick={() => stopServer(key)}>Remove</Button>
Expand Down
22 changes: 13 additions & 9 deletions packages/packages/src/websocketServer/ctx.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { OnEvent, WsProvider } from "@macrograph/runtime";
import { Maybe } from "@macrograph/typesystem";
import { ReactiveMap } from "@solid-primitives/map";
import { ReactiveSet } from "@solid-primitives/set";

export const WS_PORTS_LOCALSTORAGE = "wsPorts";

export type ConnectionState = {
hasConnection: boolean;
connections: ReactiveSet<number>;
server: any;
};

Expand All @@ -23,17 +24,20 @@ export function createCtx(ws: WsProvider<unknown>, onEvent: OnEvent) {
});

async function startServer(port: number) {
const server = await ws.startServer(port, (msg) => {
let websocketData = websockets.get(port);
const server = await ws.startServer(port, ([client, msg]) => {
const websocketData = websockets.get(port);
if (!websocketData) return;

if (msg === "Connected")
websockets.set(port, { ...websocketData, hasConnection: true });
else if (msg === "Disconnected")
websockets.set(port, { ...websocketData, hasConnection: false });
else onEvent({ name: "wsEvent", data: { data: msg.Text, port: port } });
if (msg === "Connected") websocketData.connections.add(client);
else if (msg === "Disconnected") websocketData.connections.delete(client);
else
onEvent({
name: "wsEvent",
data: { data: msg.Text, client, port: port },
});
});
websockets.set(port, { hasConnection: false, server });

websockets.set(port, { connections: new ReactiveSet(), server });

localStorage.setItem(
WS_PORTS_LOCALSTORAGE,
Expand Down
16 changes: 14 additions & 2 deletions packages/packages/src/websocketServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ export function pkg<TServer>(ws: WsProvider<TServer>) {
return {
port: io.dataInput({
id: "port",
name: "WS port",
name: "Port",
type: t.int(),
}),
client: io.dataInput({
id: "client",
name: "Client",
type: t.int(),
}),
data: io.dataInput({
Expand All @@ -32,6 +37,7 @@ export function pkg<TServer>(ws: WsProvider<TServer>) {
run({ ctx, io }) {
ws.sendMessage({
port: ctx.getInput(io.port),
client: ctx.getInput(io.client),
data: ctx.getInput(io.data),
});
},
Expand All @@ -47,7 +53,12 @@ export function pkg<TServer>(ws: WsProvider<TServer>) {
}),
port: io.dataOutput({
id: "port",
name: "WS Port",
name: "Port",
type: t.int(),
}),
client: io.dataOutput({
id: "client",
name: "Client",
type: t.int(),
}),
data: io.dataOutput({
Expand All @@ -59,6 +70,7 @@ export function pkg<TServer>(ws: WsProvider<TServer>) {
},
run({ ctx, data, io }) {
ctx.setOutput(io.port, data.port);
ctx.setOutput(io.client, data.client);
ctx.setOutput(io.data, data.data);
ctx.exec(io.exec);
},
Expand Down
1 change: 0 additions & 1 deletion packages/packages/tsconfig.tsbuildinfo

This file was deleted.

11 changes: 9 additions & 2 deletions packages/runtime/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ export function makePersisted<T>(
export type WsMessage = "Connected" | "Disconnected" | { Text: string };

export interface WsProvider<TServer> {
startServer(port: number, cb: (text: WsMessage) => void): Promise<TServer>;
startServer(
port: number,
cb: (text: [number, WsMessage]) => void
): Promise<TServer>;
stopServer(server: TServer): Promise<void>;
sendMessage(data: { data: string; port: number }): Promise<null>;
sendMessage(data: {
data: string;
port: number;
client: number;
}): Promise<null>;
}

export function createWsProvider<T>(p: WsProvider<T>) {
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/tsconfig.tsbuildinfo

This file was deleted.

1 change: 0 additions & 1 deletion packages/typesystem/tsconfig.tsbuildinfo

This file was deleted.

1 change: 0 additions & 1 deletion packages/ui/tsconfig.tsbuildinfo

This file was deleted.

1 comment on commit 92f98a5

@vercel
Copy link

@vercel vercel bot commented on 92f98a5 Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.