diff --git a/base/channels.jl b/base/channels.jl index 27f1719341375..38145f0d636d8 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -192,8 +192,10 @@ Stacktrace: ``` """ function bind(c::Channel, task::Task) - ref = WeakRef(c) - register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref)) + # TODO: implement "schedulewait" and deprecate taskdone_hook + #T = Task(() -> close_chnl_on_taskdone(task, c)) + #schedulewait(task, T) + register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c)) return c end @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n)) return (chnls, tasks) end -function close_chnl_on_taskdone(t::Task, ref::WeakRef) - c = ref.value - if c isa Channel - isopen(c) || return - cleanup = () -> try - isopen(c) || return - if istaskfailed(t) - excp = task_result(t) - if excp isa Exception - close(c, excp) - return - end +function close_chnl_on_taskdone(t::Task, c::Channel) + isopen(c) || return + cleanup = () -> try + isopen(c) || return + if istaskfailed(t) + excp = task_result(t) + if excp isa Exception + close(c, excp) + return end - close(c) - return - finally - unlock(c) end - if trylock(c) - # can't use `lock`, since attempts to task-switch to wait for it - # will just silently fail and leave us with broken state - cleanup() - else - # so schedule this to happen once we are finished destroying our task - # (on a new Task) - @async (lock(c); cleanup()) + close(c) + return + finally + unlock(c) end + if trylock(c) + # can't use `lock`, since attempts to task-switch to wait for it + # will just silently fail and leave us with broken state + cleanup() + else + # so schedule this to happen once we are finished destroying our task + # (on a new Task) + @async (lock(c); cleanup()) end nothing end diff --git a/test/channels.jl b/test/channels.jl index c73d2dcb054c2..a01c1d98972a5 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -88,19 +88,20 @@ using Distributed @testset "channels bound to tasks" for N in [0, 10] # Normal exit of task c = Channel(N) - bind(c, @async (yield(); nothing)) + bind(c, @async (GC.gc(); yield(); nothing)) @test_throws InvalidStateException take!(c) @test !isopen(c) # Error exception in task c = Channel(N) - bind(c, @async (yield(); error("foo"))) + bind(c, @async (GC.gc(); yield(); error("foo"))) @test_throws ErrorException take!(c) @test !isopen(c) # Multiple channels closed by the same bound task cs = [Channel(N) for i in 1:5] - tf2 = () -> begin + tf2() = begin + GC.gc() if N > 0 foreach(c -> (@assert take!(c) === 2), cs) end @@ -129,8 +130,8 @@ using Distributed # Multiple tasks, first one to terminate closes the channel nth = rand(1:5) ref = Ref(0) - cond = Condition() tf3(i) = begin + GC.gc() if i == nth ref[] = i else @@ -138,7 +139,7 @@ using Distributed end end - tasks = [Task(()->tf3(i)) for i in 1:5] + tasks = [Task(() -> tf3(i)) for i in 1:5] c = Channel(N) foreach(t -> bind(c, t), tasks) foreach(schedule, tasks)