diff --git a/bin/busted b/bin/busted index e0635aabd6f..e59760322fa 100755 --- a/bin/busted +++ b/bin/busted @@ -19,6 +19,9 @@ local cert_path do busted_cert_content = busted_cert_content .. "\n" .. pl_file.read(system_cert_path) end + local cluster_cert_content = assert(pl_file.read("spec/fixtures/kong_clustering.crt")) + busted_cert_content = busted_cert_content .. "\n" .. cluster_cert_content + pl_file.write(busted_cert_file, busted_cert_content) cert_path = busted_cert_file end diff --git a/spec/02-integration/01-helpers/05-rpc-mock_spec.lua b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua new file mode 100644 index 00000000000..24ab3270967 --- /dev/null +++ b/spec/02-integration/01-helpers/05-rpc-mock_spec.lua @@ -0,0 +1,146 @@ +local helpers = require("spec.helpers") +local server = require("spec.helpers.rpc_mock.server") +local client = require("spec.helpers.rpc_mock.client") +local get_node_id = helpers.get_node_id + +local function trigger_change() + -- the initial sync is flaky. let's trigger a sync by creating a service + local admin_client = helpers.admin_client() + assert.res_status(201, admin_client:send { + method = "POST", + path = "/services/", + body = { + url = "http://example.com", + }, + headers = { + ["Content-Type"] = "application/json", + }, + }) +end + +describe("rpc mock/hook", function() + local recover + lazy_setup(function() + if not kong.worker_events then + helpers.patch_worker_events() + recover = true + end + end) + + lazy_teardown(function() + if recover then + kong.worker_events = nil + end + end) + + describe("server side", function() + local server_mock + + lazy_setup(function() + helpers.get_db_utils() + + server_mock = server.new() + assert(server_mock:start()) + + assert(helpers.start_kong({ + database = "off", + role = "data_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + log_level = "debug", + cluster_control_plane = "127.0.0.1:8005", + })) + end) + + lazy_teardown(function() + server_mock:stop(true) + helpers.stop_kong(nil, true) + end) + + it("recording", function() + trigger_change() + + local record = server_mock:wait_for_call() + assert.is_table(record.response.result.default.deltas) + end) + + it("mock", function() + local client_version + server_mock:mock("kong.sync.v2.get_delta", function(node_id, payload) + client_version = payload.default.version + return { default = { version = 100, deltas = {} } } + end) + server_mock:attach_debugger() + + local dp_id = get_node_id("servroot") + + server_mock:wait_for_node(dp_id) + + assert(server_mock:call(dp_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } })) + + -- the mock should have been called + helpers.wait_until(function() + return client_version + end, 20) + end) + end) + + describe("client side", function() + local client_mock + local called = false + + lazy_setup(function() + helpers.get_db_utils() + + client_mock = assert(client.new()) + assert(helpers.start_kong({ + role = "control_plane", + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", + cluster_rpc = "on", + cluster_rpc_sync = "on", + })) + + client_mock.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, payload) + called = true + end) + + client_mock:start() + client_mock:wait_until_connected() + end) + + lazy_teardown(function() + helpers.stop_kong(nil, true) + client_mock:stop() + end) + + it("client->CP", function() + local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + assert.is_nil(err) + assert.is_table(res and res.default and res.default.deltas) + + local res, err = client_mock:call("control_plane", "kong.sync.v2.unknown", { default = { },}) + assert.is_string(err) + assert.is_nil(res) + end) + + it("CP->client", function() + -- this registers the data plane node + local res, err = client_mock:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + assert.is_nil(err) + assert.is_table(res and res.default and res.default.deltas) + + trigger_change() + + helpers.wait_until(function() + return called + end, 20) + end) + end) +end) diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua new file mode 100644 index 00000000000..d279689dd97 --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/handler.lua @@ -0,0 +1,76 @@ +local kong_meta = require("kong.meta") + +local _M = { + PRIORITY = 1000, + VERSION = kong_meta.version, +} + +local original_callbacks = {} +local inc_id = 0 + +function _M.init_worker() + kong.rpc.callbacks:register("kong.rpc.debug.register", function(node_id, register_payload) + local proxy_apis = register_payload.proxy_apis + + for _, proxy_api in ipairs(proxy_apis) do + -- unregister and save the original callback + local original_cb + if not original_callbacks[proxy_api] then + original_callbacks[proxy_api] = kong.rpc.callbacks.callbacks[proxy_api] + end + original_cb = original_callbacks[proxy_api] + kong.rpc.callbacks.callbacks[proxy_api] = nil + + kong.log.info("hooking registering RPC proxy API: ", proxy_api) + kong.rpc.callbacks:register(proxy_api, function(client_id, payload) + local id = inc_id + inc_id = inc_id + 1 + kong.log.info("hooked proxy API ", proxy_api, " called by node: ", client_id) + kong.log.info("forwarding to node: ", node_id) + local res, err = kong.rpc:call(node_id, "kong.rpc.debug.mock", { call_id = id, method = proxy_api, node_id = client_id, payload = payload }) + if not res then + return nil, "Failed to proxy(" .. node_id .. "): " .. err + end + + if res.error then + return nil, res.error + end + + if res.prehook or res.posthook then + if res.prehook then + payload = res.args + end + + local origin_res, origin_err = original_cb(client_id, payload) + + if res.posthook then + res, err = kong.rpc:call(node_id, "kong.rpc.debug.posthook", { call_id = id, method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} }) + if not res then + return nil, "Failed to call post hook(" .. node_id .. "): " .. err + end + + return res.result, res.error + end + elseif res.mock then + return res.result, res.error + end + + return nil, "invalid response from proxy" + end) + end + + return true + end) + + kong.rpc.callbacks:register("kong.rpc.debug.call", function(node_id, payload) + local res, err = kong.rpc:call(payload.node_id, payload.method, payload.args) + return res, err + end) + + kong.rpc.callbacks:register("kong.rpc.debug.lua_code", function(node_id, payload) + local code = assert(loadstring(payload)) + return code() + end) +end + +return _M diff --git a/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua new file mode 100644 index 00000000000..dc57731018b --- /dev/null +++ b/spec/fixtures/custom_plugins/kong/plugins/rpc-debug/schema.lua @@ -0,0 +1,12 @@ +return { + name = "rpc-debug", + fields = { + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +} diff --git a/spec/helpers.lua b/spec/helpers.lua index 22b67c4434d..5dee43ae3d5 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -28,6 +28,8 @@ local server = reload_module("spec.internal.server") local client = reload_module("spec.internal.client") local wait = reload_module("spec.internal.wait") +-- redo the patches to apply the kong global patches +misc.repatch_timer() ---------------- -- Variables/constants @@ -226,4 +228,9 @@ local wait = reload_module("spec.internal.wait") make_temp_dir = misc.make_temp_dir, build_go_plugins = cmd.build_go_plugins, + + get_node_id = misc.get_node_id, + + repatch_timer = misc.repatch_timer, + patch_worker_events = misc.patch_worker_events, } diff --git a/spec/helpers/rpc_mock/client.lua b/spec/helpers/rpc_mock/client.lua new file mode 100644 index 00000000000..4091ef76989 --- /dev/null +++ b/spec/helpers/rpc_mock/client.lua @@ -0,0 +1,68 @@ +-- by importing helpers, we ensure the kong PDK module is initialized +local helpers = require "spec.helpers" +local rpc_mgr = require("kong.clustering.rpc.manager") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert +local uuid = require "kong.tools.uuid" + + +local _M = {} + + +local default_dp_conf = { + role = "data_plane", + cluster_control_plane = "localhost:8005", +} + +setmetatable(default_dp_conf, { __index = default_cert }) +local default_meta = { __index = default_dp_conf, } + + +local function do_nothing() end + + +local function client_stop(rpc_mgr) + -- a hacky way to stop rpc_mgr from reconnecting + rpc_mgr.try_connect = do_nothing + + -- this will stop all connections + for _, socket in pairs(rpc_mgr.clients) do + for conn in pairs(socket) do + pcall(conn.stop, conn) + end + end +end + + +local function client_is_connected(rpc_mgr) + for _, socket in pairs(rpc_mgr.clients) do + if next(socket) then + return true + end + end + return false +end + + +local function client_wait_until_connected(rpc_mgr, timeout) + return helpers.wait_until(function() + return rpc_mgr:is_connected() + end, timeout or 15) +end + + +-- TODO: let client not emits logs as it's expected to fail to connect for the first few seconds +function _M.new(opts) + opts = opts or {} + setmetatable(opts, default_meta) + local ret = rpc_mgr.new(default_dp_conf, opts.name or uuid.uuid()) + + ret.stop = client_stop + ret.is_connected = client_is_connected + ret.start = ret.try_connect + ret.wait_until_connected = client_wait_until_connected + + return ret +end + + +return _M diff --git a/spec/helpers/rpc_mock/default.lua b/spec/helpers/rpc_mock/default.lua new file mode 100644 index 00000000000..caadca8d8a6 --- /dev/null +++ b/spec/helpers/rpc_mock/default.lua @@ -0,0 +1,10 @@ +local default_cert = { + cluster_mtls = "shared", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + nginx_conf = "spec/fixtures/custom_nginx.template", +} + +return { + default_cert = default_cert, +} diff --git a/spec/helpers/rpc_mock/server.lua b/spec/helpers/rpc_mock/server.lua new file mode 100644 index 00000000000..3ed775809ff --- /dev/null +++ b/spec/helpers/rpc_mock/server.lua @@ -0,0 +1,263 @@ +local helpers = require("spec.helpers") +local client = require("spec.helpers.rpc_mock.client") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + + +local _M = {} +local _MT = { __index = _M, } + + +-- this function will start a control plane server with the given configuration +-- and attach a debugger to it +-- set attaching to true to attach to an existing control plane +function _M.new(opts) + opts = opts or {} + opts.prefix = opts.prefix or "servroot_rpc_tap" + opts.role = "control_plane" + opts.plugins = "bundled,rpc-debug" + opts.cluster_listen = opts.cluster_listen or "127.0.0.1:8005" + opts.mocks = opts.mocks or {} + opts.prehooks = opts.prehooks or {} + opts.posthooks = opts.posthooks or {} + opts.cluster_rpc = "on" + opts.cluster_rpc_sync = opts.cluster_rpc_sync or "on" + if opts.interception == nil then + opts.interception = true + end + + for k, v in pairs(default_cert) do + if opts[k] == nil then + opts[k] = v + end + end + + return setmetatable(opts, _MT) +end + +function _M.start(self) + if not self.attaching then + local _, db = helpers.get_db_utils(self.database, nil, { "rpc-debug" }) + + assert(db.plugins:insert({ + name = "rpc-debug", + config = {}, + })) + + assert(helpers.start_kong(self)) + end + + self.debugger_client = client.new({ + cluster_control_plane = self.cluster_listen, + }) + + -- install default interception handlers + if self.interception then + self:enable_inception() + end + + -- attached control plane will call this method when a hooked/mocked RPC is called. + -- this RPC handles both prehook and mock, and response to the control plane: + -- 1. if the RPC is mocked, return the mock result; + -- 2. if the RPC has a prehook, manipulate the args and returns them, and tell if a posthook is present and pending call + self.debugger_client.callbacks:register("kong.rpc.debug.mock", function(proxy_id, proxy_payload) + local method, node_id, payload, call_id = + proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_id + local cb = self.mocks[method] + if cb then + local res, err = cb(node_id, payload, proxy_id, self) + return { + mock = true, + result = res, + error = err, + } + end + + local prehook = self.prehooks[method] or self.prehooks["*"] + local posthook = self.posthooks[method] or self.posthooks["*"] + local result = { + prehook = prehook and true, + posthook = posthook and true, + } + + if prehook then + local res, err = prehook(node_id, payload, proxy_id, self, method, call_id) + if not res then + return nil, err + end + + result.args = res + end + + return result + end) + + self.debugger_client.callbacks:register("kong.rpc.debug.posthook", function(proxy_id, proxy_payload) + local method, node_id, payload, call_id = + proxy_payload.method, proxy_payload.node_id, proxy_payload.payload, proxy_payload.call_id + local cb = self.posthooks[method] or self.posthooks["*"] + if not cb then + return nil, "no callback registered for method: " .. method + end + + return cb(node_id, payload, proxy_id, self, method, call_id) + end) + + self.debugger_client:start() + self.debugger_client:wait_until_connected() + + if next(self.mocks) or next(self.prehooks) or next(self.posthooks) then + return self:attach_debugger() + end + + return true +end + +function _M:attach_debugger() + local hooked = {} + for api_name, cb in pairs(self.mocks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.prehooks) do + hooked[api_name] = true + end + for api_name, cb in pairs(self.posthooks) do + hooked[api_name] = true + end + + local hooked_list + if hooked["*"] then + hooked_list = { + "kong.sync.v2.get_delta", + } + else + hooked_list = {} + for api_name, _ in pairs(hooked) do + hooked_list[#hooked_list + 1] = api_name + end + end + + return self.debugger_client:call("control_plane", "kong.rpc.debug.register", { + proxy_apis = hooked_list, + }) +end + +function _M:call(node_id, method, payload) + return self.debugger_client:call("control_plane", "kong.rpc.debug.call", { + method = method, + args = payload, + node_id = node_id, + }) +end + +function _M:get_node_ids() + return self.debugger_client:call("control_plane", "kong.rpc.debug.lua_code", [[ + local node_ids = {} + for node_id, _ in pairs(kong.rpc.clients) do + if type(node_id) == "string" then + node_ids[node_id] = true + end + end + return node_ids + ]]) +end + +function _M:wait_for_node(node_id) + return helpers.wait_until(function() + local list, err = self:get_node_ids() + if not list then + return nil, err + end + + return list[node_id] + end) +end + +function _M:mock(api_name, cb) + self.mocks[api_name] = cb +end + + +function _M:prehook(api_name, cb) + self.prehooks[api_name] = cb +end + + +function _M:posthook(api_name, cb) + self.posthooks[api_name] = cb +end + + +local function get_records(server) + local records = server.records + if not records then + records = {} + server.records = records + end + return records +end + + +local function record_has_response(record) + return record.response and true +end + + +function _M:wait_for_call(cond) + cond = cond or record_has_response + + local result + + helpers.wait_until(function() + local records = get_records(self) + for _, record in pairs(records) do + if cond(record) then + result = record + return record + end + end + end) + + return result +end + + +local function default_inception_prehook(node_id, payload, proxy_id, server, method, call_id) + local records = get_records(server) + records[call_id] = { + request = payload, + node_id = node_id, + proxy_id = proxy_id, + method = method, + } + return payload +end + + +local function default_inception_posthook(node_id, payload, proxy_id, server, method, call_id) + local records = get_records(server) + local record = records[call_id] + if not record then + print("no record found for call_id: ", call_id) + return payload + end + record.response = payload + return payload +end + + +function _M:enable_inception() + self.prehooks["*"] = default_inception_prehook + self.posthooks["*"] = default_inception_posthook +end + + +function _M:stop(...) + if not self.attaching then + helpers.stop_kong(self.prefix, ...) + end + + self.debugger_client:stop() +end + + +return _M diff --git a/spec/internal/misc.lua b/spec/internal/misc.lua index f1339e0882c..a897ff5ca23 100644 --- a/spec/internal/misc.lua +++ b/spec/internal/misc.lua @@ -19,6 +19,8 @@ local shell = require("spec.internal.shell") local CONSTANTS = require("spec.internal.constants") local sys = require("spec.internal.sys") +local private_node = require "kong.pdk.private.node" + local pack = function(...) return { n = select("#", ...), ... } end local unpack = function(t) return unpack(t, 1, t.n) end @@ -287,6 +289,49 @@ local function use_old_plugin(name) end +local function repatch_timer() + local _timerng + + _timerng = require("resty.timerng").new({ + min_threads = 16, + max_threads = 32, + }) + + _timerng:start() + + _G.timerng = _timerng + + _G.ngx.timer.at = function (delay, callback, ...) + return _timerng:at(delay, callback, ...) + end + + _G.ngx.timer.every = function (interval, callback, ...) + return _timerng:every(interval, callback, ...) + end + + if kong then + kong.timer = _timerng + end +end + + +local function patch_worker_events() + if not kong then + return + end + + if kong.worker_events then + return + end + + kong.worker_events = require "resty.events.compat" + kong.worker_events.configure({ + listening = "unix:", + testing = true, + }) +end + + return { pack = pack, unpack = unpack, @@ -306,4 +351,9 @@ return { with_current_ws = with_current_ws, make_temp_dir = make_temp_dir, use_old_plugin = use_old_plugin, + + get_node_id = private_node.load_node_id, + + repatch_timer = repatch_timer, + patch_worker_events = patch_worker_events, }