Skip to content

Commit

Permalink
minor cleanup of pmap related files
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Apr 14, 2016
1 parent cf52a34 commit b759257
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 16 deletions.
2 changes: 0 additions & 2 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ function done(itr::AsyncCollector, state::AsyncCollectorState)
end

function next(itr::AsyncCollector, state::AsyncCollectorState)

# Wait if the maximum number of concurrent tasks are already running...
while isbusy(itr, state)
wait(state)
Expand Down Expand Up @@ -129,7 +128,6 @@ function pump_source(itr::AsyncGenerator, state::AsyncGeneratorState)
end

function next(itr::AsyncGenerator, state::AsyncGeneratorState)

state.i += 1

results = itr.collector.results
Expand Down
8 changes: 7 additions & 1 deletion base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1008,8 +1008,12 @@ export call
@deprecate istext istextmime

#15409
function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
# Deprecated definition of pmap with keyword arguments.
# When this is removed the following definition needs to be uncommented
# and added to pmap.jl
# pmap(f, c...) = pmap(default_worker_pool(), f, c...)

function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
if err_retry != nothing
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap)
if err_retry == true
Expand All @@ -1033,3 +1037,5 @@ function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)

return pmap(p, f, c...)
end


1 change: 0 additions & 1 deletion base/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ retry(read, UVError)(io)
"""
function retry(f::Function, condition::Function=e->true;
n::Int=3, max_delay::Int=10)

(args...) -> begin
delay = 0.05
for i = 1:n
Expand Down
14 changes: 6 additions & 8 deletions base/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ for details.
"""
pmap(p::WorkerPool, f, c...) = collect(pgenerate(p, f, c...))

# TODO: deprecated.jl defines pmap(f, c...; kw...) to support old kw args.
# When that is retierd it should be replaced by somthing like this:
#
# pmap(f, c...) = pmap(default_worker_pool(), f, c...)



"""
batchsplit(c; min_batch_count=1, max_batch_size=100) -> iterator
Expand All @@ -64,9 +58,13 @@ Split a collection into at least `min_batch_count` batches.
Equivalent to `partition(c, max_batch_size)` when `length(c) >> max_batch_size`.
"""
function batchsplit(c; min_batch_count=1, max_batch_size=100)
if min_batch_count < 1
throw(ArgumentError("min_batch_count must be > 0, got $min_batch_count"))
end

@assert min_batch_count > 0
@assert max_batch_size > 1
if max_batch_size < 1
throw(ArgumentError("max_batch_size must be > 0, got $max_batch_size"))
end

# Split collection into batches, then peek at the first few batches...
batches = partition(c, max_batch_size)
Expand Down
1 change: 0 additions & 1 deletion base/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ end
Create a WorkerPool from a vector of worker ids.
"""
function WorkerPool(workers::Vector{Int})

pool = WorkerPool()

# Add workers to the pool...
Expand Down
6 changes: 3 additions & 3 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ different sizes::
If one process handles both 800x800 matrices and another handles both
600x600 matrices, we will not get as much scalability as we could. The
solution is to make a local task to "feed" work to each process when
it completes its current task. This can be seen in the implementation of
:func:`pmap`::
it completes its current task. For example, consider a simple :func:`pmap`
implementation::

function pmap(f, lst)
np = nprocs() # determine the number of processes available
Expand All @@ -428,7 +428,7 @@ it completes its current task. This can be seen in the implementation of
if idx > n
break
end
results[idx] = remotecall_fetch(p, f, lst[idx])
results[idx] = remotecall_fetch(f, p, lst[idx])
end
end
end
Expand Down

0 comments on commit b759257

Please sign in to comment.