Skip to content

Commit

Permalink
use channel for inbox (#215)
Browse files Browse the repository at this point in the history
* use channel for inbox

* remove errormonitor

* fix websocket handler

* close session

* add actual check for threadid

* fixes

* fix some waiting for session hangs
  • Loading branch information
SimonDanisch authored Sep 10, 2024
1 parent e58743f commit 1ea70eb
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/HTTPServer/browser-display.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ function Base.display(display::BrowserDisplay, app::App)
if success
handler.session.status = Bonito.DISPLAYED
# if open_browser, we need to let the caller wait!
wait_for_ready(handler.session)
wait_for(()-> isready(handler.session))
wait_for_ready(app)
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/app.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,5 @@ end

function wait_for_ready(app::App; timeout=100)
wait_for(()-> !isnothing(app.session[]); timeout=timeout)
wait_for_ready(app.session[]; timeout=timeout)
wait_for(()-> isready(app.session[]); timeout=timeout)
end
2 changes: 1 addition & 1 deletion src/connection/ijulia.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ function setup_connection(session::Session, ::Nothing)
comm.on_msg = function (msg)
data_b64 = msg.content["data"]
bytes = $(Base64).base64decode(data_b64)
$(Bonito).process_message(session, bytes)
put!(session.inbox, bytes)
end
comm.on_close = (args...) -> close(session)
end
Expand Down
7 changes: 1 addition & 6 deletions src/connection/websocket-handler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,7 @@ function run_connection_loop(session::Session, handler::WebSocketHandler, websoc
bytes = safe_read(websocket)
# nothing means the browser closed the connection so we're done
isnothing(bytes) && break
try
process_message(session, bytes)
catch e
# Only print any internal error to not close the connection
@warn "error while processing received msg" exception = (e, Base.catch_backtrace())
end
put!(session.inbox, bytes)
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/display.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function Page(;
exportable::Union{Bool,Nothing}=nothing,
connection::Union{Nothing, FrontendConnection}=nothing,
current_page_dir = abspath(pwd()), # For documenter server
server_config...)
server_config...)
old_session = CURRENT_SESSION[]
if !isempty(server_config)
configure_server!(; server_config...)
Expand Down
14 changes: 4 additions & 10 deletions src/session.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@ Base.show(io::IO, ::MIME"text/plain", session::Session) = show_session(io, sessi
Base.show(io::IO, session::Session) = show_session(io, session)

function wait_for_ready(session::Session; timeout=100)
if isready(session)
return :success
end
session.status == CLOSED && return nothing
if session.status !== DISPLAYED
error("Session got not displayed yet, so waiting for it to become ready is futile. Status: $(session.status)")
end
return wait_for(timeout=timeout) do
if !isnothing(session.init_error[])
throw(session.init_error[])
end
return isready(session)
end
end
Expand Down Expand Up @@ -143,6 +133,7 @@ function Base.close(session::Session)
Observables.clear(session.on_close)
session.current_app[] = nothing
session.io_context[] = nothing
close(session.inbox)
end
return
end
Expand Down Expand Up @@ -192,6 +183,9 @@ end
Base.isopen(session::Session) = isopen(session.connection)

function Base.isready(session::Session)
if !isnothing(session.init_error[])
throw(session.init_error[])
end
return isready(session.connection_ready) && isopen(session)
end

Expand Down
22 changes: 20 additions & 2 deletions src/types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ mutable struct Session{Connection <: FrontendConnection}
current_app::RefValue{Any}
io_context::RefValue{Union{Nothing, IOContext}}
stylesheets::Dict{HTMLElement, Set{CSS}}
inbox::Channel{Vector{UInt8}}
threadid::Int

function Session(
parent::Union{Session, Nothing},
Expand All @@ -210,7 +212,9 @@ mutable struct Session{Connection <: FrontendConnection}
imports::OrderedSet{Asset},
title::String,
compression_enabled::Bool,
n_inbox = Inf
) where {Connection}
inbox = Channel{Vector{UInt8}}(n_inbox)
session = new{Connection}(
UNINITIALIZED,
time(),
Expand All @@ -237,9 +241,23 @@ mutable struct Session{Connection <: FrontendConnection}
Base.ReentrantLock(),
RefValue{Any}(nothing),
RefValue{Union{Nothing,IOContext}}(nothing),
Dict{HTMLElement,Set{CSS}}()
Dict{HTMLElement,Set{CSS}}(),
inbox,
Threads.threadid()
)
finalizer(close, session)
task = Task() do
for message in inbox
try
process_message(session, message)
catch e
@warn "error while processing received msg" exception = (
e, Base.catch_backtrace()
)
end
end
end
bind(inbox, task)
schedule(task)
return session
end
end
Expand Down

0 comments on commit 1ea70eb

Please sign in to comment.