From d08c3d8b8c67a2c950fcfe2a0855669f003d6fd3 Mon Sep 17 00:00:00 2001 From: SimonDanisch Date: Fri, 20 Oct 2023 15:49:49 +0200 Subject: [PATCH] make evaljs_value threadsafe --- js_dependencies/JSServe.bundled.js | 8 +++--- js_dependencies/JSServe.js | 5 +++- js_dependencies/Sessions.js | 2 +- src/connection/websocket.jl | 5 +++- src/session.jl | 43 +++++++++++++++++++----------- src/types.jl | 13 ++++----- test/subsessions.jl | 15 ++++------- 7 files changed, 54 insertions(+), 37 deletions(-) diff --git a/js_dependencies/JSServe.bundled.js b/js_dependencies/JSServe.bundled.js index 7ca0a636..df733867 100644 --- a/js_dependencies/JSServe.bundled.js +++ b/js_dependencies/JSServe.bundled.js @@ -3957,6 +3957,7 @@ const mod1 = { OBJECT_FREEING_LOCK: OBJECT_FREEING_LOCK, lock_loading: lock_loading, lookup_global_object: lookup_global_object, + free_object: free_object, track_deleted_sessions: track_deleted_sessions, done_initializing_session: done_initializing_session, init_session: init_session, @@ -4026,7 +4027,7 @@ function onany(observables, f) { } const { send_error: send_error1 , send_warning: send_warning1 , process_message: process_message1 , on_connection_open: on_connection_open1 , on_connection_close: on_connection_close1 , send_close_session: send_close_session1 , send_pingpong: send_pingpong1 , can_send_to_julia: can_send_to_julia1 } = mod; const { base64decode: base64decode1 , base64encode: base64encode1 , decode_binary: decode_binary1 , encode_binary: encode_binary1 , decode_base64_message: decode_base64_message1 } = mod2; -const { init_session: init_session1 , free_session: free_session1 , lookup_global_object: lookup_global_object1 , update_or_replace: update_or_replace1 , lock_loading: lock_loading1 , OBJECT_FREEING_LOCK: OBJECT_FREEING_LOCK1 } = mod1; +const { init_session: init_session1 , free_session: free_session1 , lookup_global_object: lookup_global_object1 , update_or_replace: update_or_replace1 , lock_loading: lock_loading1 , OBJECT_FREEING_LOCK: OBJECT_FREEING_LOCK1 , free_object: free_object1 } = mod1; function update_node_attribute(node, attribute, value) { if (node) { if (node[attribute] != value) { @@ -4079,8 +4080,9 @@ const JSServe = { update_or_replace: update_or_replace1, OBJECT_FREEING_LOCK: OBJECT_FREEING_LOCK1, can_send_to_julia: can_send_to_julia1, - onany + onany, + free_object: free_object1 }; window.JSServe = JSServe; -export { mod2 as Protocol, base64decode1 as base64decode, base64encode1 as base64encode, decode_binary1 as decode_binary, encode_binary1 as encode_binary, decode_base64_message1 as decode_base64_message, mod as Connection, send_error1 as send_error, send_warning1 as send_warning, process_message1 as process_message, on_connection_open1 as on_connection_open, on_connection_close1 as on_connection_close, send_close_session1 as send_close_session, send_pingpong1 as send_pingpong, mod1 as Sessions, init_session1 as init_session, free_session1 as free_session, lock_loading1 as lock_loading, update_node_attribute as update_node_attribute, update_dom_node as update_dom_node, lookup_global_object1 as lookup_global_object, update_or_replace1 as update_or_replace, onany as onany, OBJECT_FREEING_LOCK1 as OBJECT_FREEING_LOCK, can_send_to_julia1 as can_send_to_julia }; +export { mod2 as Protocol, base64decode1 as base64decode, base64encode1 as base64encode, decode_binary1 as decode_binary, encode_binary1 as encode_binary, decode_base64_message1 as decode_base64_message, mod as Connection, send_error1 as send_error, send_warning1 as send_warning, process_message1 as process_message, on_connection_open1 as on_connection_open, on_connection_close1 as on_connection_close, send_close_session1 as send_close_session, send_pingpong1 as send_pingpong, mod1 as Sessions, init_session1 as init_session, free_session1 as free_session, lock_loading1 as lock_loading, update_node_attribute as update_node_attribute, update_dom_node as update_dom_node, lookup_global_object1 as lookup_global_object, update_or_replace1 as update_or_replace, onany as onany, OBJECT_FREEING_LOCK1 as OBJECT_FREEING_LOCK, can_send_to_julia1 as can_send_to_julia, free_object1 as free_object }; diff --git a/js_dependencies/JSServe.js b/js_dependencies/JSServe.js index 00dab089..4907cc80 100644 --- a/js_dependencies/JSServe.js +++ b/js_dependencies/JSServe.js @@ -29,6 +29,7 @@ const { update_or_replace, lock_loading, OBJECT_FREEING_LOCK, + free_object, } = Sessions; function update_node_attribute(node, attribute, value) { @@ -92,7 +93,8 @@ const JSServe = { update_or_replace, OBJECT_FREEING_LOCK, can_send_to_julia, - onany + onany, + free_object, }; // @ts-ignore @@ -126,4 +128,5 @@ export { onany, OBJECT_FREEING_LOCK, can_send_to_julia, + free_object, }; diff --git a/js_dependencies/Sessions.js b/js_dependencies/Sessions.js index 99f1a7ce..654e3651 100644 --- a/js_dependencies/Sessions.js +++ b/js_dependencies/Sessions.js @@ -50,7 +50,7 @@ function is_still_referenced(id) { return false; } -function free_object(id) { +export function free_object(id) { const data = GLOBAL_OBJECT_CACHE[id]; if (data) { if (data instanceof Promise) { diff --git a/src/connection/websocket.jl b/src/connection/websocket.jl index 0e73c2e2..13e4c6ec 100644 --- a/src/connection/websocket.jl +++ b/src/connection/websocket.jl @@ -93,7 +93,10 @@ function run_connection_loop(server::Server, session::Session, connection::WebSo bytes = save_read(websocket) # nothing means the browser closed the connection so we're done isnothing(bytes) && break - try + # Needs to be async to not block websocket read loop if events + # messages being processed are blocking, which happens + # Easily with on(some_longer_processing, obs) + @async try process_message(session, bytes) catch e # Only print any internal error to not close the connection diff --git a/src/session.jl b/src/session.jl index 8501be57..0695cfd7 100644 --- a/src/session.jl +++ b/src/session.jl @@ -247,28 +247,41 @@ function evaljs_value(session::Session, js; error_on_closed=true, timeout=10.0) It may unblock, if the browser is still connecting and opening the session later on. If this is expected, you may try setting `error_on_closed=false`") end - comm = root.js_comm - comm[] = nothing - js_with_result = js""" - try{ - const maybe_promise = $(js); - // support returning a promise: - Promise.resolve(maybe_promise).then(result=> { - $(comm).notify({result}); - }) - }catch(e){ - $(comm).notify({error: e.toString()}); + # For each request we need a new observable to have this thread safe + # And multiple request not waiting on the same observable + comm = Observable{Any}(nothing) + js_with_result = js"""{ + const comm = $(comm); + try{ + const maybe_promise = $(js); + // support returning a promise: + Promise.resolve(maybe_promise).then(result=> { + comm.notify({result}); + }) + }catch(e){ + comm.notify({error: e.toString()}); + } finally { + // manually free!! + JSServe.free_object(comm.id); + } } """ - evaljs(session, js_with_result) # TODO, have an on error callback, that triggers when evaljs goes wrong # (e.g. because of syntax error that isn't caught by the above try catch!) # TODO do this with channels, but we still dont have a way to timeout for wait(channel)... so... - wait_for(()-> !isnothing(comm[]); timeout=timeout) - value = comm[] + value = nothing + # Nothing should be deleted while we wait! + lock(root.deletion_lock) do + wait_for(timeout=timeout) do + return !isnothing(comm[]) + end + value = comm[] + # manually free observable, since it exists outside session lifetimes + delete!(session.session_objects, comm.id) + delete!(root.session_objects, comm.id) + end Observables.clear(comm) # cleanup - comm[] = nothing if isnothing(value) error("Timed out") end diff --git a/src/types.jl b/src/types.jl index a8c94e30..b1115d28 100644 --- a/src/types.jl +++ b/src/types.jl @@ -239,13 +239,14 @@ end function Session(parent_session::Session; asset_server=similar(parent_session.asset_server), on_connection_ready=init_session, title=parent_session.title) - root = root_session(parent_session) - connection = SubConnection(root) - session = Session(connection; asset_server=asset_server, on_connection_ready=on_connection_ready, title=title) - session.parent = parent_session - parent_session.children[session.id] = session - return session + return lock(root.deletion_lock) do + connection = SubConnection(root) + session = Session(connection; asset_server=asset_server, on_connection_ready=on_connection_ready, title=title) + session.parent = parent_session + parent_session.children[session.id] = session + return session + end end mutable struct App diff --git a/test/subsessions.jl b/test/subsessions.jl index 36dad128..1faeb9bc 100644 --- a/test/subsessions.jl +++ b/test/subsessions.jl @@ -125,23 +125,18 @@ edisplay = JSServe.use_electron_display(devtools=true) end -@testset "js comm" begin +@testset "cleanup comm" begin app = App() do s return DOM.div() end display(edisplay, app) @test !isnothing(app.session[]) @test isready(app.session[]) - @test evaljs_value(app.session[], js"Object.keys(JSServe.Sessions.SESSIONS).length") == 2 + @test run(edisplay.window, "Object.keys(JSServe.Sessions.SESSIONS).length") == 2 root = JSServe.root_session(app.session[]) @test root !== app.session[] - @test evaljs_value(app.session[], js"Object.keys(JSServe.Sessions.GLOBAL_OBJECT_CACHE).length") == 1 - @test evaljs_value(app.session[], js"Object.keys(JSServe.Sessions.GLOBAL_OBJECT_CACHE)[0]") == root.js_comm.id - @test length(root.session_objects) == 1 - @test haskey(root.session_objects, root.js_comm.id) + @test run(edisplay.window, "Object.keys(JSServe.Sessions.GLOBAL_OBJECT_CACHE).length") == 0 + @test isempty(root.session_objects) close(app.session[]) - @test evaljs_value(root, js"Object.keys(JSServe.Sessions.GLOBAL_OBJECT_CACHE).length") == 1 - @test evaljs_value(root, js"Object.keys(JSServe.Sessions.GLOBAL_OBJECT_CACHE)[0]") == root.js_comm.id - @test evaljs_value(root, js"Object.keys(JSServe.Sessions.SESSIONS).length") == 1 - @test isempty(root.js_comm.listeners) + @test run(edisplay.window, "Object.keys(JSServe.Sessions.SESSIONS).length") == 1 end