Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add On-demand WAL Download to logicalfuncs #7960

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ postgres-%: postgres-configure-% \
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
+@echo "Compiling amcheck $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
+@echo "Compiling test_decoding $*"
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install

.PHONY: postgres-clean-%
postgres-clean-%:
Expand Down
2 changes: 2 additions & 0 deletions pgxn/neon/neon.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "catalog/pg_type.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/procsignal.h"
Expand Down Expand Up @@ -280,6 +281,7 @@ _PG_init(void)
pg_init_libpagestore();
pg_init_walproposer();
WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
LogicalFuncs_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;

InitLogicalReplicationMonitor();

Expand Down
27 changes: 26 additions & 1 deletion pgxn/neon/walsender_hooks.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
#include "walproposer.h"

static NeonWALReader *wal_reader = NULL;

struct WalSnd;
extern struct WalSnd *MyWalSnd;
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);

static XLogRecPtr
NeonWALReadWaitForWAL(XLogRecPtr loc)
Expand All @@ -36,7 +40,28 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
CHECK_FOR_INTERRUPTS();
}

return WalSndWaitForWal(loc);
// Walsender sends keepalives and stuff, so better use its normal wait
if (MyWalSnd != NULL)
return WalSndWaitForWal(loc);

for (;;)
{
XLogRecPtr flush_ptr;
if (!RecoveryInProgress())
#if PG_VERSION_NUM >= 150000
flush_ptr = GetFlushRecPtr(NULL);
#else
flush_ptr = GetFlushRecPtr();
#endif
else
flush_ptr = GetXLogReplayRecPtr(NULL);

if (loc <= flush_ptr)
return flush_ptr;

CHECK_FOR_INTERRUPTS();
pg_usleep(1000);
}
}

static int
Expand Down
30 changes: 30 additions & 0 deletions test_runner/regress/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,35 @@ def slot_removed(ep):
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))


def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

env.neon_cli.create_branch("init")
endpoint = env.endpoints.create_start("init")

with endpoint.connect().cursor() as cur:
cur.execute("create table wal_generator (id serial primary key, data text)")
cur.execute(
"SELECT * FROM pg_create_logical_replication_slot('slotty_mcslotface', 'test_decoding')"
)
cur.execute(
"""
INSERT INTO wal_generator (data)
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
"""
)

endpoint.stop_and_destroy()
endpoint = env.endpoints.create_start("init")

with endpoint.connect().cursor() as cur:
cur.execute(
"SELECT * FROM pg_logical_slot_peek_binary_changes('slotty_mcslotface', NULL, NULL, 'include-xids', '0')"
)


# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
neon_env_builder.num_safekeepers = 3
Expand All @@ -247,6 +276,7 @@ def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
logical_replication_sync(vanilla_pg, endpoint)

vanilla_pg.stop()

# Pause the safekeepers so that they can't send WAL (except to pageserver)
Expand Down
6 changes: 3 additions & 3 deletions vendor/revisions.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
"v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
"v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
"v16": ["16.3", "9837db157837fcf43ef7348be0017d3a2238cd27"],
"v15": ["15.7", "e22098d86d6c40276b6bd75c29133a33fb283ab6"],
"v14": ["14.12", "4c51945a6167ca06c0169e7a4ca5a8e7ffa3faba"]
}
Loading