From bc6b0baf774ca8aab6a659722eadf58852b81f76 Mon Sep 17 00:00:00 2001
From: Paco Zamora Martinez
Date: Mon, 19 May 2014 09:17:59 +0200
Subject: [PATCH 1/3] Added cache for previously done map jobs

---
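Note: a worker that has already processed a map job keeps that job's input
data locally, so re-taking the same job _ids on later iterations avoids
reloading it. A minimal sketch of the query-narrowing idea implemented below
(the standalone helper and db handle are illustrative, not part of the patch;
the db:count call and query shape are taken from the diff):

    -- Prefer jobs this worker has processed before; fall back to any
    -- pending job when none of the cached ids is still available.
    local function narrow_query(db, jobs_ns, query, cache_map_ids, iteration)
      if iteration > 1 and #cache_map_ids > 0 then
        query._id = { ["$in"] = cache_map_ids }
        if db:count(jobs_ns, query) == 0 then query._id = nil end
      end
      return query
    end

Both caches are per-process tables, so task.reset_cache() has to be called
when a whole map-reduce task finishes (see the worker.lua hunk).
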
"compute_gradients_and_loss had to return gradients, loss_matrix and bunch_size") for name,grads in pairs(weight_grads) do serialize_and_map_emit(name, - { grads, trainer:weights(name):get_shared_count() }, + { + grads, + trainer:weights(name):get_shared_count()*bunch_size + }, emit) end serialize_and_map_emit(TR_LOSS_KEY, loss_matrix, emit) @@ -165,10 +169,11 @@ local finalfn = function(pairs_iterator) tr_loss_mean = value[1] tr_loss_var = value[2] else - weight_grads[key] = value[1] - local w = trainer:weights(key) - w:reset_shared_count() - w:add_to_shared_count(value[2]) + local N = value[2] if not N or N==0 then N=1 end + if params.smooth_gradients then + -- gradients smoothing + weight_grads[key] = value[1]:scal( 1.0/math.sqrt(N) ) + end end end assert(tr_loss_mean) @@ -214,6 +219,7 @@ local make_map_reduce_task_table = function(t) user_taskfn = { mandatory = true, type_match="function" }, user_finalfn = { mandatory = true, type_match="function" }, generate_new_trainer_and_train_func = { mandatory = true, type_match="function" }, + smooth_gradients = { mandatory = false, default = true }, }, t) -- dbname = params.dbname diff --git a/mapreduce/examples/April-ANN/init.lua b/mapreduce/examples/April-ANN/init.lua index 989d294..523d938 100644 --- a/mapreduce/examples/April-ANN/init.lua +++ b/mapreduce/examples/April-ANN/init.lua @@ -9,11 +9,11 @@ local EXP_DBNAME = "exp_digits" local bunch_size = 32 local weights_random = random(1234) -local description = "256 inputs 256 tanh 128 tanh 10 log_softmax" +local description = "256 inputs 128 tanh 10 log_softmax" local inf = -1 local sup = 1 local shuffle_random = random() -- TOTALLY RANDOM FOR EACH WORKER -local learning_rate = 0.005 +local learning_rate = 0.01 local momentum = 0.02 local weight_decay = 1e-04 local max_epochs = 40 @@ -77,7 +77,7 @@ local make_load_matrix = function(value) end end -local make_load_dataset = function(mat) +local make_load_dataset = function(mat,m2) return function() local train_input = dataset.matrix(mat, { @@ -122,10 +122,11 @@ local make_load_dataset = function(mat) end end --- receives the persistent table in read-only mode as last argument +-- receives a trainer, key,value pair and the persistent table in read-only mode +-- as last argument; returns gradients, loss_matrix and bunch_size local compute_gradients_and_loss = function(trainer, key, value, conf) - local mat = cached(value, make_load_matrix(value), mat_cache) - local ds_tbl = cached(value, make_load_dataset(mat), ds_cache) + local mat = cached(value, make_load_matrix(value), mat_cache) + local ds_tbl = cached(value, make_load_dataset(mat,m2), ds_cache) local in_ds = ds_tbl.train_input local out_ds = ds_tbl.train_output local bunch_tbl = {} @@ -136,15 +137,16 @@ local compute_gradients_and_loss = function(trainer, key, value, conf) local target = out_ds:getPatternBunch(bunch_tbl) local grads,tr_loss,tr_loss_matrix = trainer:compute_gradients_step(input, target) - return grads,tr_loss_matrix + return grads,tr_loss_matrix,bunch_size end --- receives the persistent table in read-only mode as last argument +-- receives a trainer and the persistent table in read-only mode as last +-- argument; returns the validation loss mean and variance local compute_validation_loss = function(trainer, conf) util.omp_set_num_threads(4) local value = "misc/digits.png" - local mat = cached(value, make_load_matrix(value), mat_cache) - local ds_tbl = cached(value, make_load_dataset(mat), ds_cache) + local mat = cached(value, make_load_matrix(value), mat_cache) + local 
 mapreduce/examples/April-ANN/common.lua | 22 +++++++++++++--------
 mapreduce/examples/April-ANN/init.lua   | 26 ++++++++++++++-----------
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/mapreduce/examples/April-ANN/common.lua b/mapreduce/examples/April-ANN/common.lua
index 5985f85..410b2c2 100644
--- a/mapreduce/examples/April-ANN/common.lua
+++ b/mapreduce/examples/April-ANN/common.lua
@@ -97,13 +97,17 @@ local mapfn = function(key, value, emit)
   local train_func = deserialize_from_gridfs(gridfs, assert(conf.train_func))
   local trainer = train_func:get_state_table().last
   conf:read_only(true)
-  local weight_grads,loss_matrix = compute_gradients_and_loss(trainer,
-                                                              key, value,
-                                                              conf)
+  local weight_grads,loss_matrix,bunch_size =
+    compute_gradients_and_loss(trainer, key, value, conf)
   conf:read_only(false)
+  assert(weight_grads and loss_matrix and bunch_size,
+         "compute_gradients_and_loss had to return gradients, loss_matrix and bunch_size")
   for name,grads in pairs(weight_grads) do
     serialize_and_map_emit(name,
-                           { grads, trainer:weights(name):get_shared_count() },
+                           {
+                             grads,
+                             trainer:weights(name):get_shared_count()*bunch_size
+                           },
                            emit)
   end
   serialize_and_map_emit(TR_LOSS_KEY, loss_matrix, emit)
@@ -165,10 +169,11 @@ local finalfn = function(pairs_iterator)
       tr_loss_mean = value[1]
       tr_loss_var = value[2]
     else
-      weight_grads[key] = value[1]
-      local w = trainer:weights(key)
-      w:reset_shared_count()
-      w:add_to_shared_count(value[2])
+      local N = value[2] if not N or N==0 then N=1 end
+      if params.smooth_gradients then
+        -- gradients smoothing
+        weight_grads[key] = value[1]:scal( 1.0/math.sqrt(N) )
+      end
     end
   end
   assert(tr_loss_mean)
@@ -214,6 +219,7 @@ local make_map_reduce_task_table = function(t)
      user_taskfn = { mandatory = true, type_match="function" },
      user_finalfn = { mandatory = true, type_match="function" },
      generate_new_trainer_and_train_func = { mandatory = true, type_match="function" },
+     smooth_gradients = { mandatory = false, default = true },
    }, t)
   --
   dbname = params.dbname
diff --git a/mapreduce/examples/April-ANN/init.lua b/mapreduce/examples/April-ANN/init.lua
index 989d294..523d938 100644
--- a/mapreduce/examples/April-ANN/init.lua
+++ b/mapreduce/examples/April-ANN/init.lua
@@ -9,11 +9,11 @@ local EXP_DBNAME = "exp_digits"
 
 local bunch_size = 32
 local weights_random = random(1234)
-local description = "256 inputs 256 tanh 128 tanh 10 log_softmax"
+local description = "256 inputs 128 tanh 10 log_softmax"
 local inf = -1
 local sup = 1
 local shuffle_random = random() -- TOTALLY RANDOM FOR EACH WORKER
-local learning_rate = 0.005
+local learning_rate = 0.01
 local momentum = 0.02
 local weight_decay = 1e-04
 local max_epochs = 40
@@ -77,7 +77,7 @@ local make_load_matrix = function(value)
   end
 end
 
-local make_load_dataset = function(mat)
+local make_load_dataset = function(mat,m2)
   return function()
     local train_input = dataset.matrix(mat,
                                        {
@@ -122,10 +122,11 @@
   end
 end
 
--- receives the persistent table in read-only mode as last argument
+-- receives a trainer, key,value pair and the persistent table in read-only mode
+-- as last argument; returns gradients, loss_matrix and bunch_size
 local compute_gradients_and_loss = function(trainer, key, value, conf)
-  local mat = cached(value, make_load_matrix(value), mat_cache)
-  local ds_tbl = cached(value, make_load_dataset(mat), ds_cache)
+  local mat    = cached(value, make_load_matrix(value), mat_cache)
+  local ds_tbl = cached(value, make_load_dataset(mat,m2), ds_cache)
   local in_ds = ds_tbl.train_input
   local out_ds = ds_tbl.train_output
   local bunch_tbl = {}
@@ -136,15 +137,16 @@ local compute_gradients_and_loss = function(trainer, key, value, conf)
   local target = out_ds:getPatternBunch(bunch_tbl)
   local grads,tr_loss,tr_loss_matrix =
     trainer:compute_gradients_step(input, target)
-  return grads,tr_loss_matrix
+  return grads,tr_loss_matrix,bunch_size
 end
 
--- receives the persistent table in read-only mode as last argument
+-- receives a trainer and the persistent table in read-only mode as last
+-- argument; returns the validation loss mean and variance
 local compute_validation_loss = function(trainer, conf)
   util.omp_set_num_threads(4)
   local value = "misc/digits.png"
-  local mat = cached(value, make_load_matrix(value), mat_cache)
-  local ds_tbl = cached(value, make_load_dataset(mat), ds_cache)
+  local mat    = cached(value, make_load_matrix(value), mat_cache)
+  local ds_tbl = cached(value, make_load_dataset(mat,m2), ds_cache)
   local in_ds = ds_tbl.val_input
   local out_ds = ds_tbl.val_output
   local va_loss_mean,va_loss_var = trainer:validate_dataset{
@@ -155,7 +157,8 @@
   return va_loss_mean,va_loss_var
 end
 
--- the last argument is the persistent table (allows read/write operations)
+-- receives a train_func instance and the persistent table (allows read/write
+-- operations)
 local user_finalfn = function(train_func, conf)
   print(train_func:get_state_string())
   train_func:save("best_func.lua")
@@ -173,4 +176,5 @@
   generate_new_trainer_and_train_func = generate_new_trainer_and_train_func,
   compute_gradients_and_loss = compute_gradients_and_loss,
   compute_validation_loss = compute_validation_loss,
+  -- smooth_gradients = true, -- by default it is true
 }

From a31a23f4efcce2b91a71c7fa1c1cc5cfd4b5e50a Mon Sep 17 00:00:00 2001
From: Paco Zamora Martinez
Date: Mon, 19 May 2014 10:48:14 +0200
Subject: [PATCH 3/3] Updated bunch size

---
 mapreduce/examples/April-ANN/common.lua | 5 +++--
 mapreduce/examples/April-ANN/init.lua   | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/mapreduce/examples/April-ANN/common.lua b/mapreduce/examples/April-ANN/common.lua
index 410b2c2..151e383 100644
--- a/mapreduce/examples/April-ANN/common.lua
+++ b/mapreduce/examples/April-ANN/common.lua
@@ -133,7 +133,7 @@ local reducefn = function(key, values, emit)
     end
     serialize_and_red_emit({ loss:get_accum_loss() }, emit)
   else
-    -- accumulate here the shared count
+    -- accumulate gradients and shared count
     local t = deserialize_emitted_value(values[1])
     local gradient = t[1]
     local counts = t[2]
@@ -172,8 +172,9 @@ local finalfn = function(pairs_iterator)
       local N = value[2] if not N or N==0 then N=1 end
       if params.smooth_gradients then
         -- gradients smoothing
-        weight_grads[key] = value[1]:scal( 1.0/math.sqrt(N) )
+        value[1]:scal( 1.0/math.sqrt(N) )
       end
+      weight_grads[key] = value[1]
     end
   end
   assert(tr_loss_mean)
diff --git a/mapreduce/examples/April-ANN/init.lua b/mapreduce/examples/April-ANN/init.lua
index 523d938..599aaa4 100644
--- a/mapreduce/examples/April-ANN/init.lua
+++ b/mapreduce/examples/April-ANN/init.lua
@@ -7,7 +7,7 @@ local NUM_REDUCERS = 10
 local EXP_DBHOST = "localhost"
 local EXP_DBNAME = "exp_digits"
 
-local bunch_size = 32
+local bunch_size = 128
 local weights_random = random(1234)
 local description = "256 inputs 128 tanh 10 log_softmax"
 local inf = -1
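
Note on the series: besides raising bunch_size from 32 to 128, PATCH 3/3 fixes
the PATCH 2/3 regression by assigning weight_grads[key] whether or not
smoothing is enabled. The larger bunch size also changes the smoothing
denominator; a worked example, assuming a shared count of 1 and 4 map jobs
(both numbers illustrative):

    local bunch_size, njobs, shared = 128, 4, 1
    local N = njobs * shared * bunch_size   -- counts summed by the reducer
    print(1.0/math.sqrt(N))                 -- scale factor ~0.0442 for N=512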