Skip to content

Commit

Permalink
Backend performance: delete stale connections (#2977)
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored Aug 5, 2024
1 parent 8b7a65d commit 7bb276b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 8 deletions.
5 changes: 3 additions & 2 deletions frontend/common/PlutoConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const create_ws_connection = (address, { on_message, on_socket_close }, timeout_

const send_encoded = (message) => {
const encoded = pack(message)
if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) throw new Error("Socket is closed")
socket.send(encoded)
}

Expand Down Expand Up @@ -266,7 +267,7 @@ const default_ws_address = () => ws_address_from_base(window.location.href)
*
* @param {{
* on_unrequested_update: (message: PlutoMessage, by_me: boolean) => void,
* on_reconnect: () => boolean,
* on_reconnect: () => Promise<boolean>,
* on_connection_status: (connection_status: boolean, hopeless: boolean) => void,
* connect_metadata?: Object,
* ws_address?: String,
Expand Down Expand Up @@ -377,7 +378,7 @@ export const create_pluto_connection = async ({
await connect() // reconnect!

console.log(`Starting state sync`, new Date().toLocaleTimeString())
const accept = on_reconnect()
const accept = await on_reconnect()
console.log(`State sync ${accept ? "" : "not "}successful`, new Date().toLocaleTimeString())
on_connection_status(accept, false)
if (!accept) {
Expand Down
2 changes: 0 additions & 2 deletions frontend/components/CellInput.js
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,6 @@ export const CellInput = ({
if (show_static_fake) return
if (dom_node_ref.current == null) return

console.log("Rendering cell input", cell_id)

const keyMapSubmit = (/** @type {EditorView} */ cm) => {
autocomplete.closeCompletion(cm)
on_submit()
Expand Down
12 changes: 10 additions & 2 deletions frontend/components/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,6 @@ patch: ${JSON.stringify(
backend_launch_phase: this.state.backend_launch_phase == null ? null : BackendLaunchPhase.ready,
})

// TODO Do this from julia itself
this.client.send("complete", { query: "sq" }, { notebook_id: this.state.notebook.notebook_id })
this.client.send("complete", { query: "\\sq" }, { notebook_id: this.state.notebook.notebook_id })

Expand Down Expand Up @@ -893,9 +892,18 @@ patch: ${JSON.stringify(
}
}

const on_reconnect = () => {
const on_reconnect = async () => {
console.warn("Reconnected! Checking states")

await this.client.send(
"reset_shared_state",
{},
{
notebook_id: this.state.notebook.notebook_id,
},
false
)

return true
}

Expand Down
2 changes: 1 addition & 1 deletion frontend/components/welcome/Welcome.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export const Welcome = ({ launch_params }) => {
const client_promise = create_pluto_connection({
on_unrequested_update: on_update,
on_connection_status: on_connection_status,
on_reconnect: () => true,
on_reconnect: async () => true,
ws_address: launch_params.pluto_server_url ? ws_address_from_base(launch_params.pluto_server_url) : undefined,
})
client_promise.then(async (client) => {
Expand Down
11 changes: 10 additions & 1 deletion src/webserver/WebServer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ function run!(session::ServerSession)
if HTTP.WebSockets.isclosed(clientstream)
return
end
found_client_id_ref = Ref(Symbol(:none))
try
for message in clientstream
# This stream contains data received over the WebSocket.
Expand All @@ -221,6 +222,9 @@ function run!(session::ServerSession)
end

did_read = true
if found_client_id_ref[] === :none
found_client_id_ref[] = Symbol(parentbody["client_id"])
end
process_ws_message(session, parentbody, clientstream)
catch ex
if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError
Expand All @@ -242,6 +246,11 @@ function run!(session::ServerSession)
bt = stacktrace(catch_backtrace())
@warn "Reading WebSocket client stream failed for unknown reason:" exception = (ex, bt)
end
finally
if haskey(session.connected_clients, found_client_id_ref[])
@debug "Removing client $(found_client_id_ref[]) from connected_clients"
delete!(session.connected_clients, found_client_id_ref[])
end
end
end
catch ex
Expand Down Expand Up @@ -379,7 +388,7 @@ end
"All messages sent over the WebSocket get decoded+deserialized and end up here."
function process_ws_message(session::ServerSession, parentbody::Dict, clientstream)
client_id = Symbol(parentbody["client_id"])
client = get!(session.connected_clients, client_id ) do
client = get!(session.connected_clients, client_id) do
ClientSession(client_id, clientstream, session.options.server.simulated_lag)
end
client.stream = clientstream # it might change when the same client reconnects
Expand Down

0 comments on commit 7bb276b

Please sign in to comment.