From 1ea70eb07aa5de470721ad40f3b74dd7232215a6 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 10 Sep 2024 22:18:51 +0200 Subject: [PATCH] use channel for inbox (#215) * use channel for inbox * remove errormonitor * fix websocket handler * close session * add actual check for threadid * fixes * fix some waiting for session hangs --- src/HTTPServer/browser-display.jl | 2 +- src/app.jl | 2 +- src/connection/ijulia.jl | 2 +- src/connection/websocket-handler.jl | 7 +------ src/display.jl | 2 +- src/session.jl | 14 ++++---------- src/types.jl | 22 ++++++++++++++++++++-- 7 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/HTTPServer/browser-display.jl b/src/HTTPServer/browser-display.jl index 57b323ab..dd1e787d 100644 --- a/src/HTTPServer/browser-display.jl +++ b/src/HTTPServer/browser-display.jl @@ -81,7 +81,7 @@ function Base.display(display::BrowserDisplay, app::App) if success handler.session.status = Bonito.DISPLAYED # if open_browser, we need to let the caller wait! - wait_for_ready(handler.session) + wait_for(()-> isready(handler.session)) wait_for_ready(app) end end diff --git a/src/app.jl b/src/app.jl index 85ddb069..7e383317 100644 --- a/src/app.jl +++ b/src/app.jl @@ -146,5 +146,5 @@ end function wait_for_ready(app::App; timeout=100) wait_for(()-> !isnothing(app.session[]); timeout=timeout) - wait_for_ready(app.session[]; timeout=timeout) + wait_for(()-> isready(app.session[]); timeout=timeout) end diff --git a/src/connection/ijulia.jl b/src/connection/ijulia.jl index e6cb99c7..27bee80f 100644 --- a/src/connection/ijulia.jl +++ b/src/connection/ijulia.jl @@ -69,7 +69,7 @@ function setup_connection(session::Session, ::Nothing) comm.on_msg = function (msg) data_b64 = msg.content["data"] bytes = $(Base64).base64decode(data_b64) - $(Bonito).process_message(session, bytes) + put!(session.inbox, bytes) end comm.on_close = (args...) -> close(session) end diff --git a/src/connection/websocket-handler.jl b/src/connection/websocket-handler.jl index 99670b2c..88727ffd 100644 --- a/src/connection/websocket-handler.jl +++ b/src/connection/websocket-handler.jl @@ -92,12 +92,7 @@ function run_connection_loop(session::Session, handler::WebSocketHandler, websoc bytes = safe_read(websocket) # nothing means the browser closed the connection so we're done isnothing(bytes) && break - try - process_message(session, bytes) - catch e - # Only print any internal error to not close the connection - @warn "error while processing received msg" exception = (e, Base.catch_backtrace()) - end + put!(session.inbox, bytes) end end diff --git a/src/display.jl b/src/display.jl index 53cb8493..333a206f 100644 --- a/src/display.jl +++ b/src/display.jl @@ -23,7 +23,7 @@ function Page(; exportable::Union{Bool,Nothing}=nothing, connection::Union{Nothing, FrontendConnection}=nothing, current_page_dir = abspath(pwd()), # For documenter server - server_config...) + server_config...) old_session = CURRENT_SESSION[] if !isempty(server_config) configure_server!(; server_config...) diff --git a/src/session.jl b/src/session.jl index 553a1d05..cd7cceec 100644 --- a/src/session.jl +++ b/src/session.jl @@ -20,17 +20,7 @@ Base.show(io::IO, ::MIME"text/plain", session::Session) = show_session(io, sessi Base.show(io::IO, session::Session) = show_session(io, session) function wait_for_ready(session::Session; timeout=100) - if isready(session) - return :success - end - session.status == CLOSED && return nothing - if session.status !== DISPLAYED - error("Session got not displayed yet, so waiting for it to become ready is futile. Status: $(session.status)") - end return wait_for(timeout=timeout) do - if !isnothing(session.init_error[]) - throw(session.init_error[]) - end return isready(session) end end @@ -143,6 +133,7 @@ function Base.close(session::Session) Observables.clear(session.on_close) session.current_app[] = nothing session.io_context[] = nothing + close(session.inbox) end return end @@ -192,6 +183,9 @@ end Base.isopen(session::Session) = isopen(session.connection) function Base.isready(session::Session) + if !isnothing(session.init_error[]) + throw(session.init_error[]) + end return isready(session.connection_ready) && isopen(session) end diff --git a/src/types.jl b/src/types.jl index d36030ad..53221a0b 100644 --- a/src/types.jl +++ b/src/types.jl @@ -189,6 +189,8 @@ mutable struct Session{Connection <: FrontendConnection} current_app::RefValue{Any} io_context::RefValue{Union{Nothing, IOContext}} stylesheets::Dict{HTMLElement, Set{CSS}} + inbox::Channel{Vector{UInt8}} + threadid::Int function Session( parent::Union{Session, Nothing}, @@ -210,7 +212,9 @@ mutable struct Session{Connection <: FrontendConnection} imports::OrderedSet{Asset}, title::String, compression_enabled::Bool, + n_inbox = Inf ) where {Connection} + inbox = Channel{Vector{UInt8}}(n_inbox) session = new{Connection}( UNINITIALIZED, time(), @@ -237,9 +241,23 @@ mutable struct Session{Connection <: FrontendConnection} Base.ReentrantLock(), RefValue{Any}(nothing), RefValue{Union{Nothing,IOContext}}(nothing), - Dict{HTMLElement,Set{CSS}}() + Dict{HTMLElement,Set{CSS}}(), + inbox, + Threads.threadid() ) - finalizer(close, session) + task = Task() do + for message in inbox + try + process_message(session, message) + catch e + @warn "error while processing received msg" exception = ( + e, Base.catch_backtrace() + ) + end + end + end + bind(inbox, task) + schedule(task) return session end end