Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(clustering) no concurrent inflating and json decoding on dps #8959

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 38 additions & 43 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,46 @@ function _M:communicate(premature)

local ping_immediately
local config_exit
local next_data

local config_thread = ngx.thread.spawn(function()
while not exiting() and not config_exit do
local ok, err = config_semaphore:wait(1)
if ok then
local config_table = self.next_config
if config_table then
local config_hash = self.next_hash
local hashes = self.next_hashes

local pok, res
pok, res, err = pcall(config_helper.update,
self.declarative_config, config_table, config_hash, hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
local data = next_data
if data then
local msg = assert(inflate_gzip(data))
yield()
msg = assert(cjson_decode(msg))
yield()

if msg.type == "reconfigure" then
if msg.timestamp then
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
msg.timestamp, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
end

ping_immediately = true
local config_table = assert(msg.config_table)
local pok, res
pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end
ping_immediately = true

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end

if self.next_config == config_table then
self.next_config = nil
if next_data == data then
next_data = nil
end
end
end

Expand Down Expand Up @@ -223,31 +237,12 @@ function _M:communicate(premature)
last_seen = ngx_time()

if typ == "binary" then
data = assert(inflate_gzip(data))
yield()

local msg = assert(cjson_decode(data))
yield()

if msg.type == "reconfigure" then
if msg.timestamp then
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
msg.timestamp, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
end

self.next_config = assert(msg.config_table)
self.next_hash = msg.config_hash
self.next_hashes = msg.hashes

if config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
config_semaphore:post()
end
next_data = data
if config_semaphore:count() <= 0 then
flrgh marked this conversation as resolved.
Show resolved Hide resolved
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
config_semaphore:post()
end

elseif typ == "pong" then
Expand Down Expand Up @@ -277,8 +272,8 @@ function _M:communicate(premature)
-- the config thread might be holding a lock if it's in the middle of an
-- update, so we need to give it a chance to terminate gracefully
config_exit = true
ok, err, perr = ngx.thread.wait(config_thread)

ok, err, perr = ngx.thread.wait(config_thread)
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

Expand Down
61 changes: 29 additions & 32 deletions kong/clustering/wrpc_data_plane.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

local semaphore = require("ngx.semaphore")
local declarative = require("kong.db.declarative")
local wrpc = require("kong.tools.wrpc")
Expand All @@ -10,7 +9,9 @@ local cjson = require("cjson.safe")
local utils = require("kong.tools.utils")
local assert = assert
local setmetatable = setmetatable
local tonumber = tonumber
local math = math
local traceback = debug.traceback
local xpcall = xpcall
local ngx = ngx
local ngx_log = ngx.log
Expand Down Expand Up @@ -65,14 +66,7 @@ local function get_config_service()
wrpc_config_service:set_handler("ConfigService.SyncConfig", function(peer, data)
-- yield between steps to prevent long delay
if peer.config_semaphore then
local json_config = assert(inflate_gzip(data.config))
yield()
peer.config_obj.next_config = assert(cjson_decode(json_config))
yield()

peer.config_obj.next_hash = data.config_hash
peer.config_obj.next_hashes = data.hashes
peer.config_obj.next_config_version = tonumber(data.version)
peer.config_obj.next_data = data
if peer.config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
Expand Down Expand Up @@ -148,30 +142,33 @@ function _M:communicate(premature)
peer.semaphore = nil
config_semaphore = nil
end
local config_table = self.next_config
local config_hash = self.next_hash
local config_version = self.next_config_version
local hashes = self.next_hashes
if config_table and config_version > last_config_version then
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)

local pok, res
pok, res, err = xpcall(config_helper.update, debug.traceback,
self.declarative_config, config_table, config_hash, hashes)
if pok then
last_config_version = config_version
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end
local data = self.next_data
if data then
local config_version = tonumber(data.version)
if config_version > last_config_version then
local config_table = assert(inflate_gzip(data.config))
yield()
config_table = assert(cjson_decode(config_table))
yield()
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)

local pok, res
pok, res, err = xpcall(config_helper.update, traceback, self.declarative_config,
config_table, data.config_hash, data.hashes)
if pok then
last_config_version = config_version
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end

if self.next_config == config_table then
self.next_config = nil
self.next_hash = nil
self.next_hashes = nil
if self.next_data == data then
self.next_data = nil
end
end
end

Expand Down Expand Up @@ -215,8 +212,8 @@ function _M:communicate(premature)
-- the config thread might be holding a lock if it's in the middle of an
-- update, so we need to give it a chance to terminate gracefully
config_exit = true
ok, err, perr = ngx.thread.wait(config_thread)

ok, err, perr = ngx.thread.wait(config_thread)
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

Expand Down