diff --git a/.travis.yml b/.travis.yml index 845cba7..2e8bf63 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ os: - linux - osx julia: - - 0.4 + - 0.5 - nightly notifications: email: false diff --git a/REQUIRE b/REQUIRE index e1063f2..1f5e8ec 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,3 +1,2 @@ -julia 0.4 -Compat 0.7.14 +julia 0.5- Primes diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index 28d9975..b6695d2 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -5,16 +5,8 @@ module DistributedArrays using Compat import Compat.view -if VERSION >= v"0.5.0-dev+4340" - using Primes - using Primes: factor -end - -if VERSION < v"0.5.0-" - typealias Future RemoteRef - typealias RemoteChannel RemoteRef - typealias AbstractSerializer SerializationState # On 0.4 fallback to the only concrete implementation -end +using Primes +using Primes: factor importall Base import Base.Callable @@ -195,40 +187,24 @@ function DArray(refs) DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts) end -if VERSION < v"0.5.0-" - macro DArray(ex::Expr) - if ex.head !== :comprehension - throw(ArgumentError("invalid @DArray syntax")) - end - ex.args[1] = esc(ex.args[1]) - ndim = length(ex.args) - 1 - ranges = map(r->esc(r.args[2]), ex.args[2:end]) - for d = 1:ndim - var = ex.args[d+1].args[1] - ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] ) - end - return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex), - tuple($(map(r->:(length($r)), ranges)...))) ) + +macro DArray(ex0::Expr) + if ex0.head !== :comprehension + throw(ArgumentError("invalid @DArray syntax")) end -else - macro DArray(ex0::Expr) - if ex0.head !== :comprehension - throw(ArgumentError("invalid @DArray syntax")) - end - ex = ex0.args[1] - if ex.head !== :generator - throw(ArgumentError("invalid @DArray syntax")) - end - ex.args[1] = esc(ex.args[1]) - ndim = length(ex.args) - 1 - ranges = map(r->esc(r.args[2]), ex.args[2:end]) - for d = 1:ndim - var = 
ex.args[d+1].args[1] - ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] ) - end - return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0), - tuple($(map(r->:(length($r)), ranges)...))) ) + ex = ex0.args[1] + if ex.head !== :generator + throw(ArgumentError("invalid @DArray syntax")) end + ex.args[1] = esc(ex.args[1]) + ndim = length(ex.args) - 1 + ranges = map(r->esc(r.args[2]), ex.args[2:end]) + for d = 1:ndim + var = ex.args[d+1].args[1] + ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] ) + end + return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0), + tuple($(map(r->:(length($r)), ranges)...))) ) end # new DArray similar to an existing one @@ -656,21 +632,119 @@ function Base.setindex!(a::Array, d::DArray, return a end +# We also want to optimize setindex! with a SubDArray source, but this is hard +# and only works on 0.5. + +# Similar to Base.indexin, but just create a logical mask. Note that this +# must return a logical mask in order to support merging multiple masks +# together into one linear index since we need to know how many elements to +# skip at the end. In many cases range intersection would be much faster +# than generating a logical mask, but that loses the endpoint information. +indexin_mask(a, b::Number) = a .== b +indexin_mask(a, r::Range{Int}) = [i in r for i in a] +indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b)) +indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b)) +indexin_mask(a, b) = [i in b for i in a] + +import Base: tail +# Given a tuple of indices and a tuple of masks, restrict the indices to the +# valid regions. This is, effectively, reversing Base.setindex_shape_check. +# We can't just use indexing into MergedIndices here because getindex is much +# pickier about singleton dimensions than setindex! is. 
+# Base case: no indices and no masks left to process.
+restrict_indices(::Tuple{}, ::Tuple{}) = ()
+# General case: consume the leading index `a[1]` and/or the leading mask `b[1]`
+# and recurse on what remains. Every branch returns a *flat* tuple of indices,
+# because the caller splats the result into an indexing expression.
+function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}})
+    if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1)
+        # The index and the mask pair up: filter the index by its mask.
+        (vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...)
+    elseif length(a[1]) == 1
+        # Length-1 index whose mask is not length-1: keep the index untouched
+        # and match the same masks against the remaining indices. The recursive
+        # result must be splatted; without `...` the return value would be a
+        # tuple nested inside a tuple, which is not a valid set of indices.
+        (a[1], restrict_indices(tail(a), b)...)
+    elseif length(b[1]) == 1 && b[1][1]
+        # Length-1 mask that is `true` and has no length-1 index to pair with:
+        # drop the mask and keep going.
+        restrict_indices(a, tail(b))
+    else
+        throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue"))
+    end
+end
+# The final indices are funky - they're allowed to accumulate together.
+# An easy (albeit very inefficient) fix for too many masks is to use the
+# outer product to merge them. But we can do that lazily with a custom type:
+function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}})
+    (vec(a[1])[vec(ProductIndices(b, map(length, b)))],)
+end
+# But too many indices is much harder; this requires merging the indices
+# in `a` before applying the final mask in `b`.
+function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any})
+    if length(a[1]) == 1
+        # Same as the length-1 index case above: keep the index and splat the
+        # recursive result so that the returned tuple stays flat.
+        (a[1], restrict_indices(tail(a), b)...)
+    else
+        # When one mask spans multiple indices, we need to merge the indices
+        # together. At this point, we can just use indexing to merge them since
+        # there's no longer special handling of singleton dimensions
+        (view(MergedIndices(a, map(length, a)), b[1]),)
+    end
+end
+
+# Lazy N-dimensional Bool array standing in for the outer product of several
+# masks: `indices` holds the masks and `sz` is the size reported by `size`.
+immutable ProductIndices{I,N} <: AbstractArray{Bool, N}
+    indices::I
+    sz::NTuple{N,Int}
+end
+Base.size(P::ProductIndices) = P.sz
+# This gets passed to map to avoid breaking propagation of inbounds
+Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...]
+# Element (i1, ..., iN) of a ProductIndices is the logical AND of the i_k-th
+# entry of each stored mask.
+Base.@propagate_inbounds function Base.getindex{Inds,N}(P::ProductIndices{Inds,N}, I::Vararg{Int, N})
+    flags = map(propagate_getindex, P.indices, I)
+    Bool((&)(flags...))
+end
+
+# Lazy N-dimensional array of CartesianIndex{N}: `indices` holds one index
+# collection per dimension and `sz` is the size reported by `size`.
+immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N}
+    indices::I
+    sz::NTuple{N,Int}
+end
+Base.size(M::MergedIndices) = M.sz
+# Element (i1, ..., iN) combines the i_k-th entry of each stored index
+# collection into a single CartesianIndex.
+Base.@propagate_inbounds function Base.getindex{Inds,N}(M::MergedIndices{Inds,N}, I::Vararg{Int, N})
+    parts = map(propagate_getindex, M.indices, I)
+    CartesianIndex(parts)
+end
+# Additionally, we optimize bounds checking when using MergedIndices as an
+# array index since checking, e.g., A[1:500, 1:500] is *way* faster than
+# checking an array of 500^2 elements of CartesianIndex{2}. This optimization
+# also applies to reshapes of MergedIndices since the outer shape of the
+# container doesn't affect the index elements themselves. We can go even
+# farther and say that even restricted views of MergedIndices must be valid
+# over the entire array. This is overly strict in general, but in this
+# use-case all the merged indices must be valid at some point, so it's ok.
+typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M} +typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M} +typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices} +import Base: checkbounds_indices +@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) +@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) +@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) + +# The tricky thing here is that we want to optimize the accesses into the +# distributed array, but in doing so, we lose track of which indices in I we +# should be using. +# +# I’ve come to the conclusion that the function is utterly insane. +# There are *6* flavors of indices with four different reference points: +# 1. Find the indices of each portion of the DArray. +# 2. Find the valid subset of indices for the SubArray into that portion. +# 3. Find the portion of the `I` indices that should be used when you access the +# `K` indices in the subarray. This guy is nasty. It’s totally backwards +# from all other arrays, wherein we simply iterate over the source array’s +# elements. You need to *both* know which elements in `J` were skipped +# (`indexin_mask`) and which dimensions should match up (`restrict_indices`) +# 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of +# the local portion of the source array function Base.setindex!(a::Array, s::SubDArray, I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) + Base.setindex_shape_check(s, Base.index_lengths(a, I...)...) 
n = length(I) d = s.parent - J = s.indexes - if length(J) < n - a[I...] = convert(Array,s) - return a - end - offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n] + J = Base.decolon(d, s.indexes...) @sync for i = 1:length(d.pids) - K_c = Any[d.indexes[i]...] - K = [ intersect(J[j],K_c[j]) for j=1:n ] + K_c = d.indexes[i] + K = map(intersect, J, K_c) if !any(isempty, K) - idxs = [ I[j][K[j]-offs[j]] for j=1:n ] + K_mask = map(indexin_mask, J, K_c) + idxs = restrict_indices(Base.decolon(a, I...), K_mask) if isequal(K, K_c) # whole chunk @async a[idxs...] = chunk(d, i) @@ -678,7 +752,7 @@ function Base.setindex!(a::Array, s::SubDArray, # partial chunk @async a[idxs...] = remotecall_fetch(d.pids[i]) do - view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:n]...) + view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...) end end end @@ -1395,16 +1469,7 @@ function compute_boundaries{T}(d::DVector{T}; kwargs...) np = length(pids) sample_sz_on_wrkr = 512 - if VERSION < v"0.5.0-" - results = Array(Any,np) - @sync begin - for (i,p) in enumerate(pids) - @async results[i] = remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...) - end - end - else - results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids) - end + results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids) samples = Array(T,0) for x in results @@ -1455,14 +1520,7 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...) 
elseif sample==false # Assume an uniform distribution between min and max values - if VERSION < v"0.5.0-" - minmax=Array(Tuple, np) - @sync for (i,p) in enumerate(pids) - @async minmax[i] = remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d) - end - else - minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids) - end + minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids) min_d = minimum(T[x[1] for x in minmax]) max_d = maximum(T[x[2] for x in minmax]) @@ -1503,19 +1561,10 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...) end local_sort_results = Array(Tuple, np) - if VERSION < v"0.5.0-" - @sync begin - for (i,p) in enumerate(pids) - @async local_sort_results[i] = - remotecall_fetch( - scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...) - end - end - else - Base.asyncmap!((i,p) -> remotecall_fetch( + + Base.asyncmap!((i,p) -> remotecall_fetch( scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...), local_sort_results, 1:np, pids) - end # Construct a new DArray from the sorted refs. Remove parts with 0-length since # the DArray constructor_from_refs does not yet support it. 
This implies that diff --git a/test/darray.jl b/test/darray.jl index 5efb6d9..0efb59e 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -99,6 +99,22 @@ check_leaks() @test fetch(@spawnat MYID localpart(D)[1,1]) == D[1,1] @test fetch(@spawnat OTHERIDS localpart(D)[1,1]) == D[1,101] close(D2) + + S2 = convert(Vector{Float64}, D[4, 23:176]) + @test A[4, 23:176] == S2 + + S3 = convert(Vector{Float64}, D[23:176, 197]) + @test A[23:176, 197] == S3 + + S4 = zeros(4) + setindex!(S4, D[3:4, 99:100], :) + @test S4 == vec(D[3:4, 99:100]) + @test S4 == vec(A[3:4, 99:100]) + + S5 = zeros(2,2) + setindex!(S5, D[1,1:4], :, 1:2) + @test vec(S5) == D[1, 1:4] + @test vec(S5) == A[1, 1:4] end close(D) end @@ -610,20 +626,12 @@ check_leaks() # Commented out tests that need to be enabled in due course when DArray support is more complete @testset "test mapslices" begin a = drand((5,5), workers(), [1, min(nworkers(), 5)]) - if VERSION < v"0.5.0-dev+4361" - h = mapslices(v -> hist(v,0:0.1:1)[2], a, 1) - else - h = mapslices(v -> fit(Histogram,v,0:0.1:1).weights, a, 1) - end + h = mapslices(v -> fit(Histogram,v,0:0.1:1).weights, a, 1) # H = mapslices(v -> hist(v,0:0.1:1)[2], a, 2) # s = mapslices(sort, a, [1]) # S = mapslices(sort, a, [2]) for i = 1:5 - if VERSION < v"0.5.0-dev+4361" - @test h[:,i] == hist(a[:,i],0:0.1:1)[2] - else - @test h[:,i] == fit(Histogram, a[:,i],0:0.1:1).weights - end + @test h[:,i] == fit(Histogram, a[:,i],0:0.1:1).weights # @test vec(H[i,:]) => hist(vec(a[i,:]),0:0.1:1)[2] # @test s[:,i] => sort(a[:,i]) # @test vec(S[i,:]) => sort(vec(a[i,:]))