From 4a634e0067104a84039726b12dab693d3d1d14c8 Mon Sep 17 00:00:00 2001 From: Michael Martin Date: Tue, 18 Jul 2023 10:34:09 -0700 Subject: [PATCH] feat(core): add wasm integration --- kong-3.4.0-0.rockspec | 20 +- kong.conf.default | 12 + kong/api/routes/filter_chains.lua | 185 +++++ kong/clustering/compat/init.lua | 20 + kong/clustering/control_plane.lua | 18 +- kong/clustering/data_plane.lua | 6 +- kong/clustering/init.lua | 16 +- kong/conf_loader/init.lua | 205 +++++ kong/constants.lua | 2 + kong/db/dao/filter_chains.lua | 108 +++ kong/db/migrations/core/020_330_to_340.lua | 46 ++ kong/db/migrations/core/init.lua | 1 + kong/db/schema/entities/filter_chains.lua | 69 ++ kong/init.lua | 11 + kong/runloop/events.lua | 25 + kong/runloop/handler.lua | 97 +++ kong/runloop/wasm.lua | 651 ++++++++++++++++ kong/templates/kong_defaults.lua | 4 + kong/templates/nginx.lua | 45 ++ .../01-schema/13-cluster_status_spec.lua | 2 +- spec/01-unit/03-conf_loader_spec.lua | 53 ++ spec/01-unit/04-prefix_handler_spec.lua | 165 ++++ .../20-wasm/01-admin-api_spec.lua | 711 ++++++++++++++++++ spec/02-integration/20-wasm/02-db_spec.lua | 469 ++++++++++++ .../20-wasm/03-runtime_spec.lua | 486 ++++++++++++ .../20-wasm/04-proxy-wasm_spec.lua | 395 ++++++++++ .../20-wasm/05-cache-invalidation_spec.lua | 508 +++++++++++++ .../20-wasm/06-clustering_spec.lua | 262 +++++++ .../migrations/core/020_330_to_340_spec.lua | 24 + spec/fixtures/blueprints.lua | 7 + spec/fixtures/custom_nginx.template | 48 ++ spec/helpers.lua | 37 + spec/kong_tests.conf | 2 + 33 files changed, 4692 insertions(+), 18 deletions(-) create mode 100644 kong/api/routes/filter_chains.lua create mode 100644 kong/db/dao/filter_chains.lua create mode 100644 kong/db/schema/entities/filter_chains.lua create mode 100644 kong/runloop/wasm.lua create mode 100644 spec/02-integration/20-wasm/01-admin-api_spec.lua create mode 100644 spec/02-integration/20-wasm/02-db_spec.lua create mode 100644 spec/02-integration/20-wasm/03-runtime_spec.lua 
create mode 100644 spec/02-integration/20-wasm/04-proxy-wasm_spec.lua create mode 100644 spec/02-integration/20-wasm/05-cache-invalidation_spec.lua create mode 100644 spec/02-integration/20-wasm/06-clustering_spec.lua diff --git a/kong-3.4.0-0.rockspec b/kong-3.4.0-0.rockspec index 6004e30a7f14..1001455d4f3c 100644 --- a/kong-3.4.0-0.rockspec +++ b/kong-3.4.0-0.rockspec @@ -124,19 +124,20 @@ build = { ["kong.api.api_helpers"] = "kong/api/api_helpers.lua", ["kong.api.arguments"] = "kong/api/arguments.lua", ["kong.api.endpoints"] = "kong/api/endpoints.lua", - ["kong.api.routes.kong"] = "kong/api/routes/kong.lua", - ["kong.api.routes.health"] = "kong/api/routes/health.lua", + ["kong.api.routes.cache"] = "kong/api/routes/cache.lua", + ["kong.api.routes.certificates"] = "kong/api/routes/certificates.lua", + ["kong.api.routes.clustering"] = "kong/api/routes/clustering.lua", ["kong.api.routes.config"] = "kong/api/routes/config.lua", ["kong.api.routes.consumers"] = "kong/api/routes/consumers.lua", + ["kong.api.routes.debug"] = "kong/api/routes/debug.lua", + ["kong.api.routes.filter_chains"] = "kong/api/routes/filter_chains.lua", + ["kong.api.routes.health"] = "kong/api/routes/health.lua", + ["kong.api.routes.kong"] = "kong/api/routes/kong.lua", ["kong.api.routes.plugins"] = "kong/api/routes/plugins.lua", - ["kong.api.routes.cache"] = "kong/api/routes/cache.lua", - ["kong.api.routes.upstreams"] = "kong/api/routes/upstreams.lua", - ["kong.api.routes.targets"] = "kong/api/routes/targets.lua", - ["kong.api.routes.certificates"] = "kong/api/routes/certificates.lua", ["kong.api.routes.snis"] = "kong/api/routes/snis.lua", ["kong.api.routes.tags"] = "kong/api/routes/tags.lua", - ["kong.api.routes.clustering"] = "kong/api/routes/clustering.lua", - ["kong.api.routes.debug"] = "kong/api/routes/debug.lua", + ["kong.api.routes.targets"] = "kong/api/routes/targets.lua", + ["kong.api.routes.upstreams"] = "kong/api/routes/upstreams.lua", ["kong.admin_gui"] = "kong/admin_gui/init.lua", @@ 
-174,6 +175,7 @@ build = { ["kong.runloop.plugin_servers.process"] = "kong/runloop/plugin_servers/process.lua", ["kong.runloop.plugin_servers.mp_rpc"] = "kong/runloop/plugin_servers/mp_rpc.lua", ["kong.runloop.plugin_servers.pb_rpc"] = "kong/runloop/plugin_servers/pb_rpc.lua", + ["kong.runloop.wasm"] = "kong/runloop/wasm.lua", ["kong.workspaces"] = "kong/workspaces/init.lua", @@ -195,6 +197,7 @@ build = { ["kong.db.schema"] = "kong/db/schema/init.lua", ["kong.db.dao.keys"] = "kong/db/dao/keys.lua", ["kong.db.dao.key_sets"] = "kong/db/dao/key_sets.lua", + ["kong.db.dao.filter_chains"] = "kong/db/dao/filter_chains.lua", ["kong.db.schema.entities.keys"] = "kong/db/schema/entities/keys.lua", ["kong.db.schema.entities.key_sets"] = "kong/db/schema/entities/key_sets.lua", ["kong.db.schema.entities.consumers"] = "kong/db/schema/entities/consumers.lua", @@ -212,6 +215,7 @@ build = { ["kong.db.schema.entities.workspaces"] = "kong/db/schema/entities/workspaces.lua", ["kong.db.schema.entities.clustering_data_planes"] = "kong/db/schema/entities/clustering_data_planes.lua", ["kong.db.schema.entities.parameters"] = "kong/db/schema/entities/parameters.lua", + ["kong.db.schema.entities.filter_chains"] = "kong/db/schema/entities/filter_chains.lua", ["kong.db.schema.others.migrations"] = "kong/db/schema/others/migrations.lua", ["kong.db.schema.others.declarative_config"] = "kong/db/schema/others/declarative_config.lua", ["kong.db.schema.entity"] = "kong/db/schema/entity.lua", diff --git a/kong.conf.default b/kong.conf.default index 9b26e9057ab8..109d4cafe038 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1951,3 +1951,15 @@ # # Granularity can be adjusted through the `log_level` # directive. 
+ + +#------------------------------------------------------------------------------ +# WASM +#------------------------------------------------------------------------------ +# +# +#wasm = off # Use this setting to enable wasm, this allows running + # wasm filters to process request data. + +#wasm_filters_path = # Path to the directory containing Wasm filters + # that Kong must load on startup. diff --git a/kong/api/routes/filter_chains.lua b/kong/api/routes/filter_chains.lua new file mode 100644 index 000000000000..089d4e174d18 --- /dev/null +++ b/kong/api/routes/filter_chains.lua @@ -0,0 +1,185 @@ +local cjson = require "cjson" +local endpoints = require "kong.api.endpoints" + + +local kong = kong + + +if kong.configuration.wasm == false then + + local function wasm_disabled_error() + return kong.response.exit(400, { + message = "this endpoint is only available when wasm is enabled" + }) + end + + return { + ["/filter-chains"] = { + before = wasm_disabled_error, + }, + + ["/filter-chains/:filter_chains"] = { + before = wasm_disabled_error, + }, + + ["/filter-chains/:filter_chains/route"] = { + before = wasm_disabled_error, + }, + + ["/filter-chains/:filter_chains/service"] = { + before = wasm_disabled_error, + }, + + -- foreign key endpoints: + + ["/routes/:routes/filter-chains"] = { + before = wasm_disabled_error, + }, + + ["/routes/:routes/filter-chains/:filter_chains"] = { + before = wasm_disabled_error, + }, + + ["/services/:services/filter-chains"] = { + before = wasm_disabled_error, + }, + + ["/services/:services/filter-chains/:filter_chains"] = { + before = wasm_disabled_error, + }, + + -- custom endpoints (implemented below): + + ["/routes/:routes/filters/enabled"] = { + GET = wasm_disabled_error, + }, + + ["/routes/:routes/filters/disabled"] = { + GET = wasm_disabled_error, + }, + + ["/routes/:routes/filters/all"] = { + GET = wasm_disabled_error, + }, + } +end + + +local function add_filters(filters, chain, from) + if not chain then + return + end + + 
for _, filter in ipairs(chain.filters) do + table.insert(filters, { + name = filter.name, + config = filter.config, + from = from, + enabled = (chain.enabled == true and filter.enabled == true), + filter_chain = { + name = chain.name, + id = chain.id, + } + }) + end +end + + +local function get_filters(self, db) + local route, _, err_t = endpoints.select_entity(self, db, db.routes.schema) + if err_t then + return nil, err_t + end + + if not route then + return kong.response.exit(404, { message = "Not found" }) + end + + local route_chain + for chain, _, err_t in kong.db.filter_chains:each_for_route(route, nil, { nulls = true }) do + if not chain then + return nil, err_t + end + + route_chain = chain + end + + local service + local service_chain + + if route.service then + service , _, err_t = kong.db.services:select(route.service) + if err_t then + return nil, err_t + end + + for chain, _, err_t in kong.db.filter_chains:each_for_service(service, nil, { nulls = true }) do + if not chain then + return nil, err_t + end + + service_chain = chain + end + end + + local filters = setmetatable({}, cjson.array_mt) + add_filters(filters, service_chain, "service") + add_filters(filters, route_chain, "route") + + return filters +end + + +return { + ["/routes/:routes/filters/all"] = { + GET = function(self, db) + local filters, err_t = get_filters(self, db) + if err_t then + return endpoints.handle_error(err_t) + end + + return kong.response.exit(200, { + filters = filters, + }) + end + }, + + ["/routes/:routes/filters/enabled"] = { + GET = function(self, db) + local filters, err_t = get_filters(self, db) + if err_t then + return endpoints.handle_error(err_t) + end + + for i = #filters, 1, -1 do + if not filters[i].enabled then + table.remove(filters, i) + end + end + + return kong.response.exit(200, { + filters = filters, + }) + end + }, + + ["/routes/:routes/filters/disabled"] = { + GET = function(self, db) + local filters, err_t = get_filters(self, db) + if err_t then + 
return endpoints.handle_error(err_t) + end + + for i = #filters, 1, -1 do + if filters[i].enabled then + table.remove(filters, i) + end + end + + return kong.response.exit(200, { + filters = filters, + }) + end + }, + +} diff --git a/kong/clustering/compat/init.lua b/kong/clustering/compat/init.lua index 782f9b96e1f7..9ae08eadc317 100644 --- a/kong/clustering/compat/init.lua +++ b/kong/clustering/compat/init.lua @@ -29,6 +29,8 @@ local COMPATIBILITY_CHECKERS = require("kong.clustering.compat.checkers") local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local KONG_VERSION = meta.version +local EMPTY = {} + local _M = {} @@ -176,6 +178,24 @@ function _M.check_configuration_compatibility(cp, dp) end end + if cp.conf.wasm then + local dp_filters = dp.filters or EMPTY + local missing + for name in pairs(cp.filters or EMPTY) do + if not dp_filters[name] then + missing = missing or {} + table.insert(missing, name) + end + end + + if missing then + local msg = "data plane is missing one or more wasm filters " + .. "(" .. table.concat(missing, ", ") .. 
")" + return nil, msg, CLUSTERING_SYNC_STATUS.FILTER_SET_INCOMPATIBLE + end + end + + return true, nil, CLUSTERING_SYNC_STATUS.NORMAL end diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index e564cfd07e41..2add807e88cf 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -279,8 +279,10 @@ function _M:handle_cp_websocket() if self.deflated_reconfigure_payload then -- initial configuration compatibility for sync status variable - _, _, sync_status = self:check_configuration_compatibility( - { dp_plugins_map = dp_plugins_map, }) + _, _, sync_status = self:check_configuration_compatibility({ + dp_plugins_map = dp_plugins_map, + filters = data.filters, + }) table_insert(queue, RECONFIGURE_TYPE) queue.post() @@ -397,8 +399,11 @@ function _M:handle_cp_websocket() assert(payload == RECONFIGURE_TYPE) local previous_sync_status = sync_status - ok, err, sync_status = self:check_configuration_compatibility( - { dp_plugins_map = dp_plugins_map, }) + ok, err, sync_status = self:check_configuration_compatibility({ + dp_plugins_map = dp_plugins_map, + filters = data.filters, + }) + if not ok then ngx_log(ngx_WARN, _log_prefix, "unable to send updated configuration to data plane: ", err, log_suffix) if sync_status ~= previous_sync_status then @@ -532,8 +537,9 @@ local function push_config_loop(premature, self, push_config_semaphore, delay) end -function _M:init_worker(plugins_list) +function _M:init_worker(basic_info) -- ROLE = "control_plane" + local plugins_list = basic_info.plugins self.plugins_list = plugins_list self.plugins_map = plugins_list_to_map(plugins_list) @@ -547,6 +553,8 @@ function _M:init_worker(plugins_list) self.plugin_versions[plugin.name] = plugin.version end + self.filters = basic_info.filters + local push_config_semaphore = semaphore.new() -- When "clustering", "push_config" worker event is received by a worker, diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua 
index e2ae11f6c86a..4176a47b8441 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -73,10 +73,11 @@ function _M.new(clustering) end -function _M:init_worker(plugins_list) +function _M:init_worker(basic_info) -- ROLE = "data_plane" - self.plugins_list = plugins_list + self.plugins_list = basic_info.plugins + self.filters = basic_info.filters -- only run in process which worker_id() == 0 assert(ngx.timer.at(0, function(premature) @@ -147,6 +148,7 @@ function _M:communicate(premature) _, err = c:send_binary(cjson_encode({ type = "basic_info", plugins = self.plugins_list, process_conf = configuration, + filters = self.filters, labels = labels, })) if err then ngx_log(ngx_ERR, _log_prefix, "unable to send basic information to control plane: ", uri, diff --git a/kong/clustering/init.lua b/kong/clustering/init.lua index 44ca74964218..6cf89e283e89 100644 --- a/kong/clustering/init.lua +++ b/kong/clustering/init.lua @@ -101,15 +101,27 @@ function _M:init_worker() return { name = p.name, version = p.handler.VERSION, } end, plugins_list) + local filters = {} + if kong.db.filter_chains.filters then + for _, filter in ipairs(kong.db.filter_chains.filters) do + filters[filter.name] = { name = filter.name } + end + end + + local basic_info = { + plugins = plugins_list, + filters = filters, + } + local role = self.conf.role if role == "control_plane" then - self:init_cp_worker(plugins_list) + self:init_cp_worker(basic_info) return end if role == "data_plane" then - self:init_dp_worker(plugins_list) + self:init_dp_worker(basic_info) end end diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 7376314376e0..faab180ac067 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -3,6 +3,7 @@ local require = require local kong_default_conf = require "kong.templates.kong_defaults" local process_secrets = require "kong.cmd.utils.process_secrets" +local nginx_signals = require "kong.cmd.utils.nginx_signals" local 
openssl_pkey = require "resty.openssl.pkey" local openssl_x509 = require "resty.openssl.x509" local pl_stringio = require "pl.stringio" @@ -42,6 +43,7 @@ local concat = table.concat local getenv = os.getenv local exists = pl_path.exists local abspath = pl_path.abspath +local isdir = pl_path.isdir local tostring = tostring local tonumber = tonumber local setmetatable = setmetatable @@ -221,6 +223,31 @@ local DYNAMIC_KEY_NAMESPACES = { prefix = "vault_", ignore = EMPTY, }, + { + injected_conf_name = "nginx_wasm_wasmtime_directives", + prefix = "nginx_wasm_wasmtime_", + ignore = EMPTY, + }, + { + injected_conf_name = "nginx_wasm_v8_directives", + prefix = "nginx_wasm_v8_", + ignore = EMPTY, + }, + { + injected_conf_name = "nginx_wasm_wasmer_directives", + prefix = "nginx_wasm_wasmer_", + ignore = EMPTY, + }, + { + injected_conf_name = "nginx_wasm_main_shm_directives", + prefix = "nginx_wasm_shm_", + ignore = EMPTY, + }, + { + injected_conf_name = "nginx_wasm_main_directives", + prefix = "nginx_wasm_", + ignore = EMPTY, + }, } @@ -549,6 +576,9 @@ local CONF_PARSERS = { proxy_server = { typ = "string" }, proxy_server_ssl_verify = { typ = "boolean" }, + wasm = { typ = "boolean" }, + wasm_filters_path = { typ = "string" }, + error_template_html = { typ = "string" }, error_template_json = { typ = "string" }, error_template_xml = { typ = "string" }, @@ -641,6 +671,63 @@ local function parse_value(value, typ) end +-- Check if module is dynamic +local function check_dynamic_module(mod_name) + local configure_line = ngx.config.nginx_configure() + local mod_re = [[^.*--add-dynamic-module=(.+\/]] .. mod_name .. 
[[(\s|$)).*$]] + return ngx.re.match(configure_line, mod_re, "oi") ~= nil +end + + +-- Lookup dynamic module object +-- this function will lookup for the `mod_name` dynamic module in the following +-- paths: +-- - /usr/local/kong/modules -- default path for modules in container images +-- - /../modules +-- @param[type=string] mod_name The module name to lookup, without file extension +local function lookup_dynamic_module_so(mod_name, kong_conf) + log.debug("looking up dynamic module %s", mod_name) + + local mod_file = fmt("/usr/local/kong/modules/%s.so", mod_name) + if exists(mod_file) then + log.debug("module '%s' found at '%s'", mod_name, mod_file) + return mod_file + end + + local nginx_bin = nginx_signals.find_nginx_bin(kong_conf) + mod_file = fmt("%s/../modules/%s.so", pl_path.dirname(nginx_bin), mod_name) + if exists(mod_file) then + log.debug("module '%s' found at '%s'", mod_name, mod_file) + return mod_file + end + + return nil, fmt("%s dynamic module shared object not found", mod_name) +end + + +-- Validate Wasm properties +local function validate_wasm(conf) + local wasm_enabled = conf.wasm + local filters_path = conf.wasm_filters_path + + if wasm_enabled then + if filters_path and not exists(filters_path) and not isdir(filters_path) then + return nil, fmt("wasm_filters_path '%s' is not a valid directory", filters_path) + end + else + for cfg in pairs(conf) do + local wasm_cfg = match(cfg, "wasm_(.+)") + if wasm_cfg then + log.warn("wasm is disabled but ", wasm_cfg, + " property is used, please check your configuration.") + end + end + end + + return true +end + + -- Validate properties (type/enum/custom) and infer their type. -- @param[type=table] conf The configuration table to treat. local function check_and_parse(conf, opts) @@ -1246,6 +1333,19 @@ local function check_and_parse(conf, opts) errors[#errors + 1] = "Cassandra as a datastore for Kong is not supported in versions 3.4 and above. Please use Postgres." 
end + local ok, err = validate_wasm(conf) + if not ok then + errors[#errors + 1] = err + end + + if conf.wasm and check_dynamic_module("ngx_wasm_module") then + local err + conf.wasm_dynamic_module, err = lookup_dynamic_module_so("ngx_wasm_module", conf) + if err then + errors[#errors + 1] = err + end + end + return #errors == 0, errors[1], errors end @@ -1427,6 +1527,34 @@ local function load_config_file(path) return load_config(f) end +--- Get available Wasm filters list +-- @param[type=string] Path where Wasm filters are stored. +local function get_wasm_filters(filters_path) + local wasm_filters = {} + + if filters_path then + local filter_files = {} + for entry in pl_path.dir(filters_path) do + local pathname = pl_path.join(filters_path, entry) + if not filter_files[pathname] and pl_path.isfile(pathname) then + filter_files[pathname] = pathname + + local extension = pl_path.extension(entry) + if string.lower(extension) == ".wasm" then + insert(wasm_filters, { + name = entry:sub(0, -#extension - 1), + path = pathname, + }) + else + log.debug("ignoring file ", entry, " in ", filters_path, ": does not contain wasm suffix") + end + end + end + end + + return wasm_filters +end + --- Load Kong configuration -- The loaded configuration will have all properties from the default config @@ -1839,6 +1967,83 @@ local function load(path, custom_conf, opts) end end + -- WebAssembly module support + if conf.wasm then + + local wasm_directives = conf["nginx_wasm_main_directives"] + + local wasm_filters = get_wasm_filters(conf.wasm_filters_path) + conf.wasm_modules_parsed = setmetatable(wasm_filters, _nop_tostring_mt) + + -- wasm vm properties are inherited from previously set directives + if conf.lua_ssl_trusted_certificate then + if #conf.lua_ssl_trusted_certificate >= 1 then + insert(wasm_directives, { + name = "tls_trusted_certificate", + value = conf.lua_ssl_trusted_certificate[1], + }) + end + end + if conf.lua_ssl_verify_depth and conf.lua_ssl_verify_depth > 0 then + 
insert(wasm_directives, { + name = "tls_verify_cert", + value = "on", + }) + insert(wasm_directives, { + name = "tls_verify_host", + value = "on", + }) + insert(wasm_directives, { + name = "tls_no_verify_warn", + value = "on", + }) + end + + local found_proxy_wasm_lua_resolver = false + + for _, directive in ipairs(conf["nginx_http_directives"]) do + if directive.name == "proxy_connect_timeout" then + insert(wasm_directives, { + name = "socket_connect_timeout", + value = directive.value, + }) + elseif directive.name == "proxy_read_timeout" then + insert(wasm_directives, { + name = "socket_read_timeout", + value = directive.value, + }) + elseif directive.name == "proxy_send_timeout" then + insert(wasm_directives, { + name = "socket_send_timeout", + value = directive.value, + }) + elseif directive.name == "proxy_buffer_size" then + insert(wasm_directives, { + name = "socket_buffer_size", + value = directive.value, + }) + elseif directive.name == "large_client_header_buffers" then + insert(wasm_directives, { + name = "socket_large_buffers", + value = directive.value, + }) + elseif directive.name == "proxy_wasm_lua_resolver" then + found_proxy_wasm_lua_resolver = true + end + end + + -- proxy_wasm_lua_resolver is intended to be 'on' by default, but we can't + -- set it as such in kong_defaults, because it can only be used if wasm is + -- _also_ enabled. We inject it here if the user has not opted to set it + -- themselves. 
+ if not found_proxy_wasm_lua_resolver then + insert(conf["nginx_http_directives"], { + name = "proxy_wasm_lua_resolver", + value = "on", + }) + end + end + for _, dyn_namespace in ipairs(DYNAMIC_KEY_NAMESPACES) do if dyn_namespace.injected_conf_name then sort(conf[dyn_namespace.injected_conf_name], function(a, b) diff --git a/kong/constants.lua b/kong/constants.lua index 0f9124adf5fd..851b95de4ec7 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -141,6 +141,7 @@ local constants = { "vaults", "key_sets", "keys", + "filter_chains", }, ENTITY_CACHE_STORE = setmetatable({ consumers = "cache", @@ -209,6 +210,7 @@ local constants = { { KONG_VERSION_INCOMPATIBLE = "kong_version_incompatible", }, { PLUGIN_SET_INCOMPATIBLE = "plugin_set_incompatible", }, { PLUGIN_VERSION_INCOMPATIBLE = "plugin_version_incompatible", }, + { FILTER_SET_INCOMPATIBLE = "filter_set_incompatible", }, }, CLUSTERING_TIMEOUT = 5000, -- 5 seconds CLUSTERING_PING_INTERVAL = 30, -- 30 seconds diff --git a/kong/db/dao/filter_chains.lua b/kong/db/dao/filter_chains.lua new file mode 100644 index 000000000000..d7e54dc601f5 --- /dev/null +++ b/kong/db/dao/filter_chains.lua @@ -0,0 +1,108 @@ +local filter_chains = {} + +local insert = table.insert +local fmt = string.format + +local EMPTY = {} + + +local function check_enabled_filters(self, chain) + if not self.filters then + local err_t = self.errors:schema_violation({ + filters = "no wasm filters are configured", + }) + return nil, tostring(err_t), err_t + end + + if type(chain.filters) ~= "table" then + return true + end + + local errs + + for i, filter in ipairs(chain.filters) do + local name = filter.name + + -- let the standard schema validation catch invalid name errors + if type(name) == "string" + and not self.filters_by_name[name] + then + errs = errs or {} + errs[i] = { name = "no such filter: " .. 
filter.name } + end + end + + if errs then + local err_t = self.errors:schema_violation({ + filters = errs, + }) + return nil, tostring(err_t), err_t + end + + return true +end + + +function filter_chains:load_filters(wasm_filters) + local filters = {} + local filters_by_name = {} + + local errors = {} + + for i, filter in ipairs(wasm_filters or EMPTY) do + insert(filters, filter) + + if type(filter.name) ~= "string" then + insert(errors, fmt("filter #%d name is not a string", i)) + + elseif filters_by_name[filter.name] then + insert(errors, fmt("duplicate filter name (%s) at #%d", filter.name, i)) + + else + filters_by_name[filter.name] = filter + + end + end + + if #errors > 0 then + return nil, "failed to load filters: " .. table.concat(errors, ", ") + end + + self.filters = filters + self.filters_by_name = filters_by_name + + return true +end + + +function filter_chains:insert(entity, options) + local ok, err, err_t = check_enabled_filters(self, entity) + if not ok then + return nil, err, err_t + end + + return self.super.insert(self, entity, options) +end + + +function filter_chains:update(primary_key, entity, options) + local ok, err, err_t = check_enabled_filters(self, entity) + if not ok then + return nil, err, err_t + end + + return self.super.update(self, primary_key, entity, options) +end + + +function filter_chains:upsert(primary_key, entity, options) + local ok, err, err_t = check_enabled_filters(self, entity) + if not ok then + return nil, err, err_t + end + + return self.super.upsert(self, primary_key, entity, options) +end + + +return filter_chains diff --git a/kong/db/migrations/core/020_330_to_340.lua b/kong/db/migrations/core/020_330_to_340.lua index 9301209a767f..ee7361882fbd 100644 --- a/kong/db/migrations/core/020_330_to_340.lua +++ b/kong/db/migrations/core/020_330_to_340.lua @@ -2,6 +2,52 @@ return { postgres = { up = [[ DROP TABLE IF EXISTS "ttls"; + + CREATE TABLE IF NOT EXISTS "filter_chains" ( + "id" UUID PRIMARY KEY, + "name" TEXT 
UNIQUE, + "enabled" BOOLEAN DEFAULT TRUE, + "route_id" UUID REFERENCES "routes" ("id") ON DELETE CASCADE, + "service_id" UUID REFERENCES "services" ("id") ON DELETE CASCADE, + "ws_id" UUID REFERENCES "workspaces" ("id") ON DELETE CASCADE, + "cache_key" TEXT UNIQUE, + "filters" JSONB[], + "tags" TEXT[], + "created_at" TIMESTAMP WITH TIME ZONE, + "updated_at" TIMESTAMP WITH TIME ZONE + ); + + DO $$ + BEGIN + CREATE UNIQUE INDEX IF NOT EXISTS "filter_chains_name_idx" + ON "filter_chains" ("name"); + END$$; + + DO $$ + BEGIN + CREATE UNIQUE INDEX IF NOT EXISTS "filter_chains_cache_key_idx" + ON "filter_chains" ("cache_key"); + END$$; + + DO $$ + BEGIN + CREATE INDEX IF NOT EXISTS "filter_chains_tags_idx" ON "filter_chains" USING GIN ("tags"); + EXCEPTION WHEN UNDEFINED_COLUMN then + -- do nothing, accept existing state + END$$; + + DROP TRIGGER IF EXISTS "filter_chains_sync_tags_trigger" ON "filter_chains"; + + DO $$ + BEGIN + CREATE TRIGGER "filter_chains_sync_tags_trigger" + AFTER INSERT OR UPDATE OF "tags" + OR DELETE ON "filter_chains" + FOR EACH ROW + EXECUTE PROCEDURE "sync_tags" (); + EXCEPTION WHEN undefined_column OR undefined_table THEN + -- do nothing, accept existing state + END$$; ]] } } diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index 6c6787c54ae8..44206d4a0014 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -17,4 +17,5 @@ return { "017_300_to_310", "018_310_to_320", "019_320_to_330", + "020_330_to_340", } diff --git a/kong/db/schema/entities/filter_chains.lua b/kong/db/schema/entities/filter_chains.lua new file mode 100644 index 000000000000..5710632cd02d --- /dev/null +++ b/kong/db/schema/entities/filter_chains.lua @@ -0,0 +1,69 @@ +local typedefs = require "kong.db.schema.typedefs" + +---@class kong.db.schema.entities.filter_chain : table +--- +---@field id string +---@field name string|nil +---@field enabled boolean +---@field route table|nil +---@field service table|nil 
+---@field protocols table|nil +---@field created_at number +---@field updated_at number +---@field tags string[] +---@field filters kong.db.schema.entities.wasm_filter[] + + +---@class kong.db.schema.entities.wasm_filter : table +--- +---@field name string +---@field enabled boolean +---@field config string|table|nil + +local filter = { + type = "record", + fields = { + { name = { type = "string", required = true, }, }, + { config = { type = "string", required = false, }, }, + { enabled = { type = "boolean", default = true, required = true, }, }, + }, +} + + +return { + name = "filter_chains", + primary_key = { "id" }, + endpoint_key = "name", + admin_api_name = "filter-chains", + generate_admin_api = true, + workspaceable = true, + dao = "kong.db.dao.filter_chains", + cache_key = { "route", "service" }, + + fields = { + { id = typedefs.uuid }, + { name = typedefs.utf8_name }, + { enabled = { type = "boolean", required = true, default = true, }, }, + { route = { type = "foreign", reference = "routes", on_delete = "cascade", + default = ngx.null, unique = true }, }, + { service = { type = "foreign", reference = "services", on_delete = "cascade", + default = ngx.null, unique = true }, }, + { filters = { type = "array", required = true, elements = filter, len_min = 1, } }, + { created_at = typedefs.auto_timestamp_s }, + { updated_at = typedefs.auto_timestamp_s }, + { tags = typedefs.tags }, + }, + entity_checks = { + { mutually_exclusive = { + "service", + "route", + } + }, + + { at_least_one_of = { + "service", + "route", + } + }, + }, +} diff --git a/kong/init.lua b/kong/init.lua index 60659b7a73e7..c5ab8ae82911 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -92,6 +92,7 @@ local utils = require "kong.tools.utils" local constants = require "kong.constants" local get_ctx_table = require("resty.core.ctx").get_ctx_table local admin_gui = require "kong.admin_gui" +local wasm = require "kong.runloop.wasm" local kong = kong @@ -580,6 +581,7 @@ function Kong.init() 
kong_global.init_pdk(kong, config) instrumentation.init(config) + wasm.init(config) local db = assert(DB.new(config)) instrumentation.db_query(db.connector) @@ -624,6 +626,8 @@ function Kong.init() -- Load plugins as late as possible so that everything is set up assert(db.plugins:load_plugin_schemas(config.loaded_plugins)) + assert(db.filter_chains:load_filters(config.wasm_modules_parsed)) + if is_stream_module then stream_api.load_handlers() end @@ -861,6 +865,12 @@ function Kong.init_worker() if kong.clustering then kong.clustering:init_worker() end + + ok, err = wasm.init_worker() + if not ok then + err = "wasm nginx worker initialization failed: " .. tostring(err) + stash_init_worker_error(err) + end end @@ -1042,6 +1052,7 @@ function Kong.access() return kong.response.error(503, "no Service found with those values") end + runloop.wasm_attach(ctx) runloop.access.after(ctx) ctx.KONG_ACCESS_ENDED_AT = get_updated_now_ms() diff --git a/kong/runloop/events.lua b/kong/runloop/events.lua index 4a63edb87aac..ab93706828cb 100644 --- a/kong/runloop/events.lua +++ b/kong/runloop/events.lua @@ -3,6 +3,7 @@ local constants = require "kong.constants" local certificate = require "kong.runloop.certificate" local balancer = require "kong.runloop.balancer" local workspaces = require "kong.workspaces" +local wasm = require "kong.runloop.wasm" local kong = kong @@ -298,6 +299,26 @@ local function crud_consumers_handler(data) end +local function crud_wasm_handler(data, schema_name) + if not wasm.enabled() then + return + end + + -- cache is invalidated on service/route deletion to ensure we don't + -- have oprhaned filter chain data cached + local is_delete = data.operation == "delete" + and (schema_name == "services" + or schema_name == "routes") + + local updated = schema_name == "filter_chains" or is_delete + + if updated then + log(DEBUG, "[events] wasm filter chains updated, invalidating cache") + core_cache:invalidate("filter_chains:version") + end +end + + local 
LOCAL_HANDLERS = { { "dao:crud", nil , dao_crud_handler }, @@ -313,6 +334,10 @@ local LOCAL_HANDLERS = { -- As we support conifg.anonymous to be configured as Consumer.username, -- so add an event handler to invalidate the extra cache in case of data inconsistency { "crud" , "consumers" , crud_consumers_handler }, + + { "crud" , "filter_chains" , crud_wasm_handler }, + { "crud" , "services" , crud_wasm_handler }, + { "crud" , "routes" , crud_wasm_handler }, } diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 79972c2bae3e..4c0f4dc9bd9a 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -5,6 +5,7 @@ local utils = require "kong.tools.utils" local Router = require "kong.router" local balancer = require "kong.runloop.balancer" local events = require "kong.runloop.events" +local wasm = require "kong.runloop.wasm" local reports = require "kong.reports" local constants = require "kong.constants" local certificate = require "kong.runloop.certificate" @@ -95,6 +96,9 @@ local ROUTER_SYNC_OPTS local PLUGINS_ITERATOR local PLUGINS_ITERATOR_SYNC_OPTS +local WASM_STATE_VERSION +local WASM_STATE_SYNC_OPTS + local RECONFIGURE_OPTS local GLOBAL_QUERY_OPTS = { workspace = ngx.null, show_ws_id = true } @@ -569,6 +573,43 @@ local function _set_update_plugins_iterator(f) end +local function build_wasm_state() + local version = wasm.get_version() + local ok, err = wasm.update_in_place(version) + + if not ok then + return nil, err + end + + WASM_STATE_VERSION = version + + return true +end + + +local function rebuild_wasm_state(opts) + return rebuild("filter_chains", build_wasm_state, + WASM_STATE_VERSION, opts) +end + + +local function wasm_attach(ctx) + if not wasm.enabled() then + return + end + + if kong.db.strategy ~= "off" and kong.configuration.worker_consistency == "strict" then + local ok, err = rebuild_wasm_state(WASM_STATE_SYNC_OPTS) + if not ok then + log(ERR, "could not update wasm filter chain state: ", err, + " (stale state will be 
used)") + end + end + + wasm.attach(ctx) +end + + local reconfigure_handler do local now = ngx.now @@ -651,6 +692,19 @@ do " ms on worker #", worker_id) end + local wasm_state + if wasm.enabled() then + local start = get_now_ms() + wasm_state, err = wasm.rebuild_state() + + if not wasm_state then + return nil, err + end + + log(INFO, "rebuilding wasm filter chain state took ", get_now_ms() - start, + " ms on worker #", worker_id) + end + -- below you are not supposed to yield and this should be fast and atomic -- TODO: we should perhaps only purge the configuration related cache. @@ -681,6 +735,10 @@ do CURRENT_BALANCER_HASH = balancer_hash or 0 end + if wasm_state then + wasm.set_state(wasm_state) + end + return true end) -- concurrency.with_coroutine_mutex @@ -853,6 +911,12 @@ local function set_init_versions_in_cache() return nil, "failed to set initial plugins iterator version in cache: " .. tostring(err) end + ok, err = core_cache_shm:safe_set("kong_core_db_cachefilter_chains:version", marshalled_value) + if not ok then + return nil, "failed to set initial wasm filter chains version in cache: " .. 
tostring(err) + end + + return true end @@ -867,6 +931,7 @@ return { get_plugins_iterator = get_plugins_iterator, get_updated_plugins_iterator = get_updated_plugins_iterator, set_init_versions_in_cache = set_init_versions_in_cache, + wasm_attach = wasm_attach, -- exposed only for tests _set_router = _set_router, @@ -937,6 +1002,12 @@ return { timeout = rebuild_timeout, on_timeout = "run_unlocked", } + + WASM_STATE_SYNC_OPTS = { + name = "wasm", + timeout = rebuild_timeout, + on_timeout = "run_unlocked", + } end end @@ -995,6 +1066,32 @@ return { log(ERR, "could not schedule timer to rebuild plugins iterator: ", err) end + + if wasm.enabled() then + local wasm_async_opts = { + name = "wasm", + timeout = 0, + on_timeout = "return_true", + } + + local function rebuild_wasm_filter_chains_timer(premature) + if premature then + return + end + + local _, err = rebuild_wasm_state(wasm_async_opts) + if err then + log(ERR, "could not rebuild wasm filter chains via timer: ", err) + end + end + + local _, err = kong.timer:named_every("wasm-filter-chains-rebuild", + worker_state_update_frequency, + rebuild_wasm_filter_chains_timer) + if err then + log(ERR, "could not schedule timer to rebuild wasm filter chains: ", err) + end + end end end, }, diff --git a/kong/runloop/wasm.lua b/kong/runloop/wasm.lua new file mode 100644 index 000000000000..ce7181fc36d8 --- /dev/null +++ b/kong/runloop/wasm.lua @@ -0,0 +1,651 @@ +local _M = {} + +local utils = require "kong.tools.utils" +local dns = require "kong.tools.dns" +local clear_tab = require "table.clear" + +---@module 'resty.http.proxy_wasm' +local proxy_wasm + +local kong = _G.kong +local ngx = ngx +local log = ngx.log +local DEBUG = ngx.DEBUG +local ERR = ngx.ERR +local CRIT = ngx.CRIT +local tostring = tostring +local ipairs = ipairs +local type = type +local assert = assert +local concat = table.concat +local insert = table.insert +local sha256 = utils.sha256_bin + + +local VERSION_KEY = "filter_chains:version" +local TTL_ZERO = 
{ ttl = 0 } +local ATTACH_OPTS = {} + + +--- +-- Fetch the current version of the filter chain state from cache +-- +---@return string +local function get_version() + return kong.core_cache:get(VERSION_KEY, TTL_ZERO, utils.uuid) +end + + +---@alias kong.wasm.filter_chain_type +---| 0 # service +---| 1 # route +---| 2 # combined + +local TYPE_SERVICE = 0 +local TYPE_ROUTE = 1 +local TYPE_COMBINED = 2 + + +local ENABLED = false + +local hash_chain +do + local HASH_DISABLED = sha256("disabled") + local HASH_NONE = sha256("none") + + local buf = {} + + ---@param chain kong.db.schema.entities.filter_chain + ---@return string + local function hash_chain_entity(chain) + if not chain then + return HASH_NONE + + elseif not chain.enabled then + return HASH_DISABLED + end + + local n = 0 + for _, filter in ipairs(chain.filters) do + buf[n + 1] = filter.name + buf[n + 2] = tostring(filter.enabled) + buf[n + 3] = tostring(filter.enabled and sha256(filter.config)) + n = n + 3 + end + + local s = concat(buf, "", 1, n) + clear_tab(buf) + + return sha256(s) + end + + --- + -- Generate a hash key for a filter chain from a service + -- and route filter chain [entity] combo. + -- + -- The result of this is used to invalidate cached filter chain + -- plans. + -- + ---@param service? kong.db.schema.entities.filter_chain + ---@param route? kong.db.schema.entities.filter_chain + ---@return string + function hash_chain(service, route) + assert(service ~= nil or route ~= nil, + "hash_chain() called with neither service nor route") + + return sha256(hash_chain_entity(service) .. 
hash_chain_entity(route)) + end +end + + +---@class kong.runloop.wasm.filter_chain_reference +--- +---@field type kong.wasm.filter_chain_type +---@field label string +---@field hash string +---@field c_plan ffi.cdata*|nil +--- +---@field service_chain kong.db.schema.entities.filter_chain|nil +---@field service_id string|nil +--- +---@field route_chain kong.db.schema.entities.filter_chain|nil +---@field route_id string|nil + + +---@class kong.runloop.wasm.state +local STATE = { + -- mapping of service IDs to service filter chains + -- + ---@type table + by_service = {}, + + -- mapping of route IDs to route filter chains + -- + ---@type table + by_route = {}, + + -- two level mapping: the top level is indexed by service ID, and the + -- secondary level is indexed by route ID + -- + ---@type table> + combined = {}, + + version = -1, +} + + +--- +-- Initialize and return a filter chain plan from a list of filters. +-- +---@param filters kong.db.schema.entities.wasm_filter[]|nil +---@return ffi.cdata*? c_plan +---@return string? error +local function init_c_plan(filters) + if not filters then + return + end + + local c_plan, err = proxy_wasm.new(filters) + if not c_plan then + return nil, "failed instantiating filter chain: " + .. tostring(err) + end + + local ok + ok, err = proxy_wasm.load(c_plan) + if not ok then + return nil, "failed loading filters: " .. tostring(err) + end + + return c_plan +end + + +-- Helper method for retrieving a filter chain reference from +-- the state table. +-- +---@param state kong.runloop.wasm.state +---@param typ kong.wasm.filter_chain_type +---@param service_id? string +---@param route_id? string +--- +---@return kong.runloop.wasm.filter_chain_reference? 
ref +local function get_chain_ref(state, typ, service_id, route_id) + local ref + + if typ == TYPE_SERVICE and service_id then + ref = state.by_service[service_id] + + elseif typ == TYPE_ROUTE and route_id then + ref = state.by_route[route_id] + + elseif typ == TYPE_COMBINED and service_id and route_id then + local routes = state.combined[service_id] + ref = routes and routes[route_id] + + else + -- unreachable + error("unknown filter chain type: " .. tostring(typ), 2) + end + + return ref +end + + +--- +-- Helper method for storing a new filter chain reference within +-- the state table. +-- +---@param state kong.runloop.wasm.state +---@param ref kong.runloop.wasm.filter_chain_reference +local function store_chain_ref(state, ref) + local typ = ref.type + local service_id = ref.service_id + local route_id = ref.route_id + + if typ == TYPE_SERVICE then + assert(type(service_id) == "string", + ref.label .. " chain has no service ID") + + state.by_service[service_id] = ref + + elseif typ == TYPE_ROUTE then + assert(type(route_id) == "string", + ref.label .. " chain has no route ID") + + state.by_route[route_id] = ref + + elseif typ == TYPE_COMBINED then + assert(type(service_id) == "string" and type(route_id) == "string", + ref.label .. " chain is missing a service ID or route ID") + + local routes = state.combined[service_id] + + if not routes then + routes = {} + state.combined[service_id] = routes + end + + routes[route_id] = ref + + else + -- unreachable + error("unknown filter chain type: " .. tostring(typ), 2) + end +end + + +--- +-- Create a log-friendly string label for a filter chain reference. +-- +---@param service_id? string +---@param route_id? string +---@return string label +local function label_for(service_id, route_id) + if service_id and route_id then + return "combined " .. + "service(" .. service_id .. "), " .. + "route(" .. route_id .. ")" + + elseif service_id then + return "service(" .. service_id .. 
")" + + elseif route_id then + return "route(" .. route_id .. ")" + + else + -- unreachable + error("can't compute a label for a filter chain with no route/service", 2) + end +end + + +--- +-- Build a combined filter list from 1-2 filter chain entities. +-- +-- Disabled filter chains are skipped, and disabled filters are +-- skipped. +-- +-- Returns `nil` if no enabled filters are found. +-- +---@param service_chain? kong.db.schema.entities.filter_chain +---@param route_chain? kong.db.schema.entities.filter_chain +--- +---@return kong.db.schema.entities.wasm_filter[]? +local function build_filter_list(service_chain, route_chain) + ---@type kong.db.schema.entities.wasm_filter[]|nil + local combined + local n = 0 + + if service_chain and service_chain.enabled then + for _, filter in ipairs(service_chain.filters) do + if filter.enabled then + n = n + 1 + combined = combined or {} + combined[n] = filter + end + end + end + + if route_chain and route_chain.enabled then + for _, filter in ipairs(route_chain.filters) do + if filter.enabled then + n = n + 1 + combined = combined or {} + combined[n] = filter + end + end + end + + return combined +end + + +--- +-- Unconditionally rebuild and return a new wasm state table from the db. +-- +---@param db table # kong.db +---@param version any +---@param old_state kong.runloop.wasm.state +---@return kong.runloop.wasm.state? new_state +---@return string? err +local function rebuild_state(db, version, old_state) + ---@type kong.db.schema.entities.filter_chain[] + local route_chains = {} + + ---@type table + local service_chains_by_id = {} + + ---@type kong.runloop.wasm.state + local state = { + by_service = {}, + by_route = {}, + combined = {}, + version = version, + } + + ---@type kong.runloop.wasm.filter_chain_reference[] + local all_chain_refs = {} + + local page_size = db.filter_chains.max_page_size + + for chain, err in db.filter_chains:each(page_size) do + if err then + return nil, "failed iterating filter chains: " .. 
tostring(err) + end + + if chain.enabled then + local route_id = chain.route and chain.route.id + local service_id = chain.service and chain.service.id + + local chain_type = service_id and TYPE_SERVICE or TYPE_ROUTE + + insert(all_chain_refs, { + type = chain_type, + + service_chain = (chain_type == TYPE_SERVICE and chain) or nil, + service_id = service_id, + + route_chain = (chain_type == TYPE_ROUTE and chain) or nil, + route_id = route_id, + }) + + if chain_type == TYPE_SERVICE then + service_chains_by_id[service_id] = chain + + else + insert(route_chains, chain) + end + end + end + + local routes = db.routes + local select_route = routes.select + + -- the only cache lookups here are for route entities, + -- so use the core cache + local cache = kong.core_cache + + + -- locate matching route/service chain entities to build combined + -- filter chain references + for _, rchain in ipairs(route_chains) do + local cache_key = routes:cache_key(rchain.route.id) + + local route, err = cache:get(cache_key, nil, + select_route, routes, rchain.route) + + if err then + return nil, "failed to load route for filter chain " .. + rchain.id .. ": " .. 
tostring(err) + end + + local service_id = route and route.service and route.service.id + local schain = service_id and service_chains_by_id[service_id] + + if schain then + insert(all_chain_refs, { + type = TYPE_COMBINED, + + service_chain = schain, + service_id = service_id, + + route_chain = rchain, + route_id = route.id, + }) + end + end + + for _, chain_ref in ipairs(all_chain_refs) do + local service_id = chain_ref.service_id + local route_id = chain_ref.route_id + + local new_chain_hash = hash_chain(chain_ref.service_chain, chain_ref.route_chain) + local old_ref = get_chain_ref(old_state, chain_ref.type, service_id, route_id) + local new_ref + + if old_ref then + if old_ref.hash == new_chain_hash then + new_ref = old_ref + log(DEBUG, old_ref.label, ": reusing existing filter chain reference") + + else + log(DEBUG, old_ref.label, ": filter chain has changed and will be rebuilt") + end + end + + + if not new_ref then + new_ref = chain_ref + new_ref.label = label_for(service_id, route_id) + + local filters = build_filter_list(chain_ref.service_chain, chain_ref.route_chain) + local c_plan, err = init_c_plan(filters) + + if err then + return nil, "failed to initialize " .. new_ref.label .. + " filter chain: " .. tostring(err) + + elseif not c_plan then + log(DEBUG, new_ref.label, " filter chain has no enabled filters") + end + + new_ref.hash = new_chain_hash + new_ref.c_plan = c_plan + end + + store_chain_ref(state, new_ref) + end + + return state +end + + +--- +-- Replace the current filter chain state with a new one. +-- +-- This function does not do any I/O or other yielding operations. +-- +---@param new kong.runloop.wasm.state +local function set_state(new) + if type(new) ~= "table" then + error("bad argument #1 to 'set_state' (table expected, got " .. + type(new) .. 
")", 2) + end + + local old = STATE + + if old.version == new.version then + log(DEBUG, "called with new version that is identical to the last") + end + + STATE = new +end + + +--- +-- Conditionally rebuild and update the filter chain state. +-- +-- If the current state matches the desired version, no update +-- will be performed. +-- +---@param new_version? string +---@return boolean? ok +---@return string? error +local function update_in_place(new_version) + if not ENABLED then + return true + end + + new_version = new_version or get_version() + local old = STATE + + if new_version == old.version then + log(DEBUG, "filter chain state is already up-to-date, no changes needed") + return true + end + + local new, err = rebuild_state(kong.db, new_version, old) + if not new then + log(ERR, "failed rebuilding filter chain state: ", err) + return nil, err + end + + set_state(new) + + return true +end + + + +---@param route? { id: string } +---@param service? { id: string } +---@return kong.runloop.wasm.filter_chain_reference? +local function get_filter_chain_for_request(route, service) + local service_id = service and service.id + local route_id = route and route.id + local state = STATE + + return get_chain_ref(state, TYPE_COMBINED, service_id, route_id) + or get_chain_ref(state, TYPE_SERVICE, service_id) + or get_chain_ref(state, TYPE_ROUTE, nil, route_id) +end + + +_M.get_version = get_version + +_M.update_in_place = update_in_place + +_M.set_state = set_state + + +---@param kong_config table +function _M.init(kong_config) + if not kong_config.wasm then + return + end + + local modules = kong_config.wasm_modules_parsed + if not modules or #modules == 0 then + return + end + + -- setup a DNS client for ngx_wasm_module + _G.dns_client = dns(kong_config) + + proxy_wasm = require "resty.http.proxy_wasm" + ENABLED = true + + ATTACH_OPTS.isolation = proxy_wasm.isolations.FILTER +end + + +---@return boolean? ok +---@return string? 
error +function _M.init_worker() + if not ENABLED then + return true + end + + local ok, err = update_in_place() + if not ok then + return nil, err + end + + return true +end + + +local function set_proxy_wasm_property(property, value) + if not value then + return + end + + local ok, err = proxy_wasm.set_property(property, value) + if not ok then + log(ERR, "failed to set proxy-wasm '", property, "' property: ", err) + end +end + + +--- +-- Lookup and execute the filter chain that applies to the current request +-- (if any). +-- +---@param ctx table # the request ngx.ctx table +function _M.attach(ctx) + if not ENABLED then + return + end + + local chain = get_filter_chain_for_request(ctx.route, ctx.service) + + if not chain then + -- no matching chain for this route/service + return + end + + if not chain.c_plan then + -- all filters in this chain are disabled + return + end + + local ok, err = proxy_wasm.attach(chain.c_plan, ATTACH_OPTS) + if not ok then + log(CRIT, "failed attaching ", chain.label, " filter chain to request: ", err) + return kong.response.error(500) + end + + set_proxy_wasm_property("kong.route_id", ctx.route and ctx.route.id) + set_proxy_wasm_property("kong.service_id", ctx.service and ctx.service.id) + + ok, err = proxy_wasm.start() + if not ok then + log(CRIT, "failed to execute ", chain.label, " filter chain for request: ", err) + return kong.response.error(500) + end +end + + +--- +-- Unconditionally rebuild and return the current filter chain state. +-- +-- This function is intended to be used in conjunction with `set_state()` +-- to perform an atomic update of the filter chain state alongside other +-- node updates: +-- +-- ```lua +-- local new_state, err = wasm.rebuild_state() +-- if not new_state then +-- -- handle error +-- end +-- +-- -- do some other things in preparation of an update +-- -- [...] +-- +-- +-- -- finally, swap in the new state +-- wasm.set_state(new_state) +-- ``` +-- +---@return kong.runloop.wasm.state? 
state +---@return string? error +function _M.rebuild_state() + -- return the default/empty state + if not ENABLED then + return STATE + end + + local old = STATE + local version = get_version() + + return rebuild_state(kong.db, version, old) +end + + +function _M.enabled() + return ENABLED +end + + +return _M diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index acd8cf047fd3..a3cbdbed5a91 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -195,4 +195,8 @@ admin_gui_url = admin_gui_origin = # for internal use only, can not be configured manually admin_gui_path = / admin_gui_api_url = + +wasm = off +wasm_filters_path = NONE +wasm_dynamic_module = NONE ]] diff --git a/kong/templates/nginx.lua b/kong/templates/nginx.lua index 18c630c13da3..265c55ebe350 100644 --- a/kong/templates/nginx.lua +++ b/kong/templates/nginx.lua @@ -1,5 +1,9 @@ return [[ pid pids/nginx.pid; +> if wasm and wasm_dynamic_module then +load_module $(wasm_dynamic_module); +> end + error_log ${{PROXY_ERROR_LOG}} ${{LOG_LEVEL}}; # injected nginx_main_* directives @@ -19,6 +23,47 @@ events { > end } +> if wasm then +wasm { +> for _, el in ipairs(nginx_wasm_main_shm_directives) do + shm_kv $(el.name) $(el.value); +> end + +> for _, module in ipairs(wasm_modules_parsed) do + module $(module.name) $(module.path); +> end + +> for _, el in ipairs(nginx_wasm_main_directives) do + $(el.name) $(el.value); +> end + +> if #nginx_wasm_wasmtime_directives > 0 then + wasmtime { +> for _, el in ipairs(nginx_wasm_wasmtime_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- wasmtime + +> if #nginx_wasm_v8_directives > 0 then + v8 { +> for _, el in ipairs(nginx_wasm_v8_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- v8 + +> if #nginx_wasm_wasmer_directives > 0 then + wasmer { +> for _, el in ipairs(nginx_wasm_wasmer_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- wasmer + +} +> end + > if role == 
"control_plane" or #proxy_listeners > 0 or #admin_listeners > 0 or #status_listeners > 0 then http { include 'nginx-kong.conf'; diff --git a/spec/01-unit/01-db/01-schema/13-cluster_status_spec.lua b/spec/01-unit/01-db/01-schema/13-cluster_status_spec.lua index b378117218aa..8449ca7c6e3f 100644 --- a/spec/01-unit/01-db/01-schema/13-cluster_status_spec.lua +++ b/spec/01-unit/01-db/01-schema/13-cluster_status_spec.lua @@ -45,7 +45,7 @@ describe("plugins", function() local ok, err = validate({ sync_status = "aaa", }) assert.is_nil(ok) - assert.equal("expected one of: unknown, normal, kong_version_incompatible, plugin_set_incompatible, plugin_version_incompatible", err.sync_status) + assert.equal("expected one of: unknown, normal, kong_version_incompatible, plugin_set_incompatible, plugin_version_incompatible, filter_set_incompatible", err.sync_status) end) it("accepts correct value", function() diff --git a/spec/01-unit/03-conf_loader_spec.lua b/spec/01-unit/03-conf_loader_spec.lua index 7a4f6bf1eaf1..0f36109727c9 100644 --- a/spec/01-unit/03-conf_loader_spec.lua +++ b/spec/01-unit/03-conf_loader_spec.lua @@ -1840,6 +1840,59 @@ describe("Configuration loader", function() end) end) + describe("#wasm properties", function() + local temp_dir, cleanup + + lazy_setup(function() + temp_dir, cleanup = helpers.make_temp_dir() + assert(helpers.file.write(temp_dir .. 
"/empty-filter.wasm", "hello!")) + end) + + lazy_teardown(function() cleanup() end) + + it("wasm disabled", function() + local conf, err = conf_loader(nil, { + wasm = "off", + wasm_filters_path = temp_dir, + }) + assert.is_nil(err) + assert.is_nil(conf.wasm_modules_parsed) + end) + + it("wasm default disabled", function() + local conf, err = conf_loader(nil, { + wasm_filters_path = temp_dir, + }) + assert.is_nil(err) + assert.is_nil(conf.wasm_modules_parsed) + end) + + it("wasm_filters_path", function() + local conf, err = conf_loader(nil, { + wasm = "on", + wasm_filters_path = temp_dir, + }) + assert.is_nil(err) + assert.same({ + { + name = "empty-filter", + path = temp_dir .. "/empty-filter.wasm", + } + }, conf.wasm_modules_parsed) + assert.same(temp_dir, conf.wasm_filters_path) + end) + + it("invalid wasm_filters_path", function() + local conf, err = conf_loader(nil, { + wasm = "on", + wasm_filters_path = "spec/fixtures/no-wasm-here/unit-test", + }) + assert.same(err, "wasm_filters_path 'spec/fixtures/no-wasm-here/unit-test' is not a valid directory") + assert.is_nil(conf) + end) + + end) + describe("errors", function() it("returns inexistent file", function() local conf, err = conf_loader "inexistent" diff --git a/spec/01-unit/04-prefix_handler_spec.lua b/spec/01-unit/04-prefix_handler_spec.lua index 3a93d1bb34ed..26c1e15ae0a3 100644 --- a/spec/01-unit/04-prefix_handler_spec.lua +++ b/spec/01-unit/04-prefix_handler_spec.lua @@ -820,6 +820,171 @@ describe("NGINX conf compiler", function() ]]) assert.matches("resolver%s+1%.2%.3%.4 5%.6%.7%.8 ipv6=off;", nginx_conf) end) + + describe("#wasm subsystem", function() + local temp_dir, cleanup + local filter + + lazy_setup(function() + temp_dir, cleanup = helpers.make_temp_dir() + filter = temp_dir .. 
"/empty-filter.wasm" + assert(helpers.file.write(filter, "testme")) + end) + + lazy_teardown(function() cleanup() end) + + local _compile = function(cfg, config_compiler, debug) + local ngx_conf = config_compiler(assert(conf_loader(nil, cfg))) + if debug then + print(ngx_conf) + end + return ngx_conf + end + local ngx_cfg = function(cfg, debug) return _compile(cfg, prefix_handler.compile_nginx_conf, debug) end + + local debug = false + it("has no wasm{} block by default", function() + assert.not_matches("wasm {", ngx_cfg({ wasm = nil }, debug)) + end) + it("injects global wasm{} block", function() + assert.matches("wasm {", ngx_cfg({ wasm = true }, debug)) + end) + it("injects a filter", function() + assert.matches(("module empty-filter %s;"):format(filter), ngx_cfg({ wasm = true, wasm_filters_path = temp_dir }, debug), nil, true) + end) + it("injects a main block directive", function() + assert.matches("wasm {.+socket_connect_timeout 10s;.+}", ngx_cfg({ wasm = true, nginx_wasm_socket_connect_timeout="10s" }, debug)) + end) + it("injects a shm_kv", function() + assert.matches("wasm {.+shm_kv counters 10m;.+}", ngx_cfg({ wasm = true, nginx_wasm_shm_counters="10m" }, debug)) + end) + it("injects multiple shm_kvs", function() + assert.matches( + "wasm {.+shm_kv cache 10m.+shm_kv counters 10m;.+}", + ngx_cfg({ wasm = true, nginx_wasm_shm_cache="10m", nginx_wasm_shm_counters="10m"}, debug) + ) + end) + it("injects runtime-specific directives (wasmtime)", function() + assert.matches( + "wasm {.+wasmtime {.+flag flag1 on;.+flag flag2 1m;.+}.+", + ngx_cfg({ + wasm = true, + nginx_wasm_wasmtime_flag1=true, + nginx_wasm_wasmtime_flag2="1m", + }, debug) + ) + end) + it("injects runtime-specific directives (v8)", function() + assert.matches( + "wasm {.+v8 {.+flag flag1 on;.+flag flag2 1m;.+}.+", + ngx_cfg({ + wasm = true, + nginx_wasm_v8_flag1=true, + nginx_wasm_v8_flag2="1m", + }, debug) + ) + end) + it("injects runtime-specific directives (wasmer)", function() + 
assert.matches( + "wasm {.+wasmer {.+flag flag1 on;.+flag flag2 1m;.+}.+", + ngx_cfg({ + wasm = true, + nginx_wasm_wasmer_flag1=true, + nginx_wasm_wasmer_flag2="1m", + }, debug) + ) + end) + describe("injects inherited directives", function() + describe("lua_ssl_trusted_certificate", function() + it("with one cert", function() + assert.matches( + "wasm {.+tls_trusted_certificate spec/fixtures/kong_clustering_ca.crt.+}", + ngx_cfg({ + wasm = true, + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering_ca.crt", + }, debug) + ) + end) + it("with more than one cert, picks first", function() + assert.matches( + "wasm {.+tls_trusted_certificate spec/fixtures/kong_clustering_ca.crt.+}", + ngx_cfg({ + wasm = true, + lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering_ca.crt,spec/fixtures/kong_clustering.crt", + }, debug) + ) + end) + end) + it("lua_ssl_verify_depth", function() + assert.matches( + "wasm {.+tls_verify_cert on;.+}", + ngx_cfg({ + wasm = true, + lua_ssl_verify_depth = 2, + }, debug) + ) + assert.matches( + "wasm {.+tls_verify_host on;.+}", + ngx_cfg({ + wasm = true, + lua_ssl_verify_depth = 2, + }, debug) + ) + assert.matches( + "wasm {.+tls_no_verify_warn on;.+}", + ngx_cfg({ + wasm = true, + lua_ssl_verify_depth = 2, + }, debug) + ) + end) + it("proxy_connect_timeout", function() + assert.matches( + "wasm {.+socket_connect_timeout 1s;.+}", + ngx_cfg({ + wasm = true, + nginx_http_proxy_connect_timeout = "1s", + }, debug) + ) + end) + it("proxy_read_timeout", function() + assert.matches( + "wasm {.+socket_read_timeout 1s;.+}", + ngx_cfg({ + wasm = true, + nginx_http_proxy_read_timeout = "1s", + }, debug) + ) + end) + it("proxy_send_timeout", function() + assert.matches( + "wasm {.+socket_send_timeout 1s;.+}", + ngx_cfg({ + wasm = true, + nginx_http_proxy_send_timeout = "1s", + }, debug) + ) + end) + it("proxy_buffer_size", function() + assert.matches( + "wasm {.+socket_buffer_size 1m;.+}", + ngx_cfg({ + wasm = true, + 
nginx_http_proxy_buffer_size = "1m", + }, debug) + ) + end) + it("large_client_header_buffers", function() + assert.matches( + "wasm {.+socket_large_buffers 4 24k;.+}", + ngx_cfg({ + wasm = true, + nginx_http_large_client_header_buffers = "4 24k", + }, debug) + ) + end) + end) + end) end) describe("prepare_prefix()", function() diff --git a/spec/02-integration/20-wasm/01-admin-api_spec.lua b/spec/02-integration/20-wasm/01-admin-api_spec.lua new file mode 100644 index 000000000000..59cf76a735ac --- /dev/null +++ b/spec/02-integration/20-wasm/01-admin-api_spec.lua @@ -0,0 +1,711 @@ +local helpers = require "spec.helpers" +local utils = require "kong.tools.utils" + +local fmt = string.format + + +local function json(body) + return { + headers = { ["Content-Type"] = "application/json" }, + body = body, + } +end + + +-- no cassandra support +for _, strategy in helpers.each_strategy({ "postgres" }) do + +describe("wasm admin API [#" .. strategy .. "]", function() + local admin + local bp, db + local service, route + + lazy_setup(function() + bp, db = helpers.get_db_utils(strategy, { + "routes", + "services", + "filter_chains", + }) + + db.filter_chains:load_filters({ + { name = "tests" }, + { name = "response_transformer" }, + }) + + service = assert(db.services:insert { + name = "wasm-test", + url = "http://wasm.test", + }) + + route = assert(db.routes:insert { + service = { id = service.id }, + hosts = { "wasm.test" }, + paths = { "/" }, + }) + + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + })) + + + admin = helpers.admin_client() + end) + + lazy_teardown(function() + if admin then admin:close() end + helpers.stop_kong(nil, true) + end) + + + local function reset_filter_chains() + db.filter_chains:truncate() + end + + + local function unsupported(method, path) + describe(method, function() + it("is not supported", function() + local res = assert(admin:send { + method = method, + path = path, 
+ }) + assert.response(res).has.status(405) + end) + end) + end + + + describe("/filter-chains", function() + + describe("POST", function() + lazy_setup(reset_filter_chains) + + it("creates a filter chain", function() + local res = admin:post("/filter-chains", json({ + filters = { { name = "tests" } }, + service = { id = service.id }, + }) + ) + + assert.response(res).has.status(201) + local body = assert.response(res).has.jsonbody() + + assert.is_string(body.id) + assert.truthy(utils.is_valid_uuid(body.id)) + + assert.equals(1, #body.filters) + assert.equals("tests", body.filters[1].name) + end) + end) + + describe("GET", function() + lazy_setup(reset_filter_chains) + + it("returns a collection of filter chains", function() + local res = admin:get("/filter-chains") + assert.response(res).has.status(200) + + local body = assert.response(res).has.jsonbody() + assert.same({ data = {}, next = ngx.null }, body) + + local chain = assert(bp.filter_chains:insert({ + filters = { { name = "tests" } }, + service = { id = service.id }, + tags = { "a" }, + }, { nulls = true })) + + res = admin:get("/filter-chains") + assert.response(res).has.status(200) + + body = assert.response(res).has.jsonbody() + assert.equals(1, #body.data, "unexpected number of filter chain entities") + assert.same(chain, body.data[1]) + + assert.response( + admin:post("/filter-chains", json { + filters = { { name = "tests" } }, + route = { id = route.id }, + tags = { "b" }, + }) + ).has.status(201) + + res = admin:get("/filter-chains") + assert.response(res).has.status(200) + + body = assert.response(res).has.jsonbody() + assert.equals(2, #body.data, "unexpected number of filter chain entities") + end) + end) + + unsupported("PATCH", "/filter-chains") + unsupported("PUT", "/filter-chains") + unsupported("DELETE", "/filter-chains") + end) + + for _, key in ipairs({ "id", "name" }) do + + describe("/filter-chains/:" .. 
key, function() + describe("GET", function() + local chain + + lazy_setup(function() + chain = bp.filter_chains:insert({ + service = assert(bp.services:insert({})), + filters = { { name = "tests" } }, + }, { nulls = true }) + end) + + it("fetches a filter chain", function() + local res = admin:get("/filter-chains/" .. chain[key]) + assert.response(res).has.status(200) + local got = assert.response(res).has.jsonbody() + assert.same(chain, got) + end) + + it("returns 404 if not found", function() + assert.response( + admin:get("/filter-chains/" .. utils.uuid()) + ).has.status(404) + end) + end) + + describe("PATCH", function() + local chain + + lazy_setup(function() + chain = bp.filter_chains:insert({ + service = assert(bp.services:insert({})), + filters = { { name = "tests" } }, + }, { nulls = true }) + end) + + it("updates a filter chain in-place", function() + assert.equals(ngx.null, chain.tags) + assert.is_true(chain.enabled) + + local res = admin:patch("/filter-chains/" .. chain[key], json { + tags = { "foo", "bar" }, + enabled = false, + filters = { + { name = "tests", config = "123", enabled = true }, + { name = "tests", config = "456", enabled = false }, + }, + }) + + assert.response(res).has.status(200) + local patched = assert.response(res).has.jsonbody() + + assert.same({ "foo", "bar" }, patched.tags) + assert.is_false(patched.enabled) + assert.equals(2, #patched.filters) + assert.same({ name = "tests", config = "123", enabled = true }, + patched.filters[1]) + assert.same({ name = "tests", config = "456", enabled = false }, + patched.filters[2]) + end) + end) + + describe("DELETE", function() + local chain + + lazy_setup(function() + chain = bp.filter_chains:insert({ + service = assert(bp.services:insert({})), + filters = { { name = "tests" } }, + }, { nulls = true }) + end) + + it("removes a filter chain", function() + local res = admin:delete("/filter-chains/" .. 
chain[key]) + assert.response(res).has.status(204) + + assert.response( + admin:get("/filter-chains/" .. chain[key]) + ).has.status(404) + end) + + end) + + unsupported("POST", "/filter-chains/" .. utils.uuid()) + end) + + end -- each { "id", "name" } + + + -- * /services/:service/filter-chains + -- * /services/:service/filter-chains/:chain + -- * /routes/:route/filter-chains + -- * /routes/:route/filter-chains/:chain + for _, rel in ipairs({ "service", "route" }) do + + describe(fmt("/%ss/:%s/filter-chains", rel, rel), function() + local path, entity + + before_each(function() + if rel == "service" then + entity = assert(bp.services:insert({})) + else + entity = assert(bp.routes:insert({ hosts = { "wasm.test" } })) + end + + path = fmt("/%ss/%s/filter-chains", rel, entity.id) + end) + + describe("POST", function() + it("creates a " .. rel .. " filter chain", function() + local res = admin:post(path, json({ + filters = { { name = "tests" } }, + }) + ) + + assert.response(res).has.status(201) + local body = assert.response(res).has.jsonbody() + + assert.is_string(body.id) + assert.truthy(utils.is_valid_uuid(body.id)) + + assert.equals(1, #body.filters) + assert.equals("tests", body.filters[1].name) + end) + end) + + describe("GET", function() + it("returns existing " .. rel .. 
" filter chains", function() + local res = admin:get(path) + assert.response(res).has.status(200) + + local body = assert.response(res).has.jsonbody() + assert.same({ data = {}, next = ngx.null }, body) + + res = admin:post(path, json { + filters = { { name = "tests" } }, + tags = { "a" }, + }) + + assert.response(res).has.status(201) + local chain = assert.response(res).has.jsonbody() + + res = admin:get(path) + assert.response(res).has.status(200) + + body = assert.response(res).has.jsonbody() + assert.equals(1, #body.data, "unexpected number of filter chain entities") + assert.same(chain, body.data[1]) + end) + end) + + unsupported("PATCH", path) + unsupported("PUT", path) + unsupported("DELETE", path) + end) + + describe(fmt("/%ss/:%s/filter-chains/:chain", rel, rel), function() + local path, entity + local chain + + before_each(function() + if rel == "service" then + entity = assert(bp.services:insert({})) + chain = assert(bp.filter_chains:insert({ + service = entity, + filters = { { name = "tests" } }, + }, { nulls = true })) + + else + entity = assert(bp.routes:insert({ hosts = { "wasm.test" } })) + chain = assert(bp.filter_chains:insert({ + route = entity, + filters = { { name = "tests" } }, + }, { nulls = true })) + end + + path = fmt("/%ss/%s/filter-chains/", rel, entity.id) + end) + + describe("GET", function() + it("fetches a filter chain", function() + local res = admin:get(path .. chain.id) + assert.response(res).has.status(200) + local got = assert.response(res).has.jsonbody() + assert.same(chain, got) + end) + + it("returns 404 if not found", function() + assert.response( + admin:get(path .. utils.uuid()) + ).has.status(404) + end) + end) + + describe("PATCH", function() + it("updates a filter chain in-place", function() + assert.equals(ngx.null, chain.tags) + assert.is_true(chain.enabled) + + local res = admin:patch(path .. 
chain.id, json { + tags = { "foo", "bar" }, + enabled = false, + filters = { + { name = "tests", config = "123", enabled = true }, + { name = "tests", config = "456", enabled = false }, + }, + }) + + assert.response(res).has.status(200) + local patched = assert.response(res).has.jsonbody() + + assert.same({ "foo", "bar" }, patched.tags) + assert.is_false(patched.enabled) + assert.equals(2, #patched.filters) + assert.same({ name = "tests", config = "123", enabled = true }, + patched.filters[1]) + assert.same({ name = "tests", config = "456", enabled = false }, + patched.filters[2]) + end) + end) + + describe("DELETE", function() + it("removes a filter chain", function() + local res = admin:delete(path .. chain.id) + assert.response(res).has.status(204) + + assert.response( + admin:get(path .. chain.id) + ).has.status(404) + end) + + end) + end) + + end -- each relation (service, route) + + local function build_filters_response_from_fixtures(mode, fcs) + local filters = {} + for _, fc in ipairs(fcs) do + for _, f in ipairs(fc.filters) do + if (mode == "all") or + (f.enabled == true and mode == "enabled") or + (f.enabled == false and mode == "disabled") then + + table.insert(filters, { + config = f.config, + enabled = f.enabled, + filter_chain = { + id = fc.id, + name = fc.name, + }, + from = (fc.service ~= ngx.null) and "service" or "route", + name = f.name, + }) + + end + end + end + return filters + end + + describe("/routes/:routes/filters with chains from service and route", function() + local path, service, route, fcs + + lazy_setup(function() + reset_filter_chains() + + service = assert(bp.services:insert({})) + route = assert(bp.routes:insert({ + hosts = { "wasm.test" }, + service = { id = service.id }, + })) + + fcs = { + assert(bp.filter_chains:insert({ + filters = { + { name = "tests", config = ngx.null, enabled = true }, + { name = "response_transformer", config = "{}", enabled = false }, + }, + service = { id = service.id }, + name = "fc1", + }, { nulls = 
true })), + + assert(bp.filter_chains:insert({ + filters = { + { name = "tests", config = ngx.null, enabled = false }, + { name = "response_transformer", config = ngx.null, enabled = true } + }, + route = { id = route.id }, + }, { nulls = true })), + } + + path = fmt("/routes/%s/filters", route.id) + end) + + for _, mode in ipairs({"enabled", "disabled", "all"}) do + it(fmt("/routes/:routes/filters/%s GET returns 200", mode), function() + local filters = build_filters_response_from_fixtures(mode, fcs) + assert.equal(mode == "all" and 4 or 2, #filters) + + local res = admin:get(fmt("%s/%s", path, mode)) + assert.response(res).has.status(200) + local got = assert.response(res).has.jsonbody() + assert.same({ filters = filters }, got) + end) + end + end) + + describe("/routes/:routes/filters with chains from service only", function() + local path, service, route, fcs + + lazy_setup(function() + reset_filter_chains() + + service = assert(bp.services:insert({})) + route = assert(bp.routes:insert({ + hosts = { "wasm.test" }, + service = { id = service.id }, + })) + + fcs = { + assert(bp.filter_chains:insert({ + filters = { + { name = "tests", enabled = true }, + { name = "response_transformer", config = "{}", enabled = false }, + }, + service = { id = service.id }, + name = "fc1", + }, { nulls = true })), + } + + path = fmt("/routes/%s/filters", route.id) + end) + + for _, mode in ipairs({"enabled", "disabled", "all"}) do + it(fmt("/routes/:routes/filters/%s GET returns 200", mode), function() + local filters = build_filters_response_from_fixtures(mode, fcs) + assert.equal(mode == "all" and 2 or 1, #filters) + + local res = admin:get(fmt("%s/%s", path, mode)) + assert.response(res).has.status(200) + local got = assert.response(res).has.jsonbody() + assert.same({ filters = filters }, got) + end) + end + end) + + describe("/routes/:routes/filters with chains from route only", function() + local path, service, route, fcs + + lazy_setup(function() + reset_filter_chains() + + 
service = assert(bp.services:insert({})) + route = assert(bp.routes:insert({ + hosts = { "wasm.test" }, + service = { id = service.id }, + })) + + fcs = { + assert(bp.filter_chains:insert({ + filters = { + { name = "tests", enabled = true }, + { name = "response_transformer", config = "{}", enabled = false }, + { name = "tests", enabled = true }, + }, + route = { id = route.id }, + name = "fc1", + }, { nulls = true })), + } + + path = fmt("/routes/%s/filters", route.id) + end) + + for _, mode in ipairs({"enabled", "disabled", "all"}) do + it(fmt("/routes/:routes/filters/%s GET returns 200", mode), function() + local filters = build_filters_response_from_fixtures(mode, fcs) + assert.equal(mode == "all" and 3 + or mode == "enabled" and 2 + or mode == "disabled" and 1, #filters) + + local res = admin:get(fmt("%s/%s", path, mode)) + assert.response(res).has.status(200) + local got = assert.response(res).has.jsonbody() + assert.same({ filters = filters }, got) + end) + end + end) +end) + +describe("wasm admin API - wasm = off [#" .. strategy .. 
"]", function() + local admin + local bp, db + local service + + lazy_setup(function() + bp, db = helpers.get_db_utils(strategy, { + "routes", + "services", + }) + + service = assert(db.services:insert { + name = "wasm-test", + url = "http://wasm.test", + }) + + assert(helpers.start_kong({ + database = strategy, + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = "off", + })) + + admin = helpers.admin_client() + end) + + lazy_teardown(function() + if admin then admin:close() end + helpers.stop_kong(nil, true) + end) + + describe("/filter-chains", function() + + describe("POST", function() + it("returns 400", function() + local res = admin:post("/filter-chains", json({ + filters = { { name = "tests" } }, + service = { id = service.id }, + }) + ) + + assert.response(res).has.status(400) + end) + end) + + describe("GET", function() + it("returns 400", function() + local res = admin:get("/filter-chains") + assert.response(res).has.status(400) + end) + end) + + describe("PATCH", function() + it("returns 400", function() + local res = admin:patch("/filter-chains/a-name", json { + tags = { "foo", "bar" }, + enabled = false, + filters = { + { name = "tests", config = "123", enabled = true }, + { name = "tests", config = "456", enabled = false }, + }, + }) + assert.response(res).has.status(400) + end) + end) + + describe("PUT", function() + it("returns 400", function() + local res = admin:put("/filter-chains/another-name", json { + tags = { "foo", "bar" }, + enabled = false, + filters = { + { name = "tests", config = "123", enabled = true }, + { name = "tests", config = "456", enabled = false }, + }, + }) + assert.response(res).has.status(400) + end) + end) + + describe("DELETE", function() + it("returns 400", function() + local res = admin:delete("/filter-chains/even-another-name") + assert.response(res).has.status(400) + end) + end) + + end) + + -- * /services/:service/filter-chains + -- * /services/:service/filter-chains/:chain + -- * 
/routes/:route/filter-chains + -- * /routes/:route/filter-chains/:chain + for _, rel in ipairs({ "service", "route" }) do + + describe(fmt("/%ss/:%s/filter-chains", rel, rel), function() + local path, entity + + before_each(function() + if rel == "service" then + entity = assert(bp.services:insert({})) + else + entity = assert(bp.routes:insert({ hosts = { "wasm.test" } })) + end + + path = fmt("/%ss/%s/filter-chains", rel, entity.id) + end) + + it("GET returns 400", function() + assert.response( + admin:get(path) + ).has.status(400) + end) + + it("POST returns 400", function() + assert.response( + admin:post(path, json({ + filters = { { name = "tests" } }, + service = { id = service.id }, + }) + ) + ).has.status(400) + end) + + it("PATCH returns 400", function() + assert.response( + admin:patch(path .. "/" .. utils.uuid(), json({ + filters = { { name = "tests" } }, + service = { id = service.id }, + })) + ).has.status(400) + end) + + it("PUT returns 400", function() + assert.response( + admin:put(path .. "/" .. utils.uuid(), json({ + filters = { { name = "tests" } }, + service = { id = service.id }, + })) + ).has.status(400) + end) + + it("DELETE returns 400", function() + assert.response( + admin:delete(path .. "/" .. 
utils.uuid()) + ).has.status(400) + end) + + end) + + end -- each relation (service, route) + + for _, mode in ipairs({"enabled", "disabled", "all"}) do + + describe(fmt("/routes/:routes/filters/%s", mode), function() + local path, route + + before_each(function() + route = assert(bp.routes:insert({ hosts = { "wasm.test" } })) + path = fmt("/routes/%s/filters/%s", route.id, mode) + end) + + it("GET returns 400", function() + assert.response( + admin:get(path) + ).has.status(400) + end) + end) + + end -- each mode (enabled, disabled, all) + +end) + +end -- each strategy diff --git a/spec/02-integration/20-wasm/02-db_spec.lua b/spec/02-integration/20-wasm/02-db_spec.lua new file mode 100644 index 000000000000..2f9ec3703b63 --- /dev/null +++ b/spec/02-integration/20-wasm/02-db_spec.lua @@ -0,0 +1,469 @@ +local helpers = require "spec.helpers" +local utils = require "kong.tools.utils" + +-- no cassandra support +for _, strategy in helpers.each_strategy({ "postgres" }) do + +describe("wasm DB entities [#" .. strategy .. 
"]", function() + local db, dao + + local function reset_db() + if not db then return end + db.filter_chains:truncate() + db.filter_chains:load_filters({}) + db.routes:truncate() + db.services:truncate() + db.workspaces:truncate() + end + + + lazy_setup(function() + local _ + _, db = helpers.get_db_utils(strategy, { + "workspaces", + "routes", + "services", + "filter_chains", + }) + + dao = db.filter_chains + dao:load_filters({ + { name = "test", }, + { name = "other", }, + }) + end) + + lazy_teardown(reset_db) + + describe("filter_chains", function() + local function make_service() + local service = assert(db.services:insert({ + url = "http://wasm.test/", + })) + return { id = service.id } + end + + describe(".id", function() + it("is auto-generated", function() + local chain = assert(dao:insert({ + id = nil, + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_string(chain.id) + assert.truthy(utils.is_valid_uuid(chain.id)) + end) + + it("can be user-generated", function() + local id = utils.uuid() + local chain = assert(dao:insert({ + id = id, + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_string(chain.id) + assert.equals(id, chain.id) + assert.truthy(utils.is_valid_uuid(chain.id)) + end) + + it("must be a valid uuid", function() + local chain, err, err_t = dao:insert({ + id = "nope!", + service = make_service(), + filters = { { name = "test" } }, + }) + + assert.is_nil(chain, err) + assert.is_string(err) + assert.is_table(err_t) + + assert.same({ id = "expected a valid UUID" }, err_t.fields) + assert.equals("schema violation", err_t.name) + end) + end) + + describe(".name", function() + it("is optional", function() + local chain = assert(dao:insert({ + name = nil, + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_nil(chain.name) + end) + + it("must be unique", function() + local name = "my-unique-filter" + + assert(dao:insert({ + name = name, + service = 
make_service(), + filters = { { name = "test" } }, + })) + + local other, err, err_t = dao:insert({ + name = name, + service = make_service(), + filters = { { name = "test" } }, + }) + + assert.is_string(err) + assert.is_table(err_t) + assert.is_nil(other) + + assert.equals("unique constraint violation", err_t.name) + assert.same({ name = name }, err_t.fields) + end) + end) + + describe(".enabled", function() + it("defaults to 'true'", function() + local chain = assert(dao:insert({ + enabled = nil, + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_true(chain.enabled) + end) + + it("must be a boolean", function() + local chain, err, err_t = dao:insert({ + enabled = "nope!", + service = make_service(), + filters = { { name = "test" } }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + + assert.equals("schema violation", err_t.name) + assert.same({ enabled = "expected a boolean" }, err_t.fields) + end) + end) + + describe(".route", function() + it("references a route", function() + local route = assert(db.routes:insert({ + protocols = { "http" }, + methods = { "GET" }, + paths = { "/" }, + })) + + local chain, err = dao:insert({ + filters = { { name = "test" } }, + route = { id = route.id }, + }) + + assert.is_table(chain, err) + assert.is_nil(err) + assert.equals(route.id, chain.route.id) + end) + + it("requires the route to exist", function() + local chain, err, err_t = dao:insert({ + filters = { { name = "test" } }, + route = { id = utils.uuid() }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + + assert.equals("foreign key violation", err_t.name) + assert.is_table(err_t.fields) + assert.is_table(err_t.fields.route) + end) + end) + + describe(".service", function() + it("references a service", function() + local service = assert(db.services:insert({ + url = "http://wasm.test/", + })) + + local chain, err = dao:insert({ + filters = { { name = "test" } }, + service = { 
id = service.id }, + }) + + assert.is_table(chain, err) + assert.is_nil(err) + assert.equals(service.id, chain.service.id) + end) + + it("requires the service to exist", function() + local chain, err, err_t = dao:insert({ + filters = { { name = "test" } }, + service = { id = utils.uuid() }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + + assert.equals("foreign key violation", err_t.name) + assert.is_table(err_t.fields) + assert.is_table(err_t.fields.service) + end) + end) + + describe(".created_at", function() + it("is auto-generated", function() + local chain = assert(dao:insert({ + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_number(chain.created_at) + assert.truthy(math.abs(ngx.now() - chain.created_at) < 5) + end) + end) + + describe(".updated_at", function() + it("is updated when the entity is updated", function() + local chain = assert(dao:insert({ + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_number(chain.updated_at) + + helpers.wait_until(function() + local updated = assert(dao:update( + { id = chain.id }, + { tags = { utils.uuid() } } + )) + + return updated.updated_at > chain.updated_at + end, 5, 0.1) + end) + end) + + describe(".tags", function() + it("has tags", function() + local chain = assert(dao:insert({ + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_nil(chain.tags) + + chain = assert(dao:update({ id = chain.id }, { tags = { "foo" } })) + assert.same({ "foo" }, chain.tags) + end) + end) + + describe(".filters", function() + it("are required", function() + local chain, err, err_t = dao:insert({ + service = make_service(), + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.same({ filters = "required field missing" }, err_t.fields) + end) + + it("cannot be empty", function() + local chain, err, err_t = dao:insert({ + service = make_service(), + filters = {}, + }) + + 
assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.same({ filters = "length must be at least 1" }, err_t.fields) + end) + + describe(".name", function() + it("is required", function() + local chain, err, err_t = dao:insert({ + service = make_service(), + filters = { { config = "config" } }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.is_table(err_t.fields) + assert.is_table(err_t.fields.filters) + assert.same({ [1] = { name = "required field missing" } }, err_t.fields.filters) + end) + + it("must be a valid, enabled filter name", function() + local chain, err, err_t = dao:insert({ + service = make_service(), + filters = { + { name = "test" }, + { name = "missing" }, + { name = "other" }, + { name = "also-missing" }, + }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.is_table(err_t.fields) + assert.same({ + filters = { + [2] = { name = "no such filter: missing" }, + [4] = { name = "no such filter: also-missing" }, + }, + }, err_t.fields) + + assert(dao:insert({ + service = make_service(), + filters = { { name = "test" } }, + })) + + chain, err, err_t = dao:insert({ + service = make_service(), + filters = { + { name = "test" }, + { name = "missing" }, + { name = "other" }, + { name = "also-missing" }, + }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.is_table(err_t.fields) + assert.same({ + filters = { + [2] = { name = "no such filter: missing" }, + [4] = { name = "no such filter: also-missing" }, + }, + }, err_t.fields) + + end) + end) + + describe(".enabled", function() + it("defaults to 'true'", function() + local chain = assert(dao:insert({ + service = make_service(), + filters = { { name = "test" } }, + })) + + assert.is_true(chain.filters[1].enabled) + end) + end) + + describe(".config", function() + pending("is validated against the filter schema") + end) + end) + + describe("entity checks", function() + 
it("service and route are mutually exclusive", function() + local route = assert(db.routes:insert({ + protocols = { "http" }, + methods = { "GET" }, + paths = { "/" }, + })) + + local service = assert(db.services:insert({ + url = "http://example.test", + })) + + + local chain, err, err_t = dao:insert({ + route = { id = route.id }, + service = { id = service.id }, + filters = { { name = "test" } }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.same({ + ["@entity"] = { + "only one or none of these fields must be set: 'service', 'route'", + }, + }, err_t.fields) + end) + + it("allows only one chain per service", function() + local service = assert(db.services:insert({ + url = "http://example.test", + })) + + assert(dao:insert({ + service = { id = service.id }, + filters = { { name = "test" } }, + tags = { "original" }, + })) + + local chain, err, err_t = dao:insert({ + service = { id = service.id }, + filters = { { name = "test" } }, + tags = { "new" }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.equals("unique constraint violation", err_t.name) + assert.is_table(err_t.fields.service) + end) + + it("allows only one chain per route", function() + local route = assert(db.routes:insert({ + protocols = { "http" }, + methods = { "GET" }, + paths = { "/" }, + })) + + + assert(dao:insert({ + route = { id = route.id }, + filters = { { name = "test" } }, + tags = { "original" }, + })) + + local chain, err, err_t = dao:insert({ + route = { id = route.id }, + filters = { { name = "test" } }, + tags = { "new" }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + assert.equals("unique constraint violation", err_t.name) + assert.is_table(err_t.fields.route) + end) + + it("requires a service or a route", function() + local chain, err, err_t = dao:insert({ + filters = { { name = "test" } }, + }) + + assert.is_nil(chain) + assert.is_string(err) + assert.is_table(err_t) + 
assert.is_table(err_t.fields) + assert.same( + { + ["@entity"] = { + [1] = [[at least one of these fields must be non-empty: 'service', 'route']] + }, + }, + err_t.fields + ) + end) + end) + end) +end) + +end -- each strategy diff --git a/spec/02-integration/20-wasm/03-runtime_spec.lua b/spec/02-integration/20-wasm/03-runtime_spec.lua new file mode 100644 index 000000000000..0d8281897ee4 --- /dev/null +++ b/spec/02-integration/20-wasm/03-runtime_spec.lua @@ -0,0 +1,486 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + +local HEADER = "X-Proxy-Wasm" + +local json = cjson.encode + +local function response_transformer(value, disabled) + return { + name = "response_transformer", + enabled = not disabled, + config = json { + append = { + headers = { + HEADER .. ":" .. value, + }, + }, + } + } +end + +for _, strategy in helpers.each_strategy({ "postgres", "off" }) do + + +describe("#wasm filter execution (#" .. strategy .. ")", function() + lazy_setup(function() + local bp, db = helpers.get_db_utils("postgres", { + "routes", + "services", + "filter_chains", + }) + + + db.filter_chains:load_filters({ + { name = "tests" }, + { name = "response_transformer" }, + }) + + + local function service_and_route(name) + local service = assert(bp.services:insert({ + name = name, + url = helpers.mock_upstream_url, + })) + + local route = assert(bp.routes:insert { + name = name, + service = { id = service.id }, + paths = { "/" }, + hosts = { name }, + }) + + return service, route + end + + local function create_filter_chain(entity) + return assert(bp.filter_chains:insert(entity)) + end + + + do + -- a filter chain attached to a service + local name = "service.test" + local service = service_and_route(name) + create_filter_chain({ + service = { id = service.id }, + filters = { + response_transformer(name), + } + }) + end + + do + -- a filter chain attached to a route + local name = "route.test" + local _, route = service_and_route(name) + create_filter_chain({ 
+ route = { id = route.id }, + filters = { + response_transformer(name), + } + }) + end + + do + -- service and route each have a filter chain + local name = "service-and-route.test" + local service, route = service_and_route(name) + + create_filter_chain({ + service = { id = service.id }, + filters = { + response_transformer("service"), + } + }) + + create_filter_chain({ + route = { id = route.id }, + filters = { + response_transformer("route"), + } + }) + end + + do + -- a disabled filter chain attached to a service + local name = "service-disabled.test" + local service = service_and_route(name) + create_filter_chain({ + enabled = false, + service = { id = service.id }, + filters = { + response_transformer(name), + } + }) + end + + do + -- a disabled filter chain attached to a route + local name = "route-disabled.test" + local _, route = service_and_route(name) + create_filter_chain({ + enabled = false, + route = { id = route.id }, + filters = { + response_transformer(name), + } + }) + end + + do + -- service filter chain is disabled + -- route filter chain is enabled + local name = "service-disabled.route-enabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = false, + service = { id = service.id }, + filters = { + response_transformer("service"), + } + }) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("route"), + } + }) + end + + do + -- service filter chain is enabled + -- route filter chain is disabled + local name = "service-enabled.route-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("service"), + } + }) + + create_filter_chain({ + enabled = false, + route = { id = route.id }, + filters = { + response_transformer("route"), + } + }) + end + + do + -- service and route filter chains both disabled + local name = 
"service-disabled.route-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = false, + service = { id = service.id }, + filters = { + response_transformer("service"), + } + }) + + create_filter_chain({ + enabled = false, + route = { id = route.id }, + filters = { + response_transformer("route"), + } + }) + end + + do + -- service filter chain with one enabled filter and one disabled filter + local name = "service-partial-disabled.test" + local service = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("disabled", true), + response_transformer("enabled"), + } + }) + end + + do + -- route filter chain with one enabled filter and one disabled filter + local name = "route-partial-disabled.test" + local _, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("disabled", true), + response_transformer("enabled"), + } + }) + end + + do + -- combined service and route filter chains with some disabled filters + local name = "combined-partial-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("service-enabled"), + response_transformer("service-disabed", true), + } + }) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("route-disabled", true), + response_transformer("route-enabled"), + } + }) + end + + do + -- service filter chain with no enabled filters + local name = "service-fully-disabled.test" + local service = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("disabled", true), + response_transformer("also-disabled", true), + } + }) + end + + do + -- route filter chain with no 
enabled filters + local name = "route-fully-disabled.test" + local _, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("disabled", true), + response_transformer("also-disabled", true), + } + }) + end + + do + -- combined service and route filter chain with no enabled filters + local name = "combined-fully-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("service-disabled", true), + response_transformer("service-also-disabled", true), + } + }) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("route-disabled", true), + response_transformer("route-also-disabled", true), + } + }) + end + + do + -- combined service and route filter chain with all service filters disabled + local name = "combined-service-filters-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("service-disabled", true), + response_transformer("service-also-disabled", true), + } + }) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("route-enabled"), + response_transformer("route-disabled", true), + } + }) + end + + do + -- combined service and route filter chain with all route filters disabled + local name = "combined-route-filters-disabled.test" + local service, route = service_and_route(name) + + create_filter_chain({ + enabled = true, + service = { id = service.id }, + filters = { + response_transformer("service-disabled", true), + response_transformer("service-enabled"), + } + }) + + create_filter_chain({ + enabled = true, + route = { id = route.id }, + filters = { + response_transformer("route-disabled", true), + 
response_transformer("route-also-disabled", true), + } + }) + end + + + assert(helpers.start_kong({ + database = strategy, + declarative_config = strategy == "off" + and helpers.make_yaml_file() + or nil, + + nginx_conf = "spec/fixtures/custom_nginx.template", + + wasm = true, + })) + end) + + + lazy_teardown(function() + helpers.stop_kong(nil, true) + end) + + + local client + before_each(function() + helpers.clean_logfile() + client = helpers.proxy_client() + end) + + + after_each(function() + if client then client:close() end + end) + + + local function assert_filter(host, expect_header) + local res = client:get("/", { + headers = { host = host }, + }) + + assert.response(res).has.status(200) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + + local header = assert.response(res).has.header(HEADER) + + if type(expect_header) == "string" then + expect_header = { expect_header } + end + + if type(header) == "string" then + header = { header } + end + + assert.same(expect_header, header) + end + + + local function assert_no_filter(host) + local res = client:get("/", { + headers = { host = host }, + }) + + assert.response(res).has.status(200) + assert.response(res).has.no.header(HEADER) + end + + + describe("single filter chain", function() + it("attached to a service", function() + assert_filter("service.test", "service.test") + end) + + it("attached to a route", function() + assert_filter("route.test", "route.test") + end) + end) + + describe("multiple filter chains", function() + it("service and route with their own filter chains", function() + assert_filter("service-and-route.test", { "service", "route" }) + end) + end) + + describe("disabled filter chains", function() + it("attached to a service", function() + assert_no_filter("service-disabled.test") + end) + + it("attached to a route", function() + assert_no_filter("route-disabled.test") + end) + + it("service disabled, route enabled", function() + 
assert_filter("service-disabled.route-enabled.test", "route") + end) + + it("service enabled, route disabled", function() + assert_filter("service-enabled.route-disabled.test", "service") + end) + + it("service disabled, route disabled", function() + assert_no_filter("service-disabled.route-disabled.test") + end) + end) + + describe("disabled filters are not executed", function() + it("(service)", function() + assert_filter("service-partial-disabled.test", "enabled") + end) + + it("(route)", function() + assert_filter("route-partial-disabled.test", "enabled") + end) + + it("(combined)", function() + assert_filter("combined-partial-disabled.test", + { "service-enabled", "route-enabled" }) + + assert_filter("combined-service-filters-disabled.test", + { "route-enabled" }) + + assert_filter("combined-route-filters-disabled.test", + { "service-enabled" }) + end) + + it("and all filters can be disabled", function() + assert_no_filter("service-fully-disabled.test") + assert_no_filter("route-fully-disabled.test") + assert_no_filter("combined-fully-disabled.test") + end) + end) +end) + + +end -- each strategy diff --git a/spec/02-integration/20-wasm/04-proxy-wasm_spec.lua b/spec/02-integration/20-wasm/04-proxy-wasm_spec.lua new file mode 100644 index 000000000000..459f4a1e3ed0 --- /dev/null +++ b/spec/02-integration/20-wasm/04-proxy-wasm_spec.lua @@ -0,0 +1,395 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" + + +local DATABASE = "postgres" +local HEADER_NAME_PHASE = "X-PW-Phase" +local HEADER_NAME_TEST = "X-PW-Test" +local HEADER_NAME_INPUT = "X-PW-Input" +local HEADER_NAME_DISPATCH_ECHO = "X-PW-Dispatch-Echo" +local HEADER_NAME_ADD_REQ_HEADER = "X-PW-Add-Header" +local HEADER_NAME_ADD_RESP_HEADER = "X-PW-Add-Resp-Header" + +local DNS_HOSTNAME = "wasm.test" +local MOCK_UPSTREAM_DNS_ADDR = DNS_HOSTNAME .. ":" .. 
helpers.mock_upstream_port + + +describe("proxy-wasm filters (#wasm)", function() + local r_single, mock_service + local hosts_file + + lazy_setup(function() + local bp, db = helpers.get_db_utils(DATABASE, { + "routes", + "services", + "filter_chains", + }) + + db.filter_chains:load_filters({ { name = "tests" } }) + + mock_service = assert(bp.services:insert { + host = helpers.mock_upstream_host, + port = helpers.mock_upstream_port, + }) + + r_single = assert(bp.routes:insert { + paths = { "/single" }, + strip_path = true, + service = mock_service, + }) + + local r_double = assert(bp.routes:insert { + paths = { "/double" }, + strip_path = true, + service = mock_service, + }) + + assert(db.filter_chains:insert { + route = r_single, + filters = { + { name = "tests" }, + }, + }) + + assert(db.filter_chains:insert { + route = r_double, + filters = { + { name = "tests" }, + { name = "tests" }, + }, + }) + + -- XXX our dns mock fixture doesn't work when called from wasm land + hosts_file = os.tmpname() + assert(helpers.file.write(hosts_file, + "127.0.0.1 " .. DNS_HOSTNAME .. 
"\n")) + + assert(helpers.start_kong({ + database = DATABASE, + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + dns_hostsfile = hosts_file, + })) + end) + + lazy_teardown(function() + helpers.stop_kong(nil, true) + os.remove(hosts_file) + end) + + before_each(function() + helpers.clean_logfile() + end) + + describe("runs a filter chain", function() + it("with a single filter", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + }) + + assert.res_status(200, res) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("with multiple filters", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/double/status/200", + }) + + assert.res_status(200, res) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + end) + + describe("filters can", function() + it("add request headers", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + headers = { + [HEADER_NAME_ADD_REQ_HEADER] = "Via=proxy-wasm", + } + }) + + local body = assert.res_status(200, res) + local json = cjson.decode(body) + assert.equal("proxy-wasm", json.headers["via"]) + -- TODO: honor case-sensitivity (proxy-wasm-rust-sdk/ngx_wasm_module investigation) + -- assert.equal("proxy-wasm", json.headers["Via"]) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("remove request headers", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + 
headers = { + [HEADER_NAME_ADD_REQ_HEADER] = "Via=proxy-wasm", + } + }) + + local body = assert.res_status(200, res) + local json = cjson.decode(body) + -- The 'test' Rust filter removes the "X-PW-*" request headers + assert.is_nil(json.headers[HEADER_NAME_ADD_REQ_HEADER]) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("add response headers on_request_headers", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + headers = { + [HEADER_NAME_ADD_RESP_HEADER] = "X-Via=proxy-wasm", + } + }) + + assert.res_status(200, res) + local via = assert.response(res).has.header("x-via") + assert.equal("proxy-wasm", via) + assert.logfile().has.line([[testing in "RequestHeaders"]]) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("add response headers on_response_headers", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + headers = { + [HEADER_NAME_PHASE] = "response_headers", + [HEADER_NAME_ADD_RESP_HEADER] = "X-Via=proxy-wasm", + } + }) + + assert.res_status(200, res) + local via = assert.response(res).has.header("x-via") + assert.equal("proxy-wasm", via) + assert.logfile().has.line([[testing in "ResponseHeaders"]]) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + -- describe+it: + -- "filters can NOT ..." 
+ it("NOT add response headers on_log", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + headers = { + [HEADER_NAME_PHASE] = "log", + [HEADER_NAME_ADD_RESP_HEADER] = "X-Via=proxy-wasm", + } + }) + + assert.res_status(200, res) + assert.response(res).has.no.header("x-via") + assert.logfile().has.line([[testing in "Log"]]) + assert.logfile().has.line("cannot add response header: headers already sent") + end) + + pending("throw a trap", function() + -- Used to work but now broken (obscure wasmtime SIGSEV), no clue + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + headers = { + [HEADER_NAME_TEST] = "trap", + } + }) + + assert.res_status(500, res) + assert.logfile().has.line("panicked at 'trap msg'") + assert.logfile().has.line("trap in proxy_on_request_headers:.*?unreachable") + end) + + pending("send a local response", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/201", -- status overriden by 'test' filter + headers = { + [HEADER_NAME_TEST] = "local_response", + [HEADER_NAME_INPUT] = "Hello from proxy-wasm", + } + }) + + local body = assert.res_status(200, res) + assert.equal("Hello from proxy-wasm", body) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("read kong.route_id", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/201", + headers = { + [HEADER_NAME_TEST] = "get_kong_property", + [HEADER_NAME_INPUT] = "route_id", + [HEADER_NAME_DISPATCH_ECHO] = "on", + } + }) + + local body = assert.res_status(200, res) + 
assert.equal(r_single.id, body) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("read kong.service_id", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/201", + headers = { + [HEADER_NAME_TEST] = "get_kong_property", + [HEADER_NAME_INPUT] = "service_id", + [HEADER_NAME_DISPATCH_ECHO] = "on", + } + }) + + local body = assert.res_status(200, res) + assert.equal(mock_service.id, body) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("send an http dispatch, return its response body", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/201", + headers = { + [HEADER_NAME_TEST] = "echo_http_dispatch", + [HEADER_NAME_INPUT] = "path=/headers", + [HEADER_NAME_DISPATCH_ECHO] = "on", + } + }) + + -- The dispatch went to the local mock upstream /headers endpoint + -- which itself sent back + local body = assert.res_status(200, res) + local json = cjson.decode(body) + assert.equal(helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port, + json.headers["host"]) + assert.equal("http://" .. helpers.mock_upstream_host .. ":" .. + helpers.mock_upstream_port .. "/headers", + json.url) + + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + end) + + it("resolves DNS hostnames to send an http dispatch, return its response body", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/201", + headers = { + [HEADER_NAME_TEST] = "echo_http_dispatch", + [HEADER_NAME_INPUT] = "path=/headers host=" .. 
MOCK_UPSTREAM_DNS_ADDR, + [HEADER_NAME_DISPATCH_ECHO] = "on", + } + }) + + -- The dispatch went to the local mock upstream /headers endpoint + -- which itself sent back + local body = assert.res_status(200, res) + local json = cjson.decode(body) + assert.equal(MOCK_UPSTREAM_DNS_ADDR, json.headers["host"]) + assert.equal("http://" .. MOCK_UPSTREAM_DNS_ADDR .. "/headers", + json.url) + + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + + assert.logfile().has.line("wasm lua resolver using existing dns_client") + assert.logfile().has.line([[wasm lua resolved "]] + .. DNS_HOSTNAME .. + [[" to "127.0.0.1"]]) + end) + + pending("start on_tick background timer", function() + -- Pending on internal ngx_wasm_module changes + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/single/status/200", + }) + + assert.res_status(200, res) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + + -- TODO + end) + end) + + describe("behavior with", function() + pending("multiple filters, one sends a local response", function() + local client = helpers.proxy_client() + finally(function() client:close() end) + + local res = assert(client:send { + method = "GET", + path = "/double/", + headers = { + [HEADER_NAME_TEST] = "local_response", + } + }) + + local body = assert.res_status(200, res) + assert.equal("", body) + assert.logfile().has.no.line("[error]", true, 0) + assert.logfile().has.no.line("[crit]", true, 0) + + -- TODO: test that phases are properly invoked and the chain + -- correctly interrupted, but how? 
+ -- no equivalent to Test::Nginx's grep_error_log_out + end) + end) +end) diff --git a/spec/02-integration/20-wasm/05-cache-invalidation_spec.lua b/spec/02-integration/20-wasm/05-cache-invalidation_spec.lua new file mode 100644 index 000000000000..23b3c4d2b7c5 --- /dev/null +++ b/spec/02-integration/20-wasm/05-cache-invalidation_spec.lua @@ -0,0 +1,508 @@ +local helpers = require "spec.helpers" +local cjson = require "cjson" +local nkeys = require "table.nkeys" + +local HEADER = "X-Proxy-Wasm" +local TIMEOUT = 20 +local STEP = 0.1 + + +local json = cjson.encode +local fmt = string.format + +local function make_config(src) + return json { + append = { + headers = { + HEADER .. ":" .. src, + }, + }, + } +end + + +local function update_yaml_config() + local fname = helpers.make_yaml_file() + local yaml = assert(helpers.file.read(fname)) + + local client = helpers.admin_client() + + local res = client:post("/config", { + headers = { ["Content-Type"] = "text/yaml" }, + body = yaml, + }) + + assert.response(res).has.status(201) + + client:close() +end + + +local function declarative_api_functions(db) + local dao = db.filter_chains + + local insert = function(entity) + local res = assert(dao:insert(entity)) + update_yaml_config() + return res + end + + local update = function(id, updates) + if type(id) == "string" then + id = { id = id } + end + local res = assert(dao:update(id, updates)) + update_yaml_config() + return res + end + + local delete = function(id) + if type(id) == "string" then + id = { id = id } + end + local res = assert(dao:delete(id)) + update_yaml_config() + return res + end + + return insert, update, delete +end + + +local function db_api_functions() + local api = require("spec.fixtures.admin_api").filter_chains + + local insert = function(entity) + return assert(api:insert(entity)) + end + + local update = function(id, updates) + return assert(api:update(id, updates)) + end + + local delete = function(id) + local _, err = api:remove(id) + assert(not 
err, err) + end + + return insert, update, delete +end + + +local function make_api(strategy, db) + if strategy == "off" then + return declarative_api_functions(db) + end + + return db_api_functions() +end + + +-- we must use more than one worker to ensure adequate test coverage +local WORKER_COUNT = 4 +local WORKER_ID_HEADER = "X-Worker-Id" + +for _, strategy in ipairs({ "postgres", "off"}) do + +for _, consistency in ipairs({ "eventual", "strict" }) do + +local mode_suffix = fmt("(strategy: #%s) (#%s consistency)", + strategy, consistency) + +describe("#wasm filter chain cache " .. mode_suffix, function() + local db + + local insert, update, delete + + local hosts = { + filter = "filter.test", + filter_alt = "filter-alt.test", + no_filter = "no-filter.test", + } + + local services = {} + local routes = {} + + + local function assert_no_filter(host, suffix) + local msg = fmt("response header %q should be absent for all workers", + HEADER) + if suffix then + msg = msg .. " " .. suffix + end + + local workers_seen = {} + + helpers.pwait_until(function() + local client = helpers.proxy_client() + local res = client:get("/", { + headers = { host = host }, + }) + + assert.response(res).has.status(200) + client:close() + + assert.response(res).has.no_header(HEADER) + + -- ensure that we've received the correct response from each + -- worker at least once + local worker_id = assert.response(res).has.header(WORKER_ID_HEADER) + workers_seen[worker_id] = true + assert.same(WORKER_COUNT, nkeys(workers_seen), msg) + end, TIMEOUT, STEP) + end + + + local function assert_filters(host, exp, suffix) + local msg = fmt("response header %q should be equal to %q for all workers", + HEADER, table.concat(exp, ",")) + + if suffix then + msg = msg .. " " .. 
suffix + end + + local workers_seen = {} + + helpers.pwait_until(function() + local client = helpers.proxy_client() + + local res = client:get("/", { + headers = { host = host }, + }) + + assert.response(res).has.status(200) + client:close() + + local header = res.headers[HEADER] + assert.not_nil(header, msg) + + if type(header) == "string" then + header = { header } + end + + if type(exp) == "string" then + exp = { exp } + end + + assert.same(exp, header, msg) + + -- ensure that we've received the correct response from each + -- worker at least once + local worker_id = assert.response(res).has.header(WORKER_ID_HEADER) + workers_seen[worker_id] = true + assert.same(WORKER_COUNT, nkeys(workers_seen), msg) + end, TIMEOUT, STEP) + end + + + lazy_setup(function() + local bp + bp, db = helpers.get_db_utils("postgres", { + "services", + "routes", + "filter_chains", + }) + + db.filter_chains:load_filters({ + { name = "response_transformer", }, + }) + + services.filter = bp.services:insert({ name = hosts.filter }) + services.no_filter = bp.services:insert({ name = hosts.no_filter }) + + routes.filter = bp.routes:insert({ + name = hosts.filter, + service = services.filter, + hosts = { hosts.filter }, + paths = { "/" }, + }) + + routes.filter_alt = bp.routes:insert({ + name = hosts.filter_alt, + service = services.filter, + hosts = { hosts.filter_alt }, + paths = { "/" }, + }) + + routes.no_filter = bp.routes:insert({ + name = hosts.no_filter, + service = services.no_filter, + hosts = { hosts.no_filter }, + paths = { "/" }, + }) + + assert(bp.plugins:insert({ + name = "pre-function", + config = { + rewrite = {[[ + kong.response.set_header( + "]] .. WORKER_ID_HEADER .. 
[[", + ngx.worker.id() + ) + ]]} + } + })) + + + insert, update, delete = make_api(strategy, db) + + + assert(helpers.start_kong({ + database = strategy, + declarative_config = strategy == "off" + and helpers.make_yaml_file() + or nil, + + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + + nginx_main_worker_processes = WORKER_COUNT, + + worker_consistency = consistency, + worker_state_update_frequency = 0.25, + + proxy_listen = fmt("%s:%s reuseport", + helpers.get_proxy_ip(), + helpers.get_proxy_port()), + })) + end) + + + lazy_teardown(function() + helpers.stop_kong(nil, true) + end) + + + before_each(function() + helpers.clean_logfile() + db.filter_chains:truncate() + + -- sanity + assert_no_filter(hosts.no_filter, "(test setup)") + end) + + + it("is invalidated on filter creation and removal", function() + assert_no_filter(hosts.filter, "(initial test)") + + local service_chain = insert({ + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("service") + }, + } + }) + + assert_filters(hosts.filter, { "service" }, + "after adding a service filter chain") + + local route_chain = insert({ + route = routes.filter, + filters = { + { name = "response_transformer", + config = make_config("route") + }, + } + }) + + assert_filters(hosts.filter, { "service", "route" }, + "after adding a route filter chain") + + delete({ id = route_chain.id }) + assert_filters(hosts.filter, { "service" }, + "after removing the route filter chain") + + delete({ id = service_chain.id }) + + assert_no_filter(hosts.filter, + "after removing all relevant filter chains") + end) + + it("is invalidated on update when a filter chain is enabled/disabled", function() + assert_no_filter(hosts.filter, "(initial test)") + + local service_chain = insert({ + enabled = false, + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("service") + }, + } + }) + + local route_chain = insert({ + 
enabled = false, + route = routes.filter, + filters = { + { name = "response_transformer", + config = make_config("route") + }, + } + }) + + -- sanity + assert_no_filter(hosts.filter, "after adding disabled filter chains") + + assert(update(service_chain.id, { enabled = true })) + + assert_filters(hosts.filter, { "service" }, + "after enabling the service filter chain") + + + assert(update(route_chain.id, { enabled = true })) + + assert_filters(hosts.filter, { "service", "route" }, + "after enabling the route filter chain") + + + assert(update(route_chain.id, { enabled = false })) + + assert_filters(hosts.filter, { "service" }, + "after disabling the route filter chain") + + + assert(update(service_chain.id, { enabled = false })) + + assert_no_filter(hosts.filter, "after disabling all filter chains") + end) + + it("is invalidated on update when filters are added/removed", function() + local service_chain = insert({ + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("service") + }, + } + }) + + assert_filters(hosts.filter, { "service" }, + "after enabling a service filter chain") + + assert(update(service_chain.id, { + filters = { + { name = "response_transformer", + config = make_config("service") + }, + + { name = "response_transformer", + config = make_config("new") + }, + } + })) + + assert_filters(hosts.filter, { "service", "new" }, + "after adding a filter to the service filter chain") + + assert(update(service_chain.id, { + filters = { + { name = "response_transformer", + config = make_config("new") + }, + } + })) + + assert_filters(hosts.filter, { "new" }, + "after removing a filter from the service filter chain") + end) + + it("is invalidated when filters are enabled/disabled", function() + local service_chain = insert({ + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("service"), + enabled = true, + }, + + { name = "response_transformer", + config = 
make_config("other"), + enabled = true, + }, + } + }) + + assert_filters(hosts.filter, { "service", "other" }, + "after enabling a service filter chain") + + service_chain.filters[1].enabled = false + assert(update(service_chain.id, service_chain)) + + assert_filters(hosts.filter, { "other" }, + "after disabling a filter in the chain") + + service_chain.filters[1].enabled = true + service_chain.filters[2].enabled = false + assert(update(service_chain.id, service_chain)) + assert_filters(hosts.filter, { "service" }, + "after changing the enabled filters in the chain") + + service_chain.filters[1].enabled = false + service_chain.filters[2].enabled = false + assert(update(service_chain.id, service_chain)) + + assert_no_filter(hosts.filter, "after disabling all filters in the chain") + end) + + it("is invalidated when filters are re-ordered", function() + local service_chain = insert({ + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("first"), + enabled = true, + }, + + { name = "response_transformer", + config = make_config("middle"), + enabled = true, + }, + + { name = "response_transformer", + config = make_config("last"), + enabled = true, + }, + } + }) + + assert_filters(hosts.filter, { "first", "middle", "last" }, + "after enabling a service filter chain") + + service_chain.filters[1], service_chain.filters[3] + = service_chain.filters[3], service_chain.filters[1] + + assert(update(service_chain.id, service_chain)) + + assert_filters(hosts.filter, { "last", "middle", "first" }, + "after re-ordering the filter chain items") + end) + + + it("is invalidated when filter configuration is changed", function() + local service_chain = insert({ + service = services.filter, + filters = { + { name = "response_transformer", + config = make_config("before"), + enabled = true, + }, + } + }) + + assert_filters(hosts.filter, { "before" }, + "after enabling a service filter chain") + + service_chain.filters[1].config = 
make_config("after") + assert(update(service_chain.id, service_chain)) + + assert_filters(hosts.filter, { "after" }, + "after enabling a service filter chain") + end) +end) + + +end -- each consistency + +end -- each strategy diff --git a/spec/02-integration/20-wasm/06-clustering_spec.lua b/spec/02-integration/20-wasm/06-clustering_spec.lua new file mode 100644 index 000000000000..5b5b15867334 --- /dev/null +++ b/spec/02-integration/20-wasm/06-clustering_spec.lua @@ -0,0 +1,262 @@ +local helpers = require "spec.helpers" +local utils = require "kong.tools.utils" +local cjson = require "cjson.safe" +local STATUS = require("kong.constants").CLUSTERING_SYNC_STATUS +local admin = require "spec.fixtures.admin_api" + +local HEADER = "X-Proxy-Wasm" + +local json = cjson.encode + +local function get_node_id(prefix) + local data = helpers.wait_for_file_contents(prefix .. "/kong.id") + data = data:gsub("%s*(.-)%s*", "%1") + assert(utils.is_valid_uuid(data), "invalid kong node ID found in " .. prefix) + return data +end + + +local function expect_status(prefix, exp) + local id = get_node_id(prefix) + local msg = "waiting for clustering sync status to equal" + .. " '" .. exp .. "' for data plane" + + assert + .eventually(function() + local cp_client = helpers.admin_client() + + local res = cp_client:get("/clustering/data-planes/") + res:read_body() + + cp_client:close() + + local body = assert.response(res).has.jsonbody() + + if res.status ~= 200 then + return nil, { + msg = "bad http status", + exp = 200, + got = res.status, + } + end + + assert.is_table(body.data) + local found + for _, dp in ipairs(body.data) do + if dp.id == id then + found = dp + break + end + end + + if not found then + return nil, { + msg = "dp with id " .. id .. 
" not found in response", + res = body, + } + + elseif found.sync_status ~= exp then + return nil, { + msg = "unexpected sync_status", + exp = exp, + got = found.sync_status, + dp = found, + } + end + + return true + end) + .is_truthy(msg) +end + + +describe("#wasm - hybrid mode", function() + local cp_prefix = "cp" + local cp_errlog = cp_prefix .. "/logs/error.log" + + local dp_prefix = "dp" + + lazy_setup(function() + local _, db = helpers.get_db_utils("postgres", { + "services", + "routes", + "filter_chains", + "clustering_data_planes", + }) + + db.clustering_data_planes:truncate() + + assert(helpers.start_kong({ + role = "control_plane", + database = "postgres", + prefix = cp_prefix, + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + db_update_frequency = 0.1, + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + })) + end) + + lazy_teardown(function() + helpers.stop_kong(cp_prefix, true) + end) + + describe("[happy path]", function() + local client + + lazy_setup(function() + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = dp_prefix, + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + admin_listen = "off", + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = true, + })) + + client = helpers.proxy_client() + end) + + lazy_teardown(function() + if client then client:close() end + helpers.stop_kong(dp_prefix) + end) + + it("syncs wasm filter chains to the data plane", function() + local service = admin.services:insert({}) + local host = "wasm-" .. utils.random_string() .. 
".test" + + admin.routes:insert({ + service = service, + hosts = { host }, + }) + + local params = { + headers = { + host = host, + } + } + + assert + .eventually(function() + local res = client:get("/status/200", params) + return res.status == 200, { + exp = 200, + got = res.status, + res = res:read_body(), + } + end) + .is_truthy("service/route are ready on the data plane") + + local value = utils.random_string() + + local filter = admin.filter_chains:insert({ + service = { id = service.id }, + filters = { + { + name = "response_transformer", + config = json { + append = { + headers = { + HEADER .. ":" .. value, + }, + }, + } + } + } + }) + + assert + .eventually(function() + local res = client:get("/status/200", params) + res:read_body() + + if res.status ~= 200 then + return { + msg = "bad http status", + exp = 200, + got = res.status, + res = res:read_body(), + } + end + + if res.headers[HEADER] ~= value then + return nil, { + msg = "missing/incorrect " .. HEADER .. " header", + exp = value, + got = res.headers[HEADER] or "", + } + end + + return true + end) + .is_truthy("wasm filter is configured on the data plane") + + admin.filter_chains:remove({ id = filter.id }) + + assert + .eventually(function() + local res = client:get("/status/200", params) + res:read_body() + + if res.status ~= 200 then + return { + msg = "bad http status", + exp = 200, + got = res.status, + res = res:read_body(), + } + end + + if res.headers[HEADER] ~= nil then + return nil, { + msg = "expected " .. HEADER .. 
" header to be absent", + exp = "", + got = res.headers[HEADER], + } + end + + return true + end) + .is_truthy("wasm filter has been removed from the data plane") + + expect_status(dp_prefix, STATUS.NORMAL) + end) + end) + + describe("data planes with wasm disabled", function() + lazy_setup(function() + helpers.clean_logfile(cp_errlog) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = dp_prefix, + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + admin_listen = "off", + nginx_conf = "spec/fixtures/custom_nginx.template", + wasm = "off", + })) + end) + + + lazy_teardown(function() + helpers.stop_kong(dp_prefix, true) + end) + + it("does not sync configuration", function() + assert.logfile(cp_errlog).has.line( + [[unable to send updated configuration to data plane: data plane is missing one or more wasm filters]], + true, 5) + + expect_status(dp_prefix, STATUS.FILTER_SET_INCOMPATIBLE) + end) + end) +end) diff --git a/spec/05-migration/db/migrations/core/020_330_to_340_spec.lua b/spec/05-migration/db/migrations/core/020_330_to_340_spec.lua index 54768c7489f4..5ace57dc0cc9 100644 --- a/spec/05-migration/db/migrations/core/020_330_to_340_spec.lua +++ b/spec/05-migration/db/migrations/core/020_330_to_340_spec.lua @@ -7,4 +7,28 @@ describe("database migration", function() assert.not_database_has_relation("ttls") end) end + + do -- wasm + uh.old_after_up("has created the expected new columns", function() + assert.table_has_column("filter_chains", "id", "uuid") + assert.table_has_column("filter_chains", "name", "text") + assert.table_has_column("filter_chains", "enabled", "boolean") + + assert.table_has_column("filter_chains", "cache_key", "text") + assert.table_has_column("filter_chains", "filters", "ARRAY") + assert.table_has_column("filter_chains", "tags", "ARRAY") + assert.table_has_column("filter_chains", "created_at", "timestamp 
with time zone") + assert.table_has_column("filter_chains", "updated_at", "timestamp with time zone") + + assert.table_has_column("filter_chains", "route_id", "uuid") + assert.table_has_column("filter_chains", "service_id", "uuid") + assert.table_has_column("filter_chains", "ws_id", "uuid") + end) + + if uh.database_type() == "postgres" then + uh.all_phases("has created the expected triggers", function () + assert.database_has_trigger("filter_chains_sync_tags_trigger") + end) + end + end end) diff --git a/spec/fixtures/blueprints.lua b/spec/fixtures/blueprints.lua index 900c06504588..8427a168c80d 100644 --- a/spec/fixtures/blueprints.lua +++ b/spec/fixtures/blueprints.lua @@ -403,6 +403,13 @@ function _M.new(db) return {} end) + local filter_chains_seq = new_sequence("filter-chains-%d") + res.filter_chains = new_blueprint(db.filter_chains, function() + return { + name = filter_chains_seq:next(), + } + end) + return res end diff --git a/spec/fixtures/custom_nginx.template b/spec/fixtures/custom_nginx.template index c4db91f35d5f..15f4261363d4 100644 --- a/spec/fixtures/custom_nginx.template +++ b/spec/fixtures/custom_nginx.template @@ -1,6 +1,11 @@ # This is a custom nginx configuration template for Kong specs pid pids/nginx.pid; # mandatory even for custom config templates + +> if wasm and wasm_dynamic_module then +load_module $(wasm_dynamic_module); +> end + error_log ${{PROXY_ERROR_LOG}} ${{LOG_LEVEL}}; # injected nginx_main_* directives @@ -20,6 +25,49 @@ events { > end } +> if wasm then +wasm { +> for _, el in ipairs(nginx_wasm_main_shm_directives) do + shm_kv $(el.name) $(el.value); +> end + +> for _, module in ipairs(wasm_modules_parsed) do + module $(module.name) $(module.path); +> end + +> for _, el in ipairs(nginx_wasm_main_directives) do + $(el.name) $(el.value); +> end + +> if #nginx_wasm_wasmtime_directives > 0 then + wasmtime { +> for _, el in ipairs(nginx_wasm_wasmtime_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- wasmtime + +> if 
#nginx_wasm_v8_directives > 0 then + v8 { +> for _, el in ipairs(nginx_wasm_v8_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- v8 + +> if #nginx_wasm_wasmer_directives > 0 then + wasmer { +> for _, el in ipairs(nginx_wasm_wasmer_directives) do + flag $(el.name) $(el.value); +> end + } +> end -- wasmer + +} +> end + + + > if role == "control_plane" or #proxy_listeners > 0 or #admin_listeners > 0 or #status_listeners > 0 then http { server_tokens off; diff --git a/spec/helpers.lua b/spec/helpers.lua index 0178fb3e6568..bdc7bd4cbe0b 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -3826,6 +3826,41 @@ local function generate_keys(fmt) end +local make_temp_dir +do + local seeded = false + + function make_temp_dir() + if not seeded then + ngx.update_time() + math.randomseed(ngx.worker.pid() + ngx.now()) + seeded = true + end + + local tmp + local ok, err + + local tries = 1000 + for _ = 1, tries do + local name = "/tmp/.kong-test" .. math.random() + + ok, err = pl_path.mkdir(name) + + if ok then + tmp = name + break + end + end + + assert(tmp ~= nil, "failed to create temporary directory " .. + "after " .. tostring(tries) .. " tries, " .. + "last error: " .. tostring(err)) + + return tmp, function() pl_dir.rmtree(tmp) end + end +end + + ---------------- -- Variables/constants -- @section exported-fields @@ -4060,4 +4095,6 @@ end return table_clone(PLUGINS_LIST) end, get_available_port = get_available_port, + + make_temp_dir = make_temp_dir, } diff --git a/spec/kong_tests.conf b/spec/kong_tests.conf index 5efda77d4cdc..4afb45a0985f 100644 --- a/spec/kong_tests.conf +++ b/spec/kong_tests.conf @@ -44,3 +44,5 @@ untrusted_lua = sandbox vaults = bundled pg_password = foo\#bar# this is a comment that should be stripped + +wasm_filters_path = ./spec/fixtures/proxy_wasm_filters/build