From 96e028cc4966a2975577afc9d0d0e9863aea1fa1 Mon Sep 17 00:00:00 2001 From: windmgc Date: Mon, 6 Feb 2023 14:01:02 +0800 Subject: [PATCH 1/9] make ttl cleanup timer configurable --- kong.conf.default | 5 +++++ kong/conf_loader/init.lua | 11 +++++++++++ kong/db/strategies/postgres/connector.lua | 3 ++- kong/templates/kong_defaults.lua | 1 + spec/01-unit/03-conf_loader_spec.lua | 16 ++++++++++++++++ 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/kong.conf.default b/kong.conf.default index 6d77e84fac05..52c0b59da4e3 100644 --- a/kong.conf.default +++ b/kong.conf.default @@ -1202,6 +1202,11 @@ G# ----------------------- # If not specified, then number of open connections # to the Postgres server is not limited. +#pg_expired_rows_cleanup_interval = 60 # This value will control the interval of + # running cleanup on the expired rows(whose + # ttl < CURRENT_TIMESTAMP) in the Postgres + # database. + #pg_ro_host = # Same as `pg_host`, but for the # read-only connection. # **Note:** Refer to the documentation diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index 53082b15f424..5cd2206d1628 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -360,6 +360,7 @@ local CONF_PARSERS = { pg_keepalive_timeout = { typ = "number" }, pg_pool_size = { typ = "number" }, pg_backlog = { typ = "number" }, + pg_expired_rows_cleanup_interval = { typ = "number" }, pg_ro_port = { typ = "number" }, pg_ro_timeout = { typ = "number" }, @@ -1020,6 +1021,16 @@ local function check_and_parse(conf, opts) end end + if conf.pg_expired_rows_cleanup_interval then + if conf.pg_expired_rows_cleanup_interval < 0 then + errors[#errors + 1] = "pg_expired_rows_cleanup_interval must be greater than 0" + end + + if conf.pg_expired_rows_cleanup_interval ~= floor(conf.pg_expired_rows_cleanup_interval) then + errors[#errors + 1] = "pg_expired_rows_cleanup_interval must be an integer greater than 0" + end + end + if conf.pg_ro_max_concurrent_queries then if conf.pg_ro_max_concurrent_queries < 0 then errors[#errors + 1] = "pg_ro_max_concurrent_queries must be greater than 0" diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 1a5c6a4540bd..fc2d6ca48e43 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -337,7 +337,7 @@ function _mt:init_worker(strategies) local cleanup_statement = concat(cleanup_statements, "\n") - return timer_every(60, function(premature) + return timer_every(self.config.expired_rows_cleanup_interval, function(premature) if premature then return end @@ -934,6 +934,7 @@ function _M.new(kong_config) sem_timeout = (kong_config.pg_semaphore_timeout or 60000) / 1000, pool_size = kong_config.pg_pool_size, backlog = kong_config.pg_backlog, + expired_rows_cleanup_interval = kong_config.pg_expired_rows_cleanup_interval or 60, --- not used directly by pgmoon, but used internally in connector to set the keepalive timeout keepalive_timeout = kong_config.pg_keepalive_timeout, diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index bf8a58336991..4bae31a49b14 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -110,6 +110,7 @@ pg_semaphore_timeout = 60000 pg_keepalive_timeout = NONE pg_pool_size = NONE pg_backlog = NONE +pg_expired_rows_cleanup_interval = 60 pg_ro_host = NONE pg_ro_port = NONE diff --git a/spec/01-unit/03-conf_loader_spec.lua b/spec/01-unit/03-conf_loader_spec.lua index 8ad8859ea758..14929a669bdb 100644 --- a/spec/01-unit/03-conf_loader_spec.lua +++ b/spec/01-unit/03-conf_loader_spec.lua @@ -1506,6 +1506,22 @@ describe("Configuration loader", function() assert.is_nil(conf) assert.equal("pg_backlog must be an integer greater than 0", err) end) + + it("rejects a pg_expired_rows_cleanup_interval with a negative number", function() + local conf, err = conf_loader(nil, { + pg_expired_rows_cleanup_interval = -1, + }) + assert.is_nil(conf) + assert.equal("pg_expired_rows_cleanup_interval must be greater than 0", err) + end) + + it("rejects a pg_expired_rows_cleanup_interval with a decimal", function() + local conf, err = conf_loader(nil, { + pg_expired_rows_cleanup_interval = 0.1, + }) + assert.is_nil(conf) + assert.equal("pg_expired_rows_cleanup_interval must be an integer greater than 0", err) + end) end) describe("pg read-only connection pool options", function() From 6828a9b00e6ac1fa3ef2edebd388000539dec180 Mon Sep 17 00:00:00 2001 From: windmgc Date: Fri, 17 Feb 2023 15:44:57 +0800 Subject: [PATCH 2/9] - split cleanup table by table so that table name can be shown in error log - add cluster mutex to limit concurrency across traditional cluster --- kong/db/strategies/postgres/connector.lua | 96 ++++++++++++++--------- 1 file changed, 58 insertions(+), 38 deletions(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index fc2d6ca48e43..bdbd3f048a1f 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -6,6 +6,8 @@ local stringx = require "pl.stringx" local semaphore = require "ngx.semaphore" local kong_global = require "kong.global" local constants = require "kong.constants" +local knode = kong and kong.node + or require "kong.pdk.node".new() local setmetatable = setmetatable @@ -51,6 +53,7 @@ local OPERATIONS = { } local ADMIN_API_PHASE = kong_global.phases.admin_api local CORE_ENTITIES = constants.CORE_ENTITIES +local EXPIRED_ROWS_CLEANUP_LOCK_KEY = "db_cluster_expired_rows_cleanup" local function now_updated() @@ -314,55 +317,72 @@ function _mt:init() end -function _mt:init_worker(strategies) - if ngx.worker.id() == 0 then +local function cleanup_expired_rows_in_table(connector, table_name) + local ttl_escaped = connector:escape_identifier("ttl") + local expired_at_escaped = connector:escape_identifier("expired_at") + local column_name = table_name == "cluster_events" and expired_at_escaped + or ttl_escaped - local table_names = get_names_of_tables_with_ttl(strategies) - local ttl_escaped = self:escape_identifier("ttl") - local expire_at_escaped = self:escape_identifier("expire_at") - local cleanup_statements = {} - local cleanup_statements_count = #table_names - for i = 1, cleanup_statements_count do - local table_name = table_names[i] - local column_name = table_name == "cluster_events" and expire_at_escaped - or ttl_escaped - cleanup_statements[i] = concat { - " DELETE FROM ", - self:escape_identifier(table_name), - " WHERE ", - column_name, - " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';" - } + local cleanup_statement = concat { + "DELETE FROM ", + connector:escape_identifier(table_name), + " WHERE ", + column_name, + " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';", + } + + local ok, err = connector:query(cleanup_statement) + if not ok then + if err then + log(WARN, "failed to clean expired rows from table '", + table_name, "' on PostgreSQL database (", + err, ")") + else + log(WARN, "failed to clean expired rows from table '", + table_name, "' on PostgreSQL database") end - local cleanup_statement = concat(cleanup_statements, "\n") + return ok, err + end +end + + +function _mt:init_worker(strategies) + if ngx.worker.id() == 0 then + local table_names = get_names_of_tables_with_ttl(strategies) return timer_every(self.config.expired_rows_cleanup_interval, function(premature) if premature then return end - local ok, err, _, num_queries = self:query(cleanup_statement) + local locked, err = self:read_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY) + if err then + log(ERR, "unable to read lock for expired rows cleanup: ", err) + return + end + + if locked then + log(WARN, "expired rows cleanup already running on another node, skipping") + return + end + + local id = knode.get_id() or "unknown" + local ok, err = self:insert_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY, self.config.expired_rows_cleanup_interval, id) if not ok then - if num_queries then - for i = num_queries + 1, cleanup_statements_count do - local statement = cleanup_statements[i] - local ok, err = self:query(statement) - if not ok then - if err then - log(WARN, "unable to clean expired rows from table '", - table_names[i], "' on PostgreSQL database (", - err, ")") - else - log(WARN, "unable to clean expired rows from table '", - table_names[i], "' on PostgreSQL database") - end - end - end + log(WARN, "unable to acquire lock for expired rows cleanup: ", err) + return + end - else - log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")") - end + -- Cleanup tables sequentially + for _, table_name in ipairs(table_names) do + cleanup_expired_rows_in_table(self, table_name) + end + + local _, err = self:remove_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY, id) + if err then + -- do nothing and wait for the next timer to clean up and retry + log(WARN, "unable to remove lock for expired rows cleanup: ", err) end end) end From a58fdae9b39190951c4073feccaf0653a161b6f3 Mon Sep 17 00:00:00 2001 From: windmgc Date: Fri, 17 Feb 2023 16:04:00 +0800 Subject: [PATCH 3/9] fix cluster_events expire_at field name --- kong/db/strategies/postgres/connector.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index bdbd3f048a1f..8ad35a19356f 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -319,7 +319,7 @@ end local function cleanup_expired_rows_in_table(connector, table_name) local ttl_escaped = connector:escape_identifier("ttl") - local expired_at_escaped = connector:escape_identifier("expired_at") + local expired_at_escaped = connector:escape_identifier("expire_at") local column_name = table_name == "cluster_events" and expired_at_escaped or ttl_escaped From 9b2cc51fc9e926dacc783bcdb23bf27ca447a1bb Mon Sep 17 00:00:00 2001 From: windmgc Date: Mon, 20 Feb 2023 13:46:49 +0800 Subject: [PATCH 4/9] add debug log for time elapsed --- kong/db/strategies/postgres/connector.lua | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 8ad35a19356f..f4a1175428a6 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -36,6 +36,7 @@ local insert = table.insert local WARN = ngx.WARN local ERR = ngx.ERR +local DEBUG = ngx.DEBUG local SQL_INFORMATION_SCHEMA_TABLES = [[ SELECT table_name FROM information_schema.tables @@ -318,6 +319,7 @@ end local function cleanup_expired_rows_in_table(connector, table_name) + local time1 = now() local ttl_escaped = connector:escape_identifier("ttl") local expired_at_escaped = connector:escape_identifier("expire_at") local column_name = table_name == "cluster_events" and expired_at_escaped @@ -332,6 +334,10 @@ local function cleanup_expired_rows_in_table(connector, table_name) } local ok, err = connector:query(cleanup_statement) + local time2 = now() + local time_elapsed = tonumber(fmt("%.3f", time2 - time1)) + log(DEBUG, "cleaning up expired rows from table '", table_name, + "' took ", time_elapsed, " seconds") if not ok then if err then log(WARN, "failed to clean expired rows from table '", From ff84908b6d0730bccd7e5c411e6d5e1fa8706a7e Mon Sep 17 00:00:00 2001 From: windmgc Date: Tue, 21 Feb 2023 12:21:46 +0800 Subject: [PATCH 5/9] create new connection on each table cleanup --- kong/db/strategies/postgres/connector.lua | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index f4a1175428a6..3f752762e0e6 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -9,6 +9,7 @@ local constants = require "kong.constants" local knode = kong and kong.node or require "kong.pdk.node".new() +local tablex = require "pl.tablex" local setmetatable = setmetatable local encode_array = arrays.encode_array @@ -55,6 +56,7 @@ local OPERATIONS = { local ADMIN_API_PHASE = kong_global.phases.admin_api local CORE_ENTITIES = constants.CORE_ENTITIES local EXPIRED_ROWS_CLEANUP_LOCK_KEY = "db_cluster_expired_rows_cleanup" +local EMPTY_T = tablex.readonly {} local function now_updated() @@ -318,7 +320,10 @@ function _mt:init() end -local function cleanup_expired_rows_in_table(connector, table_name) +local function cleanup_expired_rows_in_table(config, table_name) + -- Create new connection on each table to avoid reusing a connection that might be already timed out + local connector = connect(config) + local time1 = now() local ttl_escaped = connector:escape_identifier("ttl") local expired_at_escaped = connector:escape_identifier("expire_at") @@ -347,15 +352,15 @@ local function cleanup_expired_rows_in_table(connector, table_name) log(WARN, "failed to clean expired rows from table '", table_name, "' on PostgreSQL database") end - - return ok, err end + + connector:disconnect() end function _mt:init_worker(strategies) if ngx.worker.id() == 0 then - local table_names = get_names_of_tables_with_ttl(strategies) + local table_names = get_names_of_tables_with_ttl(strategies) or EMPTY_T return timer_every(self.config.expired_rows_cleanup_interval, function(premature) if premature then @@ -382,7 +387,7 @@ function _mt:init_worker(strategies) -- Cleanup tables sequentially for _, table_name in ipairs(table_names) do - cleanup_expired_rows_in_table(self, table_name) + cleanup_expired_rows_in_table(self.config, table_name) end local _, err = self:remove_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY, id) From ef7ee4175b3e406e449c9ac45c9a3559c5dd08f9 Mon Sep 17 00:00:00 2001 From: windmgc Date: Tue, 21 Feb 2023 14:05:14 +0800 Subject: [PATCH 6/9] delete batch by batch and add max loop limit --- kong/db/strategies/postgres/connector.lua | 46 +++++++++++++++-------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 3f752762e0e6..2da4f0ca3c8c 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -56,6 +56,8 @@ local OPERATIONS = { local ADMIN_API_PHASE = kong_global.phases.admin_api local CORE_ENTITIES = constants.CORE_ENTITIES local EXPIRED_ROWS_CLEANUP_LOCK_KEY = "db_cluster_expired_rows_cleanup" +local EXPIRED_ROW_BATCH_SIZE = 50000 +local EXPIRED_ROW_CLEANUP_LOOP_MAX = 10000 -- 10000 * 50000 = 500M rows local EMPTY_T = tablex.readonly {} @@ -324,7 +326,7 @@ local function cleanup_expired_rows_in_table(config, table_name) -- Create new connection on each table to avoid reusing a connection that might be already timed out local connector = connect(config) - local time1 = now() + local cleanup_start_time = now_updated() local ttl_escaped = connector:escape_identifier("ttl") local expired_at_escaped = connector:escape_identifier("expire_at") local column_name = table_name == "cluster_events" and expired_at_escaped @@ -333,27 +335,41 @@ local function cleanup_expired_rows_in_table(config, table_name) local cleanup_statement = concat { "DELETE FROM ", connector:escape_identifier(table_name), + " WHERE ctid in (SELECT ctid FROM ", + connector:escape_identifier(table_name), " WHERE ", column_name, - " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';", + " < TO_TIMESTAMP(%s) AT TIME ZONE 'UTC' LIMIT ", + EXPIRED_ROW_BATCH_SIZE, + ")", } - local ok, err = connector:query(cleanup_statement) - local time2 = now() - local time_elapsed = tonumber(fmt("%.3f", time2 - time1)) - log(DEBUG, "cleaning up expired rows from table '", table_name, - "' took ", time_elapsed, " seconds") - if not ok then - if err then - log(WARN, "failed to clean expired rows from table '", - table_name, "' on PostgreSQL database (", - err, ")") - else - log(WARN, "failed to clean expired rows from table '", - table_name, "' on PostgreSQL database") + cleanup_statement = fmt(cleanup_statement, connector:escape_literal(tonumber(fmt("%.3f", cleanup_start_time)))) + + local cleanup_loop = 0 + while cleanup_loop <= EXPIRED_ROW_CLEANUP_LOOP_MAX do + cleanup_loop = cleanup_loop + 1 + local ok, err = connector:query(cleanup_statement) + if not ok then + if err then + log(WARN, "failed to clean expired rows from table '", + table_name, "' on PostgreSQL database (", + err, ")") + else + log(WARN, "failed to clean expired rows from table '", + table_name, "' on PostgreSQL database") + end + end + + if ok.affected_rows == 0 then + break end end + local cleanup_end_time = now_updated() + local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time)) + log(DEBUG, "cleaning up expired rows from table '", table_name, + "' took ", time_elapsed, " seconds") connector:disconnect() end From 538ec3f2dda35cd43702b5f4896716b96a049a43 Mon Sep 17 00:00:00 2001 From: windmgc Date: Mon, 27 Feb 2023 14:42:21 +0800 Subject: [PATCH 7/9] optimize last loop check --- kong/db/strategies/postgres/connector.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 2da4f0ca3c8c..9dc66c4add30 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -361,7 +361,7 @@ local function cleanup_expired_rows_in_table(config, table_name) end end - if ok.affected_rows == 0 then + if tonumber(ok.affected_rows) < EXPIRED_ROW_BATCH_SIZE then break end end From 0b431f9ef24069732346bd8520bfd7927f59eeed Mon Sep 17 00:00:00 2001 From: windmgc Date: Mon, 27 Feb 2023 21:43:32 +0800 Subject: [PATCH 8/9] add related test --- .../03-db/20-ttl-cleanup_spec.lua | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 spec/02-integration/03-db/20-ttl-cleanup_spec.lua diff --git a/spec/02-integration/03-db/20-ttl-cleanup_spec.lua b/spec/02-integration/03-db/20-ttl-cleanup_spec.lua new file mode 100644 index 000000000000..b1ca779c6b1b --- /dev/null +++ b/spec/02-integration/03-db/20-ttl-cleanup_spec.lua @@ -0,0 +1,97 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + local postgres_only = strategy == "postgres" and describe or pending + postgres_only("postgres ttl cleanup logic", function() + describe("cleanup_expired_rows_in_table function", function () + local bp, db, consumer1 + lazy_setup(function() + bp, db = helpers.get_db_utils("postgres", { + "consumers", + "keyauth_credentials" + }) + + consumer1 = bp.consumers:insert { + username = "conumer1" + } + end) + + lazy_teardown(function() + db:truncate() + db:close() + end) + + it("cleanup expired rows should work as expected #test", function () + local cleanup_func = db.connector._cleanup_expired_rows_in_table + + local kauth_cred = bp.keyauth_credentials:insert({ + key = "secret1", + consumer = { id = consumer1.id }, + }, {ttl = 1}) + + local ok, err = db.connector:query("SELECT * FROM keyauth_credentials") + assert.is_nil(err) + assert.same(1, #ok) + helpers.wait_until(function() + -- wait until the keyauth credential expired + local kauth_cred2 = db.keyauth_credentials:select{id = kauth_cred.id} + return kauth_cred2 == nil + end, 3) + cleanup_func(db.connector.config, "keyauth_credentials") + ok, err = db.connector:query("SELECT * FROM keyauth_credentials") + assert.is_nil(err) + assert.same(0, #ok) + end) + end) + + describe("ttl cleanup timer #postgres", function() + local bp, db, consumer1 + lazy_setup(function() + bp, db = helpers.get_db_utils("postgres", { + "routes", + "services", + "plugins", + "consumers", + "keyauth_credentials" + }) + + consumer1 = bp.consumers:insert { + username = "conumer1" + } + + assert(helpers.start_kong({ + pg_expired_rows_cleanup_interval = 10, + database = strategy, + })) + end) + + lazy_teardown(function() + helpers.stop_kong() + db:truncate() + end) + + it("init_worker should run ttl cleanup in background timer", function () + helpers.clean_logfile() + local names_of_table_with_ttl = db.connector._get_topologically_sorted_table_names(db.strategies) + assert.truthy(#names_of_table_with_ttl > 0) + for _, name in ipairs(names_of_table_with_ttl) do + assert.errlog().has.line([[cleaning up expired rows from table ']] .. name .. [[' took \d+\.\d+ seconds]], false, 60) + end + + local _ = bp.keyauth_credentials:insert({ + key = "secret1", + consumer = { id = consumer1.id }, + }, {ttl = 3}) + helpers.clean_logfile() + + helpers.wait_until(function() + return assert.errlog().has.line([[cleaning up expired rows from table ']] .. "keyauth_credentials" .. [[' took \d+\.\d+ seconds]], false, 12) + end, 20) + + local ok, err = db.connector:query("SELECT * FROM keyauth_credentials") + assert.is_nil(err) + assert.same(0, #ok) + end) + end) + end) +end From f1a03ecbb277899971dfb0bb601677a9aadbab6e Mon Sep 17 00:00:00 2001 From: windmgc Date: Mon, 27 Feb 2023 23:46:05 +0800 Subject: [PATCH 9/9] function exposed to module level for testing purpose --- kong/db/strategies/postgres/connector.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 9dc66c4add30..67ae154a7371 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -1074,6 +1074,7 @@ end -- for tests only _mt._get_topologically_sorted_table_names = get_names_of_tables_with_ttl +_mt._cleanup_expired_rows_in_table = cleanup_expired_rows_in_table return _M