Skip to content

Commit

Permalink
Merge branch 'devel'
Browse files Browse the repository at this point in the history
  • Loading branch information
pakozm committed May 11, 2014
2 parents 4178cf7 + 5395b6a commit e91e4b4
Show file tree
Hide file tree
Showing 16 changed files with 632 additions and 183 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ is stored at auxiliary mongoDB collections.
This software depends in:

- [Lua 5.2](http://www.lua.org/)
- [pakozm/luamongo](https://github.com/pakozm/luamongo/), a fork of
[moai/luamongo](https://github.com/moai/luamongo) for Lua 5.2 and with minor
improvements.
- [luamongo](https://github.com/moai/luamongo/), mongoDB driver
for Lua 5.2.

Installation
------------
Expand Down Expand Up @@ -105,7 +104,8 @@ the same structure, they return a Lua table with two fields:
- **init** function, which receives a table of arguments and allows to configure
your module options, in case that you need any option.

- **func** function, which implements the necessary Lua code.
- A function which implements the necessary Lua code for the operation. The name
of the function is different for each operation.

A map-reduce task is divided, at least, in the following modules:

Expand All @@ -120,7 +120,7 @@ local init = function(arg)
end
return {
init = init,
func = function()
taskfn = function()
coroutine.yield(1,"mapreduce/server.lua")
coroutine.yield(2,"mapreduce/worker.lua")
coroutine.yield(3,"mapreduce/test.lua")
Expand All @@ -130,15 +130,17 @@ return {
```

- **mapfn.lua** is the script where the map function is implemented. The
**func** field is executed as a standard Lua function, and receives tow
arguments `(key,value)` generated by one of the yields at your `taskfn`
script. Map results are produced by calling the global function
**func** field is executed as a standard Lua function, and receives three
arguments `(key,value,emit)`. The first two are generated b
one of the yields at your `taskfn`
script. The third argument is a function. Map results
are produced by calling the function
`emit(key,value)`.

```Lua
return {
init = function() end,
func = function(key,value)
mapfn = function(key,value,emit)
for line in io.lines(value) do
for w in line:gmatch("[^%s]+") do
emit(w,1)
Expand All @@ -161,7 +163,7 @@ local offset_basis = 2166136261
local MAX = 2^32
return {
init = function() end,
func = function(key)
partitionfn = function(key)
-- compute hash
local h = offset_basis
for i=1,#key do
Expand All @@ -186,7 +188,7 @@ return {
```Lua
return {
init = function() end,
func = function(key,values)
reducefn = function(key,values)
local count=0
for _,v in ipairs(values) do count = count + v end
return count
Expand All @@ -204,7 +206,7 @@ return {
```Lua
return {
init = function() end,
func = function(it)
finalfn = function(it)
for key,value in it do
print(value,key)
end
Expand Down
4 changes: 1 addition & 3 deletions examples/WordCount/finalfn.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
local it = 0
return {
init = function() end,
func = function(pairs_iterator)
it = it + 1
finalfn = function(pairs_iterator)
for key,value in pairs_iterator do
print(value,key)
end
Expand Down
48 changes: 48 additions & 0 deletions examples/WordCount/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- string hash function: http://isthe.com/chongo/tech/comp/fnv/
local NUM_REDUCERS = 10
local FNV_prime = 16777619
local offset_basis = 2166136261
local MAX = 2^32
return {
-- arg is for configuration purposes, it will be executed with init_args given
-- to the server
init = function(arg) end,

taskfn = function()
coroutine.yield(1,"mapreduce/server.lua")
coroutine.yield(2,"mapreduce/worker.lua")
coroutine.yield(3,"mapreduce/test.lua")
coroutine.yield(4,"mapreduce/utils.lua")
end,

mapfn = function(key,value,emit)
for line in io.lines(value) do
for w in line:gmatch("[^%s]+") do
emit(w,1)
end
end
end,

partitionfn = function(key)
-- compute hash
local h = offset_basis
for i=1,#key do
h = (h * FNV_prime) % MAX
h = bit32.bxor(h, key:byte(i))
end
return h % NUM_REDUCERS
end,

reducefn = function(key,values)
local count=0
for _,v in ipairs(values) do count = count + v end
return count
end,

finalfn = function(pairs_iterator)
for key,value in pairs_iterator do
print(value,key)
end
return true -- indicates to remove mongo gridfs result files
end,
}
2 changes: 1 addition & 1 deletion examples/WordCount/mapfn.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
return {
init = function() end,
func = function(key,value)
mapfn = function(key,value,emit)
for line in io.lines(value) do
for w in line:gmatch("[^%s]+") do
emit(w,1)
Expand Down
2 changes: 1 addition & 1 deletion examples/WordCount/partitionfn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ local offset_basis = 2166136261
local MAX = 2^32
return {
init = function() end,
func = function(key)
partitionfn = function(key)
-- compute hash
local h = offset_basis
for i=1,#key do
Expand Down
2 changes: 1 addition & 1 deletion examples/WordCount/reducefn.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
return {
init = function() end,
func = function(key,values)
reducefn = function(key,values)
local count=0
for _,v in ipairs(values) do count = count + v end
return count
Expand Down
2 changes: 1 addition & 1 deletion examples/WordCount/taskfn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ local init = function(arg)
end
return {
init = init,
func = function()
taskfn = function()
coroutine.yield(1,"mapreduce/server.lua")
coroutine.yield(2,"mapreduce/worker.lua")
coroutine.yield(3,"mapreduce/test.lua")
Expand Down
2 changes: 1 addition & 1 deletion examples/WordCountBig/taskfn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ return {
-- init is for configuration purposes, it is allowed in any of the scripts
init = function(arg)
end,
func = function()
taskfn = function()
local f = io.popen("ls /home/experimentos/CORPORA/EUROPARL/en-splits/*","r")
local i=0
for filename in f:lines() do
Expand Down
19 changes: 10 additions & 9 deletions execute_server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
--
-- [4] => mapfn Lua module, idem
--
-- [6] => partitionfn Lua module, idem
-- [5] => partitionfn Lua module, idem
--
-- [7] => reducefn Lua module, idem
-- [6] => reducefn Lua module, idem
--
-- [8] => finalfn Lua module, idem
-- [7] => finalfn Lua module, idem
--
-- [9] => result_ns Lua string (OPTIONAL, by default all data will be removed)
-- [8] => result_ns Lua string (OPTIONAL, by default all data will be removed)
--
-- IMPORTANT: the Lua modules (taskfn, mapfn, reducefn, ...) need to be in the
-- LUA_PATH in all the machines where this code need to be executed
Expand All @@ -41,12 +41,13 @@ s:configure{
partitionfn = normalize(partitionfn),
reducefn = normalize(reducefn),
finalfn = normalize(finalfn),
task_args = arg,
map_args = arg,
partition_args = arg,
reduce_args = arg,
final_args = arg,
init_args = arg,
result_ns = result_ns,
-- storage = "gridfs[:PATH]", -- 'gridfs', 'shared', 'sshfs', with the
-- optional string :PATH. if not given PATH will be os.tmpname()
-- storage = "gridfs:/tmp/wordcount",
-- storage = "shared:/home/experimentos/tmp/wordcount",
-- storage = "sshfs:/tmp/wordcount",
}
mapreduce.utils.sleep(4)
s:loop()
18 changes: 18 additions & 0 deletions mapreduce/cnn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ function cnn:get_dbname()
return self.dbname
end

function cnn:insert_error(who,msg)
local ns = string.format("%s.errors", self.dbname)
local db = self:connect()
db:insert(ns, { worker = who, msg = msg })
end

function cnn:get_errors()
local ns = string.format("%s.errors", self.dbname)
local db = self:connect()
return db:query(ns, {})
end

function cnn:remove_errors(ids)
local ns = string.format("%s.errors", self.dbname)
local db = self:connect()
db:remove(ns,{ _id = { ["$in"] = ids } })
end

function cnn:annotate_insert(ns,tbl,callback)
self.pending_inserts = self.pending_inserts or {}
self.pending_callbacks = self.pending_callbacks or {}
Expand Down
Loading

0 comments on commit e91e4b4

Please sign in to comment.