Skip to content

Commit

Permalink
Add On-demand WAL Download to logicalfuncs (#7960)
Browse files Browse the repository at this point in the history
We implemented on-demand WAL download for walsender, but other things
that may want to read the WAL from safekeepers don't do that yet. This
PR makes it do that by adding the same set of hooks to logicalfuncs.

Addresses #7959

Also relies on:
neondatabase/postgres#438
neondatabase/postgres#437
neondatabase/postgres#436
  • Loading branch information
Sasha Krassovsky authored and VladLazar committed Jun 12, 2024
1 parent 7e3acf2 commit 29d18e7
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ postgres-%: postgres-configure-% \
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
+@echo "Compiling amcheck $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install

.PHONY: postgres-clean-%
postgres-clean-%:
Expand Down
2 changes: 2 additions & 0 deletions pgxn/neon/neon.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "catalog/pg_type.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/procsignal.h"
Expand Down Expand Up @@ -280,6 +281,7 @@ _PG_init(void)
pg_init_libpagestore();
pg_init_walproposer();
WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
LogicalFuncs_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;

InitLogicalReplicationMonitor();

Expand Down
27 changes: 26 additions & 1 deletion pgxn/neon/walsender_hooks.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
#include "walproposer.h"

static NeonWALReader *wal_reader = NULL;

struct WalSnd;
extern struct WalSnd *MyWalSnd;
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);

static XLogRecPtr
NeonWALReadWaitForWAL(XLogRecPtr loc)
Expand All @@ -36,7 +40,28 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
CHECK_FOR_INTERRUPTS();
}

return WalSndWaitForWal(loc);
// Walsender sends keepalives and stuff, so better use its normal wait
if (MyWalSnd != NULL)
return WalSndWaitForWal(loc);

for (;;)
{
XLogRecPtr flush_ptr;
if (!RecoveryInProgress())
#if PG_VERSION_NUM >= 150000
flush_ptr = GetFlushRecPtr(NULL);
#else
flush_ptr = GetFlushRecPtr();
#endif
else
flush_ptr = GetXLogReplayRecPtr(NULL);

if (loc <= flush_ptr)
return flush_ptr;

CHECK_FOR_INTERRUPTS();
pg_usleep(1000);
}
}

static int
Expand Down
30 changes: 30 additions & 0 deletions test_runner/regress/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,35 @@ def slot_removed(ep):
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))


def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch("init")
endpoint = env.endpoints.create_start("init")

with endpoint.connect().cursor() as cur:
cur.execute("create table wal_generator (id serial primary key, data text)")
cur.execute(
"SELECT * FROM pg_create_logical_replication_slot('slotty_mcslotface', 'test_decoding')"
)
cur.execute(
"""
INSERT INTO wal_generator (data)
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
"""
)

endpoint.stop_and_destroy()
endpoint = env.endpoints.create_start("init")

with endpoint.connect().cursor() as cur:
cur.execute(
"SELECT * FROM pg_logical_slot_peek_binary_changes('slotty_mcslotface', NULL, NULL, 'include-xids', '0')"
)


# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
neon_env_builder.num_safekeepers = 3
Expand All @@ -247,6 +276,7 @@ def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
logical_replication_sync(vanilla_pg, endpoint)

vanilla_pg.stop()

# Pause the safekeepers so that they can't send WAL (except to pageserver)
Expand Down
2 changes: 1 addition & 1 deletion vendor/postgres-v14
2 changes: 1 addition & 1 deletion vendor/postgres-v15
2 changes: 1 addition & 1 deletion vendor/postgres-v16
6 changes: 3 additions & 3 deletions vendor/revisions.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
"v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
"v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
"v16": ["16.3", "9837db157837fcf43ef7348be0017d3a2238cd27"],
"v15": ["15.7", "e22098d86d6c40276b6bd75c29133a33fb283ab6"],
"v14": ["14.12", "4c51945a6167ca06c0169e7a4ca5a8e7ffa3faba"]
}

0 comments on commit 29d18e7

Please sign in to comment.