From eaffa3e91335d097dbfc97e733083e5f9ebc87d4 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 20 Dec 2024 18:29:32 +0800 Subject: [PATCH 01/50] self:push_result --- kong/clustering/rpc/socket.lua | 48 ++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 2044acf170a..0e58c23404e 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -11,6 +11,7 @@ local utils = require("kong.clustering.rpc.utils") local queue = require("kong.clustering.rpc.queue") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") +--local isarray = require("table.isarray") local assert = assert @@ -73,10 +74,10 @@ function _M._dispatch(premature, self, cb, payload) return end - res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR, - err)) + res, err = self:push_result(new_error(payload.id, jsonrpc.SERVER_ERROR, err), + "[rpc] unable to push RPC call error: ") if not res then - ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err) + ngx_log(ngx_WARN, err) end return @@ -89,17 +90,27 @@ function _M._dispatch(premature, self, cb, payload) end -- success - res, err = self.outgoing:push({ + res, err = self:push_result({ jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, - }) + }, "[rpc] unable to push RPC call result: ") if not res then - ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err) + ngx_log(ngx_WARN, err) end end +function _M:push_result(msg, err_prefix) + local res, err = self.outgoing:push(msg) + if not res then + return nil, err_prefix .. err + end + + return true +end + + -- start reader and writer thread and event loop function _M:start() self.read_thread = ngx.thread.spawn(function() @@ -120,9 +131,9 @@ function _M:start() end if waited > CLUSTERING_PING_INTERVAL then - local res, err = self.outgoing:push(PING_TYPE) + local res, err = self:push_result(PING_TYPE, "unable to send ping: ") if not res then - return nil, "unable to send ping: " .. err + return nil, err end end @@ -133,9 +144,9 @@ function _M:start() last_seen = ngx_time() if typ == "ping" then - local res, err = self.outgoing:push(PONG_TYPE) + local res, err = self:push_result(PONG_TYPE, "unable to handle ping: ") if not res then - return nil, "unable to handle ping: " .. err + return nil, err end goto continue @@ -154,6 +165,13 @@ function _M:start() assert(typ == "binary") local payload = decompress_payload(data) + + -- rpc batching + --if isarray(payload) then + -- for _, v in ipairs(payload) do + -- end + --end -- isarray + assert(payload.jsonrpc == jsonrpc.VERSION) if payload.method then @@ -163,9 +181,10 @@ function _M:start() local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb and payload.id then - local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) + local res, err = self:push_result(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND), + "unable to send \"METHOD_NOT_FOUND\" error back to client: ") if not res then - return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err + return nil, err end goto continue @@ -176,9 +195,10 @@ function _M:start() self.node_id, payload.id or 0, payload.method), 0, _M._dispatch, self, dispatch_cb, payload) if not res and payload.id then - local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR)) + local reso, erro = self:push_result(new_error(payload.id, jsonrpc.INTERNAL_ERROR), + "unable to send \"INTERNAL_ERROR\" error back to client: ") if not reso then - return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro + return nil, erro end return nil, "unable to dispatch JSON-RPC callback: " .. err From 0f613c084289cca0878d6804d4e26396e213a72a Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 21 Dec 2024 09:35:41 +0800 Subject: [PATCH 02/50] push_request --- kong/clustering/rpc/socket.lua | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 0e58c23404e..4e2d589fe6b 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -60,6 +60,21 @@ function _M:_get_next_id() end +function _M:push_request(msg) + return self.outgoing:push(msg) +end + + +function _M:push_result(msg, err_prefix) + local res, err = self.outgoing:push(msg) + if not res then + return nil, err_prefix .. err + end + + return true +end + + function _M._dispatch(premature, self, cb, payload) if premature then return @@ -101,16 +116,6 @@ function _M._dispatch(premature, self, cb, payload) end -function _M:push_result(msg, err_prefix) - local res, err = self.outgoing:push(msg) - if not res then - return nil, err_prefix .. err - end - - return true -end - - -- start reader and writer thread and event loop function _M:start() self.read_thread = ngx.thread.spawn(function() @@ -310,7 +315,7 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback end - return self.outgoing:push({ + return self:push_request({ jsonrpc = jsonrpc.VERSION, method = method, params = params, From fc69a8ae0b604086e033dbbd8786ff472a1f0528 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 21 Dec 2024 16:44:54 +0800 Subject: [PATCH 03/50] process_rpc_msg --- kong/clustering/rpc/socket.lua | 110 ++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 50 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 4e2d589fe6b..2fbccce5993 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -116,6 +116,63 @@ function _M._dispatch(premature, self, cb, payload) end +function _M:process_rpc_msg(payload) + assert(payload.jsonrpc == jsonrpc.VERSION) + + local payload_id = payload.id + + if payload.method then + -- invoke + + ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload_id, ")") + + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] + if not dispatch_cb and payload_id then + local res, err = self:push_result(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), + "unable to send \"METHOD_NOT_FOUND\" error back to client: ") + if not res then + return nil, err + end + + return true + end + + -- call dispatch + local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", + self.node_id, payload_id or 0, payload.method), + 0, _M._dispatch, self, dispatch_cb, payload) + if not res and payload_id then + local reso, erro = self:push_result(new_error(payload_id, jsonrpc.INTERNAL_ERROR), + "unable to send \"INTERNAL_ERROR\" error back to client: ") + if not reso then + return nil, erro + end + + return nil, "unable to dispatch JSON-RPC callback: " .. err + end + + else + -- response + local interest_cb = self.interest[payload_id] + self.interest[payload_id] = nil -- edge trigger only once + + if not interest_cb then + ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload_id, ", dropping it") + + return true + end + + local res, err = interest_cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", + payload_id, ", err: ", err) + end + end -- if payload.method + + return true +end + + -- start reader and writer thread and event loop function _M:start() self.read_thread = ngx.thread.spawn(function() @@ -177,57 +234,10 @@ function _M:start() -- end --end -- isarray - assert(payload.jsonrpc == jsonrpc.VERSION) - - if payload.method then - -- invoke - - ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") - - local dispatch_cb = self.manager.callbacks.callbacks[payload.method] - if not dispatch_cb and payload.id then - local res, err = self:push_result(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND), - "unable to send \"METHOD_NOT_FOUND\" error back to client: ") - if not res then - return nil, err - end - - goto continue - end - - -- call dispatch - local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", - self.node_id, payload.id or 0, payload.method), - 0, _M._dispatch, self, dispatch_cb, payload) - if not res and payload.id then - local reso, erro = self:push_result(new_error(payload.id, jsonrpc.INTERNAL_ERROR), - "unable to send \"INTERNAL_ERROR\" error back to client: ") - if not reso then - return nil, erro - end - - return nil, "unable to dispatch JSON-RPC callback: " .. err - end - - else - -- response - local interest_cb = self.interest[payload.id] - self.interest[payload.id] = nil -- edge trigger only once - - if not interest_cb then - ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it") - - goto continue - end - - local res, err = interest_cb(payload) - if not res then - ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", - payload.id, ", err: ", err) - end + local ok, err = self:process_rpc_msg(payload) + if not ok then + return nil, err end - - ::continue:: end end) From 493d0bd9e4f4f0eca775492cc5373260e4c1db46 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 21 Dec 2024 19:39:58 +0800 Subject: [PATCH 04/50] lint fix --- kong/clustering/rpc/socket.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 2fbccce5993..17f69c5f887 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -238,6 +238,8 @@ function _M:start() if not ok then return nil, err end + + ::continue:: end end) From de125d61e3147d62e41c6e3d616f6829b0ba0a7f Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 11:25:17 +0800 Subject: [PATCH 05/50] payload_array --- kong/clustering/rpc/socket.lua | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 17f69c5f887..8f9b86863e8 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -177,6 +177,7 @@ end function _M:start() self.read_thread = ngx.thread.spawn(function() local last_seen = ngx_time() + local payload_array = {} while not exiting() do local data, typ, err = self.wb:recv_frame() @@ -226,17 +227,24 @@ function _M:start() assert(typ == "binary") + local payloads local payload = decompress_payload(data) - -- rpc batching - --if isarray(payload) then - -- for _, v in ipairs(payload) do - -- end - --end -- isarray + if isarray(payload) then + -- rpc batching + payloads = payload - local ok, err = self:process_rpc_msg(payload) - if not ok then - return nil, err + else + -- only one rpc msg + payload_array[1] = payload + payloads = payload_array + end -- isarray + + for _, v in ipairs(payloads) do + local ok, err = self:process_rpc_msg(payload) + if not ok then + return nil, err + end end ::continue:: From 460e4046662d57f83bd084c6f5b38e3e907f4e90 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 11:29:07 +0800 Subject: [PATCH 06/50] typo fix --- kong/clustering/rpc/socket.lua | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 8f9b86863e8..e5131769ec0 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -228,6 +228,8 @@ function _M:start() assert(typ == "binary") local payloads + local results = {} + local payload = decompress_payload(data) if isarray(payload) then @@ -241,7 +243,7 @@ function _M:start() end -- isarray for _, v in ipairs(payloads) do - local ok, err = self:process_rpc_msg(payload) + local ok, err = self:process_rpc_msg(v) if not ok then return nil, err end From b5af4bad3cbd1e1c8e5b2b7b5636985e03f79c46 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 11:31:51 +0800 Subject: [PATCH 07/50] lint fix --- kong/clustering/rpc/socket.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index e5131769ec0..ef8f782d0a0 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -228,7 +228,7 @@ function _M:start() assert(typ == "binary") local payloads - local results = {} + --local results = {} local payload = decompress_payload(data) From 36ec7c7cabfc0cf1e0721a95fdee8212b595e288 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 17:47:38 +0800 Subject: [PATCH 08/50] fix mistake --- kong/clustering/rpc/socket.lua | 61 ++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index ef8f782d0a0..7d864e69582 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -11,7 +11,7 @@ local utils = require("kong.clustering.rpc.utils") local queue = require("kong.clustering.rpc.queue") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") ---local isarray = require("table.isarray") +local isarray = require("table.isarray") local assert = assert @@ -65,7 +65,13 @@ function _M:push_request(msg) end -function _M:push_result(msg, err_prefix) +function _M:push_result(msg, err_prefix, results) + -- may be a batch + if results then + table.insert(results, msg) + return true + end + local res, err = self.outgoing:push(msg) if not res then return nil, err_prefix .. err @@ -75,7 +81,7 @@ function _M:push_result(msg, err_prefix) end -function _M._dispatch(premature, self, cb, payload) +function _M._dispatch(premature, self, cb, payload, results) if premature then return end @@ -90,7 +96,8 @@ function _M._dispatch(premature, self, cb, payload) end res, err = self:push_result(new_error(payload.id, jsonrpc.SERVER_ERROR, err), - "[rpc] unable to push RPC call error: ") + "[rpc] unable to push RPC call error: ", + results) if not res then ngx_log(ngx_WARN, err) end @@ -109,14 +116,14 @@ function _M._dispatch(premature, self, cb, payload) jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, - }, "[rpc] unable to push RPC call result: ") + }, "[rpc] unable to push RPC call result: ", results) if not res then ngx_log(ngx_WARN, err) end end -function _M:process_rpc_msg(payload) +function _M:process_rpc_msg(payload, results) assert(payload.jsonrpc == jsonrpc.VERSION) local payload_id = payload.id @@ -129,7 +136,8 @@ function _M:process_rpc_msg(payload) local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb and payload_id then local res, err = self:push_result(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), - "unable to send \"METHOD_NOT_FOUND\" error back to client: ") + "unable to send \"METHOD_NOT_FOUND\" error back to client: ", + results) if not res then return nil, err end @@ -140,10 +148,11 @@ function _M:process_rpc_msg(payload) -- call dispatch local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", self.node_id, payload_id or 0, payload.method), - 0, _M._dispatch, self, dispatch_cb, payload) + 0, _M._dispatch, self, dispatch_cb, payload, results) if not res and payload_id then local reso, erro = self:push_result(new_error(payload_id, jsonrpc.INTERNAL_ERROR), - "unable to send \"INTERNAL_ERROR\" error back to client: ") + "unable to send \"INTERNAL_ERROR\" error back to client: ", + results) if not reso then return nil, erro end @@ -227,28 +236,36 @@ function _M:start() assert(typ == "binary") - local payloads - --local results = {} - local payload = decompress_payload(data) - if isarray(payload) then - -- rpc batching - payloads = payload + if not isarray(payload) then + local ok, err = self:process_rpc_msg(v) + if not ok then + return nil, err + end - else - -- only one rpc msg - payload_array[1] = payload - payloads = payload_array - end -- isarray + goto continue + end - for _, v in ipairs(payloads) do - local ok, err = self:process_rpc_msg(v) + -- rpc batching + local results = {} + + for _, v in ipairs(payload) do + local ok, err = self:process_rpc_msg(v, results) if not ok then return nil, err end end + if #results == 0 then + goto continue + end + + local res, err = self.outgoing:push(results) + if not res then + return nil, "[rpc] unable to push RPC call result: " .. err + end + ::continue:: end end) From d6c1f0a4005ddc653d06bc287e26e015e6097452 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 17:50:34 +0800 Subject: [PATCH 09/50] lint fix --- kong/clustering/rpc/socket.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 7d864e69582..cdcf5b3b9f7 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -186,7 +186,6 @@ end function _M:start() self.read_thread = ngx.thread.spawn(function() local last_seen = ngx_time() - local payload_array = {} while not exiting() do local data, typ, err = self.wb:recv_frame() From d739a7f654cbe4b28f40769fa74df809a38a0800 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 18:20:31 +0800 Subject: [PATCH 10/50] lint fix --- kong/clustering/rpc/socket.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index cdcf5b3b9f7..06d6bdbefe3 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -238,7 +238,7 @@ function _M:start() local payload = decompress_payload(data) if not isarray(payload) then - local ok, err = self:process_rpc_msg(v) + local ok, err = self:process_rpc_msg(payload) if not ok then return nil, err end From cc9a6e337fd6e33a79f348fd46560087763af32a Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 18:43:17 +0800 Subject: [PATCH 11/50] isempty --- kong/clustering/rpc/socket.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 06d6bdbefe3..5f1853f26e0 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -12,6 +12,7 @@ local queue = require("kong.clustering.rpc.queue") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") local isarray = require("table.isarray") +local isempty = require("table.isempty") local assert = assert @@ -256,7 +257,7 @@ function _M:start() end end - if #results == 0 then + if isempty(results) then goto continue end From bca76b86254b524adc9d68dfb3642809a0b1aa21 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 18:45:58 +0800 Subject: [PATCH 12/50] payload_method --- kong/clustering/rpc/socket.lua | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 5f1853f26e0..a394ceeaa66 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -128,13 +128,14 @@ function _M:process_rpc_msg(payload, results) assert(payload.jsonrpc == jsonrpc.VERSION) local payload_id = payload.id + local payload_method = payload.method - if payload.method then + if payload_method then -- invoke - ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload_id, ")") + ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")") - local dispatch_cb = self.manager.callbacks.callbacks[payload.method] + local dispatch_cb = self.manager.callbacks.callbacks[payload_method] if not dispatch_cb and payload_id then local res, err = self:push_result(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), "unable to send \"METHOD_NOT_FOUND\" error back to client: ", @@ -148,7 +149,7 @@ function _M:process_rpc_msg(payload, results) -- call dispatch local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", - self.node_id, payload_id or 0, payload.method), + self.node_id, payload_id or 0, payload_method), 0, _M._dispatch, self, dispatch_cb, payload, results) if not res and payload_id then local reso, erro = self:push_result(new_error(payload_id, jsonrpc.INTERNAL_ERROR), From 3c4e27c9585aea5fb9e80680929aa8ec44f76fbb Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 22 Dec 2024 18:53:29 +0800 Subject: [PATCH 13/50] clean --- kong/clustering/rpc/socket.lua | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index a394ceeaa66..62c3c7dc6d2 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -262,9 +262,10 @@ function _M:start() goto continue end - local res, err = self.outgoing:push(results) + local res, err = self:push_result(results, + "[rpc] unable to push RPC call result: ") if not res then - return nil, "[rpc] unable to push RPC call result: " .. err + return nil, err end ::continue:: From 5a6fca8f6b27fe90c80e0eb77dc5e10b0443a336 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 23 Dec 2024 19:20:59 +0800 Subject: [PATCH 14/50] push_response --- kong/clustering/rpc/socket.lua | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 62c3c7dc6d2..9d4cb618451 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -66,7 +66,7 @@ function _M:push_request(msg) end -function _M:push_result(msg, err_prefix, results) +function _M:push_response(msg, err_prefix, results) -- may be a batch if results then table.insert(results, msg) @@ -96,9 +96,9 @@ function _M._dispatch(premature, self, cb, payload, results) return end - res, err = self:push_result(new_error(payload.id, jsonrpc.SERVER_ERROR, err), - "[rpc] unable to push RPC call error: ", - results) + res, err = self:push_response(new_error(payload.id, jsonrpc.SERVER_ERROR, err), + "[rpc] unable to push RPC call error: ", + results) if not res then ngx_log(ngx_WARN, err) end @@ -113,7 +113,7 @@ function _M._dispatch(premature, self, cb, payload, results) end -- success - res, err = self:push_result({ + res, err = self:push_response({ jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, @@ -137,9 +137,9 @@ function _M:process_rpc_msg(payload, results) local dispatch_cb = self.manager.callbacks.callbacks[payload_method] if not dispatch_cb and payload_id then - local res, err = self:push_result(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), - "unable to send \"METHOD_NOT_FOUND\" error back to client: ", - results) + local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), + "unable to send \"METHOD_NOT_FOUND\" error back to client: ", + results) if not res then return nil, err end @@ -152,9 +152,9 @@ function _M:process_rpc_msg(payload, results) self.node_id, payload_id or 0, payload_method), 0, _M._dispatch, self, dispatch_cb, payload, results) if not res and payload_id then - local reso, erro = self:push_result(new_error(payload_id, jsonrpc.INTERNAL_ERROR), - "unable to send \"INTERNAL_ERROR\" error back to client: ", - results) + local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), + "unable to send \"INTERNAL_ERROR\" error back to client: ", + results) if not reso then return nil, erro end @@ -204,7 +204,7 @@ function _M:start() end if waited > CLUSTERING_PING_INTERVAL then - local res, err = self:push_result(PING_TYPE, "unable to send ping: ") + local res, err = self:push_response(PING_TYPE, "unable to send ping: ") if not res then return nil, err end @@ -217,7 +217,7 @@ function _M:start() last_seen = ngx_time() if typ == "ping" then - local res, err = self:push_result(PONG_TYPE, "unable to handle ping: ") + local res, err = self:push_response(PONG_TYPE, "unable to handle ping: ") if not res then return nil, err end @@ -262,8 +262,8 @@ function _M:start() goto continue end - local res, err = self:push_result(results, - "[rpc] unable to push RPC call result: ") + local res, err = self:push_response(results, + "[rpc] unable to push RPC call result: ") if not res then return nil, err end From 146a2bb0d1ae99aeb2d91e2ac5ab523709938b80 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 24 Dec 2024 11:04:14 +0800 Subject: [PATCH 15/50] clean --- kong/clustering/rpc/socket.lua | 35 +++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 9d4cb618451..f15139d0597 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -13,6 +13,7 @@ local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") local isarray = require("table.isarray") local isempty = require("table.isempty") +local tb_insert = table.insert local assert = assert @@ -66,10 +67,10 @@ function _M:push_request(msg) end -function _M:push_response(msg, err_prefix, results) +function _M:push_response(msg, err_prefix, collection) -- may be a batch - if results then - table.insert(results, msg) + if collection then + tb_insert(collection, msg) return true end @@ -82,7 +83,7 @@ function _M:push_response(msg, err_prefix, results) end -function _M._dispatch(premature, self, cb, payload, results) +function _M._dispatch(premature, self, cb, payload, collection) if premature then return end @@ -98,7 +99,7 @@ function _M._dispatch(premature, self, cb, payload, results) res, err = self:push_response(new_error(payload.id, jsonrpc.SERVER_ERROR, err), "[rpc] unable to push RPC call error: ", - results) + collection) if not res then ngx_log(ngx_WARN, err) end @@ -117,14 +118,14 @@ function _M._dispatch(premature, self, cb, payload, results) jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, - }, "[rpc] unable to push RPC call result: ", results) + }, "[rpc] unable to push RPC call result: ", collection) if not res then ngx_log(ngx_WARN, err) end end -function _M:process_rpc_msg(payload, results) +function _M:process_rpc_msg(payload, collection) assert(payload.jsonrpc == jsonrpc.VERSION) local payload_id = payload.id @@ -139,7 +140,7 @@ function _M:process_rpc_msg(payload, results) if not dispatch_cb and payload_id then local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), "unable to send \"METHOD_NOT_FOUND\" error back to client: ", - results) + collection) if not res then return nil, err end @@ -150,11 +151,11 @@ function _M:process_rpc_msg(payload, results) -- call dispatch local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", self.node_id, payload_id or 0, payload_method), - 0, _M._dispatch, self, dispatch_cb, payload, results) + 0, _M._dispatch, self, dispatch_cb, payload, collection) if not res and payload_id then local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), "unable to send \"INTERNAL_ERROR\" error back to client: ", - results) + collection) if not reso then return nil, erro end @@ -239,6 +240,7 @@ function _M:start() local payload = decompress_payload(data) + -- single rpc call if not isarray(payload) then local ok, err = self:process_rpc_msg(payload) if not ok then @@ -248,21 +250,24 @@ function _M:start() goto continue end - -- rpc batching - local results = {} + -- batch rpc call + local collection = {} for _, v in ipairs(payload) do - local ok, err = self:process_rpc_msg(v, results) + local ok, err = self:process_rpc_msg(v, collection) if not ok then return nil, err end end - if isempty(results) then + -- may be all notifications + if isempty(collection) then goto continue end - local res, err = self:push_response(results, + assert(isarray(collection)) + + local res, err = self:push_response(collection, "[rpc] unable to push RPC call result: ") if not res then return nil, err From dc3f34889a0a271a0784b16a3a69fdc5d93a1cc5 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 27 Dec 2024 15:43:38 +0800 Subject: [PATCH 16/50] check payload --- kong/clustering/rpc/socket.lua | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index f15139d0597..cab9e8bf8e6 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -16,6 +16,7 @@ local isempty = require("table.isempty") local tb_insert = table.insert +local type = type local assert = assert local unpack = unpack local string_format = string.format @@ -126,6 +127,17 @@ end function _M:process_rpc_msg(payload, collection) + if type(payload) ~= "table" then + local res, err = self:push_response( + new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), + collection) + if not res then + return nil, err + end + + return true + end + assert(payload.jsonrpc == jsonrpc.VERSION) local payload_id = payload.id From 063ef0274723e6353ebd48ad7eac2df0cbdcc185 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 27 Dec 2024 15:46:38 +0800 Subject: [PATCH 17/50] isempty payload --- kong/clustering/rpc/socket.lua | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index cab9e8bf8e6..dcc20f76781 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -262,7 +262,20 @@ function _M:start() goto continue end + -- rpc call with an empty Array + if isempty(payload) then + local res, err = self:push_response( + new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), + collection) + if not res then + return nil, err + end + + goto continue + end + -- batch rpc call + local collection = {} for _, v in ipairs(payload) do From ceac4d9c1efb6707237aaa1548c299f1d0884644 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 27 Dec 2024 15:57:41 +0800 Subject: [PATCH 18/50] lint fix --- kong/clustering/rpc/socket.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index dcc20f76781..48e2a771078 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -265,8 +265,7 @@ function _M:start() -- rpc call with an empty Array if isempty(payload) then local res, err = self:push_response( - new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), - collection) + new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request")) if not res then return nil, err end From 7c688f47bb6bba8cdde8f6f77f580e4d173bd593 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 30 Dec 2024 15:53:05 +0800 Subject: [PATCH 19/50] comments --- kong/clustering/rpc/socket.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 48e2a771078..2ef6481a107 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -68,6 +68,8 @@ function _M:push_request(msg) end +-- collection is only for rpc batch call. +-- if collection is nil, it means the rpc is a single call. function _M:push_response(msg, err_prefix, collection) -- may be a batch if collection then From 37d64b2ae1cf66c4725766c7e4ca8d5e48783086 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 30 Dec 2024 17:02:05 +0800 Subject: [PATCH 20/50] err msg --- kong/clustering/rpc/socket.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 2ef6481a107..298bac37c5d 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -131,7 +131,7 @@ end function _M:process_rpc_msg(payload, collection) if type(payload) ~= "table" then local res, err = self:push_response( - new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"), + new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"), collection) if not res then return nil, err @@ -267,7 +267,7 @@ function _M:start() -- rpc call with an empty Array if isempty(payload) then local res, err = self:push_response( - new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request")) + new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array")) if not res then return nil, err end From 43b259a1bff068b59bf71b67236d43bec7b5afb2 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 5 Jan 2025 16:23:41 +0800 Subject: [PATCH 21/50] push_request --- kong/clustering/rpc/socket.lua | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 298bac37c5d..3a43c109f3d 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -63,7 +63,15 @@ function _M:_get_next_id() end -function _M:push_request(msg) +-- collection is only for rpc batch call. +-- if collection is nil, it means the rpc is a single call. +function _M:push_request(msg, collection) + -- may be a batch + if collection then + tb_insert(collection, msg) + return true + end + return self.outgoing:push(msg) end @@ -376,7 +384,7 @@ end -- needed for this implementation, but it is important -- for concentrator socket, so we include it just to keep -- the signature consistent -function _M:call(node_id, method, params, callback) +function _M:call(node_id, method, params, callback, collection) assert(node_id == self.node_id) local id @@ -392,7 +400,7 @@ function _M:call(node_id, method, params, callback) method = method, params = params, id = id, - }) + }, collection) end From 072d1720dd1842e5215fb9a79b3c9c031220007f Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 6 Jan 2025 20:14:35 +0800 Subject: [PATCH 22/50] bach sync call --- kong/clustering/rpc/socket.lua | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 3a43c109f3d..48f1864e3fb 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -170,10 +170,24 @@ function _M:process_rpc_msg(payload, collection) return true end + local res, err + -- call dispatch - local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", - self.node_id, payload_id or 0, payload_method), - 0, _M._dispatch, self, dispatch_cb, payload, collection) + + if collection then + + -- collection is not nil, it means it is a batch call + -- we should call sync function + res, err = _M._dispatch(nil, self, dispatch_cb, payload, collection) + else + + -- collection is nil, it means it is a single call + -- we should call async function + local name = string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", + self.node_id, payload_id or 0, payload_method) + res, err = kong.timer:named_at(name, 0, _M._dispatch, self, dispatch_cb, payload) + end + if not res and payload_id then local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), "unable to send \"INTERNAL_ERROR\" error back to client: ", From 8c241bd39bdfedd0ed9dfde691d2b620bd15bebd Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 07:43:49 +0800 Subject: [PATCH 23/50] Revert "push_request" This reverts commit ec806ab09df10c1c9c2d4a1279678586ceb8e4bc. --- kong/clustering/rpc/socket.lua | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 48f1864e3fb..f46734bb838 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -63,15 +63,7 @@ function _M:_get_next_id() end --- collection is only for rpc batch call. --- if collection is nil, it means the rpc is a single call. -function _M:push_request(msg, collection) - -- may be a batch - if collection then - tb_insert(collection, msg) - return true - end - +function _M:push_request(msg) return self.outgoing:push(msg) end @@ -398,7 +390,7 @@ end -- needed for this implementation, but it is important -- for concentrator socket, so we include it just to keep -- the signature consistent -function _M:call(node_id, method, params, callback, collection) +function _M:call(node_id, method, params, callback) assert(node_id == self.node_id) local id @@ -414,7 +406,7 @@ function _M:call(node_id, method, params, callback, collection) method = method, params = params, id = id, - }, collection) + }) end From bd8c3b8ae650fc8664e93d5ba833931264bb0933 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 08:22:21 +0800 Subject: [PATCH 24/50] rpc.set_batch --- kong/clustering/rpc/manager.lua | 8 ++++ kong/clustering/rpc/socket.lua | 75 +++++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index f1918398c76..7abc67a45e7 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -56,6 +56,8 @@ function _M.new(conf, node_id) cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)), callbacks = callbacks.new(), + + batch_size = 0, -- rpc batching } if conf.role == "control_plane" then @@ -625,4 +627,10 @@ function _M:get_peer_info(node_id) end +function _M:set_batch(n) + assert(type(n) == "number" and n >= 0) + self.batch_size = n +end + + return _M diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index f46734bb838..14313dbbb8b 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -13,6 +13,7 @@ local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") local isarray = require("table.isarray") local isempty = require("table.isempty") +local tb_clear = require("table.clear") local tb_insert = table.insert @@ -34,6 +35,7 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 local PING_TYPE = "PING" local PONG_TYPE = "PONG" +--local BATCH_DONE = "BATCH_DONE" local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG @@ -192,7 +194,7 @@ function _M:process_rpc_msg(payload, collection) end else - -- response + -- response, don't care about `collection` local interest_cb = self.interest[payload_id] self.interest[payload_id] = nil -- edge trigger only once @@ -300,7 +302,7 @@ function _M:start() end end - -- may be all notifications + -- may be responses or all notifications if isempty(collection) then goto continue end @@ -317,13 +319,77 @@ function _M:start() end end) + local batch_requests = {} + self.write_thread = ngx.thread.spawn(function() while not exiting() do - local payload, err = self.outgoing:pop(5) + -- 0.5 seconds for not waiting too long + local payload, err = self.outgoing:pop(0.5) if err then return nil, err end + -- timeout + if not payload then + if #batch_requests > 0 then + local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) + if not bytes then + return nil, err + end + + tb_clear(batch_requests) + end + goto continue + end + + if payload == PING_TYPE then + local _, err = self.wb:send_ping() + if err then + return nil, "failed to send PING frame to peer: " .. err + + else + ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer") + end + goto continue + end + + if payload == PONG_TYPE then + local _, err = self.wb:send_pong() + if err then + return nil, "failed to send PONG frame to peer: " .. err + + else + ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer") + end + goto continue + end + + assert(type(payload) == "table") + + -- batch enabled + local batch_size = self.manager.batch_size + + if batch_size > 0 then + tb_insert(batch_requests, payload) + + -- send batch requests + if #batch_requests >= batch_size then + local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) + if not bytes then + return nil, err + end + + tb_clear(batch_requests) + end + goto continue + end + + local bytes, err = self.wb:send_binary(compress_payload(payload)) + if not bytes then + return nil, err + end + + --[[ if payload then if payload == PING_TYPE then local _, err = self.wb:send_ping() @@ -352,6 +418,9 @@ function _M:start() end end end + --]] + + ::continue:: end end) end From c610b3309cc9d523bc312e5289cc9ae70bd24f7a Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 08:31:20 +0800 Subject: [PATCH 25/50] clean --- kong/clustering/rpc/socket.lua | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 14313dbbb8b..e1297e05b93 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -35,7 +35,6 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 local PING_TYPE = "PING" local PONG_TYPE = "PONG" ---local BATCH_DONE = "BATCH_DONE" local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG @@ -173,6 +172,7 @@ function _M:process_rpc_msg(payload, collection) -- collection is not nil, it means it is a batch call -- we should call sync function res, err = _M._dispatch(nil, self, dispatch_cb, payload, collection) + else -- collection is nil, it means it is a single call @@ -280,7 +280,7 @@ function _M:start() goto continue end - -- rpc call with an empty Array + -- rpc call with an empty array if isempty(payload) then local res, err = self:push_response( new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array")) @@ -319,9 +319,9 @@ function _M:start() end end) - local batch_requests = {} - self.write_thread = ngx.thread.spawn(function() + local batch_requests = {} + while not exiting() do -- 0.5 seconds for not waiting too long local payload, err = self.outgoing:pop(0.5) From 5b38e48eea6386c5e3fb172aac98ce8566fc649d Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 10:50:28 +0800 Subject: [PATCH 26/50] add todo --- kong/clustering/rpc/socket.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index e1297e05b93..e2f0de41417 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -169,6 +169,7 @@ function _M:process_rpc_msg(payload, collection) if collection then + -- TODO: async call by using a new manager of timer -- collection is not nil, it means it is a batch call -- we should call sync function res, err = _M._dispatch(nil, self, dispatch_cb, payload, collection) From 7a8a69199825be3d94977991f2fbdfbfb5fe827a Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 14:42:29 +0800 Subject: [PATCH 27/50] test v0 --- kong/clustering/rpc/socket.lua | 20 ++--- .../18-hybrid_rpc/06-batch-rpc_spec.lua | 90 +++++++++++++++++++ .../kong/plugins/rpc-batch-test/handler.lua | 27 ++++++ .../kong/plugins/rpc-batch-test/schema.lua | 12 +++ 4 files changed, 139 insertions(+), 10 deletions(-) create mode 100644 spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua create mode 100644 spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index e2f0de41417..33c61a744a5 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -172,7 +172,7 @@ function _M:process_rpc_msg(payload, collection) -- TODO: async call by using a new manager of timer -- collection is not nil, it means it is a batch call -- we should call sync function - res, err = _M._dispatch(nil, self, dispatch_cb, payload, collection) + _M._dispatch(nil, self, dispatch_cb, payload, collection) else @@ -181,17 +181,17 @@ function _M:process_rpc_msg(payload, collection) local name = string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", self.node_id, payload_id or 0, payload_method) res, err = kong.timer:named_at(name, 0, _M._dispatch, self, dispatch_cb, payload) - end - if not res and payload_id then - local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), - "unable to send \"INTERNAL_ERROR\" error back to client: ", - collection) - if not reso then - return nil, erro - end + if not res and payload_id then + local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), + "unable to send \"INTERNAL_ERROR\" error back to client: ", + collection) + if not reso then + return nil, erro + end - return nil, "unable to dispatch JSON-RPC callback: " .. err + return nil, "unable to dispatch JSON-RPC callback: " .. err + end end else diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua new file mode 100644 index 00000000000..8af05675e46 --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -0,0 +1,90 @@ +local helpers = require "spec.helpers" +local cjson = require("cjson.safe") + +-- register a test rpc service in custom plugin rpc-batch-test +for _, strategy in helpers.each_strategy() do + describe("Hybrid Mode RPC #" .. strategy, function() + + lazy_setup(function() + helpers.get_db_utils(strategy, { "routes", "services" }) + + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + plugins = "bundled,rpc-batch-test", + cluster_rpc_sync = "off", + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + proxy_listen = "0.0.0.0:9002", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + plugins = "bundled,rpc-batch-test", + cluster_rpc_sync = "off", + })) + end) + + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) + + describe("batch works", function() + it("DP calls CP via batching ", function() + local admin_client = helpers.admin_client(10000) + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:post("/services", { + body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", }, + headers = {["Content-Type"] = "application/json"} + })) + assert.res_status(201, res) + + res = assert(admin_client:post("/services/mockbin-service/routes", { + body = { paths = { "/" }, }, + headers = {["Content-Type"] = "application/json"} + })) + local body = assert.res_status(201, res) + local json = cjson.decode(body) + local route_id = json.id + + -- add a plugin for route + res = assert(admin_client:post("/routes/" .. route_id .. "/plugins", { + body = { name = "rpc-batch-test" }, + headers = {["Content-Type"] = "application/json"} + })) + assert.res_status(201, res) + + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + + res = proxy_client:send({ + method = "GET", + path = "/", + }) + + local status = res and res.status + proxy_client:close() + if status == 200 then + return true + end + end, 10) + + assert.logfile().has.line("kong.test.batch called", true) + end) + end) + end) +end -- for _, strategy diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua new file mode 100644 index 00000000000..2871f6d3671 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua @@ -0,0 +1,27 @@ +local RpcBatchTestHandler = { + VERSION = "1.0", + PRIORITY = 1000, +} + + +function RpcBatchTestHandler:init_worker() + kong.rpc.callbacks:register("kong.test.batch", function(node_id, greeting) + ngx.log(ngx.DEBUG, "kong.test.batch called") + return "hello ".. greeting + end) +end + + +function RpcBatchTestHandler:access() + kong.rpc:set_batch(1) + + local res, err = kong.rpc:call("control_plane", "kong.test.batch", "world") + if not res then + return kong.response.exit(500, err) + end + + return kong.response.exit(200, res) +end + + +return RpcBatchTestHandler diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua new file mode 100644 index 00000000000..0515df5835c --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua @@ -0,0 +1,12 @@ +return { + name = "rpc-batch-test", + fields = { + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +} From 7393ebbcf3e4e0601eb1e886c949dec3ec2a977d Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 16:01:17 +0800 Subject: [PATCH 28/50] pwait_until --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 8af05675e46..12dd3a4abe5 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -83,7 +83,10 @@ for _, strategy in helpers.each_strategy() do end end, 10) - assert.logfile().has.line("kong.test.batch called", true) + helpers.pwait_until(function() + assert.logfile().has.line("kong.test.batch called", true) + return true + end, 5) end) end) end) From cdf21e198cab626230d441f40800aa0502f0295e Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 16:11:13 +0800 Subject: [PATCH 29/50] more asserts --- kong/clustering/rpc/socket.lua | 2 ++ .../18-hybrid_rpc/06-batch-rpc_spec.lua | 18 +++++++++++++++++- .../kong/plugins/rpc-batch-test/handler.lua | 8 +++++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 33c61a744a5..220692960b4 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -296,6 +296,8 @@ function _M:start() local collection = {} + ngx_log(ngx_DEBUG, "[rpc] got batch RPC call: ", #payload) + for _, v in ipairs(payload) do local ok, err = self:process_rpc_msg(v, collection) if not ok then diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 12dd3a4abe5..9a29fff2d48 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -84,7 +84,23 @@ for _, strategy in helpers.each_strategy() do end, 10) helpers.pwait_until(function() - assert.logfile().has.line("kong.test.batch called", true) + assert.logfile().has.line( + "[rpc] got batch RPC call: 1", true) + assert.logfile().has.line( + "kong.test.batch called: world", true) + + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] got batch RPC call: 1", true) + assert.logfile("servroot2/logs/error.log").has.line( + "kong.test.batch called: hello world", true) + + assert.logfile().has.line( + "[rpc] got batch RPC call: 2", true) + assert.logfile().has.line( + "kong.test.batch called: kong", true) + assert.logfile().has.line( + "kong.test.batch called: gateway", true) + return true end, 5) end) diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua index 2871f6d3671..cef63b70f5d 100644 --- a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua @@ -6,7 +6,7 @@ local RpcBatchTestHandler = { function RpcBatchTestHandler:init_worker() kong.rpc.callbacks:register("kong.test.batch", function(node_id, greeting) - ngx.log(ngx.DEBUG, "kong.test.batch called") + ngx.log(ngx.DEBUG, "kong.test.batch called: ", greeting) return "hello ".. greeting end) end @@ -20,6 +20,12 @@ function RpcBatchTestHandler:access() return kong.response.exit(500, err) end + ngx.log(ngx.DEBUG, "kong.test.batch called: ", res) + + kong.rpc:set_batch(2) + kong.rpc:notify("control_plane", "kong.test.batch", "kong") + kong.rpc:notify("control_plane", "kong.test.batch", "gateway") + return kong.response.exit(200, res) end From 1fb0ddfeef485f4adec9991bf221db8ff110b087 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 16:34:45 +0800 Subject: [PATCH 30/50] more asserts --- kong/clustering/rpc/socket.lua | 4 ++++ spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 220692960b4..2d090a6e144 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -340,6 +340,8 @@ function _M:start() return nil, err end + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + tb_clear(batch_requests) end goto continue @@ -382,6 +384,8 @@ function _M:start() return nil, err end + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + tb_clear(batch_requests) end goto continue diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 9a29fff2d48..a967974fe93 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -41,7 +41,7 @@ for _, strategy in helpers.each_strategy() do end) describe("batch works", function() - it("DP calls CP via batching ", function() + it("DP calls CP via batching", function() local admin_client = helpers.admin_client(10000) finally(function() admin_client:close() @@ -89,11 +89,15 @@ for _, strategy in helpers.each_strategy() do assert.logfile().has.line( "kong.test.batch called: world", true) + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] sent batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "[rpc] got batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "kong.test.batch called: hello world", true) + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] sent batch RPC call: 2", true) assert.logfile().has.line( "[rpc] got batch RPC call: 2", true) assert.logfile().has.line( @@ -102,7 +106,7 @@ for _, strategy in helpers.each_strategy() do "kong.test.batch called: gateway", true) return true - end, 5) + end, 10) end) end) end) From d1357d9db5684059e4d05350b0d7afb2763593f7 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 16:57:40 +0800 Subject: [PATCH 31/50] test clean --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index a967974fe93..04b994d7b43 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -96,8 +96,13 @@ for _, strategy in helpers.each_strategy() do assert.logfile("servroot2/logs/error.log").has.line( "kong.test.batch called: hello world", true) + return true + end, 10) + + helpers.pwait_until(function() assert.logfile("servroot2/logs/error.log").has.line( "[rpc] sent batch RPC call: 2", true) + assert.logfile().has.line( "[rpc] got batch RPC call: 2", true) assert.logfile().has.line( From ba66025444bd333729662706a19ee8fd52c7c666 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 17:13:21 +0800 Subject: [PATCH 32/50] small pwait_until --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 04b994d7b43..bc0a77e0b20 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -89,6 +89,10 @@ for _, strategy in helpers.each_strategy() do assert.logfile().has.line( "kong.test.batch called: world", true) + return true + end, 10) + + helpers.pwait_until(function() assert.logfile("servroot2/logs/error.log").has.line( "[rpc] sent batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( @@ -103,6 +107,10 @@ for _, strategy in helpers.each_strategy() do assert.logfile("servroot2/logs/error.log").has.line( "[rpc] sent batch RPC call: 2", true) + return true + end, 10) + + helpers.pwait_until(function() assert.logfile().has.line( "[rpc] got batch RPC call: 2", true) assert.logfile().has.line( From 8b9e574070e61b0eaf88ec3152365ad5e41926c3 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 18:22:46 +0800 Subject: [PATCH 33/50] flaky test --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index bc0a77e0b20..a0f52d80a9a 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -84,8 +84,8 @@ for _, strategy in helpers.each_strategy() do end, 10) helpers.pwait_until(function() - assert.logfile().has.line( - "[rpc] got batch RPC call: 1", true) + --assert.logfile().has.line( + -- "[rpc] got batch RPC call: 1", true) assert.logfile().has.line( "kong.test.batch called: world", true) From 8d6678e78061da1771aa9040ad747d3c4e1f79d9 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 18:45:54 +0800 Subject: [PATCH 34/50] flaky --- .../18-hybrid_rpc/06-batch-rpc_spec.lua | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index a0f52d80a9a..e9673cf3fc0 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -84,33 +84,22 @@ for _, strategy in helpers.each_strategy() do end, 10) helpers.pwait_until(function() - --assert.logfile().has.line( - -- "[rpc] got batch RPC call: 1", true) + assert.logfile().has.line( + "[rpc] got batch RPC call: 1", true) assert.logfile().has.line( "kong.test.batch called: world", true) - return true - end, 10) - - helpers.pwait_until(function() - assert.logfile("servroot2/logs/error.log").has.line( - "[rpc] sent batch RPC call: 1", true) + --assert.logfile("servroot2/logs/error.log").has.line( + -- "[rpc] sent batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "[rpc] got batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "kong.test.batch called: hello world", true) - return true - end, 10) - helpers.pwait_until(function() - assert.logfile("servroot2/logs/error.log").has.line( - "[rpc] sent batch RPC call: 2", true) + --assert.logfile("servroot2/logs/error.log").has.line( + -- "[rpc] sent batch RPC call: 2", true) - return true - end, 10) - - helpers.pwait_until(function() assert.logfile().has.line( "[rpc] got batch RPC call: 2", true) assert.logfile().has.line( From eb4503811e07a2186fa82d6e78512865ea05f71e Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 19:06:54 +0800 Subject: [PATCH 35/50] code clean --- kong/clustering/rpc/socket.lua | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 2d090a6e144..6386cfcb9f0 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -396,37 +396,6 @@ function _M:start() return nil, err end - --[[ - if payload then - if payload == PING_TYPE then - local _, err = self.wb:send_ping() - if err then - return nil, "failed to send PING frame to peer: " .. err - - else - ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer") - end - - elseif payload == PONG_TYPE then - local _, err = self.wb:send_pong() - if err then - return nil, "failed to send PONG frame to peer: " .. err - - else - ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer") - end - - else - assert(type(payload) == "table") - - local bytes, err = self.wb:send_binary(compress_payload(payload)) - if not bytes then - return nil, err - end - end - end - --]] - ::continue:: end end) From 0873d5d3e12c79ac64ee4c31ac6aa340e16e15b8 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 19:54:00 +0800 Subject: [PATCH 36/50] flaky comments --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index e9673cf3fc0..dd65b6a0d39 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -89,14 +89,17 @@ for _, strategy in helpers.each_strategy() do assert.logfile().has.line( "kong.test.batch called: world", true) + -- this may cause flakiness --assert.logfile("servroot2/logs/error.log").has.line( -- "[rpc] sent batch RPC call: 1", true) + assert.logfile("servroot2/logs/error.log").has.line( "[rpc] got batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "kong.test.batch called: hello world", true) + -- this may cause flakiness --assert.logfile("servroot2/logs/error.log").has.line( -- "[rpc] sent batch RPC call: 2", true) From e8d02355453e7940425c1fb8fedb38003e1e547c Mon Sep 17 00:00:00 2001 From: chronolaw Date: Wed, 8 Jan 2025 19:56:39 +0800 Subject: [PATCH 37/50] typo fix --- kong/clustering/rpc/socket.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 6386cfcb9f0..a41e305e12f 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -132,7 +132,7 @@ end function _M:process_rpc_msg(payload, collection) if type(payload) ~= "table" then local res, err = self:push_response( - new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"), + new_error(nil, jsonrpc.INVALID_REQUEST, "not a valid object"), collection) if not res then return nil, err From 4ec62d02e730f3fcb63db3b93f1bc4b8c997145c Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 9 Jan 2025 07:45:23 +0800 Subject: [PATCH 38/50] #batch_requests --- kong/clustering/rpc/socket.lua | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index a41e305e12f..028d0454ffc 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -334,13 +334,14 @@ function _M:start() -- timeout if not payload then - if #batch_requests > 0 then + local n = #batch_requests + if n > 0 then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) tb_clear(batch_requests) end @@ -378,13 +379,14 @@ function _M:start() tb_insert(batch_requests, payload) -- send batch requests - if #batch_requests >= batch_size then + local n = #batch_requests + if n >= batch_size then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) tb_clear(batch_requests) end From 8dfc9fae869883519e122a329db711b9a33bbb9f Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 9 Jan 2025 10:10:33 +0800 Subject: [PATCH 39/50] Revert "#batch_requests" This reverts commit 09d5cdd045f55389b16c89185b8788fc9addbfce. --- kong/clustering/rpc/socket.lua | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 028d0454ffc..a41e305e12f 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -334,14 +334,13 @@ function _M:start() -- timeout if not payload then - local n = #batch_requests - if n > 0 then + if #batch_requests > 0 then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) tb_clear(batch_requests) end @@ -379,14 +378,13 @@ function _M:start() tb_insert(batch_requests, payload) -- send batch requests - local n = #batch_requests - if n >= batch_size then + if #batch_requests >= batch_size then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) tb_clear(batch_requests) end From e21a71395e1caada09c41a9cebc2f663ff222945 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 9 Jan 2025 16:47:30 +0800 Subject: [PATCH 40/50] comments --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index dd65b6a0d39..660f306bc45 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -16,8 +16,8 @@ for _, strategy in helpers.each_strategy() do cluster_listen = "127.0.0.1:9005", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_rpc = "on", - plugins = "bundled,rpc-batch-test", - cluster_rpc_sync = "off", + plugins = "bundled,rpc-batch-test", -- enable custom plugin + cluster_rpc_sync = "off", -- disable rpc sync })) assert(helpers.start_kong({ @@ -30,8 +30,8 @@ for _, strategy in helpers.each_strategy() do proxy_listen = "0.0.0.0:9002", nginx_conf = "spec/fixtures/custom_nginx.template", cluster_rpc = "on", - plugins = "bundled,rpc-batch-test", - cluster_rpc_sync = "off", + plugins = "bundled,rpc-batch-test", -- enable custom plugin + cluster_rpc_sync = "off", -- disable rpc sync })) end) From ae262fb56ec422213a5d9fccfcc472f396b0b430 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Thu, 9 Jan 2025 17:19:47 +0800 Subject: [PATCH 41/50] timeout 15 --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 660f306bc45..306f4d3dfc6 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -111,7 +111,7 @@ for _, strategy in helpers.each_strategy() do "kong.test.batch called: gateway", true) return true - end, 10) + end, 15) end) end) end) From fb05cdfdaf5e9b1ea678dde2122407f8e088ef12 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 11:04:55 +0800 Subject: [PATCH 42/50] clean prefix --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 306f4d3dfc6..73fadb2c183 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -36,8 +36,8 @@ for _, strategy in helpers.each_strategy() do end) lazy_teardown(function() - helpers.stop_kong("servroot2") - helpers.stop_kong() + helpers.clean_prefix("servroot2") + helpers.clean_prefix() end) describe("batch works", function() @@ -83,6 +83,10 @@ for _, strategy in helpers.each_strategy() do end end, 10) + -- try to flush log files + helpers.stop_kong(nil, true) + helpers.stop_kong("servroot2", true) + helpers.pwait_until(function() assert.logfile().has.line( "[rpc] got batch RPC call: 1", true) From effa63ed92209991af6a3a7ff5fd0bd24db3a7b1 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 11:39:58 +0800 Subject: [PATCH 43/50] Revert "clean prefix" This reverts commit 60ebe4e4dc1be48c6a5d3496d922f0f1fcb47239. --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 73fadb2c183..306f4d3dfc6 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -36,8 +36,8 @@ for _, strategy in helpers.each_strategy() do end) lazy_teardown(function() - helpers.clean_prefix("servroot2") - helpers.clean_prefix() + helpers.stop_kong("servroot2") + helpers.stop_kong() end) describe("batch works", function() @@ -83,10 +83,6 @@ for _, strategy in helpers.each_strategy() do end end, 10) - -- try to flush log files - helpers.stop_kong(nil, true) - helpers.stop_kong("servroot2", true) - helpers.pwait_until(function() assert.logfile().has.line( "[rpc] got batch RPC call: 1", true) From 475baec082a317cf645519dd17ce42dacf96ffb4 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 11:44:25 +0800 Subject: [PATCH 44/50] rpc connected event --- .../18-hybrid_rpc/06-batch-rpc_spec.lua | 52 ++----------------- .../kong/plugins/rpc-batch-test/handler.lua | 25 ++++----- 2 files changed, 17 insertions(+), 60 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 306f4d3dfc6..5cbd50ef9f7 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -42,66 +42,22 @@ for _, strategy in helpers.each_strategy() do describe("batch works", function() it("DP calls CP via batching", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() - end) - - local res = assert(admin_client:post("/services", { - body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - res = assert(admin_client:post("/services/mockbin-service/routes", { - body = { paths = { "/" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - local route_id = json.id - - -- add a plugin for route - res = assert(admin_client:post("/routes/" .. route_id .. "/plugins", { - body = { name = "rpc-batch-test" }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) - - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) - - res = proxy_client:send({ - method = "GET", - path = "/", - }) - - local status = res and res.status - proxy_client:close() - if status == 200 then - return true - end - end, 10) - helpers.pwait_until(function() assert.logfile().has.line( "[rpc] got batch RPC call: 1", true) assert.logfile().has.line( "kong.test.batch called: world", true) - -- this may cause flakiness - --assert.logfile("servroot2/logs/error.log").has.line( - -- "[rpc] sent batch RPC call: 1", true) + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] sent batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "[rpc] got batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( "kong.test.batch called: hello world", true) - - -- this may cause flakiness - --assert.logfile("servroot2/logs/error.log").has.line( - -- "[rpc] sent batch RPC call: 2", true) + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] sent batch RPC call: 2", true) assert.logfile().has.line( "[rpc] got batch RPC call: 2", true) diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua index cef63b70f5d..8d038ecacb6 100644 --- a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua @@ -9,24 +9,25 @@ function RpcBatchTestHandler:init_worker() ngx.log(ngx.DEBUG, "kong.test.batch called: ", greeting) return "hello ".. greeting end) -end + local worker_events = assert(kong.worker_events) -function RpcBatchTestHandler:access() - kong.rpc:set_batch(1) + -- if rpc is ready we will start to sync + worker_events.register(function(capabilities_list) + kong.rpc:set_batch(1) - local res, err = kong.rpc:call("control_plane", "kong.test.batch", "world") - if not res then - return kong.response.exit(500, err) - end + local res, err = kong.rpc:call("control_plane", "kong.test.batch", "world") + if not res then + return + end - ngx.log(ngx.DEBUG, "kong.test.batch called: ", res) + ngx.log(ngx.DEBUG, "kong.test.batch called: ", res) - kong.rpc:set_batch(2) - kong.rpc:notify("control_plane", "kong.test.batch", "kong") - kong.rpc:notify("control_plane", "kong.test.batch", "gateway") + kong.rpc:set_batch(2) + kong.rpc:notify("control_plane", "kong.test.batch", "kong") + kong.rpc:notify("control_plane", "kong.test.batch", "gateway") - return kong.response.exit(200, res) + end, "clustering:jsonrpc", "connected") end From 216ba9a58c113b293e38e4650db7c579d6319f97 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 11:46:26 +0800 Subject: [PATCH 45/50] comments --- spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 7 +++---- .../custom_plugins/kong/plugins/rpc-batch-test/handler.lua | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 5cbd50ef9f7..2565d6b11a0 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -1,5 +1,4 @@ local helpers = require "spec.helpers" -local cjson = require("cjson.safe") -- register a test rpc service in custom plugin rpc-batch-test for _, strategy in helpers.each_strategy() do @@ -43,14 +42,14 @@ for _, strategy in helpers.each_strategy() do describe("batch works", function() it("DP calls CP via batching", function() helpers.pwait_until(function() + assert.logfile("servroot2/logs/error.log").has.line( + "[rpc] sent batch RPC call: 1", true) + assert.logfile().has.line( "[rpc] got batch RPC call: 1", true) assert.logfile().has.line( "kong.test.batch called: world", true) - assert.logfile("servroot2/logs/error.log").has.line( - "[rpc] sent batch RPC call: 1", true) - assert.logfile("servroot2/logs/error.log").has.line( "[rpc] got batch RPC call: 1", true) assert.logfile("servroot2/logs/error.log").has.line( diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua index 8d038ecacb6..0d3ee5389f2 100644 --- a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua @@ -12,11 +12,11 @@ function RpcBatchTestHandler:init_worker() local worker_events = assert(kong.worker_events) - -- if rpc is ready we will start to sync + -- if rpc is ready we will start to batch call worker_events.register(function(capabilities_list) kong.rpc:set_batch(1) - local res, err = kong.rpc:call("control_plane", "kong.test.batch", "world") + local res = kong.rpc:call("control_plane", "kong.test.batch", "world") if not res then return end From ff2bc4dbccecbcbe60b0a8e354f45023fa401d1b Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 13:40:01 +0800 Subject: [PATCH 46/50] clean --- kong/clustering/rpc/socket.lua | 10 ++++++---- .../02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index a41e305e12f..028d0454ffc 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -334,13 +334,14 @@ function _M:start() -- timeout if not payload then - if #batch_requests > 0 then + local n = #batch_requests + if n > 0 then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) tb_clear(batch_requests) end @@ -378,13 +379,14 @@ function _M:start() tb_insert(batch_requests, payload) -- send batch requests - if #batch_requests >= batch_size then + local n = #batch_requests + if n >= batch_size then local bytes, err = self.wb:send_binary(compress_payload(batch_requests)) if not bytes then return nil, err end - ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", #batch_requests) + ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n) tb_clear(batch_requests) end diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index 2565d6b11a0..a775ef6ae41 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -64,6 +64,8 @@ for _, strategy in helpers.each_strategy() do "kong.test.batch called: kong", true) assert.logfile().has.line( "kong.test.batch called: gateway", true) + assert.logfile().has.line( + "[rpc] notification has no response", true) return true end, 15) From 596ecba8d531c26a899663d7028e60df26e08463 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Fri, 10 Jan 2025 13:44:44 +0800 Subject: [PATCH 47/50] test clean --- .../18-hybrid_rpc/06-batch-rpc_spec.lua | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua index a775ef6ae41..f60e2097267 100644 --- a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua @@ -41,30 +41,33 @@ for _, strategy in helpers.each_strategy() do describe("batch works", function() it("DP calls CP via batching", function() + local cp_logfile = nil + local dp_logfile = "servroot2/logs/error.log" + helpers.pwait_until(function() - assert.logfile("servroot2/logs/error.log").has.line( + assert.logfile(dp_logfile).has.line( "[rpc] sent batch RPC call: 1", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "[rpc] got batch RPC call: 1", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "kong.test.batch called: world", true) - assert.logfile("servroot2/logs/error.log").has.line( + assert.logfile(dp_logfile).has.line( "[rpc] got batch RPC call: 1", true) - assert.logfile("servroot2/logs/error.log").has.line( + assert.logfile(dp_logfile).has.line( "kong.test.batch called: hello world", true) - assert.logfile("servroot2/logs/error.log").has.line( + assert.logfile(dp_logfile).has.line( "[rpc] sent batch RPC call: 2", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "[rpc] got batch RPC call: 2", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "kong.test.batch called: kong", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "kong.test.batch called: gateway", true) - assert.logfile().has.line( + assert.logfile(cp_logfile).has.line( "[rpc] notification has no response", true) return true From 7ce13e243cac46969f700dfb6b175f80cced4f8b Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 14 Jan 2025 15:20:20 +0800 Subject: [PATCH 48/50] comments for batch_size --- kong/clustering/rpc/manager.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7abc67a45e7..9423d00bb10 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -57,7 +57,10 @@ function _M.new(conf, node_id) cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)), callbacks = callbacks.new(), - batch_size = 0, -- rpc batching + batch_size = 0, -- rpc batching size, 0 means disable. + -- currently, we don't have Lua interface to initiate + -- a batch call, any value `> 0` should be considered + -- as testing code. } if conf.role == "control_plane" then From ef4f69513019e147f77630dde3415ec5272f8dd1 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 14 Jan 2025 15:22:48 +0800 Subject: [PATCH 49/50] rename to __set_batch --- kong/clustering/rpc/manager.lua | 4 +++- .../custom_plugins/kong/plugins/rpc-batch-test/handler.lua | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 9423d00bb10..1466a3bad14 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -630,7 +630,9 @@ function _M:get_peer_info(node_id) end -function _M:set_batch(n) +-- Currently, this function only for testing purpose, +-- we don't have a Lua interface to initiate a batch call yet. +function _M:__set_batch(n) assert(type(n) == "number" and n >= 0) self.batch_size = n end diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua index 0d3ee5389f2..92cb24226a7 100644 --- a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua @@ -14,7 +14,7 @@ function RpcBatchTestHandler:init_worker() -- if rpc is ready we will start to batch call worker_events.register(function(capabilities_list) - kong.rpc:set_batch(1) + kong.rpc:__set_batch(1) local res = kong.rpc:call("control_plane", "kong.test.batch", "world") if not res then @@ -23,7 +23,7 @@ function RpcBatchTestHandler:init_worker() ngx.log(ngx.DEBUG, "kong.test.batch called: ", res) - kong.rpc:set_batch(2) + kong.rpc:__set_batch(2) kong.rpc:notify("control_plane", "kong.test.batch", "kong") kong.rpc:notify("control_plane", "kong.test.batch", "gateway") From dc0699a22160230df11bc147ff32620b4898e524 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 14 Jan 2025 15:33:37 +0800 Subject: [PATCH 50/50] __batch_size --- kong/clustering/rpc/manager.lua | 10 +++++----- kong/clustering/rpc/socket.lua | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 1466a3bad14..548220442cd 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -57,10 +57,10 @@ function _M.new(conf, node_id) cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)), callbacks = callbacks.new(), - batch_size = 0, -- rpc batching size, 0 means disable. - -- currently, we don't have Lua interface to initiate - -- a batch call, any value `> 0` should be considered - -- as testing code. + __batch_size = 0, -- rpc batching size, 0 means disable. + -- currently, we don't have Lua interface to initiate + -- a batch call, any value `> 0` should be considered + -- as testing code. } if conf.role == "control_plane" then @@ -634,7 +634,7 @@ end -- we don't have a Lua interface to initiate a batch call yet. function _M:__set_batch(n) assert(type(n) == "number" and n >= 0) - self.batch_size = n + self.__batch_size = n end diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 028d0454ffc..34d1b7b1aa0 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -373,7 +373,7 @@ function _M:start() assert(type(payload) == "table") -- batch enabled - local batch_size = self.manager.batch_size + local batch_size = self.manager.__batch_size if batch_size > 0 then tb_insert(batch_requests, payload)