Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the replication slot in test_snap_files at the end of the test #9752

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 74 additions & 43 deletions test_runner/performance/test_logical_replication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, cast

import psycopg2
Expand All @@ -18,7 +20,7 @@
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonApiEndpoint
from fixtures.neon_fixtures import NeonEnv, PgBin, VanillaPostgres
from psycopg2.extensions import cursor
from psycopg2.extensions import connection, cursor


@pytest.mark.timeout(1000)
Expand Down Expand Up @@ -292,6 +294,48 @@ def test_snap_files(
then runs pgbench inserts while generating large numbers of snapfiles. Then restarts
the node and tries to peek the replication changes.
"""

@contextmanager
def replication_slot(conn: connection, slot_name: str) -> Iterator[None]:
"""
Make sure that the replication slot doesn't outlive the test. Normally
we wouldn't want this behavior, but since the test creates and drops
the replication slot, we do.

We've had problems in the past where this slot sticking around caused
issues with the publisher retaining WAL during the execution of the
other benchmarks in this suite.
"""

def __drop_replication_slot(c: cursor) -> None:
c.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = %(slot_name)s
) THEN
PERFORM pg_drop_replication_slot(%(slot_name)s);
END IF;
END $$;
""",
{"slot_name": slot_name},
)

with conn.cursor() as c:
__drop_replication_slot(c)
c.execute(
"SELECT pg_create_logical_replication_slot(%(slot_name)s, 'test_decoding')",
{"slot_name": slot_name},
)

yield

with conn.cursor() as c:
__drop_replication_slot(c)

test_duration_min = 60
test_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
Expand All @@ -314,48 +358,35 @@ def test_snap_files(
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
with replication_slot(conn, "slotter"):
workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)

conn.close()

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)
finally:
workload.terminate()

conn.close()

workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")

conn.close()

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
storage = benchmark_project_pub.get_synthetic_storage_size()
zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER)
prev_measurement = time.time()
time.sleep(test_interval_min * 60 / 3)

finally:
workload.terminate()
Loading