diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua
index f1918398c76..548220442cd 100644
--- a/kong/clustering/rpc/manager.lua
+++ b/kong/clustering/rpc/manager.lua
@@ -56,6 +56,11 @@ function _M.new(conf, node_id)
     cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
     cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)),
     callbacks = callbacks.new(),
+
+    __batch_size = 0, -- rpc batching size, 0 means disabled.
+                      -- currently, we don't have a Lua interface to initiate
+                      -- a batch call, so any value `> 0` should be considered
+                      -- testing code.
   }
 
   if conf.role == "control_plane" then
@@ -625,4 +630,12 @@ function _M:get_peer_info(node_id)
 end
 
 
+-- Currently, this function is only for testing purposes;
+-- we don't have a Lua interface to initiate a batch call yet.
+function _M:__set_batch(n)
+  assert(type(n) == "number" and n >= 0)
+  self.__batch_size = n
+end
+
+
 return _M
diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua
index 2044acf170a..34d1b7b1aa0 100644
--- a/kong/clustering/rpc/socket.lua
+++ b/kong/clustering/rpc/socket.lua
@@ -11,8 +11,13 @@ local utils = require("kong.clustering.rpc.utils")
 local queue = require("kong.clustering.rpc.queue")
 local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
 local constants = require("kong.constants")
+local isarray = require("table.isarray")
+local isempty = require("table.isempty")
+local tb_clear = require("table.clear")
 
 
+local tb_insert = table.insert
+local type = type
 local assert = assert
 local unpack = unpack
 local string_format = string.format
@@ -59,7 +64,30 @@ function _M:_get_next_id()
 end
 
 
-function _M._dispatch(premature, self, cb, payload)
+function _M:push_request(msg)
+  return self.outgoing:push(msg)
+end
+
+
+-- collection is only used for rpc batch calls.
+-- if collection is nil, the rpc is a single call.
+function _M:push_response(msg, err_prefix, collection)
+  -- may be a batch
+  if collection then
+    tb_insert(collection, msg)
+    return true
+  end
+
+  local res, err = self.outgoing:push(msg)
+  if not res then
+    return nil, err_prefix .. err
+  end
+
+  return true
+end
+
+
+function _M._dispatch(premature, self, cb, payload, collection)
   if premature then
     return
   end
@@ -73,10 +101,11 @@ function _M._dispatch(premature, self, cb, payload)
       return
     end
 
-    res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
-                                            err))
+    res, err = self:push_response(new_error(payload.id, jsonrpc.SERVER_ERROR, err),
+                                  "[rpc] unable to push RPC call error: ",
+                                  collection)
     if not res then
-      ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err)
+      ngx_log(ngx_WARN, err)
     end
 
     return
@@ -89,17 +118,104 @@ function _M._dispatch(premature, self, cb, payload)
   end
 
   -- success
-  res, err = self.outgoing:push({
+  res, err = self:push_response({
     jsonrpc = jsonrpc.VERSION,
     id = payload.id,
     result = res,
-  })
+  }, "[rpc] unable to push RPC call result: ", collection)
   if not res then
-    ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err)
+    ngx_log(ngx_WARN, err)
   end
 end
 
 
+function _M:process_rpc_msg(payload, collection)
+  if type(payload) ~= "table" then
+    local res, err = self:push_response(
+      new_error(nil, jsonrpc.INVALID_REQUEST, "not a valid object"),
+      collection)
+    if not res then
+      return nil, err
+    end
+
+    return true
+  end
+
+  assert(payload.jsonrpc == jsonrpc.VERSION)
+
+  local payload_id = payload.id
+  local payload_method = payload.method
+
+  if payload_method then
+    -- invoke
+
+    ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")")
+
+    local dispatch_cb = self.manager.callbacks.callbacks[payload_method]
+    if not dispatch_cb and payload_id then
+      local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND),
+                                          "unable to send \"METHOD_NOT_FOUND\" error back to client: ",
+                                          collection)
+      if not res then
+        return nil, err
+      end
+
+      return true
+    end
+
+    local res, err
+
+    -- call dispatch
+
+    if collection then
+
+      -- TODO: async call by using a new manager of timer
+      -- collection is not nil, it means it is a batch call
+      -- we should call sync function
+      _M._dispatch(nil, self, dispatch_cb, payload, collection)
+
+    else
+
+      -- collection is nil, it means it is a single call
+      -- we should call async function
+      local name = string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
+                                 self.node_id, payload_id or 0, payload_method)
+      res, err = kong.timer:named_at(name, 0, _M._dispatch, self, dispatch_cb, payload)
+
+      if not res and payload_id then
+        local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR),
+                                              "unable to send \"INTERNAL_ERROR\" error back to client: ",
+                                              collection)
+        if not reso then
+          return nil, erro
+        end
+
+        return nil, "unable to dispatch JSON-RPC callback: " .. err
+      end
+    end
+
+  else
+    -- response, don't care about `collection`
+    local interest_cb = self.interest[payload_id]
+    self.interest[payload_id] = nil -- edge trigger only once
+
+    if not interest_cb then
+      ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload_id, ", dropping it")
+
+      return true
+    end
+
+    local res, err = interest_cb(payload)
+    if not res then
+      ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ",
+              payload_id, ", err: ", err)
+    end
+  end -- if payload.method
+
+  return true
+end
+
+
 -- start reader and writer thread and event loop
 function _M:start()
   self.read_thread = ngx.thread.spawn(function()
@@ -120,9 +236,9 @@ function _M:start()
         end
 
         if waited > CLUSTERING_PING_INTERVAL then
-          local res, err = self.outgoing:push(PING_TYPE)
+          local res, err = self:push_response(PING_TYPE, "unable to send ping: ")
           if not res then
-            return nil, "unable to send ping: " .. err
+            return nil, err
           end
         end
 
@@ -133,9 +249,9 @@ function _M:start()
       last_seen = ngx_time()
 
       if typ == "ping" then
-        local res, err = self.outgoing:push(PONG_TYPE)
+        local res, err = self:push_response(PONG_TYPE, "unable to handle ping: ")
         if not res then
-          return nil, "unable to handle ping: " .. err
+          return nil, err
        end
 
        goto continue
@@ -154,52 +270,52 @@ function _M:start()
       assert(typ == "binary")
 
       local payload = decompress_payload(data)
-      assert(payload.jsonrpc == jsonrpc.VERSION)
 
-      if payload.method then
-        -- invoke
-
-        ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")
+      -- single rpc call
+      if not isarray(payload) then
+        local ok, err = self:process_rpc_msg(payload)
+        if not ok then
+          return nil, err
+        end
 
-        local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
-        if not dispatch_cb and payload.id then
-          local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
-          if not res then
-            return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err
-          end
+        goto continue
+      end
 
-          goto continue
+      -- rpc call with an empty array
+      if isempty(payload) then
+        local res, err = self:push_response(
+          new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array"))
+        if not res then
+          return nil, err
         end
 
-        -- call dispatch
-        local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
-                                                           self.node_id, payload.id or 0, payload.method),
-                                             0, _M._dispatch, self, dispatch_cb, payload)
-        if not res and payload.id then
-          local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR))
-          if not reso then
-            return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro
-          end
+        goto continue
+      end
 
-          return nil, "unable to dispatch JSON-RPC callback: " .. err
-        end
+      -- batch rpc call
 
-      else
-        -- response
-        local interest_cb = self.interest[payload.id]
-        self.interest[payload.id] = nil -- edge trigger only once
+      local collection = {}
 
-        if not interest_cb then
-          ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it")
+      ngx_log(ngx_DEBUG, "[rpc] got batch RPC call: ", #payload)
 
-          goto continue
+      for _, v in ipairs(payload) do
+        local ok, err = self:process_rpc_msg(v, collection)
+        if not ok then
+          return nil, err
         end
+      end
 
-        local res, err = interest_cb(payload)
-        if not res then
-          ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ",
-                  payload.id, ", err: ", err)
-        end
+      -- may be responses or all notifications
+      if isempty(collection) then
+        goto continue
+      end
+
+      assert(isarray(collection))
+
+      local res, err = self:push_response(collection,
+                                          "[rpc] unable to push RPC call result: ")
+      if not res then
+        return nil, err
       end
 
       ::continue::
@@ -207,40 +323,82 @@ function _M:start()
   end)
 
   self.write_thread = ngx.thread.spawn(function()
+    local batch_requests = {}
+
     while not exiting() do
-      local payload, err = self.outgoing:pop(5)
+      -- 0.5 seconds for not waiting too long
+      local payload, err = self.outgoing:pop(0.5)
       if err then
         return nil, err
       end
 
-      if payload then
-        if payload == PING_TYPE then
-          local _, err = self.wb:send_ping()
-          if err then
-            return nil, "failed to send PING frame to peer: " .. err
-
-          else
-            ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer")
+      -- timeout
+      if not payload then
+        local n = #batch_requests
+        if n > 0 then
+          local bytes, err = self.wb:send_binary(compress_payload(batch_requests))
+          if not bytes then
+            return nil, err
          end
 
-        elseif payload == PONG_TYPE then
-          local _, err = self.wb:send_pong()
-          if err then
-            return nil, "failed to send PONG frame to peer: " .. err
+          ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n)
 
-          else
-            ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer")
-          end
+          tb_clear(batch_requests)
+        end
+        goto continue
+      end
+
+      if payload == PING_TYPE then
+        local _, err = self.wb:send_ping()
+        if err then
+          return nil, "failed to send PING frame to peer: " .. err
 
         else
-          assert(type(payload) == "table")
+          ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer")
+        end
+        goto continue
+      end
+
+      if payload == PONG_TYPE then
+        local _, err = self.wb:send_pong()
+        if err then
+          return nil, "failed to send PONG frame to peer: " .. err
 
-          local bytes, err = self.wb:send_binary(compress_payload(payload))
+        else
+          ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer")
+        end
+        goto continue
+      end
+
+      assert(type(payload) == "table")
+
+      -- batch enabled
+      local batch_size = self.manager.__batch_size
+
+      if batch_size > 0 then
+        tb_insert(batch_requests, payload)
+
+        -- send batch requests
+        local n = #batch_requests
+        if n >= batch_size then
+          local bytes, err = self.wb:send_binary(compress_payload(batch_requests))
           if not bytes then
            return nil, err
          end
+
+          ngx_log(ngx_DEBUG, "[rpc] sent batch RPC call: ", n)
+
+          tb_clear(batch_requests)
        end
+        goto continue
+      end
+
+      local bytes, err = self.wb:send_binary(compress_payload(payload))
+      if not bytes then
+        return nil, err
      end
+
+      ::continue::
    end
  end)
 end
@@ -290,7 +448,7 @@ function _M:call(node_id, method, params, callback)
     self.interest[id] = callback
   end
 
-  return self.outgoing:push({
+  return self:push_request({
     jsonrpc = jsonrpc.VERSION,
     method = method,
     params = params,
diff --git a/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua
new file mode 100644
index 00000000000..f60e2097267
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/06-batch-rpc_spec.lua
@@ -0,0 +1,78 @@
+local helpers = require "spec.helpers"
+
+-- register a test rpc service in custom plugin rpc-batch-test
+for _, strategy in helpers.each_strategy() do
+  describe("Hybrid Mode RPC #" .. strategy, function()
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, { "routes", "services" })
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        cluster_rpc = "on",
+        plugins = "bundled,rpc-batch-test", -- enable custom plugin
+        cluster_rpc_sync = "off", -- disable rpc sync
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+        cluster_rpc = "on",
+        plugins = "bundled,rpc-batch-test", -- enable custom plugin
+        cluster_rpc_sync = "off", -- disable rpc sync
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong()
+    end)
+
+    describe("batch works", function()
+      it("DP calls CP via batching", function()
+        local cp_logfile = nil
+        local dp_logfile = "servroot2/logs/error.log"
+
+        helpers.pwait_until(function()
+          assert.logfile(dp_logfile).has.line(
+            "[rpc] sent batch RPC call: 1", true)
+
+          assert.logfile(cp_logfile).has.line(
+            "[rpc] got batch RPC call: 1", true)
+          assert.logfile(cp_logfile).has.line(
+            "kong.test.batch called: world", true)
+
+          assert.logfile(dp_logfile).has.line(
+            "[rpc] got batch RPC call: 1", true)
+          assert.logfile(dp_logfile).has.line(
+            "kong.test.batch called: hello world", true)
+
+          assert.logfile(dp_logfile).has.line(
+            "[rpc] sent batch RPC call: 2", true)
+
+          assert.logfile(cp_logfile).has.line(
+            "[rpc] got batch RPC call: 2", true)
+          assert.logfile(cp_logfile).has.line(
+            "kong.test.batch called: kong", true)
+          assert.logfile(cp_logfile).has.line(
+            "kong.test.batch called: gateway", true)
+          assert.logfile(cp_logfile).has.line(
+            "[rpc] notification has no response", true)
+
+          return true
+        end, 15)
+      end)
+    end)
+  end)
+end -- for _, strategy
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua
new file mode 100644
index 00000000000..92cb24226a7
--- /dev/null
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/handler.lua
@@ -0,0 +1,34 @@
+local RpcBatchTestHandler = {
+  VERSION = "1.0",
+  PRIORITY = 1000,
+}
+
+
+function RpcBatchTestHandler:init_worker()
+  kong.rpc.callbacks:register("kong.test.batch", function(node_id, greeting)
+    ngx.log(ngx.DEBUG, "kong.test.batch called: ", greeting)
+    return "hello " .. greeting
+  end)
+
+  local worker_events = assert(kong.worker_events)
+
+  -- once rpc is ready, start making batch calls
+  worker_events.register(function(capabilities_list)
+    kong.rpc:__set_batch(1)
+
+    local res = kong.rpc:call("control_plane", "kong.test.batch", "world")
+    if not res then
+      return
+    end
+
+    ngx.log(ngx.DEBUG, "kong.test.batch called: ", res)
+
+    kong.rpc:__set_batch(2)
+    kong.rpc:notify("control_plane", "kong.test.batch", "kong")
+    kong.rpc:notify("control_plane", "kong.test.batch", "gateway")
+
+  end, "clustering:jsonrpc", "connected")
+end
+
+
+return RpcBatchTestHandler
diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua
new file mode 100644
index 00000000000..0515df5835c
--- /dev/null
+++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-batch-test/schema.lua
@@ -0,0 +1,12 @@
+return {
+  name = "rpc-batch-test",
+  fields = {
+    {
+      config = {
+        type = "record",
+        fields = {
+        },
+      },
+    },
+  },
+}
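To make the data flow of the change easier to follow, the sketch below walks through the table shapes the new code paths handle: with `__batch_size > 0` the write thread buffers plain JSON-RPC 2.0 request objects into `batch_requests` and ships them as one array, and on the receiving side `process_rpc_msg` appends each result or error to `collection`, which is pushed back as a single array; notifications (requests without an `id`) contribute no entry, and an empty collection is never sent. This is a standalone, illustrative snippet in plain Lua, not code from the PR: the method name and params simply mirror the `kong.test.batch` fixture, and the queueing, compression (`compress_payload`), and websocket plumbing are deliberately left out.

```lua
-- Illustrative sketch only: mirrors the table shapes that socket.lua
-- buffers (batch_requests) and collects (collection); it does not touch
-- the real queue, websocket, or compress_payload/send_binary plumbing.

-- what the write thread accumulates when __batch_size > 0:
local batch_requests = {
  { jsonrpc = "2.0", method = "kong.test.batch", params = { "kong" }, id = 1 },
  { jsonrpc = "2.0", method = "kong.test.batch", params = { "gateway" } }, -- no id: a notification
}

-- what the peer builds in `collection` while looping over the array with
-- process_rpc_msg(); notifications produce no response entry:
local collection = {}
for _, req in ipairs(batch_requests) do
  if req.id then
    collection[#collection + 1] = {
      jsonrpc = "2.0",
      id = req.id,
      result = "hello " .. req.params[1], -- what the kong.test.batch callback would return
    }
  end
end

-- a non-empty collection goes back as one binary frame; if every request
-- was a notification, nothing is sent at all.
for _, res in ipairs(collection) do
  print(res.id, res.result) --> 1    hello kong
end
```

Note the flush behaviour implied by the write-thread change: the `outgoing:pop(0.5)` timeout is what sends a partially filled batch, so a batch smaller than `__batch_size` still leaves the node after roughly half a second of idleness.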