From 260e1efb09a04960b85077f867d8ae055ed36ba2 Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 3 Mar 2016 14:23:56 +0530 Subject: [PATCH 1/5] use Blobs.jl --- REQUIRE | 2 +- examples/lastfm/lastfm.jl | 8 +- examples/movielens/movielens.jl | 8 +- examples/netflix/netflix.jl | 2 +- playground/split_input.jl | 142 ++++++------- src/RecSys.jl | 33 +-- src/als-wr.jl | 46 ++-- src/chunks/matrix.jl | 359 ++++++++++++++++++++++++++++++++ src/inputs/dist_input.jl | 40 +--- src/inputs/input.jl | 37 +--- src/inputs/inputs.jl | 46 ++++ src/models/als_dist_model.jl | 92 ++------ src/models/als_model.jl | 16 -- src/models/models.jl | 28 +++ src/utils.jl | 95 ++++----- 15 files changed, 608 insertions(+), 346 deletions(-) create mode 100644 src/chunks/matrix.jl create mode 100644 src/inputs/inputs.jl create mode 100644 src/models/models.jl diff --git a/REQUIRE b/REQUIRE index 141d2ee..d84bc42 100644 --- a/REQUIRE +++ b/REQUIRE @@ -2,4 +2,4 @@ julia 0.4.1 SparseVectors ParallelSparseMatMul MAT -LRUCache +Blobs diff --git a/examples/lastfm/lastfm.jl b/examples/lastfm/lastfm.jl index 1d35ad7..dc3edcc 100644 --- a/examples/lastfm/lastfm.jl +++ b/examples/lastfm/lastfm.jl @@ -15,10 +15,10 @@ type MusicRec function MusicRec(trainingset::FileSpec, artist_names::FileSpec, artist_map::FileSpec) T, N = map_artists(trainingset, artist_names, artist_map) - new(trainingset, artist_names, artist_map, ALSWR(SparseMat(T)), Nullable(N)) + new(trainingset, artist_names, artist_map, ALSWR(SparseMat(T), ParShmem()), Nullable(N)) end function MusicRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, artist_names::FileSpec, artist_map::FileSpec) - new(user_item_ratings, artist_names, artist_map, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing) + new(user_item_ratings, artist_names, artist_map, ALSWR(user_item_ratings, item_user_ratings, ParBlob()), nothing) end end @@ -156,8 +156,8 @@ function test(dataset_path) end function test_chunks(dataset_path, splits_dir, model_path) - user_item_ratings = SparseMatChunks(joinpath(dataset_path, splits_dir, "R_itemwise.meta"), 10) - item_user_ratings = SparseMatChunks(joinpath(dataset_path, splits_dir, "RT_userwise.meta"), 10) + user_item_ratings = SparseBlobs(joinpath(dataset_path, splits_dir, "R_itemwise"); maxcache=10) + item_user_ratings = SparseBlobs(joinpath(dataset_path, splits_dir, "RT_userwise"); maxcache=10) artist_names = DlmFile(joinpath(dataset_path, "artist_data.txt"); dlm='\t', quotes=false) artist_map = DlmFile(joinpath(dataset_path, "artist_alias.txt")) diff --git a/examples/movielens/movielens.jl b/examples/movielens/movielens.jl index 7a5f1f7..d387187 100644 --- a/examples/movielens/movielens.jl +++ b/examples/movielens/movielens.jl @@ -12,10 +12,10 @@ type MovieRec movie_mat::Nullable{SparseVector{AbstractString,Int64}} function MovieRec(trainingset::FileSpec, movie_names::FileSpec) - new(movie_names, ALSWR(trainingset), nothing) + new(movie_names, ALSWR(trainingset, ParShmem()), nothing) end function MovieRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, movie_names::FileSpec) - new(movie_names, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing) + new(movie_names, ALSWR(user_item_ratings, item_user_ratings, ParBlob()), nothing) end end @@ -91,8 +91,8 @@ end # prepare chunks for movielens dataset by running `split_movielens` from `playground/split_input.jl` function test_chunks(dataset_path, model_path) - user_item_ratings = SparseMatChunks(joinpath(dataset_path, "splits", "R_itemwise.meta"), 5) - item_user_ratings = 
SparseMatChunks(joinpath(dataset_path, "splits", "RT_userwise.meta"), 5) + user_item_ratings = SparseBlobs(joinpath(dataset_path, "splits", "R_itemwise"); maxcache=10) + item_user_ratings = SparseBlobs(joinpath(dataset_path, "splits", "RT_userwise"); maxcache=10) movies_file = DlmFile(joinpath(dataset_path, "movies.csv"); dlm=',', header=true) rec = MovieRec(user_item_ratings, item_user_ratings, movies_file) train(rec, 10, 4, model_path, 10) diff --git a/examples/netflix/netflix.jl b/examples/netflix/netflix.jl index a574de4..63eefe3 100644 --- a/examples/netflix/netflix.jl +++ b/examples/netflix/netflix.jl @@ -2,7 +2,7 @@ using RecSys function test(matfile, entryname) ratings = MatFile(matfile, entryname) - rec = ALSWR(ratings) + rec = ALSWR(ratings, ParShmem()) train(rec, 10, 4) err = rmse(rec) println("rmse of the model: $err") diff --git a/playground/split_input.jl b/playground/split_input.jl index 251bb01..2391240 100644 --- a/playground/split_input.jl +++ b/playground/split_input.jl @@ -1,30 +1,28 @@ using RecSys - -function split_sparse(S, chunkmax, filepfx) - metafilename = "$(filepfx).meta" - open(metafilename, "w") do mfile - chunknum = 1 - count = 1 - colstart = 1 - splits = UnitRange[] - nzval = S.nzval - rowval = S.rowval - for col in 1:size(S,2) - npos = S.colptr[col+1] - if (npos >= (count + chunkmax)) || (col == size(S,2)) - print("\tchunk $chunknum ... ") - cfilename = "$(filepfx).$(chunknum)" - println(mfile, colstart, ",", col, ",", cfilename) - RecSys.mmap_csc_save(S[:, colstart:col], cfilename) - push!(splits, colstart:col) - colstart = col+1 - count = npos - chunknum += 1 - println("done") - end +using Blobs +#include("/home/tan/Work/julia/packages/Blobs/examples/matrix.jl") +using RecSys.MatrixBlobs + +function split_sparse(S, chunkmax, metadir) + isdir(metadir) || mkdir(metadir) + spblobs = SparseMatBlobs(Float64, Int64, metadir) + chunknum = 1 + count = 1 + colstart = 1 + nzval = S.nzval + rowval = S.rowval + for col in 1:size(S,2) + npos = S.colptr[col+1] + if (npos >= (count + chunkmax)) || (col == size(S,2)) + print("\tchunk $chunknum ... 
") + append!(spblobs, S[:, colstart:col]) + colstart = col+1 + count = npos + chunknum += 1 + println("done") end - println("splits: $splits") end + Blobs.save(spblobs) nothing end @@ -107,70 +105,58 @@ function split_lastfm(dataset_path = "/data/Work/datasets/last_fm_music_recommen end function load_splits(dataset_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits") - cf = RecSys.ChunkedFile(joinpath(dataset_path, "R_itemwise.meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 10) - println(cf) - nchunks = length(cf.chunks) - for idx in 1:10 - cid = floor(Int, nchunks*rand()) + 1 - println("fetching from chunk $cid") - c = cf.chunks[cid] - key = floor(Int, length(c.keyrange)*rand()) + c.keyrange.start - println("fetching key $key") - r,v = RecSys.data(cf, key) - #println("\tr:$r, v:$v") + sp = SparseMatBlobs(joinpath(dataset_path, "R_itemwise")) + for idx in 1:length(sp.splits) + p = sp.splits[idx] + r = p.first + part, _r = Blobs.load(sp, first(r)) + RecSys.@logmsg("got part of size: $(size(part)), with r: $r, _r:$_r") end println("finished") end -function test_dense_splits(dataset_path = "/tmp/test") - metafilename = joinpath(dataset_path, "mem.meta") - cfile = DenseMatChunks(metafilename, 1, (10^6,10)) - RecSys.create(cfile) - cf = RecSys.read_input(cfile) - for idx in 1:10 - chunk = RecSys.getchunk(cf, idx*10^4) - A = RecSys.data(chunk, cf.lrucache) - @assert A.val[1] == 0.0 - fill!(A.val, idx) - end - cf = RecSys.read_input(cfile) - for idx in 1:10 - chunk = RecSys.getchunk(cf, idx*10^4) - A = RecSys.data(chunk, cf.lrucache) - @assert A.val[1] == Float64(idx) - println(A.val[1]) - end -end - ## # an easy way to generate somewhat relevant test data is to take an existing dataset and replicate # items and users based on existing data. 
+function Blobs.append!{Tv,Ti}(sp::SparseMatBlobs{Tv,Ti}, blob::Blob) + S = blob.data.value + m,n = size(S) + if isempty(sp.splits) + sp.sz = (m, n) + idxrange = 1:n + else + (sp.sz[1] == m) || throw(BoundsError("SparseMatBlobs $(sp.sz)", (m,n))) + old_n = sp.sz[2] + idxrange = (old_n+1):(old_n+n) + sp.sz = (m, old_n+n) + end + + push!(sp.splits, idxrange => blob.id) + RecSys.@logmsg("appending blob $(blob.id) of size: $(size(S)) for idxrange: $idxrange, sersz: $(blob.metadata.size)") + blob +end + function generate_test_data(setname::AbstractString, generated_data_path, original_data_path, mul_factor) - RecSys.@logmsg("generating $setname") - @assert mul_factor > 1 - incf = RecSys.ChunkedFile(joinpath(original_data_path, "$(setname).meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 2) - outmetaname = joinpath(generated_data_path, "$(setname).meta") - - outkeystart = 1 - outfileidx = 1 - open(outmetaname, "w") do outmeta - for chunk in incf.chunks - L = length(chunk.keyrange) - S = RecSys.data(chunk, incf.lrucache) - Sout = S - for x in 1:(mul_factor-1) - Sout = vcat(Sout, S) - end - outfname = joinpath(generated_data_path, "$(setname).$(outfileidx)") - outfileidx += 1 - RecSys.mmap_csc_save(Sout, outfname) - for x in 1:mul_factor - println(outmeta, outkeystart, ",", outkeystart+L-1, ",", outfname) - RecSys.@logmsg("generated $setname $outkeystart:$(outkeystart+L-1) with size: $(size(Sout)) from size: $(size(S))") - outkeystart += L - end + sp1 = SparseMatBlobs(joinpath(original_data_path, setname)) + metapath = joinpath(generated_data_path, setname) + sp2 = SparseMatBlobs(Float64, Int64, metapath) + isdir(metapath) || mkdir(metapath) + for idx in 1:length(sp1.splits) + p = sp1.splits[idx] + r = p.first + part, _r = Blobs.load(sp1, first(r)) + RecSys.@logmsg("got part of size: $(size(part)), with r: $r, _r:$_r") + part_out = part + for x in 1:(mul_factor-1) + part_out = vcat(part_out, part) + end + blob = append!(sp2, part_out) + for x in 1:(mul_factor-1) + append!(sp2, blob) + RecSys.@logmsg("generated part of size: $(size(part_out))") end end + Blobs.save(sp2) end function generate_test_data(generated_data_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits2", diff --git a/src/RecSys.jl b/src/RecSys.jl index 0009dc8..ee251a1 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -2,23 +2,28 @@ module RecSys using ParallelSparseMatMul using MAT +using Blobs + +include("chunks/matrix.jl") +using .MatrixBlobs if isless(Base.VERSION, v"0.5.0-") using SparseVectors end import Base: zero +import Blobs: save, load -export FileSpec, DlmFile, MatFile, SparseMat, SparseMatChunks, DenseMatChunks, read_input +export FileSpec, DlmFile, MatFile, SparseMat, SparseBlobs, DenseBlobs, read_input export ALSWR, train, recommend, rmse, zero -export ParShmem, ParChunk +export ParShmem, ParBlob export save, load, clear, localize! 
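# A minimal usage sketch (not part of the patch; the file and directory names are
# placeholders supplied by the caller): ParChunk is renamed to ParBlob and the
# shared-memory constructor now takes an explicit ParShmem(), so the parallelism
# mode is chosen at construction time.
function example_usage(ratings_csv::AbstractString, splits_dir::AbstractString, model_dir::AbstractString)
    # shared-memory mode: a single ratings file, explicit ParShmem()
    rec_shm = ALSWR(DlmFile(ratings_csv; dlm=','), ParShmem())
    train(rec_shm, 10, 20)                                   # niters, nfactors

    # blob mode: pre-split user x item and item x user matrices, explicit ParBlob()
    R  = SparseBlobs(joinpath(splits_dir, "R_itemwise");  maxcache=10)
    RT = SparseBlobs(joinpath(splits_dir, "RT_userwise"); maxcache=10)
    rec_blob = ALSWR(R, RT, ParBlob())
    train(rec_blob, 10, 20, model_dir, 10)                   # niters, nfactors, model dir, max cached blobs

    rec_shm, rec_blob
end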
typealias RatingMatrix SparseMatrixCSC{Float64,Int64} typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64} -typealias InputRatings Union{RatingMatrix,SharedRatingMatrix} +typealias InputRatings Union{RatingMatrix,SharedRatingMatrix,SparseMatBlobs} typealias InputIdMap Union{Vector{Int64}, SharedVector{Int64}} -typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}} +typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}, DenseMatBlobs{Float64}} abstract FileSpec abstract Inputs @@ -26,7 +31,7 @@ abstract Model abstract Parallelism type ParShmem <: Parallelism end -type ParChunk <: Parallelism end +type ParBlob <: Parallelism end if (Base.VERSION >= v"0.5.0-") using Base.Threads @@ -49,18 +54,14 @@ end #end macro logmsg(s) end +#macro logmsg(s) +# quote +# info("[", myid(), "-", threadid(), "] ", $(esc(s))) +# end +#end - -include("chunks/chunk.jl") -include("chunks/csv.jl") -include("chunks/mmapsparse.jl") -include("chunks/mmapdense.jl") - -include("inputs/input.jl") -include("inputs/dist_input.jl") - -include("models/als_model.jl") -include("models/als_dist_model.jl") +include("inputs/inputs.jl") +include("models/models.jl") include("als-wr.jl") include("utils.jl") diff --git a/src/als-wr.jl b/src/als-wr.jl index a6165b2..6b62d77 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -4,8 +4,8 @@ type ALSWR{TP<:Parallelism,TI<:Inputs,TM<:Model} par::TP end -ALSWR{TP<:ParShmem}(inp::FileSpec, par::TP=ParShmem()) = ALSWR{TP,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) -ALSWR(user_item_ratings::FileSpec, item_user_ratings::FileSpec, par::ParChunk) = ALSWR{ParChunk,DistInputs,DistModel}(DistInputs(user_item_ratings, item_user_ratings), nothing, par) +ALSWR(inp::FileSpec, par::ParShmem) = ALSWR{ParShmem,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) +ALSWR(user_item_ratings::FileSpec, item_user_ratings::FileSpec, par::ParBlob) = ALSWR{ParBlob,DistInputs,DistModel}(DistInputs(user_item_ratings, item_user_ratings), nothing, par) function clear(als::ALSWR) clear(als.inp) @@ -177,17 +177,6 @@ end fetch_compdata() = compdata[1] noop(args...) 
= nothing -function sync_model() - c = fetch_compdata() - sync!(c.model) -end - -function sync_worker_models() - for w in workers() - remotecall_fetch(sync_model, w) - end -end - function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64) t1 = time() share!(model) @@ -222,7 +211,7 @@ function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, nothing end -function rmse{TP<:Union{ParShmem,ParChunk},TI<:Inputs}(als::ALSWR{TP}, inp::TI) +function rmse{TP<:Union{ParShmem,ParBlob},TI<:Inputs}(als::ALSWR{TP}, inp::TI) t1 = time() model = get(als.model) @@ -243,14 +232,14 @@ function rmse{TP<:Union{ParShmem,ParChunk},TI<:Inputs}(als::ALSWR{TP}, inp::TI) end ## -# Chunk based distributed memory parallelism -function train(als::ALSWR{ParChunk,DistInputs,DistModel}, niters::Int, nfacts::Int64, model_dir::AbstractString, max_cache::Int=10, lambda::Float64=0.065) +# Blob based distributed memory parallelism +function train(als::ALSWR{ParBlob,DistInputs,DistModel}, niters::Int, nfacts::Int64, model_dir::AbstractString, max_cache::Int=10, lambda::Float64=0.065) als.model = prep(als.inp, nfacts, lambda, model_dir, max_cache) fact_iters(als, niters) nothing end -function fact_iters{TP<:ParChunk,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64) +function fact_iters{TP<:ParBlob,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64) t1 = time() clear(inp) @@ -258,24 +247,31 @@ function fact_iters{TP<:ParChunk,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, share!(inp) c = ComputeData(model, inp, get(model.lambdaI)) - uranges = UnitRange[chunk.keyrange for chunk in get(model.U).chunks] - iranges = UnitRange[chunk.keyrange for chunk in get(model.P).chunks] - clear(model) + uranges = UnitRange[p.first for p in get(model.U).splits] + iranges = UnitRange[p.first for p in get(model.P).splits] - for w in workers() + # clear, share the data and load it again (not required, but more efficient) + clear(model) + W = workers() + for w in W remotecall_fetch(share_compdata, w, c) end + ensure_loaded(model) + U = get(model.U) + P = get(model.P) nu = nusers(inp) ni = nitems(inp) @logmsg("nusers: $nu, nitems: $ni") for iter in 1:niters @logmsg("begin iteration $iter") + flush(U, W; callback=false) pmap(update_user, uranges) - sync_worker_models() + save(U, W) @logmsg("\tusers") + flush(P, W; callback=false) pmap(update_item, iranges) - sync_worker_models() + save(P, W) @logmsg("\titems") end @@ -291,7 +287,7 @@ end # Thread parallelism if (Base.VERSION >= v"0.5.0-") -ALSWR{TP<:ParThread}(inp::FileSpec, par::TP=ParShmem()) = ALSWR{TP,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) +ALSWR(inp::FileSpec, par::ParThread) = ALSWR{ParThread,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) function thread_update_item{TM<:Model,TI<:Inputs}(model::TM, inp::TI, ni::Int64, lambdaI::Matrix{Float64}) @threads for i in Int64(1):ni @@ -317,13 +313,11 @@ function fact_iters{TP<:ParThread,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI # gc is not threadsafe yet. 
issue #10317 gc_enable(false) thread_update_user(model, inp, nu, lambdaI) - sync!(model) gc_enable(true) gc() gc_enable(false) @logmsg("\tusers") thread_update_item(model, inp, ni, lambdaI) - sync!(model) gc_enable(true) gc() @logmsg("\titems") diff --git a/src/chunks/matrix.jl b/src/chunks/matrix.jl new file mode 100644 index 0000000..d52a5ec --- /dev/null +++ b/src/chunks/matrix.jl @@ -0,0 +1,359 @@ +module MatrixBlobs + +using Blobs +import Blobs: @logmsg, load, save, flush +using Base.Random: UUID +import Base: serialize, deserialize, getindex, setindex!, size, append!, flush, * + +export DenseMatBlobs, SparseMatBlobs, size, getindex, setindex!, serialize, deserialize, save, load, flush, * + +const BYTES_128MB = 128 * 1024 * 1024 + +def_sz{T}(::Type{T}, blk_sz::Int=BYTES_128MB) = floor(Int, blk_sz / sizeof(T)) + +function relidx(range::Range, D::Int, i1::Int, i2::Int) + if D == 1 + (i1 - first(range) + 1), i2 + else + i1, (i2 - first(range) + 1) + end +end + +type SparseMatBlobs{Tv,Ti} <: AbstractMatrix{Tv} + metadir::AbstractString + sz::Tuple + splits::Vector{Pair} + coll::BlobCollection{SparseMatrixCSC{Tv,Ti}} +end + +# D = dimension that is split +# N = value of the other dimension, which is constant across all splits +type DenseMatBlobs{T,D,N} <: AbstractMatrix{T} + metadir::AbstractString + sz::Tuple + splits::Vector{Pair} # keep a mapping of index ranges to blobs + coll::BlobCollection{Matrix{T}} +end + +typealias MatBlobs Union{SparseMatBlobs,DenseMatBlobs} + + +size(dm::MatBlobs) = dm.sz + +split_ranges(dm::MatBlobs) = [p.first for p in dm.splits] + +function splitidx(dm::MatBlobs, splitdim_idx::Int) + for splitnum in 1:length(dm.splits) + p = dm.splits[splitnum] + range = p.first + if splitdim_idx in range + return splitnum + end + end + throw(BoundsError("MatBlobs $(dm.sz) split on dimension $D", splitdim_idx)) +end + +function serialize(s::SerializationState, sm::MatBlobs) + Serializer.serialize_type(s, typeof(sm)) + serialize(s, sm.metadir) + serialize(s, sm.sz) + serialize(s, sm.splits) + + coll = sm.coll + serialize(s, coll.id) + serialize(s, coll.mutability) + serialize(s, coll.reader) + serialize(s, coll.blobs) + serialize(s, coll.maxcache) +end + +function matblob(metadir::AbstractString; maxcache::Int=10) + @logmsg("reading back matrix from $metadir") + open(joinpath(metadir, "meta"), "r") do io + mat = deserialize(SerializationState(io)) + mat.metadir = metadir + max_cached!(mat.coll, maxcache) + mat + end +end + +function flush(dm::MatBlobs, wrkrs::Vector{Int}=Int[]; callback::Bool=true) + isempty(wrkrs) ? flush(dm.coll; callback=callback) : flush(dm.coll, wrkrs; callback=callback) + nothing +end + +## +# SparseMatBlobs specific functions +sersz{Tv,Ti}(sp::SparseMatrixCSC{Tv,Ti}) = (sizeof(Int64)*3 + sizeof(sp.colptr) + sizeof(sp.rowval) + sizeof(sp.nzval)) + +function load{Tv<:Real,Ti<:Integer}(meta::FileMeta, reader::FileBlobIO{SparseMatrixCSC{Tv,Ti}}) + open(meta.filename, "r+") do fhandle + seek(fhandle, meta.offset) + header = Array(Int64, 3) + pos1 = position(fhandle) + header = read!(fhandle, header) + m = header[1] + n = header[2] + nz = header[3] + + pos1 += sizeof(header) + colptr = reader.use_mmap ? blobmmap(fhandle, Vector{Ti}, (n+1,), pos1) : read!(fhandle, Array(Ti, n+1)) + + pos1 += sizeof(colptr) + rowval = reader.use_mmap ? blobmmap(fhandle, Vector{Ti}, (nz,), pos1) : read!(fhandle, Array(Ti, nz)) + + pos1 += sizeof(rowval) + nzval = reader.use_mmap ? 
blobmmap(fhandle, Vector{Tv}, (nz,), pos1) : read!(fhandle, Array(Tv, nz)) + return SparseMatrixCSC{Tv,Ti}(m, n, colptr, rowval, nzval) + end +end + +function save{Tv<:Real,Ti<:Integer}(spm::SparseMatrixCSC{Tv,Ti}, meta::FileMeta, writer::FileBlobIO{SparseMatrixCSC{Tv,Ti}}) + if writer.use_mmap && ismmapped(spm.colptr) && ismmapped(spm.rowval) && ismmapped(spm.nzval) + syncmmapped(spm.colptr) + syncmmapped(spm.rowval) + syncmmapped(spm.nzval) + else + header = Int64[spm.m, spm.n, length(spm.nzval)] + + touch(meta.filename) + open(meta.filename, "r+") do fhandle + seek(fhandle, meta.offset) + write(fhandle, header) + write(fhandle, spm.colptr) + write(fhandle, spm.rowval) + write(fhandle, spm.nzval) + end + end + nothing +end + +function load(sm::SparseMatBlobs, col::Int) + splitnum = splitidx(sm, col) + p = sm.splits[splitnum] + range = p.first + bid = p.second + data = load(sm.coll, bid) + #@logmsg("loaded split idx:$(col) from splitnum $splitnum ($bid) with range: $range, sz: $(size(data))") + data, range +end + +getindex{Tv}(sm::SparseMatBlobs{Tv}, i::Int) = getindex(sm, ind2sub(size(sm), i)...) +function getindex{Tv}(sm::SparseMatBlobs{Tv}, i1::Int, i2::Int) + part, range = load(sm, i2) + part[relidx(range, 2, i1, i2)...] +end + +function getindex{Tv}(sm::SparseMatBlobs{Tv}, ::Colon, i2::Int) + part, range = load(sm, i2) + reli1, reli2 = relidx(range, 2, 1, i2) + part[:,reli2] +end + +function deserialize{Tv,Ti}(s::SerializationState, ::Type{SparseMatBlobs{Tv,Ti}}) + metadir = deserialize(s) + sz = deserialize(s) + splits = deserialize(s) + + coll_id = deserialize(s) + coll_mut = deserialize(s) + coll_reader = deserialize(s) + coll_blobs = deserialize(s) + coll_maxcache = deserialize(s) + + coll = BlobCollection(SparseMatrixCSC{Tv,Ti}, coll_mut, coll_reader; maxcache=coll_maxcache, id=coll_id) + coll.blobs = coll_blobs + SparseMatBlobs{Tv,Ti}(metadir, sz, splits, coll) +end + +function SparseMatBlobs{Tv,Ti}(::Type{Tv}, ::Type{Ti}, metadir::AbstractString; maxcache::Int=10) + T = SparseMatrixCSC{Tv,Ti} + mut = Mutable(BYTES_128MB, FileBlobIO(T, true)) + coll = BlobCollection(T, mut, FileBlobIO(T, true); maxcache=maxcache) + SparseMatBlobs{Tv,Ti}(metadir, (0,0), Pair[], coll) +end + +function append!{Tv,Ti}(sp::SparseMatBlobs{Tv,Ti}, S::SparseMatrixCSC{Tv,Ti}) + m,n = size(S) + if isempty(sp.splits) + sp.sz = (m, n) + idxrange = 1:n + else + (sp.sz[1] == m) || throw(BoundsError("SparseMatBlobs $(sp.sz)", (m,n))) + old_n = sp.sz[2] + idxrange = (old_n+1):(old_n+n) + sp.sz = (m, old_n+n) + end + + fname = joinpath(sp.metadir, string(length(sp.splits)+1)) + meta = FileMeta(fname, 0, sersz(S)) + + blob = append!(sp.coll, SparseMatrixCSC{Tv,Ti}, meta, StrongLocality(myid()), Nullable(S)) + push!(sp.splits, idxrange => blob.id) + @logmsg("appending blob $(blob.id) of size: $(size(S)) for idxrange: $idxrange, sersz: $(meta.size)") + blob +end + +function save(sp::SparseMatBlobs, wrkrs::Vector{Int}=Int[]) + isempty(wrkrs) ? 
save(sp.coll) : save(sp.coll, wrkrs) + save(sp.coll) + open(joinpath(sp.metadir, "meta"), "w") do io + serialize(SerializationState(io), sp) + end + nothing +end + +SparseMatBlobs(metadir::AbstractString; maxcache::Int=10) = matblob(metadir; maxcache=maxcache) + +## +# DenseMatBlobs specific functions +sersz{T}(d::Matrix{T}) = (sizeof(Int64)*2 + sizeof(d)) + +function load{T,D,N}(dm::DenseMatBlobs{T,D,N}, splitdim_idx::Int) + splitnum = splitidx(dm, splitdim_idx) + p = dm.splits[splitnum] + range = p.first + bid = p.second + load(dm.coll, bid), range +end + +getindex{T,D,N}(dm::DenseMatBlobs{T,D,N}, i::Int) = getindex(dm, ind2sub(size(dm), i)...) + +function getindex{T,D,N}(dm::DenseMatBlobs{T,D,N}, i1::Int, i2::Int) + splitdim_idx = (D == 1) ? i1 : i2 + part, range = load(dm, splitdim_idx) + part[relidx(range, D, i1, i2)...] +end + +function getindex{T,N}(dm::DenseMatBlobs{T,1,N}, i1::Int, ::Colon) + part, range = load(dm, i1) + reli1, reli2 = relidx(range, 1, i1, 1) + part[reli1,:] +end + +function getindex{T,N}(dm::DenseMatBlobs{T,1,N}, idxs, ::Colon) + res = Array(T, length(idxs), N) + for residx in 1:length(idxs) + idx = idxs[residx] + res[residx,:] = dm[idx,:] + end + res +end + +function getindex{T,N}(dm::DenseMatBlobs{T,2,N}, ::Colon, i2::Int) + part, range = load(dm, i2) + reli1, reli2 = relidx(range, 2, 1, i2) + part[:,reli2] +end + +function getindex{T,N}(dm::DenseMatBlobs{T,2,N}, ::Colon, idxs) + res = Array(T, N, length(idxs)) + for residx in 1:length(idxs) + idx = idxs[residx] + res[:, residx] = dm[:, idx] + end + res +end + +setindex!{T,D,N}(dm::DenseMatBlobs{T,D,N}, v::T, i::Int) = setindex!(dm, v, ind2sub(size(dm), i)...) + +function setindex!{T,D,N}(dm::DenseMatBlobs{T,D,N}, v::T, i1::Int, i2::Int) + splitdim_idx = (D == 1) ? i1 : i2 + part, range = load(dm, splitdim_idx) + part[relidx(range, D, i1, i2)...] 
= v +end + +function setindex!{T,N}(dm::DenseMatBlobs{T,1,N}, v, i1::Int, ::Colon) + part, range = load(dm, i1) + reli1, reli2 = relidx(range, 1, i1, 1) + part[reli1,:] = v +end + +function setindex!{T,N}(dm::DenseMatBlobs{T,2,N}, v, ::Colon, i2::Int) + part, range = load(dm, i2) + reli1, reli2 = relidx(range, 2, 1, i2) + part[:,reli2] = v +end +#= +function *{T1,T2}(A::Vector{T1}, B::DenseMatBlobs{T2,1}) + T = promote_type(T1, T2) + res = Array(T, size(B, 2)) + for idx in 1:length(B.splits) + p = B.splits[idx] + part, r = load(B, first(p.first)) + res[r] = v * part + end + res +end +=# +function *{T1,T2}(A::Matrix{T1}, B::DenseMatBlobs{T2,2}) + m,n = size(B) + (size(A, 2) == m) || throw(DimensionMismatch("A has dimensions $(size(A)) but B has dimensions $(size(B))")) + res = Array(promote_type(T1,T2), 1, n) + for idx in 1:length(B.splits) + p = B.splits[idx] + part, r = load(B, first(p.first)) + res[r] = A * part + end + res +end + +function deserialize{T,D,N}(s::SerializationState, ::Type{DenseMatBlobs{T,D,N}}) + metadir = deserialize(s) + sz = deserialize(s) + splits = deserialize(s) + + coll_id = deserialize(s) + coll_mut = deserialize(s) + coll_reader = deserialize(s) + coll_blobs = deserialize(s) + coll_maxcache = deserialize(s) + + coll = BlobCollection(Matrix{T}, coll_mut, coll_reader; maxcache=coll_maxcache, id=coll_id) + coll.blobs = coll_blobs + DenseMatBlobs{T,D,N}(metadir, sz, splits, coll) +end + +function DenseMatBlobs{Tv}(::Type{Tv}, D::Int, N::Int, metadir::AbstractString; maxcache::Int=10) + T = Matrix{Tv} + io = FileBlobIO(Array{Tv}, true) + mut = Mutable(BYTES_128MB, io) + coll = BlobCollection(T, mut, io; maxcache=maxcache) + DenseMatBlobs{Tv,D,N}(metadir, (0,0), Pair[], coll) +end + +function append!{Tv,D,N}(dm::DenseMatBlobs{Tv,D,N}, M::Matrix{Tv}) + m,n = size(M) + unsplit_dim = (D == 1) ? n : m + split_dim = (D == 1) ? m : n + (N == unsplit_dim) || throw(BoundsError("DenseMatBlobs with unsplit dimension $D fixed at $N", (m,n))) + if isempty(dm.splits) + dm.sz = (m, n) + idxrange = 1:split_dim + else + old_split_dim = dm.sz[D] + new_split_dim = old_split_dim + split_dim + idxrange = (old_split_dim+1):new_split_dim + dm.sz = (D == 1) ? (new_split_dim, unsplit_dim) : (unsplit_dim, new_split_dim) + end + + fname = joinpath(dm.metadir, string(length(dm.splits)+1)) + meta = FileMeta(fname, 0, sersz(M)) + + blob = append!(dm.coll, Matrix{Tv}, meta, StrongLocality(myid()), Nullable(M)) + push!(dm.splits, idxrange => blob.id) + @logmsg("appending blob $(blob.id) of size: $(size(M)) for idxrange: $idxrange, sersz: $(meta.size)") + blob +end + +DenseMatBlobs(metadir::AbstractString; maxcache::Int=10) = matblob(metadir; maxcache=maxcache) + +function save(dm::DenseMatBlobs, wrkrs::Vector{Int}=Int[]) + isempty(wrkrs) ? 
save(dm.coll) : save(dm.coll, wrkrs) + open(joinpath(dm.metadir, "meta"), "w") do io + serialize(SerializationState(io), dm) + end + nothing +end + +end # module diff --git a/src/inputs/dist_input.jl b/src/inputs/dist_input.jl index fda1387..0e13c98 100644 --- a/src/inputs/dist_input.jl +++ b/src/inputs/dist_input.jl @@ -8,8 +8,8 @@ type DistInputs <: Inputs RTfile::FileSpec nusers::Int nitems::Int - R::Nullable{ChunkedFile} - RT::Nullable{ChunkedFile} + R::Nullable{InputRatings} + RT::Nullable{InputRatings} function DistInputs(ratings_file::FileSpec, transposed_ratings_file::FileSpec) new(ratings_file, transposed_ratings_file, 0, 0, nothing, nothing) @@ -26,13 +26,10 @@ share!(inp::DistInputs) = nothing function ensure_loaded(inp::DistInputs) if isnull(inp.R) + # R is the user x item matrix (users in rows and items in columns) R = read_input(inp.Rfile) RT = read_input(inp.RTfile) - - last_chunk = R.chunks[end] - inp.nitems = last(last_chunk.keyrange) - last_chunk = RT.chunks[end] - inp.nusers = last(last_chunk.keyrange) + inp.nusers, inp.nitems = size(R) inp.R = R inp.RT = RT @@ -42,32 +39,3 @@ end item_idmap(inp::DistInputs) = Int64[] user_idmap(inp::DistInputs) = Int64[] - -nusers(inp::DistInputs) = inp.nusers -nitems(inp::DistInputs) = inp.nitems - -users_and_ratings(inp::DistInputs, i::Int64) = _sprowsvals(get(inp.R), i) -all_user_ratings(inp::DistInputs, i::Int64) = _spvals(get(inp.R), i) -all_users_rated(inp::DistInputs, i::Int64) = _sprows(get(inp.R), i) - -items_and_ratings(inp::DistInputs, u::Int64) = _sprowsvals(get(inp.RT), u) -all_item_ratings(inp::DistInputs, u::Int64) = _spvals(get(inp.RT), u) -all_items_rated(inp::DistInputs, u::Int64) = _sprows(get(inp.RT), u) - -function _sprowsvals{K,SK,SV}(cf::ChunkedFile{K,SparseMatrixCSC{SK,SV}}, col::Int64) - chunk = getchunk(cf, col) - d = data(chunk, cf.lrucache) - _sprowsvals(d, col - first(chunk.keyrange) + 1) -end - -function _sprows{K,SK,SV}(cf::ChunkedFile{K,SparseMatrixCSC{SK,SV}}, col::Int64) - chunk = getchunk(cf, col) - d = data(chunk, cf.lrucache) - _sprows(d, col - first(chunk.keyrange) + 1) -end - -function _spvals{K,SK,SV}(cf::ChunkedFile{K,SparseMatrixCSC{SK,SV}}, col::Int64) - chunk = getchunk(cf, col) - d = data(chunk, cf.lrucache) - _spvals(d, col - first(chunk.keyrange) + 1) -end diff --git a/src/inputs/input.jl b/src/inputs/input.jl index 44afeea..687949a 100644 --- a/src/inputs/input.jl +++ b/src/inputs/input.jl @@ -1,12 +1,14 @@ type SharedMemoryInputs <: Inputs ratings_file::FileSpec + nusers::Int + nitems::Int R::Nullable{InputRatings} RT::Nullable{InputRatings} item_idmap::Nullable{InputIdMap} user_idmap::Nullable{InputIdMap} function SharedMemoryInputs(file::FileSpec) - new(file, nothing, nothing, nothing, nothing) + new(file, 0, 0, nothing, nothing, nothing, nothing) end end @@ -115,6 +117,7 @@ function ensure_loaded(inp::SharedMemoryInputs; only_items::Vector{Int64}=Int64[ R, item_idmap, user_idmap = filter_empty(R; only_items=only_items) inp.R = R + inp.nusers, inp.nitems = size(R) inp.item_idmap = (extrema(item_idmap) == (1,length(item_idmap))) ? nothing : item_idmap inp.user_idmap = (extrema(user_idmap) == (1,length(user_idmap))) ? nothing : user_idmap inp.RT = R' @@ -128,35 +131,3 @@ end item_idmap(inp::SharedMemoryInputs) = isnull(inp.item_idmap) ? Int64[] : get(inp.item_idmap) user_idmap(inp::SharedMemoryInputs) = isnull(inp.user_idmap) ? 
Int64[] : get(inp.user_idmap) - -nusers(inp::SharedMemoryInputs) = size(get(inp.R), 1) -nitems(inp::SharedMemoryInputs) = size(get(inp.R), 2) - -users_and_ratings(inp::SharedMemoryInputs, i::Int64) = _sprowsvals(get(inp.R), i) -all_user_ratings(inp::SharedMemoryInputs, i::Int64) = _spvals(get(inp.R), i) -all_users_rated(inp::SharedMemoryInputs, i::Int64) = _sprows(get(inp.R), i) - -items_and_ratings(inp::SharedMemoryInputs, u::Int64) = _sprowsvals(get(inp.RT), u) -all_item_ratings(inp::SharedMemoryInputs, u::Int64) = _spvals(get(inp.RT), u) -all_items_rated(inp::SharedMemoryInputs, u::Int64) = _sprows(get(inp.RT), u) - -function _sprowsvals(R::InputRatings, col::Int64) - rowstart = R.colptr[col] - rowend = R.colptr[col+1] - 1 - # use subarray? - R.rowval[rowstart:rowend], R.nzval[rowstart:rowend] -end - -function _sprows(R::InputRatings, col::Int64) - rowstart = R.colptr[col] - rowend = R.colptr[col+1] - 1 - # use subarray? - R.rowval[rowstart:rowend] -end - -function _spvals(R::InputRatings, col::Int64) - rowstart = R.colptr[col] - rowend = R.colptr[col+1] - 1 - # use subarray? - R.nzval[rowstart:rowend] -end diff --git a/src/inputs/inputs.jl b/src/inputs/inputs.jl new file mode 100644 index 0000000..73cb1f0 --- /dev/null +++ b/src/inputs/inputs.jl @@ -0,0 +1,46 @@ +include("input.jl") +include("dist_input.jl") + +nusers{T<:Inputs}(inp::T) = inp.nusers +nitems{T<:Inputs}(inp::T) = inp.nitems + +users_and_ratings{T<:Inputs}(inp::T, i::Int64) = _sprowsvals(get(inp.R), i) +all_user_ratings{T<:Inputs}(inp::T, i::Int64) = _spvals(get(inp.R), i) +all_users_rated{T<:Inputs}(inp::T, i::Int64) = _sprows(get(inp.R), i) + +items_and_ratings{T<:Inputs}(inp::T, u::Int64) = _sprowsvals(get(inp.RT), u) +all_item_ratings{T<:Inputs}(inp::T, u::Int64) = _spvals(get(inp.RT), u) +all_items_rated{T<:Inputs}(inp::T, u::Int64) = _sprows(get(inp.RT), u) + +function _sprowsvals{R<:InputRatings}(sp::R, col::Int64) + v = sp[:,col] + Base.SparseArrays.nonzeroinds(v), Base.SparseArrays.nonzeros(v) +end + +function _sprows{R<:InputRatings}(sp::R, col::Int64) + v = sp[:,col] + Base.SparseArrays.nonzeroinds(v) +end + +function _spvals{R<:InputRatings}(sp::R, col::Int64) + v = sp[:,col] + Base.SparseArrays.nonzeros(sp[:,col]) +end + +function _sprowsvals(sp::ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64}, col::Int64) + rowstart = sp.colptr[col] + rowend = sp.colptr[col+1] - 1 + sp.rowval[rowstart:rowend], sp.nzval[rowstart:rowend] +end + +function _sprows(sp::ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64}, col::Int64) + rowstart = sp.colptr[col] + rowend = sp.colptr[col+1] - 1 + sp.rowval[rowstart:rowend] +end + +function _spvals(sp::ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64}, col::Int64) + rowstart = sp.colptr[col] + rowend = sp.colptr[col+1] - 1 + sp.nzval[rowstart:rowend] +end diff --git a/src/models/als_dist_model.jl b/src/models/als_dist_model.jl index 31efd74..17575d2 100644 --- a/src/models/als_dist_model.jl +++ b/src/models/als_dist_model.jl @@ -1,33 +1,29 @@ # In distributed memory mode, the model may be too large to load into memory on -# a single node. It is therefore split by one of its dimensions into chunks. +# a single node. It is therefore split by one of its dimensions into blobs. # # All matrix operations are done on the node where required, just that not all -# chunks are loaded at once at any time. This is more efficient for the ALS +# blobs are loaded at once at any time. 
This is more efficient for the ALS # algorithm, because: # - ALS computation is already distributed over users and items -# - it is more efficient to transfer a full chunk once to a node, instead of +# - it is more efficient to transfer a full blob once to a node, instead of # communicating to a remote node once for each step type DistModel <: Model UFile::FileSpec PFile::FileSpec nfactors::Int lambda::Float64 - U::Nullable{ChunkedFile} - P::Nullable{ChunkedFile} + U::Nullable{ModelFactor} + P::Nullable{ModelFactor} lambdaI::Nullable{ModelFactor} Pinv::Nullable{ModelFactor} end -nusers(model::DistModel) = last(keyrange(get(model.U))) -nitems(model::DistModel) = last(keyrange(get(model.P))) +nusers(model::DistModel) = size(get(model.U))[1] +nitems(model::DistModel) = size(get(model.P))[2] nfactors(model::DistModel) = model.nfactors share!(model::DistModel) = nothing localize!(model::DistModel) = nothing -function sync!(model::DistModel) - isnull(model.U) || sync!(get(model.U)) - isnull(model.P) || sync!(get(model.P)) -end function clear(model::DistModel) model.U = nothing @@ -48,16 +44,6 @@ end #end #vec_mul_pinv(model::DistModel, v) = v * pinv(model) -function vec_mul_p(model::DistModel, v) - res = Array(Float64, nitems(model)) - cfP = get(model.P) - for chunk in cfP.chunks - P = data(chunk, cfP.lrucache).val - res[chunk.keyrange] = v * P - end - res -end - function prep{TI<:DistInputs}(inp::TI, nfacts::Int, lambda::Float64, model_dir::AbstractString, max_cache::Int=10) ensure_loaded(inp) t1 = time() @@ -66,63 +52,27 @@ function prep{TI<:DistInputs}(inp::TI, nfacts::Int, lambda::Float64, model_dir:: nu = nusers(inp) ni = nitems(inp) - UFile = DenseMatChunks(joinpath(model_dir, "U.meta"), 1, (nu, nfacts), max_cache) - PFile = DenseMatChunks(joinpath(model_dir, "P.meta"), 2, (nfacts, ni), max_cache) - create(UFile, zero!, min(_max_items(UFile), ceil(Int, nu/nworkers()))) - create(PFile, rand!, min(_max_items(PFile), ceil(Int, ni/nworkers()))) + Udir = DenseBlobs(joinpath(model_dir, "U")) + Pdir = DenseBlobs(joinpath(model_dir, "P")) + isdir(model_dir) || mkdir(model_dir) + isdir(Udir.name) || mkdir(Udir.name) + isdir(Pdir.name) || mkdir(Pdir.name) + + Usz = (nu, nfacts) + Psz = (nfacts, ni) + U = create(Udir, Float64, 1, Usz, zeros, min(_max_items(Float64,1,Usz), ceil(Int, nu/nworkers()))) + P = create(Pdir, Float64, 2, Psz, rand, min(_max_items(Float64,2,Psz), ceil(Int, ni/nworkers()))) - cfU = read_input(UFile) - cfP = read_input(PFile) for idx in 1:ni - chunk = getchunk(cfP, idx) - P = data(chunk, cfP.lrucache).val - P[1,idx-first(chunk.keyrange)+1] = mean(all_user_ratings(inp, idx)) + P[1,idx] = mean(all_user_ratings(inp, idx)) end - sync!(cfP) + save(U) + save(P) lambdaI = lambda * eye(nfacts) - model = DistModel(UFile, PFile, nfacts, lambda, cfU, cfP, lambdaI, nothing) + model = DistModel(Udir, Pdir, nfacts, lambda, U, P, lambdaI, nothing) t2 = time() @logmsg("prep time: $(t2-t1)") model end - -function setU(model::DistModel, u::Int64, vals) - cf = get(model.U) - chunk = getchunk(cf, u) - U = data(chunk, cf.lrucache).val - U[u-first(chunk.keyrange)+1,:] = vals - nothing -end - -function setP(model::DistModel, i::Int64, vals) - cf = get(model.P) - chunk = getchunk(cf, i) - P = data(chunk, cf.lrucache).val - P[:,i-first(chunk.keyrange)+1] = vals - nothing -end - -function getU(model::DistModel, users) - cf = get(model.U) - Usub = Array(Float64, length(users), model.nfactors) - for idx in 1:length(users) - u = users[idx] - chunk = getchunk(cf, u) - U = data(chunk, cf.lrucache).val - 
Usub[idx,:] = U[u-first(chunk.keyrange)+1,:] - end - Usub -end -function getP(model::DistModel, items) - cf = get(model.P) - Psub = Array(Float64, model.nfactors, length(items)) - for idx in 1:length(items) - i = items[idx] - chunk = getchunk(cf, i) - P = data(chunk, cf.lrucache).val - Psub[:,idx] = P[:,i-first(chunk.keyrange)+1] - end - Psub -end diff --git a/src/models/als_model.jl b/src/models/als_model.jl index f150ebc..ccc0b1a 100644 --- a/src/models/als_model.jl +++ b/src/models/als_model.jl @@ -30,7 +30,6 @@ function localize!(model::SharedMemoryModel) isa(model.P, SharedArray) && (model.P = copy(model.P)) nothing end -sync!(model::SharedMemoryModel) = nothing function clear(model::SharedMemoryModel) model.lambdaI = nothing @@ -52,8 +51,6 @@ function pinv(model::SharedMemoryModel) end get(model.Pinv) end - -vec_mul_p(model::SharedMemoryModel, v) = v * model.P vec_mul_pinv(model::SharedMemoryModel, v) = v * pinv(model) function prep{TI<:Inputs}(inp::TI, nfacts::Int, lambda::Float64) @@ -77,16 +74,3 @@ function prep{TI<:Inputs}(inp::TI, nfacts::Int, lambda::Float64) @logmsg("prep time: $(t2-t1)") model end - -@inline function setU(model::SharedMemoryModel, u::Int64, vals) - model.U[u,:] = vals - nothing -end - -@inline function setP(model::SharedMemoryModel, i::Int64, vals) - model.P[:,i] = vals - nothing -end - -getU(model::SharedMemoryModel, users) = model.U[users, :] -getP(model::SharedMemoryModel, items) = model.P[:, items] diff --git a/src/models/models.jl b/src/models/models.jl new file mode 100644 index 0000000..eba60c6 --- /dev/null +++ b/src/models/models.jl @@ -0,0 +1,28 @@ +include("als_model.jl") +include("als_dist_model.jl") + +setU{M<:Model}(model::M, u::Int64, vals) = setU(model.U, u, vals) +setU(NU::Nullable, u::Int64, vals) = setU(get(NU), u, vals) +function setU{M<:ModelFactor}(U::M, u::Int64, vals) + U[u,:] = vals + nothing +end + +setP{M<:Model}(model::M, i::Int64, vals) = setP(model.P, i, vals) +setP(NP::Nullable, i, vals) = setP(get(NP), i, vals) +function setP{M<:ModelFactor}(P::M, i::Int64, vals) + P[:,i] = vals + nothing +end + +getU{M<:Model}(model::M, users) = getU(model.U, users) +getU(NU::Nullable, users) = getU(get(NU), users) +getU{M<:ModelFactor}(U::M, users) = U[users, :] + +getP{M<:Model}(model::M, items) = getP(model.P, items) +getP(NP::Nullable, items) = getP(get(NP), items) +getP{M<:ModelFactor}(P::M, items) = P[:, items] + +vec_mul_p{M<:Model}(model::M, v) = vec_mul_p(model.P, v) +vec_mul_p(NM::Nullable, v) = vec_mul_p(get(NM), v) +vec_mul_p{M<:ModelFactor}(model::M, v) = v * model diff --git a/src/utils.jl b/src/utils.jl index b43a597..1a2aa31 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -48,73 +48,48 @@ type SparseMat <: FileSpec end read_input(fspec::SparseMat) = fspec.S -# sparse matrix chunks (input only) -type SparseMatChunks <: FileSpec - metafile::AbstractString - max_cache::Int - - function SparseMatChunks(metafile::AbstractString, max_cache::Int=5) - new(metafile, max_cache) +# sparse matrix blobs (input only) +type SparseBlobs <: FileSpec + name::AbstractString + maxcache::Int + function SparseBlobs(name::AbstractString; maxcache::Int=10) + new(name, maxcache) end end -read_input(fspec::SparseMatChunks) = ChunkedFile(fspec.metafile, UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, fspec.max_cache) - -# dense matrix chunks (input and output) -type DenseMatChunks <: FileSpec - metafile::AbstractString - D::Int - sz::Tuple - max_cache::Int +read_input(fspec::SparseBlobs) = SparseMatBlobs(fspec.name; maxcache=fspec.maxcache) - function 
DenseMatChunks(metafile::AbstractString, splitdim::Int, sz::Tuple, max_cache::Int=5) - new(metafile, splitdim, sz, max_cache) +# dense matrix blobs (input and output) +type DenseBlobs <: FileSpec + name::AbstractString + maxcache::Int + function DenseBlobs(name::AbstractString; maxcache::Int=10) + new(name, maxcache) end end -function read_input(fspec::DenseMatChunks) - D = fspec.D - N = fspec.sz[(D==1) ? 2 : 1] - ChunkedFile(fspec.metafile, UnitRange{Int64}, MemMappedMatrix{Float64,D,N}, fspec.max_cache, false) -end - -zero!(a) = fill!(a, 0) +read_input(fspec::DenseBlobs) = DenseMatBlobs(fspec.name; maxcache=fspec.maxcache) const MAX_BLK_BYTES = 128*1000*1000 #128MB -function _max_items(fspec::DenseMatChunks) - D = fspec.D - N = fspec.sz[(D==1) ? 2 : 1] - ceil(Int, MAX_BLK_BYTES/sizeof(Float64)/N) +function _max_items(T::Type, D::Int, sz::Tuple) + m,n = sz + unsplit_dim = (D == 1) ? n : m + ceil(Int, MAX_BLK_BYTES/sizeof(T)/unsplit_dim) end - -function create(fspec::DenseMatChunks, initfn::Function=zero!, max_items::Int=_max_items(fspec)) - touch(fspec.metafile) - cf = read_input(fspec) - - D = fspec.D - N = fspec.sz[(D==1) ? 2 : 1] - V = fspec.sz[D] - NC = ceil(Int, V/max_items) - chunkpfx = splitext(fspec.metafile)[1] - empty!(cf.chunks) - empty!(cf.lrucache) - - for idx in 1:NC - chunkfname = "$(chunkpfx).$(idx)" - isfile(chunkfname) && rm(chunkfname) - r1 = (idx-1)*max_items + 1 - r2 = min(idx*max_items, V) - M = r2-r1+1 - sz = M*N*sizeof(Float64) - @logmsg("creating chunk $chunkfname sz: $sz, r: $r1:$r2, N:$N") - chunk = Chunk(chunkfname, 0, sz, r1:r2, MemMappedMatrix{Float64,D,N}) - push!(cf.chunks, chunk) +function create{T}(fspec::DenseBlobs, ::Type{T}, D::Int, sz::Tuple, init::Function, max_items::Int=_max_items(T,D,sz)) + @logmsg("creating densematarray") + isdir(fspec.name) || mkdir(fspec.dir) + m,n = sz + unsplit_dim = (D == 1) ? n : m + split_dim = sz[D] + dm = DenseMatBlobs(T, D, unsplit_dim, fspec.name) + + startidx = 1 + while startidx <= split_dim + idxrange = startidx:min(split_dim, startidx + max_items) + blobsz = (D == 1) ? (length(idxrange),unsplit_dim) : (unsplit_dim,length(idxrange)) + M = init(T, blobsz...) 
+ @logmsg("idxrange: $idxrange, sz: $(size(M))") + append!(dm, M) + startidx = last(idxrange) + 1 end - for idx in 1:NC - r1 = (idx-1)*max_items + 1 - chunk = getchunk(cf, r1) - A = data(chunk, cf.lrucache) - initfn(A.val) - sync!(chunk) - end - writemeta(cf) - nothing + dm end From 6ec280247371515354c36a23c9e0c17d9f05277c Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Mar 2016 01:49:50 +0530 Subject: [PATCH 2/5] remove chunks code, now externalized as Blobs.jl --- src/chunks/chunk.jl | 101 --------------------------------------- src/chunks/csv.jl | 27 ----------- src/chunks/mmapdense.jl | 15 ------ src/chunks/mmapsparse.jl | 62 ------------------------ 4 files changed, 205 deletions(-) delete mode 100644 src/chunks/chunk.jl delete mode 100644 src/chunks/csv.jl delete mode 100644 src/chunks/mmapdense.jl delete mode 100644 src/chunks/mmapsparse.jl diff --git a/src/chunks/chunk.jl b/src/chunks/chunk.jl deleted file mode 100644 index f39fab3..0000000 --- a/src/chunks/chunk.jl +++ /dev/null @@ -1,101 +0,0 @@ -# Chunks are parts of a large data datastructure that can be: -# - loaded on demand -# - cached, but evicted on memory pressure -# - optionally attached with locality for efficient IO (in future) -# A chunked files: -# - has a metadata section that lists location and data range of each chunk -# - chunk key type (data range) must have the `in`, `first`, `last`, and `length` methods defined - -using LRUCache -using Base.Mmap -import Base.Mmap: sync! - -type Chunk{K,V} - path::AbstractString - offset::Int - size::Int - keyrange::K - valtype::Type{V} - data::WeakRef -end - -function Chunk(path::AbstractString, offset::Integer, size::Integer, keyrange, V) - K = typeof(keyrange) - Chunk{K,V}(path, offset, size, keyrange, V, WeakRef()) -end - -function Chunk(path::AbstractString, keyrange, V) - size = filesize(path) - K = typeof(keyrange) - Chunk{K,V}(path, 0, size, keyrange, V, WeakRef()) -end - -function data{K,V}(chunk::Chunk{K,V}, lrucache::LRU) - if chunk.data.value == nothing - data = load(V, chunk) - chunk.data.value = data - end - v = chunk.data.value - lrucache[chunk.keyrange] = v - v::V -end - -function sync!(chunk::Chunk) - try - (chunk.data.value == nothing) || sync!(chunk.data.value) - end -end - -type ChunkedFile{K,V} - keyrangetype::Type{K} - valtype::Type{V} - metapath::AbstractString - chunks::Vector{Chunk{K,V}} - lrucache::LRU -end - -keyrange(cf::ChunkedFile) = first(cf.chunks[1].keyrange):last(cf.chunks[end].keyrange) - -sync!(cf::ChunkedFile) = empty!(cf.lrucache) - -function _unload(cf, k, v) - chunk = getchunk(cf, first(k)) - sync!(chunk) - #@logmsg("unloading chunk $(chunk.path)") - chunk.data.value = nothing - nothing -end - -function ChunkedFile(metapath::AbstractString, K, V, max_cache::Integer, readonly::Bool=true) - chunks = Chunk{K,V}[] - meta = (filesize(metapath) == 0) ? 
Array(Any,0,0) : readcsv(metapath) - for idx in 1:size(meta,1) - fname = meta[idx,3] - push!(chunks, Chunk(fname, 0, filesize(fname), Int(meta[idx,1]):Int(meta[idx,2]), V)) - end - cf = ChunkedFile{K,V}(K, V, metapath, chunks, LRU{K,V}(max_cache)) - if !readonly - cf.lrucache.cb = (k,v) -> _unload(cf, k, v) - end - cf -end - -function writemeta(cf::ChunkedFile) - chunkpfx = splitext(cf.metapath)[1] - open(cf.metapath, "w") do meta - chunks = cf.chunks - idx = 1 - for chunk in chunks - #@logmsg("writing chunk: $(chunk.keyrange)") - println(meta, first(chunk.keyrange), ",", last(chunk.keyrange), ",", chunkpfx, ".", idx) - idx += 1 - end - end -end - -function getchunk{K,V}(cf::ChunkedFile{K,V}, key::Int) - for chunk in cf.chunks - (key in chunk.keyrange) && return chunk - end - error("Key not found") -end diff --git a/src/chunks/csv.jl b/src/chunks/csv.jl deleted file mode 100644 index a0de473..0000000 --- a/src/chunks/csv.jl +++ /dev/null @@ -1,27 +0,0 @@ -function load{T<:Vector{UInt8}}(::Type{T}, chunk::Chunk) - #@logmsg("loading chunk $(chunk.path)") - open(chunk.path) do f - seek(f, chunk.offset) - databytes = Array(UInt8, chunk.size) - read!(f, databytes) - return databytes::T - end -end - -function load{T<:Matrix}(::Type{T}, chunk::Chunk) - M = readcsv(load(Vector{UInt8}, chunk)) - M::T -end - -#= -function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk) - A = load(Matrix{Float64}, chunk) - rows = convert(Vector{Int64}, A[:,1]); - cols = convert(Vector{Int64}, A[:,2]); - vals = convert(Vector{Float64}, A[:,3]); - - # subtract keyrange to keep sparse matrix small - cols .-= (first(chunk.keyrange) - 1) - sparse(rows, cols, vals)::T -end -=# diff --git a/src/chunks/mmapdense.jl b/src/chunks/mmapdense.jl deleted file mode 100644 index 5352fc1..0000000 --- a/src/chunks/mmapdense.jl +++ /dev/null @@ -1,15 +0,0 @@ -# D = dimension that is split -# N = value of the other dimension, which is constant across all splits -type MemMappedMatrix{T,D,N} - val::Matrix{T} -end -sync!(m::MemMappedMatrix) = sync!(m.val, Base.MS_SYNC | Base.MS_INVALIDATE) - -function load{T,D,N}(::Type{MemMappedMatrix{T,D,N}}, chunk::Chunk) - #@logmsg("loading memory mapped chunk $(chunk.path)") - ncells = div(chunk.size, sizeof(T)) - M = Int(ncells/N) - dims = (D == 1) ? (M,N) : (N,M) - A = Mmap.mmap(chunk.path, Matrix{T}, dims, chunk.offset) - MemMappedMatrix{T,D,N}(A) -end diff --git a/src/chunks/mmapsparse.jl b/src/chunks/mmapsparse.jl deleted file mode 100644 index ac20581..0000000 --- a/src/chunks/mmapsparse.jl +++ /dev/null @@ -1,62 +0,0 @@ -using Base.Mmap -import Base.Mmap: sync! 
- -function mmap_csc_save(spm::SparseMatrixCSC, fname::AbstractString) - touch(fname) - open(fname, "r+") do fhandle - mmap_csc_save(spm, fhandle) - end -end - -function mmap_csc_save{Tv,Ti}(spm::SparseMatrixCSC{Tv,Ti}, fhandle::IO) - header = Int64[spm.m, spm.n, length(spm.nzval), Base.Serializer.sertag(Tv), Base.Serializer.sertag(Ti)] - - seek(fhandle, 0) - write(fhandle, reinterpret(UInt8, header)) - write(fhandle, reinterpret(UInt8, spm.colptr)) - write(fhandle, reinterpret(UInt8, spm.rowval)) - write(fhandle, reinterpret(UInt8, spm.nzval)) - nothing -end - -function mmap_csc_load(fname::AbstractString) - open(fname, "r+") do fhandle - mmap_csc_load(fhandle) - end -end - -function mmap_csc_load(fhandle::IO) - header = Array(Int64, 5) - pos1 = position(fhandle) - header = read!(fhandle, header) - m = header[1] - n = header[2] - nz = header[3] - Tv = Base.Serializer.desertag(Int32(header[4])) - Ti = Base.Serializer.desertag(Int32(header[5])) - - pos1 += sizeof(header) - colptr = Mmap.mmap(fhandle, Vector{Ti}, (n+1,), pos1) - - pos1 += sizeof(colptr) - rowval = Mmap.mmap(fhandle, Vector{Ti}, (nz,), pos1) - - pos1 += sizeof(rowval) - nzval = Mmap.mmap(fhandle, Vector{Tv}, (nz,), pos1) - SparseMatrixCSC{Tv,Ti}(m, n, colptr, rowval, nzval) -end - -function sync!(spm::SparseMatrixCSC) - Mmap.sync!(spm.colptr, Base.MS_SYNC | Base.MS_INVALIDATE) - Mmap.sync!(spm.rowval, Base.MS_SYNC | Base.MS_INVALIDATE) - Mmap.sync!(spm.nzval, Base.MS_SYNC | Base.MS_INVALIDATE) - nothing -end - -function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk) - #@logmsg("loading memory mapped sparse $(chunk.path)") - open(chunk.path) do f - seek(f, chunk.offset) - return mmap_csc_load(f)::T - end -end From 998f982657e552ca083d4a9d264b3762209a4ee7 Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 10 Mar 2016 22:19:48 +0530 Subject: [PATCH 3/5] gc need not be disabled anymore in thereads mode --- src/als-wr.jl | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/als-wr.jl b/src/als-wr.jl index 6b62d77..b7c8ab1 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -311,15 +311,9 @@ function fact_iters{TP<:ParThread,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI for iter in 1:niters @logmsg("begin iteration $iter") # gc is not threadsafe yet. 
issue #10317 - gc_enable(false) thread_update_user(model, inp, nu, lambdaI) - gc_enable(true) - gc() - gc_enable(false) @logmsg("\tusers") thread_update_item(model, inp, ni, lambdaI) - gc_enable(true) - gc() @logmsg("\titems") end @@ -340,7 +334,6 @@ function rmse{TP<:ParThread,TI<:Inputs}(als::ALSWR{TP}, inp::TI) N2 = nusers(als.inp) while pos < N2 endpos = min(pos+10000, N2) - gc_enable(false) @threads for user in pos:endpos Uvec = reshape(getU(model, user), 1, NF) nzrows, nzvals = items_and_ratings(inp, user) @@ -350,8 +343,6 @@ function rmse{TP<:ParThread,TI<:Inputs}(als::ALSWR{TP}, inp::TI) lengths[tid] += length(predicted) errs[tid] += sum((predicted - nzvals) .^ 2) end - gc_enable(true) - gc() pos = endpos + 1 end @logmsg("rmse time $(time()-t1)") From c9711ab0db21ef0f5ec38d0e203ad3b3018d7cca Mon Sep 17 00:00:00 2001 From: tan Date: Fri, 11 Mar 2016 10:34:46 +0530 Subject: [PATCH 4/5] further optimizations - store U' instead of U - specialize array indexing for blobs array --- src/als-wr.jl | 21 ++--- src/chunks/matrix.jl | 158 +++++++++++++++-------------------- src/models/als_dist_model.jl | 8 +- src/models/als_model.jl | 4 +- src/models/models.jl | 4 +- src/utils.jl | 17 ++-- 6 files changed, 92 insertions(+), 120 deletions(-) diff --git a/src/als-wr.jl b/src/als-wr.jl index b7c8ab1..85953a0 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -62,6 +62,7 @@ function update_item(r::UnitRange) for i in r update_item(i::Int64, c.model, c.inp, c.lambdaI) end + nothing end function update_item(i::Int64) @@ -71,8 +72,8 @@ end function update_item(i::Int64, model, inp, lambdaI) nzrows, nzvals = users_and_ratings(inp, i) - Ui = getU(model, nzrows) - Uit = Ui' + Uit = getU(model, nzrows) + Ui = Uit' vec = Uit * nzvals mat = (Uit * Ui) + (length(nzrows) * lambdaI) setP(model, i, mat \ vec) @@ -192,15 +193,15 @@ function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, @logmsg("nusers: $nu, nitems: $ni") for iter in 1:niters @logmsg("begin iteration $iter") - pmap(update_user, 1:nu) - #@parallel (noop) for u in 1:nu - # update_user(u) - #end + #pmap(update_user, 1:nu) + @parallel (noop) for u in 1:nu + update_user(u) + end @logmsg("\tusers") - pmap(update_item, 1:ni) - #@parallel (noop) for i in 1:ni - # update_item(i) - #end + #pmap(update_item, 1:ni) + @parallel (noop) for i in 1:ni + update_item(i) + end @logmsg("\titems") end diff --git a/src/chunks/matrix.jl b/src/chunks/matrix.jl index d52a5ec..b97c831 100644 --- a/src/chunks/matrix.jl +++ b/src/chunks/matrix.jl @@ -11,46 +11,37 @@ const BYTES_128MB = 128 * 1024 * 1024 def_sz{T}(::Type{T}, blk_sz::Int=BYTES_128MB) = floor(Int, blk_sz / sizeof(T)) -function relidx(range::Range, D::Int, i1::Int, i2::Int) - if D == 1 - (i1 - first(range) + 1), i2 - else - i1, (i2 - first(range) + 1) - end -end +relidx(range::UnitRange{Int}, i1::Int, i2::Int) = i1, (i2 - first(range) + 1) type SparseMatBlobs{Tv,Ti} <: AbstractMatrix{Tv} metadir::AbstractString - sz::Tuple - splits::Vector{Pair} + sz::Tuple{Int,Int} + splits::Vector{Pair{UnitRange{Int},Base.Random.UUID}} coll::BlobCollection{SparseMatrixCSC{Tv,Ti}} end -# D = dimension that is split -# N = value of the other dimension, which is constant across all splits -type DenseMatBlobs{T,D,N} <: AbstractMatrix{T} +type DenseMatBlobs{T} <: AbstractMatrix{T} metadir::AbstractString - sz::Tuple - splits::Vector{Pair} # keep a mapping of index ranges to blobs + sz::Tuple{Int,Int} + splits::Vector{Pair{UnitRange{Int},Base.Random.UUID}} # keep a mapping of index ranges to blobs 
coll::BlobCollection{Matrix{T}} end typealias MatBlobs Union{SparseMatBlobs,DenseMatBlobs} -size(dm::MatBlobs) = dm.sz +size{T<:MatBlobs}(dm::T) = dm.sz -split_ranges(dm::MatBlobs) = [p.first for p in dm.splits] +split_ranges{T<:MatBlobs}(dm::T) = [p.first for p in dm.splits] -function splitidx(dm::MatBlobs, splitdim_idx::Int) - for splitnum in 1:length(dm.splits) - p = dm.splits[splitnum] - range = p.first - if splitdim_idx in range +function splitidx{T<:MatBlobs}(dm::T, splitdim_idx::Int) + splits = UnitRange{Int}[x.first for x in dm.splits] + for splitnum in 1:length(splits) + if splitdim_idx in splits[splitnum] return splitnum end end - throw(BoundsError("MatBlobs $(dm.sz) split on dimension $D", splitdim_idx)) + throw(BoundsError("MatBlobs $(dm.sz)", splitdim_idx)) end function serialize(s::SerializationState, sm::MatBlobs) @@ -141,12 +132,12 @@ end getindex{Tv}(sm::SparseMatBlobs{Tv}, i::Int) = getindex(sm, ind2sub(size(sm), i)...) function getindex{Tv}(sm::SparseMatBlobs{Tv}, i1::Int, i2::Int) part, range = load(sm, i2) - part[relidx(range, 2, i1, i2)...] + part[relidx(range, i1, i2)...] end function getindex{Tv}(sm::SparseMatBlobs{Tv}, ::Colon, i2::Int) part, range = load(sm, i2) - reli1, reli2 = relidx(range, 2, 1, i2) + reli1, reli2 = relidx(range, 1, i2) part[:,reli2] end @@ -209,7 +200,7 @@ SparseMatBlobs(metadir::AbstractString; maxcache::Int=10) = matblob(metadir; max # DenseMatBlobs specific functions sersz{T}(d::Matrix{T}) = (sizeof(Int64)*2 + sizeof(d)) -function load{T,D,N}(dm::DenseMatBlobs{T,D,N}, splitdim_idx::Int) +function load{T}(dm::DenseMatBlobs{T}, splitdim_idx::Int) splitnum = splitidx(dm, splitdim_idx) p = dm.splits[splitnum] range = p.first @@ -217,76 +208,57 @@ function load{T,D,N}(dm::DenseMatBlobs{T,D,N}, splitdim_idx::Int) load(dm.coll, bid), range end -getindex{T,D,N}(dm::DenseMatBlobs{T,D,N}, i::Int) = getindex(dm, ind2sub(size(dm), i)...) - -function getindex{T,D,N}(dm::DenseMatBlobs{T,D,N}, i1::Int, i2::Int) - splitdim_idx = (D == 1) ? i1 : i2 - part, range = load(dm, splitdim_idx) - part[relidx(range, D, i1, i2)...] -end - -function getindex{T,N}(dm::DenseMatBlobs{T,1,N}, i1::Int, ::Colon) - part, range = load(dm, i1) - reli1, reli2 = relidx(range, 1, i1, 1) - part[reli1,:] -end +getindex{T}(dm::DenseMatBlobs{T}, i::Int) = getindex(dm, ind2sub(size(dm), i)...) -function getindex{T,N}(dm::DenseMatBlobs{T,1,N}, idxs, ::Colon) - res = Array(T, length(idxs), N) - for residx in 1:length(idxs) - idx = idxs[residx] - res[residx,:] = dm[idx,:] - end - res +function getindex{T}(dm::DenseMatBlobs{T}, i1::Int, i2::Int) + part, range = load(dm, i2) + part[relidx(range, i1, i2)...] 
end -function getindex{T,N}(dm::DenseMatBlobs{T,2,N}, ::Colon, i2::Int) +function getindex{T}(dm::DenseMatBlobs{T}, ::Colon, i2::Int) part, range = load(dm, i2) - reli1, reli2 = relidx(range, 2, 1, i2) - part[:,reli2] + reli1, reli2 = relidx(range, 1, i2) + @inbounds v = part[:,reli2] + v end -function getindex{T,N}(dm::DenseMatBlobs{T,2,N}, ::Colon, idxs) - res = Array(T, N, length(idxs)) - for residx in 1:length(idxs) - idx = idxs[residx] - res[:, residx] = dm[:, idx] +function getindex{T}(dm::DenseMatBlobs{T}, ::Colon, idxs::Vector{Int}) + res = Array(T, dm.sz[1], length(idxs)) + sp = sortperm(idxs) + sidxs = idxs[sp] + remain = length(idxs) + startpos = 1 + endpos = 0 + for spair in dm.splits + (remain > 0) || break + r,b = spair.first, spair.second + endpos = searchsortedlast(sidxs, last(r)) + (endpos < startpos) && continue + A = load(dm.coll, b) + ir = startpos:endpos + fidxs = sidxs[ir] + @inbounds res[:, sp[ir]] = A[:, fidxs-start(r)+1] + remain -= length(ir) + startpos = endpos+1 end res end -setindex!{T,D,N}(dm::DenseMatBlobs{T,D,N}, v::T, i::Int) = setindex!(dm, v, ind2sub(size(dm), i)...) - -function setindex!{T,D,N}(dm::DenseMatBlobs{T,D,N}, v::T, i1::Int, i2::Int) - splitdim_idx = (D == 1) ? i1 : i2 - part, range = load(dm, splitdim_idx) - part[relidx(range, D, i1, i2)...] = v -end +setindex!{T}(dm::DenseMatBlobs{T}, v::T, i::Int) = setindex!(dm, v, ind2sub(size(dm), i)...) -function setindex!{T,N}(dm::DenseMatBlobs{T,1,N}, v, i1::Int, ::Colon) - part, range = load(dm, i1) - reli1, reli2 = relidx(range, 1, i1, 1) - part[reli1,:] = v +function setindex!{T}(dm::DenseMatBlobs{T}, v::T, i1::Int, i2::Int) + part, range = load(dm, i2) + part[relidx(range, i1, i2)...] = v end -function setindex!{T,N}(dm::DenseMatBlobs{T,2,N}, v, ::Colon, i2::Int) +function setindex!{T}(dm::DenseMatBlobs{T}, v, ::Colon, i2::Int) part, range = load(dm, i2) - reli1, reli2 = relidx(range, 2, 1, i2) - part[:,reli2] = v -end -#= -function *{T1,T2}(A::Vector{T1}, B::DenseMatBlobs{T2,1}) - T = promote_type(T1, T2) - res = Array(T, size(B, 2)) - for idx in 1:length(B.splits) - p = B.splits[idx] - part, r = load(B, first(p.first)) - res[r] = v * part - end - res + reli1, reli2 = relidx(range, 1, i2) + @inbounds part[:,reli2] = v + v end -=# -function *{T1,T2}(A::Matrix{T1}, B::DenseMatBlobs{T2,2}) + +function *{T1,T2}(A::Matrix{T1}, B::DenseMatBlobs{T2}) m,n = size(B) (size(A, 2) == m) || throw(DimensionMismatch("A has dimensions $(size(A)) but B has dimensions $(size(B))")) res = Array(promote_type(T1,T2), 1, n) @@ -298,7 +270,7 @@ function *{T1,T2}(A::Matrix{T1}, B::DenseMatBlobs{T2,2}) res end -function deserialize{T,D,N}(s::SerializationState, ::Type{DenseMatBlobs{T,D,N}}) +function deserialize{T}(s::SerializationState, ::Type{DenseMatBlobs{T}}) metadir = deserialize(s) sz = deserialize(s) splits = deserialize(s) @@ -311,30 +283,32 @@ function deserialize{T,D,N}(s::SerializationState, ::Type{DenseMatBlobs{T,D,N}}) coll = BlobCollection(Matrix{T}, coll_mut, coll_reader; maxcache=coll_maxcache, id=coll_id) coll.blobs = coll_blobs - DenseMatBlobs{T,D,N}(metadir, sz, splits, coll) + DenseMatBlobs{T}(metadir, sz, splits, coll) end -function DenseMatBlobs{Tv}(::Type{Tv}, D::Int, N::Int, metadir::AbstractString; maxcache::Int=10) +function DenseMatBlobs{Tv}(::Type{Tv}, metadir::AbstractString; maxcache::Int=10) T = Matrix{Tv} io = FileBlobIO(Array{Tv}, true) mut = Mutable(BYTES_128MB, io) coll = BlobCollection(T, mut, io; maxcache=maxcache) - DenseMatBlobs{Tv,D,N}(metadir, (0,0), Pair[], coll) + 
 end
 
-function append!{Tv,D,N}(dm::DenseMatBlobs{Tv,D,N}, M::Matrix{Tv})
+function append!{Tv}(dm::DenseMatBlobs{Tv}, M::Matrix{Tv})
     m,n = size(M)
-    unsplit_dim = (D == 1) ? n : m
-    split_dim = (D == 1) ? m : n
-    (N == unsplit_dim) || throw(BoundsError("DenseMatBlobs with unsplit dimension $D fixed at $N", (m,n)))
+    if dm.sz[1] > 0
+        (dm.sz[1] == m) || throw(BoundsError("DenseMatBlobs with size $(size(dm))", (m,n)))
+    else
+        dm.sz = (m,0)
+    end
 
     if isempty(dm.splits)
         dm.sz = (m, n)
-        idxrange = 1:split_dim
+        idxrange = 1:n
     else
-        old_split_dim = dm.sz[D]
-        new_split_dim = old_split_dim + split_dim
+        old_split_dim = dm.sz[2]
+        new_split_dim = old_split_dim + n
         idxrange = (old_split_dim+1):new_split_dim
-        dm.sz = (D == 1) ? (new_split_dim, unsplit_dim) : (unsplit_dim, new_split_dim)
+        dm.sz = (dm.sz[1], new_split_dim)
     end
 
     fname = joinpath(dm.metadir, string(length(dm.splits)+1))
diff --git a/src/models/als_dist_model.jl b/src/models/als_dist_model.jl
index 17575d2..0419b74 100644
--- a/src/models/als_dist_model.jl
+++ b/src/models/als_dist_model.jl
@@ -18,7 +18,7 @@ type DistModel <: Model
     Pinv::Nullable{ModelFactor}
 end
 
-nusers(model::DistModel) = size(get(model.U))[1]
+nusers(model::DistModel) = size(get(model.U))[2]
 nitems(model::DistModel) = size(get(model.P))[2]
 nfactors(model::DistModel) = model.nfactors
 
@@ -58,10 +58,10 @@ function prep{TI<:DistInputs}(inp::TI, nfacts::Int, lambda::Float64, model_dir::
     isdir(Udir.name) || mkdir(Udir.name)
     isdir(Pdir.name) || mkdir(Pdir.name)
 
-    Usz = (nu, nfacts)
+    Usz = (nfacts, nu)
     Psz = (nfacts, ni)
-    U = create(Udir, Float64, 1, Usz, zeros, min(_max_items(Float64,1,Usz), ceil(Int, nu/nworkers())))
-    P = create(Pdir, Float64, 2, Psz, rand, min(_max_items(Float64,2,Psz), ceil(Int, ni/nworkers())))
+    U = create(Udir, Float64, Usz, zeros, min(_max_items(Float64,Usz), ceil(Int, nu/nworkers())))
+    P = create(Pdir, Float64, Psz, rand, min(_max_items(Float64,Psz), ceil(Int, ni/nworkers())))
 
     for idx in 1:ni
         P[1,idx] = mean(all_user_ratings(inp, idx))
diff --git a/src/models/als_model.jl b/src/models/als_model.jl
index ccc0b1a..a580ab5 100644
--- a/src/models/als_model.jl
+++ b/src/models/als_model.jl
@@ -7,7 +7,7 @@ type SharedMemoryModel <: Model
     Pinv::Nullable{ModelFactor}
 end
 
-nusers(model::SharedMemoryModel) = size(model.U, 1)
+nusers(model::SharedMemoryModel) = size(model.U, 2)
 nitems(model::SharedMemoryModel) = size(model.P, 2)
 nfactors(model::SharedMemoryModel) = model.nfactors
 
@@ -61,7 +61,7 @@ function prep{TI<:Inputs}(inp::TI, nfacts::Int, lambda::Float64)
     nu = nusers(inp)
     ni = nitems(inp)
 
-    U = zeros(nu, nfacts)
+    U = zeros(nfacts, nu)
     P = rand(nfacts, ni)
     for idx in 1:ni
         P[1,idx] = mean(all_user_ratings(inp, idx))
diff --git a/src/models/models.jl b/src/models/models.jl
index eba60c6..6d36c8c 100644
--- a/src/models/models.jl
+++ b/src/models/models.jl
@@ -4,7 +4,7 @@ include("als_dist_model.jl")
 setU{M<:Model}(model::M, u::Int64, vals) = setU(model.U, u, vals)
 setU(NU::Nullable, u::Int64, vals) = setU(get(NU), u, vals)
 function setU{M<:ModelFactor}(U::M, u::Int64, vals)
-    U[u,:] = vals
+    U[:,u] = vals
     nothing
 end
 
@@ -17,7 +17,7 @@ end
 
 getU{M<:Model}(model::M, users) = getU(model.U, users)
 getU(NU::Nullable, users) = getU(get(NU), users)
-getU{M<:ModelFactor}(U::M, users) = U[users, :]
+getU{M<:ModelFactor}(U::M, users) = U[:, users]
 
 getP{M<:Model}(model::M, items) = getP(model.P, items)
 getP(NP::Nullable, items) = getP(get(NP), items)
diff --git a/src/utils.jl b/src/utils.jl
index 1a2aa31..1c63453 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -69,23 +69,20 @@ end
 read_input(fspec::DenseBlobs) = DenseMatBlobs(fspec.name; maxcache=fspec.maxcache)
 
 const MAX_BLK_BYTES = 128*1000*1000 #128MB
-function _max_items(T::Type, D::Int, sz::Tuple)
+function _max_items(T::Type, sz::Tuple)
     m,n = sz
-    unsplit_dim = (D == 1) ? n : m
-    ceil(Int, MAX_BLK_BYTES/sizeof(T)/unsplit_dim)
+    ceil(Int, MAX_BLK_BYTES/sizeof(T)/m)
 end
 
-function create{T}(fspec::DenseBlobs, ::Type{T}, D::Int, sz::Tuple, init::Function, max_items::Int=_max_items(T,D,sz))
+function create{T}(fspec::DenseBlobs, ::Type{T}, sz::Tuple, init::Function, max_items::Int=_max_items(T,sz))
     @logmsg("creating densematarray")
     isdir(fspec.name) || mkdir(fspec.dir)
     m,n = sz
-    unsplit_dim = (D == 1) ? n : m
-    split_dim = sz[D]
-    dm = DenseMatBlobs(T, D, unsplit_dim, fspec.name)
+    dm = DenseMatBlobs(T, fspec.name)
     startidx = 1
-    while startidx <= split_dim
-        idxrange = startidx:min(split_dim, startidx + max_items)
-        blobsz = (D == 1) ? (length(idxrange),unsplit_dim) : (unsplit_dim,length(idxrange))
+    while startidx <= n
+        idxrange = startidx:min(n, startidx + max_items)
+        blobsz = (m,length(idxrange))
         M = init(T, blobsz...)
         @logmsg("idxrange: $idxrange, sz: $(size(M))")
         append!(dm, M)

From c3a3a045240abedfb582da95eabbc4f30ceb4993 Mon Sep 17 00:00:00 2001
From: tan
Date: Tue, 15 Mar 2016 09:30:06 +0530
Subject: [PATCH 5/5] print time with logs, some optimizations

- club together remotecalls as much as possible
- use sync-async structure to share data prior to factorization loops
- remove skew from lastfm
---
 playground/split_input.jl |  4 ++++
 src/RecSys.jl             |  7 ++++++-
 src/als-wr.jl             | 34 ++++++++++++++++++++++++----------
 3 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/playground/split_input.jl b/playground/split_input.jl
index 2391240..55eade8 100644
--- a/playground/split_input.jl
+++ b/playground/split_input.jl
@@ -101,6 +101,10 @@ function split_lastfm(dataset_path = "/data/Work/datasets/last_fm_music_recommen
 
     amap = read_artist_map(DlmFile(joinpath(dataset_path, "artist_alias.txt")))
     T = read_trainingset(DlmFile(joinpath(dataset_path, "user_artist_data.txt")), amap)
+
+    println("randomizing items to remove skew")
+    T = T[:, randperm(size(T,2))]
+
     splitall(T, joinpath(dataset_path, "splits"), 20)
 end
 
diff --git a/src/RecSys.jl b/src/RecSys.jl
index ee251a1..47849d6 100644
--- a/src/RecSys.jl
+++ b/src/RecSys.jl
@@ -43,6 +43,11 @@ macro threads(x)
     end
 end
 
+function tstr()
+    t = time()
+    string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
+end
+
 # enable logging only during debugging
 #using Logging
 ##const logger = Logging.configure(filename="recsys.log", level=DEBUG)
@@ -56,7 +61,7 @@ macro logmsg(s)
     end
 end
 #macro logmsg(s)
 #    quote
-#        info("[", myid(), "-", threadid(), "] ", $(esc(s)))
+#        info(tstr(), " [", myid(), "-", threadid(), "] ", $(esc(s)))
 #    end
 #end
diff --git a/src/als-wr.jl b/src/als-wr.jl
index 85953a0..8a0295b 100644
--- a/src/als-wr.jl
+++ b/src/als-wr.jl
@@ -184,8 +184,8 @@ function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI,
     share!(inp)
 
     c = ComputeData(model, inp, get(model.lambdaI))
-    for w in workers()
-        remotecall_fetch(share_compdata, w, c)
+    @sync for w in workers()
+        @async remotecall_fetch(share_compdata, w, c)
     end
 
     nu = nusers(inp)
@@ -240,6 +240,24 @@ function train(als::ALSWR{ParBlob,DistInputs,DistModel}, niters::Int, nfacts::In
     nothing
 end
 
+function update_user_blobs(r::UnitRange)
+    c = fetch_compdata()
+    U = get(c.model.U)
+    flush(U; callback=false)
+    update_user(r)
+    save(U)
+    nothing
+end
+
+function update_item_blobs(r::UnitRange)
+    c = fetch_compdata()
+    P = get(c.model.P)
+    flush(P; callback=false)
+    update_item(r)
+    save(P)
+    nothing
+end
+
 function fact_iters{TP<:ParBlob,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI,
                     niters::Int64)
     t1 = time()
@@ -254,8 +272,8 @@ function fact_iters{TP<:ParBlob,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI,
     # clear, share the data and load it again (not required, but more efficient)
     clear(model)
     W = workers()
-    for w in W
-        remotecall_fetch(share_compdata, w, c)
+    @sync for w in W
+        @async remotecall_fetch(share_compdata, w, c)
     end
     ensure_loaded(model)
     U = get(model.U)
@@ -266,13 +284,9 @@ function fact_iters{TP<:ParBlob,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI,
     @logmsg("nusers: $nu, nitems: $ni")
     for iter in 1:niters
         @logmsg("begin iteration $iter")
-        flush(U, W; callback=false)
-        pmap(update_user, uranges)
-        save(U, W)
+        pmap(update_user_blobs, uranges)
         @logmsg("\tusers")
-        flush(P, W; callback=false)
-        pmap(update_item, iranges)
-        save(P, W)
+        pmap(update_item_blobs, iranges)
        @logmsg("\titems")
    end
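
Note on the sync-async structure mentioned in the PATCH 5/5 message: wrapping the per-worker remotecall_fetch loop in @sync/@async launches one task per worker, so the shared ComputeData is pushed to all workers concurrently rather than one worker at a time, and @sync blocks until every transfer has completed. A minimal sketch of the pattern under that reading; the helper name broadcast_compdata is illustrative only, while share_compdata is the existing worker-side function used in als-wr.jl:

    # Push `c` to every worker concurrently; @sync waits for all @async tasks,
    # each of which runs one remotecall_fetch of share_compdata on a worker.
    function broadcast_compdata(c)
        @sync for w in workers()
            @async remotecall_fetch(share_compdata, w, c)
        end
        nothing
    end

The same structure appears in both fact_iters methods in the patch, replacing the earlier sequential loops.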