From 4d4b90c298f0c4dae5c271f3bd0dc2b117f7614b Mon Sep 17 00:00:00 2001
From: Javier
Date: Thu, 31 Mar 2022 10:41:53 -0500
Subject: [PATCH] Feat/wrpc (#8357)

This commit adds the wRPC protocol and uses it for the DP/CP connection in
hybrid mode. The wRPC protocol is a generic, bidirectional RPC protocol that
allows multiple services on the same persistent (WebSocket) connection.
---
 kong-2.8.0-0.rockspec                         |   5 +
 kong/clustering/init.lua                      |  29 +-
 kong/clustering/wrpc_control_plane.lua        | 677 ++++++++++++++++
 kong/clustering/wrpc_data_plane.lua           | 380 +++++++++
 kong/conf_loader/init.lua                     |   1 +
 kong/db/declarative/init.lua                  |  47 +-
 kong/db/schema/others/declarative_config.lua  |   6 +-
 kong/include/kong/model/ca_certificate.proto  |  13 +
 kong/include/kong/model/certificate.proto     |  15 +
 kong/include/kong/model/config.proto          |  35 +
 kong/include/kong/model/consumer.proto        |  13 +
 kong/include/kong/model/parameter.proto       |  11 +
 kong/include/kong/model/plugin.proto          |  23 +
 kong/include/kong/model/plugin_entities.proto |  72 ++
 kong/include/kong/model/route.proto           |  40 +
 kong/include/kong/model/service.proto         |  28 +
 kong/include/kong/model/sni.proto             |  15 +
 kong/include/kong/model/target.proto          |  16 +
 kong/include/kong/model/upstream.proto        |  74 ++
 kong/include/kong/model/workspace.proto       |  12 +
 .../kong/services/config/v1/config.proto      | 136 ++++
 kong/include/wrpc/wrpc.proto                  | 154 ++++
 kong/init.lua                                 |   8 +
 kong/templates/kong_defaults.lua              |   1 +
 kong/templates/nginx_kong.lua                 |   9 +
 kong/tools/channel.lua                        | 164 ++++
 kong/tools/grpc.lua                           |   2 +-
 kong/tools/protobuf.lua                       | 100 +++
 kong/tools/wrpc.lua                           | 685 ++++++++++++++++
 .../09-hybrid_mode/01-sync_spec.lua           | 748 ++++++++++-------
 .../09-hybrid_mode/02-start_stop_spec.lua     | 186 +++--
 .../09-hybrid_mode/03-pki_spec.lua            | 262 +++---
 .../04-cp_cluster_sync_spec.lua               |   8 +-
 .../09-hybrid_mode/05-ocsp_spec.lua           | 756 +++++++++---------
 spec/fixtures/custom_nginx.template           |   6 +
 35 files changed, 3846 insertions(+), 891 deletions(-)
 create mode 100644 kong/clustering/wrpc_control_plane.lua
 create mode 100644 kong/clustering/wrpc_data_plane.lua
 create mode 100644 kong/include/kong/model/ca_certificate.proto
 create mode 100644 kong/include/kong/model/certificate.proto
 create mode 100644 kong/include/kong/model/config.proto
 create mode 100644 kong/include/kong/model/consumer.proto
 create mode 100644 kong/include/kong/model/parameter.proto
 create mode 100644 kong/include/kong/model/plugin.proto
 create mode 100644 kong/include/kong/model/plugin_entities.proto
 create mode 100644 kong/include/kong/model/route.proto
 create mode 100644 kong/include/kong/model/service.proto
 create mode 100644 kong/include/kong/model/sni.proto
 create mode 100644 kong/include/kong/model/target.proto
 create mode 100644 kong/include/kong/model/upstream.proto
 create mode 100644 kong/include/kong/model/workspace.proto
 create mode 100644 kong/include/kong/services/config/v1/config.proto
 create mode 100644 kong/include/wrpc/wrpc.proto
 create mode 100644 kong/tools/channel.lua
 create mode 100644 kong/tools/protobuf.lua
 create mode 100644 kong/tools/wrpc.lua

diff --git a/kong-2.8.0-0.rockspec b/kong-2.8.0-0.rockspec
index 63e9bb63c660..ae469bc91163 100644
--- a/kong-2.8.0-0.rockspec
+++ b/kong-2.8.0-0.rockspec
@@ -67,6 +67,8 @@ build = {
     ["kong.clustering"] = "kong/clustering/init.lua",
     ["kong.clustering.data_plane"] = "kong/clustering/data_plane.lua",
     ["kong.clustering.control_plane"] = "kong/clustering/control_plane.lua",
+    ["kong.clustering.wrpc_data_plane"] = "kong/clustering/wrpc_data_plane.lua",
+
["kong.clustering.wrpc_control_plane"] = "kong/clustering/wrpc_control_plane.lua", ["kong.clustering.compat.removed_fields"] = "kong/clustering/compat/removed_fields.lua", ["kong.cluster_events"] = "kong/cluster_events/init.lua", @@ -136,6 +138,9 @@ build = { ["kong.tools.sandbox"] = "kong/tools/sandbox.lua", ["kong.tools.uri"] = "kong/tools/uri.lua", ["kong.tools.kong-lua-sandbox"] = "kong/tools/kong-lua-sandbox.lua", + ["kong.tools.protobuf"] = "kong/tools/protobuf.lua", + ["kong.tools.wrpc"] = "kong/tools/wrpc.lua", + ["kong.tools.channel"] = "kong/tools/channel.lua", ["kong.runloop.handler"] = "kong/runloop/handler.lua", ["kong.runloop.certificate"] = "kong/runloop/certificate.lua", diff --git a/kong/clustering/init.lua b/kong/clustering/init.lua index d859a6dbdda9..0e53c5cc5589 100644 --- a/kong/clustering/init.lua +++ b/kong/clustering/init.lua @@ -130,7 +130,20 @@ function _M.new(conf) local key = assert(pl_file.read(conf.cluster_cert_key)) self.cert_key = assert(ssl.parse_pem_priv_key(key)) - self.child = require("kong.clustering." .. conf.role).new(self) + print("role: ", conf.role, " protocol: ", conf.cluster_protocol) + + if conf.role == "control_plane" then + self.json_handler = require("kong.clustering.control_plane").new(self) + self.wrpc_handler = require("kong.clustering.wrpc_control_plane").new(self) + + else + local clustering_submodule = conf.role + if conf.cluster_protocol == "wrpc" then + clustering_submodule = "wrpc_" .. clustering_submodule + end + + self.child = require("kong.clustering." .. clustering_submodule).new(self) + end return self end @@ -184,7 +197,11 @@ end function _M:handle_cp_websocket() - return self.child:handle_cp_websocket() + return self.json_handler:handle_cp_websocket() +end + +function _M:handle_wrpc_websocket() + return self.wrpc_handler:handle_cp_websocket() end @@ -198,7 +215,13 @@ function _M:init_worker() return { name = p.name, version = p.handler.VERSION, } end, self.plugins_list) - self.child:init_worker() + for _, ch in ipairs{"child", "json_handler", "wrpc_handler"} do + local child = self[ch] + if child then + child:init_worker() + end + end + --self.child:init_worker() end diff --git a/kong/clustering/wrpc_control_plane.lua b/kong/clustering/wrpc_control_plane.lua new file mode 100644 index 000000000000..0ccbfcca8e61 --- /dev/null +++ b/kong/clustering/wrpc_control_plane.lua @@ -0,0 +1,677 @@ +local _M = {} + + +local semaphore = require("ngx.semaphore") +local ws_server = require("resty.websocket.server") +local ssl = require("ngx.ssl") +local ocsp = require("ngx.ocsp") +local http = require("resty.http") +local cjson = require("cjson.safe") +local declarative = require("kong.db.declarative") +local constants = require("kong.constants") +local openssl_x509 = require("resty.openssl.x509") +local wrpc = require("kong.tools.wrpc") +local string = string +local setmetatable = setmetatable +local type = type +local pcall = pcall +local pairs = pairs +local ipairs = ipairs +local tonumber = tonumber +local tostring = tostring +local ngx = ngx +local ngx_log = ngx.log +local cjson_encode = cjson.encode +local kong = kong +local ngx_exit = ngx.exit +local exiting = ngx.worker.exiting +local ngx_time = ngx.time +local ngx_var = ngx.var +local table_insert = table.insert +local table_concat = table.concat + +local kong_dict = ngx.shared.kong +local KONG_VERSION = kong.version +local ngx_DEBUG = ngx.DEBUG +local ngx_INFO = ngx.INFO +local ngx_NOTICE = ngx.NOTICE +local ngx_WARN = ngx.WARN +local ngx_ERR = ngx.ERR +local ngx_CLOSE = 
ngx.HTTP_CLOSE +local MAX_PAYLOAD = constants.CLUSTERING_MAX_PAYLOAD +local WS_OPTS = { + timeout = constants.CLUSTERING_TIMEOUT, + max_payload_len = MAX_PAYLOAD, +} +local OCSP_TIMEOUT = constants.CLUSTERING_OCSP_TIMEOUT +local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS +local MAJOR_MINOR_PATTERN = "^(%d+)%.(%d+)%.%d+" +local _log_prefix = "[wrpc-clustering] " + +local wrpc_config_service + +local function get_config_service(self) + if not wrpc_config_service then + wrpc_config_service = wrpc.new_service() + wrpc_config_service:add("kong.services.config.v1.config") + + wrpc_config_service:set_handler("ConfigService.PingCP", function(peer, data) + local client = self.clients[peer.conn] + if client and client.update_sync_status then + client.last_seen = ngx_time() + client.config_hash = data.hash + client:update_sync_status() + ngx_log(ngx_INFO, _log_prefix, "received ping frame from data plane") + end + end) + + wrpc_config_service:set_handler("ConfigService.ReportMetadata", function(peer, data) + local client = self.clients[peer.conn] + if client then + ngx_log(ngx_INFO, _log_prefix, "received initial metadata package from client: ", client.dp_id) + client.basic_info = data + client.basic_info_semaphore:post() + end + return { + ok = "done", + } + end) + end + + return wrpc_config_service +end + + +local function extract_major_minor(version) + if type(version) ~= "string" then + return nil, nil + end + + local major, minor = version:match(MAJOR_MINOR_PATTERN) + if not major then + return nil, nil + end + + major = tonumber(major, 10) + minor = tonumber(minor, 10) + + return major, minor +end + + +local function plugins_list_to_map(plugins_list) + local versions = {} + for _, plugin in ipairs(plugins_list) do + local name = plugin.name + local version = plugin.version + local major, minor = extract_major_minor(plugin.version) + + if major and minor then + versions[name] = { + major = major, + minor = minor, + version = version, + } + + else + versions[name] = {} + end + end + return versions +end + + +function _M.new(parent) + local self = { + clients = setmetatable({}, { __mode = "k", }), + plugins_map = {}, + } + + return setmetatable(self, { + __index = function(tab, key) + return _M[key] or parent[key] + end, + }) +end + + +local config_version = 0 + +function _M:export_deflated_reconfigure_payload() + local config_table, err = declarative.export_config_proto() + if not config_table then + return nil, err + end + + -- update plugins map + self.plugins_configured = {} + if config_table.plugins then + for _, plugin in pairs(config_table.plugins) do + self.plugins_configured[plugin.name] = true + end + end + + local config_hash = self:calculate_config_hash(config_table) + config_version = config_version + 1 + + -- store serialized plugins map for troubleshooting purposes + local shm_key_name = "clustering:cp_plugins_configured:worker_" .. 
ngx.worker.id() + kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured)); + + local service = get_config_service(self) + self.config_call_rpc, self.config_call_args = assert(service:encode_args("ConfigService.SyncConfig", { + config = config_table, + version = config_version, + hash = config_hash, + })) + + return config_table, nil +end + +function _M:push_config_one_client(client) + if not self.config_call_rpc or not self.config_call_args then + local payload, err = self:export_deflated_reconfigure_payload() + if not payload then + ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err) + return + end + end + + client.peer:send_encoded_call(self.config_call_rpc, self.config_call_args) + ngx_log(ngx_DEBUG, _log_prefix, "config version #", config_version, " pushed. ", client.log_suffix) +end + +function _M:push_config() + local payload, err = self:export_deflated_reconfigure_payload() + if not payload then + ngx_log(ngx_ERR, _log_prefix, "unable to export config from database: ", err) + return + end + + local n = 0 + for _, client in pairs(self.clients) do + client.peer:send_encoded_call(self.config_call_rpc, self.config_call_args) + + n = n + 1 + end + + ngx_log(ngx_DEBUG, _log_prefix, "config version #", config_version, " pushed to ", n, " clients") +end + + +function _M:validate_shared_cert() + local cert = ngx_var.ssl_client_raw_cert + + if not cert then + return nil, "data plane failed to present client certificate during handshake" + end + + local err + cert, err = openssl_x509.new(cert, "PEM") + if not cert then + return nil, "unable to load data plane client certificate during handshake: " .. err + end + + local digest + digest, err = cert:digest("sha256") + if not digest then + return nil, "unable to retrieve data plane client certificate digest during handshake: " .. err + end + + if digest ~= self.cert_digest then + return nil, "data plane presented incorrect client certificate during handshake (expected: " .. + self.cert_digest .. ", got: " .. digest .. ")" + end + + return true +end + + +local check_for_revocation_status +do + local get_full_client_certificate_chain = require("resty.kong.tls").get_full_client_certificate_chain + check_for_revocation_status = function() + local cert, err = get_full_client_certificate_chain() + if not cert then + return nil, err + end + + local der_cert + der_cert, err = ssl.cert_pem_to_der(cert) + if not der_cert then + return nil, "failed to convert certificate chain from PEM to DER: " .. err + end + + local ocsp_url + ocsp_url, err = ocsp.get_ocsp_responder_from_der_chain(der_cert) + if not ocsp_url then + return nil, err or "OCSP responder endpoint can not be determined, " .. + "maybe the client certificate is missing the " .. + "required extensions" + end + + local ocsp_req + ocsp_req, err = ocsp.create_ocsp_request(der_cert) + if not ocsp_req then + return nil, "failed to create OCSP request: " .. err + end + + local c = http.new() + local res + res, err = c:request_uri(ocsp_url, { + headers = { + ["Content-Type"] = "application/ocsp-request" + }, + timeout = OCSP_TIMEOUT, + method = "POST", + body = ocsp_req, + }) + + if not res then + return nil, "failed sending request to OCSP responder: " .. tostring(err) + end + if res.status ~= 200 then + return nil, "OCSP responder returns bad HTTP status code: " .. 
res.status + end + + local ocsp_resp = res.body + if not ocsp_resp or #ocsp_resp == 0 then + return nil, "unexpected response from OCSP responder: empty body" + end + + res, err = ocsp.validate_ocsp_response(ocsp_resp, der_cert) + if not res then + return false, "failed to validate OCSP response: " .. err + end + + return true + end +end + + +function _M:check_version_compatibility(dp_version, dp_plugin_map, log_suffix) + local major_cp, minor_cp = extract_major_minor(KONG_VERSION) + local major_dp, minor_dp = extract_major_minor(dp_version) + + if not major_cp then + return nil, "data plane version " .. dp_version .. " is incompatible with control plane version", + CLUSTERING_SYNC_STATUS.KONG_VERSION_INCOMPATIBLE + end + + if not major_dp then + return nil, "data plane version is incompatible with control plane version " .. + KONG_VERSION .. " (" .. major_cp .. ".x.y are accepted)", + CLUSTERING_SYNC_STATUS.KONG_VERSION_INCOMPATIBLE + end + + if major_cp ~= major_dp then + return nil, "data plane version " .. dp_version .. + " is incompatible with control plane version " .. + KONG_VERSION .. " (" .. major_cp .. ".x.y are accepted)", + CLUSTERING_SYNC_STATUS.KONG_VERSION_INCOMPATIBLE + end + + if minor_cp < minor_dp then + return nil, "data plane version " .. dp_version .. + " is incompatible with older control plane version " .. KONG_VERSION, + CLUSTERING_SYNC_STATUS.KONG_VERSION_INCOMPATIBLE + end + + if minor_cp ~= minor_dp then + local msg = "data plane minor version " .. dp_version .. + " is different to control plane minor version " .. + KONG_VERSION + + ngx_log(ngx_INFO, _log_prefix, msg, log_suffix) + end + + for _, plugin in ipairs(self.plugins_list) do + local name = plugin.name + local cp_plugin = self.plugins_map[name] + local dp_plugin = dp_plugin_map[name] + + if not dp_plugin then + if cp_plugin.version then + ngx_log(ngx_WARN, _log_prefix, name, " plugin ", cp_plugin.version, " is missing from data plane", log_suffix) + else + ngx_log(ngx_WARN, _log_prefix, name, " plugin is missing from data plane", log_suffix) + end + + else + if cp_plugin.version and dp_plugin.version then + local msg = "data plane " .. name .. " plugin version " .. dp_plugin.version .. + " is different to control plane plugin version " .. cp_plugin.version + + if cp_plugin.major ~= dp_plugin.major then + ngx_log(ngx_WARN, _log_prefix, msg, log_suffix) + + elseif cp_plugin.minor ~= dp_plugin.minor then + ngx_log(ngx_INFO, _log_prefix, msg, log_suffix) + end + + elseif dp_plugin.version then + ngx_log(ngx_NOTICE, _log_prefix, "data plane ", name, " plugin version ", dp_plugin.version, + " has unspecified version on control plane", log_suffix) + + elseif cp_plugin.version then + ngx_log(ngx_NOTICE, _log_prefix, "data plane ", name, " plugin version is unspecified, ", + "and is different to control plane plugin version ", + cp_plugin.version, log_suffix) + end + end + end + + return true, nil, CLUSTERING_SYNC_STATUS.NORMAL +end + + +function _M:check_configuration_compatibility(dp_plugin_map) + for _, plugin in ipairs(self.plugins_list) do + if self.plugins_configured[plugin.name] then + local name = plugin.name + local cp_plugin = self.plugins_map[name] + local dp_plugin = dp_plugin_map[name] + + if not dp_plugin then + if cp_plugin.version then + return nil, "configured " .. name .. " plugin " .. cp_plugin.version .. + " is missing from data plane", CLUSTERING_SYNC_STATUS.PLUGIN_SET_INCOMPATIBLE + end + + return nil, "configured " .. name .. 
" plugin is missing from data plane", + CLUSTERING_SYNC_STATUS.PLUGIN_SET_INCOMPATIBLE + end + + if cp_plugin.version and dp_plugin.version then + -- CP plugin needs to match DP plugins with major version + -- CP must have plugin with equal or newer version than that on DP + if cp_plugin.major ~= dp_plugin.major or + cp_plugin.minor < dp_plugin.minor then + local msg = "configured data plane " .. name .. " plugin version " .. dp_plugin.version .. + " is different to control plane plugin version " .. cp_plugin.version + return nil, msg, CLUSTERING_SYNC_STATUS.PLUGIN_VERSION_INCOMPATIBLE + end + end + end + end + + -- TODO: DAOs are not checked in any way at the moment. For example if plugin introduces a new DAO in + -- minor release and it has entities, that will most likely fail on data plane side, but is not + -- checked here. + + return true, nil, CLUSTERING_SYNC_STATUS.NORMAL +end + +function _M:handle_cp_websocket() + local dp_id = ngx_var.arg_node_id + local dp_hostname = ngx_var.arg_node_hostname + local dp_ip = ngx_var.remote_addr + local dp_version = ngx_var.arg_node_version + + local log_suffix = {} + if type(dp_id) == "string" then + table_insert(log_suffix, "id: " .. dp_id) + end + + if type(dp_hostname) == "string" then + table_insert(log_suffix, "host: " .. dp_hostname) + end + + if type(dp_ip) == "string" then + table_insert(log_suffix, "ip: " .. dp_ip) + end + + if type(dp_version) == "string" then + table_insert(log_suffix, "version: " .. dp_version) + end + + if #log_suffix > 0 then + log_suffix = " [" .. table_concat(log_suffix, ", ") .. "]" + else + log_suffix = "" + end + + do + local _, err + + -- use mutual TLS authentication + if self.conf.cluster_mtls == "shared" then + _, err = self:validate_shared_cert() + + elseif self.conf.cluster_ocsp ~= "off" then + local ok + ok, err = check_for_revocation_status() + if ok == false then + err = "data plane client certificate was revoked: " .. err + + elseif not ok then + if self.conf.cluster_ocsp == "on" then + err = "data plane client certificate revocation check failed: " .. err + + else + ngx_log(ngx_WARN, _log_prefix, "data plane client certificate revocation check failed: ", err, log_suffix) + err = nil + end + end + end + + if err then + ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + return ngx_exit(ngx_CLOSE) + end + end + + if not dp_id then + ngx_log(ngx_WARN, _log_prefix, "data plane didn't pass the id", log_suffix) + ngx_exit(400) + end + + if not dp_version then + ngx_log(ngx_WARN, _log_prefix, "data plane didn't pass the version", log_suffix) + ngx_exit(400) + end + + local wb + do + local err + wb, err = ws_server:new(WS_OPTS) + if not wb then + ngx_log(ngx_ERR, _log_prefix, "failed to perform server side websocket handshake: ", err, log_suffix) + return ngx_exit(ngx_CLOSE) + end + end + + -- connection established + local w_peer = wrpc.new_peer(wb, get_config_service(self)) + local client = { + last_seen = ngx_time(), + peer = w_peer, + dp_id = dp_id, + dp_version = dp_version, + log_suffix = log_suffix, + basic_info = nil, + basic_info_semaphore = semaphore.new() + } + self.clients[w_peer.conn] = client + w_peer:spawn_threads() + + do + local ok, err = client.basic_info_semaphore:wait(5) + if not ok then + err = "waiting for basic info call: " .. 
(err or "--") + end + if not client.basic_info then + err = "invalid basic_info data" + end + + if err then + ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + wb:send_close() + return ngx_exit(ngx_CLOSE) + end + end + + client.dp_plugins_map = plugins_list_to_map(client.basic_info.plugins) + client.config_hash = string.rep("0", 32) -- initial hash + client.sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN + local purge_delay = self.conf.cluster_data_plane_purge_delay + function client:update_sync_status() + local ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id, }, { + last_seen = self.last_seen, + config_hash = self.config_hash ~= "" and self.config_hash or nil, + hostname = dp_hostname, + ip = dp_ip, + version = dp_version, + sync_status = self.sync_status, -- TODO: import may have been failed though + }, { ttl = purge_delay }) + if not ok then + ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix) + end + end + + do + local _, err + _, err, client.sync_status = self:check_version_compatibility(dp_version, client.dp_plugins_map, log_suffix) + if err then + ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + wb:send_close() + client:update_sync_status() + return ngx_exit(ngx_CLOSE) + end + end + + self:push_config_one_client(client) -- first config push + + ngx_log(ngx_NOTICE, _log_prefix, "data plane connected", log_suffix) + w_peer:wait_threads() + w_peer:close() + self.clients[wb] = nil + + return ngx_exit(ngx_CLOSE) +end + + +local function push_config_loop(premature, self, push_config_semaphore, delay) + if premature then + return + end + + do + local _, err = self:export_deflated_reconfigure_payload() + if err then + ngx_log(ngx_ERR, _log_prefix, "unable to export initial config from database: ", err) + end + end + + while not exiting() do + local ok, err = push_config_semaphore:wait(1) + if exiting() then + return + end + if ok then + ok, err = pcall(self.push_config, self) + if ok then + local sleep_left = delay + while sleep_left > 0 do + if sleep_left <= 1 then + ngx.sleep(sleep_left) + break + end + + ngx.sleep(1) + + if exiting() then + return + end + + sleep_left = sleep_left - 1 + end + + else + ngx_log(ngx_ERR, _log_prefix, "export and pushing config failed: ", err) + end + + elseif err ~= "timeout" then + ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err) + end + end +end + + +function _M:init_worker() + -- ROLE = "control_plane" + + self.plugins_map = plugins_list_to_map(self.plugins_list) + + self.deflated_reconfigure_payload = nil + self.reconfigure_payload = nil + self.plugins_configured = {} + self.plugin_versions = {} + + for i = 1, #self.plugins_list do + local plugin = self.plugins_list[i] + self.plugin_versions[plugin.name] = plugin.version + end + + local push_config_semaphore = semaphore.new() + + -- Sends "clustering", "push_config" to all workers in the same node, including self + local function post_push_config_event() + local res, err = kong.worker_events.post("clustering", "push_config") + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) + end + end + + -- Handles "clustering:push_config" cluster event + local function handle_clustering_push_config_event(data) + ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) + post_push_config_event() + end + + + -- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event + local function handle_dao_crud_event(data) + if type(data) ~= "table" or data.schema == nil or 
data.schema.db_export == false then
+      return
+    end
+
+    kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)
+
+    -- we have to re-broadcast event using `post` because the dao
+    -- events were sent using `post_local` which means not all workers
+    -- can receive it
+    post_push_config_event()
+  end
+
+  -- The "clustering:push_config" cluster event gets inserted in the cluster when there's
+  -- a crud change (like an insertion or deletion). Only one worker per kong node receives
+  -- this callback. This makes that node post push_config events to all the CP workers on
+  -- its node
+  kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)
+
+  -- The "dao:crud" event is triggered using post_local, which eventually generates a
+  -- "clustering:push_config" cluster event. It is assumed that the workers in the
+  -- same node where the dao:crud event originated will "know" about the update mostly via
+  -- changes in the cache shared dict. Since data planes don't use the cache, workers in the
+  -- same kong node where the event originated will need to be notified so they push config to
+  -- their data planes
+  kong.worker_events.register(handle_dao_crud_event, "dao:crud")
+
+  -- When the "clustering", "push_config" worker event is received by a worker,
+  -- it loads and pushes the config to its connected data planes
+  kong.worker_events.register(function(_)
+    if push_config_semaphore:count() <= 0 then
+      -- the following line always executes immediately after the `if` check
+      -- because `:count` will never yield, end result is that the semaphore
+      -- count is guaranteed to not exceed 1
+      push_config_semaphore:post()
+    end
+  end, "clustering", "push_config")
+
+  ngx.timer.at(0, push_config_loop, self, push_config_semaphore,
+               self.conf.db_update_frequency)
+end
+
+
+return _M
diff --git a/kong/clustering/wrpc_data_plane.lua b/kong/clustering/wrpc_data_plane.lua
new file mode 100644
index 000000000000..4399be185643
--- /dev/null
+++ b/kong/clustering/wrpc_data_plane.lua
@@ -0,0 +1,380 @@
+
+local semaphore = require("ngx.semaphore")
+local ws_client = require("resty.websocket.client")
+local cjson = require("cjson.safe")
+local declarative = require("kong.db.declarative")
+local protobuf = require("kong.tools.protobuf")
+local wrpc = require("kong.tools.wrpc")
+local constants = require("kong.constants")
+local utils = require("kong.tools.utils")
+local system_constants = require("lua_system_constants")
+local bit = require("bit")
+local ffi = require("ffi")
+local assert = assert
+local setmetatable = setmetatable
+local type = type
+local math = math
+local xpcall = xpcall
+local ngx = ngx
+local ngx_log = ngx.log
+local ngx_sleep = ngx.sleep
+local cjson_decode = cjson.decode
+local cjson_encode = cjson.encode
+local kong = kong
+local exiting = ngx.worker.exiting
+local io_open = io.open
+local inflate_gzip = utils.inflate_gzip
+local deflate_gzip = utils.deflate_gzip
+
+
+local KONG_VERSION = kong.version
+local CONFIG_CACHE = ngx.config.prefix() ..
"/config.cache.json.gz" +local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG +local ngx_INFO = ngx.INFO +local PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL +local _log_prefix = "[wrpc-clustering] " +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH + +local _M = { + DPCP_CHANNEL_NAME = "DP-CP_config", +} + +function _M.new(parent) + local self = { + declarative_config = declarative.new_config(parent.conf), + } + + return setmetatable(self, { + __index = function(_, key) + return _M[key] or parent[key] + end, + }) +end + + +function _M:encode_config(config) + return deflate_gzip(config) +end + + +function _M:decode_config(config) + return inflate_gzip(config) +end + + +function _M:update_config(config_table, config_hash, update_cache) + assert(type(config_table) == "table") + + if not config_hash then + config_hash = self:calculate_config_hash(config_table) + end + + local entities, err, _, meta, new_hash = + self.declarative_config:parse_table(config_table, config_hash) + if not entities then + return nil, "bad config received from control plane " .. err + end + + if declarative.get_current_hash() == new_hash then + ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ", + "no need to reload") + return true + end + + -- NOTE: no worker mutex needed as this code can only be + -- executed by worker 0 + local res + res, err = declarative.load_into_cache_with_events(entities, meta, new_hash) + if not res then + return nil, err + end + + if update_cache then + -- local persistence only after load finishes without error + local f + f, err = io_open(CONFIG_CACHE, "w") + if not f then + ngx_log(ngx_ERR, _log_prefix, "unable to open config cache file: ", err) + + else + local config = assert(cjson_encode(config_table)) + config = assert(self:encode_config(config)) + res, err = f:write(config) + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to write config cache file: ", err) + end + + f:close() + end + end + + return true +end + + +function _M:init_worker() + -- ROLE = "data_plane" + + if ngx.worker.id() == 0 then + local f = io_open(CONFIG_CACHE, "r") + if f then + local config, err = f:read("*a") + if not config then + ngx_log(ngx_ERR, _log_prefix, "unable to read cached config file: ", err) + end + + f:close() + + if config and #config > 0 then + ngx_log(ngx_INFO, _log_prefix, "found cached config, loading...") + config, err = self:decode_config(config) + if config then + config, err = cjson_decode(config) + if config then + local res + res, err = self:update_config(config) + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to update running config from cache: ", err) + end + + else + ngx_log(ngx_ERR, _log_prefix, "unable to json decode cached config: ", err, ", ignoring") + end + + else + ngx_log(ngx_ERR, _log_prefix, "unable to decode cached config: ", err, ", ignoring") + end + end + + else + -- CONFIG_CACHE does not exist, pre create one with 0600 permission + local flags = bit.bor(system_constants.O_RDONLY(), + system_constants.O_CREAT()) + + local mode = ffi.new("int", bit.bor(system_constants.S_IRUSR(), + system_constants.S_IWUSR())) + + local fd = ffi.C.open(CONFIG_CACHE, flags, mode) + if fd == -1 then + ngx_log(ngx_ERR, _log_prefix, "unable to pre-create cached config file: ", + ffi.string(ffi.C.strerror(ffi.errno()))) + + else + ffi.C.close(fd) + end + end + + assert(ngx.timer.at(0, function(premature) + self:communicate(premature) + end)) + end +end + + +local wrpc_config_service +local function 
get_config_service()
+  if not wrpc_config_service then
+    wrpc_config_service = wrpc.new_service()
+    wrpc_config_service:add("kong.services.config.v1.config")
+    wrpc_config_service:set_handler("ConfigService.SyncConfig", function(peer, data)
+      if peer.config_semaphore then
+        if data.config.plugins then
+          for _, plugin in ipairs(data.config.plugins) do
+            plugin.config = protobuf.pbunwrap_struct(plugin.config)
+          end
+        end
+        data.config._format_version = data.config.format_version
+        data.config.format_version = nil
+
+        peer.config_obj.next_config = data.config
+        peer.config_obj.next_hash = data.hash
+        peer.config_obj.next_config_version = tonumber(data.version)
+        if peer.config_semaphore:count() <= 0 then
+          -- the following line always executes immediately after the `if` check
+          -- because `:count` will never yield, end result is that the semaphore
+          -- count is guaranteed to not exceed 1
+          peer.config_semaphore:post()
+        end
+      end
+      return { accepted = true }
+    end)
+  end
+
+  return wrpc_config_service
+end
+
+function _M:communicate(premature)
+
+  if premature then
+    -- worker wants to exit
+    return
+  end
+
+  local conf = self.conf
+
+  -- TODO: pick one random CP
+  local address = conf.cluster_control_plane
+  local log_suffix = " [" .. address .. "]"
+
+  local c = assert(ws_client:new({
+    timeout = constants.CLUSTERING_TIMEOUT,
+    max_payload_len = conf.cluster_max_payload,
+  }))
+  local uri = "wss://" .. address .. "/v1/wrpc?node_id=" ..
+              kong.node.get_id() ..
+              "&node_hostname=" .. kong.node.get_hostname() ..
+              "&node_version=" .. KONG_VERSION
+
+  local opts = {
+    ssl_verify = true,
+    client_cert = self.cert,
+    client_priv_key = self.cert_key,
+    protocols = "wrpc.konghq.com",
+  }
+  if conf.cluster_mtls == "shared" then
+    opts.server_name = "kong_clustering"
+  else
+    -- server_name will be set to the host if it is not explicitly defined here
+    if conf.cluster_server_name ~= "" then
+      opts.server_name = conf.cluster_server_name
+    end
+  end
+
+  local reconnection_delay = math.random(5, 10)
+  do
+    local res, err = c:connect(uri, opts)
+    if not res then
+      ngx_log(ngx_ERR, _log_prefix, "connection to control plane ", uri, " broken: ", err,
+              " (retrying after ", reconnection_delay, " seconds)", log_suffix)
+
+      assert(ngx.timer.at(reconnection_delay, function(premature)
+        self:communicate(premature)
+      end))
+      return
+    end
+  end
+
+  local config_semaphore = semaphore.new(0)
+  local peer = wrpc.new_peer(c, get_config_service(), { channel = self.DPCP_CHANNEL_NAME })
+
+  peer.config_semaphore = config_semaphore
+  peer.config_obj = self
+  peer:spawn_threads()
+
+  do
+    local resp, err = peer:call_wait("ConfigService.ReportMetadata", { plugins = self.plugins_list })
+    if type(resp) == "table" then
+      err = err or resp.error
+      resp = resp[1] or resp.ok
+    end
+    if type(resp) == "table" then
+      resp = resp.ok or resp
+    end
+
+    if not resp then
+      ngx_log(ngx_ERR, _log_prefix, "Couldn't report basic info to CP: ", err)
+      assert(ngx.timer.at(reconnection_delay, function(premature)
+        self:communicate(premature)
+      end))
+    end
+  end
+
+  -- Here we spawn two threads:
+  --
+  -- * config_thread: it grabs a received declarative config and applies it
+  --                  locally. In addition, this thread also persists the
+  --                  config onto the local file system
+  -- * ping_thread: performs a ConfigService.PingCP call periodically.
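  -- [Editor's note] Below is a minimal, self-contained sketch (not part of the
  -- patch) of the semaphore hand-off that both threads rely on: any burst of
  -- "new config" wakeups is coalesced into at most one pending post, because
  -- `:count()` never yields, so the check-then-post pair cannot be interleaved.
  -- The names `notify`, `consume_loop` and `apply_latest` are illustrative only.

  local semaphore = require("ngx.semaphore")
  local sem = semaphore.new(0)

  local function notify()            -- producer: called once per event
    if sem:count() <= 0 then
      sem:post()                     -- count is guaranteed to never exceed 1
    end
  end

  local function consume_loop(apply_latest)
    while true do
      local ok, err = sem:wait(1)    -- at most one wakeup per burst of events
      if ok then
        apply_latest()               -- always operates on the newest snapshot
      elseif err ~= "timeout" then
        return nil, err              -- a real error; "timeout" just re-polls
      end
    end
  end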
+ + local config_exit + local last_config_version = -1 + + local config_thread = ngx.thread.spawn(function() + while not exiting() and not config_exit do + local ok, err = config_semaphore:wait(1) + if ok then + if peer.semaphore == config_semaphore then + peer.semaphore = nil + config_semaphore = nil + end + local config_table = self.next_config + local config_hash = self.next_hash + if config_table and self.next_config_version > last_config_version then + ngx_log(ngx_INFO, _log_prefix, "received config #", self.next_config_version, log_suffix) + + local pok, res + pok, res, err = xpcall(self.update_config, debug.traceback, self, config_table, config_hash, true) + if pok then + last_config_version = self.next_config_version + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err) + end + + else + ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res) + end + + if self.next_config == config_table then + self.next_config = nil + self.next_hash = nil + end + end + + elseif err ~= "timeout" then + ngx_log(ngx_ERR, _log_prefix, "semaphore wait error: ", err) + end + end + end) + + local ping_thread = ngx.thread.spawn(function() + while not exiting() do + local hash = declarative.get_current_hash() + + if hash == true then + hash = DECLARATIVE_EMPTY_CONFIG_HASH + end + assert(peer:call("ConfigService.PingCP", { hash = hash })) + ngx_log(ngx_INFO, _log_prefix, "sent ping", log_suffix) + + for _ = 1, PING_INTERVAL do + ngx_sleep(1) + if exiting() or peer.closing then + return + end + end + end + end) + + local ok, err, perr = ngx.thread.wait(ping_thread, config_thread) + + ngx.thread.kill(ping_thread) + c:close() + + if not ok then + ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + + elseif perr then + ngx_log(ngx_ERR, _log_prefix, perr, log_suffix) + end + + -- the config thread might be holding a lock if it's in the middle of an + -- update, so we need to give it a chance to terminate gracefully + config_exit = true + ok, err, perr = ngx.thread.wait(config_thread) + + if not ok then + ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + + elseif perr then + ngx_log(ngx_ERR, _log_prefix, perr, log_suffix) + end + + if not exiting() then + assert(ngx.timer.at(reconnection_delay, function(premature) + self:communicate(premature) + end)) + end +end + +return _M diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 317152825025..7c15e22fbdc9 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -641,6 +641,7 @@ local CONF_INFERENCES = { lua_socket_pool_size = { typ = "number" }, role = { enum = { "data_plane", "control_plane", "traditional", }, }, + cluster_protocol = { enum = { "json", "wrpc" }, }, cluster_control_plane = { typ = "string", }, cluster_cert = { typ = "string" }, cluster_cert_key = { typ = "string" }, diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index 65660cbaea73..fee3e72b1992 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -1,5 +1,6 @@ local declarative_config = require "kong.db.schema.others.declarative_config" local schema_topological_sort = require "kong.db.schema.topological_sort" +local protobuf = require "kong.tools.protobuf" local workspaces = require "kong.workspaces" local pl_file = require "pl.file" local lyaml = require "lyaml" @@ -397,7 +398,7 @@ function declarative.load_into_db(entities, meta) end -local function export_from_db(emitter, skip_ws, skip_disabled_entities) +local function export_from_db(emitter, skip_ws, 
skip_disabled_entities, expand_foreigns) local schemas = {} local db = kong.db @@ -459,7 +460,9 @@ local function export_from_db(emitter, skip_ws, skip_disabled_entities) if disabled_services[id] then goto skip_emit end - row[foreign_name] = id + if not expand_foreigns then + row[foreign_name] = id + end end end end @@ -565,6 +568,46 @@ local function remove_nulls(tbl) return tbl end +local proto_emitter = { + emit_toplevel = function(self, tbl) + self.out = { + format_version = tbl._format_version, + } + end, + + emit_entity = function(self, entity_name, entity_data) + if entity_name == "plugins" then + entity_data.config = protobuf.pbwrap_struct(entity_data.config) + end + + if not self.out[entity_name] then + self.out[entity_name] = { entity_data } + else + insert(self.out[entity_name], entity_data) + end + end, + + done = function(self) + return remove_nulls(self.out) + end, +} + +function proto_emitter.new() + return setmetatable({}, { __index = proto_emitter }) +end + +function declarative.export_config_proto(skip_ws, skip_disabled_entities) + -- default skip_ws=false and skip_disabled_services=true + if skip_ws == nil then + skip_ws = false + end + + if skip_disabled_entities == nil then + skip_disabled_entities = true + end + + return export_from_db(proto_emitter.new(), skip_ws, skip_disabled_entities, true) +end function declarative.get_current_hash() return lmdb.get(DECLARATIVE_HASH_KEY) diff --git a/kong/db/schema/others/declarative_config.lua b/kong/db/schema/others/declarative_config.lua index 9607a45b5963..9f058f2d3ea5 100644 --- a/kong/db/schema/others/declarative_config.lua +++ b/kong/db/schema/others/declarative_config.lua @@ -354,7 +354,11 @@ local function validate_references(self, input) for a, as in pairs(expected) do for b, bs in pairs(as) do for _, k in ipairs(bs) do - local found = find_entity(k.value, b, by_key, by_id) + local key = k.value + if type(key) == "table" then + key = key.id or key + end + local found = find_entity(key, b, by_key, by_id) if not found then errors[a] = errors[a] or {} diff --git a/kong/include/kong/model/ca_certificate.proto b/kong/include/kong/model/ca_certificate.proto new file mode 100644 index 000000000000..898c97fce47e --- /dev/null +++ b/kong/include/kong/model/ca_certificate.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +message CACertificate { + string id = 1; + string cert = 2; + string cert_digest = 3; + int32 created_at = 4; + repeated string tags = 5; +} diff --git a/kong/include/kong/model/certificate.proto b/kong/include/kong/model/certificate.proto new file mode 100644 index 000000000000..60f7210b92d9 --- /dev/null +++ b/kong/include/kong/model/certificate.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +message Certificate { + string id = 1; + string cert = 2; + string key = 3; + string cert_alt = 4; + string key_alt = 5; + int32 created_at = 6; + repeated string tags = 7; +} diff --git a/kong/include/kong/model/config.proto b/kong/include/kong/model/config.proto new file mode 100644 index 000000000000..c6c324e93272 --- /dev/null +++ b/kong/include/kong/model/config.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +import "kong/model/service.proto"; +import "kong/model/route.proto"; +import "kong/model/consumer.proto"; 
+import "kong/model/plugin.proto"; +import "kong/model/plugin_entities.proto"; +import "kong/model/certificate.proto"; +import "kong/model/sni.proto"; +import "kong/model/ca_certificate.proto"; +import "kong/model/upstream.proto"; +import "kong/model/target.proto"; +import "kong/model/workspace.proto"; +import "kong/model/parameter.proto"; + +message Config { + string format_version = 1; + repeated Service services = 2; + repeated Route routes = 3; + repeated Consumer consumers = 4; + repeated Plugin plugins = 5; + repeated Upstream upstreams = 6; + repeated Target targets = 7; + repeated Certificate certificates = 8; + repeated SNI snis = 9; + repeated CACertificate ca_certificates = 10; + PluginData plugin_data = 11; + + repeated Workspace workspaces = 12; + repeated Parameter parameters = 13; +} diff --git a/kong/include/kong/model/consumer.proto b/kong/include/kong/model/consumer.proto new file mode 100644 index 000000000000..34028bad3227 --- /dev/null +++ b/kong/include/kong/model/consumer.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +message Consumer { + string id = 1; + string custom_id = 2; + string username = 3; + int32 created_at = 4; + repeated string tags = 5; +} diff --git a/kong/include/kong/model/parameter.proto b/kong/include/kong/model/parameter.proto new file mode 100644 index 000000000000..a945ed0d5a4e --- /dev/null +++ b/kong/include/kong/model/parameter.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +message Parameter { + string key = 1; + string value = 2; + int32 created_at = 3; +} diff --git a/kong/include/kong/model/plugin.proto b/kong/include/kong/model/plugin.proto new file mode 100644 index 000000000000..3e1b3444cd00 --- /dev/null +++ b/kong/include/kong/model/plugin.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +import "google/protobuf/struct.proto"; +import "kong/model/consumer.proto"; +import "kong/model/service.proto"; +import "kong/model/route.proto"; + +message Plugin { + string id = 1; + string name = 2; + google.protobuf.Struct config = 3; + bool enabled = 4; + repeated string protocols = 5; + repeated string tags = 6; + int32 created_at = 7; + Route route = 8; + Service service = 9; + Consumer consumer = 10; +} diff --git a/kong/include/kong/model/plugin_entities.proto b/kong/include/kong/model/plugin_entities.proto new file mode 100644 index 000000000000..7ff0fbf759f2 --- /dev/null +++ b/kong/include/kong/model/plugin_entities.proto @@ -0,0 +1,72 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + +import "kong/model/consumer.proto"; + +message PluginData { + repeated KeyAuth key_auths = 1; + repeated BasicAuth basic_auths = 2; + repeated HMACAuth hmac_auths = 3; + repeated JWTAuth jwt_auths = 4; + repeated MTLSAuth mtls_auths = 5; + repeated ACLGroup acls = 6; +} + +message BasicAuth { + string id = 1; + string username = 2; + string password = 3; + Consumer consumer = 4; + int32 created_at = 5; + repeated string tags = 6; +} + +message KeyAuth { + string id = 1; + string key = 2; + int32 ttl = 3; + Consumer consumer = 4; + int32 created_at = 5; + repeated string tags = 6; +} + + +message MTLSAuth { + string id = 1; + string subject_name = 2; + string ca_certificate_id 
= 3;
+  int32 created_at = 4;
+  Consumer consumer = 5;
+  repeated string tags = 6;
+}
+message ACLGroup {
+  string id = 1;
+  string group = 2;
+  Consumer consumer = 3;
+  int32 created_at = 4;
+  repeated string tags = 5;
+}
+
+message HMACAuth {
+  string id = 1;
+  string username = 2;
+  string secret = 3;
+  string consumer_id = 4;
+  int32 created_at = 5;
+  repeated string tags = 6;
+}
+
+message JWTAuth {
+  string id = 1;
+  string algorithm = 2;
+  string key = 3;
+  string rsa_public_key = 4;
+  string secret = 5;
+  Consumer consumer = 6;
+  int32 created_at = 7;
+  repeated string tags = 8;
+}
+
diff --git a/kong/include/kong/model/route.proto b/kong/include/kong/model/route.proto
new file mode 100644
index 000000000000..40e1370fc76f
--- /dev/null
+++ b/kong/include/kong/model/route.proto
@@ -0,0 +1,40 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+import "kong/model/service.proto";
+
+message Route {
+  string id = 1;
+  string name = 2;
+  map<string, HeaderValues> headers = 3;
+  repeated string hosts = 4;
+  int32 created_at = 5;
+  repeated string methods = 6;
+  repeated string paths = 7;
+  string path_handling = 8;
+  bool preserve_host = 9;
+  repeated string protocols = 10;
+  int32 regex_priority = 11;
+  bool strip_path = 12;
+  int32 updated_at = 13;
+  repeated string snis = 14;
+  repeated CIDRPort sources = 15;
+  repeated CIDRPort destinations = 16;
+  repeated string tags = 17;
+  int32 https_redirect_status_code = 18;
+  bool request_buffering = 19;
+  bool response_buffering = 20;
+  Service service = 21;
+}
+
+message HeaderValues {
+  repeated string values = 1;
+}
+
+message CIDRPort {
+  string ip = 1;
+  int32 port = 2;
+}
diff --git a/kong/include/kong/model/service.proto b/kong/include/kong/model/service.proto
new file mode 100644
index 000000000000..5b82ec5366a2
--- /dev/null
+++ b/kong/include/kong/model/service.proto
@@ -0,0 +1,28 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+import "kong/model/certificate.proto";
+
+message Service {
+  string id = 1;
+  string name = 2;
+  int32 connect_timeout = 3;
+  int32 created_at = 4;
+  string host = 5;
+  string path = 6;
+  int32 port = 7;
+  string protocol = 8;
+  int32 read_timeout = 9;
+  int32 retries = 10;
+  int32 updated_at = 11;
+  string url = 12;
+  int32 write_timeout = 13;
+  repeated string tags = 14;
+  bool tls_verify = 15;
+  int32 tls_verify_depth = 16;
+  Certificate client_certificate = 17;
+  repeated string ca_certificates = 18;
+}
diff --git a/kong/include/kong/model/sni.proto b/kong/include/kong/model/sni.proto
new file mode 100644
index 000000000000..21617bcce8f6
--- /dev/null
+++ b/kong/include/kong/model/sni.proto
@@ -0,0 +1,15 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+import "kong/model/certificate.proto";
+
+message SNI {
+  string id = 1;
+  string name = 2;
+  int32 created_at = 3;
+  Certificate certificate = 4;
+  repeated string tags = 5;
+}
diff --git a/kong/include/kong/model/target.proto b/kong/include/kong/model/target.proto
new file mode 100644
index 000000000000..c46d9df5b0fe
--- /dev/null
+++ b/kong/include/kong/model/target.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+import "kong/model/upstream.proto";
+
+message Target {
+  int32 created_at = 1;
+  string id = 2;
+  string target = 3;
+  int32 weight = 4;
+  repeated string tags = 5;
+  Upstream upstream = 6;
+}
diff --git a/kong/include/kong/model/upstream.proto b/kong/include/kong/model/upstream.proto
new file mode 100644
index 000000000000..28334fd0074a
--- /dev/null
+++ b/kong/include/kong/model/upstream.proto
@@ -0,0 +1,74 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+import "kong/model/certificate.proto";
+
+message Upstream {
+  string id = 1;
+  string name = 2;
+  string host_header = 3;
+  Certificate client_certificate = 4;
+  string algorithm = 5;
+  int32 slots = 6;
+  Healthcheck healthchecks = 7;
+  int32 created_at = 8;
+  string hash_on = 9;
+  string hash_fallback = 10;
+  string hash_on_header = 11;
+  string hash_fallback_header = 12;
+  string hash_on_cookie = 13;
+  string hash_on_cookie_path = 14;
+  repeated string tags = 15;
+}
+
+message Healthcheck {
+  ActiveHealthcheck active = 1;
+  PassiveHealthcheck passive = 2;
+  double threshold = 3;
+}
+
+message ActiveHealthcheck {
+  int32 concurrency = 1;
+  ActiveHealthy healthy = 2;
+  string http_path = 3;
+  string https_sni = 4;
+  bool https_verify_certificate = 5;
+  string type = 6;
+  int32 timeout = 7;
+  ActiveUnhealthy unhealthy = 8;
+}
+
+message PassiveHealthcheck {
+  PassiveHealthy healthy = 1;
+  string type = 2;
+  PassiveUnhealthy unhealthy = 3;
+}
+
+message ActiveHealthy {
+  repeated int32 http_statuses = 1;
+  int32 interval = 2;
+  int32 successes = 3;
+}
+
+message ActiveUnhealthy {
+  int32 http_failures = 1;
+  repeated int32 http_statuses = 2;
+  int32 tcp_failures = 3;
+  int32 timeouts = 4;
+  int32 interval = 5;
+}
+
+message PassiveHealthy {
+  repeated int32 http_statuses = 1;
+  int32 successes = 2;
+}
+
+message PassiveUnhealthy {
+  int32 http_failures = 1;
+  repeated int32 http_statuses = 2;
+  int32 tcp_failures = 3;
+  int32 timeouts = 4;
+}
diff --git a/kong/include/kong/model/workspace.proto b/kong/include/kong/model/workspace.proto
new file mode 100644
index 000000000000..029e8c58c7fb
--- /dev/null
+++ b/kong/include/kong/model/workspace.proto
@@ -0,0 +1,12 @@
+syntax = "proto3";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model";
+
+package kong.model;
+
+message Workspace {
+  string id = 1;
+  string name = 2;
+  string comment = 3;
+  int32 created_at = 4;
+}
diff --git a/kong/include/kong/services/config/v1/config.proto b/kong/include/kong/services/config/v1/config.proto
new file mode 100644
index 000000000000..e2f6d44b999b
--- /dev/null
+++ b/kong/include/kong/services/config/v1/config.proto
@@ -0,0 +1,136 @@
+syntax = "proto3";
+
+package kong.services.config.v1;
+
+import "kong/model/config.proto";
+
+option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/service/config/v1;v1";
+
+// ConfigService enables a CP and a DP to synchronize configuration from the
+// CP down to the DP.
+// +wrpc:service-id=1
service ConfigService {
+  // GetCapabilities fetches the capabilities offered within the context of the
+  // service from the CP. A capability could span multiple RPCs within a Service,
+  // or a single RPC. Capabilities are meant to introduce larger features
+  // without the need of a version upgrade.
+  // TODO(hbagdi): document that this RPC MUST be present in every service.
+  //
+  // Call direction: TODO(hbagdi)
+  // +wrpc:rpc-id=1
+  rpc GetCapabilities(GetCapabilitiesRequest) returns (GetCapabilitiesResponse);
+
+  // ReportMetadata informs the CP of the software installed on the DP.
+  // Currently this is the list of plugins with their respective versions.
+  // It's required that the DP send this information in order to receive
+  // configuration updates.
+  //
+  // Call direction: DP to CP
+  // +wrpc:rpc-id=4
+  rpc ReportMetadata(ReportMetadataRequest) returns (ReportMetadataResponse);
+
+  // SyncConfig is used by a CP to send a configuration request to the DP.
+  // CP may make concurrent calls to DP to update configuration. To guard
+  // against race conditions, the version field in the request is used (read
+  // the documentation on the field).
+  //
+  // Call direction:
+  // - CP to DP
+  // +wrpc:rpc-id=2
+  rpc SyncConfig(SyncConfigRequest) returns (SyncConfigResponse);
+
+  // PingCP notifies that a DP would like the CP to send the latest configuration.
+  // Once this call succeeds, CP MUST issue a SyncConfig request to the DP.
+  // DP expects the CP to send an updated configuration (or a no-op) on a
+  // soft real-time basis.
+  // DP can make multiple calls to CP and CP may choose to coalesce multiple
+  // requests for configuration into a single SyncConfig().
+  //
+  // Configuration is always PUSHed to the DP and so configuration is not sent
+  // as part of the response to this call to simplify implementation.
+  //
+  // Call direction:
+  // - DP to CP
+  // +wrpc:rpc-id=3
+  rpc PingCP(PingCPRequest) returns (PingCPResponse);
+}
+
+enum Capability {
+  CAPABILITY_UNSPECIFIED = 0;
+  CAPABILITY_BULK_UPDATE = 1;
+  // Incremental configuration will be added in future and is considered out
+  // of scope at the moment.
+  // CAPABILITY_INCREMENTAL = 2;
+}
+
+message GetCapabilitiesRequest {
+}
+
+message GetCapabilitiesResponse {
+  repeated Capability capabilities = 1;
+}
+
+message ReportMetadataRequest {
+  repeated PluginVersion plugins = 1;
+}
+
+message ReportMetadataResponse {
+  oneof response {
+    string ok = 1;
+    string error = 2;
+  }
+}
+
+message PluginVersion {
+  string name = 1;
+  string version = 2;
+}
+
+message SyncConfigRequest {
+  // Config represents a configuration of Kong Gateway.
+  // This is same as the declarative configuration of Kong.
+  //
+  // DP MUST NOT combine configuration from two SyncConfigRequest. Config in
+  // each request is self-contained.
+  kong.model.Config config = 1;
+
+  // On every configuration change, CP MUST increment the version field
+  // in the request.
+  // Version field has no significance outside the context of a single ephemeral
+  // connection between a DP node and a CP node.
+  //
+  uint64 version = 2;
+
+  // raw binary hash of the config data.
+  string hash = 3;
+}
+
+message SyncConfigResponse {
+  // accepted is set to true when the DP has accepted the configuration.
+  // Acceptance of configuration implies that the configuration is successfully
+  // processed by the DP.
+  bool accepted = 1;
+  // If accepted is set to false, errors denote the errors with the configuration.
+  // CP MAY analyze the errors and send back a correct configuration.
+  // If accepted is true, this field must be empty.
+  repeated SyncConfigError errors = 2;
+}
+
+enum ErrorType {
+  ERROR_TYPE_UNSPECIFIED = 0;
+  ERROR_TYPE_VALIDATION = 1;
+  ERROR_TYPE_EXTRANEOUS_FIELD = 2;
+  ERROR_TYPE_ORPHANED = 3;
+}
+
+message SyncConfigError {
+  string id = 1;
+  string entity = 2;
+  ErrorType err_type = 3;
+}
+
+message PingCPRequest {
+  string hash = 1;
+}
+
+message PingCPResponse {
+}
diff --git a/kong/include/wrpc/wrpc.proto b/kong/include/wrpc/wrpc.proto
new file mode 100644
index 000000000000..1e1534066aaf
--- /dev/null
+++ b/kong/include/wrpc/wrpc.proto
@@ -0,0 +1,154 @@
+syntax = "proto3";
+
+package wrpc;
+
+option go_package = "github.com/kong/go-wrpc/wrpc";
+
+// PayloadVersion identifies the version of the payload.
+enum PayloadVersion {
+  // UNSPECIFIED indicates that the version is not specified.
+  // Receiver MUST drop the message.
+  PAYLOAD_VERSION_UNSPECIFIED = 0;
+  // V1 denotes version 1.
+  PAYLOAD_VERSION_V1 = 1;
+}
+
+// MessageType identifies the type of a WebSocket message.
+enum MessageType {
+  // UNSPECIFIED indicates that the type of the message is unknown.
+  // Receiver MUST drop the message.
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+  // ERROR signals a protocol error such as incorrect serialization, timeouts,
+  // network hiccups, etc.
+  MESSAGE_TYPE_ERROR = 1;
+  // RPC signals that the message contains a request or a response.
+  MESSAGE_TYPE_RPC = 2;
+  // STREAM_BEGIN signals start of a stream.
+  MESSAGE_TYPE_STREAM_BEGIN = 3;
+  // STREAM_MESSAGE signals that the message belongs to a stream.
+  MESSAGE_TYPE_STREAM_MESSAGE = 4;
+  // STREAM_END signals end of a stream.
+  MESSAGE_TYPE_STREAM_END = 5;
+}
+
+// ErrorType identifies serialization, network, and protocol errors.
+enum ErrorType {
+  ERROR_TYPE_UNSPECIFIED = 0;
+  // GENERIC signals a general error with the protocol.
+  ERROR_TYPE_GENERIC = 1;
+}
+
+// Error represents a protocol error.
+message Error {
+  // etype denotes the type of the error.
+  ErrorType etype = 1;
+
+  // description contains human readable contextual information associated
+  // with the error.
+  string description = 2;
+}
+
+// Encoding identifies the encoding method used to encode a payload.
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+  ENCODING_PROTO3 = 1;
+}
+
+// PayloadV1 is a container for WebSocket messages.
+message PayloadV1 {
+  // mtype denotes the type of the payload within a WebSocket message.
+  MessageType mtype = 1;
+
+  // When mtype is set to MESSAGE_TYPE_ERROR, this field contains the error.
+  // This field represents errors due to encoding, network and protocol. Use ack
+  // to tie an error with a request or response received from the other side.
+  // Errors returned by an RPC are part of 'payloads' and NOT this field.
+  // Payloads field MUST NOT be set when this field is set. Sender MUST
+  // set svc_id, rpc_id, seq to add contextual information for the receiver.
+  Error error = 2;
+
+  // svc_id is the ID of the service as defined in the proto file of the Service.
+  // The ID is defined in the description of the Service.
+  // We acknowledge that it is cumbersome to track these IDs manually
+  // without any support for linting and programmatic access.
+  // This may be defined within the proto file
+  // itself using proto3 custom options in future.
+  // ID MUST be greater than 0.
+  //
+  // Receiver MUST return INVALID_SERVICE error when this field contains a
+  // service that the receiver doesn't understand.
+  uint32 svc_id = 3;
+
+  // rpc_id is the ID of the RPC as defined in the proto file of the Service.
+  // The ID is defined in the description of the RPC.
+  // We acknowledge that it is cumbersome to track these IDs manually
+  // without any support for linting and programmatic access.
+  // This may be defined within the proto file
+  // itself using proto3 custom options in future.
+  // ID MUST be greater than 0.
+  //
+  // Receiver MUST return INVALID_RPC error when this field contains an
+  // RPC that the receiver doesn't understand.
+  uint32 rpc_id = 4;
+
+  // seq is a number chosen by the sender. The sender MUST initialize this
+  // field to 1 for the first RPC on a given connection, and it should be
+  // incremented every time a new RPC is initiated. The receiver must not assume
+  // that the sequence numbers are strictly incremental.
+  //
+  // There are no guarantees about the order in which requests will be
+  // processed by the receiver. This field has no semantics outside the context
+  // of a WebSocket connection. It is invalid to set this field to 0, and the
+  // receiver MUST drop the message and close the connection.
+  uint32 seq = 5;
+
+  // ack represents that the message contains a response or error for an RPC
+  // that was initiated earlier by the receiver of this message. To tie the
+  // message to a request received by the sender, the sender MUST set ack to
+  // the seq number of that request message.
+  uint32 ack = 6;
+
+  // deadline is UNIX epoch time in seconds to indicate the time when the
+  // client will give up waiting for a response. Absolute time is used instead
+  // of timeouts to account for network and TCP/HTTP buffer latencies. It is
+  // assumed that out-of-band time synchronization solutions are deployed
+  // already. This field MUST be set to a non-zero value for a request and MUST
+  // be set to 0 for a response; if not, the receiver MUST drop the message and
+  // close the connection.
+  uint32 deadline = 7;
+
+  // payload_encoding identifies the encoding used for the payload.
+  // This field MUST be specified when payloads is set.
+  Encoding payload_encoding = 8;
+
+  // payloads is an array representing the request or response data of an RPC.
+  // A request message MAY contain multiple elements where each element represents
+  // an argument to the RPC call. The order of the elements MUST correspond to
+  // the order of arguments in the function call.
+  // A response message MUST contain a single payload.
+  // Use a wrapper type for RPCs which contain multiple responses.
+  // Unless otherwise specified by the Service or RPC, the encoding method in
+  // use is PROTO3.
+  //
+  // Note: This results in double proto3 encoding: once when encoding the
+  // payloads, and then again when encoding the entire message. We acknowledge
+  // that there are some overheads here and they will be addressed in a future
+  // iteration.
+  repeated bytes payloads = 9;
+
+  // stream_id is the ID of a stream. stream_id is set to the sequence number
+  // of the STREAM_BEGIN controller message.
+  // This field MUST be set to zero for non-stream messages.
+  //
+  uint32 stream_id = 10;
+}
+
+// WebsocketPayload represents a protobuf-based encoded payload of a WebSocket
+// message.
+message WebsocketPayload {
+  // version identifies the version of the payload.
+  PayloadVersion version = 1;
+
+  // payload contains the message. This field MUST be present if and only if
+  // version is set to 1.
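+  //
+  // As an illustrative, non-normative example (all field values here are
+  // made up), the first request on a fresh connection decodes roughly to:
+  //
+  //   WebsocketPayload{
+  //     version: PAYLOAD_VERSION_V1,
+  //     payload: PayloadV1{
+  //       mtype: MESSAGE_TYPE_RPC,
+  //       svc_id: 1, rpc_id: 1, seq: 1,
+  //       deadline: <now + timeout>,
+  //       payload_encoding: ENCODING_PROTO3,
+  //       payloads: [<proto3-encoded request arguments>],
+  //     }
+  //   }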
+ PayloadV1 payload = 2; +} diff --git a/kong/init.lua b/kong/init.lua index ff4aa2ddb269..831fbbbc3fe1 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -1501,6 +1501,14 @@ function Kong.serve_cluster_listener(options) return kong.clustering:handle_cp_websocket() end +function Kong.serve_wrpc_listener(options) + log_init_worker_errors() + + ngx.ctx.KONG_PHASE = PHASES.cluster_listener + + return kong.clustering:handle_wrpc_websocket() +end + function Kong.stream_api() stream_api.handle() diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 836fcd3441ba..7992563402b0 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -21,6 +21,7 @@ admin_listen = 127.0.0.1:8001 reuseport backlog=16384, 127.0.0.1:8444 http2 ssl status_listen = off cluster_listen = 0.0.0.0:8005 cluster_control_plane = 127.0.0.1:8005 +cluster_protocol = wrpc cluster_cert = NONE cluster_cert_key = NONE cluster_mtls = shared diff --git a/kong/templates/nginx_kong.lua b/kong/templates/nginx_kong.lua index a2d70b3cd0f5..5355d35ee31c 100644 --- a/kong/templates/nginx_kong.lua +++ b/kong/templates/nginx_kong.lua @@ -25,6 +25,9 @@ lua_shared_dict kong_core_db_cache ${{MEM_CACHE_SIZE}}; lua_shared_dict kong_core_db_cache_miss 12m; lua_shared_dict kong_db_cache ${{MEM_CACHE_SIZE}}; lua_shared_dict kong_db_cache_miss 12m; +> if role == "data_plane" then +lua_shared_dict wrpc_channel_dict 5m; +> end > if database == "cassandra" then lua_shared_dict kong_cassandra 5m; > end @@ -443,6 +446,12 @@ server { Kong.serve_cluster_listener() } } + + location = /v1/wrpc { + content_by_lua_block { + Kong.serve_wrpc_listener() + } + } } > end -- role == "control_plane" ]] diff --git a/kong/tools/channel.lua b/kong/tools/channel.lua new file mode 100644 index 000000000000..a1e3d5144b83 --- /dev/null +++ b/kong/tools/channel.lua @@ -0,0 +1,164 @@ + +local min = math.min +local max = math.max + +local now = ngx.now +local sleep = ngx.sleep + +local DEFAULT_EXPTIME = 3600 +local DEFAULT_TIMEOUT = 5 +local NAME_KEY = "channel_up" +local POST_VAL_KEY_PREFIX = "channel_post_value_" +local RESP_VAL_KEY_PREFIX = "channel_resp_value_" + + +local function waitstep(step, deadline) + sleep(step) + return min(max(0.001, step * 2), deadline-now(), 0.5) +end + +--- waiting version of `d:add()` +--- blocks the coroutine until there's no value under this key +--- so the new value can be safely added +local function add_wait(dict, key, val, exptime, deadline) + local step = 0 + + while deadline > now() do + local ok, err = dict:add(key, val, exptime) + if ok then + return true + end + + if err ~= "exists" then + return nil, err + end + + step = waitstep(step, deadline) + end + + return nil, "timeout" +end + +--- waiting version of `d:get()` +--- blocks the coroutine until there's actually a value under this key +local function get_wait(dict, key, deadline) + local step = 0 + + while deadline > now() do + local value, err = dict:get(key) + if value then + return value + end + + if err ~= nil then + return nil, err + end + + step = waitstep(step, deadline) + end + + return nil, "timeout" +end + +--- waits until the key is empty +--- blocks the coroutine while there's a value under this key +local function empty_wait(dict, key, deadline) + local step = 0 + while deadline > now() do + local value, err = dict:get(key) + if not value then + if err ~= nil then + return nil, err + end + + return true + end + + step = waitstep(step, deadline) + end + return nil, "timeout" +end + + +local Channel = {} 
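+-- Channel implements a minimal request/response rendezvous between worker
+-- processes on top of a shared dict, built from the waiting helpers above.
+-- A hedged usage sketch (the dict and channel names below are illustrative,
+-- not part of this module):
+--
+--   -- client worker:
+--   local ch = Channel.new("wrpc_channel_dict", "client_" .. ngx.worker.pid())
+--   assert(ch:post(request_bytes))  -- blocks until the server picks it up
+--   local resp = assert(ch:get())   -- blocks until the server responds
+--
+--   -- server worker:
+--   local val, name = Channel.wait_all(ngx.shared.wrpc_channel_dict)
+--   Channel.put_back(ngx.shared.wrpc_channel_dict, name, response_bytes)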
+Channel.__index = Channel + +--- Create a new channel client +--- @param dict_name string Name of the shdict to use +--- @param name string channel name +function Channel.new(dict_name, name) + return setmetatable({ + dict = assert(ngx.shared[dict_name]), + name = name, + exptime = DEFAULT_EXPTIME, + timeout = DEFAULT_TIMEOUT, + }, Channel) +end + + +--- Post a value, client -> server +--- blocks the thread until the server picks it +--- @param val any Value to post (any type supported by shdict) +--- @return boolean, string ok, err +function Channel:post(val) + local key = POST_VAL_KEY_PREFIX .. self.name + local ok, err = add_wait(self.dict, key, val, self.exptime, now() + self.timeout) + if not ok then + return nil, err + end + + ok, err = add_wait(self.dict, NAME_KEY, self.name, self.exptime, now() + self.timeout) + if not ok then + self.dict:delete(key) + return nil, err + end + + return empty_wait(self.dict, key, now() + self.timeout) +end + +--- Get a response value, client <- server +--- blocks the thread until the server puts a value +--- @return any, string value, error +function Channel:get() + local key = RESP_VAL_KEY_PREFIX .. self.name + local val, err = get_wait(self.dict, key, now() + self.timeout) + if val then + self.dict:delete(key) + return val + end + + return nil, err +end + + +--- Waits until a value is posted by any client +--- @param dict shdict shdict to use +--- @return any, string, string value, channel name, error +function Channel.wait_all(dict) + local name, err = get_wait(dict, NAME_KEY, now() + DEFAULT_TIMEOUT) + if not name then + return nil, nil, err + end + + local key = POST_VAL_KEY_PREFIX .. name + local val + val, err = get_wait(dict, key, now() + DEFAULT_TIMEOUT) + dict:delete(key) + dict:delete(NAME_KEY) + + return val, name, err +end + + +--- Put a response value server -> client +--- @param dict shdict shdict to use +--- @param name string channel name +--- @param val any Value to put (any type supported by shdict) +--- @return boolean, string ok, error +function Channel.put_back(dict, name, val) + local key = RESP_VAL_KEY_PREFIX .. name + return add_wait(dict, key, val, DEFAULT_EXPTIME, now() + DEFAULT_TIMEOUT) +end + + +return Channel diff --git a/kong/tools/grpc.lua b/kong/tools/grpc.lua index 58c61d931e1d..4c75d85da34e 100644 --- a/kong/tools/grpc.lua +++ b/kong/tools/grpc.lua @@ -13,7 +13,7 @@ local grpc = {} local function safe_set_type_hook(type, dec, enc) if not pcall(pb.hook, type) then - ngx.log(ngx.NOTICE, "no type '" .. type .. "' defined") + ngx.log(ngx.DEBUG, "no type '" .. type .. 
"' defined") return end diff --git a/kong/tools/protobuf.lua b/kong/tools/protobuf.lua new file mode 100644 index 000000000000..9fa5767c3697 --- /dev/null +++ b/kong/tools/protobuf.lua @@ -0,0 +1,100 @@ + +local protobuf = {} + +do + local structpb_value, structpb_list, structpb_struct + + function structpb_value(v) + local t = type(v) + + local bool_v = nil + if t == "boolean" then + bool_v = v + end + + local list_v = nil + local struct_v = nil + + if t == "table" then + if t[1] ~= nil then + list_v = structpb_list(v) + else + struct_v = structpb_struct(v) + end + end + + return { + null_value = t == "nil" and 1 or nil, + bool_value = bool_v, + number_value = t == "number" and v or nil, + string_value = t == "string" and v or nil, + list_value = list_v, + struct_value = struct_v, + } + end + + function structpb_list(l) + local out = {} + for i, v in ipairs(l) do + out[i] = structpb_value(v) + end + return { values = out } + end + + function structpb_struct(d) + local out = {} + for k, v in pairs(d) do + out[k] = structpb_value(v) + end + return { fields = out } + end + + protobuf.pbwrap_struct = structpb_struct +end + +do + local structpb_value, structpb_list, structpb_struct + + function structpb_value(v) + if type(v) ~= "table" then + return v + end + + if v.list_value then + return structpb_list(v.list_value) + end + + if v.struct_value then + return structpb_struct(v.struct_value) + end + + return v.bool_value or v.string_value or v.number_value or v.null_value + end + + function structpb_list(l) + local out = {} + if type(l) == "table" then + for i, v in ipairs(l.values or l) do + out[i] = structpb_value(v) + end + end + return out + end + + function structpb_struct(struct) + if type(struct) ~= "table" then + return struct + end + + local out = {} + for k, v in pairs(struct.fields or struct) do + out[k] = structpb_value(v) + end + return out + end + + protobuf.pbunwrap_struct = structpb_struct +end + + +return protobuf diff --git a/kong/tools/wrpc.lua b/kong/tools/wrpc.lua new file mode 100644 index 000000000000..e5b662302d00 --- /dev/null +++ b/kong/tools/wrpc.lua @@ -0,0 +1,685 @@ +require "table.new" +local pb = require "pb" +local semaphore = require "ngx.semaphore" +local grpc = require "kong.tools.grpc" +local channel = require "kong.tools.channel" + +local select = select +local table_unpack = table.unpack -- luacheck: ignore +local table_insert = table.insert +local table_remove = table.remove + +local exiting = ngx.worker.exiting + +local DEFAULT_EXPIRATION_DELAY = 90 +local CHANNEL_CLIENT_PREFIX = "wrpc_client_" + +pb.option("no_default_values") + +local wrpc = {} + +local function endswith(s, e) -- luacheck: ignore + return s and e and e ~= "" and s:sub(#s-#e+1, #s) == e +end + +local Queue = {} +Queue.__index = Queue + +function Queue.new() + return setmetatable({ + smph = semaphore.new(), + }, Queue) +end + +function Queue:push(itm) + table_insert(self, itm) + return self.smph:post() +end + +function Queue:pop(timeout) + local ok, err = self.smph:wait(timeout or 1) + if not ok then + return nil, err + end + + return table_remove(self, 1) +end + +local semaphore_waiter +do + local function handle(self, data) + self.data = data + self.smph:post() + end + + local function handle_error(self, etype, errdesc) + self.data = nil + self.error = errdesc + self.etype = etype + self.smph:post() + end + + local function expire(self) + self:handle_error("timeout", "timeout") + end + + function semaphore_waiter() + return { + smph = semaphore.new(), + deadline = ngx.now() + 
+      handle = handle,
+      handle_error = handle_error,
+      expire = expire,
+    }
+  end
+end
+
+
+local remote_waiter
+do
+  local function handle(self, payload)
+    channel.put_back(self.dict, self.name, pb.encode("wrpc.PayloadV1", payload))
+  end
+
+  local function handle_error(self, etype, errdesc)
+    channel.put_back(self.dict, self.name, pb.encode("wrpc.PayloadV1", {
+      mtype = "MESSAGE_TYPE_ERROR",
+      error = {
+        etype = etype,
+        description = errdesc,
+      }
+    }))
+  end
+
+  function remote_waiter(dict, name)
+    return {
+      dict = dict,
+      name = name,
+      deadline = ngx.now() + DEFAULT_EXPIRATION_DELAY,
+      handle = handle,
+      handle_error = handle_error,
+      raw = true,
+    }
+  end
+end
+
+
+local function merge(a, b)
+  if type(b) == "table" then
+    for k, v in pairs(b) do
+      a[k] = v
+    end
+  end
+
+  return a
+end
+
+local function proto_searchpath(name)
+  return package.searchpath(name, "kong/include/?.proto;/usr/include/?.proto")
+end
+
+--- definitions for the transport protocol
+local wrpc_proto
+
+
+local wrpc_service = {}
+wrpc_service.__index = wrpc_service
+
+
+--- a `service` object holds a set of methods defined
+--- in .proto files
+function wrpc.new_service()
+  if not wrpc_proto then
+    local wrpc_protofname = assert(proto_searchpath("wrpc.wrpc"))
+    wrpc_proto = assert(grpc.each_method(wrpc_protofname))
+  end
+
+  return setmetatable({
+    methods = {},
+  }, wrpc_service)
+end
+
+--- Loads the methods from a .proto file.
+--- There can be more than one file, and any number of
+--- service definitions.
+function wrpc_service:add(service_name)
+  local annotations = {
+    service = {},
+    rpc = {},
+  }
+  local service_fname = assert(proto_searchpath(service_name))
+  local proto_f = assert(io.open(service_fname))
+  local scope_name = ""
+
+  for line in proto_f:lines() do
+    local annotation = line:match("//%s*%+wrpc:%s*(.-)%s*$")
+    if annotation then
+      local nextline = proto_f:read("*l")
+      local keyword, identifier = nextline:match("^%s*(%a+)%s+(%w+)")
+      if keyword and identifier then
+
+        if keyword == "service" then
+          scope_name = identifier
+
+        elseif keyword == "rpc" then
+          identifier = scope_name .. "." .. identifier
+        end
+
+        local type_annotations = annotations[keyword]
+        if type_annotations then
+          local tag_key, tag_value = annotation:match("^%s*(%S-)=(%S+)%s*$")
+          if tag_key and tag_value then
+            local tags = type_annotations[identifier] or {}
+            type_annotations[identifier] = tags
+            tags[tag_key] = tag_value
+          end
+        end
+      end
+    end
+  end
+  proto_f:close()
+
+  grpc.each_method(service_fname, function(_, srvc, mthd)
+    assert(srvc.name)
+    assert(mthd.name)
+    local rpc_name = srvc.name .. "." .. mthd.name
+
+    local service_id = assert(annotations.service[srvc.name] and annotations.service[srvc.name]["service-id"])
+    local rpc_id = assert(annotations.rpc[rpc_name] and annotations.rpc[rpc_name]["rpc-id"])
+    local rpc = {
+      name = rpc_name,
+      service_id = tonumber(service_id),
+      rpc_id = tonumber(rpc_id),
+      input_type = mthd.input_type,
+      output_type = mthd.output_type,
+    }
+    self.methods[service_id .. ":" .. rpc_id] = rpc
+    self.methods[rpc_name] = rpc
+  end, true)
+end
+
+--- returns the method definition given either:
+--- a pair of IDs (service, rpc), or
+--- an rpc name as "service.rpc"
+function wrpc_service:get_method(srvc_id, rpc_id)
+  local rpc_name
+  if type(srvc_id) == "string" and rpc_id == nil then
+    rpc_name = srvc_id
+  else
+    rpc_name = tostring(srvc_id) .. ":" .. tostring(rpc_id)
+  end
+
+  return self.methods[rpc_name]
+end
+
+--- sets a service handler for the given rpc method
+--- @param rpc_name string Full name of the rpc method
+--- @param handler function Function called to handle the rpc method.
+--- @param response_handler function Fallback function called to handle responses.
+function wrpc_service:set_handler(rpc_name, handler, response_handler)
+  local rpc = self:get_method(rpc_name)
+  if not rpc then
+    return nil, string.format("unknown method %q", rpc_name)
+  end
+
+  rpc.handler = handler
+  rpc.response_handler = response_handler
+  return rpc
+end
+
+
+--- Part of wrpc_peer:call()
+--- If calling the same method with the same args several times
+--- (to the same or different peers), this method returns the
+--- invariant part, so it can be cached to reduce encoding overhead
+function wrpc_service:encode_args(name, ...)
+  local rpc = self:get_method(name)
+  if not rpc then
+    return nil, string.format("unknown method %q", name)
+  end
+
+  local num_args = select('#', ...)
+  local payloads = table.new(num_args, 0)
+  for i = 1, num_args do
+    payloads[i] = assert(pb.encode(rpc.input_type, select(i, ...)))
+  end
+
+  return rpc, payloads
+end
+
+
+local wrpc_peer = {
+  encode = pb.encode,
+  decode = pb.decode,
+}
+wrpc_peer.__index = wrpc_peer
+
+local function is_wsclient(conn)
+  return conn and not conn.close or nil
+end
+
+--- a `peer` object holds a (websocket) connection and a service.
+function wrpc.new_peer(conn, service, opts)
+  opts = opts or {}
+  return setmetatable(merge({
+    conn = conn,
+    service = service,
+    seq = 1,
+    request_queue = is_wsclient(conn) and Queue.new(),
+    response_queue = {},
+    closing = false,
+    channel_dict = opts.channel and ngx.shared["wrpc_channel_" .. opts.channel],
+    _receiving_thread = nil,
+  }, opts), wrpc_peer)
+end
+
+
+function wrpc_peer:close()
+  self.closing = true
+  self.conn:send_close()
+  if self.conn.close then
+    self.conn:close()
+  end
+end
+
+
+function wrpc_peer:send(d)
+  if self.request_queue then
+    return self.request_queue:push(d)
+  end
+
+  return self.conn:send_binary(d)
+end
+
+function wrpc_peer:receive()
+  while true do
+    local data, typ, err = self.conn:recv_frame()
+    if not data then
+      return nil, err
+    end
+
+    if typ == "binary" then
+      return data
+    end
+
+    if typ == "close" then
+      kong.log.notice("Received WebSocket \"close\" frame from peer: ", err, ": ", data)
+      return self:close()
+    end
+  end
+end
+
+--- RPC call.
+--- returns the call sequence number, doesn't wait for response.
+function wrpc_peer:call(name, ...)
+  local rpc, payloads = assert(self.service:encode_args(name, ...))
+  return self:send_encoded_call(rpc, payloads)
+end
+
+
+function wrpc_peer:call_wait(name, ...)
+  local waiter = semaphore_waiter()
+
+  local seq = self.seq
+  self.response_queue[seq] = waiter
+  self:call(name, ...)
+
+  local ok, err = waiter.smph:wait(DEFAULT_EXPIRATION_DELAY)
+  if not ok then
+    return nil, err
+  end
+  return waiter.data, waiter.error
+end
+
+
+--- Part of wrpc_peer:call()
+--- This performs the per-call parts. The arguments
+--- are the return values from wrpc_service:encode_args(),
+--- either directly or cached (to repeat the same call
+--- several times).
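+--- A hedged usage sketch (the service, RPC and peer names are illustrative,
+--- not defined by this module):
+---
+---   local rpc, payloads = assert(svc:encode_args("ExampleService.Example", args))
+---   for _, peer in ipairs(peers) do
+---     peer:send_encoded_call(rpc, payloads)  -- re-sends without re-encoding
+---   end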
+function wrpc_peer:send_encoded_call(rpc, payloads)
+  local seq = self.seq  -- capture before send_payload() increments it
+  self:send_payload({
+    mtype = "MESSAGE_TYPE_RPC",
+    svc_id = rpc.service_id,
+    rpc_id = rpc.rpc_id,
+    payload_encoding = "ENCODING_PROTO3",
+    payloads = payloads,
+  })
+  return seq
+end
+
+--- little helper to ease grabbing an unspecified number
+--- of values after an `ok` flag
+local function ok_wrapper(ok, ...)
+  return ok, {n = select('#', ...), ...}
+end
+
+--- decodes each element of an array with the same type
+local function decodearray(decode, typ, l)
+  local out = {}
+  for i, v in ipairs(l) do
+    out[i] = decode(typ, v)
+  end
+  return out
+end
+
+--- encodes each element of an array with the same type
+local function encodearray(encode, typ, l)
+  local out = {}
+  for i = 1, l.n do
+    out[i] = encode(typ, l[i])
+  end
+  return out
+end
+
+--- encodes and sends a wRPC message.
+--- Assumes protocol fields are already filled (except `.seq` and `.deadline`)
+--- and payload data (if any) is already encoded with the right type.
+--- Keeps track of the sequence number and assigns deadline.
+function wrpc_peer:send_payload(payload)
+  local seq = self.seq
+  payload.seq = seq
+  self.seq = seq + 1
+
+  if not payload.ack or payload.ack == 0 then
+    payload.deadline = ngx.now() + DEFAULT_EXPIRATION_DELAY
+  end
+
+  self:send(self.encode("wrpc.WebsocketPayload", {
+    version = "PAYLOAD_VERSION_V1",
+    payload = payload,
+  }))
+end
+
+function wrpc_peer:send_remote_payload(msg, name)
+  local payload = self.decode("wrpc.PayloadV1", msg)
+  self.response_queue[self.seq] = remote_waiter(self.channel_dict, name)
+  return self:send_payload(payload)
+end
+
+--- Handle RPC data (mtype == MESSAGE_TYPE_RPC).
+--- Could be an incoming method call or the response to a previous one.
+--- @param payload table decoded payload field from incoming `wrpc.WebsocketPayload` message
+function wrpc_peer:handle(payload)
+  local rpc = self.service:get_method(payload.svc_id, payload.rpc_id)
+  if not rpc then
+    self:send_payload({
+      mtype = "MESSAGE_TYPE_ERROR",
+      error = {
+        etype = "ERROR_TYPE_INVALID_SERVICE",
+        description = "Invalid service (or rpc)",
+      },
+      svc_id = payload.svc_id,
+      rpc_id = payload.rpc_id,
+      ack = payload.seq,
+    })
+    return nil, "INVALID_SERVICE"
+  end
+
+  local ack = tonumber(payload.ack) or 0
+  if ack > 0 then
+    -- response to a previous call
+    local response_waiter = self.response_queue[ack]
+    if response_waiter then
+
+      if response_waiter.deadline and response_waiter.deadline < ngx.now() then
+        if response_waiter.expire then
+          response_waiter:expire()
+        end
+
+      else
+        if response_waiter.raw then
+          response_waiter:handle(payload)
+        else
+          response_waiter:handle(decodearray(self.decode, rpc.output_type, payload.payloads))
+        end
+      end
+      self.response_queue[ack] = nil
+
+    elseif rpc.response_handler then
+      pcall(rpc.response_handler, self, decodearray(self.decode, rpc.output_type, payload.payloads))
+    end
+
+  else
+    -- incoming method call
+    if rpc.handler then
+      local input_data = decodearray(self.decode, rpc.input_type, payload.payloads)
+      local ok, output_data = ok_wrapper(pcall(rpc.handler, self, table_unpack(input_data, 1, input_data.n)))
+      if not ok then
+        local err = tostring(output_data[1])
+        ngx.log(ngx.ERR, ("[wRPC] Error handling %q method: %q"):format(rpc.name, err))
+        self:send_payload({
+          mtype = "MESSAGE_TYPE_ERROR",
+          error = {
+            etype = "ERROR_TYPE_UNSPECIFIED",
+            description = err,
+          },
+          svc_id = payload.svc_id,
+          rpc_id = payload.rpc_id,
+          ack = payload.seq,
+        })
+        return nil, err
+      end
+
+      self:send_payload({
+        mtype = "MESSAGE_TYPE_RPC",
"MESSAGE_TYPE_RPC", -- MESSAGE_TYPE_RPC, + svc_id = rpc.service_id, + rpc_id = rpc.rpc_id, + ack = payload.seq, + payload_encoding = "ENCODING_PROTO3", + payloads = encodearray(self.encode, rpc.output_type, output_data), + }) + + else + -- rpc has no handler + self:send_payload({ + mtype = "MESSAGE_TYPE_ERROR", + error = { + etype = "ERROR_TYPE_INVALID_RPC", -- invalid here, not in the definition + description = "Unhandled method", + }, + srvc_id = payload.svc_id, + rpc_id = payload.rpc_id, + ack = payload.seq, + }) + end + end +end + + +--- Handle incoming error message (mtype == MESSAGE_TYPE_ERROR). +function wrpc_peer:handle_error(payload) + local etype = payload.error and payload.error.etype or "--" + local errdesc = payload.error and payload.error.description or "--" + ngx.log(ngx.NOTICE, string.format("[wRPC] Received error message, %s.%s:%s (%s: %q)", + payload.svc_id, payload.rpc_id, payload.ack, etype, errdesc + )) + + local ack = tonumber(payload.ack) or 0 + if ack > 0 then + -- response to a previous call + local response_waiter = self.response_queue[ack] + if response_waiter and response_waiter.handle_error then + + if response_waiter.deadline and response_waiter.deadline < ngx.now() then + if response_waiter.expire then + response_waiter:expire() + end + + else + response_waiter:handle_error(etype, errdesc) + end + self.response_queue[ack] = nil + + else + local rpc = self.service:get_method(payload.svc_id, payload.rpc_id) + if rpc and rpc.error_handler then + pcall(rpc.error_handler, self, etype, errdesc) + end + end + end +end + + +function wrpc_peer:step() + local msg, err = self:receive() + + while msg ~= nil do + msg = assert(self.decode("wrpc.WebsocketPayload", msg)) + assert(msg.version == "PAYLOAD_VERSION_V1", "unknown encoding version") + local payload = msg.payload + + if payload.mtype == "MESSAGE_TYPE_ERROR" then + self:handle_error(payload) + + elseif payload.mtype == "MESSAGE_TYPE_RPC" then + local ack = payload.ack or 0 + local deadline = payload.deadline or 0 + + if ack == 0 and deadline < ngx.now() then + ngx.log(ngx.NOTICE, "[wRPC] Expired message (", deadline, "<", ngx.now(), ") discarded") + + elseif ack ~= 0 and deadline ~= 0 then + ngx.log(ngx.NOTICE, "[WRPC] Invalid deadline (", deadline, ") for response") + + else + self:handle(payload) + end + end + + msg, err = self:receive() + end + + if err ~= nil and not endswith(err, "timeout") then + ngx.log(ngx.NOTICE, "[wRPC] WebSocket frame: ", err) + self.closing = true + end +end + +function wrpc_peer:spawn_threads() + self._receiving_thread = assert(ngx.thread.spawn(function() + while not exiting() and not self.closing do + self:step() + ngx.sleep(0) + end + end)) + + if self.request_queue then + self._transmit_thread = assert(ngx.thread.spawn(function() + while not exiting() and not self.closing do + local data, err = self.request_queue:pop() + if data then + self.conn:send_binary(data) + + else + if err ~= "timeout" then + return nil, err + end + end + end + end)) + end + + if self.channel_dict then + self._channel_thread = assert(ngx.thread.spawn(function() + while not exiting() and not self.closing do + local msg, name, err = channel.wait_all(self.channel_dict) + if msg and name then + self:send_remote_payload(msg, name) + + else + if err ~= "timeout" then + return nil, err + end + end + end + end)) + end +end + + +--- return same args in the same order, removing any nil args. +--- required for functions (like ngx.thread.wait) that complain +--- about nil args at the end. +local function safe_args(...) 
+ local out = {} + for i = 1, select('#', ...) do + out[#out + 1] = select(i, ...) + end + return table_unpack(out) +end + + +function wrpc_peer:wait_threads() + local ok, err, perr = ngx.thread.wait(safe_args(self._receiving_thread, self._transmit_thread, self._channel_thread)) + + if self._receiving_thread then + ngx.thread.kill(self._receiving_thread) + self._receiving_thread = nil + end + + if self._transmit_thread then + ngx.thread.kill(self._transmit_thread) + self._transmit_thread = nil + end + + if self._channel_thread then + ngx.thread.kill(self._channel_thread) + self._channel_thread = nil + end + + return ok, err, perr +end + +--- Returns the response for a given call ID, if any +function wrpc_peer:get_response(req_id) + local resp_data = self.response_queue[req_id] + self.response_queue[req_id] = nil + + if resp_data == nil then + return nil, "no response" + end + + return resp_data +end + + +local function send_payload_to_channel(self, payload) + assert(self.channel:post(self.encode("wrpc.PayloadV1", payload))) +end + +local function remote_call(self, name, ...) + self:call(name, ...) + + local msg = assert(self.channel:get()) + local payload_back = assert(self.decode("wrpc.PayloadV1", msg)) + + if payload_back.mtype == "MESSAGE_TYPE_ERROR" then + return nil, payload_back.error.description + end + + if payload_back.mtype == "MESSAGE_TYPE_RPC" then + local rpc = self.service:get_method(payload_back.svc_id, payload_back.rpc_id) + return decodearray(self.decode, rpc.output_type, payload_back.payloads) + end + + return nil, "unknown message type" +end + +local function remote_close(self) + self.closing = true +end + +function wrpc.new_remote_client(service, channel_name) + local self = wrpc.new_peer(nil, service, { + channel = channel.new("wrpc_channel_" .. channel_name, CHANNEL_CLIENT_PREFIX .. ngx.worker.pid()), + send_payload = send_payload_to_channel, + close = remote_close, + remote_call = remote_call, + }) + return self +end + + +return wrpc diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index 48a499c3b136..1c669cab00cc 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -17,296 +17,299 @@ local KEY_AUTH_PLUGIN for _, strategy in helpers.each_strategy() do - describe("CP/DP sync works with #" .. strategy .. " backend", function() + for _, cluster_protocol in ipairs{"json", "wrpc"} do + describe("CP/DP sync works with #" .. strategy .. " backend, protocol " .. 
cluster_protocol, function() - lazy_setup(function() - helpers.get_db_utils(strategy, { - "routes", - "services", - "plugins", - "upstreams", - "targets", - "certificates", - "clustering_data_planes", - }) -- runs migrations + lazy_setup(function() + helpers.get_db_utils(strategy, { + "routes", + "services", + "plugins", + "upstreams", + "targets", + "certificates", + "clustering_data_planes", + }) -- runs migrations - assert(helpers.start_kong({ - role = "control_plane", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - database = strategy, - db_update_frequency = 0.1, - cluster_listen = "127.0.0.1:9005", - nginx_conf = "spec/fixtures/custom_nginx.template", - })) + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + db_update_frequency = 0.1, + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) - assert(helpers.start_kong({ - role = "data_plane", - database = "off", - prefix = "servroot2", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - cluster_control_plane = "127.0.0.1:9005", - proxy_listen = "0.0.0.0:9002", - })) + assert(helpers.start_kong({ + role = "data_plane", + cluster_protocol = cluster_protocol, + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + proxy_listen = "0.0.0.0:9002", + })) - for _, plugin in ipairs(helpers.get_plugins_list()) do - if plugin.name == "key-auth" then - KEY_AUTH_PLUGIN = plugin - break + for _, plugin in ipairs(helpers.get_plugins_list()) do + if plugin.name == "key-auth" then + KEY_AUTH_PLUGIN = plugin + break + end end - end - end) + end) - lazy_teardown(function() - helpers.stop_kong("servroot2") - helpers.stop_kong() - end) + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) - describe("status API", function() - it("shows DP status", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + describe("status API", function() + it("shows DP status", function() + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) - local res = assert(admin_client:get("/clustering/data-planes")) - local body = assert.res_status(200, res) - local json = cjson.decode(body) + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) - for _, v in pairs(json.data) do - if v.ip == "127.0.0.1" then - assert.near(14 * 86400, v.ttl, 3) - assert.matches("^(%d+%.%d+)%.%d+", v.version) - assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status) + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" then + assert.near(14 * 86400, v.ttl, 3) + assert.matches("^(%d+%.%d+)%.%d+", v.version) + assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status) - return true + return true + end end - end - end, 10) - end) + end, 10) + end) - it("shows DP status (#deprecated)", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + it("shows DP status (#deprecated)", function() + 
helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) - local res = assert(admin_client:get("/clustering/status")) - local body = assert.res_status(200, res) - local json = cjson.decode(body) + local res = assert(admin_client:get("/clustering/status")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) - for _, v in pairs(json) do - if v.ip == "127.0.0.1" then - return true + for _, v in pairs(json) do + if v.ip == "127.0.0.1" then + return true + end end - end - end, 5) - end) + end, 5) + end) - it("disallow updates on the status endpoint", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + it("disallow updates on the status endpoint", function() + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) - local res = assert(admin_client:get("/clustering/data-planes")) - local body = assert.res_status(200, res) - local json = cjson.decode(body) + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) - local id - for _, v in pairs(json.data) do - if v.ip == "127.0.0.1" then - id = v.id + local id + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" then + id = v.id + end end - end - if not id then - return nil - end + if not id then + return nil + end - res = assert(admin_client:delete("/clustering/data-planes/" .. id)) - assert.res_status(404, res) - res = assert(admin_client:patch("/clustering/data-planes/" .. id)) - assert.res_status(404, res) + res = assert(admin_client:delete("/clustering/data-planes/" .. id)) + assert.res_status(404, res) + res = assert(admin_client:patch("/clustering/data-planes/" .. 
id)) + assert.res_status(404, res) - return true - end, 5) - end) - - it("disables the auto-generated collection endpoints", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() + return true + end, 5) end) - local res = assert(admin_client:get("/clustering_data_planes")) - assert.res_status(404, res) - end) - end) - - describe("sync works", function() - local route_id + it("disables the auto-generated collection endpoints", function() + local admin_client = helpers.admin_client(10000) + finally(function() + admin_client:close() + end) - it("proxy on DP follows CP config", function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() + local res = assert(admin_client:get("/clustering_data_planes")) + assert.res_status(404, res) end) + end) - local res = assert(admin_client:post("/services", { - body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - assert.res_status(201, res) + describe("sync works", function() + local route_id - res = assert(admin_client:post("/services/mockbin-service/routes", { - body = { paths = { "/" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) + it("proxy on DP follows CP config", function() + local admin_client = helpers.admin_client(10000) + finally(function() + admin_client:close() + end) - route_id = json.id - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) + local res = assert(admin_client:post("/services", { + body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", }, + headers = {["Content-Type"] = "application/json"} + })) + assert.res_status(201, res) - res = proxy_client:send({ - method = "GET", - path = "/", - }) + res = assert(admin_client:post("/services/mockbin-service/routes", { + body = { paths = { "/" }, }, + headers = {["Content-Type"] = "application/json"} + })) + local body = assert.res_status(201, res) + local json = cjson.decode(body) - local status = res and res.status - proxy_client:close() - if status == 200 then - return true - end - end, 10) - end) + route_id = json.id + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) - it("cache invalidation works on config change", function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() + res = proxy_client:send({ + method = "GET", + path = "/", + }) + + local status = res and res.status + proxy_client:close() + if status == 200 then + return true + end + end, 10) end) - local res = assert(admin_client:send({ - method = "DELETE", - path = "/routes/" .. route_id, - })) - assert.res_status(204, res) + it("cache invalidation works on config change", function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) + local res = assert(admin_client:send({ + method = "DELETE", + path = "/routes/" .. 
route_id, + })) + assert.res_status(204, res) - res = proxy_client:send({ - method = "GET", - path = "/", - }) + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) - -- should remove the route from DP - local status = res and res.status - proxy_client:close() - if status == 404 then - return true - end - end, 5) - end) + res = proxy_client:send({ + method = "GET", + path = "/", + }) - it("local cached config file has correct permission", function() - local handle = io.popen("ls -l servroot2/config.cache.json.gz") - local result = handle:read("*a") - handle:close() + -- should remove the route from DP + local status = res and res.status + proxy_client:close() + if status == 404 then + return true + end + end, 5) + end) - assert.matches("-rw-------", result, nil, true) - end) + it("local cached config file has correct permission", function() + local handle = io.popen("ls -l servroot2/config.cache.json.gz") + local result = handle:read("*a") + handle:close() - it('does not sync services where enabled == false', function() - local admin_client = helpers.admin_client(10000) - finally(function() - admin_client:close() + assert.matches("-rw-------", result, nil, true) end) - -- create service - local res = assert(admin_client:post("/services", { - body = { name = "mockbin-service2", url = "https://127.0.0.1:15556/request", }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) - local service_id = json.id - - -- -- create route - res = assert(admin_client:post("/services/mockbin-service2/routes", { - body = { paths = { "/soon-to-be-disabled" }, }, - headers = {["Content-Type"] = "application/json"} - })) - local body = assert.res_status(201, res) - local json = cjson.decode(body) + it('does not sync services where enabled == false', function() + local admin_client = helpers.admin_client(10000) + finally(function() + admin_client:close() + end) - route_id = json.id + -- create service + local res = assert(admin_client:post("/services", { + body = { name = "mockbin-service2", url = "https://127.0.0.1:15556/request", }, + headers = {["Content-Type"] = "application/json"} + })) + local body = assert.res_status(201, res) + local json = cjson.decode(body) + local service_id = json.id - -- test route - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- -- create route + res = assert(admin_client:post("/services/mockbin-service2/routes", { + body = { paths = { "/soon-to-be-disabled" }, }, + headers = {["Content-Type"] = "application/json"} + })) + local body = assert.res_status(201, res) + local json = cjson.decode(body) - res = proxy_client:send({ - method = "GET", - path = "/soon-to-be-disabled", - }) + route_id = json.id - local status = res and res.status - proxy_client:close() - if status == 200 then - return true - end - end, 10) + -- test route + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) - -- disable service - local res = assert(admin_client:patch("/services/" .. 
service_id, {
-          body = { enabled = false, },
-          headers = {["Content-Type"] = "application/json"}
-        }))
-        assert.res_status(200, res)
-        -- as this is testing a negative behavior, there is no sure way to wait
-        -- this can probably be optimizted
-        ngx.sleep(2)
+            res = proxy_client:send({
+              method = "GET",
+              path = "/soon-to-be-disabled",
+            })
-        local proxy_client = helpers.http_client("127.0.0.1", 9002)
+            local status = res and res.status
+            proxy_client:close()
+            if status == 200 then
+              return true
+            end
+          end, 10)
-        -- test route again
-        res = assert(proxy_client:send({
-          method = "GET",
-          path = "/soon-to-be-disabled",
-        }))
-        assert.res_status(404, res)
+          -- disable service
+          local res = assert(admin_client:patch("/services/" .. service_id, {
+            body = { enabled = false, },
+            headers = {["Content-Type"] = "application/json"}
+          }))
+          assert.res_status(200, res)
+          -- as this is testing a negative behavior, there is no sure way to wait
+          -- this can probably be optimized
+          ngx.sleep(2)
-        proxy_client:close()
+          local proxy_client = helpers.http_client("127.0.0.1", 9002)
+
+          -- test route again
+          res = assert(proxy_client:send({
+            method = "GET",
+            path = "/soon-to-be-disabled",
+          }))
+          assert.res_status(404, res)
+
+          proxy_client:close()
+        end)
      end)
    end)
-  end)
+  end

  describe("CP/DP version check works with #" .. strategy, function()
    -- for these tests, we do not need a real DP, but rather use the fake DP
    -- client so we can mock various values (e.g. node_version)
    describe("relaxed compatibility check:", function()
-      lazy_setup(function()
-        local bp = helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "plugins",
-          "upstreams",
-          "targets",
-          "certificates",
-          "clustering_data_planes",
-        }) -- runs migrations
+      local bp = helpers.get_db_utils(strategy, {
+        "routes",
+        "services",
+        "plugins",
+        "upstreams",
+        "targets",
+        "certificates",
+        "clustering_data_planes",
+      }) -- runs migrations
-        bp.plugins:insert {
-          name = "key-auth",
-        }
+      bp.plugins:insert {
+        name = "key-auth",
+      }
+      lazy_setup(function()
        assert(helpers.start_kong({
          role = "control_plane",
@@ -556,7 +559,129 @@ for _, strategy in helpers.each_strategy() do
    end)
  end)

-  describe("CP/DP sync works with #" .. strategy .. " backend", function()
+  for _, cluster_protocol in ipairs{"json", "wrpc"} do
+    describe("CP/DP sync works with #" .. strategy .. " backend, protocol " ..
cluster_protocol, function() + lazy_setup(function() + helpers.get_db_utils(strategy, { + "routes", + "services", + "plugins", + "upstreams", + "targets", + "certificates", + "clustering_data_planes", + }) -- runs migrations + + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + db_update_frequency = 3, + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + assert(helpers.start_kong({ + role = "data_plane", + cluster_protocol = cluster_protocol, + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + proxy_listen = "0.0.0.0:9002", + })) + end) + + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) + + describe("sync works", function() + it("pushes first change asap and following changes in a batch", function() + local admin_client = helpers.admin_client(10000) + local proxy_client = helpers.http_client("127.0.0.1", 9002) + finally(function() + admin_client:close() + proxy_client:close() + end) + + local res = admin_client:put("/routes/1", { + headers = { + ["Content-Type"] = "application/json", + }, + body = { + paths = { "/1" }, + }, + }) + + assert.res_status(200, res) + + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- serviceless route should return 503 instead of 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 503 then + return true + end + end, 10) + + for i = 2, 5 do + res = admin_client:put("/routes/" .. i, { + headers = { + ["Content-Type"] = "application/json", + }, + body = { + paths = { "/" .. i }, + }, + }) + + assert.res_status(200, res) + end + + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- serviceless route should return 503 instead of 404 + res = proxy_client:get("/2") + proxy_client:close() + if res and res.status == 503 then + return true + end + end, 5) + + for i = 5, 3, -1 do + res = proxy_client:get("/" .. i) + assert.res_status(503, res) + end + + for i = 1, 5 do + local res = admin_client:delete("/routes/" .. i) + assert.res_status(204, res) + end + + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- deleted route should return 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 404 then + return true + end + end, 5) + + for i = 5, 2, -1 do + res = proxy_client:get("/" .. i) + assert.res_status(404, res) + end + end) + end) + end) + end + + describe("CP/DP sync works with #" .. strategy .. 
" backend, two DPs via different protocols on the same CP", function() lazy_setup(function() helpers.get_db_utils(strategy, { "routes", @@ -580,6 +705,7 @@ for _, strategy in helpers.each_strategy() do assert(helpers.start_kong({ role = "data_plane", + cluster_protocol = "json", database = "off", prefix = "servroot2", cluster_cert = "spec/fixtures/kong_clustering.crt", @@ -587,91 +713,129 @@ for _, strategy in helpers.each_strategy() do cluster_control_plane = "127.0.0.1:9005", proxy_listen = "0.0.0.0:9002", })) + + assert(helpers.start_kong({ + role = "data_plane", + cluster_protocol = "wrpc", + database = "off", + prefix = "servroot3", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + proxy_listen = "0.0.0.0:9003", + })) end) lazy_teardown(function() + helpers.stop_kong("servroot3") helpers.stop_kong("servroot2") helpers.stop_kong() end) - describe("sync works", function() - it("pushes first change asap and following changes in a batch", function() - local admin_client = helpers.admin_client(10000) + it("pushes first change asap and following changes in a batch", function() + local admin_client = helpers.admin_client(10000) + local proxy_client_A = helpers.http_client("127.0.0.1", 9002) + local proxy_client_B = helpers.http_client("127.0.0.1", 9003) + finally(function() + admin_client:close() + proxy_client_A:close() + proxy_client_B:close() + end) + + local res = admin_client:put("/routes/1", { + headers = { + ["Content-Type"] = "application/json", + }, + body = { + paths = { "/1" }, + }, + }) + + assert.res_status(200, res) + + -- first CP got it + helpers.wait_until(function() local proxy_client = helpers.http_client("127.0.0.1", 9002) - finally(function() - admin_client:close() - proxy_client:close() - end) + -- serviceless route should return 503 instead of 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 503 then + return true + end + end, 10) - local res = admin_client:put("/routes/1", { + -- second CP got it + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9003) + -- serviceless route should return 503 instead of 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 503 then + return true + end + end, 10) + + for i = 2, 5 do + res = admin_client:put("/routes/" .. i, { headers = { ["Content-Type"] = "application/json", }, body = { - paths = { "/1" }, + paths = { "/" .. i }, }, }) assert.res_status(200, res) + end - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) - -- serviceless route should return 503 instead of 404 - res = proxy_client:get("/1") - proxy_client:close() - if res and res.status == 503 then - return true - end - end, 2) - - for i = 2, 5 do - res = admin_client:put("/routes/" .. i, { - headers = { - ["Content-Type"] = "application/json", - }, - body = { - paths = { "/" .. 
i }, - }, - }) - - assert.res_status(200, res) + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- serviceless route should return 503 instead of 404 + res = proxy_client:get("/2") + proxy_client:close() + if res and res.status == 503 then + return true end + end, 5) - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) - -- serviceless route should return 503 instead of 404 - res = proxy_client:get("/2") - proxy_client:close() - if res and res.status == 503 then - return true - end - end, 5) + for i = 5, 3, -1 do + assert.res_status(503, proxy_client_A:get("/" .. i)) + assert.res_status(503, proxy_client_B:get("/" .. i)) + end - for i = 5, 3, -1 do - res = proxy_client:get("/" .. i) - assert.res_status(503, res) - end + for i = 1, 5 do + assert.res_status(204, admin_client:delete("/routes/" .. i)) + end - for i = 1, 5 do - local res = admin_client:delete("/routes/" .. i) - assert.res_status(204, res) + -- first CP no longer sees them + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9002) + -- deleted route should return 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 404 then + return true end + end, 5) - helpers.wait_until(function() - local proxy_client = helpers.http_client("127.0.0.1", 9002) - -- deleted route should return 404 - res = proxy_client:get("/1") - proxy_client:close() - if res and res.status == 404 then - return true - end - end, 5) + for i = 5, 2, -1 do + assert.res_status(404, proxy_client_A:get("/" .. i)) + end - for i = 5, 2, -1 do - res = proxy_client:get("/" .. i) - assert.res_status(404, res) + -- second CP + helpers.wait_until(function() + local proxy_client = helpers.http_client("127.0.0.1", 9003) + -- deleted route should return 404 + res = proxy_client:get("/1") + proxy_client:close() + if res and res.status == 404 then + return true end - end) + end, 5) + + for i = 5, 2, -1 do + assert.res_status(404, proxy_client_B:get("/" .. i)) + end end) end) end diff --git a/spec/02-integration/09-hybrid_mode/02-start_stop_spec.lua b/spec/02-integration/09-hybrid_mode/02-start_stop_spec.lua index bfbfa361e331..efc02fe8a265 100644 --- a/spec/02-integration/09-hybrid_mode/02-start_stop_spec.lua +++ b/spec/02-integration/09-hybrid_mode/02-start_stop_spec.lua @@ -1,113 +1,123 @@ local helpers = require "spec.helpers" -describe("invalid config are rejected", function() - describe("role is control_plane", function() - it("can not disable admin_listen", function() - local ok, err = helpers.start_kong({ - role = "control_plane", - prefix = "servroot2", - cluster_cert = "spec/fixtures/kong_clustering.crt", - cluster_cert_key = "spec/fixtures/kong_clustering.key", - admin_listen = "off", - }) - - assert.False(ok) - assert.matches("Error: admin_listen must be specified when role = \"control_plane\"", err, nil, true) - end) +for _, cluster_protocol in ipairs{"json", "wrpc"} do + describe("invalid config are rejected, protocol " .. 
cluster_protocol, function()
+    describe("role is control_plane", function()
+      it("can not disable admin_listen", function()
+        local ok, err = helpers.start_kong({
+          role = "control_plane",
+          cluster_protocol = cluster_protocol,
+          prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          admin_listen = "off",
+        })
-    it("can not disable cluster_listen", function()
-      local ok, err = helpers.start_kong({
-        role = "control_plane",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-        cluster_listen = "off",
-      })
-
-      assert.False(ok)
-      assert.matches("Error: cluster_listen must be specified when role = \"control_plane\"", err, nil, true)
-    end)
+        assert.False(ok)
+        assert.matches("Error: admin_listen must be specified when role = \"control_plane\"", err, nil, true)
+      end)
-    it("can not use DB-less mode", function()
-      local ok, err = helpers.start_kong({
-        role = "control_plane",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-        database = "off",
-      })
-
-      assert.False(ok)
-      assert.matches("Error: in-memory storage can not be used when role = \"control_plane\"", err, nil, true)
-    end)
+      it("can not disable cluster_listen", function()
+        local ok, err = helpers.start_kong({
+          role = "control_plane",
+          cluster_protocol = cluster_protocol,
+          prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          cluster_listen = "off",
+        })
-    it("must define cluster_ca_cert", function()
-      local ok, err = helpers.start_kong({
-        role = "control_plane",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-        cluster_mtls = "pki",
-      })
-
-      assert.False(ok)
-      assert.matches("Error: cluster_ca_cert must be specified when cluster_mtls = \"pki\"", err, nil, true)
-    end)
-  end)
+        assert.False(ok)
+        assert.matches("Error: cluster_listen must be specified when role = \"control_plane\"", err, nil, true)
+      end)
-  describe("role is proxy", function()
-    it("can not disable proxy_listen", function()
-      local ok, err = helpers.start_kong({
-        role = "data_plane",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-        proxy_listen = "off",
-      })
-
-      assert.False(ok)
-      assert.matches("Error: proxy_listen must be specified when role = \"data_plane\"", err, nil, true)
-    end)
+      it("can not use DB-less mode", function()
+        local ok, err = helpers.start_kong({
+          role = "control_plane",
+          cluster_protocol = cluster_protocol,
+          prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          database = "off",
+        })
+
+        assert.False(ok)
+        assert.matches("Error: in-memory storage can not be used when role = \"control_plane\"", err, nil, true)
+      end)
+
+      it("must define cluster_ca_cert", function()
+        local ok, err = helpers.start_kong({
+          role = "control_plane",
+          cluster_protocol = cluster_protocol,
+          prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          cluster_mtls = "pki",
+        })
-    it("can not use DB mode", function()
-      local ok, err = helpers.start_kong({
-        role = "data_plane",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-      })
-
-      assert.False(ok)
-      assert.matches("Error: only in-memory storage can be used when role = \"data_plane\"\n" ..
-                     "Hint: set database = off in your kong.conf", err, nil, true)
+        assert.False(ok)
+        assert.matches("Error: cluster_ca_cert must be specified when cluster_mtls = \"pki\"", err, nil, true)
+      end)
     end)
-  end)
-
-  for _, param in ipairs({ { "control_plane", "postgres" }, { "data_plane", "off" }, }) do
-    describe("role is " .. param[1], function()
-      it("errors if cluster certificate is not found", function()
+    describe("role is proxy", function()
+      it("can not disable proxy_listen", function()
         local ok, err = helpers.start_kong({
-          role = param[1],
-          database = param[2],
+          role = "data_plane",
+          cluster_protocol = cluster_protocol,
           prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          proxy_listen = "off",
         })

         assert.False(ok)
-        assert.matches("Error: cluster certificate and key must be provided to use Hybrid mode", err, nil, true)
+        assert.matches("Error: proxy_listen must be specified when role = \"data_plane\"", err, nil, true)
       end)

-      it("errors if cluster certificate key is not found", function()
+      it("can not use DB mode", function()
         local ok, err = helpers.start_kong({
-          role = param[1],
-          database = param[2],
+          role = "data_plane",
+          cluster_protocol = cluster_protocol,
           prefix = "servroot2",
           cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
         })

         assert.False(ok)
-        assert.matches("Error: cluster certificate and key must be provided to use Hybrid mode", err, nil, true)
+        assert.matches("Error: only in-memory storage can be used when role = \"data_plane\"\n" ..
+                       "Hint: set database = off in your kong.conf", err, nil, true)
       end)
     end)
-  end
-end)
+
+    for _, param in ipairs({ { "control_plane", "postgres" }, { "data_plane", "off" }, }) do
+      describe("role is " .. param[1], function()
+        it("errors if cluster certificate is not found", function()
+          local ok, err = helpers.start_kong({
+            role = param[1],
+            cluster_protocol = cluster_protocol,
+            database = param[2],
+            prefix = "servroot2",
+          })
+
+          assert.False(ok)
+          assert.matches("Error: cluster certificate and key must be provided to use Hybrid mode", err, nil, true)
+        end)
+
+        it("errors if cluster certificate key is not found", function()
+          local ok, err = helpers.start_kong({
+            role = param[1],
+            cluster_protocol = cluster_protocol,
+            database = param[2],
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/kong_clustering.crt",
+          })
+
+          assert.False(ok)
+          assert.matches("Error: cluster certificate and key must be provided to use Hybrid mode", err, nil, true)
+        end)
+      end)
+    end
+  end)
+end
diff --git a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua
index 1a13dca4f034..d50f7a3d8b4a 100644
--- a/spec/02-integration/09-hybrid_mode/03-pki_spec.lua
+++ b/spec/02-integration/09-hybrid_mode/03-pki_spec.lua
@@ -2,155 +2,159 @@ local helpers = require "spec.helpers"
 local cjson = require "cjson.safe"
 
-for _, strategy in helpers.each_strategy() do
-  describe("CP/DP PKI sync works with #" .. strategy .. " backend", function()
-
-    lazy_setup(function()
-      helpers.get_db_utils(strategy, {
-        "routes",
-        "services",
-      }) -- runs migrations
-
-      assert(helpers.start_kong({
-        role = "control_plane",
-        cluster_cert = "spec/fixtures/kong_clustering.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering.key",
-        db_update_frequency = 0.1,
-        database = strategy,
-        cluster_listen = "127.0.0.1:9005",
-        nginx_conf = "spec/fixtures/custom_nginx.template",
-        -- additional attributes for PKI:
-        cluster_mtls = "pki",
-        cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt",
-      }))
-
-      assert(helpers.start_kong({
-        role = "data_plane",
-        database = "off",
-        prefix = "servroot2",
-        cluster_cert = "spec/fixtures/kong_clustering_client.crt",
-        cluster_cert_key = "spec/fixtures/kong_clustering_client.key",
-        cluster_control_plane = "127.0.0.1:9005",
-        proxy_listen = "0.0.0.0:9002",
-        -- additional attributes for PKI:
-        cluster_mtls = "pki",
-        cluster_server_name = "kong_clustering",
-        cluster_ca_cert = "spec/fixtures/kong_clustering.crt",
-      }))
-    end)
-
-    lazy_teardown(function()
-      helpers.stop_kong("servroot2")
-      helpers.stop_kong()
-    end)
+for _, cluster_protocol in ipairs{"json", "wrpc"} do
+  for _, strategy in helpers.each_strategy() do
+    describe("CP/DP PKI sync works with #" .. strategy .. " backend, protocol " .. cluster_protocol, function()
+
+      lazy_setup(function()
+        helpers.get_db_utils(strategy, {
+          "routes",
+          "services",
+        }) -- runs migrations
+
+        assert(helpers.start_kong({
+          role = "control_plane",
+          cluster_protocol = cluster_protocol,
+          cluster_cert = "spec/fixtures/kong_clustering.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering.key",
+          db_update_frequency = 0.1,
+          database = strategy,
+          cluster_listen = "127.0.0.1:9005",
+          nginx_conf = "spec/fixtures/custom_nginx.template",
+          -- additional attributes for PKI:
+          cluster_mtls = "pki",
+          cluster_ca_cert = "spec/fixtures/kong_clustering_ca.crt",
+        }))
-    describe("status API", function()
-      it("shows DP status", function()
-        helpers.wait_until(function()
-          local admin_client = helpers.admin_client()
-          finally(function()
-            admin_client:close()
-          end)
+        assert(helpers.start_kong({
+          role = "data_plane",
+          cluster_protocol = cluster_protocol,
+          database = "off",
+          prefix = "servroot2",
+          cluster_cert = "spec/fixtures/kong_clustering_client.crt",
+          cluster_cert_key = "spec/fixtures/kong_clustering_client.key",
+          cluster_control_plane = "127.0.0.1:9005",
+          proxy_listen = "0.0.0.0:9002",
+          -- additional attributes for PKI:
+          cluster_mtls = "pki",
+          cluster_server_name = "kong_clustering",
+          cluster_ca_cert = "spec/fixtures/kong_clustering.crt",
+        }))
+      end)
-          local res = assert(admin_client:get("/clustering/data-planes"))
-          local body = assert.res_status(200, res)
-          local json = cjson.decode(body)
+      lazy_teardown(function()
+        helpers.stop_kong("servroot2")
+        helpers.stop_kong()
+      end)
-          for _, v in pairs(json.data) do
-            if v.ip == "127.0.0.1" then
-              return true
+      describe("status API", function()
+        it("shows DP status", function()
+          helpers.wait_until(function()
+            local admin_client = helpers.admin_client()
+            finally(function()
+              admin_client:close()
+            end)
+
+            local res = assert(admin_client:get("/clustering/data-planes"))
+            local body = assert.res_status(200, res)
+            local json = cjson.decode(body)
+
+            for _, v in pairs(json.data) do
+              if v.ip == "127.0.0.1" then
+                return true
+              end
+            end
+          end, 5)
+        end)
+        it("shows DP status (#deprecated)", function()
+          helpers.wait_until(function()
+            local admin_client = helpers.admin_client()
+            finally(function()
+              admin_client:close()
+            end)
+
+            local res = assert(admin_client:get("/clustering/status"))
+            local body = assert.res_status(200, res)
+            local json = cjson.decode(body)
+
+            for _, v in pairs(json) do
+              if v.ip == "127.0.0.1" then
+                return true
+              end
             end
-          end
-        end, 5)
+          end, 5)
+        end)
       end)
-      it("shows DP status (#deprecated)", function()
-        helpers.wait_until(function()
-          local admin_client = helpers.admin_client()
+
+      describe("sync works", function()
+        local route_id
+
+        it("proxy on DP follows CP config", function()
+          local admin_client = helpers.admin_client(10000)
           finally(function()
             admin_client:close()
           end)
-          local res = assert(admin_client:get("/clustering/status"))
-          local body = assert.res_status(200, res)
+          local res = assert(admin_client:post("/services", {
+            body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", },
+            headers = {["Content-Type"] = "application/json"}
+          }))
+          assert.res_status(201, res)
+
+          res = assert(admin_client:post("/services/mockbin-service/routes", {
+            body = { paths = { "/" }, },
+            headers = {["Content-Type"] = "application/json"}
+          }))
+          local body = assert.res_status(201, res)
           local json = cjson.decode(body)

-          for _, v in pairs(json) do
-            if v.ip == "127.0.0.1" then
-              return true
-            end
-          end
-        end, 5)
-      end)
-    end)
+          route_id = json.id
-    describe("sync works", function()
-      local route_id
-
-      it("proxy on DP follows CP config", function()
-        local admin_client = helpers.admin_client(10000)
-        finally(function()
-          admin_client:close()
-        end)
+          helpers.wait_until(function()
+            local proxy_client = helpers.http_client("127.0.0.1", 9002)
-        local res = assert(admin_client:post("/services", {
-          body = { name = "mockbin-service", url = "https://127.0.0.1:15556/request", },
-          headers = {["Content-Type"] = "application/json"}
-        }))
-        assert.res_status(201, res)
+            res = proxy_client:send({
+              method = "GET",
+              path = "/",
+            })
-        res = assert(admin_client:post("/services/mockbin-service/routes", {
-          body = { paths = { "/" }, },
-          headers = {["Content-Type"] = "application/json"}
-        }))
-        local body = assert.res_status(201, res)
-        local json = cjson.decode(body)
+            local status = res and res.status
+            proxy_client:close()
+            if status == 200 then
+              return true
+            end
+          end, 10)
+        end)
-        route_id = json.id
+        it("cache invalidation works on config change", function()
+          local admin_client = helpers.admin_client()
+          finally(function()
+            admin_client:close()
+          end)
-        helpers.wait_until(function()
-          local proxy_client = helpers.http_client("127.0.0.1", 9002)
+          local res = assert(admin_client:send({
+            method = "DELETE",
+            path = "/routes/" .. route_id,
+          }))
+          assert.res_status(204, res)
-          res = proxy_client:send({
-            method = "GET",
-            path = "/",
-          })
+          helpers.wait_until(function()
+            local proxy_client = helpers.http_client("127.0.0.1", 9002)
-          local status = res and res.status
-          proxy_client:close()
-          if status == 200 then
-            return true
-          end
-        end, 10)
-      end)
+            res = proxy_client:send({
+              method = "GET",
+              path = "/",
+            })
-      it("cache invalidation works on config change", function()
-        local admin_client = helpers.admin_client()
-        finally(function()
-          admin_client:close()
+            -- should remove the route from DP
+            local status = res and res.status
+            proxy_client:close()
+            if status == 404 then
+              return true
+            end
+          end, 5)
         end)
-
-        local res = assert(admin_client:send({
-          method = "DELETE",
-          path = "/routes/" .. route_id,
-        }))
-        assert.res_status(204, res)
-
-        helpers.wait_until(function()
-          local proxy_client = helpers.http_client("127.0.0.1", 9002)
-
-          res = proxy_client:send({
-            method = "GET",
-            path = "/",
-          })
-
-          -- should remove the route from DP
-          local status = res and res.status
-          proxy_client:close()
-          if status == 404 then
-            return true
-          end
-        end, 5)
       end)
     end)
-  end)
+  end
 end
diff --git a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua
index c735fb76ea26..eee305f4475d 100644
--- a/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua
+++ b/spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua
@@ -68,10 +68,10 @@ for _, strategy in helpers.each_strategy() do
       local filepath = cfg.prefix .. "/" .. cfg.proxy_error_log
       helpers.wait_until(function()
         return find_in_file(filepath,
-                            -- this line is only found on the other CP (the one not receiving the Admin API call)
-                            "[clustering] received clustering:push_config event for services:create") and
-               find_in_file(filepath,
-                            "worker-events: handling event; source=clustering, event=push_config")
+                -- this line is only found on the other CP (the one not receiving the Admin API call)
+                "clustering] received clustering:push_config event for services:create") and
+          find_in_file(filepath,
+                "worker-events: handling event; source=clustering, event=push_config")
       end, 10)
     end)
   end)
diff --git a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua
index 3f6275c150ff..cc800a745006 100644
--- a/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua
+++ b/spec/02-integration/09-hybrid_mode/05-ocsp_spec.lua
@@ -14,195 +14,133 @@ local function set_ocsp_status(status)
 end
 
-for _, strategy in helpers.each_strategy() do
-  describe("cluster_ocsp = on works with #" .. strategy .. " backend", function()
-    describe("DP certificate good", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "on",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("good")
-      end)
+for _, cluster_protocol in ipairs{"json", "wrpc"} do
+  for _, strategy in helpers.each_strategy() do
+    describe("cluster_ocsp = on works with #" .. strategy .. " backend, protocol " .. cluster_protocol, function()
+      describe("DP certificate good", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "on",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("good")
+        end)
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
-      end)
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
-      describe("status API", function()
-        it("shows DP status", function()
-          helpers.wait_until(function()
-            local admin_client = helpers.admin_client()
-            finally(function()
-              admin_client:close()
-            end)
+        describe("status API", function()
+          it("shows DP status", function()
+            helpers.wait_until(function()
+              local admin_client = helpers.admin_client()
+              finally(function()
+                admin_client:close()
+              end)
-            local res = assert(admin_client:get("/clustering/data-planes"))
-            local body = assert.res_status(200, res)
-            local json = cjson.decode(body)
+              local res = assert(admin_client:get("/clustering/data-planes"))
+              local body = assert.res_status(200, res)
+              local json = cjson.decode(body)
-            for _, v in pairs(json.data) do
-              if v.ip == "127.0.0.1" then
-                return true
+              for _, v in pairs(json.data) do
+                if v.ip == "127.0.0.1" then
+                  return true
+                end
               end
-            end
-          end, 5)
+            end, 5)
+          end)
         end)
       end)
-    end)
-    describe("DP certificate revoked", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "on",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("revoked")
-      end)
-
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
-      end)
-
-      it("revoked DP certificate can not connect to CP", function()
-        helpers.wait_until(function()
-          local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
-          if logs:find([[client certificate was revoked: failed to validate OCSP response: certificate status "revoked" in the OCSP response]], 1, true) then
-            local admin_client = helpers.admin_client()
-            finally(function()
-              admin_client:close()
-            end)
-
-            local res = assert(admin_client:get("/clustering/data-planes"))
-            local body = assert.res_status(200, res)
-            local json = cjson.decode(body)
-
-            assert.equal(0, #json.data)
-            return true
-          end
-        end, 10)
-      end)
-    end)
+      describe("DP certificate revoked", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "on",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("revoked")
+        end)
-    describe("OCSP responder errors, DP are not allowed", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "on",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("error")
-      end)
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
-      end)
-      describe("status API", function()
-        it("does not show DP status", function()
+        it("revoked DP certificate can not connect to CP", function()
           helpers.wait_until(function()
             local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
-            if logs:find('data plane client certificate revocation check failed: OCSP responder returns bad HTTP status code: 500', nil, true) then
+            if logs:find([[client certificate was revoked: failed to validate OCSP response: certificate status "revoked" in the OCSP response]], 1, true) then
              local admin_client = helpers.admin_client()
              finally(function()
                admin_client:close()
@@ -215,221 +153,297 @@ for _, strategy in helpers.each_strategy() do
              assert.equal(0, #json.data)
              return true
            end
-          end, 5)
+          end, 10)
         end)
       end)
-    end)
-  end)
-
-  describe("cluster_ocsp = off works with #" .. strategy .. " backend", function()
-    describe("DP certificate revoked, not checking for OCSP", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "off",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("revoked")
+
+      describe("OCSP responder errors, DP are not allowed", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "on",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("error")
+        end)
+
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
+        describe("status API", function()
+          it("does not show DP status", function()
+            helpers.wait_until(function()
+              local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
+              if logs:find('data plane client certificate revocation check failed: OCSP responder returns bad HTTP status code: 500', nil, true) then
+                local admin_client = helpers.admin_client()
+                finally(function()
+                  admin_client:close()
+                end)
+
+                local res = assert(admin_client:get("/clustering/data-planes"))
+                local body = assert.res_status(200, res)
+                local json = cjson.decode(body)
+
+                assert.equal(0, #json.data)
+                return true
+              end
+            end, 5)
+          end)
+        end)
       end)
+    end)
+
+    describe("cluster_ocsp = off works with #" .. strategy .. " backend", function()
+      describe("DP certificate revoked, not checking for OCSP", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "off",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("revoked")
+        end)
+
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
+
+        describe("status API", function()
+          it("shows DP status", function()
+            helpers.wait_until(function()
+              local admin_client = helpers.admin_client()
+              finally(function()
+                admin_client:close()
+              end)
+
+              local res = assert(admin_client:get("/clustering/data-planes"))
+              local body = assert.res_status(200, res)
+              local json = cjson.decode(body)
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
+              for _, v in pairs(json.data) do
+                if v.ip == "127.0.0.1" then
+                  return true
+                end
+              end
+            end, 5)
+          end)
+        end)
       end)
+    end)
+
+    describe("cluster_ocsp = optional works with #" .. strategy .. " backend", function()
+      describe("DP certificate revoked", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "optional",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("revoked")
+        end)
-      describe("status API", function()
-        it("shows DP status", function()
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
+
+        it("revoked DP certificate can not connect to CP", function()
           helpers.wait_until(function()
-            local admin_client = helpers.admin_client()
-            finally(function()
-              admin_client:close()
-            end)
+            local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
+            if logs:find('client certificate was revoked: failed to validate OCSP response: certificate status "revoked" in the OCSP response', nil, true) then
+              local admin_client = helpers.admin_client()
+              finally(function()
+                admin_client:close()
+              end)
-            local res = assert(admin_client:get("/clustering/data-planes"))
-            local body = assert.res_status(200, res)
-            local json = cjson.decode(body)
+              local res = assert(admin_client:get("/clustering/data-planes"))
+              local body = assert.res_status(200, res)
+              local json = cjson.decode(body)
-            for _, v in pairs(json.data) do
-              if v.ip == "127.0.0.1" then
-                return true
-              end
+              assert.equal(0, #json.data)
+              return true
             end
           end, 5)
         end)
       end)
-    end)
-  end)
-
-  describe("cluster_ocsp = optional works with #" .. strategy .. " backend", function()
-    describe("DP certificate revoked", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "optional",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("revoked")
-      end)
-
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
-      end)
-
-      it("revoked DP certificate can not connect to CP", function()
-        helpers.wait_until(function()
-          local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
-          if logs:find('client certificate was revoked: failed to validate OCSP response: certificate status "revoked" in the OCSP response', nil, true) then
-            local admin_client = helpers.admin_client()
-            finally(function()
-              admin_client:close()
-            end)
-
-            local res = assert(admin_client:get("/clustering/data-planes"))
-            local body = assert.res_status(200, res)
-            local json = cjson.decode(body)
-
-            assert.equal(0, #json.data)
-            return true
-          end
-        end, 5)
-      end)
-    end)
+      describe("OCSP responder errors, DP are allowed through", function()
+        lazy_setup(function()
+          helpers.get_db_utils(strategy, {
+            "routes",
+            "services",
+            "clustering_data_planes",
+            "upstreams",
+            "targets",
+            "certificates",
+          }) -- runs migrations
+
+          assert(helpers.start_kong({
+            role = "control_plane",
+            cluster_protocol = cluster_protocol,
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
+            cluster_ocsp = "optional",
+            db_update_frequency = 0.1,
+            database = strategy,
+            cluster_listen = "127.0.0.1:9005",
+            nginx_conf = "spec/fixtures/custom_nginx.template",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          assert(helpers.start_kong({
+            role = "data_plane",
+            cluster_protocol = cluster_protocol,
+            database = "off",
+            prefix = "servroot2",
+            cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
+            cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
+            cluster_control_plane = "127.0.0.1:9005",
+            proxy_listen = "0.0.0.0:9002",
+            -- additional attributes for PKI:
+            cluster_mtls = "pki",
+            cluster_server_name = "kong_clustering",
+            cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
+          }))
+
+          set_ocsp_status("error")
+        end)
-    describe("OCSP responder errors, DP are allowed through", function()
-      lazy_setup(function()
-        helpers.get_db_utils(strategy, {
-          "routes",
-          "services",
-          "clustering_data_planes",
-          "upstreams",
-          "targets",
-          "certificates",
-        }) -- runs migrations
-
-        assert(helpers.start_kong({
-          role = "control_plane",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_clustering.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_clustering.key",
-          cluster_ocsp = "optional",
-          db_update_frequency = 0.1,
-          database = strategy,
-          cluster_listen = "127.0.0.1:9005",
-          nginx_conf = "spec/fixtures/custom_nginx.template",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        assert(helpers.start_kong({
-          role = "data_plane",
-          database = "off",
-          prefix = "servroot2",
-          cluster_cert = "spec/fixtures/ocsp_certs/kong_data_plane.crt",
-          cluster_cert_key = "spec/fixtures/ocsp_certs/kong_data_plane.key",
-          cluster_control_plane = "127.0.0.1:9005",
-          proxy_listen = "0.0.0.0:9002",
-          -- additional attributes for PKI:
-          cluster_mtls = "pki",
-          cluster_server_name = "kong_clustering",
-          cluster_ca_cert = "spec/fixtures/ocsp_certs/ca.crt",
-        }))
-
-        set_ocsp_status("error")
-      end)
+        lazy_teardown(function()
+          helpers.stop_kong("servroot2")
+          helpers.stop_kong()
+        end)
+        describe("status API", function()
+          it("shows DP status", function()
+            helpers.wait_until(function()
+              local admin_client = helpers.admin_client()
+              finally(function()
+                admin_client:close()
+              end)
-      lazy_teardown(function()
-        helpers.stop_kong("servroot2")
-        helpers.stop_kong()
-      end)
-      describe("status API", function()
-        it("shows DP status", function()
-          helpers.wait_until(function()
-            local admin_client = helpers.admin_client()
-            finally(function()
-              admin_client:close()
-            end)
-
-            local res = assert(admin_client:get("/clustering/data-planes"))
-            local body = assert.res_status(200, res)
-            local json = cjson.decode(body)
-
-            for _, v in pairs(json.data) do
-              if v.ip == "127.0.0.1" then
-                local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
-                if logs:find('data plane client certificate revocation check failed: OCSP responder returns bad HTTP status code: 500', nil, true) then
-                  return true
+              local res = assert(admin_client:get("/clustering/data-planes"))
+              local body = assert.res_status(200, res)
+              local json = cjson.decode(body)
+
+              for _, v in pairs(json.data) do
+                if v.ip == "127.0.0.1" then
+                  local logs = pl_file.read(TEST_CONF.prefix .. "/" .. TEST_CONF.proxy_error_log)
+                  if logs:find('data plane client certificate revocation check failed: OCSP responder returns bad HTTP status code: 500', nil, true) then
+                    return true
+                  end
                 end
               end
-            end
-          end, 5)
+            end, 5)
+          end)
         end)
       end)
     end)
-  end)
+  end
 end
diff --git a/spec/fixtures/custom_nginx.template b/spec/fixtures/custom_nginx.template
index 0dcef067a22a..dbb2403c4d87 100644
--- a/spec/fixtures/custom_nginx.template
+++ b/spec/fixtures/custom_nginx.template
@@ -462,6 +462,12 @@ http {
                 Kong.serve_cluster_listener()
             }
         }
+
+        location = /v1/wrpc {
+            content_by_lua_block {
+                Kong.serve_wrpc_listener()
+            }
+        }
     }
 > end -- role == "control_plane"