From c04510440ac1c4fade693565ae9e6028e9c200d3 Mon Sep 17 00:00:00 2001 From: sshniro Date: Wed, 29 Apr 2020 09:39:02 +0200 Subject: [PATCH 1/4] Adding function to remove stale objects from kafka logger --- apisix/plugins/kafka-logger.lua | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index a9050b9d6080..9bd7d32c8f7b 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -22,6 +22,8 @@ local pairs = pairs local type = type local table = table local plugin_name = "kafka-logger" +local stale_timer_running = false; +local timer_at = ngx.timer.at local ngx = ngx local buffers = {} @@ -90,6 +92,21 @@ local function send_kafka_data(conf, log_message) end +local function remove_stale_objects(premature, log_buffer, status) + if premature then + return + end + + for key, batch in ipairs(log_buffer) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.debug("removing batch processor stale object, route id:" .. tostring(key)) + log_buffer[key] = nil + end + end + status = false +end + + function _M.log(conf) local entry = log_util.get_full_log(ngx) @@ -100,6 +117,12 @@ function _M.log(conf) local log_buffer = buffers[entry.route_id] + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects, buffers, stale_timer_running) + stale_timer_running = true + end + if log_buffer then log_buffer:push(entry) return From ef946244aa36d3389ea84c9db2a2873a5bbba528 Mon Sep 17 00:00:00 2001 From: sshniro Date: Wed, 29 Apr 2020 09:41:29 +0200 Subject: [PATCH 2/4] Using local variables --- apisix/plugins/kafka-logger.lua | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 9bd7d32c8f7b..c7a6a72d5242 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -92,18 +92,19 @@ local function send_kafka_data(conf, log_message) end -local function remove_stale_objects(premature, log_buffer, status) +local function remove_stale_objects(premature) if premature then return end - for key, batch in ipairs(log_buffer) do + for key, batch in ipairs(buffers) do if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then core.log.debug("removing batch processor stale object, route id:" .. tostring(key)) - log_buffer[key] = nil + buffers[key] = nil end end - status = false + + stale_timer_running = false end @@ -119,7 +120,7 @@ function _M.log(conf) if not stale_timer_running then -- run the timer every 30 mins if any log is present - timer_at(1800, remove_stale_objects, buffers, stale_timer_running) + timer_at(1800, remove_stale_objects) stale_timer_running = true end From b0c3a466b49fa677e81554b15d244307019e74bf Mon Sep 17 00:00:00 2001 From: sshniro Date: Wed, 29 Apr 2020 10:09:27 +0200 Subject: [PATCH 3/4] Refactoring --- apisix/plugins/kafka-logger.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index c7a6a72d5242..a8ab92cfcdba 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -21,9 +21,11 @@ local batch_processor = require("apisix.utils.batch-processor") local pairs = pairs local type = type local table = table +local ipairs = ipairs local plugin_name = "kafka-logger" local stale_timer_running = false; local timer_at = ngx.timer.at +local tostring = tostring local ngx = ngx local buffers = {} @@ -99,7 +101,8 @@ local function remove_stale_objects(premature) for key, batch in ipairs(buffers) do if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then - core.log.debug("removing batch processor stale object, route id:" .. tostring(key)) + core.log.debug("removing batch processor stale object," .. + " route id:" .. tostring(key)) buffers[key] = nil end end From 2936ad8ea4b723fe4ce8e194205a9a120c7e3aa2 Mon Sep 17 00:00:00 2001 From: sshniro Date: Wed, 6 May 2020 08:53:25 +0200 Subject: [PATCH 4/4] Refactoring string concatenation --- apisix/plugins/kafka-logger.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index a8ab92cfcdba..1641c5e17bfe 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -101,8 +101,7 @@ local function remove_stale_objects(premature) for key, batch in ipairs(buffers) do if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then - core.log.debug("removing batch processor stale object," .. - " route id:" .. tostring(key)) + core.log.debug("removing batch processor stale object, route id:", tostring(key)) buffers[key] = nil end end