Skip to content

Commit

Permalink
Channel constructor requires an explict size.
Browse files Browse the repository at this point in the history
Move channel tests into its own file.
Implement 0-sized channels.
  • Loading branch information
amitmurthy committed Oct 20, 2016
1 parent 0faf8ce commit e23f4e2
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 112 deletions.
102 changes: 88 additions & 14 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,49 @@

abstract AbstractChannel

const DEF_CHANNEL_SZ=32

type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
sz_max::Int # maximum size of channel
sz_max::Int # maximum size of channel

# Used when sz_max == 0, i.e., an unbuffered channel.
takers::Array{Condition}

function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
new(Condition(), Condition(), :open, Array{T}(0), sz_max)
function Channel(sz::Float64)
if sz == Inf
Channel{T}(typemax(Int))
else
Channel{T}(convert(Int, sz))
end
end
function Channel(sz::Integer)
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
end

# deprecated empty constructor
function Channel()
depwarn(string("The empty constructor Channel() is deprecated. ",
"The channel size needs to be specified explictly. ",
"Defaulting to Channel{$T}(32)."), :Channel)
Channel(32)
end
end

Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz)
Channel(sz) = Channel{Any}(sz)

# deprecated empty constructor
Channel() = Channel{Any}()

closed_exception() = InvalidStateException("Channel is closed.", :closed)

isbuffered(c::Channel) = c.sz_max==0 ? false : true

"""
close(c::Channel)
Expand All @@ -46,9 +69,16 @@ end
put!(c::Channel, v)
Appends an item `v` to the channel `c`. Blocks if the channel is full.
For unbuffered channels, blocks until a `take!` is performed by a different
task.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
end

function put_buffered(c::Channel, v)
while length(c.data) == c.sz_max
wait(c.cond_put)
end
Expand All @@ -57,19 +87,42 @@ function put!(c::Channel, v)
v
end

function put_unbuffered(c::Channel, v)
while length(c.takers) == 0
notify(c.cond_take, nothing, true, false) # Required to handle wait() on 0-sized channels
wait(c.cond_put)
end
cond_taker = shift!(c.takers)
notify(cond_taker, v, false, false)
v
end

push!(c::Channel, v) = put!(c, v)

function fetch(c::Channel)
"""
fetch(c::Channel)
Waits for and gets the first available item from the channel. Does not
remove the item. `fetch` is unsupported on an unbuffered (0-size) channel.
"""
fetch(c::Channel) = isbuffered(c) ? fetch_buffered(c) : fetch_unbuffered(c)
function fetch_buffered(c::Channel)
wait(c)
c.data[1]
end
fetch_unbuffered(c::Channel) = throw(ErrorException("`fetch` is not supported on an unbuffered Channel."))


"""
take!(c::Channel)
Removes and returns a value from a `Channel`. Blocks till data is available.
Removes and returns a value from a `Channel`. Blocks until data is available.
For unbuffered channels, blocks until a `put!` is performed by a different
task.
"""
function take!(c::Channel)
take!(c::Channel) = isbuffered(c) ? take_buffered(c) : take_unbuffered(c)
function take_buffered(c::Channel)
wait(c)
v = shift!(c.data)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
Expand All @@ -78,13 +131,35 @@ end

shift!(c::Channel) = take!(c)

# 0-size channel
function take_unbuffered(c::Channel)
!isopen(c) && throw(closed_exception())
cond_taker = Condition()
push!(c.takers, cond_taker)
notify(c.cond_put, nothing, false, false)
try
return wait(cond_taker)
catch e
if isa(e, InterruptException)
# remove self from the list of takers
filter!(x -> x != cond_taker, c.takers)
else
rethrow(e)
end
end
end

"""
isready(c::Channel)
Determine whether a `Channel` has a value stored to it.
`isready` on `Channel`s is non-blocking.
Determine whether a `Channel` has a value stored to it. Returns
immediately, does not block.
For unbuffered channels returns `true` if there are tasks waiting
on a `put!`.
"""
isready(c::Channel) = n_avail(c) > 0
n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
Expand All @@ -97,12 +172,11 @@ end
function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
foreach(x->notify_error(x, err), c.takers)
end

eltype{T}(::Type{Channel{T}}) = T

n_avail(c::Channel) = length(c.data)

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}()
Expand Down
3 changes: 3 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1033,4 +1033,7 @@ end))
@deprecate_binding cycle Iterators.cycle
@deprecate_binding repeated Iterators.repeated

# NOTE: Deprecation of Channel{T}() is implemented in channels.jl.
# To be removed from there when 0.6 deprecations are removed.

# End deprecations scheduled for 0.6
11 changes: 7 additions & 4 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1426,13 +1426,16 @@ endof
"""
Channel{T}(sz::Int)
Constructs a `Channel` that can hold a maximum of `sz` objects of type `T`. `put!` calls on
a full channel block till an object is removed with `take!`.
Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
of type `T`. `put!` calls on a full channel block until an object is removed with `take!`.
`Channel(0)` constructs an unbuffered channel. `put!` blocks until a matching `take!` is called.
And vice-versa.
Other constructors:
- `Channel()` - equivalent to `Channel{Any}(32)`
- `Channel(sz::Int)` equivalent to `Channel{Any}(sz)`
- `Channel(Inf)` - equivalent to `Channel{Any}(typemax(Int))`
- `Channel(sz)` equivalent to `Channel{Any}(sz)`
"""
Channel

Expand Down
1 change: 1 addition & 0 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ end

notify_error(c::Condition, err) = notify(c, err, true, true)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
Expand Down
1 change: 0 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,6 @@ Waits and fetches a value from `x` depending on the type of `x`. Does not remove
is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are
same as for a `Future` .
* `Channel` : Wait for and get the first available item from the channel.
"""
fetch(x::ANY) = x

Expand Down
75 changes: 44 additions & 31 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,53 @@ Tasks

.. Docstring generated from Julia source
Constructs a ``Channel`` that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ .
Constructs a ``Channel`` with an internal buffer that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block until an object is removed with ``take!``\ .

``Channel(0)`` constructs an unbuffered channel. ``put!`` blocks until a matching ``take!`` is called. And vice-versa.

Other constructors:

* ``Channel()`` - equivalent to ``Channel{Any}(32)``
* ``Channel(sz::Int)`` equivalent to ``Channel{Any}(sz)``
* ``Channel(Inf)`` - equivalent to ``Channel{Any}(typemax(Int))``
* ``Channel(sz)`` equivalent to ``Channel{Any}(sz)``

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source
Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

For unbuffered channels, blocks until a ``take!`` is performed by a different task.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source
Removes and returns a value from a ``Channel``\ . Blocks until data is available.

For unbuffered channels, blocks until a ``put!`` is performed by a different task.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source
Determine whether a ``Channel`` has a value stored to it. Returns immediately, does not block.

For unbuffered channels returns ``true`` if there are tasks waiting on a ``put!``\ .

.. function:: fetch(c::Channel)

.. Docstring generated from Julia source
Waits for and gets the first available item from the channel. Does not remove the item. ``fetch`` is unsupported on an unbuffered (0-size) channel.

.. function:: close(c::Channel)

.. Docstring generated from Julia source
Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

General Parallel Computing Support
----------------------------------
Expand Down Expand Up @@ -336,7 +377,6 @@ General Parallel Computing Support

* ``Future``\ : Wait for and get the value of a Future. The fetched value is cached locally. Further calls to ``fetch`` on the same reference return the cached value. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace.
* ``RemoteChannel``\ : Wait for and get the value of a remote reference. Exceptions raised are same as for a ``Future`` .
* ``Channel`` : Wait for and get the first available item from the channel.

.. function:: remotecall_wait(f, id::Integer, args...; kwargs...)

Expand All @@ -362,30 +402,12 @@ General Parallel Computing Support
Store a value to a ``Future`` ``rr``\ . ``Future``\ s are write-once remote references. A ``put!`` on an already set ``Future`` throws an ``Exception``\ . All asynchronous remote calls return ``Future``\ s and set the value to the return value of the call upon completion.

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source
Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

.. function:: take!(rr::RemoteChannel, args...)

.. Docstring generated from Julia source
Fetch value(s) from a remote channel, removing the value(s) in the processs.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source
Removes and returns a value from a ``Channel``\ . Blocks till data is available.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source
Determine whether a ``Channel`` has a value stored to it. ``isready`` on ``Channel``\ s is non-blocking.

.. function:: isready(rr::RemoteChannel, args...)

.. Docstring generated from Julia source
Expand All @@ -406,15 +428,6 @@ General Parallel Computing Support
@async put!(c, remotecall_fetch(long_computation, p))
isready(c) # will not block
.. function:: close(c::Channel)

.. Docstring generated from Julia source
Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

.. function:: WorkerPool(workers)

.. Docstring generated from Julia source
Expand Down
Loading

0 comments on commit e23f4e2

Please sign in to comment.