From 7b7b4e1d24aa449bf33af93e684ca8924b66a010 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Thu, 12 Jan 2017 15:37:23 +0530 Subject: [PATCH] deprecate produce, consume and task iteration (#19841) --- NEWS.md | 5 + base/channels.jl | 194 +++++++++++++++++++++++++++++++-- base/deprecated.jl | 95 ++++++++++++++++ base/docs/helpdb/Base.jl | 16 --- base/event.jl | 7 +- base/file.jl | 17 +-- base/task.jl | 122 +++++---------------- doc/src/manual/control-flow.md | 76 +++++++------ doc/src/stdlib/collections.md | 1 - doc/src/stdlib/parallel.md | 3 +- test/bitarray.jl | 152 +++++++++++++------------- test/channels.jl | 114 +++++++++++++++++++ test/copy.jl | 4 +- test/file.jl | 36 +++--- test/parallel_exec.jl | 10 -- test/spawn.jl | 8 -- test/worlds.jl | 46 ++++---- 17 files changed, 605 insertions(+), 301 deletions(-) diff --git a/NEWS.md b/NEWS.md index 4debf14c80e7a..672f3d29049fa 100644 --- a/NEWS.md +++ b/NEWS.md @@ -161,6 +161,8 @@ Library improvements * New `iszero(x)` function to quickly check whether `x` is zero (or is all zeros, for an array) ([#19950]). + * `notify` now returns a count of tasks woken up ([#19841]). + Compiler/Runtime improvements ----------------------------- @@ -187,6 +189,9 @@ Deprecated or removed functions (`airyai`, `airybi`, `airyaiprime`, `airybiprimex`, `airyaix`, `airybix`, `airyaiprimex`, `airybiprimex`) ([#18050]). + * `produce`, `consume` and iteration over a Task object has been deprecated in favor of + using Channels for inter-task communication ([#19841]). + Julia v0.5.0 Release Notes ========================== diff --git a/base/channels.jl b/base/channels.jl index f272052141be5..94eec5d43c731 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -21,6 +21,7 @@ type Channel{T} <: AbstractChannel cond_take::Condition # waiting for data to become available cond_put::Condition # waiting for a writeable slot state::Symbol + excp::Nullable{Exception} # Exception to be thrown when state != :open data::Array{T,1} sz_max::Int # maximum size of channel @@ -39,7 +40,7 @@ type Channel{T} <: AbstractChannel if sz < 0 throw(ArgumentError("Channel size must be either 0, a positive integer or Inf")) end - new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0)) + new(Condition(), Condition(), :open, Nullable{Exception}(), Array{T}(0), sz, Array{Condition}(0)) end # deprecated empty constructor @@ -53,6 +54,69 @@ end Channel(sz) = Channel{Any}(sz) +# special constructors +""" + Channel(func::Function; ctype=Any, csize=0, taskref=nothing) + +Creates a new task from `func`, binds it to a new channel of type +`ctype` and size `csize`, schedules the task, all in a single call. + +`func` must accept the bound channel as its only argument. + +If you need a reference to the created task, pass a `Ref{Task}` object via +keyword argument `taskref`. + +Returns a Channel. + +```jldoctest +julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4)); + +julia> @show typeof(chnl); +typeof(chnl) = Channel{Any} + +julia> for i in chnl + @show i + end; +i = 1 +i = 2 +i = 3 +i = 4 + +``` + +An example of referencing the created task: + +```jldoctest +julia> taskref = Ref{Task}(); + +julia> chnl = Channel(c->(@show take!(c)); taskref=taskref); + +julia> task = taskref[]; + +julia> @show istaskdone(task); +istaskdone(task) = false + +julia> put!(chnl, "Hello"); +take!(c) = "Hello" + +julia> @show istaskdone(task); +istaskdone(task) = true + +``` +""" +function Channel(func::Function; ctype=Any, csize=0, taskref=nothing) + chnl = Channel{ctype}(csize) + task = Task(()->func(chnl)) + bind(chnl,task) + schedule(task) + yield() + + isa(taskref, Ref{Task}) && (taskref.x = task) + return chnl +end + + + # deprecated empty constructor Channel() = Channel{Any}() @@ -60,6 +124,12 @@ closed_exception() = InvalidStateException("Channel is closed.", :closed) isbuffered(c::Channel) = c.sz_max==0 ? false : true +function check_channel_state(c::Channel) + if !isopen(c) + !isnull(c.excp) && throw(get(c.excp)) + throw(closed_exception()) + end +end """ close(c::Channel) @@ -70,11 +140,110 @@ Closes a channel. An exception is thrown by: """ function close(c::Channel) c.state = :closed - notify_error(c::Channel, closed_exception()) + c.excp = Nullable{}(closed_exception()) + notify_error(c) nothing end isopen(c::Channel) = (c.state == :open) +""" + bind(chnl::Channel, task::Task) + +Associates the lifetime of `chnl` with a task. +Channel `chnl` is automatically closed when the task terminates. +Any uncaught exception in the task is propagated to all waiters on `chnl`. + +The `chnl` object can be explicitly closed independent of task termination. +Terminating tasks have no effect on already closed Channel objects. + +When a channel is bound to multiple tasks, the first task to terminate will +close the channel. When multiple channels are bound to the same task, +termination of the task will close all channels. + +```jldoctest +julia> c = Channel(0); + +julia> task = @schedule foreach(i->put!(c, i), 1:4); + +julia> bind(c,task); + +julia> for i in c + @show i + end; +i = 1 +i = 2 +i = 3 +i = 4 + +julia> @show isopen(c); +isopen(c) = false + +``` + +```jldoctest +julia> c = Channel(0); + +julia> task = @schedule (put!(c,1);error("foo")); + +julia> bind(c,task); + +julia> take!(c); + +julia> put!(c,1); +ERROR: foo +Stacktrace: + [1] check_channel_state(::Channel{Any}) at ./channels.jl:129 + [2] put!(::Channel{Any}, ::Int64) at ./channels.jl:247 + +``` +""" +function bind(c::Channel, task::Task) + ref = WeakRef(c) + register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref)) + c +end + +""" + channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n)) + +A convenience method to create `n` channels and bind them to tasks started +from the provided functions in a single call. Each `func` must accept `n` arguments +which are the created channels. Channel types and sizes may be specified via +keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are +of type `Channel{Any}(0)`. + +Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks. +""" +function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n)) + @assert length(csizes) == n + @assert length(ctypes) == n + + chnls = map(i->Channel{ctypes[i]}(csizes[i]), 1:n) + tasks=Task[Task(()->f(chnls...)) for f in funcs] + + # bind all tasks to all channels and schedule them + foreach(t -> foreach(c -> bind(c,t), chnls), tasks) + foreach(t->schedule(t), tasks) + + yield() # Allow scheduled tasks to run + + return (chnls, tasks) +end + +function close_chnl_on_taskdone(t::Task, ref::WeakRef) + if ref.value !== nothing + c = ref.value + !isopen(c) && return + if istaskfailed(t) + c.state = :closed + c.excp = Nullable{Exception}(task_result(t)) + notify_error(c) + else + close(c) + end + end +end + type InvalidStateException <: Exception msg::AbstractString state::Symbol @@ -89,7 +258,7 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ task. """ function put!(c::Channel, v) - !isopen(c) && throw(closed_exception()) + check_channel_state(c) isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v) end @@ -98,7 +267,9 @@ function put_buffered(c::Channel, v) wait(c.cond_put) end push!(c.data, v) - notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call. + + # notify all, since some of the waiters may be on a "fetch" call. + notify(c.cond_take, nothing, true, false) v end @@ -108,7 +279,7 @@ function put_unbuffered(c::Channel, v) wait(c.cond_put) end cond_taker = shift!(c.takers) - notify(cond_taker, v, false, false) + notify(cond_taker, v, false, false) > 0 && yield() v end @@ -148,7 +319,7 @@ shift!(c::Channel) = take!(c) # 0-size channel function take_unbuffered(c::Channel) - !isopen(c) && throw(closed_exception()) + check_channel_state(c) cond_taker = Condition() push!(c.takers, cond_taker) notify(c.cond_put, nothing, false, false) @@ -178,7 +349,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put) function wait(c::Channel) while !isready(c) - !isopen(c) && throw(closed_exception()) + check_channel_state(c) wait(c.cond_take) end nothing @@ -189,19 +360,20 @@ function notify_error(c::Channel, err) notify_error(c.cond_put, err) foreach(x->notify_error(x, err), c.takers) end +notify_error(c::Channel) = notify_error(c, get(c.excp)) eltype{T}(::Type{Channel{T}}) = T show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))") -type ChannelState{T} +type ChannelIterState{T} hasval::Bool val::T - ChannelState(x) = new(x) + ChannelIterState(x) = new(x) end -start{T}(c::Channel{T}) = ChannelState{T}(false) -function done(c::Channel, state::ChannelState) +start{T}(c::Channel{T}) = ChannelIterState{T}(false) +function done(c::Channel, state::ChannelIterState) try # we are waiting either for more data or channel to be closed state.hasval && return false diff --git a/base/deprecated.jl b/base/deprecated.jl index 096fb21b2523c..a93744db4b5bd 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1543,4 +1543,99 @@ unsafe_wrap(::Type{String}, p::Cstring, len::Integer, own::Bool=false) = # Rename LibGit2.GitAnyObject to LibGit2.GitUnknownObject (part of #19839) eval(LibGit2, :(Base.@deprecate_binding GitAnyObject GitUnknownObject)) +## produce, consume, and task iteration +# NOTE: When removing produce/consume, also remove field Task.consumers and related code in +# task.jl and event.jl + +function produce(v) + depwarn("produce is now deprecated. Use Channels for inter-task communication.", :produce) + + ct = current_task() + local empty, t, q + while true + q = ct.consumers + if isa(q,Task) + t = q + ct.consumers = nothing + empty = true + break + elseif isa(q,Condition) && !isempty(q.waitq) + t = shift!(q.waitq) + empty = isempty(q.waitq) + break + end + wait() + end + + t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable")) + if empty + schedule_and_wait(t, v) + while true + # wait until there are more consumers + q = ct.consumers + if isa(q,Task) + return q.result + elseif isa(q,Condition) && !isempty(q.waitq) + return q.waitq[1].result + end + wait() + end + else + schedule(t, v) + # make sure `t` runs before us. otherwise, the producer might + # finish before `t` runs again, causing it to see the producer + # as done, causing done(::Task, _) to miss the value `v`. + # see issue #7727 + yield() + return q.waitq[1].result + end +end +produce(v...) = produce(v) + +function consume(P::Task, values...) + depwarn("consume is now deprecated. Use Channels for inter-task communication.", :consume) + + if istaskdone(P) + return wait(P) + end + + ct = current_task() + ct.result = length(values)==1 ? values[1] : values + + #### un-optimized version + #if P.consumers === nothing + # P.consumers = Condition() + #end + #push!(P.consumers.waitq, ct) + # optimized version that avoids the queue for 1 consumer + if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq)) + P.consumers = ct + else + if isa(P.consumers, Task) + t = P.consumers + P.consumers = Condition() + push!(P.consumers.waitq, t) + end + push!(P.consumers.waitq, ct) + end + + P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice +end + +function start(t::Task) + depwarn(string("Task iteration is now deprecated.", + " Use Channels for inter-task communication. ", + " A for-loop on a Channel object is terminated by calling `close` on the object."), :taskfor) + nothing +end +function done(t::Task, val) + t.result = consume(t) + istaskdone(t) +end +next(t::Task, val) = (t.result, nothing) +iteratorsize(::Type{Task}) = SizeUnknown() +iteratoreltype(::Type{Task}) = EltypeUnknown() + +isempty(::Task) = error("isempty not defined for Tasks") + # End deprecations scheduled for 0.6 diff --git a/base/docs/helpdb/Base.jl b/base/docs/helpdb/Base.jl index de81445f463b4..944cbd2c1b33a 100644 --- a/base/docs/helpdb/Base.jl +++ b/base/docs/helpdb/Base.jl @@ -527,14 +527,6 @@ Show every part of the representation of a value. """ dump -""" - consume(task, values...) - -Receive the next value passed to `produce` by the specified task. Additional arguments may -be passed, to be returned from the last `produce` call in the producer. -""" -consume - """ isinteractive() -> Bool @@ -1468,14 +1460,6 @@ the topmost backend that does not throw a `MethodError`). """ pushdisplay -""" - produce(value) - -Send the given value to the last `consume` call, switching to the consumer task. If the next -`consume` call passes any values, they are returned by `produce`. -""" -produce - """ StackOverflowError() diff --git a/base/event.jl b/base/event.jl index 8c153f387409e..a6a67c8074353 100644 --- a/base/event.jl +++ b/base/event.jl @@ -37,19 +37,24 @@ end Wake up tasks waiting for a condition, passing them `val`. If `all` is `true` (the default), all waiting tasks are woken, otherwise only one is. If `error` is `true`, the passed value is raised as an exception in the woken tasks. + +Returns the count of tasks woken up. Returns 0 if no tasks are waiting on `condition`. """ notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error) function notify(c::Condition, arg, all, error) + cnt = 0 if all + cnt = length(c.waitq) for t in c.waitq error ? schedule(t, arg, error=error) : schedule(t, arg) end empty!(c.waitq) elseif !isempty(c.waitq) + cnt = 1 t = shift!(c.waitq) error ? schedule(t, arg, error=error) : schedule(t, arg) end - nothing + cnt end notify_error(c::Condition, err) = notify(c, err, true, true) diff --git a/base/file.jl b/base/file.jl index d9bd882489a3f..1cc1829c33d4f 100644 --- a/base/file.jl +++ b/base/file.jl @@ -454,8 +454,10 @@ function walkdir(root; topdown=true, follow_symlinks=false, onerror=throw) catch err isa(err, SystemError) || throw(err) onerror(err) - #Need to return an empty task to skip the current root folder - return Task(()->()) + # Need to return an empty closed channel to skip the current root folder + chnl = Channel(0) + close(chnl) + return chnl end dirs = Array{eltype(content)}(0) files = Array{eltype(content)}(0) @@ -467,23 +469,24 @@ function walkdir(root; topdown=true, follow_symlinks=false, onerror=throw) end end - function _it() + function _it(chnl) if topdown - produce(root, dirs, files) + put!(chnl, (root, dirs, files)) end for dir in dirs path = joinpath(root,dir) if follow_symlinks || !islink(path) for (root_l, dirs_l, files_l) in walkdir(path, topdown=topdown, follow_symlinks=follow_symlinks, onerror=onerror) - produce(root_l, dirs_l, files_l) + put!(chnl, (root_l, dirs_l, files_l)) end end end if !topdown - produce(root, dirs, files) + put!(chnl, (root, dirs, files)) end end - Task(_it) + + return Channel(_it) end function unlink(p::AbstractString) diff --git a/base/task.jl b/base/task.jl index 243b3090b3f18..ff40447b205e4 100644 --- a/base/task.jl +++ b/base/task.jl @@ -104,7 +104,7 @@ julia> istaskdone(b) true ``` """ -istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed)) +istaskdone(t::Task) = ((t.state == :done) | istaskfailed(t)) """ istaskstarted(t::Task) -> Bool @@ -122,6 +122,10 @@ false """ istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0 +istaskfailed(t::Task) = (t.state == :failed) + +task_result(t::Task) = t.result + task_local_storage() = get_task_tls(current_task()) function get_task_tls(t::Task) if t.storage === nothing @@ -172,19 +176,25 @@ function wait(t::Task) while !istaskdone(t) wait(t.donenotify) end - if t.state == :failed + if istaskfailed(t) throw(t.exception) end - return t.result + return task_result(t) end suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false +function register_taskdone_hook(t::Task, hook) + tls = get_task_tls(t) + push!(get!(tls, :TASKDONE_HOOKS, []), hook) + t +end + # runtime system hook called when a task finishes function task_done_hook(t::Task) # `finish_task` sets `sigatomic` before entering this function - err = (t.state == :failed) - result = t.result + err = istaskfailed(t) + result = task_result(t) handled = false if err t.backtrace = catch_backtrace() @@ -198,6 +208,13 @@ function task_done_hook(t::Task) notify(t.donenotify, result, error=err) end + # Execute any other hooks registered in the TLS + if isa(t.storage, ObjectIdDict) && haskey(t.storage, :TASKDONE_HOOKS) + foreach(hook -> hook(t), t.storage[:TASKDONE_HOOKS]) + delete!(t.storage, :TASKDONE_HOOKS) + handled = true + end + #### un-optimized version #isa(q,Condition) && notify(q, result, error=err) if isa(q,Task) @@ -236,95 +253,6 @@ function task_done_hook(t::Task) end -## produce, consume, and task iteration - -function produce(v) - #### un-optimized version - #q = current_task().consumers - #t = shift!(q.waitq) - #empty = isempty(q.waitq) - ct = current_task() - local empty, t, q - while true - q = ct.consumers - if isa(q,Task) - t = q - ct.consumers = nothing - empty = true - break - elseif isa(q,Condition) && !isempty(q.waitq) - t = shift!(q.waitq) - empty = isempty(q.waitq) - break - end - wait() - end - - t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable")) - if empty - schedule_and_wait(t, v) - while true - # wait until there are more consumers - q = ct.consumers - if isa(q,Task) - return q.result - elseif isa(q,Condition) && !isempty(q.waitq) - return q.waitq[1].result - end - wait() - end - else - schedule(t, v) - # make sure `t` runs before us. otherwise, the producer might - # finish before `t` runs again, causing it to see the producer - # as done, causing done(::Task, _) to miss the value `v`. - # see issue #7727 - yield() - return q.waitq[1].result - end -end -produce(v...) = produce(v) - -function consume(P::Task, values...) - if istaskdone(P) - return wait(P) - end - - ct = current_task() - ct.result = length(values)==1 ? values[1] : values - - #### un-optimized version - #if P.consumers === nothing - # P.consumers = Condition() - #end - #push!(P.consumers.waitq, ct) - # optimized version that avoids the queue for 1 consumer - if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq)) - P.consumers = ct - else - if isa(P.consumers, Task) - t = P.consumers - P.consumers = Condition() - push!(P.consumers.waitq, t) - end - push!(P.consumers.waitq, ct) - end - - P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice -end - -start(t::Task) = nothing -function done(t::Task, val) - t.result = consume(t) - istaskdone(t) -end -next(t::Task, val) = (t.result, nothing) -iteratorsize(::Type{Task}) = SizeUnknown() -iteratoreltype(::Type{Task}) = EltypeUnknown() - -isempty(::Task) = error("isempty not defined for Tasks") - - ## dynamically-scoped waiting for multiple items sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ()))) @@ -341,12 +269,12 @@ function sync_end() try wait(r) catch ex - if !isa(r, Task) || (isa(r, Task) && !(r.state == :failed)) + if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r)) rethrow(ex) end finally - if isa(r, Task) && (r.state == :failed) - push!(c_ex, CapturedException(r.result, r.backtrace)) + if isa(r, Task) && istaskfailed(r) + push!(c_ex, CapturedException(task_result(r), r.backtrace)) end end end diff --git a/doc/src/manual/control-flow.md b/doc/src/manual/control-flow.md index 4a2071e968152..227018a13098c 100644 --- a/doc/src/manual/control-flow.md +++ b/doc/src/manual/control-flow.md @@ -842,52 +842,55 @@ them. The consumer cannot simply call a producer function to get a value, becaus may have more values to generate and so might not yet be ready to return. With tasks, the producer and consumer can both run as long as they need to, passing values back and forth as necessary. -Julia provides the functions [`produce()`](@ref) and [`consume()`](@ref) for solving this problem. -A producer is a function that calls [`produce()`](@ref) on each value it needs to produce: +Julia provides a [`Channel`](@ref) mechanism for solving this problem. +A [`Channel`](@ref) is a waitable FIFO queue which can have multiple tasks reading and writing to it. + +Let's define a producer task, which produces values via the [`put!`](@ref) call. ```julia -julia> function producer() - produce("start") +julia> function producer(c::Channel) + put!(c, "start") for n=1:4 - produce(2n) + put!(c, 2n) end - produce("stop") + put!(c, "stop") end; ``` -To consume values, first the producer is wrapped in a [`Task`](@ref), then [`consume()`](@ref) -is called repeatedly on that object: +To consume values, we need to schedule the producer to run in a new task. A special [`Channel`](@ref) +constructor which accepts a 1-arg function as an argument can be used to run a task bound to a channel. +We can then [`take!()`](@ref) values repeatedly from the channel object: -```julia -julia> p = Task(producer); +```jldoctest +julia> chnl = Channel(producer); -julia> consume(p) +julia> take!(chnl) "start" -julia> consume(p) +julia> take!(chnl) 2 -julia> consume(p) +julia> take!(chnl) 4 -julia> consume(p) +julia> take!(chnl) 6 -julia> consume(p) +julia> take!(chnl) 8 -julia> consume(p) +julia> take!(chnl) "stop" ``` One way to think of this behavior is that `producer` was able to return multiple times. Between -calls to [`produce()`](@ref), the producer's execution is suspended and the consumer has control. +calls to [`put!()`](@ref), the producer's execution is suspended and the consumer has control. -A [`Task`](@ref) can be used as an iterable object in a `for` loop, in which case the loop variable takes -on all the produced values: +The returned [`Channel`](@ref) can be used as an iterable object in a `for` loop, in which case the +loop variable takes on all the produced values. The loop is terminated when the channel is closed. -```julia -julia> for x in Task(producer) +```jldoctest +julia> for x in Channel(producer) println(x) end start @@ -898,10 +901,17 @@ start stop ``` -Note that the [`Task()`](@ref) constructor expects a 0-argument function. A common pattern is -for the producer to be parameterized, in which case a partial function application is needed to -create a 0-argument [anonymous function](@ref man-anonymous-functions). This can be done either directly or by use of -a convenience macro: +Note that we did not have to explcitly close the channel in the producer. This is because +the act of binding a [`Channel`](@ref) to a [`Task()`](@ref) associates the open lifetime of +a channel with that of the bound task. The channel object is closed automatically when the task +terminates. Multiple channels can be bound to a task, and vice-versa. + +While the [`Task()`](@ref) constructor expects a 0-argument function, the [`Channel()`](@ref) +method which creates a channel bound task expects a function that accepts a single argument of +type [`Channel`](@ref). A common pattern is for the producer to be parameterized, in which case a partial +function application is needed to create a 0 or 1 argument [anonymous function](@ref man-anonymous-functions). + +For [`Task()`](@ref) objects this can be done either directly or by use of a convenience macro: ``` function mytask(myarg) @@ -913,13 +923,16 @@ taskHdl = Task(() -> mytask(7)) taskHdl = @task mytask(7) ``` -[`produce()`](@ref) and [`consume()`](@ref) do not launch threads that can run on separate CPUs. +To orchestrate more advanced work distribution patterns, [`bind()`](@ref) and [`schedule()`](@ref) +can be used in conjunction with [`Task()`](@ref) and [`Channel()`](@ref) +constructors to explicitly link a set of channels with a set of producer/consumer tasks. + +Note that currently Julia tasks are not scheduled to run on separate CPU cores. True kernel threads are discussed under the topic of [Parallel Computing](@ref). ### Core task operations -While [`produce()`](@ref) and [`consume()`](@ref) illustrate the essential nature of tasks, they -are actually implemented as library functions using a more primitive function, [`yieldto()`](@ref). +Let us explore the low level construct [`yieldto()`](@ref) to underestand how task switching works. `yieldto(task,value)` suspends the current task, switches to the specified `task`, and causes that task's last [`yieldto()`](@ref) call to return the specified `value`. Notice that [`yieldto()`](@ref) is the only operation required to use task-style control flow; instead of calling and returning @@ -929,9 +942,10 @@ coroutines"; each task is switched to and from using the same mechanism. [`yieldto()`](@ref) is powerful, but most uses of tasks do not invoke it directly. Consider why this might be. If you switch away from the current task, you will probably want to switch back to it at some point, but knowing when to switch back, and knowing which task has the responsibility -of switching back, can require considerable coordination. For example, [`produce()`](@ref) needs -to maintain some state to remember who the consumer is. Not needing to manually keep track of -the consuming task is what makes [`produce()`](@ref) easier to use than [`yieldto()`](@ref). +of switching back, can require considerable coordination. For example, [`put!()`](@ref) and [`take!()`](@ref) +are blocking operations, which, when used in the context of channels maintain state to remember +who the consumers is. Not needing to manually keep track of the consuming task is what makes [`put!()`](@ref) +easier to use than the low-level [`yieldto()`](@ref). In addition to [`yieldto()`](@ref), a few other basic functions are needed to use tasks effectively. diff --git a/doc/src/stdlib/collections.md b/doc/src/stdlib/collections.md index 336214b8457b2..29d8c38f3b1a8 100644 --- a/doc/src/stdlib/collections.md +++ b/doc/src/stdlib/collections.md @@ -47,7 +47,6 @@ Fully implemented by: * `EachLine` * `AbstractString` * [`Set`](@ref) - * [`Task`](@ref) ## General Collections diff --git a/doc/src/stdlib/parallel.md b/doc/src/stdlib/parallel.md index c0b02b9fed10b..89986a99a66de 100644 --- a/doc/src/stdlib/parallel.md +++ b/doc/src/stdlib/parallel.md @@ -8,8 +8,6 @@ Base.yieldto Base.current_task Base.istaskdone Base.istaskstarted -Base.consume -Base.produce Base.yield Base.task_local_storage(::Any) Base.task_local_storage(::Any, ::Any) @@ -26,6 +24,7 @@ Base.take!(::Channel) Base.isready(::Channel) Base.fetch(::Channel) Base.close(::Channel) +Base.bind(c::Channel, task::Task) Base.asyncmap Base.asyncmap! ``` diff --git a/test/bitarray.jl b/test/bitarray.jl index 7d9c41ec458d0..8413cd5bfc39e 100644 --- a/test/bitarray.jl +++ b/test/bitarray.jl @@ -268,33 +268,33 @@ timesofar("constructors") b2 = bitrand(m1, m2) @check_bit_operation copy!(b1, b2) BitMatrix - function gen_getindex_data() + function gen_getindex_data(c) m1, m2 = rand_m1m2() - produce((m1, m2, Bool)) + put!(c, (m1, m2, Bool)) m1, m2 = rand_m1m2() - produce((m1, 1:m2, BitVector)) - produce((m1, :, BitVector)) + put!(c, (m1, 1:m2, BitVector)) + put!(c, (m1, :, BitVector)) m1, m2 = rand_m1m2() - produce((m1, randperm(m2), BitVector)) + put!(c, (m1, randperm(m2), BitVector)) m1, m2 = rand_m1m2() - produce((1:m1, m2, BitVector)) - produce((:, m2, BitVector)) + put!(c, (1:m1, m2, BitVector)) + put!(c, (:, m2, BitVector)) m1, m2 = rand_m1m2() - produce((1:m1, 1:m2, BitMatrix)) - produce((:, :, BitMatrix)) + put!(c, (1:m1, 1:m2, BitMatrix)) + put!(c, (:, :, BitMatrix)) m1, m2 = rand_m1m2() - produce((1:m1, randperm(m2), BitMatrix)) - produce((:, randperm(m2), BitMatrix)) + put!(c, (1:m1, randperm(m2), BitMatrix)) + put!(c, (:, randperm(m2), BitMatrix)) m1, m2 = rand_m1m2() - produce((randperm(m1), m2, BitVector)) + put!(c, (randperm(m1), m2, BitVector)) m1, m2 = rand_m1m2() - produce((randperm(m1), 1:m2, BitMatrix)) - produce((randperm(m1), :, BitMatrix)) + put!(c, (randperm(m1), 1:m2, BitMatrix)) + put!(c, (randperm(m1), :, BitMatrix)) m1, m2 = rand_m1m2() - produce((randperm(m1), randperm(m2), BitMatrix)) + put!(c, (randperm(m1), randperm(m2), BitMatrix)) end - for (k1, k2, T) in Task(gen_getindex_data) + for (k1, k2, T) in Channel(gen_getindex_data) # println(typeof(k1), " ", typeof(k2), " ", T) # uncomment to debug @check_bit_operation getindex(b1, k1, k2) T end @@ -304,81 +304,81 @@ timesofar("constructors") @check_bit_operation getindex(b1, :, randperm(m2), 1) BitMatrix b1 = bitrand(s1, s2, s3, s4) - function gen_getindex_data4() + function gen_getindex_data4(c) m1, m2, m3, m4 = (:, :, :, :) - produce((m1, m2, m3, m4, BitArray{4})) + put!(c, (m1, m2, m3, m4, BitArray{4})) m1, m2, m3, m4 = (2, :, :, :) - produce((m1, m2, m3, m4, BitArray{3})) + put!(c, (m1, m2, m3, m4, BitArray{3})) m1, m2, m3, m4 = (:, :, 2, :) - produce((m1, m2, m3, m4, BitArray{3})) + put!(c, (m1, m2, m3, m4, BitArray{3})) m1, m2, m3, m4 = (:, :, :, 2) - produce((m1, m2, m3, m4, BitArray{3})) + put!(c, (m1, m2, m3, m4, BitArray{3})) m1, m2, m3, m4 = (2, :, :, 2) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (:, 2, :, 2) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (2, :, 2, :) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (2, 2, :, :) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (:, 2, 2, 2) - produce((m1, m2, m3, m4, BitArray{1})) + put!(c, (m1, m2, m3, m4, BitArray{1})) m1, m2, m3, m4 = (2, 2, :, 2) - produce((m1, m2, m3, m4, BitArray{1})) + put!(c, (m1, m2, m3, m4, BitArray{1})) m1, m2, m3, m4 = (4, 3:7, 2:2, 2) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (1:2, 5, 1, 2:7) - produce((m1, m2, m3, m4, BitArray{2})) + put!(c, (m1, m2, m3, m4, BitArray{2})) m1, m2, m3, m4 = (2:3, 2:7, 1:2, 4:6) - produce((m1, m2, m3, m4, BitArray{4})) + put!(c, (m1, m2, m3, m4, BitArray{4})) end - for (k1, k2, k3, k4, T) in Task(gen_getindex_data4) + for (k1, k2, k3, k4, T) in Channel(gen_getindex_data4) #println(typeof(k1), " ", typeof(k2), " ", typeof(k3), " ", typeof(k4), " ", T) # uncomment to debug @check_bit_operation getindex(b1, k1, k2, k3, k4) T end b1 = bitrand(n1, n2) - function gen_setindex_data() + function gen_setindex_data(c) m1, m2 = rand_m1m2() - produce((rand(Bool), m1, m2)) + put!(c, (rand(Bool), m1, m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), m1, 1:m2)) - produce((rand(Bool), m1, :)) - produce((bitrand(m2), m1, 1:m2)) + put!(c, (rand(Bool), m1, 1:m2)) + put!(c, (rand(Bool), m1, :)) + put!(c, (bitrand(m2), m1, 1:m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), m1, randperm(m2))) - produce((bitrand(m2), m1, randperm(m2))) + put!(c, (rand(Bool), m1, randperm(m2))) + put!(c, (bitrand(m2), m1, randperm(m2))) m1, m2 = rand_m1m2() - produce((rand(Bool), 1:m1, m2)) - produce((rand(Bool), :, m2)) - produce((bitrand(m1), 1:m1, m2)) + put!(c, (rand(Bool), 1:m1, m2)) + put!(c, (rand(Bool), :, m2)) + put!(c, (bitrand(m1), 1:m1, m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), 1:m1, 1:m2)) - produce((rand(Bool), :, :)) - produce((bitrand(m1, m2), 1:m1, 1:m2)) + put!(c, (rand(Bool), 1:m1, 1:m2)) + put!(c, (rand(Bool), :, :)) + put!(c, (bitrand(m1, m2), 1:m1, 1:m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), 1:m1, randperm(m2))) - produce((rand(Bool), :, randperm(m2))) - produce((bitrand(m1, m2), 1:m1, randperm(m2))) + put!(c, (rand(Bool), 1:m1, randperm(m2))) + put!(c, (rand(Bool), :, randperm(m2))) + put!(c, (bitrand(m1, m2), 1:m1, randperm(m2))) m1, m2 = rand_m1m2() - produce((rand(Bool), randperm(m1), m2)) - produce((bitrand(m1), randperm(m1), m2)) + put!(c, (rand(Bool), randperm(m1), m2)) + put!(c, (bitrand(m1), randperm(m1), m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), randperm(m1), 1:m2)) - produce((rand(Bool), randperm(m1), :)) - produce((bitrand(m1,m2), randperm(m1), 1:m2)) + put!(c, (rand(Bool), randperm(m1), 1:m2)) + put!(c, (rand(Bool), randperm(m1), :)) + put!(c, (bitrand(m1,m2), randperm(m1), 1:m2)) m1, m2 = rand_m1m2() - produce((rand(Bool), randperm(m1), randperm(m2))) - produce((bitrand(m1,m2), randperm(m1), randperm(m2))) + put!(c, (rand(Bool), randperm(m1), randperm(m2))) + put!(c, (bitrand(m1,m2), randperm(m1), randperm(m2))) end - for (b2, k1, k2) in Task(gen_setindex_data) + for (b2, k1, k2) in Channel(gen_setindex_data) # println(typeof(b2), " ", typeof(k1), " ", typeof(k2)) # uncomment to debug @check_bit_operation setindex!(b1, b2, k1, k2) BitMatrix end @@ -392,47 +392,47 @@ timesofar("constructors") @check_bit_operation setindex!(b1, b2, m1, 1:m2, 1) BitMatrix b1 = bitrand(s1, s2, s3, s4) - function gen_setindex_data4() + function gen_setindex_data4(c) m1, m2, m3, m4 = (:, :, :, :) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(s1, s2, s3, s4), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(s1, s2, s3, s4), m1, m2, m3, m4)) m1, m2, m3, m4 = (2, :, :, :) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(s2, s3, s4), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(s2, s3, s4), m1, m2, m3, m4)) m1, m2, m3, m4 = (:, :, 2, :) - produce((bitrand(s1, s2, s4), m1, m2, m3, m4)) + put!(c, (bitrand(s1, s2, s4), m1, m2, m3, m4)) m1, m2, m3, m4 = (:, :, :, 2) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(s1, s2, s3), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(s1, s2, s3), m1, m2, m3, m4)) m1, m2, m3, m4 = (2, :, :, 2) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(s2, s3), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(s2, s3), m1, m2, m3, m4)) m1, m2, m3, m4 = (:, 2, :, 2) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(s1, s3), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(s1, s3), m1, m2, m3, m4)) m1, m2, m3, m4 = (2, :, 2, :) - produce((bitrand(s2, s4), m1, m2, m3, m4)) + put!(c, (bitrand(s2, s4), m1, m2, m3, m4)) m1, m2, m3, m4 = (:, 2, 2, :) - produce((bitrand(s1, s4), m1, m2, m3, m4)) + put!(c, (bitrand(s1, s4), m1, m2, m3, m4)) m1, m2, m3, m4 = (:, 2, 2, 2) - produce((bitrand(s1), m1, m2, m3, m4)) + put!(c, (bitrand(s1), m1, m2, m3, m4)) m1, m2, m3, m4 = (2, 2, :, 2) - produce((bitrand(s3), m1, m2, m3, m4)) + put!(c, (bitrand(s3), m1, m2, m3, m4)) m1, m2, m3, m4 = (4, 3:7, 2:2, 2) - produce((bitrand(5, 1), m1, m2, m3, m4)) + put!(c, (bitrand(5, 1), m1, m2, m3, m4)) m1, m2, m3, m4 = (1:2, 5, 1, 2:7) - produce((rand(Bool), m1, m2, m3, m4)) - produce((bitrand(2, 6), m1, m2, m3, m4)) + put!(c, (rand(Bool), m1, m2, m3, m4)) + put!(c, (bitrand(2, 6), m1, m2, m3, m4)) m1, m2, m3, m4 = (2:3, 2:7, 1:2, 4:6) - produce((bitrand(2, 6, 2, 3), m1, m2, m3, m4)) + put!(c, (bitrand(2, 6, 2, 3), m1, m2, m3, m4)) end - for (b2, k1, k2, k3, k4) in Task(gen_setindex_data4) + for (b2, k1, k2, k3, k4) in Channel(gen_setindex_data4) # println(typeof(b2), " ", typeof(k1), " ", typeof(k2), " ", typeof(k3), " ", typeof(k4)) # uncomment to debug @check_bit_operation setindex!(b1, b2, k1, k2, k3, k4) BitArray{4} end diff --git a/test/channels.jl b/test/channels.jl index 27897215b401a..376086d5fd534 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -75,6 +75,120 @@ while !done(c,s) end @test res == Int[1:10...] +# Tests for channels bound to tasks. +for N in [0,10] + # Normal exit of task + c=Channel(N) + bind(c, @schedule (yield();nothing)) + @test_throws InvalidStateException take!(c) + @test !isopen(c) + + # Error exception in task + c=Channel(N) + bind(c, @schedule (yield();error("foo"))) + @test_throws ErrorException take!(c) + @test !isopen(c) + + # Multiple channels closed by the same bound task + cs = [Channel(N) for i in 1:5] + tf2 = () -> begin + if N > 0 + foreach(c->assert(take!(c)==2), cs) + end + yield() + error("foo") + end + task = Task(tf2) + foreach(c->bind(c, task), cs) + schedule(task) + + if N > 0 + for i in 1:5 + @test put!(cs[i], 2) == 2 + end + end + for i in 1:5 + while (isopen(cs[i])); yield(); end + @test_throws ErrorException wait(cs[i]) + @test_throws ErrorException take!(cs[i]) + @test_throws ErrorException put!(cs[i], 1) + @test_throws ErrorException fetch(cs[i]) + end + + # Multiple tasks, first one to terminate, closes the channel + nth = rand(1:5) + ref = Ref(0) + cond = Condition() + tf3(i) = begin + if i == nth + ref.x = i + else + sleep(2.0) + end + end + + tasks = [Task(()->tf3(i)) for i in 1:5] + c = Channel(N) + foreach(t->bind(c,t), tasks) + foreach(t->schedule(t), tasks) + @test_throws InvalidStateException wait(c) + @test !isopen(c) + @test ref.x == nth + + # channeled_tasks + for T in [Any, Int] + chnls, tasks = Base.channeled_tasks(2, (c1,c2)->(assert(take!(c1)==1); put!(c2,2)); ctypes=[T,T], csizes=[N,N]) + put!(chnls[1], 1) + @test take!(chnls[2]) == 2 + @test_throws InvalidStateException wait(chnls[1]) + @test_throws InvalidStateException wait(chnls[2]) + @test istaskdone(tasks[1]) + @test !isopen(chnls[1]) + @test !isopen(chnls[2]) + + f=Future() + tf4 = (c1,c2) -> begin + assert(take!(c1)==1) + wait(f) + end + + tf5 = (c1,c2) -> begin + put!(c2,2) + wait(f) + end + + chnls, tasks = Base.channeled_tasks(2, tf4, tf5; ctypes=[T,T], csizes=[N,N]) + put!(chnls[1], 1) + @test take!(chnls[2]) == 2 + yield() + put!(f, 1) + + @test_throws InvalidStateException wait(chnls[1]) + @test_throws InvalidStateException wait(chnls[2]) + @test istaskdone(tasks[1]) + @test istaskdone(tasks[2]) + @test !isopen(chnls[1]) + @test !isopen(chnls[2]) + end + + # channel + tf6 = c -> begin + assert(take!(c)==2) + error("foo") + end + + for T in [Any, Int] + taskref = Ref{Task}() + chnl = Channel(tf6, ctype=T, csize=N, taskref=taskref) + put!(chnl, 2) + yield() + @test_throws ErrorException wait(chnl) + @test istaskdone(taskref[]) + @test !isopen(chnl) + @test_throws ErrorException take!(chnl) + end +end + # Testing timedwait on multiple channels @sync begin diff --git a/test/copy.jl b/test/copy.jl index 59adeebf2ec10..40e3fc022ac86 100644 --- a/test/copy.jl +++ b/test/copy.jl @@ -5,12 +5,12 @@ mainres = ([4, 5, 3], bitres = ([true, true, false], [false, true, false]) -tskprod(x) = @task for i in x; produce(i); end +chnlprod(x) = Channel(c->for i in x; put!(c,i); end) for (dest, src, bigsrc, emptysrc, res) in [ ([1, 2, 3], () -> [4, 5], () -> [1, 2, 3, 4, 5], () -> Int[], mainres), ([1, 2, 3], () -> 4:5, () -> 1:5, () -> 1:0, mainres), - ([1, 2, 3], () -> tskprod(4:5), () -> tskprod(1:5), () -> tskprod(1:0), mainres), + ([1, 2, 3], () -> chnlprod(4:5), () -> chnlprod(1:5), () -> chnlprod(1:0), mainres), (falses(3), () -> trues(2), () -> trues(5), () -> trues(0), bitres)] @test copy!(copy(dest), src()) == res[1] diff --git a/test/file.jl b/test/file.jl index f8a7620e5ad51..bf1af867fcb6b 100644 --- a/test/file.jl +++ b/test/file.jl @@ -957,29 +957,29 @@ cd(dirwalk) do follow_symlink_vec = has_symlinks ? [true, false] : [false] has_symlinks && symlink(abspath("sub_dir2"), joinpath("sub_dir1", "link")) for follow_symlinks in follow_symlink_vec - task = walkdir(".", follow_symlinks=follow_symlinks) - root, dirs, files = consume(task) + chnl = walkdir(".", follow_symlinks=follow_symlinks) + root, dirs, files = take!(chnl) @test root == "." @test dirs == ["sub_dir1", "sub_dir2"] @test files == ["file1", "file2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) @test root == joinpath(".", "sub_dir1") @test dirs == (has_symlinks ? ["link", "subsub_dir1", "subsub_dir2"] : ["subsub_dir1", "subsub_dir2"]) @test files == ["file1", "file2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) if follow_symlinks @test root == joinpath(".", "sub_dir1", "link") @test dirs == [] @test files == ["file_dir2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) end for i=1:2 @test root == joinpath(".", "sub_dir1", "subsub_dir$i") @test dirs == [] @test files == [] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) end @test root == joinpath(".", "sub_dir2") @@ -988,51 +988,51 @@ cd(dirwalk) do end for follow_symlinks in follow_symlink_vec - task = walkdir(".", follow_symlinks=follow_symlinks, topdown=false) - root, dirs, files = consume(task) + chnl = walkdir(".", follow_symlinks=follow_symlinks, topdown=false) + root, dirs, files = take!(chnl) if follow_symlinks @test root == joinpath(".", "sub_dir1", "link") @test dirs == [] @test files == ["file_dir2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) end for i=1:2 @test root == joinpath(".", "sub_dir1", "subsub_dir$i") @test dirs == [] @test files == [] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) end @test root == joinpath(".", "sub_dir1") @test dirs == (has_symlinks ? ["link", "subsub_dir1", "subsub_dir2"] : ["subsub_dir1", "subsub_dir2"]) @test files == ["file1", "file2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) @test root == joinpath(".", "sub_dir2") @test dirs == [] @test files == ["file_dir2"] - root, dirs, files = consume(task) + root, dirs, files = take!(chnl) @test root == "." @test dirs == ["sub_dir1", "sub_dir2"] @test files == ["file1", "file2"] end #test of error handling - task_error = walkdir(".") - task_noerror = walkdir(".", onerror=x->x) - root, dirs, files = consume(task_error) + chnl_error = walkdir(".") + chnl_noerror = walkdir(".", onerror=x->x) + root, dirs, files = take!(chnl_error) @test root == "." @test dirs == ["sub_dir1", "sub_dir2"] @test files == ["file1", "file2"] rm(joinpath("sub_dir1"), recursive=true) - @test_throws SystemError consume(task_error) # throws an error because sub_dir1 do not exist + @test_throws SystemError take!(chnl_error) # throws an error because sub_dir1 do not exist - root, dirs, files = consume(task_noerror) + root, dirs, files = take!(chnl_noerror) @test root == "." @test dirs == ["sub_dir1", "sub_dir2"] @test files == ["file1", "file2"] - root, dirs, files = consume(task_noerror) # skips sub_dir1 as it no longer exist + root, dirs, files = take!(chnl_noerror) # skips sub_dir1 as it no longer exist @test root == joinpath(".", "sub_dir2") @test dirs == [] @test files == ["file_dir2"] diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index cc849897d8a57..7657ce89d4fdc 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -998,16 +998,6 @@ if is_unix() # aka have ssh end # unix-only end # full-test -# issue #7727 -let A = [], B = [] - t = @task produce(11) - @sync begin - @async for x in t; push!(A,x); end - @async for x in t; push!(B,x); end - end - @test (A == [11]) != (B == [11]) -end - let t = @task 42 schedule(t, ErrorException(""), error=true) @test_throws ErrorException wait(t) diff --git a/test/spawn.jl b/test/spawn.jl index d788ae7bf9547..a707ea47b37fa 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -217,14 +217,6 @@ end # issue #5904 @test run(pipeline(ignorestatus(falsecmd), truecmd)) === nothing - -# issue #6010 -# TODO: should create separate set of task tests -ducer = @async for i=1:100; produce(i); end -yield() -@test consume(ducer) == 1 -@test consume(ducer) == 2 - @testset "redirect_*" begin let OLD_STDOUT = STDOUT, fname = tempname(), diff --git a/test/worlds.jl b/test/worlds.jl index cd28b0039f429..ec31c778a13b5 100644 --- a/test/worlds.jl +++ b/test/worlds.jl @@ -94,47 +94,51 @@ B265(x::Any, dummy::Void) = B265{UInt8}(x, dummy) # test oldworld call / inference +function wfunc(c1,c2) + while true + (f, args) = take!(c1) + put!(c2, f(args...)) + end +end +function put_n_take!(v...) + put!(chnls[1], v) + take!(chnls[2]) +end + g265() = [f265(x) for x in 1:3.] wc265 = world_counter() f265(::Any) = 1.0 @test wc265 + 1 == world_counter() -t265 = @async begin - local ret = nothing - while true - (f, args) = produce(ret) - ret = f(args...) - end -end -@test consume(t265) === nothing +chnls, tasks = Base.channeled_tasks(2, wfunc) +t265 = tasks[1] + wc265 = world_counter() -@test consume(t265, world_counter, ()) == wc265 -@test consume(t265, tls_world_age, ()) == wc265 +@test put_n_take!(world_counter, ()) == wc265 +@test put_n_take!(tls_world_age, ()) == wc265 f265(::Int) = 1 -@test consume(t265, world_counter, ()) == wc265 + 1 == world_counter() == tls_world_age() -@test consume(t265, tls_world_age, ()) == wc265 +@test put_n_take!(world_counter, ()) == wc265 + 1 == world_counter() == tls_world_age() +@test put_n_take!(tls_world_age, ()) == wc265 @test g265() == Int[1, 1, 1] @test Core.Inference.return_type(f265, (Any,)) == Union{Float64, Int} @test Core.Inference.return_type(f265, (Int,)) == Int @test Core.Inference.return_type(f265, (Float64,)) == Float64 -@test consume(t265, g265, ()) == Float64[1.0, 1.0, 1.0] -@test consume(t265, Core.Inference.return_type, (f265, (Any,))) == Float64 -@test consume(t265, Core.Inference.return_type, (f265, (Int,))) == Float64 -@test consume(t265, Core.Inference.return_type, (f265, (Float64,))) == Float64 -@test consume(t265, Core.Inference.return_type, (f265, (Float64,))) == Float64 - +@test put_n_take!(g265, ()) == Float64[1.0, 1.0, 1.0] +@test put_n_take!(Core.Inference.return_type, (f265, (Any,))) == Float64 +@test put_n_take!(Core.Inference.return_type, (f265, (Int,))) == Float64 +@test put_n_take!(Core.Inference.return_type, (f265, (Float64,))) == Float64 +@test put_n_take!(Core.Inference.return_type, (f265, (Float64,))) == Float64 # test that reflection ignores worlds @test Base.return_types(f265, (Any,)) == Any[Int, Float64] -@test consume(t265, Base.return_types, (f265, (Any,))) == Any[Int, Float64] - +@test put_n_take!(Base.return_types, (f265, (Any,))) == Any[Int, Float64] # test for method errors h265() = true loc_h265 = "$(Base.source_path()):$(@__LINE__ - 1)" @test h265() -@test_throws MethodError consume(t265, h265, ()) +@test_throws MethodError put_n_take!(h265, ()) @test_throws MethodError wait(t265) @test istaskdone(t265) let ex = t265.exception