diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 80d19cad769..6e2ab5785c2 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -7,8 +7,12 @@ local queue = require("kong.clustering.rpc.queue") local cjson = require("cjson") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local rpc_utils = require("kong.clustering.rpc.utils") +local isarray = require("table.isarray") +local isempty = require("table.isempty") +local tb_insert = table.insert +local type = type local setmetatable = setmetatable local tostring = tostring local pcall = pcall @@ -22,6 +26,7 @@ local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_WARN = ngx.WARN local ngx_DEBUG = ngx.DEBUG +local new_error = jsonrpc.new_error local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp: @@ -90,6 +95,89 @@ local function enqueue_notifications(notifications, notifications_queue) end +function _M:process_one_response(payload) + assert(payload.jsonrpc == jsonrpc.VERSION) + local payload_id = payload.id + + -- may be some error message for peer + if not payload_id then + if payload.error then + ngx_log(ngx_ERR, "[rpc] RPC failed, code: ", + payload.error.code, ", err: ", + payload.error.message) + end + return + end + + -- response + local cb = self.interest[payload_id] + self.interest[payload_id] = nil -- edge trigger only once + + if not cb then + ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it") + return + end + + local res, err = cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", + payload_id, ", err: ", err) + end +end + + +function _M:process_one_request(target_id, reply_to, payload, collection) + if type(payload) ~= "table" then + local res, err = self:_enqueue_rpc_response( + reply_to, + new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"), + collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + end + + return + end + + local payload_id = payload.id + + local res, err = self.manager:_local_call(target_id, payload.method, + payload.params, not payload_id) + + -- notification has no callback or id + if not payload_id then + ngx_log(ngx_DEBUG, "[rpc] notification has no response") + return + end + + if res then + -- call success + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = jsonrpc.VERSION, + id = payload_id, + result = res, + }, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) + end + + else + -- call failure + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = jsonrpc.VERSION, + id = payload_id, + error = { + code = jsonrpc.SERVER_ERROR, + message = tostring(err), + } + }, collection) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + end + end +end + + function _M:_event_loop(lconn) local notifications_queue = queue.new(4096) local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id @@ -116,21 +204,16 @@ function _M:_event_loop(lconn) if n.channel == rpc_resp_channel_name then -- an response for a previous RPC call we asked for local payload = cjson_decode(n.payload) - assert(payload.jsonrpc == jsonrpc.VERSION) - - -- response - local cb = self.interest[payload.id] - self.interest[payload.id] = nil -- edge trigger only once - if cb then - local res, err = cb(payload) - if not res then - ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", - payload.id, ", err: ", err) - end + if not isarray(payload) then + -- one rpc response + self:process_one_response(payload) else - ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it") + -- batch rpc response + for _, v in ipairs(payload) do + self:process_one_response(v) + end end else @@ -153,45 +236,44 @@ function _M:_event_loop(lconn) local reply_to = assert(call.reply_to, "unknown requester for RPC") - local res, err = self.manager:_local_call(target_id, payload.method, - payload.params, not payload.id) + if not isarray(payload) then + -- one rpc call + self:process_one_request(target_id, reply_to, payload) - -- notification has no callback or id - if not payload.id then - ngx_log(ngx_DEBUG, "[rpc] notification has no response") goto continue end - if res then - -- call success - res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = jsonrpc.VERSION, - id = payload.id, - result = res, - }) + -- rpc call with an empty Array + if isempty(payload) then + local res, err = self:_enqueue_rpc_response( + reply_to, + new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array")) if not res then - ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) end - else - -- call failure - res, err = self:_enqueue_rpc_response(reply_to, { - jsonrpc = jsonrpc.VERSION, - id = payload.id, - error = { - code = jsonrpc.SERVER_ERROR, - message = tostring(err), - } - }) + goto continue + end + + -- batching rpc call + + local collection = {} + + for _, v in ipairs(payload) do + self:process_one_request(target_id, reply_to, v, collection) + end + + if not isempty(collection) then + local res, err = self:_enqueue_rpc_response(reply_to, collection) if not res then - ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) end end ::continue:: - end - end - end + end -- for _, call + end -- if n.channel == rpc_resp_channel_name + end -- while true local res, err = lconn:wait_for_notification() if not res then @@ -217,7 +299,7 @@ function _M:_event_loop(lconn) else notifications_queue:push(res) end - end + end -- while not exiting() end @@ -270,7 +352,14 @@ end -- enqueue a RPC response from CP worker with ID worker_id -function _M:_enqueue_rpc_response(worker_id, payload) +-- collection is only for rpc batch call. +-- if collection is nil, it means the rpc is a single call. +function _M:_enqueue_rpc_response(worker_id, payload, collection) + if collection then + tb_insert(collection, payload) + return + end + local sql = string_format("SELECT pg_notify(%s, %s);", self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id), self.db.connector:escape_literal(cjson_encode(payload)))