Skip to content

Commit

Permalink
perf(clustering) no concurrent inflating and json decoding on dps (#8959
Browse files Browse the repository at this point in the history
)

### Summary

Previously it could be that `read_thread` caused concurrent blocking when
inflating / json decoding incoming data. This commit moves it beyond the
semaphore to make it non-concurrent.
  • Loading branch information
bungle authored Jun 15, 2022
1 parent eec2835 commit 200f56e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 75 deletions.
81 changes: 38 additions & 43 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,46 @@ function _M:communicate(premature)

local ping_immediately
local config_exit
local next_data

local config_thread = ngx.thread.spawn(function()
while not exiting() and not config_exit do
local ok, err = config_semaphore:wait(1)
if ok then
local config_table = self.next_config
if config_table then
local config_hash = self.next_hash
local hashes = self.next_hashes

local pok, res
pok, res, err = pcall(config_helper.update,
self.declarative_config, config_table, config_hash, hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
local data = next_data
if data then
local msg = assert(inflate_gzip(data))
yield()
msg = assert(cjson_decode(msg))
yield()

if msg.type == "reconfigure" then
if msg.timestamp then
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
msg.timestamp, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
end

ping_immediately = true
local config_table = assert(msg.config_table)
local pok, res
pok, res, err = pcall(config_helper.update, self.declarative_config,
config_table, msg.config_hash, msg.hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end
ping_immediately = true

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end

if self.next_config == config_table then
self.next_config = nil
if next_data == data then
next_data = nil
end
end
end

Expand Down Expand Up @@ -223,31 +237,12 @@ function _M:communicate(premature)
last_seen = ngx_time()

if typ == "binary" then
data = assert(inflate_gzip(data))
yield()

local msg = assert(cjson_decode(data))
yield()

if msg.type == "reconfigure" then
if msg.timestamp then
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane with timestamp: ",
msg.timestamp, log_suffix)

else
ngx_log(ngx_DEBUG, _log_prefix, "received reconfigure frame from control plane", log_suffix)
end

self.next_config = assert(msg.config_table)
self.next_hash = msg.config_hash
self.next_hashes = msg.hashes

if config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
config_semaphore:post()
end
next_data = data
if config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
-- count is guaranteed to not exceed 1
config_semaphore:post()
end

elseif typ == "pong" then
Expand Down Expand Up @@ -277,8 +272,8 @@ function _M:communicate(premature)
-- the config thread might be holding a lock if it's in the middle of an
-- update, so we need to give it a chance to terminate gracefully
config_exit = true
ok, err, perr = ngx.thread.wait(config_thread)

ok, err, perr = ngx.thread.wait(config_thread)
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

Expand Down
61 changes: 29 additions & 32 deletions kong/clustering/wrpc_data_plane.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

local semaphore = require("ngx.semaphore")
local declarative = require("kong.db.declarative")
local wrpc = require("kong.tools.wrpc")
Expand All @@ -10,7 +9,9 @@ local cjson = require("cjson.safe")
local utils = require("kong.tools.utils")
local assert = assert
local setmetatable = setmetatable
local tonumber = tonumber
local math = math
local traceback = debug.traceback
local xpcall = xpcall
local ngx = ngx
local ngx_log = ngx.log
Expand Down Expand Up @@ -65,14 +66,7 @@ local function get_config_service()
wrpc_config_service:set_handler("ConfigService.SyncConfig", function(peer, data)
-- yield between steps to prevent long delay
if peer.config_semaphore then
local json_config = assert(inflate_gzip(data.config))
yield()
peer.config_obj.next_config = assert(cjson_decode(json_config))
yield()

peer.config_obj.next_hash = data.config_hash
peer.config_obj.next_hashes = data.hashes
peer.config_obj.next_config_version = tonumber(data.version)
peer.config_obj.next_data = data
if peer.config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
-- because `:count` will never yield, end result is that the semaphore
Expand Down Expand Up @@ -148,30 +142,33 @@ function _M:communicate(premature)
peer.semaphore = nil
config_semaphore = nil
end
local config_table = self.next_config
local config_hash = self.next_hash
local config_version = self.next_config_version
local hashes = self.next_hashes
if config_table and config_version > last_config_version then
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)

local pok, res
pok, res, err = xpcall(config_helper.update, debug.traceback,
self.declarative_config, config_table, config_hash, hashes)
if pok then
last_config_version = config_version
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end
local data = self.next_data
if data then
local config_version = tonumber(data.version)
if config_version > last_config_version then
local config_table = assert(inflate_gzip(data.config))
yield()
config_table = assert(cjson_decode(config_table))
yield()
ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix)

local pok, res
pok, res, err = xpcall(config_helper.update, traceback, self.declarative_config,
config_table, data.config_hash, data.hashes)
if pok then
last_config_version = config_version
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
end

else
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res)
end

if self.next_config == config_table then
self.next_config = nil
self.next_hash = nil
self.next_hashes = nil
if self.next_data == data then
self.next_data = nil
end
end
end

Expand Down Expand Up @@ -215,8 +212,8 @@ function _M:communicate(premature)
-- the config thread might be holding a lock if it's in the middle of an
-- update, so we need to give it a chance to terminate gracefully
config_exit = true
ok, err, perr = ngx.thread.wait(config_thread)

ok, err, perr = ngx.thread.wait(config_thread)
if not ok then
ngx_log(ngx_ERR, _log_prefix, err, log_suffix)

Expand Down

0 comments on commit 200f56e

Please sign in to comment.