Skip to content

Commit

Permalink
feat(db): batch cleanup expired rows from postgres (#10407)
Browse files Browse the repository at this point in the history
* feat(db): batch cleanup expired rows from postgres

### Summary

The PR #10405 changed cleanup to only happen on nodes that have Admin API listeners.

This PR makes deletion happen in batches of at most 50,000 rows.

Inspired by #10331 and #10389.

* change to batch delete on every table

* update changelog

* add test for ttl cleanup

---------

Co-authored-by: windmgc <windmgc@gmail.com>
  • Loading branch information
bungle and windmgc authored Mar 3, 2023
1 parent e1f22ce commit fa90a68
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
#### Core

- Postgres TTL cleanup timer will now only run on traditional and control plane nodes that have enabled the Admin API.
- Postgres TTL cleanup timer now runs a batch delete loop on each TTL-enabled table, deleting at most 50,000 rows per batch.

## 3.2.0

Expand Down
63 changes: 36 additions & 27 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ local insert = table.insert


local WARN = ngx.WARN
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local SQL_INFORMATION_SCHEMA_TABLES = [[
SELECT table_name
FROM information_schema.tables
Expand Down Expand Up @@ -325,43 +325,52 @@ function _mt:init_worker(strategies)
local table_name = table_names[i]
local column_name = table_name == "cluster_events" and expire_at_escaped
or ttl_escaped
cleanup_statements[i] = concat {
" DELETE FROM ",
self:escape_identifier(table_name),
" WHERE ",
column_name,
" < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';"
}
local table_name_escaped = self:escape_identifier(table_name)

cleanup_statements[i] = fmt([[
WITH rows AS (
SELECT ctid
FROM %s
WHERE %s < CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
ORDER BY %s LIMIT 50000 FOR UPDATE SKIP LOCKED)
DELETE
FROM %s
WHERE ctid IN (TABLE rows);]], table_name_escaped, column_name, column_name, table_name_escaped):gsub("CURRENT_TIMESTAMP", "TO_TIMESTAMP(%%s)")
end

local cleanup_statement = concat(cleanup_statements, "\n")

return timer_every(60, function(premature)
if premature then
return
end

local ok, err, _, num_queries = self:query(cleanup_statement)
if not ok then
if num_queries then
for i = num_queries + 1, cleanup_statements_count do
local statement = cleanup_statements[i]
local ok, err = self:query(statement)
if not ok then
if err then
log(WARN, "unable to clean expired rows from table '",
table_names[i], "' on PostgreSQL database (",
err, ")")
else
log(WARN, "unable to clean expired rows from table '",
table_names[i], "' on PostgreSQL database")
end
for i, statement in ipairs(cleanup_statements) do
local cleanup_start_time = self:escape_literal(tonumber(fmt("%.3f", now_updated())))

while true do -- batch delete looping
-- avoid using CURRENT_TIMESTAMP in the real query to prevent infinite loop
local ok, err = self:query(fmt(statement, cleanup_start_time))
if not ok then
if err then
log(WARN, "unable to clean expired rows from table '",
table_names[i], "' on PostgreSQL database (",
err, ")")

else
log(WARN, "unable to clean expired rows from table '",
table_names[i], "' on PostgreSQL database")
end
break
end

else
log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")")
if ok.affected_rows < 50000 then -- indicates that cleanup is done
break
end
end

local cleanup_end_time = now_updated()
local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time))
log(DEBUG, "cleaning up expired rows from table '", table_names[i],
"' took ", time_elapsed, " seconds")
end
end)
end
Expand Down
55 changes: 55 additions & 0 deletions spec/02-integration/03-db/20-ttl-cleanup_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- Integration spec for the PostgreSQL TTL cleanup timer.
--
-- It starts Kong against a PostgreSQL database, waits for the cleanup log line
-- of every table reported by the connector's
-- `_get_topologically_sorted_table_names`, then inserts a key-auth credential
-- with a short TTL and checks that the row is physically gone from the table
-- after a later cleanup of `keyauth_credentials` has been logged.
local helpers = require "spec.helpers"


-- Seconds to wait for a cleanup log line to show up in the error log.
local LOG_WAIT_TIMEOUT = 120


--- Builds the error-log regex for the "cleanup finished" message of a table.
-- The fractional part of the elapsed time is optional: the connector passes
-- the value through `tonumber()` before logging it, so a whole-number elapsed
-- time is printed without a decimal point.
-- @tparam string table_name name of the table the log line refers to
-- @treturn string regex to hand to `assert.errlog().has.line` (non-plain mode)
local function cleanup_log_pattern(table_name)
  return [[cleaning up expired rows from table ']] .. table_name ..
         [[' took \d+(?:\.\d+)? seconds]]
end


for _, strategy in helpers.each_strategy() do
  -- The cleanup timer under test is PostgreSQL-specific; mark the suite as
  -- pending for every other strategy.
  local postgres_only = strategy == "postgres" and describe or pending

  postgres_only("postgres ttl cleanup logic", function()
    describe("ttl cleanup timer #postgres", function()
      local bp, db, consumer1

      lazy_setup(function()
        bp, db = helpers.get_db_utils("postgres", {
          "routes",
          "services",
          "plugins",
          "consumers",
          "keyauth_credentials"
        })

        consumer1 = bp.consumers:insert {
          username = "consumer1"
        }

        assert(helpers.start_kong({
          database = strategy,
        }))
      end)

      lazy_teardown(function()
        helpers.stop_kong()
        db:truncate()
      end)

      it("init_worker should run ttl cleanup in background timer", function ()
        helpers.clean_logfile()

        -- Every table returned by the connector must produce a cleanup log line.
        local names_of_table_with_ttl = db.connector._get_topologically_sorted_table_names(db.strategies)
        assert.truthy(#names_of_table_with_ttl > 0)
        for _, name in ipairs(names_of_table_with_ttl) do
          assert.errlog().has.line(cleanup_log_pattern(name), false, LOG_WAIT_TIMEOUT)
        end

        -- Insert a credential that expires after 3 seconds, then drop the log
        -- lines seen so far so that only a later cleanup run can match below.
        bp.keyauth_credentials:insert({
          key = "secret1",
          consumer = { id = consumer1.id },
        }, {ttl = 3})
        helpers.clean_logfile()

        helpers.wait_until(function()
          return assert.errlog().has.line(cleanup_log_pattern("keyauth_credentials"), false, LOG_WAIT_TIMEOUT)
        end, LOG_WAIT_TIMEOUT)

        -- Query the table directly through the connector to verify the
        -- expired row was actually deleted.
        local ok, err = db.connector:query("SELECT * FROM keyauth_credentials")
        assert.is_nil(err)
        assert.same(0, #ok)
      end)
    end)
  end)
end

1 comment on commit fa90a68

@khcp-gha-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:fa90a685b5a7ac03774460a92f856293184d704d
Artifacts available https://github.com/Kong/kong/actions/runs/4322374032

Please sign in to comment.