diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f524cbc694..07aafd4ee75 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -98,6 +98,13 @@
   [#9603](https://github.com/Kong/kong/pull/9603)
 - **Template**: Do not add default charset to the `Content-Type` response header when
   upstream response doesn't contain it.
   [#9905](https://github.com/Kong/kong/pull/9905)
+- Fix an issue where after a valid declarative configuration is loaded,
+  the configuration hash is incorrectly set to the value: `00000000000000000000000000000000`.
+  [#9911](https://github.com/Kong/kong/pull/9911)
+- Update the batch queues module so that queues no longer grow without bounds if
+  their consumers fail to process the entries. Instead, old batches are now dropped
+  and an error is logged.
+  [#10046](https://github.com/Kong/kong/pull/10046)
 
 #### Plugins
 
@@ -106,12 +113,6 @@
 - **JWT**: Deny requests that have different tokens in the jwt token search locations.
   Thanks Jackson 'Che-Chun' Kuo from Latacora for reporting this issue.
   [#9946](https://github.com/Kong/kong/pull/9946)
-
-#### Core
-
-- Fix an issue where after a valid declarative configuration is loaded,
-  the configuration hash is incorrectly set to the value: `00000000000000000000000000000000`.
-  [#9911](https://github.com/Kong/kong/pull/9911)
 
 ### Dependencies
 
 - Bumped luarocks from 3.9.1 to 3.9.2
diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua
index 886cfa113ed..766199e6373 100644
--- a/kong/conf_loader/init.lua
+++ b/kong/conf_loader/init.lua
@@ -550,6 +550,8 @@ local CONF_INFERENCES = {
 
   proxy_server = { typ = "string" },
   proxy_server_ssl_verify = { typ = "boolean" },
+
+  max_queued_batches = { typ = "number" },
 }
diff --git a/kong/plugins/datadog/handler.lua b/kong/plugins/datadog/handler.lua
index e1757005f09..18dcb422e5a 100644
--- a/kong/plugins/datadog/handler.lua
+++ b/kong/plugins/datadog/handler.lua
@@ -133,7 +133,7 @@ function DatadogHandler:log(conf)
     }
 
     local err
-    q, err = BatchQueue.new(process, opts)
+    q, err = BatchQueue.new("datadog", process, opts)
     if not q then
       kong.log.err("could not create queue: ", err)
       return
diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua
index de5b0f96db1..b2461852c9a 100644
--- a/kong/plugins/http-log/handler.lua
+++ b/kong/plugins/http-log/handler.lua
@@ -171,7 +171,7 @@ function HttpLogHandler:log(conf)
     }
 
     local err
-    q, err = BatchQueue.new(process, opts)
+    q, err = BatchQueue.new("http-log", process, opts)
     if not q then
       kong.log.err("could not create queue: ", err)
       return
diff --git a/kong/plugins/opentelemetry/handler.lua b/kong/plugins/opentelemetry/handler.lua
index 5b0b00a2e6c..988ddedb168 100644
--- a/kong/plugins/opentelemetry/handler.lua
+++ b/kong/plugins/opentelemetry/handler.lua
@@ -157,7 +157,7 @@ function OpenTelemetryHandler:log(conf)
     }
 
     local err
-    q, err = BatchQueue.new(process, opts)
+    q, err = BatchQueue.new("opentelemetry", process, opts)
     if not q then
       kong.log.err("could not create queue: ", err)
       return
diff --git a/kong/plugins/statsd/log.lua b/kong/plugins/statsd/log.lua
index 5b68f287b4e..290aae1c7ea 100644
--- a/kong/plugins/statsd/log.lua
+++ b/kong/plugins/statsd/log.lua
@@ -374,7 +374,7 @@ function _M.execute(conf)
     }
 
     local err
-    q, err = BatchQueue.new(process, opts)
+    q, err = BatchQueue.new("statsd", process, opts)
     if not q then
       kong.log.err("could not create queue: ", err)
       return
diff --git a/kong/tools/batch_queue.lua b/kong/tools/batch_queue.lua
index 8eaf5ae56ef..92322905a22 100644
--- a/kong/tools/batch_queue.lua
+++ b/kong/tools/batch_queue.lua
@@ -24,12 +24,14 @@
 -- end
 --
 -- local q = BatchQueue.new(
+--   name,    -- name of the queue for identification purposes in the log
 --   process, -- function used to "process/consume" values from the queue
 --   {        -- Opts table with control values. Defaults shown:
---     retry_count    = 0,    -- number of times to retry processing
---     batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
---     process_delay  = 1,    -- in seconds, how often the current batch is closed & queued
---     flush_timeout  = 2,    -- in seconds, how much time passes without activity before the current batch is closed and queued
+--     retry_count        = 0,    -- number of times to retry processing
+--     batch_max_size     = 1000, -- max number of entries that can be queued before they are queued for processing
+--     process_delay      = 1,    -- in seconds, how often the current batch is closed & queued
+--     flush_timeout      = 2,    -- in seconds, how much time passes without activity before the current batch is closed and queued
+--     max_queued_batches = 100,  -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
 --   }
 -- )
 --
@@ -68,11 +70,9 @@
 local timer_at = ngx.timer.at
 local remove = table.remove
 local type = type
 local huge = math.huge
-local fmt = string.format
 local min = math.min
 local now = ngx.now
 local ERR = ngx.ERR
-local ngx_log = ngx.log
 local DEBUG = ngx.DEBUG
 local WARN = ngx.WARN
@@ -100,10 +100,10 @@ local process
 local function schedule_flush(self)
   local ok, err = timer_at(self.flush_timeout/1000, flush, self)
   if not ok then
-    ngx_log(ERR, "failed to create delayed flush timer: ", err)
+    self:log(ERR, "failed to create delayed flush timer: %s", err)
     return
   end
-  --ngx_log(DEBUG, "delayed timer created")
+  --self:log(DEBUG, "delayed timer created")
   self.flush_scheduled = true
 end
 
@@ -113,10 +113,10 @@ end
 -- @param self Queue
 -- @param batch: table with `entries` and `retries` counter
 -- @param delay number: timer delay in seconds
-local function schedule_process(self, batch, delay)
-  local ok, err = timer_at(delay, process, self, batch)
+local function schedule_process(self, delay)
+  local ok, err = timer_at(delay, process, self)
   if not ok then
-    ngx_log(ERR, "failed to create process timer: ", err)
+    self:log(ERR, "failed to create process timer: %s", err)
     return
   end
   self.process_scheduled = true
@@ -147,13 +147,13 @@ flush = function(premature, self)
 
   if get_now() - self.last_t < self.flush_timeout then
     -- flushing reported: we had activity
-    ngx_log(DEBUG, "[flush] queue had activity, delaying flush")
+    self:log(DEBUG, "[flush] queue had activity, delaying flush")
     schedule_flush(self)
     return
   end
 
   -- no activity and timeout reached
-  ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
+  self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
   self:flush()
   self.flush_scheduled = false
 end
@@ -165,27 +165,31 @@ end
 -- @param self Queue
 -- @param batch: table with `entries` and `retries` counter
 -- @return nothing
-process = function(premature, self, batch)
+process = function(premature, self)
   if premature then
     return
   end
 
+  local batch = self.batch_queue[1]
+  if not batch then
+    self:log(WARN, "queue process called but no batches to be processed")
+    return
+  end
+
   local next_retry_delay
 
   local ok, err = self.process(batch.entries)
   if ok then -- success, reset retry delays
     self.retry_delay = 1
     next_retry_delay = 0
-
+    remove(self.batch_queue, 1)
   else
     batch.retries = batch.retries + 1
     if batch.retries < self.retry_count then
-      ngx_log(WARN, "failed to process entries: ", tostring(err))
-      -- queue our data for processing again, at the end of the queue
-      self.batch_queue[#self.batch_queue + 1] = batch
+      self:log(WARN, "failed to process entries: %s", tostring(err))
     else
-      ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it",
-                       batch.retries))
+      self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries)
+      remove(self.batch_queue, 1)
     end
 
     self.retry_delay = self.retry_delay + 1
@@ -193,10 +197,8 @@ process = function(premature, self, batch)
   end
 
   if #self.batch_queue > 0 then -- more to process?
-    ngx_log(DEBUG, fmt("processing oldest data, %d still queued",
-                       #self.batch_queue - 1))
-    local oldest_batch = remove(self.batch_queue, 1)
-    schedule_process(self, oldest_batch, next_retry_delay)
+    self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue)
+    schedule_process(self, next_retry_delay)
     return
   end
 
@@ -218,13 +220,15 @@ end
 -- @param opts table, optionally including
 -- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay`
 -- @return table: a Queue object.
-function Queue.new(process, opts)
+function Queue.new(name, process, opts)
   opts = opts or {}
 
+  assert(type(name) == "string",
+         "arg #1 (name) must be a string")
   assert(type(process) == "function",
-         "arg #1 (process) must be a function")
+         "arg #2 (process) must be a function")
   assert(type(opts) == "table",
-         "arg #2 (opts) must be a table")
+         "arg #3 (opts) must be a table")
   assert(opts.retry_count == nil or type(opts.retry_count) == "number",
          "retry_count must be a number")
   assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number",
@@ -233,8 +237,11 @@ function Queue.new(process, opts)
          "batch_max_size must be a number")
   assert(opts.process_delay == nil or type(opts.batch_max_size) == "number",
          "process_delay must be a number")
+  assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number",
+         "max_queued_batches must be a number")
 
   local self = {
+    name = name,
     process = process,
 
     -- flush timeout in milliseconds
@@ -242,6 +249,7 @@ function Queue.new(process, opts)
     retry_count = opts.retry_count or 0,
     batch_max_size = opts.batch_max_size or 1000,
     process_delay = opts.process_delay or 1,
+    max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100,
 
     retry_delay = 1,
 
@@ -258,6 +266,17 @@
 end
 
 
+-------------------------------------------------------------------------------
+-- Log a message that includes the name of the queue for identification purposes
+-- @param self Queue
+-- @param level: log level
+-- @param formatstring: format string, will get the queue name and ": " prepended
+-- @param ...: formatter arguments
+function Queue:log(level, formatstring, ...)
+  return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...})))
+end
+
+
 -------------------------------------------------------------------------------
 -- Add data to the queue
 -- @param entry the value included in the queue. It can be any Lua value besides nil.
@@ -269,8 +288,8 @@ function Queue:add(entry)
 
   if self.batch_max_size == 1 then
     -- no batching
-    local batch = { entries = { entry }, retries = 0 }
-    schedule_process(self, batch, 0)
+    self.batch_queue = { { entries = { entry }, retries = 0 } }
+    schedule_process(self, 0)
     return true
   end
 
@@ -304,8 +323,12 @@ function Queue:flush()
 
   -- Queue the current batch, if it has at least 1 entry
   if current_batch_size > 0 then
-    ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)")
+    self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size)
+    while #self.batch_queue >= self.max_queued_batches do
+      self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches)
+      remove(self.batch_queue, 1)
+    end
     self.batch_queue[#self.batch_queue + 1] = self.current_batch
     self.current_batch = { entries = {}, retries = 0 }
   end
 
@@ -314,10 +337,8 @@ function Queue:flush()
   -- in the future. This will keep calling itself in the future until
   -- the queue is empty
   if #self.batch_queue > 0 and not self.process_scheduled then
-    ngx_log(DEBUG, fmt("processing oldest entry, %d still queued",
-                       #self.batch_queue - 1))
-    local oldest_batch = remove(self.batch_queue, 1)
-    schedule_process(self, oldest_batch, self.process_delay)
+    self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue)
+    schedule_process(self, self.process_delay)
   end
 
   return true
diff --git a/spec/01-unit/27-batch_queue_spec.lua b/spec/01-unit/27-batch_queue_spec.lua
new file mode 100644
index 00000000000..c15a6290227
--- /dev/null
+++ b/spec/01-unit/27-batch_queue_spec.lua
@@ -0,0 +1,30 @@
+
+local BatchQueue = require "kong.tools.batch_queue"
+
+describe("batch queue", function()
+
+  it("observes the limit parameter", function()
+    local count = 0
+    local last
+    local function process(entries)
+      count = count + #entries
+      last = entries[#entries]
+      return true
+    end
+
+    local q = BatchQueue.new("batch-queue-unit-test", process, {max_queued_batches=2, batch_max_size=100, process_delay=0})
+
+    q:add(1)
+    q:flush()
+    q:add(2)
+    q:flush()
+    q:add(3)
+    q:flush()
+
+    -- run scheduled timer tasks
+    ngx.sleep(0)
+
+    assert.equal(2, count)
+    assert.equal(3, last)
+  end)
+end)
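
Usage sketch (illustrative, not part of the patch): the snippet below shows how a plugin-style caller would use the changed `BatchQueue.new(name, process, opts)` signature together with the new `max_queued_batches` bound, mirroring the handler hunks above. The queue name, the `send_entries` callback body, and the option values are assumptions made for the example, not code from this diff.

    local BatchQueue = require "kong.tools.batch_queue"

    -- Consumer callback: receives one batch (the array of values previously
    -- passed to q:add) and returns a truthy value on success, or nil and an
    -- error message so the queue can retry or drop the batch per retry_count.
    local function send_entries(entries)
      local ok, err = pcall(function()
        -- hypothetical transport: serialize `entries` and ship them upstream
      end)
      if not ok then
        return nil, err
      end
      return true
    end

    local q, err = BatchQueue.new("my-plugin", send_entries, {
      retry_count        = 2,    -- number of times to retry a failed batch before dropping it
      batch_max_size     = 500,  -- close the current batch once it holds 500 entries
      process_delay      = 1,    -- in seconds, delay before a queued batch is processed
      flush_timeout      = 2,    -- in seconds, inactivity before the open batch is queued
      max_queued_batches = 10,   -- beyond this, the oldest queued batch is dropped and logged
    })
    if not q then
      kong.log.err("could not create queue: ", err)
      return
    end

    q:add({ some = "entry" })

With this change, a consumer that keeps failing can no longer make the queue grow without bound: once `max_queued_batches` is reached (settable per queue, or globally through the new `max_queued_batches` kong.conf property, default 100), `Queue:flush` drops the oldest queued batch and logs an error before queueing the new one.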