Skip to content

Commit

Permalink
Rergister custom xlog reader callbacks for on-demand WAL download in …
Browse files Browse the repository at this point in the history
…StartupDecodingContext
  • Loading branch information
Konstantin Knizhnik authored and knizhnik committed Nov 19, 2024
1 parent b7e9ac3 commit f5cfc6f
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 22 deletions.
8 changes: 8 additions & 0 deletions src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include "utils/builtins.h"
#include "utils/memutils.h"

void (*Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/* data for errcontext callback */
typedef struct LogicalErrorCallbackState
{
Expand Down Expand Up @@ -180,6 +182,12 @@ StartupDecodingContext(List *output_plugin_options,
if (!fast_forward)
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));

/*
* NEON: override page_read/segment_open/segment_close functions to support on-demand WAL download
*/
if (Custom_XLogReaderRoutines != NULL)
Custom_XLogReaderRoutines(xl_routine);

/*
* Now that the slot's xmin has been set, we can announce ourselves as a
* logical decoding backend which doesn't need to be checked individually
Expand Down
5 changes: 0 additions & 5 deletions src/backend/replication/logical/logicalfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
#include "utils/regproc.h"
#include "utils/resowner.h"

void (*LogicalFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/* Private data for writing out data */
typedef struct DecodingOutputState
{
Expand Down Expand Up @@ -211,9 +209,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
xlr.segment_open = wal_segment_open;
xlr.segment_close = wal_segment_close;

if (LogicalFuncs_Custom_XLogReaderRoutines != NULL)
LogicalFuncs_Custom_XLogReaderRoutines(&xlr);

/* restart at slot's confirmed_flush */
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
Expand Down
8 changes: 0 additions & 8 deletions src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "utils/pg_lsn.h"
#include "utils/resowner.h"

void (*SlotFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/*
* Helper function for creating a new physical replication slot with
* given arguments. Note that this function doesn't release the created
Expand Down Expand Up @@ -128,9 +126,6 @@ create_logical_replication_slot(char *name, char *plugin,
xlr.segment_open = wal_segment_open;
xlr.segment_close = wal_segment_close;

if (SlotFuncs_Custom_XLogReaderRoutines != NULL)
SlotFuncs_Custom_XLogReaderRoutines(&xlr);

Assert(!MyReplicationSlot);

/*
Expand Down Expand Up @@ -490,9 +485,6 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
xlr.segment_open = wal_segment_open;
xlr.segment_close = wal_segment_close;

if (SlotFuncs_Custom_XLogReaderRoutines != NULL)
SlotFuncs_Custom_XLogReaderRoutines(&xlr);

/*
* Create our decoding context in fast_forward mode, passing start_lsn
* as InvalidXLogRecPtr, so that we start processing from my slot's
Expand Down
3 changes: 0 additions & 3 deletions src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
* data message */
bool log_replication_commands = false;

void (*WalSender_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
/*
* State for WalSndWakeupRequest
*/
Expand Down Expand Up @@ -1285,8 +1284,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
xlr.page_read = logical_read_xlog_page;
xlr.segment_open = WalSndSegmentOpen;
xlr.segment_close = wal_segment_close;
if (WalSender_Custom_XLogReaderRoutines != NULL)
WalSender_Custom_XLogReaderRoutines(&xlr);

/*
* Create our decoding context, making it start at the previously ack'ed
Expand Down
2 changes: 1 addition & 1 deletion src/include/replication/logical.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ typedef struct LogicalDecodingContext
bool in_create;
} LogicalDecodingContext;

extern void (*LogicalFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
extern void (*Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

extern void CheckLogicalDecodingRequirements(void);

Expand Down
2 changes: 0 additions & 2 deletions src/include/replication/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ typedef struct ReplicationSlot
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)

extern void (*SlotFuncs_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);

/*
* Shared memory control area for all of replication slots.
*/
Expand Down
3 changes: 0 additions & 3 deletions src/include/replication/walsender.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ extern PGDLLIMPORT int max_wal_senders;
extern PGDLLIMPORT int wal_sender_timeout;
extern PGDLLIMPORT bool log_replication_commands;

struct XLogReaderRoutine;
extern PGDLLIMPORT void (*WalSender_Custom_XLogReaderRoutines)(struct XLogReaderRoutine *xlr);

extern void InitWalSender(void);
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
Expand Down

0 comments on commit f5cfc6f

Please sign in to comment.