Skip to content

Commit

Permalink
Moving sync_end to a parallel structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Trey Roessig committed Dec 17, 2020
1 parent 8f6432e commit de884d9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 26 deletions.
48 changes: 29 additions & 19 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -336,32 +336,42 @@ end

## lexically-scoped waiting for multiple items

_istaskdone(x::Tuple{Task, Symbol}) = istaskdone(x[1])
_istaskfailed(x::Tuple{Task, Symbol}) = istaskfailed(x[1])

function sync_end(c::Channel{Any})
local c_ex
while isready(c)
r = take!(c)
if isa(r, Task)
_wait(r)
if istaskfailed(r)
if !@isdefined(c_ex)
c_ex = CompositeException()
ts = Tuple{Task, Symbol}[]
try
while isready(c) || !all(_istaskdone, ts) # Pull all queued tasks, wait for them,
while isready(c) # then pull the next batch. This is
t = take!(c) # needed, as it is possible for Tasks
if isa(t, Task) # to generate new subTasks, eg println
push!(ts, (t, :Task))
else
push!(ts, (schedule(Task(()->wait(t))), :Other))
end
push!(c_ex, TaskFailedException(r))
end
else
try
wait(r)
catch e
if !@isdefined(c_ex)
while true
yield()
if any(_istaskfailed, ts) # Throw exception on first occurence
c_ex = CompositeException()
for t in ts
if _istaskfailed(t)
if t[2] == :Task
push!(c_ex, TaskFailedException(t[1]))
else
push!(c_ex, t[1].exception) # Required to match handling
end # of Future exceptions
end
end
throw(c_ex)
elseif all(_istaskdone, ts)
break
end
push!(c_ex, e)
end
end
end
close(c)
if @isdefined(c_ex)
throw(c_ex)
finally
close(c)
end
nothing
end
Expand Down
36 changes: 29 additions & 7 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -981,16 +981,16 @@ let (p, p2) = filter!(p -> p != myid(), procs())
error("test failed to throw")
catch excpt
if procs isa Int
ex = Any[excpt]
exs = Any[excpt]
else
ex = (excpt::CompositeException).exceptions
exs = (excpt::CompositeException).exceptions
end
for (p, ex) in zip(procs, ex)
local p
if procs isa Int || p != myid()
@test (ex::RemoteException).pid == p
ex = ((ex::RemoteException).captured::CapturedException).ex
for ex in exs # Can no longer guarantee Exceptions show in proc order
if ex isa RemoteException
@test procs isa Int || ex.pid != myid()
ex = ex.captured.ex
else
@test !(procs isa Int)
ex = (ex::TaskFailedException).task.exception
end
@test (ex::ErrorException).msg == msg
Expand Down Expand Up @@ -1706,6 +1706,28 @@ for p in procs()
@test @fetchfrom(p, i27429) == 27429
end

# issue #32677, ensuring no side-effects for Distributed
let
(p, p2) = filter!(p -> p != myid(), procs())
t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 60) # this test should take <10 seconds
c = Channel(0)
try
@sync begin
@spawnat p begin
put!(c,0)
end
@spawnat p2 begin
undefined()
take!(c)
end
end
catch e
@test e.exceptions[1].captured.ex isa UndefVarError
end
close(c)
close(t) # stop the fast watchdog
end

include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
Expand Down
43 changes: 43 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -880,3 +880,46 @@ end
end
@test sort!(collect(ys)) == 1:3
end

# issue #32677
@testset "@sync exception handling" begin
let
t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 10) # this test should take <1 seconds
c = Channel(0)
try
@sync begin
@async begin
put!(c,0)
end
@async begin
undefined()
take!(c)
end
end
catch e
@test e.exceptions[1].task.exception isa UndefVarError
end
close(c)
close(t) # stop the fast watchdog
end

let
t = Timer(t -> killjob("KILLING BY QUICK KILL WATCHDOG\n"), 10) # this test should take <1 seconds
c = Channel(0)
try
@sync begin
Threads.@spawn begin
put!(c,0)
end
Threads.@spawn begin
undefined()
take!(c)
end
end
catch e
@test e.exceptions[1].task.exception isa UndefVarError
end
close(c)
close(t) # stop the fast watchdog
end
end

0 comments on commit de884d9

Please sign in to comment.