Commit: reorganize stuff

tanmaykm committed Feb 1, 2016
1 parent fc7c59d commit 74afba5
Showing 10 changed files with 92 additions and 95 deletions.
17 changes: 11 additions & 6 deletions src/RecSys.jl
@@ -51,12 +51,17 @@ macro logmsg(s)
end


include("chunk.jl")
include("mmapsparse.jl")
include("input.jl")
include("dist_input.jl")
include("als_model.jl")
include("als_dist_model.jl")
include("chunks/chunk.jl")
include("chunks/csv.jl")
include("chunks/mmapsparse.jl")
include("chunks/mmapdense.jl")

include("inputs/input.jl")
include("inputs/dist_input.jl")

include("models/als_model.jl")
include("models/als_dist_model.jl")

include("als-wr.jl")
include("utils.jl")

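After this commit the source tree groups the chunk, input, and model code into subdirectories. A sketch of the resulting layout, read off the include paths above (not an exhaustive listing of src/):

    src/
    ├── RecSys.jl
    ├── als-wr.jl
    ├── utils.jl
    ├── chunks/   # chunk.jl, csv.jl, mmapsparse.jl, mmapdense.jl
    ├── inputs/   # input.jl, dist_input.jl
    └── models/   # als_model.jl, als_dist_model.jl
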
54 changes: 28 additions & 26 deletions src/als-wr.jl
@@ -31,12 +31,6 @@ function train(als::ALSWR, niters::Int, nfacts::Int64, lambda::Float64=0.065)
nothing
end

function train(als::ALSWR{ParChunk,DistInputs,DistModel}, niters::Int, nfacts::Int64, model_dir::AbstractString, max_cache::Int=10, lambda::Float64=0.065)
als.model = prep(als.inp, nfacts, lambda, model_dir, max_cache)
fact_iters(als, niters)
nothing
end

fact_iters(als, niters) = fact_iters(als.par, get(als.model), als.inp, niters)

##
@@ -228,6 +222,34 @@ function fact_iters{TP<:ParShmem,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64)
nothing
end

function rmse{TP<:Union{ParShmem,ParChunk},TI<:Inputs}(als::ALSWR{TP}, inp::TI)
t1 = time()

model = get(als.model)
share!(model)
share!(inp)
ensure_loaded(inp)
ensure_loaded(model)

cumerr = @parallel (.+) for user in 1:nusers(inp)
Uvec = reshape(getU(model,user), 1, nfactors(model))
nzrows, nzvals = items_and_ratings(inp, user)
predicted = vec(vec_mul_p(model, Uvec))[nzrows]
[sum((predicted .- nzvals) .^ 2), length(predicted)]
end
localize!(model)
@logmsg("rmse time $(time()-t1)")
sqrt(cumerr[1]/cumerr[2])
end
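
The @parallel reduction above accumulates two numbers per worker: the sum of squared errors and the count of predictions. A serial sketch of the same computation, reusing the helper names from this file purely for illustration:

    function rmse_serial(model, inp)
        sqerr = 0.0
        npreds = 0
        for user in 1:nusers(inp)
            # one row of the user-factor matrix
            Uvec = reshape(getU(model, user), 1, nfactors(model))
            nzrows, nzvals = items_and_ratings(inp, user)
            predicted = vec(vec_mul_p(model, Uvec))[nzrows]
            sqerr += sum((predicted .- nzvals) .^ 2)
            npreds += length(predicted)
        end
        sqrt(sqerr / npreds)
    end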

##
# Chunk based distributed memory parallelism
function train(als::ALSWR{ParChunk,DistInputs,DistModel}, niters::Int, nfacts::Int64, model_dir::AbstractString, max_cache::Int=10, lambda::Float64=0.065)
als.model = prep(als.inp, nfacts, lambda, model_dir, max_cache)
fact_iters(als, niters)
nothing
end
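
A minimal usage sketch of this method, assuming als was already constructed with chunked distributed inputs; the directory and cache size below are placeholder values, not package defaults:

    # 10 ALS iterations, 20 factors; model chunks are written under a
    # hypothetical directory, with at most 10 chunks cached in memory
    # and the default regularization lambda = 0.065.
    train(als, 10, 20, "/tmp/recsys_model", 10)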

function fact_iters{TP<:ParChunk,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64)
t1 = time()

@@ -264,26 +286,6 @@ function fact_iters{TP<:ParChunk,TM<:Model,TI<:Inputs}(::TP, model::TM, inp::TI, niters::Int64)
nothing
end

function rmse{TP<:Union{ParShmem,ParChunk},TI<:Inputs}(als::ALSWR{TP}, inp::TI)
t1 = time()

model = get(als.model)
share!(model)
share!(inp)
ensure_loaded(inp)
ensure_loaded(model)

cumerr = @parallel (.+) for user in 1:nusers(inp)
Uvec = reshape(getU(model,user), 1, nfactors(model))
nzrows, nzvals = items_and_ratings(inp, user)
predicted = vec(vec_mul_p(model, Uvec))[nzrows]
[sum((predicted .- nzvals) .^ 2), length(predicted)]
end
localize!(model)
@logmsg("rmse time $(time()-t1)")
sqrt(cumerr[1]/cumerr[2])
end


##
# Thread parallelism
66 changes: 3 additions & 63 deletions src/chunk.jl → src/chunks/chunk.jl
@@ -10,13 +10,6 @@ using LRUCache
using Base.Mmap
import Base.Mmap: sync!

# D = dimension that is split
# N = value of the other dimension, which is constant across all splits
type MemMappedMatrix{T,D,N}
val::Matrix{T}
end
sync!(m::MemMappedMatrix) = sync!(m.val, Base.MS_SYNC | Base.MS_INVALIDATE)

type Chunk{K,V}
path::AbstractString
offset::Int
@@ -37,50 +30,6 @@ function Chunk(path::AbstractString, keyrange, V)
Chunk{K,V}(path, 0, size, keyrange, V, WeakRef())
end

function load{T,D,N}(::Type{MemMappedMatrix{T,D,N}}, chunk::Chunk)
@logmsg("loading memory mapped chunk $(chunk.path)")
ncells = div(chunk.size, sizeof(T))
M = Int(ncells/N)
dims = (D == 1) ? (M,N) : (N,M)
A = Mmap.mmap(chunk.path, Matrix{T}, dims, chunk.offset)
MemMappedMatrix{T,D,N}(A)
end

function load{T<:Vector{UInt8}}(::Type{T}, chunk::Chunk)
@logmsg("loading chunk $(chunk.path)")
open(chunk.path) do f
seek(f, chunk.offset)
databytes = Array(UInt8, chunk.size)
read!(f, databytes)
return databytes::T
end
end

function load{T<:Matrix}(::Type{T}, chunk::Chunk)
M = readcsv(load(Vector{UInt8}, chunk))
M::T
end

#=
function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk)
A = load(Matrix{Float64}, chunk)
rows = convert(Vector{Int64}, A[:,1]);
cols = convert(Vector{Int64}, A[:,2]);
vals = convert(Vector{Float64}, A[:,3]);
# subtract keyrange to keep sparse matrix small
cols .-= (first(chunk.keyrange) - 1)
sparse(rows, cols, vals)::T
end
=#
function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk)
@logmsg("loading memory mapped sparse $(chunk.path)")
open(chunk.path) do f
seek(f, chunk.offset)
return mmap_csc_load(f)::T
end
end

function data{K,V}(chunk::Chunk{K,V}, lrucache::LRU)
if chunk.data.value == nothing
data = load(V, chunk)
@@ -97,15 +46,6 @@ function sync!(chunk::Chunk)
end
end

#=
function finalize_chunk_data(chunk::Chunk, data)
#@logmsg("unloading chunk $(chunk.path)")
# TODO: need to sync! when it gets chucked out of the lrucache
#sync!(chunk)
chunk.data = nothing
end
=#

type ChunkedFile{K,V}
keyrangetype::Type{K}
valtype::Type{V}
@@ -119,9 +59,9 @@ keyrange(cf::ChunkedFile) = first(cf.chunks[1].keyrange):last(cf.chunks[end].keyrange)
sync!(cf::ChunkedFile) = empty!(cf.lrucache)

function _unload(cf, k, v)
sync!(v)
chunk = getchunk(cf, first(k))
@logmsg("unloading chunk $(chunk.path)")
sync!(chunk)
#@logmsg("unloading chunk $(chunk.path)")
chunk.data.value = nothing
nothing
end
@@ -146,7 +86,7 @@ function writemeta(cf::ChunkedFile)
chunks = cf.chunks
idx = 1
for chunk in chunks
@logmsg("writing chunk: $(chunk.keyrange)")
#@logmsg("writing chunk: $(chunk.keyrange)")
println(meta, first(chunk.keyrange), ",", last(chunk.keyrange), ",", chunkpfx, ".", idx)
idx += 1
end
27 changes: 27 additions & 0 deletions src/chunks/csv.jl
@@ -0,0 +1,27 @@
function load{T<:Vector{UInt8}}(::Type{T}, chunk::Chunk)
#@logmsg("loading chunk $(chunk.path)")
open(chunk.path) do f
seek(f, chunk.offset)
databytes = Array(UInt8, chunk.size)
read!(f, databytes)
return databytes::T
end
end

function load{T<:Matrix}(::Type{T}, chunk::Chunk)
M = readcsv(load(Vector{UInt8}, chunk))
M::T
end

#=
function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk)
A = load(Matrix{Float64}, chunk)
rows = convert(Vector{Int64}, A[:,1]);
cols = convert(Vector{Int64}, A[:,2]);
vals = convert(Vector{Float64}, A[:,3]);
# subtract keyrange to keep sparse matrix small
cols .-= (first(chunk.keyrange) - 1)
sparse(rows, cols, vals)::T
end
=#
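
A hedged usage sketch of these CSV loaders; the file path and key range are assumptions, and the Chunk constructor form is the one defined in chunks/chunk.jl:

    # Map a hypothetical numeric CSV file to a chunk covering keys 1:1000,
    # then parse the raw bytes into a dense Float64 matrix via readcsv.
    chunk = Chunk("ratings_part1.csv", 1:1000, Matrix{Float64})
    A = load(Matrix{Float64}, chunk)
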
15 changes: 15 additions & 0 deletions src/chunks/mmapdense.jl
@@ -0,0 +1,15 @@
# D = dimension that is split
# N = value of the other dimension, which is constant across all splits
type MemMappedMatrix{T,D,N}
val::Matrix{T}
end
sync!(m::MemMappedMatrix) = sync!(m.val, Base.MS_SYNC | Base.MS_INVALIDATE)

function load{T,D,N}(::Type{MemMappedMatrix{T,D,N}}, chunk::Chunk)
#@logmsg("loading memory mapped chunk $(chunk.path)")
ncells = div(chunk.size, sizeof(T))
M = Int(ncells/N)
dims = (D == 1) ? (M,N) : (N,M)
A = Mmap.mmap(chunk.path, Matrix{T}, dims, chunk.offset)
MemMappedMatrix{T,D,N}(A)
end
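
A worked example of the dimension arithmetic, with assumed sizes: a chunk holding 8000 bytes of Float64 values, with the constant dimension N = 20:

    ncells = div(8000, sizeof(Float64))  # 1000 cells in the chunk
    M = Int(ncells / 20)                 # 50 along the split dimension
    # D == 1 splits rows:    dims = (50, 20)
    # D == 2 splits columns: dims = (20, 50)
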
8 changes: 8 additions & 0 deletions src/mmapsparse.jl → src/chunks/mmapsparse.jl
@@ -52,3 +52,11 @@ function sync!(spm::SparseMatrixCSC)
Mmap.sync!(spm.nzval, Base.MS_SYNC | Base.MS_INVALIDATE)
nothing
end

function load{T<:SparseMatrixCSC}(::Type{T}, chunk::Chunk)
#@logmsg("loading memory mapped sparse $(chunk.path)")
open(chunk.path) do f
seek(f, chunk.offset)
return mmap_csc_load(f)::T
end
end
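
For completeness, a sketch of loading a memory-mapped sparse chunk; the path is a placeholder and mmap_csc_load is presumed to be defined earlier in this file:

    chunk = Chunk("ratings_part1.sp", 1:1000, SparseMatrixCSC)
    S = load(SparseMatrixCSC, chunk)
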
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
