Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix autocommit footguns in performance tests #9735

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 61 additions & 52 deletions test_runner/performance/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,16 @@ def test_subscriber_lag(
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")

with (
psycopg2.connect(pub_connstr) as pub_conn,
psycopg2.connect(sub_connstr) as sub_conn,
):
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True

with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)

pub_conn.close()
sub_conn.close()
tristan957 marked this conversation as resolved.
Show resolved Hide resolved

log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
Expand Down Expand Up @@ -206,6 +210,7 @@ def test_publisher_restart(
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True

with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
pub_exists = len(pub_cur.fetchall()) != 0
Expand All @@ -222,6 +227,7 @@ def test_publisher_restart(
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")

initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)

pub_conn.close()
sub_conn.close()

Expand All @@ -248,12 +254,17 @@ def test_publisher_restart(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with (
psycopg2.connect(pub_connstr) as pub_conn,
psycopg2.connect(sub_connstr) as sub_conn,
):
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)

pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True

with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)

pub_conn.close()
sub_conn.close()

log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
Expand Down Expand Up @@ -288,58 +299,56 @@ def test_snap_files(
env = benchmark_project_pub.pgbench_env
connstr = benchmark_project_pub.connstr

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cast("bool", cur.fetchall()[0][0])
assert is_super, "This benchmark won't work if we don't have superuser"
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'")
is_super = cast("bool", cur.fetchall()[0][0])
assert is_super, "This benchmark won't work if we don't have superuser"

conn.close()

pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=env)

conn = psycopg2.connect(connstr)
conn.autocommit = True
cur = conn.cursor()
cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1")

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT pg_reload_conf()")

with psycopg2.connect(connstr) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;

with conn.cursor() as cur:
cur.execute(
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'slotter'
) THEN
PERFORM pg_drop_replication_slot('slotter');
END IF;
END $$;
"""
)
cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')")

conn.close()

workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env)
try:
start = time.time()
prev_measurement = time.time()
while time.time() - start < test_duration_min * 60:
with psycopg2.connect(connstr) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute(
"SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())"
)
conn = psycopg2.connect(connstr)
conn.autocommit = True

with conn.cursor() as cur:
cur.execute(
"SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s"
)
check_pgbench_still_running(workload)
cur.execute("SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())")

conn.close()

# Measure storage
if time.time() - prev_measurement > test_interval_min * 60:
Expand Down
61 changes: 38 additions & 23 deletions test_runner/performance/test_physical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,21 @@ def test_ro_replica_lag(
check_pgbench_still_running(master_workload)
check_pgbench_still_running(replica_workload)
time.sleep(sync_interval_min * 60)

conn_master = psycopg2.connect(master_connstr)
conn_replica = psycopg2.connect(replica_connstr)
conn_master.autocommit = True
conn_replica.autocommit = True

with (
psycopg2.connect(master_connstr) as conn_master,
psycopg2.connect(replica_connstr) as conn_replica,
conn_master.cursor() as cur_master,
conn_replica.cursor() as cur_replica,
):
with (
conn_master.cursor() as cur_master,
conn_replica.cursor() as cur_replica,
):
lag = measure_replication_lag(cur_master, cur_replica)
lag = measure_replication_lag(cur_master, cur_replica)

conn_master.close()
conn_replica.close()

log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
finally:
Expand Down Expand Up @@ -219,11 +225,15 @@ def test_replication_start_stop(
pg_bin.run_capture(["pgbench", "-i", "-I", "dtGvp", "-s10"], env=master_env)

# Sync replicas
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())
conn_master = psycopg2.connect(master_connstr)
conn_master.autocommit = True

with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())

conn_master.close()

master_pgbench = pg_bin.run_nonblocking(
[
Expand Down Expand Up @@ -277,17 +287,22 @@ def replica_enabled(iconfig: int = iconfig):

time.sleep(configuration_test_time_sec)

with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)
conn_master = psycopg2.connect(master_connstr)
conn_master.autocommit = True

with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)

conn_master.close()

master_pgbench.terminate()
except Exception as e:
error_occurred = True
Expand Down
Loading