diff --git a/base/asyncmap.jl b/base/asyncmap.jl index 9ed6032a3d002..5894c5145907a 100644 --- a/base/asyncmap.jl +++ b/base/asyncmap.jl @@ -62,7 +62,6 @@ function done(itr::AsyncCollector, state::AsyncCollectorState) end function next(itr::AsyncCollector, state::AsyncCollectorState) - # Wait if the maximum number of concurrent tasks are already running... while isbusy(itr, state) wait(state) @@ -129,7 +128,6 @@ function pump_source(itr::AsyncGenerator, state::AsyncGeneratorState) end function next(itr::AsyncGenerator, state::AsyncGeneratorState) - state.i += 1 results = itr.collector.results diff --git a/base/deprecated.jl b/base/deprecated.jl index 4ee214c20e93a..35fa37fdbd46a 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1008,8 +1008,12 @@ export call @deprecate istext istextmime #15409 -function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) +# Deprecated definition of pmap with keyword arguments. +# When this is removed the following definition needs to be uncommented +# and added to pmap.jl +# pmap(f, c...) = pmap(default_worker_pool(), f, c...) +function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) if err_retry != nothing depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap) if err_retry == true @@ -1033,3 +1037,5 @@ function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) return pmap(p, f, c...) end + + diff --git a/base/error.jl b/base/error.jl index 7c737a65f0fb4..d3de2f5efc13e 100644 --- a/base/error.jl +++ b/base/error.jl @@ -67,7 +67,6 @@ retry(read, UVError)(io) """ function retry(f::Function, condition::Function=e->true; n::Int=3, max_delay::Int=10) - (args...) -> begin delay = 0.05 for i = 1:n diff --git a/base/pmap.jl b/base/pmap.jl index e7c891948305b..acf48fa7826a5 100644 --- a/base/pmap.jl +++ b/base/pmap.jl @@ -49,12 +49,6 @@ for details. """ pmap(p::WorkerPool, f, c...) = collect(pgenerate(p, f, c...)) -# TODO: deprecated.jl defines pmap(f, c...; kw...) to support old kw args. -# When that is retierd it should be replaced by somthing like this: -# -# pmap(f, c...) = pmap(default_worker_pool(), f, c...) - - """ batchsplit(c; min_batch_count=1, max_batch_size=100) -> iterator @@ -64,9 +58,13 @@ Split a collection into at least `min_batch_count` batches. Equivalent to `partition(c, max_batch_size)` when `length(c) >> max_batch_size`. """ function batchsplit(c; min_batch_count=1, max_batch_size=100) + if min_batch_count < 1 + throw(ArgumentError("min_batch_count must be > 0, got $min_batch_count")) + end - @assert min_batch_count > 0 - @assert max_batch_size > 1 + if max_batch_size < 1 + throw(ArgumentError("max_batch_size must be > 0, got $max_batch_size")) + end # Split collection into batches, then peek at the first few batches... batches = partition(c, max_batch_size) diff --git a/base/workerpool.jl b/base/workerpool.jl index 37c35f9aa8858..fff96a0f9962e 100644 --- a/base/workerpool.jl +++ b/base/workerpool.jl @@ -15,7 +15,6 @@ end Create a WorkerPool from a vector of worker ids. """ function WorkerPool(workers::Vector{Int}) - pool = WorkerPool() # Add workers to the pool... diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index c8b27ebdb1b92..5885c278f348d 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -408,8 +408,8 @@ different sizes:: If one process handles both 800x800 matrices and another handles both 600x600 matrices, we will not get as much scalability as we could. The solution is to make a local task to "feed" work to each process when -it completes its current task. This can be seen in the implementation of -:func:`pmap`:: +it completes its current task. For example, consider a simple :func:`pmap` +implementation:: function pmap(f, lst) np = nprocs() # determine the number of processes available @@ -428,7 +428,7 @@ it completes its current task. This can be seen in the implementation of if idx > n break end - results[idx] = remotecall_fetch(p, f, lst[idx]) + results[idx] = remotecall_fetch(f, p, lst[idx]) end end end