perf(postgres): optimize the expired rows cleanup routine in postgres connector #10331

Closed
wants to merge 9 commits
5 changes: 5 additions & 0 deletions kong.conf.default
@@ -1202,6 +1202,11 @@ # -----------------------
                                 # If not specified, then number of open connections
                                 # to the Postgres server is not limited.

#pg_expired_rows_cleanup_interval = 60
                                 # Interval, in seconds, at which expired rows
                                 # (rows whose `ttl` is earlier than
                                 # CURRENT_TIMESTAMP) are cleaned up from the
                                 # Postgres database.

#pg_ro_host =                    # Same as `pg_host`, but for the
                                 # read-only connection.
                                 # **Note:** Refer to the documentation
11 changes: 11 additions & 0 deletions kong/conf_loader/init.lua
@@ -360,6 +360,7 @@ local CONF_PARSERS = {
  pg_keepalive_timeout = { typ = "number" },
  pg_pool_size = { typ = "number" },
  pg_backlog = { typ = "number" },
  pg_expired_rows_cleanup_interval = { typ = "number" },

  pg_ro_port = { typ = "number" },
  pg_ro_timeout = { typ = "number" },
@@ -1020,6 +1021,16 @@ local function check_and_parse(conf, opts)
    end
  end

  if conf.pg_expired_rows_cleanup_interval then
    if conf.pg_expired_rows_cleanup_interval < 0 then
      errors[#errors + 1] = "pg_expired_rows_cleanup_interval must be greater than 0"
    end

    if conf.pg_expired_rows_cleanup_interval ~= floor(conf.pg_expired_rows_cleanup_interval) then
      errors[#errors + 1] = "pg_expired_rows_cleanup_interval must be an integer greater than 0"
    end
  end

  if conf.pg_ro_max_concurrent_queries then
    if conf.pg_ro_max_concurrent_queries < 0 then
      errors[#errors + 1] = "pg_ro_max_concurrent_queries must be greater than 0"
127 changes: 88 additions & 39 deletions kong/db/strategies/postgres/connector.lua
@@ -6,7 +6,10 @@ local stringx = require "pl.stringx"
local semaphore = require "ngx.semaphore"
local kong_global = require "kong.global"
local constants = require "kong.constants"
local knode = kong and kong.node
              or require "kong.pdk.node".new()

local tablex = require "pl.tablex"

local setmetatable = setmetatable
local encode_array = arrays.encode_array
@@ -34,6 +37,7 @@ local insert = table.insert

local WARN = ngx.WARN
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local SQL_INFORMATION_SCHEMA_TABLES = [[
SELECT table_name
FROM information_schema.tables
@@ -51,6 +55,10 @@ local OPERATIONS = {
}
local ADMIN_API_PHASE = kong_global.phases.admin_api
local CORE_ENTITIES = constants.CORE_ENTITIES
local EXPIRED_ROWS_CLEANUP_LOCK_KEY = "db_cluster_expired_rows_cleanup"
local EXPIRED_ROW_BATCH_SIZE = 50000
local EXPIRED_ROW_CLEANUP_LOOP_MAX = 10000 -- 10000 * 50000 = 500M rows
local EMPTY_T = tablex.readonly {}


local function now_updated()
@@ -314,55 +322,94 @@ function _mt:init()
end


-function _mt:init_worker(strategies)
-  if ngx.worker.id() == 0 then
local function cleanup_expired_rows_in_table(config, table_name)
  -- create a new connection for each table, to avoid reusing a connection
  -- that might already have timed out
  local connector = connect(config)

  local cleanup_start_time = now_updated()
  local ttl_escaped = connector:escape_identifier("ttl")

Reviewer comment (Contributor):

Would it be better to define ttl_escaped as a module-level variable, so that we don't run escape_identifier every time for the same result?
By the way, personally I don't think either ttl or expire_at needs to be escaped; neither is a keyword in current PostgreSQL syntax.


@windmgc (Member, Author) replied on Feb 27, 2023:

escape_identifier depends on creating a Postgres connector in pgmoon (which makes it feel weird to reuse at module level), and it is merely string manipulation, so the cost is small. I think it's okay to keep the current state.

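For context, pgmoon's escape_identifier is indeed plain string manipulation; a minimal sketch of the assumed behavior (paraphrased, not the verbatim pgmoon source) is:

    -- Double-quote the identifier and double any embedded quotes, so that
    -- escape_identifier("ttl") returns '"ttl"'.
    local function escape_identifier(ident)
      return '"' .. (tostring(ident):gsub('"', '""')) .. '"'
    end

    assert(escape_identifier("ttl") == '"ttl"')
    assert(escape_identifier('weird"name') == '"weird""name"')
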
  local expired_at_escaped = connector:escape_identifier("expire_at")
  local column_name = table_name == "cluster_events" and expired_at_escaped
                      or ttl_escaped

  local cleanup_statement = concat {
    "DELETE FROM ",
    connector:escape_identifier(table_name),
    " WHERE ctid in (SELECT ctid FROM ",
    connector:escape_identifier(table_name),
    " WHERE ",
    column_name,
    " < TO_TIMESTAMP(%s) AT TIME ZONE 'UTC' LIMIT ",

Reviewer comment (Contributor):

Would it help to force the database to use an index here? It might not be better, but at least it wouldn't be worse than a sequential scan.

@windmgc (Member, Author) replied:

I searched and found no way to force index usage in Postgres; it seems this is not doable.
SELECTing a batch of rows here should be fine, since we already have a ttl index on these tables, so let's leave it to the query planner to decide whether to use the index.

    EXPIRED_ROW_BATCH_SIZE,
    ")",
  }
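To make the batching concrete: deleting by ctid against a LIMITed subquery bounds the work of each DELETE statement, so a single run never has to hold locks for the whole backlog. With the default batch size, the statement built above renders roughly as follows for keyauth_credentials (illustrative values only, reflowed for readability; the timestamp literal comes from escape_literal(cleanup_start_time)):

    -- Hypothetical rendered statement, assuming cleanup_start_time = 1677500000:
    -- DELETE FROM "keyauth_credentials"
    --  WHERE ctid in (SELECT ctid FROM "keyauth_credentials"
    --                  WHERE "ttl" < TO_TIMESTAMP(1677500000) AT TIME ZONE 'UTC'
    --                  LIMIT 50000)

Whether the inner SELECT uses the ttl index is left to the planner, as discussed above. A hedged way to check (hypothetical snippet, not part of this PR) is to run EXPLAIN through the same connector and look for an index scan in the output:

    local rows = assert(connector:query(
      "EXPLAIN SELECT ctid FROM keyauth_credentials " ..
      "WHERE ttl < CURRENT_TIMESTAMP LIMIT 50000"))
    for _, row in ipairs(rows) do
      log(DEBUG, row["QUERY PLAN"])  -- one plan line per row
    end
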

-    local table_names = get_names_of_tables_with_ttl(strategies)
-    local ttl_escaped = self:escape_identifier("ttl")
-    local expire_at_escaped = self:escape_identifier("expire_at")
-    local cleanup_statements = {}
-    local cleanup_statements_count = #table_names
-    for i = 1, cleanup_statements_count do
-      local table_name = table_names[i]
-      local column_name = table_name == "cluster_events" and expire_at_escaped
-                          or ttl_escaped
-      cleanup_statements[i] = concat {
-        "  DELETE FROM ",
-        self:escape_identifier(table_name),
-        " WHERE ",
-        column_name,
-        " < CURRENT_TIMESTAMP AT TIME ZONE 'UTC';"
-      }
-    end
  cleanup_statement = fmt(cleanup_statement,
                          connector:escape_literal(tonumber(fmt("%.3f", cleanup_start_time))))

  local cleanup_loop = 0
  while cleanup_loop <= EXPIRED_ROW_CLEANUP_LOOP_MAX do
    cleanup_loop = cleanup_loop + 1

    local ok, err = connector:query(cleanup_statement)
    if not ok then
      if err then
        log(WARN, "failed to clean expired rows from table '",
                  table_name, "' on PostgreSQL database (",
                  err, ")")
      else
        log(WARN, "failed to clean expired rows from table '",
                  table_name, "' on PostgreSQL database")
      end

      break -- without this, `ok.affected_rows` below would index a nil value
    end

    if tonumber(ok.affected_rows) < EXPIRED_ROW_BATCH_SIZE then
      break
    end
  end

  local cleanup_end_time = now_updated()
  local time_elapsed = tonumber(fmt("%.3f", cleanup_end_time - cleanup_start_time))
  log(DEBUG, "cleaning up expired rows from table '", table_name,
             "' took ", time_elapsed, " seconds")

  connector:disconnect()

Reviewer comment (Contributor):

If there is a connection pool and its usage is under our control, we should use it rather than disconnecting. Establishing connections is an expensive operation for the database.

@windmgc (Member, Author) replied on Feb 27, 2023:
I chose to create a new connection for each table here intentionally, to avoid problems caused by connection reuse (for example, reusing a connection that previously timed out while waiting on locks is problematic; see OpenResty's tcpsock:receive documentation on read timeouts).

Since this timer only runs on worker 0, and this PR also adds a cluster mutex, the number of concurrent connections running the cleanup job in the cluster is constantly 1, so I don't think creating a new connection per table is a problem either.

end
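
The hazard being avoided can be sketched with plain pgmoon (a hypothetical snippet, not Kong code): when a query times out, the server's reply may still arrive later on the same socket, and a reused connection could hand those stale bytes to the next query. Dropping the connection sidesteps this:

    local pgmoon = require "pgmoon"

    local pg = pgmoon.new(config)  -- `config` assumed to hold the pg options
    assert(pg:connect())
    pg:settimeout(5000)            -- 5s read timeout on the underlying cosocket

    local res, err = pg:query("DELETE FROM some_table WHERE ttl < CURRENT_TIMESTAMP")
    if not res and err and err:find("timeout") then
      pg:disconnect()              -- never keep a timed-out socket for reuse
    end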


-    local cleanup_statement = concat(cleanup_statements, "\n")
function _mt:init_worker(strategies)
  if ngx.worker.id() == 0 then
    local table_names = get_names_of_tables_with_ttl(strategies) or EMPTY_T

-    return timer_every(60, function(premature)
    return timer_every(self.config.expired_rows_cleanup_interval, function(premature)
      if premature then
        return
      end

-      local ok, err, _, num_queries = self:query(cleanup_statement)
      local locked, err = self:read_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY)
      if err then
        log(ERR, "unable to read lock for expired rows cleanup: ", err)
        return
      end

      if locked then
        log(WARN, "expired rows cleanup already running on another node, skipping")
        return
      end

      local id = knode.get_id() or "unknown"
      local ok, err = self:insert_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY,
                                       self.config.expired_rows_cleanup_interval, id)
      if not ok then
-        if num_queries then
-          for i = num_queries + 1, cleanup_statements_count do
-            local statement = cleanup_statements[i]
-            local ok, err = self:query(statement)
-            if not ok then
-              if err then
-                log(WARN, "unable to clean expired rows from table '",
-                          table_names[i], "' on PostgreSQL database (",
-                          err, ")")
-              else
-                log(WARN, "unable to clean expired rows from table '",
-                          table_names[i], "' on PostgreSQL database")
-              end
-            end
-          end
log(WARN, "unable to acquire lock for expired rows cleanup: ", err)
return
end

-        else
-          log(ERR, "unable to clean expired rows from PostgreSQL database (", err, ")")
-        end
      -- clean up tables sequentially
      for _, table_name in ipairs(table_names) do
        cleanup_expired_rows_in_table(self.config, table_name)
      end

      local _, err = self:remove_lock(EXPIRED_ROWS_CLEANUP_LOCK_KEY, id)
      if err then
        -- do nothing and wait for the next timer run to clean up and retry
        log(WARN, "unable to remove lock for expired rows cleanup: ", err)
      end
    end)
  end
@@ -934,6 +981,7 @@ function _M.new(kong_config)
    sem_timeout = (kong_config.pg_semaphore_timeout or 60000) / 1000,
    pool_size = kong_config.pg_pool_size,
    backlog = kong_config.pg_backlog,
    expired_rows_cleanup_interval = kong_config.pg_expired_rows_cleanup_interval or 60,

    --- not used directly by pgmoon, but used internally in connector to set the keepalive timeout
    keepalive_timeout = kong_config.pg_keepalive_timeout,
@@ -1026,6 +1074,7 @@ end

-- for tests only
_mt._get_topologically_sorted_table_names = get_names_of_tables_with_ttl
_mt._cleanup_expired_rows_in_table = cleanup_expired_rows_in_table


return _M
1 change: 1 addition & 0 deletions kong/templates/kong_defaults.lua
@@ -110,6 +110,7 @@ pg_semaphore_timeout = 60000
pg_keepalive_timeout = NONE
pg_pool_size = NONE
pg_backlog = NONE
pg_expired_rows_cleanup_interval = 60

pg_ro_host = NONE
pg_ro_port = NONE
16 changes: 16 additions & 0 deletions spec/01-unit/03-conf_loader_spec.lua
@@ -1506,6 +1506,22 @@ describe("Configuration loader", function()
      assert.is_nil(conf)
      assert.equal("pg_backlog must be an integer greater than 0", err)
    end)

it("rejects a pg_expired_rows_cleanup_interval with a negative number", function()
local conf, err = conf_loader(nil, {
pg_expired_rows_cleanup_interval = -1,
})
assert.is_nil(conf)
assert.equal("pg_expired_rows_cleanup_interval must be greater than 0", err)
end)

it("rejects a pg_expired_rows_cleanup_interval with a decimal", function()
local conf, err = conf_loader(nil, {
pg_expired_rows_cleanup_interval = 0.1,
})
assert.is_nil(conf)
assert.equal("pg_expired_rows_cleanup_interval must be an integer greater than 0", err)
end)
end)

describe("pg read-only connection pool options", function()
97 changes: 97 additions & 0 deletions spec/02-integration/03-db/20-ttl-cleanup_spec.lua
@@ -0,0 +1,97 @@
local helpers = require "spec.helpers"

for _, strategy in helpers.each_strategy() do
  local postgres_only = strategy == "postgres" and describe or pending
  postgres_only("postgres ttl cleanup logic", function()
    describe("cleanup_expired_rows_in_table function", function()
      local bp, db, consumer1

      lazy_setup(function()
        bp, db = helpers.get_db_utils("postgres", {
          "consumers",
          "keyauth_credentials",
        })

        consumer1 = bp.consumers:insert {
          username = "consumer1"
        }
      end)

      lazy_teardown(function()
        db:truncate()
        db:close()
      end)

it("cleanup expired rows should work as expected #test", function ()
local cleanup_func = db.connector._cleanup_expired_rows_in_table

local kauth_cred = bp.keyauth_credentials:insert({
key = "secret1",
consumer = { id = consumer1.id },
}, {ttl = 1})

local ok, err = db.connector:query("SELECT * FROM keyauth_credentials")
assert.is_nil(err)
assert.same(1, #ok)
helpers.wait_until(function()
-- wait until the keyauth credential expired
local kauth_cred2 = db.keyauth_credentials:select{id = kauth_cred.id}
return kauth_cred2 == nil
end, 3)
cleanup_func(db.connector.config, "keyauth_credentials")
ok, err = db.connector:query("SELECT * FROM keyauth_credentials")
assert.is_nil(err)
assert.same(0, #ok)
end)
end)

describe("ttl cleanup timer #postgres", function()
local bp, db, consumer1
lazy_setup(function()
bp, db = helpers.get_db_utils("postgres", {
"routes",
"services",
"plugins",
"consumers",
"keyauth_credentials"
})

consumer1 = bp.consumers:insert {
username = "conumer1"
}

assert(helpers.start_kong({
pg_expired_rows_cleanup_interval = 10,
database = strategy,
}))
end)

lazy_teardown(function()
helpers.stop_kong()
db:truncate()
end)

it("init_worker should run ttl cleanup in background timer", function ()
helpers.clean_logfile()
local names_of_table_with_ttl = db.connector._get_topologically_sorted_table_names(db.strategies)
assert.truthy(#names_of_table_with_ttl > 0)
for _, name in ipairs(names_of_table_with_ttl) do
assert.errlog().has.line([[cleaning up expired rows from table ']] .. name .. [[' took \d+\.\d+ seconds]], false, 60)
end

local _ = bp.keyauth_credentials:insert({
key = "secret1",
consumer = { id = consumer1.id },
}, {ttl = 3})
helpers.clean_logfile()

helpers.wait_until(function()
return assert.errlog().has.line([[cleaning up expired rows from table ']] .. "keyauth_credentials" .. [[' took \d+\.\d+ seconds]], false, 12)
end, 20)

local ok, err = db.connector:query("SELECT * FROM keyauth_credentials")
assert.is_nil(err)
assert.same(0, #ok)
end)
end)
end)
end