diff --git a/CHANGELOG.md b/CHANGELOG.md index 932f3893d8bf..922d9cbdbc13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -104,6 +104,7 @@ #### Core - Postgres TTL cleanup timer will now only run on traditional and control plane nodes that have enabled the Admin API. +- Postgres TTL cleanup timer now runs a batch delete loop on each TTL-enabled table, deleting at most 50,000 rows per batch. ## 3.2.0 diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index cadfbe221712..a3daf9d43a95 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -33,7 +33,7 @@ local insert = table.insert local WARN = ngx.WARN -local ERR = ngx.ERR +local DEBUG = ngx.DEBUG local SQL_INFORMATION_SCHEMA_TABLES = [[ SELECT table_name FROM information_schema.tables @@ -325,43 +325,52 @@ function _mt:init_worker(strategies) local table_name = table_names[i] local column_name = table_name == "cluster_events" and expire_at_escaped or ttl_escaped - cleanup_statements[i] = concat { - " DELETE FROM ", - self:escape_identifier(table_name), - " WHERE ", - column_name, - " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';" - } + local table_name_escaped = self:escape_identifier(table_name) + + cleanup_statements[i] = fmt([[ + WITH rows AS ( + SELECT ctid + FROM %s + WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' +ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED) + DELETE + FROM %s + WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, column_name, table_name_escaped):gsub("CURRENT_TIMESTAMP", "TO_TIMESTAMP(%%s)") end - local cleanup_statement = concat(cleanup_statements, "\n") - return timer_every(60, function(premature) if premature then return end - local ok, err, _, num_queries = self:query(cleanup_statement) - if not ok then - if num_queries then - for i = num_queries + 1, cleanup_statements_count do - local statement = cleanup_statements[i] - local ok, err = self:query(statement) - if not ok then - if err 
then - log(WARN, "unable to clean expired rows from table '", - table_names[i], "' on PostgreSQL database (", - err, ")") - else - log(WARN, "unable to clean expired rows from table '", - table_names[i], "' on PostgreSQL database") - end + for i, statement in ipairs(cleanup_statements) do + local cleanup_start_time = self:escape_literal(tonumber(fmt("%.3f", now_updated()))) + + while true do -- batch delete looping + -- avoid using CURRENT_TIMESTAMP in the real query to prevent infinite loop + local ok, err = self:query(fmt(statement, cleanup_start_time)) + if not ok then + if err then + log(WARN, "unable to clean expired rows from table '", + table_names[i], "' on PostgreSQL database (", + err, ")") + + else + log(WARN, "unable to clean expired rows from table '", + table_names[i], "' on PostgreSQL database") end + break end - else - log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")") + if ok.affected_rows < 50000 then -- indicates that cleanup is done + break + end end + + local cleanup_end_time = now_updated() + local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time)) + log(DEBUG, "cleaning up expired rows from table '", table_names[i], + "' took ", time_elapsed, " seconds") end end) end diff --git a/spec/02-integration/03-db/20-ttl-cleanup_spec.lua b/spec/02-integration/03-db/20-ttl-cleanup_spec.lua new file mode 100644 index 000000000000..45b095cfd3ca --- /dev/null +++ b/spec/02-integration/03-db/20-ttl-cleanup_spec.lua @@ -0,0 +1,55 @@ +local helpers = require "spec.helpers" + +for _, strategy in helpers.each_strategy() do + local postgres_only = strategy == "postgres" and describe or pending + postgres_only("postgres ttl cleanup logic", function() + describe("ttl cleanup timer #postgres", function() + local bp, db, consumer1 + lazy_setup(function() + bp, db = helpers.get_db_utils("postgres", { + "routes", + "services", + "plugins", + "consumers", + "keyauth_credentials" + }) + + consumer1 = 
bp.consumers:insert { + username = "conumer1" + } + + assert(helpers.start_kong({ + database = strategy, + })) + end) + + lazy_teardown(function() + helpers.stop_kong() + db:truncate() + end) + + it("init_worker should run ttl cleanup in background timer", function () + helpers.clean_logfile() + local names_of_table_with_ttl = db.connector._get_topologically_sorted_table_names(db.strategies) + assert.truthy(#names_of_table_with_ttl > 0) + for _, name in ipairs(names_of_table_with_ttl) do + assert.errlog().has.line([[cleaning up expired rows from table ']] .. name .. [[' took \d+\.\d+ seconds]], false, 120) + end + + local _ = bp.keyauth_credentials:insert({ + key = "secret1", + consumer = { id = consumer1.id }, + }, {ttl = 3}) + helpers.clean_logfile() + + helpers.wait_until(function() + return assert.errlog().has.line([[cleaning up expired rows from table ']] .. "keyauth_credentials" .. [[' took \d+\.\d+ seconds]], false, 120) + end, 120) + + local ok, err = db.connector:query("SELECT * FROM keyauth_credentials") + assert.is_nil(err) + assert.same(0, #ok) + end) + end) + end) +end