Skip to content

Commit

Permalink
channels: remove WeakRef from Condition (#31673)
Browse files Browse the repository at this point in the history
Using a WeakRef meant we might not actually `bind` the result.
If nobody was still holding a reference to put contents into the Condition,
we would simply garbage collect it, and then never need to close it.
Since that does not seem to be the intent,
instead move to just keeping a strong reference
(alternatively, we would have to switch to using `stream_wait`
with ref-counting, but that seems suboptimal for several reasons.).

fix #31507

(cherry picked from commit 29f61cd)
  • Loading branch information
vtjnash authored and KristofferC committed Apr 15, 2019
1 parent 5c61d3a commit 95e2a54
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
51 changes: 25 additions & 26 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ Stacktrace:
```
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
# TODO: implement "schedulewait" and deprecate taskdone_hook
#T = Task(() -> close_chnl_on_taskdone(task, c))
#schedulewait(task, T)
register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c))
return c
end

Expand Down Expand Up @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
c = ref.value
if c isa Channel
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
function close_chnl_on_taskdone(t::Task, c::Channel)
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
end
nothing
end
Expand Down
11 changes: 6 additions & 5 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,20 @@ using Distributed
@testset "channels bound to tasks" for N in [0, 10]
# Normal exit of task
c = Channel(N)
bind(c, @async (yield(); nothing))
bind(c, @async (GC.gc(); yield(); nothing))
@test_throws InvalidStateException take!(c)
@test !isopen(c)

# Error exception in task
c = Channel(N)
bind(c, @async (yield(); error("foo")))
bind(c, @async (GC.gc(); yield(); error("foo")))
@test_throws ErrorException take!(c)
@test !isopen(c)

# Multiple channels closed by the same bound task
cs = [Channel(N) for i in 1:5]
tf2 = () -> begin
tf2() = begin
GC.gc()
if N > 0
foreach(c -> (@assert take!(c) === 2), cs)
end
Expand Down Expand Up @@ -129,16 +130,16 @@ using Distributed
# Multiple tasks, first one to terminate closes the channel
nth = rand(1:5)
ref = Ref(0)
cond = Condition()
tf3(i) = begin
GC.gc()
if i == nth
ref[] = i
else
sleep(2.0)
end
end

tasks = [Task(()->tf3(i)) for i in 1:5]
tasks = [Task(() -> tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t -> bind(c, t), tasks)
foreach(schedule, tasks)
Expand Down

0 comments on commit 95e2a54

Please sign in to comment.