chunks externalized as Blobs.jl #23

Merged
merged 5 commits on Mar 16, 2016
2 changes: 1 addition & 1 deletion REQUIRE
@@ -2,4 +2,4 @@ julia 0.4.1
 SparseVectors
 ParallelSparseMatMul
 MAT
-LRUCache
+Blobs
8 changes: 4 additions & 4 deletions examples/lastfm/lastfm.jl
@@ -15,10 +15,10 @@ type MusicRec

     function MusicRec(trainingset::FileSpec, artist_names::FileSpec, artist_map::FileSpec)
         T, N = map_artists(trainingset, artist_names, artist_map)
-        new(trainingset, artist_names, artist_map, ALSWR(SparseMat(T)), Nullable(N))
+        new(trainingset, artist_names, artist_map, ALSWR(SparseMat(T), ParShmem()), Nullable(N))
     end
     function MusicRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, artist_names::FileSpec, artist_map::FileSpec)
-        new(user_item_ratings, artist_names, artist_map, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing)
+        new(user_item_ratings, artist_names, artist_map, ALSWR(user_item_ratings, item_user_ratings, ParBlob()), nothing)
     end
 end

@@ -156,8 +156,8 @@ function test(dataset_path)
 end

 function test_chunks(dataset_path, splits_dir, model_path)
-    user_item_ratings = SparseMatChunks(joinpath(dataset_path, splits_dir, "R_itemwise.meta"), 10)
-    item_user_ratings = SparseMatChunks(joinpath(dataset_path, splits_dir, "RT_userwise.meta"), 10)
+    user_item_ratings = SparseBlobs(joinpath(dataset_path, splits_dir, "R_itemwise"); maxcache=10)
+    item_user_ratings = SparseBlobs(joinpath(dataset_path, splits_dir, "RT_userwise"); maxcache=10)
     artist_names = DlmFile(joinpath(dataset_path, "artist_data.txt"); dlm='\t', quotes=false)
     artist_map = DlmFile(joinpath(dataset_path, "artist_alias.txt"))
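For orientation, the blob-backed flow above can be driven end to end roughly as below. This is a sketch only: the dataset and model paths are placeholders, and it assumes `split_lastfm` from `playground/split_input.jl` (changed later in this diff) with the signature shown there.

    using RecSys
    include("playground/split_input.jl")     # provides split_lastfm / splitall

    dataset_path = "/data/datasets/lastfm"   # placeholder location of the last.fm dump
    split_lastfm(dataset_path)               # writes blob splits under joinpath(dataset_path, "splits")
    test_chunks(dataset_path, "splits", "/tmp/lastfm_model")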
8 changes: 4 additions & 4 deletions examples/movielens/movielens.jl
@@ -12,10 +12,10 @@ type MovieRec
     movie_mat::Nullable{SparseVector{AbstractString,Int64}}

     function MovieRec(trainingset::FileSpec, movie_names::FileSpec)
-        new(movie_names, ALSWR(trainingset), nothing)
+        new(movie_names, ALSWR(trainingset, ParShmem()), nothing)
     end
     function MovieRec(user_item_ratings::FileSpec, item_user_ratings::FileSpec, movie_names::FileSpec)
-        new(movie_names, ALSWR(user_item_ratings, item_user_ratings, ParChunk()), nothing)
+        new(movie_names, ALSWR(user_item_ratings, item_user_ratings, ParBlob()), nothing)
     end
 end

@@ -91,8 +91,8 @@ end

 # prepare chunks for movielens dataset by running `split_movielens` from `playground/split_input.jl`
 function test_chunks(dataset_path, model_path)
-    user_item_ratings = SparseMatChunks(joinpath(dataset_path, "splits", "R_itemwise.meta"), 5)
-    item_user_ratings = SparseMatChunks(joinpath(dataset_path, "splits", "RT_userwise.meta"), 5)
+    user_item_ratings = SparseBlobs(joinpath(dataset_path, "splits", "R_itemwise"); maxcache=10)
+    item_user_ratings = SparseBlobs(joinpath(dataset_path, "splits", "RT_userwise"); maxcache=10)
     movies_file = DlmFile(joinpath(dataset_path, "movies.csv"); dlm=',', header=true)
     rec = MovieRec(user_item_ratings, item_user_ratings, movies_file)
     train(rec, 10, 4, model_path, 10)
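Following the comment in the hunk above, a hypothetical end-to-end invocation: the dataset path is a placeholder, and `split_movielens` is assumed to take the dataset root the way `split_lastfm` does.

    using RecSys
    include("playground/split_input.jl")       # provides split_movielens

    dataset_path = "/data/datasets/movielens"  # placeholder
    split_movielens(dataset_path)              # writes blob splits under joinpath(dataset_path, "splits")
    test_chunks(dataset_path, "/tmp/movielens_model")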
2 changes: 1 addition & 1 deletion examples/netflix/netflix.jl
@@ -2,7 +2,7 @@ using RecSys

 function test(matfile, entryname)
     ratings = MatFile(matfile, entryname)
-    rec = ALSWR(ratings)
+    rec = ALSWR(ratings, ParShmem())
     train(rec, 10, 4)
     err = rmse(rec)
     println("rmse of the model: $err")
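The pattern across all three examples: ALSWR now takes the parallelism strategy explicitly, with ParShmem() for in-memory shared-memory training and ParBlob() (replacing ParChunk()) for the blob-backed out-of-core path. Side by side, as a sketch with placeholder file names:

    rec_mem  = ALSWR(MatFile("netflix_mm.mat", "M"), ParShmem())    # placeholder .mat file and entry name
    rec_blob = ALSWR(SparseBlobs("splits/R_itemwise"; maxcache=10),
                     SparseBlobs("splits/RT_userwise"; maxcache=10), ParBlob())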
146 changes: 68 additions & 78 deletions playground/split_input.jl
@@ -1,30 +1,28 @@
 using RecSys
+using Blobs
+#include("/home/tan/Work/julia/packages/Blobs/examples/matrix.jl")
+using RecSys.MatrixBlobs

-function split_sparse(S, chunkmax, filepfx)
-    metafilename = "$(filepfx).meta"
-    open(metafilename, "w") do mfile
-        chunknum = 1
-        count = 1
-        colstart = 1
-        splits = UnitRange[]
-        nzval = S.nzval
-        rowval = S.rowval
-        for col in 1:size(S,2)
-            npos = S.colptr[col+1]
-            if (npos >= (count + chunkmax)) || (col == size(S,2))
-                print("\tchunk $chunknum ... ")
-                cfilename = "$(filepfx).$(chunknum)"
-                println(mfile, colstart, ",", col, ",", cfilename)
-                RecSys.mmap_csc_save(S[:, colstart:col], cfilename)
-                push!(splits, colstart:col)
-                colstart = col+1
-                count = npos
-                chunknum += 1
-                println("done")
-            end
-        end
-        println("splits: $splits")
-    end
+function split_sparse(S, chunkmax, metadir)
+    isdir(metadir) || mkdir(metadir)
+    spblobs = SparseMatBlobs(Float64, Int64, metadir)
+    chunknum = 1
+    count = 1
+    colstart = 1
+    nzval = S.nzval
+    rowval = S.rowval
+    for col in 1:size(S,2)
+        npos = S.colptr[col+1]
+        if (npos >= (count + chunkmax)) || (col == size(S,2))
+            print("\tchunk $chunknum ... ")
+            append!(spblobs, S[:, colstart:col])
+            colstart = col+1
+            count = npos
+            chunknum += 1
+            println("done")
+        end
+    end
+    Blobs.save(spblobs)
     nothing
 end
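To make the cut rule in the new split_sparse concrete: a blob is cut whenever the nonzeros accumulated since the previous cut reach chunkmax, or at the last column. A small worked trace, with illustrative values only:

    # Suppose S has colptr = [1, 3, 6, 7, 9] (so columns 1..4 hold 2, 3, 1, 2
    # nonzeros) and chunkmax = 5. Starting with count = 1, colstart = 1:
    #   col = 1: npos = colptr[2] = 3; 3 <  1+5   -> no cut
    #   col = 2: npos = colptr[3] = 6; 6 >= 1+5   -> append S[:, 1:2], count = 6
    #   col = 3: npos = colptr[4] = 7; 7 <  6+5   -> no cut
    #   col = 4: last column                      -> append S[:, 3:4]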

@@ -103,74 +101,66 @@ function split_lastfm(dataset_path = "/data/Work/datasets/last_fm_music_recommen

     amap = read_artist_map(DlmFile(joinpath(dataset_path, "artist_alias.txt")))
     T = read_trainingset(DlmFile(joinpath(dataset_path, "user_artist_data.txt")), amap)

     println("randomizing items to remove skew")
     T = T[:, randperm(size(T,2))]

     splitall(T, joinpath(dataset_path, "splits"), 20)
 end

 function load_splits(dataset_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits")
-    cf = RecSys.ChunkedFile(joinpath(dataset_path, "R_itemwise.meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 10)
-    println(cf)
-    nchunks = length(cf.chunks)
-    for idx in 1:10
-        cid = floor(Int, nchunks*rand()) + 1
-        println("fetching from chunk $cid")
-        c = cf.chunks[cid]
-        key = floor(Int, length(c.keyrange)*rand()) + c.keyrange.start
-        println("fetching key $key")
-        r,v = RecSys.data(cf, key)
-        #println("\tr:$r, v:$v")
+    sp = SparseMatBlobs(joinpath(dataset_path, "R_itemwise"))
+    for idx in 1:length(sp.splits)
+        p = sp.splits[idx]
+        r = p.first
+        part, _r = Blobs.load(sp, first(r))
+        RecSys.@logmsg("got part of size: $(size(part)), with r: $r, _r:$_r")
     end
     println("finished")
 end

-function test_dense_splits(dataset_path = "/tmp/test")
-    metafilename = joinpath(dataset_path, "mem.meta")
-    cfile = DenseMatChunks(metafilename, 1, (10^6,10))
-    RecSys.create(cfile)
-    cf = RecSys.read_input(cfile)
-    for idx in 1:10
-        chunk = RecSys.getchunk(cf, idx*10^4)
-        A = RecSys.data(chunk, cf.lrucache)
-        @assert A.val[1] == 0.0
-        fill!(A.val, idx)
-    end
-    cf = RecSys.read_input(cfile)
-    for idx in 1:10
-        chunk = RecSys.getchunk(cf, idx*10^4)
-        A = RecSys.data(chunk, cf.lrucache)
-        @assert A.val[1] == Float64(idx)
-        println(A.val[1])
-    end
-end
-
 ##
 # an easy way to generate somewhat relevant test data is to take an existing dataset and replicate
 # items and users based on existing data.
+function Blobs.append!{Tv,Ti}(sp::SparseMatBlobs{Tv,Ti}, blob::Blob)
+    S = blob.data.value
+    m,n = size(S)
+    if isempty(sp.splits)
+        sp.sz = (m, n)
+        idxrange = 1:n
+    else
+        (sp.sz[1] == m) || throw(BoundsError("SparseMatBlobs $(sp.sz)", (m,n)))
+        old_n = sp.sz[2]
+        idxrange = (old_n+1):(old_n+n)
+        sp.sz = (m, old_n+n)
+    end
+
+    push!(sp.splits, idxrange => blob.id)
+    RecSys.@logmsg("appending blob $(blob.id) of size: $(size(S)) for idxrange: $idxrange, sersz: $(blob.metadata.size)")
+    blob
+end
+
 function generate_test_data(setname::AbstractString, generated_data_path, original_data_path, mul_factor)
     RecSys.@logmsg("generating $setname")
     @assert mul_factor > 1
-    incf = RecSys.ChunkedFile(joinpath(original_data_path, "$(setname).meta"), UnitRange{Int64}, SparseMatrixCSC{Float64,Int}, 2)
-    outmetaname = joinpath(generated_data_path, "$(setname).meta")
-
-    outkeystart = 1
-    outfileidx = 1
-    open(outmetaname, "w") do outmeta
-        for chunk in incf.chunks
-            L = length(chunk.keyrange)
-            S = RecSys.data(chunk, incf.lrucache)
-            Sout = S
-            for x in 1:(mul_factor-1)
-                Sout = vcat(Sout, S)
-            end
-            outfname = joinpath(generated_data_path, "$(setname).$(outfileidx)")
-            outfileidx += 1
-            RecSys.mmap_csc_save(Sout, outfname)
-            for x in 1:mul_factor
-                println(outmeta, outkeystart, ",", outkeystart+L-1, ",", outfname)
-                RecSys.@logmsg("generated $setname $outkeystart:$(outkeystart+L-1) with size: $(size(Sout)) from size: $(size(S))")
-                outkeystart += L
-            end
-        end
-    end
+    sp1 = SparseMatBlobs(joinpath(original_data_path, setname))
+    metapath = joinpath(generated_data_path, setname)
+    sp2 = SparseMatBlobs(Float64, Int64, metapath)
+    isdir(metapath) || mkdir(metapath)
+    for idx in 1:length(sp1.splits)
+        p = sp1.splits[idx]
+        r = p.first
+        part, _r = Blobs.load(sp1, first(r))
+        RecSys.@logmsg("got part of size: $(size(part)), with r: $r, _r:$_r")
+        part_out = part
+        for x in 1:(mul_factor-1)
+            part_out = vcat(part_out, part)
+        end
+        blob = append!(sp2, part_out)
+        for x in 1:(mul_factor-1)
+            append!(sp2, blob)
+            RecSys.@logmsg("generated part of size: $(size(part_out))")
+        end
+    end
+    Blobs.save(sp2)
 end

 function generate_test_data(generated_data_path = "/data/Work/datasets/last_fm_music_recommendation/profiledata_06-May-2005/splits2",
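Taken together, the new write/read cycle is small. A minimal round-trip sketch using only calls that appear above; the directory and the sprand inputs are placeholders:

    using RecSys
    using RecSys.MatrixBlobs
    using Blobs

    dir = "/tmp/spblobs_demo"                 # placeholder scratch directory
    isdir(dir) || mkdir(dir)

    sp = SparseMatBlobs(Float64, Int64, dir)  # writer, typed and rooted at dir
    append!(sp, sprand(100, 10, 0.1))         # each append becomes one column-range blob
    append!(sp, sprand(100, 15, 0.1))         # row count must match across appends
    Blobs.save(sp)                            # persist the blob metadata

    sp2 = SparseMatBlobs(dir)                 # reader, reopened from metadata
    part, rng = Blobs.load(sp2, 1)            # part holding column 1, with its column range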
38 changes: 22 additions & 16 deletions src/RecSys.jl
@@ -2,31 +2,36 @@ module RecSys

 using ParallelSparseMatMul
 using MAT
+using Blobs
+
+include("chunks/matrix.jl")
+using .MatrixBlobs

 if isless(Base.VERSION, v"0.5.0-")
 using SparseVectors
 end

 import Base: zero
+import Blobs: save, load

-export FileSpec, DlmFile, MatFile, SparseMat, SparseMatChunks, DenseMatChunks, read_input
+export FileSpec, DlmFile, MatFile, SparseMat, SparseBlobs, DenseBlobs, read_input
 export ALSWR, train, recommend, rmse, zero
-export ParShmem, ParChunk
+export ParShmem, ParBlob
+export save, load, clear, localize!

 typealias RatingMatrix SparseMatrixCSC{Float64,Int64}
 typealias SharedRatingMatrix ParallelSparseMatMul.SharedSparseMatrixCSC{Float64,Int64}
-typealias InputRatings Union{RatingMatrix,SharedRatingMatrix}
+typealias InputRatings Union{RatingMatrix,SharedRatingMatrix,SparseMatBlobs}
 typealias InputIdMap Union{Vector{Int64}, SharedVector{Int64}}
-typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}}
+typealias ModelFactor Union{Matrix{Float64}, SharedArray{Float64,2}, DenseMatBlobs{Float64}}

 abstract FileSpec
 abstract Inputs
 abstract Model

 abstract Parallelism
 type ParShmem <: Parallelism end
-type ParChunk <: Parallelism end
+type ParBlob <: Parallelism end

 if (Base.VERSION >= v"0.5.0-")
 using Base.Threads

@@ -38,6 +43,11 @@ macro threads(x)
 end
 end

+function tstr()
+    t = time()
+    string(Libc.strftime("%Y-%m-%dT%H:%M:%S",t), Libc.strftime("%z",t)[1:end-2], ":", Libc.strftime("%z",t)[end-1:end])
+end
+
 # enable logging only during debugging
 #using Logging
 ##const logger = Logging.configure(filename="recsys.log", level=DEBUG)

@@ -49,18 +59,14 @@
 #end
 macro logmsg(s)
 end
+#macro logmsg(s)
+#    quote
+#        info(tstr(), " [", myid(), "-", threadid(), "] ", $(esc(s)))
+#    end
+#end

-include("chunks/chunk.jl")
-include("chunks/csv.jl")
-include("chunks/mmapsparse.jl")
-include("chunks/mmapdense.jl")
-
-include("inputs/input.jl")
-include("inputs/dist_input.jl")
-
-include("models/als_model.jl")
-include("models/als_dist_model.jl")
+include("inputs/inputs.jl")
+include("models/models.jl")

 include("als-wr.jl")
 include("utils.jl")
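The widened unions are what let blob-backed storage flow through the existing ALS code paths unchanged. A hypothetical illustration; the helper below is not part of the PR:

    using RecSys
    using RecSys.MatrixBlobs

    # Hypothetical helper: both in-memory and blob-backed ratings satisfy InputRatings.
    function show_ratings(R::RecSys.InputRatings)
        println("ratings input of type $(typeof(R))")
    end

    show_ratings(sprand(100, 50, 0.1))                 # RatingMatrix (SparseMatrixCSC{Float64,Int64})
    show_ratings(SparseMatBlobs("/tmp/spblobs_demo"))  # SparseMatBlobs, newly admitted by the union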