diff --git a/src/cluster.jl b/src/cluster.jl index 7329e1b91d37b6..482e7da44390d3 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -160,11 +160,12 @@ function check_worker_state(w::Worker) else w.ct_time = time() if myid() > w.id - @async exec_conn_func(w) + t = @async exec_conn_func(w) else # route request via node 1 - @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end + errormonitor(t) wait_for_conn(w) end end @@ -242,10 +243,10 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std else sock = listen(interface, LPROC.bind_port) end - @async while isopen(sock) + errormonitor(@async while isopen(sock) client = accept(sock) process_messages(client, client, true) - end + end) print(out, "julia_worker:") # print header print(out, "$(string(LPROC.bind_port))#") # print port print(out, LPROC.bind_addr) @@ -274,7 +275,7 @@ end function redirect_worker_output(ident, stream) - @async while !eof(stream) + t = @async while !eof(stream) line = readline(stream) if startswith(line, " From worker ") # stdout's of "additional" workers started from an initial worker on a host are not available @@ -284,6 +285,7 @@ function redirect_worker_output(ident, stream) println(" From worker $(ident):\t$line") end end + errormonitor(t) end struct LaunchWorkerError <: Exception diff --git a/src/macros.jl b/src/macros.jl index b53890017d4dea..6603d627c34092 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -279,9 +279,10 @@ function preduce(reducer, f, R) end function pfor(f, R) - @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) + t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) @spawnat :any f(R, first(c), last(c)) end + errormonitor(t) end function make_preduce_body(var, body) diff --git a/src/managers.jl b/src/managers.jl index 3519259190fbc3..ce99d85801e176 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -158,22 +158,15 @@ default_addprocs_params(::SSHManager) = function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition) # Launch one worker on each unique host in parallel. Additional workers are launched later. # Wait for all launches to complete. - launch_tasks = Vector{Any}(undef, length(manager.machines)) - - for (i, (machine, cnt)) in enumerate(manager.machines) + @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - launch_tasks[i] = @async try - launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) - catch e - print(stderr, "exception launching on machine $(machine) : $(e)\n") - end + @async try + launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) + catch e + print(stderr, "exception launching on machine $(machine) : $(e)\n") + end end end - - for t in launch_tasks - wait(t::Task) - end - notify(launch_ntfy) end diff --git a/src/process_messages.jl b/src/process_messages.jl index 3216a4e1c73c6e..8d5dac5af571e9 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -78,7 +78,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - @async run_work_thunk(rv, thunk) + errormonitor(@async run_work_thunk(rv, thunk)) return rv end end @@ -111,7 +111,7 @@ end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) - @async process_tcp_streams(r_stream, w_stream, incoming) + errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming)) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - @async message_handler_loop(r_stream, w_stream, incoming) + errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - @async begin + errormonitor(@async begin v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -285,18 +285,20 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi else deliver_result(w_stream, :call_fetch, header.notify_oid, v) end - end + nothing + end) end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - @async begin + errormonitor(@async begin rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) - end + nothing + end) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - @async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true) + errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) @@ -330,8 +332,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) lazy = msg.lazy PGRP.lazy = lazy - wait_tasks = Task[] - for (connect_at, rpid) in msg.other_workers + @sync for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() wconfig.connect_at = connect_at @@ -340,14 +341,11 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) # The constructor registers the object with a global registry. Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) else - t = @async connect_to_peer(cluster_manager, rpid, wconfig) - push!(wait_tasks, t) + @async connect_to_peer(cluster_manager, rpid, wconfig) end end end - for wt in wait_tasks; Base.wait(wt); end - send_connection_hdr(controller, false) send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid())) end diff --git a/src/remotecall.jl b/src/remotecall.jl index f4845221a611a4..9b2127d4f499a3 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -192,7 +192,7 @@ or to use a local [`Channel`](@ref) as a proxy: ```julia p = 1 f = Future(p) -@async put!(f, remotecall_fetch(long_computation, p)) +errormonitor(@async put!(f, remotecall_fetch(long_computation, p))) isready(f) # will not block ``` """ @@ -249,10 +249,10 @@ end const any_gc_flag = Condition() function start_gc_msgs_task() - @async while true + errormonitor(@async while true wait(any_gc_flag) flush_gc_msgs() - end + end) end function send_del_client(rr)