From 7bb276b96938da9f3e4cb80de175abe987730d52 Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Mon, 5 Aug 2024 15:20:42 +0200 Subject: [PATCH] Backend performance: delete stale connections (#2977) --- frontend/common/PlutoConnection.js | 5 +++-- frontend/components/CellInput.js | 2 -- frontend/components/Editor.js | 12 ++++++++++-- frontend/components/welcome/Welcome.js | 2 +- src/webserver/WebServer.jl | 11 ++++++++++- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/frontend/common/PlutoConnection.js b/frontend/common/PlutoConnection.js index 8cacfd617a..e6735b1bbe 100644 --- a/frontend/common/PlutoConnection.js +++ b/frontend/common/PlutoConnection.js @@ -108,6 +108,7 @@ const create_ws_connection = (address, { on_message, on_socket_close }, timeout_ const send_encoded = (message) => { const encoded = pack(message) + if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) throw new Error("Socket is closed") socket.send(encoded) } @@ -266,7 +267,7 @@ const default_ws_address = () => ws_address_from_base(window.location.href) * * @param {{ * on_unrequested_update: (message: PlutoMessage, by_me: boolean) => void, - * on_reconnect: () => boolean, + * on_reconnect: () => Promise, * on_connection_status: (connection_status: boolean, hopeless: boolean) => void, * connect_metadata?: Object, * ws_address?: String, @@ -377,7 +378,7 @@ export const create_pluto_connection = async ({ await connect() // reconnect! console.log(`Starting state sync`, new Date().toLocaleTimeString()) - const accept = on_reconnect() + const accept = await on_reconnect() console.log(`State sync ${accept ? "" : "not "}successful`, new Date().toLocaleTimeString()) on_connection_status(accept, false) if (!accept) { diff --git a/frontend/components/CellInput.js b/frontend/components/CellInput.js index 35773c436d..91065a7b9b 100644 --- a/frontend/components/CellInput.js +++ b/frontend/components/CellInput.js @@ -461,8 +461,6 @@ export const CellInput = ({ if (show_static_fake) return if (dom_node_ref.current == null) return - console.log("Rendering cell input", cell_id) - const keyMapSubmit = (/** @type {EditorView} */ cm) => { autocomplete.closeCompletion(cm) on_submit() diff --git a/frontend/components/Editor.js b/frontend/components/Editor.js index 85935512bd..b1a705eedd 100644 --- a/frontend/components/Editor.js +++ b/frontend/components/Editor.js @@ -861,7 +861,6 @@ patch: ${JSON.stringify( backend_launch_phase: this.state.backend_launch_phase == null ? null : BackendLaunchPhase.ready, }) - // TODO Do this from julia itself this.client.send("complete", { query: "sq" }, { notebook_id: this.state.notebook.notebook_id }) this.client.send("complete", { query: "\\sq" }, { notebook_id: this.state.notebook.notebook_id }) @@ -893,9 +892,18 @@ patch: ${JSON.stringify( } } - const on_reconnect = () => { + const on_reconnect = async () => { console.warn("Reconnected! Checking states") + await this.client.send( + "reset_shared_state", + {}, + { + notebook_id: this.state.notebook.notebook_id, + }, + false + ) + return true } diff --git a/frontend/components/welcome/Welcome.js b/frontend/components/welcome/Welcome.js index f836db7cd7..2f2c20acf8 100644 --- a/frontend/components/welcome/Welcome.js +++ b/frontend/components/welcome/Welcome.js @@ -68,7 +68,7 @@ export const Welcome = ({ launch_params }) => { const client_promise = create_pluto_connection({ on_unrequested_update: on_update, on_connection_status: on_connection_status, - on_reconnect: () => true, + on_reconnect: async () => true, ws_address: launch_params.pluto_server_url ? ws_address_from_base(launch_params.pluto_server_url) : undefined, }) client_promise.then(async (client) => { diff --git a/src/webserver/WebServer.jl b/src/webserver/WebServer.jl index a632fc685a..dbbfded50d 100644 --- a/src/webserver/WebServer.jl +++ b/src/webserver/WebServer.jl @@ -206,6 +206,7 @@ function run!(session::ServerSession) if HTTP.WebSockets.isclosed(clientstream) return end + found_client_id_ref = Ref(Symbol(:none)) try for message in clientstream # This stream contains data received over the WebSocket. @@ -221,6 +222,9 @@ function run!(session::ServerSession) end did_read = true + if found_client_id_ref[] === :none + found_client_id_ref[] = Symbol(parentbody["client_id"]) + end process_ws_message(session, parentbody, clientstream) catch ex if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError @@ -242,6 +246,11 @@ function run!(session::ServerSession) bt = stacktrace(catch_backtrace()) @warn "Reading WebSocket client stream failed for unknown reason:" exception = (ex, bt) end + finally + if haskey(session.connected_clients, found_client_id_ref[]) + @debug "Removing client $(found_client_id_ref[]) from connected_clients" + delete!(session.connected_clients, found_client_id_ref[]) + end end end catch ex @@ -379,7 +388,7 @@ end "All messages sent over the WebSocket get decoded+deserialized and end up here." function process_ws_message(session::ServerSession, parentbody::Dict, clientstream) client_id = Symbol(parentbody["client_id"]) - client = get!(session.connected_clients, client_id ) do + client = get!(session.connected_clients, client_id) do ClientSession(client_id, clientstream, session.options.server.simulated_lag) end client.stream = clientstream # it might change when the same client reconnects