From 6309d41fd6a93208169d8d9827cfb5bd6abb00ec Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 19 Jan 2016 19:55:00 +0530 Subject: [PATCH 1/8] clear model inputs before save Helps remove unnecessary bloat from model file. Also required in distributed memory mode, where input data will be too large. Inputs can be loaded back when used after restore. --- src/als-wr.jl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/als-wr.jl b/src/als-wr.jl index f7c4d73..6376117 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -40,6 +40,12 @@ type Inputs end end +function clear(inp::Inputs) + inp.R = nothing + inp.item_idmap = nothing + inp.user_idmap = nothing +end + type Model U::Matrix{Float64} P::Matrix{Float64} @@ -54,6 +60,14 @@ end ALSWR{T<:Parallelism}(inp::FileSpec, par::T=ParShmem()) = ALSWR{T}(Inputs(inp), nothing, par) +function save(model::ALSWR, filename::AbstractString) + clear(model.inp) + open(filename, "w") do f + serialize(f, model) + end + nothing +end + ratings(als::ALSWR) = ratings(als.inp) function ratings(inp::Inputs; only_items::Vector{Int64}=Int64[]) if isnull(inp.R) From 5fc0dbd43beafe5ca798001d1be227f27f34822c Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 20 Jan 2016 12:45:56 +0530 Subject: [PATCH 2/8] abstraction for input ratings data --- examples/lastfm/lastfm.jl | 13 +- examples/movielens/movielens.jl | 11 +- src/RecSys.jl | 4 + src/als-wr.jl | 228 ++++++++++---------------------- src/input.jl | 129 ++++++++++++++++++ 5 files changed, 218 insertions(+), 167 deletions(-) create mode 100644 src/input.jl diff --git a/examples/lastfm/lastfm.jl b/examples/lastfm/lastfm.jl index 3d60fdd..2c1dd48 100644 --- a/examples/lastfm/lastfm.jl +++ b/examples/lastfm/lastfm.jl @@ -129,12 +129,13 @@ function test(dataset_path) print_recommendations(rec, recommend(rec, 9875)...) println("recommending anonymous user:") - R, item_idmap, user_idmap = RecSys.ratings(rec.rec) - # take user 100 - actual_user = findfirst(user_idmap, 9875) - ratings_anon = R[actual_user, :] - actual_movie_ids = item_idmap[find(full(ratings_anon))] - sp_ratings_anon = SparseVector(maximum(item_idmap), actual_movie_ids, nonzeros(ratings_anon)) + u_idmap = RecSys.user_idmap(rec.rec.inp) + i_idmap = RecSys.item_idmap(rec.rec.inp) + # take user 9875 + actual_user = findfirst(u_idmap, 9875) + rated_anon, ratings_anon = RecSys.items_and_ratings(rec.rec.inp, actual_user) + actual_movie_ids = i_idmap[rated_anon] + sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") diff --git a/examples/movielens/movielens.jl b/examples/movielens/movielens.jl index faf7112..1d14af4 100644 --- a/examples/movielens/movielens.jl +++ b/examples/movielens/movielens.jl @@ -69,12 +69,13 @@ function test(dataset_path) print_recommendations(rec, recommend(rec, 100)...) println("recommending anonymous user:") - R, item_idmap, user_idmap = RecSys.ratings(rec.rec) + u_idmap = RecSys.user_idmap(rec.rec.inp) + i_idmap = RecSys.item_idmap(rec.rec.inp) # take user 100 - actual_user = findfirst(user_idmap, 100) - ratings_anon = R[actual_user, :] - actual_movie_ids = item_idmap[find(full(ratings_anon))] - sp_ratings_anon = SparseVector(maximum(item_idmap), actual_movie_ids, nonzeros(ratings_anon)) + actual_user = findfirst(u_idmap, 100) + rated_anon, ratings_anon = RecSys.items_and_ratings(rec.rec.inp, actual_user) + actual_movie_ids = i_idmap[rated_anon] + sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") diff --git a/src/RecSys.jl b/src/RecSys.jl index 2a26351..3651028 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -8,6 +8,7 @@ if isless(Base.VERSION, v"0.5.0-") end import Base: zero +import ParallelSparseMatMul: share export FileSpec, DlmFile, MatFile, SparseMat, read_input export ALSWR, train, recommend, rmse, zero @@ -16,6 +17,8 @@ export save, load typealias RatingMatrix SparseMatrixCSC{Float64,Int64} typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64} +typealias InputRatings Union{RatingMatrix,SharedRatingMatrix} +typealias InputIdMap Union{Vector{Int64}, SharedVector{Int64}} abstract FileSpec abstract Parallelism @@ -27,6 +30,7 @@ type ParThread <: Parallelism end export ParThread end +include("input.jl") include("als-wr.jl") include("utils.jl") diff --git a/src/als-wr.jl b/src/als-wr.jl index 6376117..b5265be 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -1,51 +1,3 @@ -# Note: -# Filtering out causes the item and user ids to change. -# We need to keep a mapping to be able to match in the recommend step. -function filter_empty(R::RatingMatrix; only_items::Vector{Int64}=Int64[]) - if !isempty(only_items) - max_num_items = maximum(only_items) - if size(R, 2) < max_num_items - RI, RJ, RV = findnz(R) - R = sparse(RI, RJ, RV, size(R, 1), max_num_items) - end - R = R' - R = R[only_items, :] - R = R' - non_empty_items = only_items - end - - U = sum(R, 2) - non_empty_users = find(U) - R = R[non_empty_users, :] - - if isempty(only_items) - R = R' - P = sum(R, 2) - non_empty_items = find(P) - R = R[non_empty_items, :] - R = R' - end - - R, non_empty_items, non_empty_users -end - -type Inputs - ratings_file::FileSpec - R::Nullable{RatingMatrix} - item_idmap::Nullable{Vector{Int64}} - user_idmap::Nullable{Vector{Int64}} - - function Inputs(file::FileSpec) - new(file, nothing, nothing, nothing) - end -end - -function clear(inp::Inputs) - inp.R = nothing - inp.item_idmap = nothing - inp.user_idmap = nothing -end - type Model U::Matrix{Float64} P::Matrix{Float64} @@ -68,39 +20,8 @@ function save(model::ALSWR, filename::AbstractString) nothing end -ratings(als::ALSWR) = ratings(als.inp) -function ratings(inp::Inputs; only_items::Vector{Int64}=Int64[]) - if isnull(inp.R) - logmsg("loading inputs...") - t1 = time() - A = read_input(inp.ratings_file) - - if isa(A, SparseMatrixCSC) - R = convert(SparseMatrixCSC{Float64,Int64}, A) - else - # separate the columns and make them of appropriate types - users = convert(Vector{Int64}, A[:,1]) - items = convert(Vector{Int64}, A[:,2]) - ratings = convert(Vector{Float64}, A[:,3]) - - # create a sparse matrix - R = sparse(users, items, ratings) - end - - R, item_idmap, user_idmap = filter_empty(R; only_items=only_items) - inp.R = Nullable(R) - inp.item_idmap = Nullable(item_idmap) - inp.user_idmap = Nullable(user_idmap) - t2 = time() - logmsg("time to load inputs: $(t2-t1) secs") - end - - get(inp.R), get(inp.item_idmap), get(inp.user_idmap) -end - function train(als::ALSWR, niters::Int, nfactors::Int64, lambda::Float64=0.065) - R, _i_idmap, _u_idmap = ratings(als) - U, P = fact(als.par, R, niters, nfactors, lambda) + U, P = fact(als, niters, nfactors, lambda) model = Model(U, P, lambda) als.model = Nullable(model) nothing @@ -110,34 +31,26 @@ end ## # Training # -function prep(R::RatingMatrix, nfactors::Int) - nusers, nitems = size(R) - - U = zeros(nusers, nfactors) - P = rand(nfactors, nitems) - for idx in 1:nitems - P[1,idx] = mean(nonzeros(R[:,idx])) +function prep(als::ALSWR, nfactors::Int) + nu = nusers(als.inp) + ni = nitems(als.inp) + + U = zeros(nu, nfactors) + P = rand(nfactors, ni) + for idx in 1:ni + P[1,idx] = mean(all_user_ratings(als.inp, idx)) end - U, P, R -end - -function sprows(R::Union{RatingMatrix,SharedRatingMatrix}, col::Int64) - rowstart = R.colptr[col] - rowend = R.colptr[col+1] - 1 - # use subarray? - rows = R.rowval[rowstart:rowend] - vals = R.nzval[rowstart:rowend] - rows, vals + U, P end function update_user(u::Int64) c = fetch_compdata() - update_user(u, c.U, c.P, c.RT, c.lambdaI) + update_user(u, c.U, c.P, c.inp, c.lambdaI) end -function update_user(u::Int64, U, P, RT, lambdaI) - nzrows, nzvals = sprows(RT, u) +function update_user(u::Int64, U, P, inp, lambdaI) + nzrows, nzvals = items_and_ratings(inp, u) Pu = P[:, nzrows] vec = Pu * nzvals mat = (Pu * Pu') + (length(nzrows) * lambdaI) @@ -147,11 +60,11 @@ end function update_item(i::Int64) c = fetch_compdata() - update_item(i::Int64, c.U, c.P, c.R, c.lambdaI) + update_item(i::Int64, c.U, c.P, c.inp, c.lambdaI) end -function update_item(i::Int64, U, P, R, lambdaI) - nzrows, nzvals = sprows(R, i) +function update_item(i::Int64, U, P, inp, lambdaI) + nzrows, nzvals = users_and_ratings(inp, i) Ui = U[nzrows, :] Uit = Ui' vec = Uit * nzvals @@ -160,57 +73,60 @@ function update_item(i::Int64, U, P, R, lambdaI) nothing end -function fact(par, R::RatingMatrix, niters::Int, nfactors::Int64, lambda::Float64) +function fact(als::ALSWR, niters::Int, nfactors::Int64, lambda::Float64) + ensure_loaded(als.inp) t1 = time() logmsg("preparing inputs...") - U, P, R = prep(R, nfactors) - nusers, nitems = size(R) - + U, P = prep(als, nfactors) lambdaI = lambda * eye(nfactors) - - RT = R' t2 = time() logmsg("prep time: $(t2-t1)") - fact_iters(par, U, P, R, RT, niters, nusers, nitems, lambdaI) + fact_iters(als.par, U, P, als.inp, niters, lambdaI) end ## # Validation function rmse(als::ALSWR) - R, _i_idmap, _u_idmap = ratings(als) - rmse(als, R) + ensure_loaded(als.inp) + rmse(als, als.inp) end function rmse(als::ALSWR, testdataset::FileSpec) - _R, i_idmap, _u_idmap = ratings(als) - R, _i_idmap, _u_idmap = ratings(Inputs(testdataset); only_items=i_idmap) - rmse(als, R) + ensure_loaded(als.inp) + i_idmap = item_idmap(als.inp) + + testinp = Inputs(testdataset) + ensure_loaded(testinp; only_items=i_idmap) + rmse(als, testinp) end ## # Recommendation -function _recommend(Uvec, P, rated, item_idmap; count::Int=10) +function _recommend(Uvec, P, rated, i_idmap; count::Int=10) top = sortperm(vec(Uvec*P)) recommended = Int64[] idx = 1 while length(recommended) < count && length(top) >= idx item_id = top[idx] - (item_id in rated) || push!(recommended, item_idmap[item_id]) + (item_id in rated) || push!(recommended, i_idmap[item_id]) idx += 1 end nexcl = idx - count - 1 - mapped_rated = Int64[item_idmap[id] for id in rated] + mapped_rated = Int64[i_idmap[id] for id in rated] recommended, mapped_rated, nexcl end function recommend(als::ALSWR, user::Int; unrated::Bool=true, count::Int=10) - R, item_idmap, user_idmap = ratings(als) - (user in user_idmap) || (return (Int[], Int[], 0)) - user = findfirst(user_idmap, user) + ensure_loaded(als.inp) + i_idmap = item_idmap(als.inp) + u_idmap = user_idmap(als.inp) + + (user in u_idmap) || (return (Int[], Int[], 0)) + user = findfirst(u_idmap, user) model = get(als.model) U = model.U @@ -218,9 +134,9 @@ function recommend(als::ALSWR, user::Int; unrated::Bool=true, count::Int=10) # All the items sorted in decreasing order of rating. Uvec = reshape(U[user, :], 1, size(U, 2)) - rated = unrated ? find(full(R[user,:])) : Int64[] + rated = unrated ? all_items_rated(als.inp, user) : Int64[] - _recommend(Uvec, P, rated, item_idmap; count=count) + _recommend(Uvec, P, rated, i_idmap; count=count) end function recommend(als::ALSWR, user_ratings::SparseVector{Float64,Int64}; unrated::Bool=true, count::Int=10) @@ -234,16 +150,17 @@ function recommend(als::ALSWR, user_ratings::SparseVector{Float64,Int64}; unrate # Pinv = Pt * inv(P * Pt) # # Uvec = Rvec * (Pt * inv(P * Pt)) - _R, item_idmap, _user_idmap = ratings(als) + ensure_loaded(als.inp) + i_idmap = item_idmap(als.inp) model = get(als.model) P = model.P PT = P' Pinv = PT * inv(P * PT) - Rvec = reshape(user_ratings[item_idmap], 1, length(item_idmap)) - #Rvec = reshape(user_ratings, 1, length(item_idmap)) + Rvec = reshape(user_ratings[i_idmap], 1, length(i_idmap)) + #Rvec = reshape(user_ratings, 1, length(i_idmap)) Uvec = Rvec * Pinv - _recommend(Uvec, P, find(Rvec), item_idmap; count=count) + _recommend(Uvec, P, find(Rvec), i_idmap; count=count) end @@ -253,8 +170,7 @@ end type ComputeData U::SharedArray{Float64,2} P::SharedArray{Float64,2} - R::SharedRatingMatrix - RT::SharedRatingMatrix + inp::Inputs lambdaI::SharedArray{Float64,2} end @@ -264,29 +180,29 @@ share_compdata(c::ComputeData) = (push!(compdata, c); nothing) fetch_compdata() = compdata[1] noop(args...) = nothing -function fact_iters{T<:ParShmem}(::T, _U::Matrix{Float64}, _P::Matrix{Float64}, _R::RatingMatrix, _RT::RatingMatrix, - niters::Int64, nusers::Int64, nitems::Int64, _lambdaI::Matrix{Float64}) +function fact_iters{T<:ParShmem}(::T, _U::Matrix{Float64}, _P::Matrix{Float64}, inp::Inputs, niters::Int64, _lambdaI::Matrix{Float64}) t1 = time() U = share(_U) P = share(_P) - R = share(_R) - RT = share(_RT) lambdaI = share(_lambdaI) + share(inp) - c = ComputeData(U, P, R, RT, lambdaI) + c = ComputeData(U, P, inp, lambdaI) for w in workers() remotecall_fetch(share_compdata, w, c) end + nu = nusers(inp) + ni = nitems(inp) for iter in 1:niters logmsg("begin iteration $iter") - pmap(update_user, 1:nusers) - #@parallel (noop) for u in 1:nusers + pmap(update_user, 1:nu) + #@parallel (noop) for u in 1:nu # update_user(u) #end logmsg("\tusers") - pmap(update_item, 1:nitems) - #@parallel (noop) for i in 1:nitems + pmap(update_item, 1:ni) + #@parallel (noop) for i in 1:ni # update_item(i) #end logmsg("\titems") @@ -298,17 +214,17 @@ function fact_iters{T<:ParShmem}(::T, _U::Matrix{Float64}, _P::Matrix{Float64}, copy(U), copy(P) end -function rmse{T<:ParShmem}(als::ALSWR{T}, R::RatingMatrix) +function rmse{T<:ParShmem}(als::ALSWR{T}, inp::Inputs) t1 = time() model = get(als.model) U = share(model.U) P = share(model.P) - RT = share(R') + share(inp) - cumerr = @parallel (.+) for user in 1:size(RT, 2) + cumerr = @parallel (.+) for user in 1:nusers(inp) Uvec = reshape(U[user, :], 1, size(U, 2)) - nzrows, nzvals = sprows(RT, user) + nzrows, nzvals = items_and_ratings(inp, user) predicted = vec(Uvec*P)[nzrows] [sum((predicted .- nzvals) .^ 2), length(predicted)] end @@ -321,33 +237,34 @@ end # Thread parallelism if (Base.VERSION >= v"0.5.0-") -function thread_update_item(U::Matrix{Float64}, P::Matrix{Float64}, R::RatingMatrix, nitems::Int64, lambdaI::Matrix{Float64}) - @threads for i in Int64(1):nitems - update_item(i, U, P, R, lambdaI) +function thread_update_item(U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, ni::Int64, lambdaI::Matrix{Float64}) + @threads for i in Int64(1):ni + update_item(i, U, P, inp, lambdaI) end nothing end -function thread_update_user(U::Matrix{Float64}, P::Matrix{Float64}, RT::RatingMatrix, nusers::Int64, lambdaI::Matrix{Float64}) - @threads for u in Int64(1):nusers - update_user(u, U, P, RT, lambdaI) +function thread_update_user(U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, nu::Int64, lambdaI::Matrix{Float64}) + @threads for u in Int64(1):nu + update_user(u, U, P, inp, lambdaI) end nothing end -function fact_iters{T<:ParThread}(::T, U::Matrix{Float64}, P::Matrix{Float64}, R::RatingMatrix, RT::RatingMatrix, - niters::Int64, nusers::Int64, nitems::Int64, lambdaI::Matrix{Float64}) +function fact_iters{T<:ParThread}(::T, U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, niters::Int64, lambdaI::Matrix{Float64}) t1 = time() + nu = nusers(inp) + ni = nitems(inp) for iter in 1:niters logmsg("begin iteration $iter") # gc is not threadsafe yet. issue #10317 gc_enable(false) - thread_update_user(U, P, RT, nusers, lambdaI) + thread_update_user(U, P, inp, nu, lambdaI) gc_enable(true) gc() gc_enable(false) logmsg("\tusers") - thread_update_item(U, P, R, nitems, lambdaI) + thread_update_item(U, P, inp, ni, lambdaI) gc_enable(true) gc() logmsg("\titems") @@ -359,26 +276,25 @@ function fact_iters{T<:ParThread}(::T, U::Matrix{Float64}, P::Matrix{Float64}, R U, P end -function rmse{T<:ParThread}(als::ALSWR{T}, R::RatingMatrix) +function rmse{T<:ParThread}(als::ALSWR{T}, inp::Inputs) t1 = time() model = get(als.model) U = model.U P = model.P - RT = R' errs = zeros(nthreads()) lengths = zeros(Int, nthreads()) pos = 1 NU = size(U, 2) - N2 = size(RT, 2) + N2 = nusers(als.inp) while pos < N2 endpos = min(pos+10000, N2) gc_enable(false) @threads for user in pos:endpos Uvec = reshape(U[user, :], 1, NU) - nzrows, nzvals = sprows(RT, user) + nzrows, nzvals = items_and_ratings(inp, user) predicted = vec(Uvec*P)[nzrows] tid = threadid() diff --git a/src/input.jl b/src/input.jl new file mode 100644 index 0000000..3296dc8 --- /dev/null +++ b/src/input.jl @@ -0,0 +1,129 @@ +type Inputs + ratings_file::FileSpec + R::Nullable{InputRatings} + RT::Nullable{InputRatings} + item_idmap::Nullable{InputIdMap} + user_idmap::Nullable{InputIdMap} + + function Inputs(file::FileSpec) + new(file, nothing, nothing, nothing, nothing) + end +end + +function clear(inp::Inputs) + inp.R = nothing + inp.RT = nothing + inp.item_idmap = nothing + inp.user_idmap = nothing +end + +function share(inp::Inputs) + R = get(inp.R) + isa(R, SharedRatingMatrix) || (inp.R = share(R)) + + RT = get(inp.RT) + isa(RT, SharedRatingMatrix) || (inp.RT = share(RT)) + + item_idmap = get(inp.item_idmap) + isa(item_idmap, SharedVector) || (inp.item_idmap = share(item_idmap)) + + user_idmap = get(inp.user_idmap) + isa(user_idmap, SharedVector) || (inp.user_idmap = share(user_idmap)) + + nothing +end + +# Note: +# Filtering out causes the item and user ids to change. +# We need to keep a mapping to be able to match in the recommend step. +function filter_empty(R::RatingMatrix; only_items::Vector{Int64}=Int64[]) + if !isempty(only_items) + max_num_items = maximum(only_items) + if size(R, 2) < max_num_items + RI, RJ, RV = findnz(R) + R = sparse(RI, RJ, RV, size(R, 1), max_num_items) + end + R = R' + R = R[only_items, :] + R = R' + non_empty_items = only_items + end + + U = sum(R, 2) + non_empty_users = find(U) + R = R[non_empty_users, :] + + if isempty(only_items) + R = R' + P = sum(R, 2) + non_empty_items = find(P) + R = R[non_empty_items, :] + R = R' + end + + R, non_empty_items, non_empty_users +end + +function ensure_loaded(inp::Inputs; only_items::Vector{Int64}=Int64[]) + if isnull(inp.R) + logmsg("loading inputs...") + t1 = time() + A = read_input(inp.ratings_file) + + if isa(A, SparseMatrixCSC) + R = convert(SparseMatrixCSC{Float64,Int64}, A) + else + # separate the columns and make them of appropriate types + users = convert(Vector{Int64}, A[:,1]) + items = convert(Vector{Int64}, A[:,2]) + ratings = convert(Vector{Float64}, A[:,3]) + + # create a sparse matrix + R = sparse(users, items, ratings) + end + + R, item_idmap, user_idmap = filter_empty(R; only_items=only_items) + inp.R = Nullable(R) + inp.item_idmap = Nullable(item_idmap) + inp.user_idmap = Nullable(user_idmap) + inp.RT = Nullable(R') + t2 = time() + logmsg("time to load inputs: $(t2-t1) secs") + end + nothing +end + +item_idmap(inp::Inputs) = get(inp.item_idmap) +user_idmap(inp::Inputs) = get(inp.user_idmap) + +nusers(inp::Inputs) = size(get(inp.R), 1) +nitems(inp::Inputs) = size(get(inp.R), 2) + +users_and_ratings(inp::Inputs, i::Int64) = _sprowsvals(get(inp.R), i) +all_user_ratings(inp::Inputs, i::Int64) = _spvals(get(inp.R), i) +all_users_rated(inp::Inputs, i::Int64) = _sprows(get(inp.R), i) + +items_and_ratings(inp::Inputs, u::Int64) = _sprowsvals(get(inp.RT), u) +all_item_ratings(inp::Inputs, u::Int64) = _spvals(get(inp.RT), u) +all_items_rated(inp::Inputs, u::Int64) = _sprows(get(inp.RT), u) + +function _sprowsvals(R::InputRatings, col::Int64) + rowstart = R.colptr[col] + rowend = R.colptr[col+1] - 1 + # use subarray? + R.rowval[rowstart:rowend], R.nzval[rowstart:rowend] +end + +function _sprows(R::InputRatings, col::Int64) + rowstart = R.colptr[col] + rowend = R.colptr[col+1] - 1 + # use subarray? + R.rowval[rowstart:rowend] +end + +function _spvals(R::InputRatings, col::Int64) + rowstart = R.colptr[col] + rowend = R.colptr[col+1] - 1 + # use subarray? + R.nzval[rowstart:rowend] +end From e08a2cc25a29bb85255da56b9175228980f7a823 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 20 Jan 2016 15:51:36 +0530 Subject: [PATCH 3/8] abstraction for the model --- src/RecSys.jl | 3 +- src/als-wr.jl | 128 ++++++++++++++++------------------------------- src/als_model.jl | 86 +++++++++++++++++++++++++++++++ src/input.jl | 2 +- 4 files changed, 133 insertions(+), 86 deletions(-) create mode 100644 src/als_model.jl diff --git a/src/RecSys.jl b/src/RecSys.jl index 3651028..5f804b1 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -8,7 +8,6 @@ if isless(Base.VERSION, v"0.5.0-") end import Base: zero -import ParallelSparseMatMul: share export FileSpec, DlmFile, MatFile, SparseMat, read_input export ALSWR, train, recommend, rmse, zero @@ -19,6 +18,7 @@ typealias RatingMatrix SparseMatrixCSC{Float64,Int64} typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64} typealias InputRatings Union{RatingMatrix,SharedRatingMatrix} typealias InputIdMap Union{Vector{Int64}, SharedVector{Int64}} +typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}} abstract FileSpec abstract Parallelism @@ -31,6 +31,7 @@ export ParThread end include("input.jl") +include("als_model.jl") include("als-wr.jl") include("utils.jl") diff --git a/src/als-wr.jl b/src/als-wr.jl index b5265be..508454c 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -1,9 +1,3 @@ -type Model - U::Matrix{Float64} - P::Matrix{Float64} - lambda::Float64 -end - type ALSWR{T<:Parallelism} inp::Inputs model::Nullable{Model} @@ -20,70 +14,46 @@ function save(model::ALSWR, filename::AbstractString) nothing end -function train(als::ALSWR, niters::Int, nfactors::Int64, lambda::Float64=0.065) - U, P = fact(als, niters, nfactors, lambda) - model = Model(U, P, lambda) - als.model = Nullable(model) +function train(als::ALSWR, niters::Int, nfacts::Int64, lambda::Float64=0.065) + als.model = prep(als.inp, nfacts, lambda) + fact_iters(als, niters) nothing end +fact_iters(als, niters) = fact_iters(als.par, get(als.model), als.inp, niters) ## # Training # -function prep(als::ALSWR, nfactors::Int) - nu = nusers(als.inp) - ni = nitems(als.inp) - - U = zeros(nu, nfactors) - P = rand(nfactors, ni) - for idx in 1:ni - P[1,idx] = mean(all_user_ratings(als.inp, idx)) - end - - U, P -end - function update_user(u::Int64) c = fetch_compdata() - update_user(u, c.U, c.P, c.inp, c.lambdaI) + update_user(u, c.model, c.inp, c.lambdaI) end -function update_user(u::Int64, U, P, inp, lambdaI) +function update_user(u::Int64, model, inp, lambdaI) nzrows, nzvals = items_and_ratings(inp, u) - Pu = P[:, nzrows] + Pu = getP(model, nzrows) vec = Pu * nzvals mat = (Pu * Pu') + (length(nzrows) * lambdaI) - U[u,:] = mat \ vec + setU(model, u, mat \ vec) nothing end function update_item(i::Int64) c = fetch_compdata() - update_item(i::Int64, c.U, c.P, c.inp, c.lambdaI) + update_item(i::Int64, c.model, c.inp, c.lambdaI) end -function update_item(i::Int64, U, P, inp, lambdaI) +function update_item(i::Int64, model, inp, lambdaI) nzrows, nzvals = users_and_ratings(inp, i) - Ui = U[nzrows, :] + Ui = getU(model, nzrows) Uit = Ui' vec = Uit * nzvals mat = (Uit * Ui) + (length(nzrows) * lambdaI) - P[:,i] = mat \ vec + setP(model, i, mat \ vec) nothing end -function fact(als::ALSWR, niters::Int, nfactors::Int64, lambda::Float64) - ensure_loaded(als.inp) - t1 = time() - logmsg("preparing inputs...") - U, P = prep(als, nfactors) - lambdaI = lambda * eye(nfactors) - t2 = time() - logmsg("prep time: $(t2-t1)") - fact_iters(als.par, U, P, als.inp, niters, lambdaI) -end - ## # Validation @@ -104,8 +74,8 @@ end ## # Recommendation -function _recommend(Uvec, P, rated, i_idmap; count::Int=10) - top = sortperm(vec(Uvec*P)) +function _recommend(Uvec, model, rated, i_idmap; count::Int=10) + top = sortperm(vec(vec_mul_p(model, Uvec))) recommended = Int64[] idx = 1 @@ -129,14 +99,12 @@ function recommend(als::ALSWR, user::Int; unrated::Bool=true, count::Int=10) user = findfirst(u_idmap, user) model = get(als.model) - U = model.U - P = model.P # All the items sorted in decreasing order of rating. - Uvec = reshape(U[user, :], 1, size(U, 2)) + Uvec = reshape(getU(model, user), 1, nfactors(model)) rated = unrated ? all_items_rated(als.inp, user) : Int64[] - _recommend(Uvec, P, rated, i_idmap; count=count) + _recommend(Uvec, model, rated, i_idmap; count=count) end function recommend(als::ALSWR, user_ratings::SparseVector{Float64,Int64}; unrated::Bool=true, count::Int=10) @@ -153,14 +121,11 @@ function recommend(als::ALSWR, user_ratings::SparseVector{Float64,Int64}; unrate ensure_loaded(als.inp) i_idmap = item_idmap(als.inp) model = get(als.model) - P = model.P - PT = P' - Pinv = PT * inv(P * PT) Rvec = reshape(user_ratings[i_idmap], 1, length(i_idmap)) #Rvec = reshape(user_ratings, 1, length(i_idmap)) - Uvec = Rvec * Pinv + Uvec = vec_mul_pinv(model, Rvec) - _recommend(Uvec, P, find(Rvec), i_idmap; count=count) + _recommend(Uvec, model, find(Rvec), i_idmap; count=count) end @@ -168,10 +133,9 @@ end ## # Shared memory parallelism type ComputeData - U::SharedArray{Float64,2} - P::SharedArray{Float64,2} + model::Model inp::Inputs - lambdaI::SharedArray{Float64,2} + lambdaI::ModelFactor # store directly, avoid null check on every iteration end const compdata = ComputeData[] @@ -180,14 +144,12 @@ share_compdata(c::ComputeData) = (push!(compdata, c); nothing) fetch_compdata() = compdata[1] noop(args...) = nothing -function fact_iters{T<:ParShmem}(::T, _U::Matrix{Float64}, _P::Matrix{Float64}, inp::Inputs, niters::Int64, _lambdaI::Matrix{Float64}) +function fact_iters{T<:ParShmem}(::T, model::Model, inp::Inputs, niters::Int64) t1 = time() - U = share(_U) - P = share(_P) - lambdaI = share(_lambdaI) - share(inp) + share!(model) + share!(inp) - c = ComputeData(U, P, inp, lambdaI) + c = ComputeData(model, inp, get(model.lambdaI)) for w in workers() remotecall_fetch(share_compdata, w, c) end @@ -208,26 +170,27 @@ function fact_iters{T<:ParShmem}(::T, _U::Matrix{Float64}, _P::Matrix{Float64}, logmsg("\titems") end + localize!(model) + t2 = time() logmsg("fact time $(t2-t1)") - - copy(U), copy(P) + nothing end function rmse{T<:ParShmem}(als::ALSWR{T}, inp::Inputs) t1 = time() model = get(als.model) - U = share(model.U) - P = share(model.P) - share(inp) + share!(model) + share!(inp) cumerr = @parallel (.+) for user in 1:nusers(inp) - Uvec = reshape(U[user, :], 1, size(U, 2)) + Uvec = reshape(getU(model,user), 1, nfactors(model)) nzrows, nzvals = items_and_ratings(inp, user) - predicted = vec(Uvec*P)[nzrows] + predicted = vec(vec_mul_p(model, Uvec))[nzrows] [sum((predicted .- nzvals) .^ 2), length(predicted)] end + localize!(model) logmsg("rmse time $(time()-t1)") sqrt(cumerr[1]/cumerr[2]) end @@ -237,34 +200,35 @@ end # Thread parallelism if (Base.VERSION >= v"0.5.0-") -function thread_update_item(U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, ni::Int64, lambdaI::Matrix{Float64}) +function thread_update_item(model::Model, inp::Inputs, ni::Int64, lambdaI::Matrix{Float64}) @threads for i in Int64(1):ni - update_item(i, U, P, inp, lambdaI) + update_item(i, model, inp, lambdaI) end nothing end -function thread_update_user(U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, nu::Int64, lambdaI::Matrix{Float64}) +function thread_update_user(model::Model, inp::Inputs, nu::Int64, lambdaI::Matrix{Float64}) @threads for u in Int64(1):nu - update_user(u, U, P, inp, lambdaI) + update_user(u, model, inp, lambdaI) end nothing end -function fact_iters{T<:ParThread}(::T, U::Matrix{Float64}, P::Matrix{Float64}, inp::Inputs, niters::Int64, lambdaI::Matrix{Float64}) +function fact_iters{T<:ParThread}(::T, model::Model, inp::Inputs, niters::Int64) t1 = time() nu = nusers(inp) ni = nitems(inp) + lambdaI = get(model.lambdaI) for iter in 1:niters logmsg("begin iteration $iter") # gc is not threadsafe yet. issue #10317 gc_enable(false) - thread_update_user(U, P, inp, nu, lambdaI) + thread_update_user(model, inp, nu, lambdaI) gc_enable(true) gc() gc_enable(false) logmsg("\tusers") - thread_update_item(U, P, inp, ni, lambdaI) + thread_update_item(model, inp, ni, lambdaI) gc_enable(true) gc() logmsg("\titems") @@ -272,30 +236,26 @@ function fact_iters{T<:ParThread}(::T, U::Matrix{Float64}, P::Matrix{Float64}, i t2 = time() logmsg("fact time $(t2-t1)") - - U, P + nothing end function rmse{T<:ParThread}(als::ALSWR{T}, inp::Inputs) t1 = time() model = get(als.model) - U = model.U - P = model.P - errs = zeros(nthreads()) lengths = zeros(Int, nthreads()) pos = 1 - NU = size(U, 2) + NF = nfactors(model) N2 = nusers(als.inp) while pos < N2 endpos = min(pos+10000, N2) gc_enable(false) @threads for user in pos:endpos - Uvec = reshape(U[user, :], 1, NU) + Uvec = reshape(getU(model, user), 1, NF) nzrows, nzvals = items_and_ratings(inp, user) - predicted = vec(Uvec*P)[nzrows] + predicted = vec(vec_mul_p(model, Uvec))[nzrows] tid = threadid() lengths[tid] += length(predicted) diff --git a/src/als_model.jl b/src/als_model.jl new file mode 100644 index 0000000..adbe8a0 --- /dev/null +++ b/src/als_model.jl @@ -0,0 +1,86 @@ +type Model + U::ModelFactor + P::ModelFactor + nfactors::Int + lambda::Float64 + lambdaI::Nullable{ModelFactor} + Pinv::Nullable{ModelFactor} +end + +nusers(model::Model) = size(model.U, 1) +nitems(model::Model) = size(model.P, 2) +nfactors(model::Model) = model.nfactors + +function share!(model::Model) + isa(model.U, SharedArray) || (model.U = share(model.U)) + isa(model.P, SharedArray) || (model.P = share(model.P)) + + lambdaI = get(model.lambdaI) + isa(lambdaI, SharedArray) || (model.lambdaI = share(lambdaI)) + + if !isnull(model.Pinv) + Pinv = get(model.Pinv) + isa(Pinv, SharedArray) || (model.Pinv = share(Pinv)) + end + nothing +end + +function localize!(model::Model) + isa(model.U, SharedArray) && (model.U = copy(model.U)) + isa(model.P, SharedArray) && (model.P = copy(model.P)) + nothing +end + +function clear(model::Model) + model.lambdaI = nothing + model.Pinv = nothing +end + +function pinv(model::Model) + if isnull(model.Pinv) + # since: I = (P * Pt) * inv(P * Pt) + # Pinv = Pt * inv(P * Pt) + P = model.P + PT = P' + model.Pinv = PT * inv(P * PT) + end + get(model.Pinv) +end + +vec_mul_p(model::Model, v) = v * model.P +vec_mul_pinv(model::Model, v) = v * pinv(model) + +function prep(inp::Inputs, nfacts::Int, lambda::Float64) + ensure_loaded(inp) + t1 = time() + logmsg("preparing inputs...") + + nu = nusers(inp) + ni = nitems(inp) + + U = zeros(nu, nfacts) + P = rand(nfacts, ni) + for idx in 1:ni + P[1,idx] = mean(all_user_ratings(inp, idx)) + end + + lambdaI = lambda * eye(nfacts) + model = Model(U, P, nfacts, lambda, lambdaI, nothing) + + t2 = time() + logmsg("prep time: $(t2-t1)") + model +end + +@inline function setU(model::Model, u::Int64, vals) + model.U[u,:] = vals + nothing +end + +@inline function setP(model::Model, i::Int64, vals) + model.P[:,i] = vals + nothing +end + +getU(model::Model, users) = model.U[users, :] +getP(model::Model, items) = model.P[:, items] diff --git a/src/input.jl b/src/input.jl index 3296dc8..603d783 100644 --- a/src/input.jl +++ b/src/input.jl @@ -17,7 +17,7 @@ function clear(inp::Inputs) inp.user_idmap = nothing end -function share(inp::Inputs) +function share!(inp::Inputs) R = get(inp.R) isa(R, SharedRatingMatrix) || (inp.R = share(R)) From 63a61ba3041eec701f1dbc82d5b5eb279b59b334 Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 21 Jan 2016 10:38:32 +0530 Subject: [PATCH 4/8] parameterize als with input and model types --- src/RecSys.jl | 3 +++ src/als-wr.jl | 30 +++++++++++++++--------------- src/als_model.jl | 32 ++++++++++++++++---------------- src/input.jl | 30 +++++++++++++++--------------- 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/src/RecSys.jl b/src/RecSys.jl index 5f804b1..f902ba9 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -19,7 +19,10 @@ typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64, typealias InputRatings Union{RatingMatrix,SharedRatingMatrix} typealias InputIdMap Union{Vector{Int64}, SharedVector{Int64}} typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}} + abstract FileSpec +abstract Inputs +abstract Model abstract Parallelism type ParShmem <: Parallelism end diff --git a/src/als-wr.jl b/src/als-wr.jl index 508454c..8370380 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -1,10 +1,10 @@ -type ALSWR{T<:Parallelism} - inp::Inputs - model::Nullable{Model} - par::T +type ALSWR{TP<:Parallelism,TI<:Inputs,TM<:Model} + inp::TI + model::Nullable{TM} + par::TP end -ALSWR{T<:Parallelism}(inp::FileSpec, par::T=ParShmem()) = ALSWR{T}(Inputs(inp), nothing, par) +ALSWR{TP<:Parallelism}(inp::FileSpec, par::TP=ParShmem()) = ALSWR{TP,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) function save(model::ALSWR, filename::AbstractString) clear(model.inp) @@ -66,7 +66,7 @@ function rmse(als::ALSWR, testdataset::FileSpec) ensure_loaded(als.inp) i_idmap = item_idmap(als.inp) - testinp = Inputs(testdataset) + testinp = SharedMemoryInputs(testdataset) ensure_loaded(testinp; only_items=i_idmap) rmse(als, testinp) end @@ -132,9 +132,9 @@ end ## # Shared memory parallelism -type ComputeData - model::Model - inp::Inputs +type ComputeData{TM<:Model,TI<:Inputs} + model::TM + inp::TI lambdaI::ModelFactor # store directly, avoid null check on every iteration end @@ -144,7 +144,7 @@ share_compdata(c::ComputeData) = (push!(compdata, c); nothing) fetch_compdata() = compdata[1] noop(args...) = nothing -function fact_iters{T<:ParShmem}(::T, model::Model, inp::Inputs, niters::Int64) +function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64) t1 = time() share!(model) share!(inp) @@ -177,7 +177,7 @@ function fact_iters{T<:ParShmem}(::T, model::Model, inp::Inputs, niters::Int64) nothing end -function rmse{T<:ParShmem}(als::ALSWR{T}, inp::Inputs) +function rmse{TP<:ParShmem,TI<:Inputs}(als::ALSWR{TP}, inp::TI) t1 = time() model = get(als.model) @@ -200,20 +200,20 @@ end # Thread parallelism if (Base.VERSION >= v"0.5.0-") -function thread_update_item(model::Model, inp::Inputs, ni::Int64, lambdaI::Matrix{Float64}) +function thread_update_item{TM<:Model,TI<:Inputs}(model::TM, inp::TI, ni::Int64, lambdaI::Matrix{Float64}) @threads for i in Int64(1):ni update_item(i, model, inp, lambdaI) end nothing end -function thread_update_user(model::Model, inp::Inputs, nu::Int64, lambdaI::Matrix{Float64}) +function thread_update_user{TM<:Model,TI<:Inputs}(model::TM, inp::TI, nu::Int64, lambdaI::Matrix{Float64}) @threads for u in Int64(1):nu update_user(u, model, inp, lambdaI) end nothing end -function fact_iters{T<:ParThread}(::T, model::Model, inp::Inputs, niters::Int64) +function fact_iters{TP<:ParThread,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64) t1 = time() nu = nusers(inp) @@ -239,7 +239,7 @@ function fact_iters{T<:ParThread}(::T, model::Model, inp::Inputs, niters::Int64) nothing end -function rmse{T<:ParThread}(als::ALSWR{T}, inp::Inputs) +function rmse{TP<:ParThread,TI<:Inputs}(als::ALSWR{TP}, inp::TI) t1 = time() model = get(als.model) diff --git a/src/als_model.jl b/src/als_model.jl index adbe8a0..0db660b 100644 --- a/src/als_model.jl +++ b/src/als_model.jl @@ -1,4 +1,4 @@ -type Model +type SharedMemoryModel <: Model U::ModelFactor P::ModelFactor nfactors::Int @@ -7,11 +7,11 @@ type Model Pinv::Nullable{ModelFactor} end -nusers(model::Model) = size(model.U, 1) -nitems(model::Model) = size(model.P, 2) -nfactors(model::Model) = model.nfactors +nusers(model::SharedMemoryModel) = size(model.U, 1) +nitems(model::SharedMemoryModel) = size(model.P, 2) +nfactors(model::SharedMemoryModel) = model.nfactors -function share!(model::Model) +function share!(model::SharedMemoryModel) isa(model.U, SharedArray) || (model.U = share(model.U)) isa(model.P, SharedArray) || (model.P = share(model.P)) @@ -25,18 +25,18 @@ function share!(model::Model) nothing end -function localize!(model::Model) +function localize!(model::SharedMemoryModel) isa(model.U, SharedArray) && (model.U = copy(model.U)) isa(model.P, SharedArray) && (model.P = copy(model.P)) nothing end -function clear(model::Model) +function clear(model::SharedMemoryModel) model.lambdaI = nothing model.Pinv = nothing end -function pinv(model::Model) +function pinv(model::SharedMemoryModel) if isnull(model.Pinv) # since: I = (P * Pt) * inv(P * Pt) # Pinv = Pt * inv(P * Pt) @@ -47,10 +47,10 @@ function pinv(model::Model) get(model.Pinv) end -vec_mul_p(model::Model, v) = v * model.P -vec_mul_pinv(model::Model, v) = v * pinv(model) +vec_mul_p(model::SharedMemoryModel, v) = v * model.P +vec_mul_pinv(model::SharedMemoryModel, v) = v * pinv(model) -function prep(inp::Inputs, nfacts::Int, lambda::Float64) +function prep{TI<:SharedMemoryInputs}(inp::TI, nfacts::Int, lambda::Float64) ensure_loaded(inp) t1 = time() logmsg("preparing inputs...") @@ -65,22 +65,22 @@ function prep(inp::Inputs, nfacts::Int, lambda::Float64) end lambdaI = lambda * eye(nfacts) - model = Model(U, P, nfacts, lambda, lambdaI, nothing) + model = SharedMemoryModel(U, P, nfacts, lambda, lambdaI, nothing) t2 = time() logmsg("prep time: $(t2-t1)") model end -@inline function setU(model::Model, u::Int64, vals) +@inline function setU(model::SharedMemoryModel, u::Int64, vals) model.U[u,:] = vals nothing end -@inline function setP(model::Model, i::Int64, vals) +@inline function setP(model::SharedMemoryModel, i::Int64, vals) model.P[:,i] = vals nothing end -getU(model::Model, users) = model.U[users, :] -getP(model::Model, items) = model.P[:, items] +getU(model::SharedMemoryModel, users) = model.U[users, :] +getP(model::SharedMemoryModel, items) = model.P[:, items] diff --git a/src/input.jl b/src/input.jl index 603d783..045ffbf 100644 --- a/src/input.jl +++ b/src/input.jl @@ -1,23 +1,23 @@ -type Inputs +type SharedMemoryInputs <: Inputs ratings_file::FileSpec R::Nullable{InputRatings} RT::Nullable{InputRatings} item_idmap::Nullable{InputIdMap} user_idmap::Nullable{InputIdMap} - function Inputs(file::FileSpec) + function SharedMemoryInputs(file::FileSpec) new(file, nothing, nothing, nothing, nothing) end end -function clear(inp::Inputs) +function clear(inp::SharedMemoryInputs) inp.R = nothing inp.RT = nothing inp.item_idmap = nothing inp.user_idmap = nothing end -function share!(inp::Inputs) +function share!(inp::SharedMemoryInputs) R = get(inp.R) isa(R, SharedRatingMatrix) || (inp.R = share(R)) @@ -64,7 +64,7 @@ function filter_empty(R::RatingMatrix; only_items::Vector{Int64}=Int64[]) R, non_empty_items, non_empty_users end -function ensure_loaded(inp::Inputs; only_items::Vector{Int64}=Int64[]) +function ensure_loaded(inp::SharedMemoryInputs; only_items::Vector{Int64}=Int64[]) if isnull(inp.R) logmsg("loading inputs...") t1 = time() @@ -93,19 +93,19 @@ function ensure_loaded(inp::Inputs; only_items::Vector{Int64}=Int64[]) nothing end -item_idmap(inp::Inputs) = get(inp.item_idmap) -user_idmap(inp::Inputs) = get(inp.user_idmap) +item_idmap(inp::SharedMemoryInputs) = get(inp.item_idmap) +user_idmap(inp::SharedMemoryInputs) = get(inp.user_idmap) -nusers(inp::Inputs) = size(get(inp.R), 1) -nitems(inp::Inputs) = size(get(inp.R), 2) +nusers(inp::SharedMemoryInputs) = size(get(inp.R), 1) +nitems(inp::SharedMemoryInputs) = size(get(inp.R), 2) -users_and_ratings(inp::Inputs, i::Int64) = _sprowsvals(get(inp.R), i) -all_user_ratings(inp::Inputs, i::Int64) = _spvals(get(inp.R), i) -all_users_rated(inp::Inputs, i::Int64) = _sprows(get(inp.R), i) +users_and_ratings(inp::SharedMemoryInputs, i::Int64) = _sprowsvals(get(inp.R), i) +all_user_ratings(inp::SharedMemoryInputs, i::Int64) = _spvals(get(inp.R), i) +all_users_rated(inp::SharedMemoryInputs, i::Int64) = _sprows(get(inp.R), i) -items_and_ratings(inp::Inputs, u::Int64) = _sprowsvals(get(inp.RT), u) -all_item_ratings(inp::Inputs, u::Int64) = _spvals(get(inp.RT), u) -all_items_rated(inp::Inputs, u::Int64) = _sprows(get(inp.RT), u) +items_and_ratings(inp::SharedMemoryInputs, u::Int64) = _sprowsvals(get(inp.RT), u) +all_item_ratings(inp::SharedMemoryInputs, u::Int64) = _spvals(get(inp.RT), u) +all_items_rated(inp::SharedMemoryInputs, u::Int64) = _sprows(get(inp.RT), u) function _sprowsvals(R::InputRatings, col::Int64) rowstart = R.colptr[col] From b3cebc11808959103d33a35c4751b87f15b7a787 Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 21 Jan 2016 11:34:15 +0530 Subject: [PATCH 5/8] better save/restore of model - clear input and model of unnecessary data before save - results in smaller saved model - but requires input files to be read while using model - localize shared memory entities before save, as they can not be deserialized --- examples/lastfm/lastfm.jl | 16 +++++++++------- examples/movielens/movielens.jl | 16 +++++++++------- src/RecSys.jl | 2 +- src/als-wr.jl | 16 +++++++++++++--- src/input.jl | 23 +++++++++++++++++++++++ 5 files changed, 55 insertions(+), 18 deletions(-) diff --git a/examples/lastfm/lastfm.jl b/examples/lastfm/lastfm.jl index 2c1dd48..972fd4e 100644 --- a/examples/lastfm/lastfm.jl +++ b/examples/lastfm/lastfm.jl @@ -10,7 +10,7 @@ type MusicRec trainingset::FileSpec artist_names::FileSpec artist_map::FileSpec - rec::ALSWR + als::ALSWR artist_mat::Nullable{Dict{Int64,AbstractString}} function MusicRec(trainingset::FileSpec, artist_names::FileSpec, artist_map::FileSpec) @@ -92,9 +92,9 @@ function artist_names(rec::MusicRec) get(rec.artist_mat) end -train(musicrec::MusicRec, args...) = train(musicrec.rec, args...) -rmse(musicrec::MusicRec) = rmse(musicrec.rec) -recommend(musicrec::MusicRec, args...; kwargs...) = recommend(musicrec.rec, args...; kwargs...) +train(musicrec::MusicRec, args...) = train(musicrec.als, args...) +rmse(musicrec::MusicRec) = rmse(musicrec.als) +recommend(musicrec::MusicRec, args...; kwargs...) = recommend(musicrec.als, args...; kwargs...) function print_list(mat::Dict, idxs::Vector{Int}, header::AbstractString) if !isempty(idxs) @@ -129,16 +129,18 @@ function test(dataset_path) print_recommendations(rec, recommend(rec, 9875)...) println("recommending anonymous user:") - u_idmap = RecSys.user_idmap(rec.rec.inp) - i_idmap = RecSys.item_idmap(rec.rec.inp) + u_idmap = RecSys.user_idmap(rec.als.inp) + i_idmap = RecSys.item_idmap(rec.als.inp) # take user 9875 actual_user = findfirst(u_idmap, 9875) - rated_anon, ratings_anon = RecSys.items_and_ratings(rec.rec.inp, actual_user) + rated_anon, ratings_anon = RecSys.items_and_ratings(rec.als.inp, actual_user) actual_movie_ids = i_idmap[rated_anon] sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") + clear(rec.als) + localize!(rec.als) save(rec, "model.sav") nothing end diff --git a/examples/movielens/movielens.jl b/examples/movielens/movielens.jl index 1d14af4..18554c2 100644 --- a/examples/movielens/movielens.jl +++ b/examples/movielens/movielens.jl @@ -8,7 +8,7 @@ end type MovieRec movie_names::FileSpec - rec::ALSWR + als::ALSWR movie_mat::Nullable{SparseVector{AbstractString,Int64}} function MovieRec(trainingset::FileSpec, movie_names::FileSpec) @@ -30,9 +30,9 @@ function movie_names(rec::MovieRec) get(rec.movie_mat) end -train(movierec::MovieRec, args...) = train(movierec.rec, args...) -rmse(movierec::MovieRec, args...; kwargs...) = rmse(movierec.rec, args...; kwargs...) -recommend(movierec::MovieRec, args...; kwargs...) = recommend(movierec.rec, args...; kwargs...) +train(movierec::MovieRec, args...) = train(movierec.als, args...) +rmse(movierec::MovieRec, args...; kwargs...) = rmse(movierec.als, args...; kwargs...) +recommend(movierec::MovieRec, args...; kwargs...) = recommend(movierec.als, args...; kwargs...) function print_list(mat::SparseVector, idxs::Vector{Int}, header::AbstractString) if isless(Base.VERSION, v"0.5.0-") @@ -69,16 +69,18 @@ function test(dataset_path) print_recommendations(rec, recommend(rec, 100)...) println("recommending anonymous user:") - u_idmap = RecSys.user_idmap(rec.rec.inp) - i_idmap = RecSys.item_idmap(rec.rec.inp) + u_idmap = RecSys.user_idmap(rec.als.inp) + i_idmap = RecSys.item_idmap(rec.als.inp) # take user 100 actual_user = findfirst(u_idmap, 100) - rated_anon, ratings_anon = RecSys.items_and_ratings(rec.rec.inp, actual_user) + rated_anon, ratings_anon = RecSys.items_and_ratings(rec.als.inp, actual_user) actual_movie_ids = i_idmap[rated_anon] sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") + clear(rec.als) + localize!(rec.als) save(rec, "model.sav") nothing end diff --git a/src/RecSys.jl b/src/RecSys.jl index f902ba9..a4184f0 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -12,7 +12,7 @@ import Base: zero export FileSpec, DlmFile, MatFile, SparseMat, read_input export ALSWR, train, recommend, rmse, zero export ParShmem -export save, load +export save, load, clear, localize! typealias RatingMatrix SparseMatrixCSC{Float64,Int64} typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64} diff --git a/src/als-wr.jl b/src/als-wr.jl index 8370380..51cbb9f 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -6,10 +6,20 @@ end ALSWR{TP<:Parallelism}(inp::FileSpec, par::TP=ParShmem()) = ALSWR{TP,SharedMemoryInputs,SharedMemoryModel}(SharedMemoryInputs(inp), nothing, par) -function save(model::ALSWR, filename::AbstractString) - clear(model.inp) +function clear(als::ALSWR) + clear(als.inp) + isnull(als.model) || clear(get(als.model)) +end + +function localize!(als::ALSWR) + localize!(als.inp) + isnull(als.model) || localize!(get(als.model)) +end + +function save(als::ALSWR, filename::AbstractString) + clear(als) open(filename, "w") do f - serialize(f, model) + serialize(f, als) end nothing end diff --git a/src/input.jl b/src/input.jl index 045ffbf..f77be52 100644 --- a/src/input.jl +++ b/src/input.jl @@ -17,6 +17,29 @@ function clear(inp::SharedMemoryInputs) inp.user_idmap = nothing end +function localize!(inp::SharedMemoryInputs) + if !isnull(inp.R) + R = get(inp.R) + isa(R, SharedRatingMatrix) && (inp.R = copy(R)) + end + + if !isnull(inp.RT) + RT = get(inp.RT) + isa(RT, SharedRatingMatrix) && (inp.RT = copy(RT)) + end + + if !isnull(inp.item_idmap) + item_idmap = get(inp.item_idmap) + isa(item_idmap, SharedVector) && (inp.item_idmap = copy(item_idmap)) + end + + if !isnull(inp.user_idmap) + user_idmap = get(inp.user_idmap) + isa(user_idmap, SharedVector) && (inp.user_idmap = copy(user_idmap)) + end + nothing +end + function share!(inp::SharedMemoryInputs) R = get(inp.R) isa(R, SharedRatingMatrix) || (inp.R = share(R)) From c6422cb6568f2b3587866088b34d4dc9631cdaec Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 21 Jan 2016 17:09:55 +0530 Subject: [PATCH 6/8] do not map ids when not needed Makes things more efficient. Also essential in larger distributed memory conditions where such mappings must be avoided. --- examples/lastfm/lastfm.jl | 7 ++++--- examples/movielens/movielens.jl | 7 ++++--- src/als-wr.jl | 15 +++++++++------ src/input.jl | 14 ++++++++------ 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/examples/lastfm/lastfm.jl b/examples/lastfm/lastfm.jl index 972fd4e..975436f 100644 --- a/examples/lastfm/lastfm.jl +++ b/examples/lastfm/lastfm.jl @@ -132,10 +132,11 @@ function test(dataset_path) u_idmap = RecSys.user_idmap(rec.als.inp) i_idmap = RecSys.item_idmap(rec.als.inp) # take user 9875 - actual_user = findfirst(u_idmap, 9875) + actual_user = isempty(u_idmap) ? 9875 : findfirst(u_idmap, 9875) rated_anon, ratings_anon = RecSys.items_and_ratings(rec.als.inp, actual_user) - actual_movie_ids = i_idmap[rated_anon] - sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) + actual_music_ids = isempty(i_idmap) ? rated_anon : i_idmap[rated_anon] + nmusic = isempty(i_idmap) ? RecSys.nitems(rec.als.inp) : maximum(i_idmap) + sp_ratings_anon = SparseVector(nmusic, actual_music_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") diff --git a/examples/movielens/movielens.jl b/examples/movielens/movielens.jl index 18554c2..e8602d3 100644 --- a/examples/movielens/movielens.jl +++ b/examples/movielens/movielens.jl @@ -72,10 +72,11 @@ function test(dataset_path) u_idmap = RecSys.user_idmap(rec.als.inp) i_idmap = RecSys.item_idmap(rec.als.inp) # take user 100 - actual_user = findfirst(u_idmap, 100) + actual_user = isempty(u_idmap) ? 100 : findfirst(u_idmap, 100) rated_anon, ratings_anon = RecSys.items_and_ratings(rec.als.inp, actual_user) - actual_movie_ids = i_idmap[rated_anon] - sp_ratings_anon = SparseVector(maximum(i_idmap), actual_movie_ids, ratings_anon) + actual_movie_ids = isempty(i_idmap) ? rated_anon : i_idmap[rated_anon] + nmovies = isempty(i_idmap) ? RecSys.nitems(rec.als.inp) : maximum(i_idmap) + sp_ratings_anon = SparseVector(nmovies, actual_movie_ids, ratings_anon) print_recommendations(rec, recommend(rec, sp_ratings_anon)...) println("saving model to model.sav") diff --git a/src/als-wr.jl b/src/als-wr.jl index 51cbb9f..d7d672b 100644 --- a/src/als-wr.jl +++ b/src/als-wr.jl @@ -91,12 +91,13 @@ function _recommend(Uvec, model, rated, i_idmap; count::Int=10) idx = 1 while length(recommended) < count && length(top) >= idx item_id = top[idx] - (item_id in rated) || push!(recommended, i_idmap[item_id]) + real_item_id = isempty(i_idmap) ? item_id : i_idmap[item_id] + (item_id in rated) || push!(recommended, real_item_id) idx += 1 end nexcl = idx - count - 1 - mapped_rated = Int64[i_idmap[id] for id in rated] + mapped_rated = isempty(i_idmap) ? rated : Int64[i_idmap[id] for id in rated] recommended, mapped_rated, nexcl end @@ -105,8 +106,10 @@ function recommend(als::ALSWR, user::Int; unrated::Bool=true, count::Int=10) i_idmap = item_idmap(als.inp) u_idmap = user_idmap(als.inp) - (user in u_idmap) || (return (Int[], Int[], 0)) - user = findfirst(u_idmap, user) + if !isempty(u_idmap) + (user in u_idmap) || (return (Int[], Int[], 0)) + user = findfirst(u_idmap, user) + end model = get(als.model) @@ -131,8 +134,8 @@ function recommend(als::ALSWR, user_ratings::SparseVector{Float64,Int64}; unrate ensure_loaded(als.inp) i_idmap = item_idmap(als.inp) model = get(als.model) - Rvec = reshape(user_ratings[i_idmap], 1, length(i_idmap)) - #Rvec = reshape(user_ratings, 1, length(i_idmap)) + mapped_ratings = isempty(i_idmap) ? full(user_ratings) : user_ratings[i_idmap] + Rvec = reshape(mapped_ratings, 1, length(mapped_ratings)) Uvec = vec_mul_pinv(model, Rvec) _recommend(Uvec, model, find(Rvec), i_idmap; count=count) diff --git a/src/input.jl b/src/input.jl index f77be52..e72a8ce 100644 --- a/src/input.jl +++ b/src/input.jl @@ -106,18 +106,20 @@ function ensure_loaded(inp::SharedMemoryInputs; only_items::Vector{Int64}=Int64[ end R, item_idmap, user_idmap = filter_empty(R; only_items=only_items) - inp.R = Nullable(R) - inp.item_idmap = Nullable(item_idmap) - inp.user_idmap = Nullable(user_idmap) - inp.RT = Nullable(R') + inp.R = R + inp.item_idmap = (extrema(item_idmap) == (1,length(item_idmap))) ? nothing : item_idmap + inp.user_idmap = (extrema(user_idmap) == (1,length(user_idmap))) ? nothing : user_idmap + inp.RT = R' t2 = time() + isnull(inp.item_idmap) && logmsg("no need to map item_ids") + isnull(inp.user_idmap) && logmsg("no need to map user_ids") logmsg("time to load inputs: $(t2-t1) secs") end nothing end -item_idmap(inp::SharedMemoryInputs) = get(inp.item_idmap) -user_idmap(inp::SharedMemoryInputs) = get(inp.user_idmap) +item_idmap(inp::SharedMemoryInputs) = isnull(inp.item_idmap) ? Int64[] : get(inp.item_idmap) +user_idmap(inp::SharedMemoryInputs) = isnull(inp.user_idmap) ? Int64[] : get(inp.user_idmap) nusers(inp::SharedMemoryInputs) = size(get(inp.R), 1) nitems(inp::SharedMemoryInputs) = size(get(inp.R), 2) From 9aec9bdd74d518e0176bfd204f00bdaa3dfbfcd8 Mon Sep 17 00:00:00 2001 From: tan Date: Sat, 23 Jan 2016 11:55:28 +0530 Subject: [PATCH 7/8] modernize travis.yml, fix build - modernize travis.yml - disable Logging - provide empty `@threads` macro on Julia v0.4 --- .travis.yml | 27 ++++++++++++--------------- src/RecSys.jl | 13 ++++++++----- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/.travis.yml b/.travis.yml index a522a2a..97a6e35 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,17 +1,14 @@ -language: cpp -compiler: - - clang +# Documentation: http://docs.travis-ci.com/user/languages/julia/ +language: julia +os: + - linux + - osx +julia: + - release + - nightly notifications: email: false -env: - matrix: - - JULIAVERSION="juliareleases" - - JULIAVERSION="julianightlies" -before_install: - - sudo add-apt-repository ppa:staticfloat/julia-deps -y - - sudo add-apt-repository ppa:staticfloat/${JULIAVERSION} -y - - sudo apt-get update -qq -y - - sudo apt-get install libpcre3-dev julia -y -script: - - julia -e 'Pkg.init(); run(`ln -s $(pwd()) $(Pkg.dir("JuliaRecSys"))`); Pkg.pin("JuliaRecSys"); Pkg.resolve()' - - julia -e 'using JuliaRecSys; @assert isdefined(:JuliaRecSys); @assert typeof(JuliaRecSys) === Module' +# uncomment the following lines to override the default test script +#script: +# - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi +# - julia -e 'Pkg.clone(pwd()); Pkg.build("RecSys"); Pkg.test("RecSys"; coverage=true)' diff --git a/src/RecSys.jl b/src/RecSys.jl index a4184f0..d3e20c1 100644 --- a/src/RecSys.jl +++ b/src/RecSys.jl @@ -31,6 +31,9 @@ if (Base.VERSION >= v"0.5.0-") using Base.Threads type ParThread <: Parallelism end export ParThread +else +macro threads(x) +end end include("input.jl") @@ -39,10 +42,10 @@ include("als-wr.jl") include("utils.jl") # enable logging only during debugging -using Logging -#const logger = Logging.configure(filename="recsys.log", level=DEBUG) -const logger = Logging.configure(level=DEBUG) -logmsg(s) = debug(s) -#logmsg(s) = nothing +#using Logging +##const logger = Logging.configure(filename="recsys.log", level=DEBUG) +#const logger = Logging.configure(level=DEBUG) +#logmsg(s) = debug(s) +logmsg(s) = nothing end From 224a4a5b0c87e67d4171d86cc83b6613cbd7ad1b Mon Sep 17 00:00:00 2001 From: tan Date: Sun, 24 Jan 2016 13:11:55 +0530 Subject: [PATCH 8/8] fix share! of inputs check for null case before sharing --- src/input.jl | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/input.jl b/src/input.jl index e72a8ce..3c7f93b 100644 --- a/src/input.jl +++ b/src/input.jl @@ -41,17 +41,25 @@ function localize!(inp::SharedMemoryInputs) end function share!(inp::SharedMemoryInputs) - R = get(inp.R) - isa(R, SharedRatingMatrix) || (inp.R = share(R)) + if !isnull(inp.R) + R = get(inp.R) + isa(R, SharedRatingMatrix) || (inp.R = share(R)) + end - RT = get(inp.RT) - isa(RT, SharedRatingMatrix) || (inp.RT = share(RT)) + if !isnull(inp.RT) + RT = get(inp.RT) + isa(RT, SharedRatingMatrix) || (inp.RT = share(RT)) + end - item_idmap = get(inp.item_idmap) - isa(item_idmap, SharedVector) || (inp.item_idmap = share(item_idmap)) + if !isnull(inp.item_idmap) + item_idmap = get(inp.item_idmap) + isa(item_idmap, SharedVector) || (inp.item_idmap = share(item_idmap)) + end - user_idmap = get(inp.user_idmap) - isa(user_idmap, SharedVector) || (inp.user_idmap = share(user_idmap)) + if !isnull(inp.user_idmap) + user_idmap = get(inp.user_idmap) + isa(user_idmap, SharedVector) || (inp.user_idmap = share(user_idmap)) + end nothing end