Skip to content

Commit

Permalink
feat(clustering): add cluster compat logic for wasm (#11217)
Browse files Browse the repository at this point in the history
* chore(clustering): add cluster compat logic for wasm

This adds basic cluster compatibility checks for wasm filters. The logic
is similar to how plugin checks work today. That is, if a wasm filter is
present on the control plane but _not_ on the data plane, config updates
will not be sent to the data plane, and the control plane will set a
(new) sync status for the data plane accordingly.

This is accomplished by adding a new `filters` attribute to the
basic_info payload that is sent by the data plane when initiating a
connection to the control plane. The `filters` attribute is a map of
strings to objects with a single `name` attribute:

```json
{
  "my-filter": {
    "name": "my-filter",
  },
  "my-other-filter": {
    "name": "my-other-filter",
  }
}
```

Using a map instead of an array was done for convenience of member
lookup (since we would probably convert the array to a map anyways), but
I get that it looks a little silly/redundant in its current form. Open
to changing this if there are objections.

Down the road we can potentially come up with some scheme for more
granular checks (i.e. a version or checksum field), but right now we
just check if a filter is present or absent.

* lint lint lint

* fix cluster status schema test
  • Loading branch information
flrgh authored and locao committed Jul 14, 2023
1 parent 0a6d9c0 commit e3e90dc
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 10 deletions.
20 changes: 20 additions & 0 deletions kong/clustering/compat/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ local COMPATIBILITY_CHECKERS = require("kong.clustering.compat.checkers")
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local KONG_VERSION = meta.version

local EMPTY = {}


local _M = {}

Expand Down Expand Up @@ -176,6 +178,24 @@ function _M.check_configuration_compatibility(cp, dp)
end
end

if cp.conf.wasm then
local dp_filters = dp.filters or EMPTY
local missing
for name in pairs(cp.filters or EMPTY) do
if not dp_filters[name] then
missing = missing or {}
table.insert(missing, name)
end
end

if missing then
local msg = "data plane is missing one or more wasm filters "
.. "(" .. table.concat(missing, ", ") .. ")"
return nil, msg, CLUSTERING_SYNC_STATUS.FILTER_SET_INCOMPATIBLE
end
end


return true, nil, CLUSTERING_SYNC_STATUS.NORMAL
end

Expand Down
18 changes: 13 additions & 5 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ function _M:handle_cp_websocket()

if self.deflated_reconfigure_payload then
-- initial configuration compatibility for sync status variable
_, _, sync_status = self:check_configuration_compatibility(
{ dp_plugins_map = dp_plugins_map, })
_, _, sync_status = self:check_configuration_compatibility({
dp_plugins_map = dp_plugins_map,
filters = data.filters,
})

table_insert(queue, RECONFIGURE_TYPE)
queue.post()
Expand Down Expand Up @@ -397,8 +399,11 @@ function _M:handle_cp_websocket()
assert(payload == RECONFIGURE_TYPE)

local previous_sync_status = sync_status
ok, err, sync_status = self:check_configuration_compatibility(
{ dp_plugins_map = dp_plugins_map, })
ok, err, sync_status = self:check_configuration_compatibility({
dp_plugins_map = dp_plugins_map,
filters = data.filters,
})

if not ok then
ngx_log(ngx_WARN, _log_prefix, "unable to send updated configuration to data plane: ", err, log_suffix)
if sync_status ~= previous_sync_status then
Expand Down Expand Up @@ -532,8 +537,9 @@ local function push_config_loop(premature, self, push_config_semaphore, delay)
end


function _M:init_worker(plugins_list)
function _M:init_worker(basic_info)
-- ROLE = "control_plane"
local plugins_list = basic_info.plugins
self.plugins_list = plugins_list
self.plugins_map = plugins_list_to_map(plugins_list)

Expand All @@ -547,6 +553,8 @@ function _M:init_worker(plugins_list)
self.plugin_versions[plugin.name] = plugin.version
end

self.filters = basic_info.filters

local push_config_semaphore = semaphore.new()

-- When "clustering", "push_config" worker event is received by a worker,
Expand Down
6 changes: 4 additions & 2 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ function _M.new(clustering)
end


function _M:init_worker(plugins_list)
function _M:init_worker(basic_info)
-- ROLE = "data_plane"

self.plugins_list = plugins_list
self.plugins_list = basic_info.plugins
self.filters = basic_info.filters

-- only run in process which worker_id() == 0
assert(ngx.timer.at(0, function(premature)
Expand Down Expand Up @@ -147,6 +148,7 @@ function _M:communicate(premature)
_, err = c:send_binary(cjson_encode({ type = "basic_info",
plugins = self.plugins_list,
process_conf = configuration,
filters = self.filters,
labels = labels, }))
if err then
ngx_log(ngx_ERR, _log_prefix, "unable to send basic information to control plane: ", uri,
Expand Down
16 changes: 14 additions & 2 deletions kong/clustering/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,27 @@ function _M:init_worker()
return { name = p.name, version = p.handler.VERSION, }
end, plugins_list)

local filters = {}
if kong.db.filter_chains.filters then
for _, filter in ipairs(kong.db.filter_chains.filters) do
filters[filter.name] = { name = filter.name }
end
end

local basic_info = {
plugins = plugins_list,
filters = filters,
}

local role = self.conf.role

if role == "control_plane" then
self:init_cp_worker(plugins_list)
self:init_cp_worker(basic_info)
return
end

if role == "data_plane" then
self:init_dp_worker(plugins_list)
self:init_dp_worker(basic_info)
end
end

Expand Down
1 change: 1 addition & 0 deletions kong/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ local constants = {
{ KONG_VERSION_INCOMPATIBLE = "kong_version_incompatible", },
{ PLUGIN_SET_INCOMPATIBLE = "plugin_set_incompatible", },
{ PLUGIN_VERSION_INCOMPATIBLE = "plugin_version_incompatible", },
{ FILTER_SET_INCOMPATIBLE = "filter_set_incompatible", },
},
CLUSTERING_TIMEOUT = 5000, -- 5 seconds
CLUSTERING_PING_INTERVAL = 30, -- 30 seconds
Expand Down
2 changes: 1 addition & 1 deletion spec/01-unit/01-db/01-schema/13-cluster_status_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe("plugins", function()
local ok, err = validate({ sync_status = "aaa", })
assert.is_nil(ok)

assert.equal("expected one of: unknown, normal, kong_version_incompatible, plugin_set_incompatible, plugin_version_incompatible", err.sync_status)
assert.equal("expected one of: unknown, normal, kong_version_incompatible, plugin_set_incompatible, plugin_version_incompatible, filter_set_incompatible", err.sync_status)
end)

it("accepts correct value", function()
Expand Down
Loading

0 comments on commit e3e90dc

Please sign in to comment.