Skip to content

Commit

Permalink
Revert "fix(*): prevent queues from growing without bounds (#10046)"
Browse files Browse the repository at this point in the history
This reverts commit 2f45e7a.
  • Loading branch information
Hans Hübner authored and fffonion committed Jan 17, 2023
1 parent fffb4e8 commit 218cc0a
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 94 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@
- Fix an issue where after a valid declarative configuration is loaded,
the configuration hash is incorrectly set to the value: `00000000000000000000000000000000`.
[#9911](https://github.com/Kong/kong/pull/9911)
- Update the batch queues module so that queues no longer grow without bounds if
their consumers fail to process the entries. Instead, old batches are now dropped
and an error is logged.
[#10046](https://github.com/Kong/kong/pull/10046)
- tls protocol upstream support upstream tls config
[#9947](https://github.com/Kong/kong/pull/9947)
Expand Down Expand Up @@ -143,6 +140,12 @@
and disable the wRPC protocol.
[#9921](https://github.com/Kong/kong/pull/9921)

#### Core

- Fix an issue where after a valid declarative configuration is loaded,
the configuration hash is incorrectly set to the value: `00000000000000000000000000000000`.
[#9911](https://github.com/Kong/kong/pull/9911)

### Dependencies

- Bumped luarocks from 3.9.1 to 3.9.2
Expand Down
2 changes: 0 additions & 2 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,6 @@ local CONF_INFERENCES = {

proxy_server = { typ = "string" },
proxy_server_ssl_verify = { typ = "boolean" },

max_queued_batches = { typ = "number" },
}


Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/datadog/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function DatadogHandler:log(conf)
}

local err
q, err = BatchQueue.new("datadog", process, opts)
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/http-log/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ function HttpLogHandler:log(conf)
}

local err
q, err = BatchQueue.new("http-log", process, opts)
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/opentelemetry/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ function OpenTelemetryHandler:log(conf)
}

local err
q, err = BatchQueue.new("opentelemetry", process, opts)
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/statsd/log.lua
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ function _M.execute(conf)
}

local err
q, err = BatchQueue.new("statsd", process, opts)
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
89 changes: 34 additions & 55 deletions kong/tools/batch_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
-- end
--
-- local q = BatchQueue.new(
-- name, -- name of the queue for identification purposes in the log
-- process, -- function used to "process/consume" values from the queue
-- { -- Opts table with control values. Defaults shown:
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- }
-- )
--
Expand Down Expand Up @@ -70,9 +68,11 @@ local timer_at = ngx.timer.at
local remove = table.remove
local type = type
local huge = math.huge
local fmt = string.format
local min = math.min
local now = ngx.now
local ERR = ngx.ERR
local ngx_log = ngx.log
local DEBUG = ngx.DEBUG
local WARN = ngx.WARN

Expand Down Expand Up @@ -100,10 +100,10 @@ local process
local function schedule_flush(self)
local ok, err = timer_at(self.flush_timeout/1000, flush, self)
if not ok then
self:log(ERR, "failed to create delayed flush timer: %s", err)
ngx_log(ERR, "failed to create delayed flush timer: ", err)
return
end
--self:log(DEBUG, "delayed timer created")
--ngx_log(DEBUG, "delayed timer created")
self.flush_scheduled = true
end

Expand All @@ -113,10 +113,10 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @param delay number: timer delay in seconds
local function schedule_process(self, delay)
local ok, err = timer_at(delay, process, self)
local function schedule_process(self, batch, delay)
local ok, err = timer_at(delay, process, self, batch)
if not ok then
self:log(ERR, "failed to create process timer: %s", err)
ngx_log(ERR, "failed to create process timer: ", err)
return
end
self.process_scheduled = true
Expand Down Expand Up @@ -147,13 +147,13 @@ flush = function(premature, self)

if get_now() - self.last_t < self.flush_timeout then
-- flushing reported: we had activity
self:log(DEBUG, "[flush] queue had activity, delaying flush")
ngx_log(DEBUG, "[flush] queue had activity, delaying flush")
schedule_flush(self)
return
end

-- no activity and timeout reached
self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
self:flush()
self.flush_scheduled = false
end
Expand All @@ -165,40 +165,38 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @return nothing
process = function(premature, self)
process = function(premature, self, batch)
if premature then
return
end

local batch = self.batch_queue[1]
if not batch then
self:log(WARN, "queue process called but no batches to be processed")
return
end

local next_retry_delay

local ok, err = self.process(batch.entries)
if ok then -- success, reset retry delays
self.retry_delay = 1
next_retry_delay = 0
remove(self.batch_queue, 1)

else
batch.retries = batch.retries + 1
if batch.retries < self.retry_count then
self:log(WARN, "failed to process entries: %s", tostring(err))
ngx_log(WARN, "failed to process entries: ", tostring(err))
-- queue our data for processing again, at the end of the queue
self.batch_queue[#self.batch_queue + 1] = batch
else
self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries)
remove(self.batch_queue, 1)
ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it",
batch.retries))
end

self.retry_delay = self.retry_delay + 1
next_retry_delay = min(RETRY_MAX_DELAY, self.retry_delay * self.retry_delay)
end

if #self.batch_queue > 0 then -- more to process?
self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue)
schedule_process(self, next_retry_delay)
ngx_log(DEBUG, fmt("processing oldest data, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, next_retry_delay)
return
end

Expand All @@ -220,15 +218,13 @@ end
-- @param opts table, optionally including
-- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay`
-- @return table: a Queue object.
function Queue.new(name, process, opts)
function Queue.new(process, opts)
opts = opts or {}

assert(type(name) == "string",
"arg #1 (name) must be a string")
assert(type(process) == "function",
"arg #2 (process) must be a function")
"arg #1 (process) must be a function")
assert(type(opts) == "table",
"arg #3 (opts) must be a table")
"arg #2 (opts) must be a table")
assert(opts.retry_count == nil or type(opts.retry_count) == "number",
"retry_count must be a number")
assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number",
Expand All @@ -237,19 +233,15 @@ function Queue.new(name, process, opts)
"batch_max_size must be a number")
assert(opts.process_delay == nil or type(opts.batch_max_size) == "number",
"process_delay must be a number")
assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number",
"max_queued_batches must be a number")

local self = {
name = name,
process = process,

-- flush timeout in milliseconds
flush_timeout = opts.flush_timeout and opts.flush_timeout * 1000 or 2000,
retry_count = opts.retry_count or 0,
batch_max_size = opts.batch_max_size or 1000,
process_delay = opts.process_delay or 1,
max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100,

retry_delay = 1,

Expand All @@ -266,17 +258,6 @@ function Queue.new(name, process, opts)
end


-------------------------------------------------------------------------------
-- Log a message that includes the name of the queue for identification purposes
-- @param self Queue
-- @param level: log level
-- @param formatstring: format string, will get the queue name and ": " prepended
-- @param ...: formatter arguments
function Queue:log(level, formatstring, ...)
return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...})))
end


-------------------------------------------------------------------------------
-- Add data to the queue
-- @param entry the value included in the queue. It can be any Lua value besides nil.
Expand All @@ -288,8 +269,8 @@ function Queue:add(entry)

if self.batch_max_size == 1 then
-- no batching
self.batch_queue = { { entries = { entry }, retries = 0 } }
schedule_process(self, 0)
local batch = { entries = { entry }, retries = 0 }
schedule_process(self, batch, 0)
return true
end

Expand Down Expand Up @@ -323,12 +304,8 @@ function Queue:flush()

-- Queue the current batch, if it has at least 1 entry
if current_batch_size > 0 then
self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size)
ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)")

while #self.batch_queue >= self.max_queued_batches do
self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches)
remove(self.batch_queue, 1)
end
self.batch_queue[#self.batch_queue + 1] = self.current_batch
self.current_batch = { entries = {}, retries = 0 }
end
Expand All @@ -337,8 +314,10 @@ function Queue:flush()
-- in the future. This will keep calling itself in the future until
-- the queue is empty
if #self.batch_queue > 0 and not self.process_scheduled then
self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue)
schedule_process(self, self.process_delay)
ngx_log(DEBUG, fmt("processing oldest entry, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, self.process_delay)
end

return true
Expand Down
30 changes: 0 additions & 30 deletions spec/01-unit/27-batch_queue_spec.lua

This file was deleted.

1 comment on commit 218cc0a

@khcp-gha-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:218cc0a81500986b0b94e6b4e350302326097018
Artifacts available https://github.com/Kong/kong/actions/runs/3938244689

Please sign in to comment.