From e1aa8d4b60bef18f45ccd389dfb86ca08ed4d8ad Mon Sep 17 00:00:00 2001 From: Trey Roessig Date: Wed, 16 Dec 2020 14:30:09 -0800 Subject: [PATCH] Moving sync_end to a parallel structure --- base/task.jl | 48 +++++++++++++-------- stdlib/Distributed/test/distributed_exec.jl | 36 +++++++++++++--- test/threads_exec.jl | 43 ++++++++++++++++++ 3 files changed, 101 insertions(+), 26 deletions(-) diff --git a/base/task.jl b/base/task.jl index 8c9516fb39766..2075f9fcd8a6f 100644 --- a/base/task.jl +++ b/base/task.jl @@ -336,32 +336,42 @@ end ## lexically-scoped waiting for multiple items +_istaskdone(x::Tuple{Task, Bool}) = istaskdone(x[1]) +_istaskfailed(x::Tuple{Task, Bool}) = istaskfailed(x[1]) + function sync_end(c::Channel{Any}) - local c_ex - while isready(c) - r = take!(c) - if isa(r, Task) - _wait(r) - if istaskfailed(r) - if !@isdefined(c_ex) - c_ex = CompositeException() + ts = Tuple{Task, Bool}[] + try + while isready(c) # Pull all queued tasks, wait for them, + while isready(c) # then add more if they arrive. This + t = take!(c) # is needed as it is possible for + if isa(t, Task) # Tasks to generate new subTasks + push!(ts, (t, true)) + else + push!(ts, (schedule(Task(()->wait(t))), false)) end - push!(c_ex, TaskFailedException(r)) end - else - try - wait(r) - catch e - if !@isdefined(c_ex) + while true + yield() + if any(_istaskfailed, ts) # Throw exception on first occurence c_ex = CompositeException() + for (t, istask) in ts + if istaskfailed(t) + if istask + push!(c_ex, TaskFailedException(t)) + else + push!(c_ex, t.exception) # Required to match handling + end # of Future exceptions + end + end + throw(c_ex) + elseif all(_istaskdone, ts) || isready(c) # Check for new tasks + break end - push!(c_ex, e) end end - end - close(c) - if @isdefined(c_ex) - throw(c_ex) + finally + close(c) end nothing end diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 656792e2d7337..925130d875fd8 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -981,16 +981,16 @@ let (p, p2) = filter!(p -> p != myid(), procs()) error("test failed to throw") catch excpt if procs isa Int - ex = Any[excpt] + exs = Any[excpt] else - ex = (excpt::CompositeException).exceptions + exs = (excpt::CompositeException).exceptions end - for (p, ex) in zip(procs, ex) - local p - if procs isa Int || p != myid() - @test (ex::RemoteException).pid == p - ex = ((ex::RemoteException).captured::CapturedException).ex + for ex in exs # Can no longer guarantee Exceptions show in proc order + if ex isa RemoteException + @test procs isa Int || ex.pid != myid() + ex = ex.captured.ex else + @test !(procs isa Int) ex = (ex::TaskFailedException).task.exception end @test (ex::ErrorException).msg == msg @@ -1706,6 +1706,28 @@ for p in procs() @test @fetchfrom(p, i27429) == 27429 end +# issue #32677, ensuring no side-effects for Distributed +let + (p, p2) = filter!(p -> p != myid(), procs()) + t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 60) # this test should take <10 seconds + c = Channel(0) + try + @sync begin + @spawnat p begin + put!(c,0) + end + @spawnat p2 begin + undefined() + take!(c) + end + end + catch e + @test e.exceptions[1].captured.ex isa UndefVarError + end + close(c) + close(t) # stop the fast watchdog +end + include("splitrange.jl") # Run topology tests last after removing all workers, since a given diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 691fca2fb2afa..1578632020b62 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -880,3 +880,46 @@ end end @test sort!(collect(ys)) == 1:3 end + +# issue #32677 +@testset "@sync exception handling" begin + let + t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 10) # this test should take <1 seconds + c = Channel(0) + try + @sync begin + @async begin + put!(c,0) + end + @async begin + undefined() + take!(c) + end + end + catch e + @test e.exceptions[1].task.exception isa UndefVarError + end + close(c) + close(t) # stop the fast watchdog + end + + let + t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 10) # this test should take <1 seconds + c = Channel(0) + try + @sync begin + Threads.@spawn begin + put!(c,0) + end + Threads.@spawn begin + undefined() + take!(c) + end + end + catch e + @test e.exceptions[1].task.exception isa UndefVarError + end + close(c) + close(t) # stop the fast watchdog + end +end