diff --git a/stdlib/SparseArrays/src/linalg.jl b/stdlib/SparseArrays/src/linalg.jl
index d6a97e759b8d9..44bb763272734 100644
--- a/stdlib/SparseArrays/src/linalg.jl
+++ b/stdlib/SparseArrays/src/linalg.jl
@@ -146,63 +146,104 @@ end
 *(A::Adjoint{<:Any,<:SparseMatrixCSC{Tv,Ti}}, B::Adjoint{<:Any,<:SparseMatrixCSC{Tv,Ti}}) where {Tv,Ti} = spmatmul(copy(A), copy(B))
 *(A::Transpose{<:Any,<:SparseMatrixCSC{Tv,Ti}}, B::Transpose{<:Any,<:SparseMatrixCSC{Tv,Ti}}) where {Tv,Ti} = spmatmul(copy(A), copy(B))
 
-function spmatmul(A::SparseMatrixCSC{Tv,Ti}, B::SparseMatrixCSC{Tv,Ti};
-                  sortindices::Symbol = :sortcols) where {Tv,Ti}
+# Gustavson's matrix multiplication algorithm revisited.
+# The result rowval vector is already sorted by construction.
+# The auxiliary Vector{Ti} xb is replaced by a Vector{Bool} of the same length.
+# The optional argument `sortindices` controlling the sorting algorithm is obsolete.
+# Depending on expected execution speed, the sorting of each result column is
+# done either by a quicksort of the row indices or by a full scan of the dense result vector.
+# The latter is faster if more than ≈ 1/32 of the result column is nonzero.
+# TODO: extend to SparseMatrixCSCUnion to allow for SubArrays (view(X, :, r)).
+function spmatmul(A::SparseMatrixCSC{Tv,Ti}, B::SparseMatrixCSC{Tv,Ti}) where {Tv,Ti}
     mA, nA = size(A)
-    mB, nB = size(B)
-    nA==mB || throw(DimensionMismatch())
+    nB = size(B, 2)
+    nA == size(B, 1) || throw(DimensionMismatch())
 
-    colptrA = A.colptr; rowvalA = A.rowval; nzvalA = A.nzval
-    colptrB = B.colptr; rowvalB = B.rowval; nzvalB = B.nzval
-    # TODO: Need better estimation of result space
-    nnzC = min(mA*nB, length(nzvalA) + length(nzvalB))
+    rowvalA = rowvals(A); nzvalA = nonzeros(A)
+    rowvalB = rowvals(B); nzvalB = nonzeros(B)
+    nnzC = max(estimate_mulsize(mA, nnz(A), nA, nnz(B), nB) * 11 ÷ 10, mA)
     colptrC = Vector{Ti}(undef, nB+1)
     rowvalC = Vector{Ti}(undef, nnzC)
     nzvalC = Vector{Tv}(undef, nnzC)
+    nzpercol = nnzC ÷ max(nB, 1)
 
     @inbounds begin
         ip = 1
-        xb = zeros(Ti, mA)
-        x = zeros(Tv, mA)
+        xb = fill(false, mA)
        for i in 1:nB
             if ip + mA - 1 > nnzC
-                resize!(rowvalC, nnzC + max(nnzC,mA))
-                resize!(nzvalC, nnzC + max(nnzC,mA))
-                nnzC = length(nzvalC)
+                nnzC += max(mA, nnzC>>2)
+                resize!(rowvalC, nnzC)
+                resize!(nzvalC, nnzC)
             end
-            colptrC[i] = ip
-            for jp in colptrB[i]:(colptrB[i+1] - 1)
+            colptrC[i] = ip0 = ip
+            k0 = ip - 1
+            for jp in nzrange(B, i)
                 nzB = nzvalB[jp]
                 j = rowvalB[jp]
-                for kp in colptrA[j]:(colptrA[j+1] - 1)
+                for kp in nzrange(A, j)
                     nzC = nzvalA[kp] * nzB
                     k = rowvalA[kp]
-                    if xb[k] != i
+                    if xb[k]
+                        nzvalC[k+k0] += nzC
+                    else
+                        nzvalC[k+k0] = nzC
+                        xb[k] = true
                         rowvalC[ip] = k
                         ip += 1
-                        xb[k] = i
-                        x[k] = nzC
-                    else
-                        x[k] += nzC
                     end
                 end
             end
-            for vp in colptrC[i]:(ip - 1)
-                nzvalC[vp] = x[rowvalC[vp]]
+            if ip > ip0
+                if prefer_sort(ip-k0, mA)
+                    # in-place sort of indices. Effort: O(nnz*ln(nnz)).
+                    sort!(rowvalC, ip0, ip-1, QuickSort, Base.Order.Forward)
+                    for vp = ip0:ip-1
+                        k = rowvalC[vp]
+                        xb[k] = false
+                        nzvalC[vp] = nzvalC[k+k0]
+                    end
+                else
+                    # scan result vector (effort O(mA))
+                    for k = 1:mA
+                        if xb[k]
+                            xb[k] = false
+                            rowvalC[ip0] = k
+                            nzvalC[ip0] = nzvalC[k+k0]
+                            ip0 += 1
+                        end
+                    end
+                end
             end
         end
         colptrC[nB+1] = ip
     end
 
-    deleteat!(rowvalC, colptrC[end]:length(rowvalC))
-    deleteat!(nzvalC, colptrC[end]:length(nzvalC))
+    resize!(rowvalC, ip - 1)
+    resize!(nzvalC, ip - 1)
 
-    # The Gustavson algorithm does not guarantee the product to have sorted row indices.
-    Cunsorted = SparseMatrixCSC(mA, nB, colptrC, rowvalC, nzvalC)
-    C = SparseArrays.sortSparseMatrixCSC!(Cunsorted, sortindices=sortindices)
+    # This modification of the Gustavson algorithm produces sorted row indices.
+    C = SparseMatrixCSC(mA, nB, colptrC, rowvalC, nzvalC)
     return C
 end
 
+# Estimated number of non-zeros in the matrix product.
+# It is assumed that the non-zero indices are distributed independently and uniformly
+# in both matrices. Over-estimation is possible if that is not the case.
+function estimate_mulsize(m::Integer, nnzA::Integer, n::Integer, nnzB::Integer, k::Integer)
+    p = (nnzA / (m * n)) * (nnzB / (n * k))
+    p >= 1 ? m*k : p > 0 ? Int(ceil(-expm1(log1p(-p) * n)*m*k)) : 0 # (1-(1-p)^n)*m*k
+end
+
+# Determine whether sort! should be used or the whole column be scanned,
+# based on empirical data (runtimes of the scanning and sorting loops)
+# measured on an i7-3610QM CPU.
+# The parameters 6 and 3 might be modified for different architectures.
+prefer_sort(nz::Integer, m::Integer) = m > 6 && 3 * ilog2(nz) * nz < m
+
+# Minimal number of bits required to represent an integer; ilog2(n) >= log2(n).
+ilog2(n::Integer) = sizeof(n)<<3 - leading_zeros(n)
+
 # Frobenius dot/inner product: trace(A'B)
 function dot(A::SparseMatrixCSC{T1,S1},B::SparseMatrixCSC{T2,S2}) where {T1,T2,S1,S2}
     m, n = size(A)
diff --git a/stdlib/SparseArrays/test/sparse.jl b/stdlib/SparseArrays/test/sparse.jl
index d82fac08b6a1e..3447c56eca541 100644
--- a/stdlib/SparseArrays/test/sparse.jl
+++ b/stdlib/SparseArrays/test/sparse.jl
@@ -325,8 +325,7 @@ end
     a = sprand(10, 5, 0.7)
     b = sprand(5, 15, 0.3)
     @test maximum(abs.(a*b - Array(a)*Array(b))) < 100*eps()
-    @test maximum(abs.(SparseArrays.spmatmul(a,b,sortindices=:sortcols) - Array(a)*Array(b))) < 100*eps()
-    @test maximum(abs.(SparseArrays.spmatmul(a,b,sortindices=:doubletranspose) - Array(a)*Array(b))) < 100*eps()
+    @test maximum(abs.(SparseArrays.spmatmul(a,b) - Array(a)*Array(b))) < 100*eps()
     f = Diagonal(rand(5))
     @test Array(a*f) == Array(a)*f
     @test Array(f*b) == f*Array(b)
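For intuition on `estimate_mulsize`: if the nonzero patterns of the m×n matrix A and the n×k matrix B are independent and uniform, a single term A[i,l]*B[l,j] is structurally nonzero with probability p = (nnzA/(m*n)) * (nnzB/(n*k)), so C[i,j] is nonzero with probability 1-(1-p)^n, giving an expected nnz(C) of (1-(1-p)^n)*m*k. The following standalone sketch (not part of the patch; `est` is a local copy of the formula) compares the estimate with an actual product of random sparse matrices:

```julia
using SparseArrays

# Local copy of the estimate, for experimentation.
# -expm1(log1p(-p) * n) is a numerically stable 1 - (1-p)^n.
function est(m, nnzA, n, nnzB, k)
    p = (nnzA / (m * n)) * (nnzB / (n * k))
    p >= 1 ? m * k : p > 0 ? ceil(Int, -expm1(log1p(-p) * n) * m * k) : 0
end

a = sprand(1000, 800, 0.01)
b = sprand(800, 900, 0.02)
est(1000, nnz(a), 800, nnz(b), 900)  # expected nnz(a*b) under independence, ≈ 133_000 here
nnz(a * b)                           # actual value, typically within a few percent
```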
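Similarly, `prefer_sort` weighs a quicksort of one result column's nz row indices (roughly nz*log2(nz) work, scaled by the empirical factor 3) against a linear scan of the dense accumulator (roughly m work). A quick standalone check, reusing copies of the two one-line helpers from the patch, locates the crossover for a column of length 32768; it lands at the "≈ 1/32" density quoted in the comment above:

```julia
# Copies of the two helpers from the patch, for experimentation.
ilog2(n::Integer) = sizeof(n)<<3 - leading_zeros(n)
prefer_sort(nz::Integer, m::Integer) = m > 6 && 3 * ilog2(nz) * nz < m

m = 32768
nz_cross = findfirst(nz -> !prefer_sort(nz, m), 1:m)  # first column fill where scanning wins
(nz_cross, m ÷ nz_cross)                              # (1024, 32): crossover at ≈ 1/32 of m
```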
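Finally, a usage check in the spirit of the updated test, verifying both the numerical result and the new sortedness guarantee (a standalone sketch, not part of the test suite):

```julia
using SparseArrays

a = sprand(10, 5, 0.7)
b = sprand(5, 15, 0.3)
c = SparseArrays.spmatmul(a, b)

# same accuracy bound as the updated test
@assert maximum(abs.(c - Array(a) * Array(b))) < 100 * eps()
# row indices within each column come out sorted, so no post-sort pass is needed
@assert all(issorted(rowvals(c)[nzrange(c, j)]) for j in 1:size(c, 2))
```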