From 7cdfd1a33bf539d204bd07d6979257f3babdab3a Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Wed, 20 Apr 2016 15:40:10 +0530 Subject: [PATCH] configure pmap via keyword args --- base/deprecated.jl | 20 ++--- base/error.jl | 52 ++++-------- base/exports.jl | 1 - base/pmap.jl | 168 +++++++++++++++++++++++++++++++++++---- base/task.jl | 1 + doc/stdlib/base.rst | 25 +----- doc/stdlib/parallel.rst | 20 ++++- doc/stdlib/strings.rst | 2 +- test/error.jl | 32 +++----- test/parallel_exec.jl | 170 ++++++++++++++++++++++++++-------------- 10 files changed, 323 insertions(+), 168 deletions(-) diff --git a/base/deprecated.jl b/base/deprecated.jl index 348cce6cdfd27..2bfa76f390150 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1011,7 +1011,9 @@ export call # and added to pmap.jl # pmap(f, c...) = pmap(default_worker_pool(), f, c...) -function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) +function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing, kwargs...) + kwargs = Dict{Symbol, Any}(kwargs) + if err_retry != nothing depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap) if err_retry == true @@ -1019,13 +1021,6 @@ function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) end end - if err_stop != nothing - depwarn("err_stop is deprecated, use pmap(@catch(f), c...).", :pmap) - if err_stop == false - f = @catch(f) - end - end - if pids == nothing p = default_worker_pool() else @@ -1033,7 +1028,14 @@ function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing) p = WorkerPool(pids) end - return pmap(p, f, c...) + if err_stop != nothing + depwarn("err_stop is deprecated, use pmap(f, c...; on_error = error_handling_func).", :pmap) + if err_stop == false + kwargs[:on_error] = e->e + end + end + + pmap(p, f, c...; kwargs...) end # 15692 diff --git a/base/error.jl b/base/error.jl index b237db748ee76..7e2578719dbbc 100644 --- a/base/error.jl +++ b/base/error.jl @@ -50,14 +50,21 @@ macro assert(ex, msgs...) :($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg))) end +const DEFAULT_RETRY_N = 1 +const DEFAULT_RETRY_ON = e->true +const DEFAULT_RETRY_MAX_DELAY = 10.0 """ - retry(f, [condition]; n=3; max_delay=10) -> Function + retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function Returns a lambda that retries function `f` up to `n` times in the -event of an exception. If `condition` is a `Type` then retry only -for exceptions of that type. If `condition` is a function -`cond(::Exception) -> Bool` then retry only if it is true. +event of an exception. If `retry_on` is a `Type` then retry only +for exceptions of that type. If `retry_on` is a function +`test_error(::Exception) -> Bool` then retry only if it is true. + +The first retry happens after a gap of 50 milliseconds or `max_delay`, +whichever is lower. Subsequently, the delays between retries are +exponentially increased with a random factor upto `max_delay`. **Examples** ```julia @@ -65,46 +72,21 @@ retry(http_get, e -> e.status == "503")(url) retry(read, UVError)(io) ``` """ -function retry(f::Function, condition::Function=e->true; - n::Int=3, max_delay::Int=10) +function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) (args...) -> begin - delay = 0.05 - for i = 1:n + delay = min(0.05, max_delay) + for i = 1:n+1 try return f(args...) 
catch e - if i == n || try condition(e) end != true + if i > n || try retry_on(e) end != true rethrow(e) end end - sleep(delay * (0.8 + (rand() * 0.4))) - delay = min(max_delay, delay * 5) + sleep(delay) + delay = min(max_delay, delay * (0.8 + (rand() * 0.4)) * 5) end end end retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...) - - -""" - @catch(f) -> Function - -Returns a lambda that executes `f` and returns either the result of `f` or -an `Exception` thrown by `f`. - -**Examples** -```julia -julia> r = @catch(length)([1,2,3]) -3 - -julia> r = @catch(length)() -MethodError(length,()) - -julia> typeof(r) -MethodError -``` -""" -catchf(f) = (args...) -> try f(args...) catch ex; ex end -macro catch(f) - esc(:(Base.catchf($f))) -end diff --git a/base/exports.jl b/base/exports.jl index b1cba3ce80895..d91c00e367817 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1036,7 +1036,6 @@ export # errors assert, backtrace, - @catch, catch_backtrace, error, rethrow, diff --git a/base/pmap.jl b/base/pmap.jl index 489379d40a0f2..a5e485adbdb46 100644 --- a/base/pmap.jl +++ b/base/pmap.jl @@ -1,8 +1,12 @@ # This file is a part of Julia. License is MIT: http://julialang.org/license +type BatchProcessingError <: Exception + data + ex +end """ - pgenerate([::WorkerPool], f, c...) -> iterator + pgenerate([::WorkerPool], f, c...) -> (iterator, process_batch_errors) Apply `f` to each element of `c` in parallel using available workers and tasks. @@ -15,38 +19,170 @@ Note that `f` must be made available to all worker processes; see and Loading Packages `) for details. """ -function pgenerate(p::WorkerPool, f, c) - if length(p) == 0 - return AsyncGenerator(f, c) +function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing, + retry_n=0, + retry_max_delay=DEFAULT_RETRY_MAX_DELAY, + retry_on=DEFAULT_RETRY_ON) + # Don't do remote calls if there are no workers. + if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid()) + distributed = false + end + + # Don't do batching if not doing remote calls. + if !distributed + batch_size = 1 + end + + # If not batching, do simple remote call. + if batch_size == 1 + if distributed + f = remote(p, f) + end + + if retry_n > 0 + f = wrap_retry(f, retry_on, retry_n, retry_max_delay) + end + if on_error != nothing + f = wrap_on_error(f, on_error) + end + return (AsyncGenerator(f, c), nothing) + else + batches = batchsplit(c, min_batch_count = length(p) * 3, + max_batch_size = batch_size) + + # During batch processing, We need to ensure that if on_error is set, it is called + # for each element in error, and that we return as many elements as the original list. + # retry, if set, has to be called element wise and we will do a best-effort + # to ensure that we do not call mapped function on the same element more than retry_n. + # This guarantee is not possible in case of worker death / network errors, wherein + # we will retry the entire batch on a new worker. + f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) + f = wrap_batch(f, p, on_error) + return (flatten(AsyncGenerator(f, batches)), + (p, f, results)->process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay)) end - batches = batchsplit(c, min_batch_count = length(p) * 3) - return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches)) end -pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...)) +pgenerate(p::WorkerPool, f, c1, c...; kwargs...) 
= pgenerate(p, a->f(a...), zip(c1, c...); kwargs...) + +pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c...; kwargs...) +pgenerate(f, c1, c...; kwargs...) = pgenerate(a->f(a...), zip(c1, c...); kwargs...) + +function wrap_on_error(f, on_error; capture_data=false) + return x -> begin + try + f(x) + catch e + if capture_data + on_error(x, e) + else + on_error(e) + end + end + end +end -pgenerate(f, c) = pgenerate(default_worker_pool(), f, c...) -pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...)) +wrap_retry(f, retry_on, n, max_delay) = retry(f, retry_on; n=n, max_delay=max_delay) + +function wrap_batch(f, p, on_error) + f = asyncmap_batch(f) + return batch -> begin + try + remotecall_fetch(f, p, batch) + catch e + if on_error != nothing + return Any[BatchProcessingError(batch[i], e) for i in 1:length(batch)] + else + rethrow(e) + end + end + end +end +asyncmap_batch(f) = batch -> asyncmap(f, batch) """ - pmap([::WorkerPool], f, c...) -> collection + pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection Transform collection `c` by applying `f` to each element using available workers and tasks. For multiple collection arguments, apply f elementwise. -Note that `err_retry=true` and `err_stop=false` are deprecated, -use `pmap(retry(f), c)` or `pmap(@catch(f), c)` instead -(or to retry on a different worker, use `asyncmap(retry(remote(f)), c)`). - Note that `f` must be made available to all worker processes; see [Code Availability and Loading Packages](:ref:`Code Availability and Loading Packages `) for details. + +If a worker pool is not specified, all available workers, i.e., the default worker pool +is used. + +By default, `pmap` distributes the computation over all specified workers. To use only the +local process and distribute over tasks, specifiy `distributed=false`. This is equivalent to `asyncmap`. + +`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes +greater than 1, the collection is split into multiple batches, which are distributed across +workers. Each such batch is processed in parallel via tasks in each worker. The specified +`batch_size` is an upper limit, the actual size of batches may be smaller and is calculated +depending on the number of workers available and length of the collection. + +Any error stops pmap from processing the remainder of the collection. To override this behavior +you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., +the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value +which is then returned inline with the results to the caller. + +Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through +to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails, +all items in the batch are retried. + +The following are equivalent: + +* `pmap(f, c; distributed=false)` and `asyncmap(f,c)` +* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)` +* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` """ -pmap(p::WorkerPool, f, c...) = collect(pgenerate(p, f, c...)) +function pmap(p::WorkerPool, f, c...; kwargs...) + results_iter, process_errors! = pgenerate(p, f, c...; kwargs...) 
+ results = collect(results_iter) + if isa(process_errors!, Function) + process_errors!(p, f, results) + end + results +end + +function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay) + # Handle all the ones in error in another pmap, with batch size set to 1 + if (on_error != nothing) || (retry_n > 0) + reprocess = [] + for (idx, v) in enumerate(results) + if isa(v, BatchProcessingError) + push!(reprocess, (idx,v)) + end + end + + if length(reprocess) > 0 + errors = [x[2] for x in reprocess] + exceptions = [x.ex for x in errors] + if (retry_n > 0) && all([retry_on(ex) for ex in exceptions]) + retry_n = retry_n - 1 + error_processed = pmap(p, f, [x.data for x in errors]; + on_error=on_error, + retry_on=retry_on, + retry_n=retry_n, + retry_max_delay=retry_max_delay) + elseif on_error != nothing + error_processed = map(on_error, exceptions) + else + throw(CompositeException(exceptions)) + end + + for (idx, v) in enumerate(error_processed) + results[reprocess[idx][1]] = v + end + end + end + nothing +end """ @@ -72,7 +208,7 @@ function batchsplit(c; min_batch_count=1, max_batch_size=100) # If there are not enough batches, use a smaller batch size if length(head) < min_batch_count batch_size = max(1, div(sum(length, head), min_batch_count)) - return partition(flatten(head), batch_size) + return partition(collect(flatten(head)), batch_size) end return flatten((head, tail)) diff --git a/base/task.jl b/base/task.jl index ab27e349e60b5..d622af10ea9c5 100644 --- a/base/task.jl +++ b/base/task.jl @@ -25,6 +25,7 @@ showerror(io::IO, ce::CapturedException) = showerror(io, ce.ex, ce.processed_bt, type CompositeException <: Exception exceptions::Vector{Any} CompositeException() = new(Any[]) + CompositeException(exceptions) = new(exceptions) end length(c::CompositeException) = length(c.exceptions) push!(c::CompositeException, ex) = push!(c.exceptions, ex) diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 931386ee44eb0..2f566b589964a 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -1220,11 +1220,13 @@ Errors An error occurred when running a module's ``__init__`` function. The actual error thrown is available in the ``.error`` field. -.. function:: retry(f, [condition]; n=3; max_delay=10) -> Function +.. function:: retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function .. Docstring generated from Julia source - Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``condition`` is a ``Type`` then retry only for exceptions of that type. If ``condition`` is a function ``cond(::Exception) -> Bool`` then retry only if it is true. + Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``retry_on`` is a ``Type`` then retry only for exceptions of that type. If ``retry_on`` is a function ``test_error(::Exception) -> Bool`` then retry only if it is true. + + The first retry happens after a gap of 50 milliseconds or ``max_delay``\ , whichever is lower. Subsequently, the delays between retries are exponentially increased with a random factor upto ``max_delay``\ . **Examples** @@ -1233,25 +1235,6 @@ Errors retry(http_get, e -> e.status == "503")(url) retry(read, UVError)(io) -.. function:: @catch(f) -> Function - - .. Docstring generated from Julia source - - Returns a lambda that executes ``f`` and returns either the result of ``f`` or an ``Exception`` thrown by ``f``\ . - - **Examples** - - .. 
code-block:: julia - - julia> r = @catch(length)([1,2,3]) - 3 - - julia> r = @catch(length)() - MethodError(length,()) - - julia> typeof(r) - MethodError - Events ------ diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 3aad04e6dc20a..9ba6e61ab687d 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -273,7 +273,7 @@ General Parallel Computing Support For multiple collection arguments, apply f elementwise. -.. function:: pmap([::WorkerPool], f, c...) -> collection +.. function:: pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection .. Docstring generated from Julia source @@ -281,10 +281,24 @@ General Parallel Computing Support For multiple collection arguments, apply f elementwise. - Note that ``err_retry=true`` and ``err_stop=false`` are deprecated, use ``pmap(retry(f), c)`` or ``pmap(@catch(f), c)`` instead (or to retry on a different worker, use ``asyncmap(retry(remote(f)), c)``\ ). - Note that ``f`` must be made available to all worker processes; see :ref:`Code Availability and Loading Packages ` for details. + If a worker pool is not specified, all available workers, i.e., the default worker pool is used. + + By default, ``pmap`` distributes the computation over all specified workers. To use only the local process and distribute over tasks, specifiy ``distributed=false``\ . This is equivalent to ``asyncmap``\ . + + ``pmap`` can also use a mix of processes and tasks via the ``batch_size`` argument. For batch sizes greater than 1, the collection is split into multiple batches, which are distributed across workers. Each such batch is processed in parallel via tasks in each worker. The specified ``batch_size`` is an upper limit, the actual size of batches may be smaller and is calculated depending on the number of workers available and length of the collection. + + Any error stops pmap from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument ``on_error`` which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller. + + Failed computation can also be retried via ``retry_on``\ , ``retry_n``\ , ``retry_max_delay``\ , which are passed through to ``retry`` as arguments ``retry_on``\ , ``n`` and ``max_delay`` respectively. If batching is specified, and an entire batch fails, all items in the batch are retried. + + The following are equivalent: + + * ``pmap(f, c; distributed=false)`` and ``asyncmap(f,c)`` + * ``pmap(f, c; retry_n=1)`` and ``asyncmap(retry(remote(f)),c)`` + * ``pmap(f, c; retry_n=1, on_error=e->e)`` and ``asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`` + .. function:: remotecall(func, id, args...; kwargs...) .. Docstring generated from Julia source diff --git a/doc/stdlib/strings.rst b/doc/stdlib/strings.rst index 1e38401839a13..e846d377cd05d 100644 --- a/doc/stdlib/strings.rst +++ b/doc/stdlib/strings.rst @@ -66,7 +66,7 @@ .. Docstring generated from Julia source - Convert a string to a contiguous ASCII string (all characters must be valid ASCII characters). + Convert a string to ``String`` type and check that it contains only ASCII data, otherwise throwing an ``ArugmentError`` indicating the position of the first non-ASCII byte. .. 
function:: utf8(::Array{UInt8,1}) diff --git a/test/error.jl b/test/error.jl index c55078b4505c2..4871727dc1357 100644 --- a/test/error.jl +++ b/test/error.jl @@ -1,12 +1,5 @@ # This file is a part of Julia. License is MIT: http://julialang.org/license - -@test map(typeof, map(@catch(i->[1,2,3][i]), 1:6)) == - [Int, Int, Int, BoundsError, BoundsError, BoundsError] - -@test typeof(@catch(open)("/no/file/with/this/name")) == SystemError - - let function foo_error(c, n) c[1] += 1 @@ -23,47 +16,42 @@ let # Success on second attempt c = [0] - @test retry(foo_error)(c,1) == 7 + @test retry(foo_error;n=1)(c,1) == 7 @test c[1] == 2 - # Success on third attempt + # 2 failed retry attempts, so exception is raised c = [0] - @test retry(foo_error)(c,2) == 7 - @test c[1] == 3 - - # 3 failed attempts, so exception is raised - c = [0] - ex = @catch(retry(foo_error))(c,3) + ex = try retry(foo_error;n=2)(c,3) catch e; e end @test ex.msg == "foo" @test c[1] == 3 c = [0] - ex = @catch(retry(foo_error, ErrorException))(c,3) + ex = try retry(foo_error, ErrorException)(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" - @test c[1] == 3 + @test c[1] == 2 c = [0] - ex = @catch(retry(foo_error, e->e.msg == "foo"))(c,3) + ex = try retry(foo_error, e->e.msg == "foo")(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" - @test c[1] == 3 + @test c[1] == 2 # No retry if condition does not match c = [0] - ex = @catch(retry(foo_error, e->e.msg == "bar"))(c,3) + ex = try retry(foo_error, e->e.msg == "bar"; n=3)(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 1 c = [0] - ex = @catch(retry(foo_error, e->e.http_status_code == "503"))(c,3) + ex = try retry(foo_error, e->e.http_status_code == "503")(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 1 c = [0] - ex = @catch(retry(foo_error, SystemError))(c,3) + ex = try retry(foo_error, SystemError)(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 1 diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index 351295e135077..a1309354cc503 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -13,7 +13,7 @@ end # Test a `remote` invocation when no workers are present @test remote(myid)() == 1 -addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`) +addprocs(4; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`) # Test remote() let @@ -673,82 +673,132 @@ let ex @test repeated == 1 end -# The below block of tests are usually run only on local development systems, since: -# - tests which print errors -# - addprocs tests are memory intensive -# - ssh addprocs requires sshd to be running locally with passwordless login enabled. -# The test block is enabled by defining env JULIA_TESTFULL=1 - -DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0")))) - -if DoFullTest - # pmap tests - # needs at least 4 processors dedicated to the below tests - ppids = remotecall_fetch(()->addprocs(4), 1) - pool = WorkerPool(ppids) - s = "abcdefghijklmnopqrstuvwxyz"; - ups = uppercase(s); - - unmangle_exception = e -> begin +# pmap tests. Needs at least 4 processors dedicated to the below tests. Which we currently have +# since the parallel tests are now spawned as a separate set. 
+function unmangle_exception(e) + while any(x->isa(e, x), [CompositeException, RemoteException, CapturedException]) if isa(e, CompositeException) e = e.exceptions[1].ex - if isa(e, RemoteException) - e = e.captured.ex.exceptions[1].ex - end end - return e + if isa(e, RemoteException) + e = e.captured.ex + end + if isa(e, CapturedException) + e = e.ex + end end + return e +end + +# Test all combinations of pmap keyword args. +pmap_args = [ + (:distributed, [:default, false]), + (:batch_size, [:default,2]), + (:on_error, [:default, e -> unmangle_exception(e).msg == "foobar"]), + (:retry_on, [:default, e -> unmangle_exception(e).msg == "foobar"]), + (:retry_n, [:default, typemax(Int)-1]), + (:retry_max_delay, [0, 0.001]) + ] + +kwdict = Dict() +function walk_args(i) + if i > length(pmap_args) + kwargs = [] + for (k,v) in kwdict + if v != :default + push!(kwargs, (k,v)) + end + end + data = [1:100...] - for mapf in [map, asyncmap, (f, c) -> pmap(pool, f, c)] - @test ups == bytestring(UInt8[UInt8(c) for c in mapf(x->uppercase(x), s)]) - @test ups == bytestring(UInt8[UInt8(c) for c in mapf(x->uppercase(Char(x)), s.data)]) + testw = kwdict[:distributed] == false ? [1] : workers() - # retry, on error exit - errifeqa = x->(x=='a') ? - error("EXPECTED TEST ERROR. TO BE IGNORED.") : uppercase(x) - try - res = mapf(retry(errifeqa), s) - error("unexpected") - catch e - e = unmangle_exception(e) - @test isa(e, ErrorException) - @test e.msg == "EXPECTED TEST ERROR. TO BE IGNORED." + if (kwdict[:on_error] == :default) && (kwdict[:retry_n] == :default) + mapf = x -> (x*2, myid()) + results_test = pmap_res -> begin + results = [x[1] for x in pmap_res] + pids = [x[2] for x in pmap_res] + @test results == [2:2:200...] + for p in testw + @test p in pids + end + end + elseif kwdict[:retry_n] != :default + mapf = x -> iseven(myid()) ? error("foobar") : (x*2, myid()) + results_test = pmap_res -> begin + results = [x[1] for x in pmap_res] + pids = [x[2] for x in pmap_res] + @test results == [2:2:200...] + for p in testw + if isodd(p) + @test p in pids + else + @test !(p in pids) + end + end + end + else (kwdict[:on_error] != :default) && (kwdict[:retry_n] == :default) + mapf = x -> iseven(x) ? error("foobar") : (x*2, myid()) + results_test = pmap_res -> begin + w = testw + for (idx,x) in enumerate(data) + if iseven(x) + @test pmap_res[idx] == true + else + @test pmap_res[idx][1] == x*2 + @test pmap_res[idx][2] in w + end + end + end end - # no retry, on error exit try - res = mapf(errifeqa, s) - error("unexpected") + results_test(pmap(mapf, data; kwargs...)) catch e - e = unmangle_exception(e) - @test isa(e, ErrorException) - @test e.msg == "EXPECTED TEST ERROR. TO BE IGNORED." + println("pmap executing with args : ", kwargs) + rethrow(e) end - # no retry, on error continue - res = mapf(@catch(errifeqa), Any[s...]) - @test length(res) == length(ups) - res[1] = unmangle_exception(res[1]) - @test isa(res[1], ErrorException) - @test res[1].msg == "EXPECTED TEST ERROR. TO BE IGNORED." - @test ups[2:end] == string(res[2:end]...) + return end - # retry, on error exit - mapf = (f, c) -> asyncmap(retry(remote(pool, f), n=10, max_delay=0), c) - errifevenid = x->iseven(myid()) ? - error("EXPECTED TEST ERROR. 
TO BE IGNORED.") : uppercase(x) - res = mapf(errifevenid, s) - @test length(res) == length(ups) - @test ups == bytestring(UInt8[UInt8(c) for c in res]) + kwdict[pmap_args[i][1]] = pmap_args[i][2][1] + walk_args(i+1) + + kwdict[pmap_args[i][1]] = pmap_args[i][2][2] + walk_args(i+1) +end + +# Start test for various kw arg combinations +walk_args(1) + +# Simple test for pmap throws error +error_thrown = false +try + pmap(x -> x==50 ? error("foobar") : x, 1:100) +catch e + @test unmangle_exception(e).msg == "foobar" + error_thrown = true +end +@test error_thrown + +# Test pmap with a generator type iterator +@test [1:100...] == pmap(x->x, Base.Generator(x->(sleep(0.0001); x), 1:100)) + +# Test asyncmap +@test allunique(asyncmap(x->object_id(current_task()), 1:100)) - # retry, on error continue - mapf = (f, c) -> asyncmap(@catch(retry(remote(pool, f), n=10, max_delay=0)), c) - res = mapf(errifevenid, s) - @test length(res) == length(ups) - @test ups == bytestring(UInt8[UInt8(c) for c in res]) +# The below block of tests are usually run only on local development systems, since: +# - tests which print errors +# - addprocs tests are memory intensive +# - ssh addprocs requires sshd to be running locally with passwordless login enabled. +# The test block is enabled by defining env JULIA_TESTFULL=1 + +DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0")))) + +if DoFullTest # Topology tests need to run externally since a given cluster at any # time can only support a single topology and the current session # is already running in parallel under the default topology.
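
The snippet below is an illustrative sketch of the keyword interface this patch introduces; it is not part of the diff. The `flaky` helper, the number of workers added, and the annotated results are assumptions made for the example, while the keyword names and their semantics follow the `retry` and `pmap` docstrings added in `base/error.jl` and `base/pmap.jl` above.

```julia
# Sketch only: exercises the new pmap keyword arguments from this patch.
addprocs(2)

# pmap requires f to be available on all workers (see the docstring note),
# so define the demo function everywhere. `flaky` is a made-up helper.
@everywhere flaky(x) = x == 3 ? error("boom") : x^2

# Default behaviour: the first error aborts pmap with an exception.
# pmap(flaky, 1:5)                          # throws

# on_error receives the exception and its return value is placed inline in
# the results; this replaces the deprecated err_stop=false behaviour.
pmap(flaky, 1:5; on_error = e -> -1)        # => [1, 4, -1, 16, 25]

# retry_n, retry_on and retry_max_delay are forwarded to retry() as n,
# retry_on and max_delay. Errors raised on workers arrive wrapped
# (RemoteException / CapturedException), so custom retry_on or on_error
# predicates may need to unwrap them, as unmangle_exception does in the tests.
pmap(flaky, 1:5; retry_n = 2, retry_max_delay = 0.1, on_error = e -> -1)

# batch_size > 1 ships batches to workers and maps each batch with tasks on
# the worker; distributed=false stays in the local process, which is
# equivalent to asyncmap(f, c).
pmap(x -> x^2, 1:100; batch_size = 10)
pmap(x -> x^2, 1:100; distributed = false)
```

The design replaces the removed `@catch` macro with keyword arguments, so error handling, retries, batching and local-only execution can be composed in a single `pmap` call instead of wrapping `f` by hand.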