Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PG15] Feature/replicas #279

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -5303,6 +5303,14 @@ StartupXLOG(void)
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
doPageWrites = lastFullPageWrites;

/*
* Setup last written lsn cache, max written LSN.
* Starting from here, we could be modifying pages through REDO, which requires
* the existance of maxLwLsn + LwLsn LRU.
*/
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

/* REDO */
if (InRecovery)
{
Expand Down Expand Up @@ -5671,8 +5679,6 @@ StartupXLOG(void)

XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
XLogCtl->maxLastWrittenLsn = EndOfLog;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

/*
* Preallocate additional log files, if wanted.
Expand Down Expand Up @@ -8144,11 +8150,14 @@ xlog_redo(XLogReaderState *record)
continue;
}
result = XLogReadBufferForRedo(record, block_id, &buffer);
if (result == BLK_DONE && !IsUnderPostmaster)
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
{
/*
* In the special WAL process, blocks that are being ignored
* return BLK_DONE. Accept that.
* NEON: In the special WAL redo process, blocks that are being
* ignored return BLK_DONE. Accept that.
* Additionally, in standby mode, blocks that are not present
* in shared buffers are ignored during replay, so we also
* ignore those blocks.
*/
}
else if (result != BLK_RESTORED)
Expand Down
62 changes: 62 additions & 0 deletions src/backend/access/transam/xlogrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData
XLogRecPtr lastReplayedReadRecPtr; /* start position */
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
TimeLineID lastReplayedTLI; /* timeline */
ConditionVariable replayProgressCV; /* CV for waiters */

/*
* When we're currently replaying a record, ie. in a redo function,
Expand Down Expand Up @@ -464,9 +465,68 @@ XLogRecoveryShmemInit(void)

SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
}

/*
* Wait for recovery to complete replaying all WAL up to and including
* redoEndRecPtr.
*
* This gets woken up for every WAL record replayed, so make sure you're not
* trying to wait an LSN that is too far in the future.
*/
void
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
{
static XLogRecPtr replayRecPtr = 0;

if (!RecoveryInProgress())
return;

/*
* Check the backend-local variable first, we may be able to skip accessing
* shared memory (which requires locking)
*/
if (redoEndRecPtr <= replayRecPtr)
return;

replayRecPtr = GetXLogReplayRecPtr(NULL);

/*
* Check again if we're going to need to wait, now that we've updated
* the local cached variable.
*/
if (redoEndRecPtr <= replayRecPtr)
return;

/*
* We need to wait for the variable, so prepare for that.
*
* Note: This wakes up every time a WAL record is replayed, so this can
* be expensive.
*/
ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV);

while (redoEndRecPtr > replayRecPtr)
{
bool timeout;
timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV,
10000000, /* 10 seconds */
WAIT_EVENT_RECOVERY_WAL_STREAM);

replayRecPtr = GetXLogReplayRecPtr(NULL);

if (timeout)
ereport(LOG,
(errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)",
LSN_FORMAT_ARGS(redoEndRecPtr),
LSN_FORMAT_ARGS(replayRecPtr))));
}

ConditionVariableCancelSleep();
}

/*
* Prepare the system for WAL recovery, if needed.
*
Expand Down Expand Up @@ -2032,6 +2092,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}

ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV);
}

/*
Expand Down
1 change: 1 addition & 0 deletions src/include/access/xlogrecovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void);
extern void RemovePromoteSignalFiles(void);

extern bool HotStandbyActive(void);
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern RecoveryPauseState GetRecoveryPauseState(void);
extern void SetRecoveryPause(bool recoveryPause);
Expand Down
4 changes: 4 additions & 0 deletions src/include/access/xlogutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate
bool end_of_wal; /* true, when end of WAL is reached */
} ReadLocalXLogPageNoWaitPrivate;

/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,
Expand Down