Skip to content

Commit

Permalink
Merge pull request #39518 from JuliaLang/jn/detached-Task
Browse files Browse the repository at this point in the history
Add `errormonitor(::Task)` method to monitor for Task failures
  • Loading branch information
vtjnash authored Apr 2, 2021
2 parents 1339e46 + 291267d commit 3396d10
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 36 deletions.
12 changes: 7 additions & 5 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
@async exec_conn_func(w)
t = @async exec_conn_func(w)
else
# route request via node 1
@async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
end
end
Expand Down Expand Up @@ -242,10 +243,10 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
@async while isopen(sock)
errormonitor(@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end
end)
print(out, "julia_worker:") # print header
print(out, "$(string(LPROC.bind_port))#") # print port
print(out, LPROC.bind_addr)
Expand Down Expand Up @@ -274,7 +275,7 @@ end


function redirect_worker_output(ident, stream)
@async while !eof(stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand All @@ -284,6 +285,7 @@ function redirect_worker_output(ident, stream)
println(" From worker $(ident):\t$line")
end
end
errormonitor(t)
end

struct LaunchWorkerError <: Exception
Expand Down
3 changes: 2 additions & 1 deletion src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
@async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
end

function make_preduce_body(var, body)
Expand Down
19 changes: 6 additions & 13 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,15 @@ default_addprocs_params(::SSHManager) =
function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
# Launch one worker on each unique host in parallel. Additional workers are launched later.
# Wait for all launches to complete.
launch_tasks = Vector{Any}(undef, length(manager.machines))

for (i, (machine, cnt)) in enumerate(manager.machines)
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
launch_tasks[i] = @async try
launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
end
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
end
end
end

for t in launch_tasks
wait(t::Task)
end

notify(launch_ntfy)
end

Expand Down
26 changes: 12 additions & 14 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
@async run_work_thunk(rv, thunk)
errormonitor(@async run_work_thunk(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -111,7 +111,7 @@ end

## message event handlers ##
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
@async process_tcp_streams(r_stream, w_stream, incoming)
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Expand Down Expand Up @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake.
See also [`cluster_cookie`](@ref).
"""
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
@async message_handler_loop(r_stream, w_stream, incoming)
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
end

function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
Expand Down Expand Up @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
@async begin
errormonitor(@async begin
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
if isa(v, SyncTake)
try
Expand All @@ -285,18 +285,20 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
else
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
end
end
nothing
end)
end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
@async begin
errormonitor(@async begin
rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
end
nothing
end)
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)
errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true))
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down Expand Up @@ -330,8 +332,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
lazy = msg.lazy
PGRP.lazy = lazy

wait_tasks = Task[]
for (connect_at, rpid) in msg.other_workers
@sync for (connect_at, rpid) in msg.other_workers
wconfig = WorkerConfig()
wconfig.connect_at = connect_at

Expand All @@ -340,14 +341,11 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
else
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
@async connect_to_peer(cluster_manager, rpid, wconfig)
end
end
end

for wt in wait_tasks; Base.wait(wt); end

send_connection_hdr(controller, false)
send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid()))
end
Expand Down
6 changes: 3 additions & 3 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ or to use a local [`Channel`](@ref) as a proxy:
```julia
p = 1
f = Future(p)
@async put!(f, remotecall_fetch(long_computation, p))
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
```
"""
Expand Down Expand Up @@ -249,10 +249,10 @@ end

const any_gc_flag = Condition()
function start_gc_msgs_task()
@async while true
errormonitor(@async while true
wait(any_gc_flag)
flush_gc_msgs()
end
end)
end

function send_del_client(rr)
Expand Down

0 comments on commit 3396d10

Please sign in to comment.