-
Notifications
You must be signed in to change notification settings - Fork 8
/
lua-mapreduce-client.lua
329 lines (285 loc) · 9.93 KB
/
lua-mapreduce-client.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
#!/usr/bin/lua
-------------------------------------------------------------------------------
--
-- @script: lua-mapreduce-client.lua
--
-- @author: Rohit joshi
--
-- @copyright Joshi Ventures LLC © 2012
--
-- @license Apache License, Version 2.0
--
-- VERSION HISTORY:
-- 1.0 8/09/2012 - Initial release
--
-------------------------------------------------------------------------------
-- Purpose: a worker for the mapreduce framework.
-- It receives tasks from the mapreduce server and processes each requested
-- task, either map or reduce.
-------------------------------------------------------------------------------
-- lua lanes
local lanes = require("lanes")
lanes.configure({with_timers=false})
--- depends on logging
require "logging.console"
--- utils.lua
require "utils/utils"
--- requires serialize.lua
require "utils/serialize"
--- declare a logger instance. You can change it to file or other supported
local logger = logging.console()
logger:setLevel (logging.WARN)
local socket = require("socket")
local mapfn
local co_mapfn
local reducefn
local co_reducefn
local taskfile_loaded
------------------------------------------------------------------------------
--- Read a task file from disk and compile it (the chunk is NOT executed).
-- @param file path of the task file
-- @return the compiled chunk as a function; raises an error if the file
--   cannot be opened or read, or if its content does not compile
------------------------------------------------------------------------------
local function load_taskfile(file)
  local f = assert(io.open(file, "r"))
  local content, read_err = f:read("*all")
  f:close()
  assert(content, read_err)
  -- Only visible at debug level; task files can be large.
  logger:debug("Task file content: " .. content)
  -- loadstring is Lua 5.1 only; load accepts strings from 5.2 on.
  -- The "@" chunk name makes compile errors report the file name.
  return assert((loadstring or load)(content, "@" .. file))
end
------------------------------------------------------------------------------
--- Send one map result to the server.
-- The message is serialize({ k = key, v = { [k] = v } }) followed by "\r\n".
-- @param tcp connected LuaSocket TCP object
-- @param key the map task key this result belongs to
-- @param k key emitted by mapfn
-- @param v value emitted by mapfn
-- @return nil on success, otherwise the error string reported by tcp:send
--   (e.g. "closed")
------------------------------------------------------------------------------
local function client_send_map_result(tcp, key, k, v)
  local value = serialize({ k = key, v = { [k] = v } })
  logger:debug("Sending map result: " .. value)
  local _, err = tcp:send(value .. "\r\n")
  return err
end
------------------------------------------------------------------------------
--- Send one reduce result to the server.
-- The message is serialize({ k = key, v = value }) followed by "\r\n".
-- @param tcp connected LuaSocket TCP object
-- @param key key emitted by reducefn
-- @param value value emitted by reducefn
-- @return whatever tcp:send returns: the last byte index on success, or
--   nil plus an error string (e.g. "closed") on failure
------------------------------------------------------------------------------
local function client_send_reduce_result(tcp, key, value)
  local msg = serialize({ k = key, v = value })
  logger:debug("Sending reduce result:" .. msg)
  return tcp:send(msg .. "\r\n")
end
------------------------------------------------------------------------------
--- Compile and run one serialized Lua chunk received from the server.
-- NOTE(review): server messages are executed as Lua code, so this worker must
-- only ever be pointed at a trusted server.
-- @param data chunk text
-- @return true and the chunk's result, or false and an error message
------------------------------------------------------------------------------
local function client_decode(data)
  local chunk, err = (loadstring or load)(data)
  if not chunk then
    return false, err
  end
  return pcall(chunk)
end

------------------------------------------------------------------------------
--- Log a socket failure and hand its status back to the caller.
-- @param status error string returned by LuaSocket (e.g. "closed")
-- @param context text appended to the log line, e.g. "while receiving task file"
-- @return status, unchanged
------------------------------------------------------------------------------
local function client_fail(status, context)
  if status == "closed" then
    logger:error("Connection closed by foreign host " .. context)
  else
    logger:error("Socket error (" .. tostring(status) .. ") " .. context)
  end
  return status
end

------------------------------------------------------------------------------
--- Receive one command line from the server and decode it.
-- @param tcp connected LuaSocket TCP object
-- @return the decoded command table, or nil and a status: the socket error
--   (e.g. "closed") or "invalid command" when the line cannot be decoded
------------------------------------------------------------------------------
local function client_receive_command(tcp)
  local data, status = tcp:receive("*l")
  if data == nil then
    return nil, client_fail(status, "while waiting for a command")
  end
  logger:debug("Received data " .. data)
  local ok, task_t = client_decode(data)
  if not ok or type(task_t) ~= "table" then
    logger:error("Failed to decode command: " .. tostring(task_t))
    return nil, "invalid command"
  end
  return task_t
end

------------------------------------------------------------------------------
--- Handle one "map" command.
-- Receives the payload, acknowledges it with "OK,<len>", and runs mapfn as
-- a coroutine on payload[key]. Each (k, v) pair it yields is sent back, and
-- the handler finishes by sending "map:completed:<key>".
-- @return nil on success, otherwise the status that aborted processing
------------------------------------------------------------------------------
local function client_process_map(tcp, key, len)
  local map_data, status = tcp:receive(len)
  if map_data == nil then
    return client_fail(status, "while receiving map content for key:" .. key)
  end
  local _, err = tcp:send("OK," .. len .. "\r\n")
  if err ~= nil then
    return client_fail(err, "while sending OK response for map content receipt for key:" .. key)
  end
  local ok, map_data_t = client_decode(map_data)
  if not ok or type(map_data_t) ~= "table" then
    logger:error("Failed to decode map content for key:" .. key .. ": " .. tostring(map_data_t))
    return "invalid payload"
  end
  local map_value = map_data_t[key]
  logger:debug("Received map data:" .. tostring(map_value))
  co_mapfn = coroutine.create(mapfn)
  while true do
    logger:debug("Calling mapfn...")
    local resumed, k, v = coroutine.resume(co_mapfn, key, map_value)
    if not resumed then
      -- k holds the error raised inside mapfn.
      logger:error("mapfn failed for key:" .. key .. ": " .. tostring(k))
      break
    end
    -- mapfn signals the end of its output by yielding/returning nil.
    if k == nil or v == nil then
      break
    end
    local send_err = client_send_map_result(tcp, key, k, v)
    if send_err ~= nil then
      return client_fail(send_err, "while sending map result with key:" .. key .. ":" .. tostring(k))
    end
  end
  logger:debug("Sending map completed status for key:" .. key)
  _, err = tcp:send("map:completed:" .. key .. "\r\n")
  if err ~= nil then
    return client_fail(err, "while sending map:completed status for key:" .. key)
  end
end

------------------------------------------------------------------------------
--- Handle one "reduce" command.
-- Receives the payload (no OK acknowledgement is sent for reduce) and runs
-- reducefn as a coroutine on the decoded value. Each (k, v) pair it yields is
-- sent back, and the handler finishes by sending "reduce:completed:<key>".
-- @return nil on success, otherwise the status that aborted processing
------------------------------------------------------------------------------
local function client_process_reduce(tcp, key, len)
  local value, status = tcp:receive(len)
  if value == nil then
    return client_fail(status, "while receiving reduce content for key:" .. key)
  end
  local ok, r_v = client_decode(value)
  if not ok then
    logger:error("Failed to decode reduce content for key:" .. key .. ": " .. tostring(r_v))
    return "invalid payload"
  end
  co_reducefn = coroutine.create(reducefn)
  while true do
    local resumed, k, v = coroutine.resume(co_reducefn, key, r_v)
    if not resumed then
      -- k holds the error raised inside reducefn.
      logger:error("reducefn failed for key:" .. key .. ": " .. tostring(k))
      break
    end
    if k == nil or v == nil then
      break
    end
    local _, send_err = client_send_reduce_result(tcp, k, v)
    if send_err ~= nil then
      return client_fail(send_err, "while sending reduce result for key:" .. key)
    end
  end
  local _, err = tcp:send("reduce:completed:" .. key .. "\r\n")
  if err ~= nil then
    return client_fail(err, "while sending reduce:completed status for key:" .. key)
  end
end

------------------------------------------------------------------------------
--- Serve one connected session.
-- Phase 1 waits for a "taskfile" command, receives and acknowledges the task
-- file, and executes it. The code then calls the global mapreducefn() to
-- obtain mapfn/reducefn. Phase 2 processes "map"/"reduce" commands forever.
-- @param tcp connected LuaSocket TCP object (blocking)
-- @param host server host (used in log messages only)
-- @param port server port (used in log messages only)
-- @return only on failure: the socket error (e.g. "closed"), or
--   "invalid command"/"invalid payload" when a message cannot be decoded
------------------------------------------------------------------------------
local function client_run_loop(tcp, host, port)
  local task_file_content
  repeat
    logger:debug("Waiting to receive taskfile from the server:" .. host .. ":" .. port)
    local task_t, status = client_receive_command(tcp)
    if task_t == nil then
      return status
    end
    local command = task_t['c']
    local len = tonumber(task_t['l'])
    logger:debug("Received command:" .. tostring(command) .. ", payload length:" .. tostring(len))
    if command ~= "taskfile" then
      local _, err = tcp:send("error:invalid command. expected taskfile. received:" .. tostring(command) .. "\r\n")
      if err ~= nil then
        return client_fail(err, "while sending invalid command response")
      end
    elseif len == nil then
      -- Without a length the payload cannot be delimited; receive(nil) would
      -- silently read a line instead.
      logger:error("taskfile command without a valid payload length")
      return "invalid command"
    else
      local content, rerr = tcp:receive(len)
      if content == nil then
        return client_fail(rerr, "while receiving task file")
      end
      local _, err = tcp:send("OK," .. len .. "\r\n")
      if err ~= nil then
        return client_fail(err, "while sending OK response for taskfile content receipt")
      end
      task_file_content = content
      logger:debug("taskfile loaded successfully")
    end
  until task_file_content ~= nil

  -- A task file that does not compile or run is fatal for this worker.
  taskfile_loaded = assert((loadstring or load)(task_file_content))()
  -- The task file is expected to define the global mapreducefn().
  local mr_t = mapreducefn()
  mapfn = mr_t.mapfn
  reducefn = mr_t.reducefn

  while true do
    logger:debug("Waiting to receive task (map/reduce) from the server:" .. host .. ":" .. port)
    local task_t, status = client_receive_command(tcp)
    if task_t == nil then
      return status
    end
    local command = task_t['c']
    local key = task_t['k']
    local len = tonumber(task_t['l'])
    logger:info("Received command:" .. tostring(command) .. ", Key:" .. tostring(key) .. ", payload length:" .. tostring(len))
    if (command == "map" or command == "reduce") and (key == nil or len == nil) then
      logger:error("Missing key or payload length in " .. command .. " command")
      return "invalid command"
    end
    local err
    if command == "map" then
      if mapfn then
        err = client_process_map(tcp, key, len)
      else
        logger:error("map function is not defined in the taskfile but still received map command")
      end
    elseif command == "reduce" then
      if reducefn then
        err = client_process_reduce(tcp, key, len)
      else
        logger:error("reduce function is not defined in the taskfile but still received reduce command")
      end
    else
      logger:warn("Ignoring unknown command:" .. tostring(command))
    end
    if err ~= nil then
      return err
    end
    -- NOTE(review): sleeps 1 second after every command; purpose unclear.
    socket.select(nil, nil, 1)
  end
end
------------------------------------------------------------------------------
--- Parse and validate the command line arguments.
-- Options: -s host (default "127.0.0.1"), -p port (default "10000"),
-- -l loglevel: debug|info|warn|error (default "warn"),
-- -n number of client connections (default: left to the caller), -h help.
-- NOTE(review): whether a bare "-h" sets opts["h"] depends on getopt from
-- utils/utils, because "h" is listed among the options; verify.
-- @return host, port, loglevel, num_connections (port and num_connections
--   are returned as given, not converted to numbers); returns nothing when
--   help was requested or an argument is invalid (a message is printed)
------------------------------------------------------------------------------
local function client_Validate_args()
  local usage = "Usage lua-mapreduce-client.lua -s host -p port [-l loglevel -n number of client connections] "
  local valid_levels = { debug = true, info = true, warn = true, error = true }
  local opts = getopt( arg, "hpsln" )
  if opts["h"] ~= nil then
    print(usage)
    return
  end
  local host = opts["s"] or "127.0.0.1"
  local port = opts["p"] or "10000"
  if tonumber(port) == nil then
    print("Error: Invalid port: " .. tostring(port))
    return
  end
  local loglevel = opts["l"] or "warn"
  if not valid_levels[loglevel] then
    print("Error: Invalid loglevel: " .. tostring(loglevel) .. ". Valid options are debug, info, warn or error")
    return
  end
  local num_connections = opts["n"]
  if num_connections ~= nil then
    -- The caller compares this with a number, so reject anything that is
    -- not a positive integer up front.
    local n = tonumber(num_connections)
    if n == nil or n < 1 or n ~= math.floor(n) then
      print("Error: Invalid number of client connections: " .. tostring(num_connections))
      return
    end
  end
  return host, port, loglevel, num_connections
end
------------------------------------------------------------------------------
--- Create a new TCP socket with the 'tcp-nodelay' option enabled.
-- @return an unconnected LuaSocket TCP object
------------------------------------------------------------------------------
local function client_new_socket()
  local tcp = assert(socket.tcp())
  tcp:setoption('tcp-nodelay', true)
  return tcp
end

------------------------------------------------------------------------------
--- Connect to the server, retrying every 5 seconds until it succeeds.
-- Every attempt uses a fresh socket with a 1 second connect timeout.
-- @param host server host name or address
-- @param port server port
-- @return a connected LuaSocket TCP object
------------------------------------------------------------------------------
local function client_connect(host, port)
  while true do
    local tcp = client_new_socket()
    tcp:settimeout(1)
    logger:debug("Connecting to server:" .. host .. ":" .. port)
    local c, status = tcp:connect(host, port)
    if c ~= nil then
      logger:info("Connected to server:" .. host .. ":" .. port)
      return tcp
    end
    logger:debug("Failed to connect. status:" .. tostring(status))
    tcp:close()
    socket.select(nil, nil, 5)
  end
end

------------------------------------------------------------------------------
--- Worker entry point: connect to the server, serve tasks, and reconnect
-- with a fresh socket whenever the session fails. Never returns.
-- @param host server host name or address
-- @param port server port
------------------------------------------------------------------------------
function client_connection(host, port)
  local tcp = client_connect(host, port)
  while true do
    -- Block indefinitely while waiting for tasks.
    tcp:settimeout(nil)
    -- NOTE(review): os.clock() measures CPU time, not wall-clock time.
    local cl = os.clock()
    local status = client_run_loop(tcp, host, port)
    print("Total time to process " .. os.clock() - cl)
    -- A non-nil status means client_run_loop gave up on this socket. A used
    -- socket cannot be connected again, so replace it with a new one.
    if status ~= nil then
      tcp:close()
      tcp = client_connect(host, port)
    end
  end
end
-- validate args
local host, port, loglevel, num_connections = client_Validate_args()
if host == nil or port == nil or loglevel == nil then
  return
end
-- set the log level
set_loglevel(logger, loglevel)
if num_connections == nil then
  -- One connection per core.
  -- NOTE(review): tonumber guards against get_num_cores() (utils/utils)
  -- returning a string; if it yields nothing usable, run one connection.
  num_connections = tonumber(get_num_cores()) or 1
  logger:info("number of cores on this machine " .. num_connections)
else
  local requested = num_connections
  num_connections = tonumber(requested)
  if num_connections == nil then
    print("Error: Invalid number of client connections: " .. tostring(requested))
    return
  end
end
local conn_t = {}
if num_connections < 2 then
  client_connection(host, port)
else
  logger:info("Number of connections " .. num_connections)
  -- lanes.gen must receive the function itself. Calling
  -- client_connection(host, port) here would run it in this thread, and it
  -- never returns. Each lane starts with fresh globals, so the modules that
  -- provide globals and C functions used by the worker are required inside
  -- the lane.
  local conn_gen = lanes.gen("*", {
    required = { "socket", "logging.console", "utils/utils", "utils/serialize" },
  }, client_connection)
  for i = 1, num_connections do
    conn_t[i] = conn_gen(host, port)
  end
  for i = 1, #conn_t do
    conn_t[i]:join()
  end
end