Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

distributed out-of-core computation with chunks #22

Merged
merged 2 commits into from
Feb 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions REQUIRE
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ julia 0.4.1
SparseVectors
ParallelSparseMatMul
MAT
LRUCache
39 changes: 32 additions & 7 deletions examples/lastfm/lastfm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ type MusicRec
T, N = map_artists(trainingset, artist_names, artist_map)
new(trainingset, artist_names, artist_map, ALSWR(SparseMat(T)), Nullable(N))
end
function MusicRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, artist_names::FileSpec, artist_map::FileSpec)
new(user_item_ratings, artist_names, artist_map, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing)
end
end

function read_artist_map(artist_map::FileSpec)
t1 = time()
RecSys.logmsg("reading artist map")
RecSys.@logmsg("reading artist map")
A = read_input(artist_map)
valid = map(x->isa(x, Integer), A)
valid = valid[:,1] & valid[:,2]
Expand All @@ -33,13 +36,13 @@ function read_artist_map(artist_map::FileSpec)
good_id = Avalid[idx, 2]
amap[bad_id] = good_id
end
RecSys.logmsg("read artist map in $(time()-t1) secs")
RecSys.@logmsg("read artist map in $(time()-t1) secs")
amap
end

function read_trainingset(trainingset::FileSpec, amap::Dict{Int64,Int64})
t1 = time()
RecSys.logmsg("reading trainingset")
RecSys.@logmsg("reading trainingset")
T = read_input(trainingset)
for idx in 1:size(T,1)
artist_id = T[idx,2]
Expand All @@ -51,13 +54,13 @@ function read_trainingset(trainingset::FileSpec, amap::Dict{Int64,Int64})
artists = convert(Vector{Int64}, T[:,2])
ratings = convert(Vector{Float64}, T[:,3])
S = sparse(users, artists, ratings)
RecSys.logmsg("read trainingset in $(time()-t1) secs")
RecSys.@logmsg("read trainingset in $(time()-t1) secs")
S
end

function read_artist_names(artist_names::FileSpec, amap::Dict{Int64,Int64})
t1 = time()
RecSys.logmsg("reading artist names")
RecSys.@logmsg("reading artist names")
A = read_input(artist_names)
name_map = Dict{Int64,AbstractString}()
for idx in 1:size(A,1)
Expand All @@ -72,7 +75,7 @@ function read_artist_names(artist_names::FileSpec, amap::Dict{Int64,Int64})
name_map[artist_id] = artist_name
end
end
RecSys.logmsg("read artist names in $(time()-t1) secs")
RecSys.@logmsg("read artist names in $(time()-t1) secs")
name_map
end

Expand Down Expand Up @@ -114,6 +117,13 @@ function print_recommendations(rec::MusicRec, recommended::Vector{Int}, listened
nothing
end

# Print a recommendation summary for a user: the artists already listened to,
# how many already-listened artists were excluded from the recommendations,
# and the recommended artist ids. Returns nothing.
function print_recommendations(recommended::Vector{Int}, listened::Vector{Int}, nexcl::Int)
    println("Already listened: $listened")
    if nexcl != 0
        println("Excluded $(nexcl) artists already listened")
    end
    println("Recommended: $recommended")
    return nothing
end

function test(dataset_path)
ratings_file = DlmFile(joinpath(dataset_path, "user_artist_data.txt"))
artist_names = DlmFile(joinpath(dataset_path, "artist_data.txt"); dlm='\t', quotes=false)
Expand All @@ -124,7 +134,6 @@ function test(dataset_path)

err = rmse(rec)
println("rmse of the model: $err")

println("recommending existing user:")
print_recommendations(rec, recommend(rec, 9875)...)

Expand All @@ -145,3 +154,19 @@ function test(dataset_path)
save(rec, "model.sav")
nothing
end

# Train and evaluate a music recommender from pre-split chunked inputs.
# `splits_dir` (under `dataset_path`) holds the chunk metadata produced by
# `split_lastfm` in `playground/split_input.jl`; training state is
# checkpointed to `model_path`.
function test_chunks(dataset_path, splits_dir, model_path)
    itemwise_meta = joinpath(dataset_path, splits_dir, "R_itemwise.meta")
    userwise_meta = joinpath(dataset_path, splits_dir, "RT_userwise.meta")
    names_file = DlmFile(joinpath(dataset_path, "artist_data.txt"); dlm='\t', quotes=false)
    alias_file = DlmFile(joinpath(dataset_path, "artist_alias.txt"))

    # 10 chunks kept in cache for each orientation of the ratings matrix
    rec = MusicRec(SparseMatChunks(itemwise_meta, 10), SparseMatChunks(userwise_meta, 10), names_file, alias_file)
    train(rec, 20, 20, model_path, 10)

    err = rmse(rec)
    println("rmse of the model: $err")
    println("recommending existing user:")
    print_recommendations(recommend(rec, 9875)...)
    nothing
end
19 changes: 19 additions & 0 deletions examples/movielens/movielens.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type MovieRec
function MovieRec(trainingset::FileSpec, movie_names::FileSpec)
new(movie_names, ALSWR(trainingset), nothing)
end
function MovieRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, movie_names::FileSpec)
new(movie_names, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing)
end
end

function movie_names(rec::MovieRec)
Expand Down Expand Up @@ -85,3 +88,19 @@ function test(dataset_path)
save(rec, "model.sav")
nothing
end

# Prepare chunks for the movielens dataset by running `split_movielens` from
# `playground/split_input.jl` before calling this. Trains a recommender from
# the chunked splits, reports RMSE and prints sample recommendations.
function test_chunks(dataset_path, model_path)
    splits = joinpath(dataset_path, "splits")
    rec = MovieRec(SparseMatChunks(joinpath(splits, "R_itemwise.meta"), 5),
                   SparseMatChunks(joinpath(splits, "RT_userwise.meta"), 5),
                   DlmFile(joinpath(dataset_path, "movies.csv"); dlm=',', header=true))
    train(rec, 10, 4, model_path, 10)

    err = rmse(rec)
    println("rmse of the model: $err")

    println("recommending existing user:")
    print_recommendations(rec, recommend(rec, 100)...)
    nothing
end
182 changes: 182 additions & 0 deletions playground/split_input.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
using RecSys

# Split a sparse matrix `S` column-wise into chunks holding roughly `chunkmax`
# stored entries each. Each chunk is saved to "$(filepfx).<n>" via
# `RecSys.mmap_csc_save`, and an index line "colstart,colend,filename" per
# chunk is written to "$(filepfx).meta". Returns nothing.
# (Fixed: removed unused locals `nzval`/`rowval` from the original.)
function split_sparse(S, chunkmax, filepfx)
    metafilename = "$(filepfx).meta"
    open(metafilename, "w") do mfile
        chunknum = 1
        count = 1       # stored-entry count already committed to earlier chunks (+1)
        colstart = 1    # first column of the chunk currently being accumulated
        splits = UnitRange[]
        for col in 1:size(S,2)
            npos = S.colptr[col+1]  # stored entries up to and including column `col`
            # close the chunk once it holds >= chunkmax entries, or at the last column
            if (npos >= (count + chunkmax)) || (col == size(S,2))
                print("\tchunk $chunknum ... ")
                cfilename = "$(filepfx).$(chunknum)"
                println(mfile, colstart, ",", col, ",", cfilename)
                RecSys.mmap_csc_save(S[:, colstart:col], cfilename)
                push!(splits, colstart:col)
                colstart = col+1
                count = npos
                chunknum += 1
                println("done")
            end
        end
        println("splits: $splits")
    end
    nothing
end

# Read a delimited (user, item, rating) file, assemble the sparse ratings
# matrix and delegate to the SparseMatrixCSC method for the actual split.
function splitall(inp::DlmFile, output_path::AbstractString, nsplits::Int)
    println("reading inputs...")
    raw = RecSys.read_input(inp)

    uids = convert(Vector{Int64}, raw[:, 1])
    iids = convert(Vector{Int64}, raw[:, 2])
    vals = convert(Vector{Float64}, raw[:, 3])
    splitall(sparse(uids, iids, vals), output_path, nsplits)
end

# Split the ratings matrix into chunk files under `output_path`: item-wise
# chunks of R and user-wise chunks of its transpose, after dropping empty
# rows/columns. `nsplits` controls the approximate number of chunks.
function splitall(R::SparseMatrixCSC, output_path::AbstractString, nsplits::Int)
    nratings = nnz(R)
    println("$nratings ratings in $(size(R)) sized sparse matrix")

    println("removing empty...")
    R, _non_empty_items, _non_empty_users = RecSys.filter_empty(R)
    nratings = nnz(R)
    println("$nratings ratings in $(size(R)) sized sparse matrix")

    # target stored entries per chunk, same for both orientations
    nsplits_u = round(Int, nratings/nsplits)
    nsplits_i = round(Int, nratings/nsplits)

    println("splitting R itemwise at $nsplits_i items...")
    split_sparse(R, nsplits_i, joinpath(output_path, "R_itemwise"))

    RT = R'
    println("splitting RT userwise at $nsplits_u users...")
    split_sparse(RT, nsplits_u, joinpath(output_path, "RT_userwise"))
    return nothing
end

# Split the movielens ratings CSV into 10 chunk sets under "<dataset>/splits".
function split_movielens(dataset_path = "/data/Work/datasets/movielens/ml-20m")
    ratings = DlmFile(joinpath(dataset_path, "ratings.csv"); dlm=',', header=true)
    splitall(ratings, joinpath(dataset_path, "splits"), 10)
end

# Split the last.fm training set into 20 chunk sets under "<dataset>/splits".
# Artist ids are first canonicalized through the artist alias map.
function split_lastfm(dataset_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005")

    # Read the artist alias file into a bad_id => good_id dictionary,
    # keeping only rows where both columns parsed as integers.
    function read_artist_map(artist_map::FileSpec)
        t1 = time()
        RecSys.@logmsg("reading artist map")
        A = read_input(artist_map)
        valid = map(x->isa(x, Integer), A)
        valid = valid[:,1] & valid[:,2]
        Avalid = convert(Matrix{Int64}, A[valid, :])

        amap = Dict{Int64,Int64}()
        for idx in 1:size(Avalid,1)
            amap[Avalid[idx, 1]] = Avalid[idx, 2]
        end
        RecSys.@logmsg("read artist map in $(time()-t1) secs")
        amap
    end

    # Read the (user, artist, rating) training set, remap aliased artist ids,
    # and return the ratings as a sparse users x artists matrix.
    function read_trainingset(trainingset::FileSpec, amap::Dict{Int64,Int64})
        t1 = time()
        RecSys.@logmsg("reading trainingset")
        T = read_input(trainingset)
        for idx in 1:size(T,1)
            artist_id = T[idx,2]
            # haskey is the idiomatic membership test (was: artist_id in keys(amap))
            if haskey(amap, artist_id)
                T[idx,2] = amap[artist_id]
            end
        end
        users = convert(Vector{Int64}, T[:,1])
        artists = convert(Vector{Int64}, T[:,2])
        ratings = convert(Vector{Float64}, T[:,3])
        S = sparse(users, artists, ratings)
        RecSys.@logmsg("read trainingset in $(time()-t1) secs")
        S
    end

    amap = read_artist_map(DlmFile(joinpath(dataset_path, "artist_alias.txt")))
    T = read_trainingset(DlmFile(joinpath(dataset_path, "user_artist_data.txt")), amap)
    splitall(T, joinpath(dataset_path, "splits"), 20)
end

# Smoke-test the chunked sparse splits: open the item-wise chunk index and
# fetch the data for 10 randomly chosen keys, verifying the chunks load.
function load_splits(dataset_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits")
    cf = RecSys.ChunkedFile(joinpath(dataset_path, "R_itemwise.meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 10)
    println(cf)
    nchunks = length(cf.chunks)
    for _ in 1:10
        # rand(1:n) / rand(range) replace the hand-rolled floor(Int, n*rand())+1
        cid = rand(1:nchunks)
        println("fetching from chunk $cid")
        c = cf.chunks[cid]
        key = rand(c.keyrange)
        println("fetching key $key")
        r,v = RecSys.data(cf, key)
        #println("\tr:$r, v:$v")
    end
    println("finished")
end

# Exercise mmapped dense chunks: create a zero-filled 10^6 x 10 chunked
# matrix, stamp each chunk with a distinct value, then re-open the file and
# verify the stamped values persisted.
function test_dense_splits(dataset_path = "/tmp/test")
    meta = joinpath(dataset_path, "mem.meta")
    spec = DenseMatChunks(meta, 1, (10^6,10))
    RecSys.create(spec)

    # freshly created chunks must read back as zero; stamp each with its index
    chunked = RecSys.read_input(spec)
    for n in 1:10
        blk = RecSys.data(RecSys.getchunk(chunked, n*10^4), chunked.lrucache)
        @assert blk.val[1] == 0.0
        fill!(blk.val, n)
    end

    # re-read from disk and confirm every stamp survived
    chunked = RecSys.read_input(spec)
    for n in 1:10
        blk = RecSys.data(RecSys.getchunk(chunked, n*10^4), chunked.lrucache)
        @assert blk.val[1] == Float64(n)
        println(blk.val[1])
    end
end

##
# An easy way to generate somewhat relevant test data is to take an existing
# dataset and replicate items and users based on existing data.
# Reads the chunks of `setname` from `original_data_path`, vertically
# replicates each chunk `mul_factor` times, and writes the enlarged chunk
# files plus a fresh meta file under `generated_data_path`.
function generate_test_data(setname::AbstractString, generated_data_path, original_data_path, mul_factor)
    RecSys.@logmsg("generating $setname")
    # explicit validation instead of @assert (which may be compiled out)
    (mul_factor > 1) || throw(ArgumentError("mul_factor must be > 1, got $mul_factor"))
    incf = RecSys.ChunkedFile(joinpath(original_data_path, "$(setname).meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 2)
    outmetaname = joinpath(generated_data_path, "$(setname).meta")

    outkeystart = 1
    outfileidx = 1
    open(outmetaname, "w") do outmeta
        for chunk in incf.chunks
            L = length(chunk.keyrange)
            S = RecSys.data(chunk, incf.lrucache)
            # single n-way vcat instead of repeated pairwise vcat (avoids quadratic copying)
            Sout = vcat(fill(S, mul_factor)...)
            outfname = joinpath(generated_data_path, "$(setname).$(outfileidx)")
            outfileidx += 1
            RecSys.mmap_csc_save(Sout, outfname)
            # one replicated chunk file serves mul_factor consecutive key ranges
            for x in 1:mul_factor
                println(outmeta, outkeystart, ",", outkeystart+L-1, ",", outfname)
                RecSys.@logmsg("generated $setname $outkeystart:$(outkeystart+L-1) with size: $(size(Sout)) from size: $(size(S))")
                outkeystart += L
            end
        end
    end
end

# Convenience wrapper: replicate both the item-wise and user-wise chunk sets.
function generate_test_data(generated_data_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits2",
                            original_data_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits",
                            mul_factor = 2)
    for setname in ("R_itemwise", "RT_userwise")
        generate_test_data(setname, generated_data_path, original_data_path, mul_factor)
    end
    println("finished")
end
35 changes: 26 additions & 9 deletions src/RecSys.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ end

import Base: zero

export FileSpec, DlmFile, MatFile, SparseMat, read_input
export FileSpec, DlmFile, MatFile, SparseMat, SparseMatChunks, DenseMatChunks, read_input
export ALSWR, train, recommend, rmse, zero
export ParShmem
export ParShmem, ParChunk
export save, load, clear, localize!

typealias RatingMatrix SparseMatrixCSC{Float64,Int64}
Expand All @@ -26,26 +26,43 @@ abstract Model

abstract Parallelism
type ParShmem <: Parallelism end
type ParChunk <: Parallelism end

# Julia >= 0.5 has native threading: expose Base.Threads and the ParThread
# parallelism marker. On older versions provide single-threaded stand-ins so
# callers can use threadid()/@threads unconditionally.
if (Base.VERSION >= v"0.5.0-")
    using Base.Threads
    type ParThread <: Parallelism end
    export ParThread
else
    threadid() = 1
    # Fallback must still RUN the annotated loop: the original empty macro
    # expanded to nothing and silently discarded its body.
    macro threads(x)
        esc(x)
    end
end

include("input.jl")
include("als_model.jl")
include("als-wr.jl")
include("utils.jl")

# enable logging only during debugging
#using Logging
##const logger = Logging.configure(filename="recsys.log", level=DEBUG)
#const logger = Logging.configure(level=DEBUG)
#logmsg(s) = debug(s)
logmsg(s) = nothing
#macro logmsg(s)
# quote
# debug("[", myid(), "-", threadid(), "] ", $(esc(s)))
# end
#end
# Logging is disabled in this build: @logmsg expands to nothing, so log calls
# (and their interpolated arguments) compile away entirely. Swap in the
# commented-out Logging-based definition above to enable debug output.
macro logmsg(s)
end


include("chunks/chunk.jl")
include("chunks/csv.jl")
include("chunks/mmapsparse.jl")
include("chunks/mmapdense.jl")

include("inputs/input.jl")
include("inputs/dist_input.jl")

include("models/als_model.jl")
include("models/als_dist_model.jl")

include("als-wr.jl")
include("utils.jl")

end
Loading