Skip to content

Commit

Permalink
Remove the replication slot in test_snap_files at the end of the test
Browse files Browse the repository at this point in the history
Analysis of the LR benchmarking tests indicates that in the duration of
test_subscriber_lag, a leftover 'slotter' replication slot can lead to
retained WAL growing on the publisher. This replication slot is not used
by any subscriber. The only purpose of the slot is to generate snapshot
files for the puspose of test_snap_files.

Signed-off-by: Tristan Partin <tristan@neon.tech>
  • Loading branch information
tristan957 authored Nov 14, 2024
1 parent 8cde37b commit 49b599c
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions test_runner/performance/test_logical_replication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast

import psycopg2
Expand All @@ -18,7 +20,7 @@
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonApiEndpoint
from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres
from psycopg2.extensions import cursor
from psycopg2.extensions import connection, cursor


@pytest.mark.timeout(1000)
Expand Down Expand Up @@ -292,6 +294,48 @@ def test_snap_files(
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
the node and tries to peek the replication changes.
"""

@contextmanager
def replication_slot(conn: connection, slot_name: str) -> Iterator[None]:
"""
Make sure that the replication slot doesn't outlive the test. Normally
we wouldn't want this behavior, but since the test creates and drops
the replication slot, we do.
We've had problems in the past where this slot sticking around caused
issues with the publisher retaining WAL during the execution of the
other benchmarks in this suite.
"""

def __drop_replication_slot(c: cursor) -> None:
c.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = %(slot_name)s
) THEN
PERFORM pg_drop_replication_slot(%(slot_name)s);
END IF;
END $$;
""",
{"slot_name": slot_name},
)

with conn.cursor() as c:
__drop_replication_slot(c)
c.execute(
"SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')",
{"slot_name": slot_name},
)

yield

with conn.cursor() as c:
__drop_replication_slot(c)

test_duration_min = 60
test_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
Expand All @@ -314,48 +358,35 @@ def test_snap_files(
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
with replication_slot(conn, "slotter"):
workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)

conn.close()

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)
finally:
workload.terminate()

conn.close()

workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")

conn.close()

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)

finally:
workload.terminate()

1 comment on commit 49b599c

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5500 tests run: 5253 passed, 1 failed, 246 skipped (full report)


Failures on Postgres 16

  • test_compaction_l0_memory[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_compaction_l0_memory[release-pg16-github-actions-selfhosted]"
Flaky tests (3)

Postgres 16

Postgres 15

Code coverage* (full report)

  • functions: 31.8% (7889 of 24837 functions)
  • lines: 49.4% (62450 of 126362 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
49b599c at 2024-11-14T18:57:07.752Z :recycle:

Please sign in to comment.