Skip to content

Commit

Permalink
feat(clustering/rpc): rpc batching on concentrator (#14055)
Browse files Browse the repository at this point in the history
Sister PR: #14040

KAG-5934
  • Loading branch information
chronolaw authored Jan 20, 2025
1 parent 122810c commit 4448eef
Showing 1 changed file with 130 additions and 41 deletions.
171 changes: 130 additions & 41 deletions kong/clustering/rpc/concentrator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ local queue = require("kong.clustering.rpc.queue")
local cjson = require("cjson")
local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
local rpc_utils = require("kong.clustering.rpc.utils")
local isarray = require("table.isarray")
local isempty = require("table.isempty")
local tb_insert = table.insert


local type = type
local setmetatable = setmetatable
local tostring = tostring
local pcall = pcall
Expand All @@ -22,6 +26,7 @@ local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_DEBUG = ngx.DEBUG
local new_error = jsonrpc.new_error


local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp:<worker_uuid>
Expand Down Expand Up @@ -90,6 +95,89 @@ local function enqueue_notifications(notifications, notifications_queue)
end


function _M:process_one_response(payload)
assert(payload.jsonrpc == jsonrpc.VERSION)
local payload_id = payload.id

-- may be some error message for peer
if not payload_id then
if payload.error then
ngx_log(ngx_ERR, "[rpc] RPC failed, code: ",
payload.error.code, ", err: ",
payload.error.message)
end
return
end

-- response
local cb = self.interest[payload_id]
self.interest[payload_id] = nil -- edge trigger only once

if not cb then
ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it")
return
end

local res, err = cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
payload_id, ", err: ", err)
end
end


function _M:process_one_request(target_id, reply_to, payload, collection)
if type(payload) ~= "table" then
local res, err = self:_enqueue_rpc_response(
reply_to,
new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"),
collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end

return
end

local payload_id = payload.id

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params, not payload_id)

-- notification has no callback or id
if not payload_id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
return
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload_id,
result = res,
}, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
end

else
-- call failure
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload_id,
error = {
code = jsonrpc.SERVER_ERROR,
message = tostring(err),
}
}, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end
end
end


function _M:_event_loop(lconn)
local notifications_queue = queue.new(4096)
local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id
Expand All @@ -116,21 +204,16 @@ function _M:_event_loop(lconn)
if n.channel == rpc_resp_channel_name then
-- an response for a previous RPC call we asked for
local payload = cjson_decode(n.payload)
assert(payload.jsonrpc == jsonrpc.VERSION)

-- response
local cb = self.interest[payload.id]
self.interest[payload.id] = nil -- edge trigger only once

if cb then
local res, err = cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
payload.id, ", err: ", err)
end
if not isarray(payload) then
-- one rpc response
self:process_one_response(payload)

else
ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it")
-- batch rpc response
for _, v in ipairs(payload) do
self:process_one_response(v)
end
end

else
Expand All @@ -153,45 +236,44 @@ function _M:_event_loop(lconn)
local reply_to = assert(call.reply_to,
"unknown requester for RPC")

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params, not payload.id)
if not isarray(payload) then
-- one rpc call
self:process_one_request(target_id, reply_to, payload)

-- notification has no callback or id
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
goto continue
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
result = res,
})
-- rpc call with an empty Array
if isempty(payload) then
local res, err = self:_enqueue_rpc_response(
reply_to,
new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array"))
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end

else
-- call failure
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
error = {
code = jsonrpc.SERVER_ERROR,
message = tostring(err),
}
})
goto continue
end

-- batching rpc call

local collection = {}

for _, v in ipairs(payload) do
self:process_one_request(target_id, reply_to, v, collection)
end

if not isempty(collection) then
local res, err = self:_enqueue_rpc_response(reply_to, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
end
end

::continue::
end
end
end
end -- for _, call
end -- if n.channel == rpc_resp_channel_name
end -- while true

local res, err = lconn:wait_for_notification()
if not res then
Expand All @@ -217,7 +299,7 @@ function _M:_event_loop(lconn)
else
notifications_queue:push(res)
end
end
end -- while not exiting()
end


Expand Down Expand Up @@ -270,7 +352,14 @@ end


-- enqueue a RPC response from CP worker with ID worker_id
function _M:_enqueue_rpc_response(worker_id, payload)
-- collection is only for rpc batch call.
-- if collection is nil, it means the rpc is a single call.
function _M:_enqueue_rpc_response(worker_id, payload, collection)
if collection then
tb_insert(collection, payload)
return
end

local sql = string_format("SELECT pg_notify(%s, %s);",
self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id),
self.db.connector:escape_literal(cjson_encode(payload)))
Expand Down

1 comment on commit 4448eef

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong-dev:4448eefdff6949ff234cf3c3b89935c68bad04f2
Artifacts available https://github.com/Kong/kong/actions/runs/12863977901

Please sign in to comment.