Skip to content

Commit

Permalink
[PG15] Feature/replicas (#279)
Browse files Browse the repository at this point in the history
* Recovery requirements:

Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer.

* Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts.
This fixes some test failures that showed up after updating Neon code to do
more precise handling of replica's get_page_at_lsn's request_lsn lsns.

---------

Co-authored-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
  • Loading branch information
2 people authored and tristan957 committed Aug 9, 2023
1 parent 0e771f9 commit 52c0264
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -5303,6 +5303,14 @@ StartupXLOG(void)
RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
doPageWrites = lastFullPageWrites;

/*
* Setup last written lsn cache, max written LSN.
* Starting from here, we could be modifying pages through REDO, which requires
* the existance of maxLwLsn + LwLsn LRU.
*/
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

/* REDO */
if (InRecovery)
{
Expand Down Expand Up @@ -5671,8 +5679,6 @@ StartupXLOG(void)

XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
XLogCtl->maxLastWrittenLsn = EndOfLog;
dlist_init(&XLogCtl->lastWrittenLsnLRU);

/*
* Preallocate additional log files, if wanted.
Expand Down Expand Up @@ -8148,11 +8154,14 @@ xlog_redo(XLogReaderState *record)
continue;
}
result = XLogReadBufferForRedo(record, block_id, &buffer);
if (result == BLK_DONE && !IsUnderPostmaster)
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
{
/*
* In the special WAL process, blocks that are being ignored
* return BLK_DONE. Accept that.
* NEON: In the special WAL redo process, blocks that are being
* ignored return BLK_DONE. Accept that.
* Additionally, in standby mode, blocks that are not present
* in shared buffers are ignored during replay, so we also
* ignore those blocks.
*/
}
else if (result != BLK_RESTORED)
Expand Down
62 changes: 62 additions & 0 deletions src/backend/access/transam/xlogrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ typedef struct XLogRecoveryCtlData
XLogRecPtr lastReplayedReadRecPtr; /* start position */
XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */
TimeLineID lastReplayedTLI; /* timeline */
ConditionVariable replayProgressCV; /* CV for waiters */

/*
* When we're currently replaying a record, ie. in a redo function,
Expand Down Expand Up @@ -465,6 +466,7 @@ XLogRecoveryShmemInit(void)

SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogRecoveryCtl->replayProgressCV);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
}

Expand All @@ -486,6 +488,64 @@ EnableStandbyMode(void)
disable_startup_progress_timeout();
}

/*
* Wait for recovery to complete replaying all WAL up to and including
* redoEndRecPtr.
*
* This gets woken up for every WAL record replayed, so make sure you're not
* trying to wait an LSN that is too far in the future.
*/
void
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
{
static XLogRecPtr replayRecPtr = 0;

if (!RecoveryInProgress())
return;

/*
* Check the backend-local variable first, we may be able to skip accessing
* shared memory (which requires locking)
*/
if (redoEndRecPtr <= replayRecPtr)
return;

replayRecPtr = GetXLogReplayRecPtr(NULL);

/*
* Check again if we're going to need to wait, now that we've updated
* the local cached variable.
*/
if (redoEndRecPtr <= replayRecPtr)
return;

/*
* We need to wait for the variable, so prepare for that.
*
* Note: This wakes up every time a WAL record is replayed, so this can
* be expensive.
*/
ConditionVariablePrepareToSleep(&XLogRecoveryCtl->replayProgressCV);

while (redoEndRecPtr > replayRecPtr)
{
bool timeout;
timeout = ConditionVariableTimedSleep(&XLogRecoveryCtl->replayProgressCV,
10000000, /* 10 seconds */
WAIT_EVENT_RECOVERY_WAL_STREAM);

replayRecPtr = GetXLogReplayRecPtr(NULL);

if (timeout)
ereport(LOG,
(errmsg("Waiting for recovery to catch up to %X/%X (currently %X/%X)",
LSN_FORMAT_ARGS(redoEndRecPtr),
LSN_FORMAT_ARGS(replayRecPtr))));
}

ConditionVariableCancelSleep();
}

/*
* Prepare the system for WAL recovery, if needed.
*
Expand Down Expand Up @@ -2051,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}

ConditionVariableBroadcast(&XLogRecoveryCtl->replayProgressCV);
}

/*
Expand Down
1 change: 1 addition & 0 deletions src/include/access/xlogrecovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ extern void ShutdownWalRecovery(void);
extern void RemovePromoteSignalFiles(void);

extern bool HotStandbyActive(void);
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern RecoveryPauseState GetRecoveryPauseState(void);
extern void SetRecoveryPause(bool recoveryPause);
Expand Down
4 changes: 4 additions & 0 deletions src/include/access/xlogutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ typedef struct ReadLocalXLogPageNoWaitPrivate
bool end_of_wal; /* true, when end of WAL is reached */
} ReadLocalXLogPageNoWaitPrivate;

/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,
Expand Down

0 comments on commit 52c0264

Please sign in to comment.