Skip to content

Commit

Permalink
channels: remove WeakRef from Condition
Browse files Browse the repository at this point in the history
Using a WeakRef meant we might not actually `bind` the result.
If nobody was still holding a reference to put contents into the Condition,
we would simply garbage collect it, and then never need to close it.
Since that does not seem to be the intent,
instead move to just keeping a strong reference
(alternatively, we would have to switch to using `stream_wait`
with ref-counting, but that seems suboptimal for several reasons.).

fix #31507
  • Loading branch information
vtjnash committed Apr 10, 2019
1 parent 4671132 commit 66cb288
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 29 deletions.
51 changes: 25 additions & 26 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ Stacktrace:
```
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
# TODO: implement "schedulewait" and deprecate taskdone_hook
#T = Task(() -> close_chnl_on_taskdone(task, c))
#schedulewait(task, T)
register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c))
return c
end

Expand Down Expand Up @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
c = ref.value
if c isa Channel
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
function close_chnl_on_taskdone(t::Task, c::Channel)
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
end
nothing
end
Expand Down
8 changes: 5 additions & 3 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,20 @@ using Distributed
@testset "channels bound to tasks" for N in [0, 10]
# Normal exit of task
c = Channel(N)
bind(c, @async (yield(); nothing))
bind(c, @async (GC.gc(); yield(); nothing))
@test_throws InvalidStateException take!(c)
@test !isopen(c)

# Error exception in task
c = Channel(N)
bind(c, @async (yield(); error("foo")))
bind(c, @async (GC.gc(); yield(); error("foo")))
@test_throws ErrorException take!(c)
@test !isopen(c)

# Multiple channels closed by the same bound task
cs = [Channel(N) for i in 1:5]
tf2 = () -> begin
GC.gc()
if N > 0
foreach(c -> (@assert take!(c) === 2), cs)
end
Expand Down Expand Up @@ -131,14 +132,15 @@ using Distributed
ref = Ref(0)
cond = Condition()
tf3(i) = begin
GC.gc()
if i == nth
ref[] = i
else
sleep(2.0)
end
end

tasks = [Task(()->tf3(i)) for i in 1:5]
tasks = [Task(() -> tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t -> bind(c, t), tasks)
foreach(schedule, tasks)
Expand Down

0 comments on commit 66cb288

Please sign in to comment.