Skip to content

Commit

Permalink
Restore running xacts from CLOG on replica startup (#7288)
Browse files Browse the repository at this point in the history
We have one pretty serious MVCC visibility bug with hot standby
replicas. We incorrectly treat any transactions that are in progress
in the primary, when the standby is started, as aborted. That can
break MVCC for queries running concurrently in the standby. It can
also lead to hint bits being set incorrectly, and that damage can last
until the replica is restarted.

The fundamental bug was that we treated any replica start as starting
from a shut down server. The fix for that is straightforward: we need
to set 'wasShutdown = false' in InitWalRecovery() (see changes in the
postgres repo).

However, that introduces a new problem: with wasShutdown = false, the
standby will not open up for queries until it receives a running-xacts
WAL record from the primary. That's correct, and that's how Postgres
hot standby always works. But it's a problem for Neon, because:

* It changes the historical behavior for existing users. Currently,
  the standby immediately opens up for queries, so if they now need to
  wait, we can breka existing use cases that were working fine
  (assuming you don't hit the MVCC issues).

* The problem is much worse for Neon than it is for standalone
  PostgreSQL, because in Neon, we can start a replica from an
  arbitrary LSN. In standalone PostgreSQL, the replica always starts
  WAL replay from a checkpoint record, and the primary arranges things
  so that there is always a running-xacts record soon after each
  checkpoint record. You can still hit this issue with PostgreSQL if
  you have a transaction with lots of subtransactions running in the
  primary, but it's pretty rare in practice.

To mitigate that, we introduce another way to collect the
running-xacts information at startup, without waiting for the
running-xacts WAL record: We can the CLOG for XIDs that haven't been
marked as committed or aborted. It has limitations with
subtransactions too, but should mitigate the problem for most users.

See #7236.
  • Loading branch information
hlinnaka committed Jun 25, 2024
1 parent c21cea0 commit 73cd094
Show file tree
Hide file tree
Showing 10 changed files with 962 additions and 41 deletions.
29 changes: 28 additions & 1 deletion pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,33 @@ impl WalIngest {
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;

// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}

// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
Expand Down Expand Up @@ -375,6 +401,7 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
pg_constants::RM_REPLORIGIN_ID => {
Expand Down
293 changes: 293 additions & 0 deletions pgxn/neon/neon.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "fmgr.h"

#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
Expand All @@ -22,10 +24,12 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
Expand Down Expand Up @@ -266,6 +270,293 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}

/*
* XXX: These private to procarray.c, but we need them here.
*/
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
#define TOTAL_MAX_CACHED_SUBXIDS \
((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)

/*
* Restore running-xact information by scanning the CLOG at startup.
*
* In PostgreSQL, a standby always has to wait for a running-xacts WAL record
* to arrive before it can start accepting queries. Furthermore, if there are
* transactions with too many subxids (> 64) open to fit in the in-memory
* subxids cache, the running-xacts record will be marked as "suboverflowed",
* and the standby will need to also wait for the currently in-progress
* transactions to finish.
*
* That's not great in PostgreSQL, because a hot standby does not necessary
* open up for queries immediately as you might expect. But it's worse in
* Neon: A standby in Neon doesn't need to start WAL replay from a checkpoint
* record; it can start at any LSN. Postgres arranges things so that there is
* a running-xacts record soon after every checkpoint record, but when you
* start from an arbitrary LSN, that doesn't help. If the primary is idle, or
* not running at all, it might never write a new running-xacts record,
* leaving the replica in a limbo where it can never start accepting queries.
*
* To mitigate that, we have an additional mechanism to find the running-xacts
* information: we scan the CLOG, making note of any XIDs not marked as
* committed or aborted. They are added to the Postgres known-assigned XIDs
* array by calling ProcArrayApplyRecoveryInfo() in the caller of this
* function.
*
* There is one big limitation with that mechanism: The size of the
* known-assigned XIDs is limited, so if there are a lot of in-progress XIDs,
* we have to give up. Furthermore, we don't know how many of the in-progress
* XIDs are subtransactions, and if we use up all the space in the
* known-assigned XIDs array for subtransactions, we might run out of space in
* the array later during WAL replay, causing the replica to shut down with
* "ERROR: too many KnownAssignedXids". The safe # of XIDs that we can add to
* the known-assigned array without risking that error later is very low,
* merely PGPROC_MAX_CACHED_SUBXIDS == 64, so we take our chances and use up
* to half of the known-assigned XIDs array for the subtransactions, even
* though that risks getting the error later.
*
* Note: It's OK if the recovered list of XIDs includes some transactions that
* have crashed in the primary, and hence will never commit. They will be seen
* as in-progress, until we see a new next running-acts record with an
* oldestActiveXid that invalidates them. That's how the known-assigned XIDs
* array always works.
*
* If scraping the CLOG doesn't succeed for some reason, like the subxid
* overflow, Postgres will fall back to waiting for a running-xacts record
* like usual.
*
* Returns true if a complete list of in-progress XIDs was scraped.
*/
static bool
RestoreRunningXactsFromClog(CheckPoint *checkpoint, TransactionId **xids, int *nxids)
{
TransactionId from;
TransactionId till;
int max_xcnt;
TransactionId *prepared_xids = NULL;
int n_prepared_xids;
TransactionId *restored_xids = NULL;
int n_restored_xids;
int next_prepared_idx;

Assert(*xids == NULL);

/*
* If the checkpoint doesn't have a valid oldestActiveXid, bail out. We
* don't know where to start the scan.
*
* This shouldn't happen, because the pageserver always maintains a valid
* oldestActiveXid nowadays. Except when starting at an old point in time
* that was ingested before the pageserver was taught to do that.
*/
if (!TransactionIdIsValid(checkpoint->oldestActiveXid))
{
elog(LOG, "cannot restore running-xacts from CLOG because oldestActiveXid is not set");
goto fail;
}

/*
* We will scan the CLOG starting from the oldest active XID.
*
* In some corner cases, the oldestActiveXid from the last checkpoint
* might already have been truncated from the CLOG. That is, oldestXid
* might be older than oldestActiveXid. That's possible because
* oldestActiveXid is only updated at checkpoints. After the last
* checkpoint, the oldest transaction might have committed, and the CLOG
* might also have been already truncated. So if oldestActiveXid is older
* than oldestXid, start at oldestXid instead. (Otherwise we'd try to
* access CLOG segments that have already been truncated away.)
*/
from = TransactionIdPrecedes(checkpoint->oldestXid, checkpoint->oldestActiveXid)
? checkpoint->oldestActiveXid : checkpoint->oldestXid;
till = XidFromFullTransactionId(checkpoint->nextXid);

/*
* To avoid "too many KnownAssignedXids" error later during replay, we
* limit number of collected transactions. This is a tradeoff: if we are
* willing to consume more of the KnownAssignedXids space for the XIDs
* now, that allows us to start up, but we might run out of space later.
*
* The size of the KnownAssignedXids array is TOTAL_MAX_CACHED_SUBXIDS,
* which is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS). In
* PostgreSQL, that's always enough because the primary will always write
* an XLOG_XACT_ASSIGNMENT record if a transaction has more than
* PGPROC_MAX_CACHED_SUBXIDS subtransactions. Seeing that record allows
* the standby to mark the XIDs in pg_subtrans and removing them from the
* KnowingAssignedXids array.
*
* Here, we don't know which XIDs belong to subtransactions that have
* already been WAL-logged with an XLOG_XACT_ASSIGNMENT record. If we
* wanted to be totally safe and avoid the possibility of getting a "too
* many KnownAssignedXids" error later, we would have to limit ourselves
* to PGPROC_MAX_CACHED_SUBXIDS, which is not much. And that includes top
* transaction IDs too, because we cannot distinguish between top
* transaction IDs and subtransactions here.
*
* Somewhat arbitrarily, we use up to half of KnownAssignedXids. That
* strikes a sensible balance between being useful, and risking a "too
* many KnownAssignedXids" error later.
*/
max_xcnt = TOTAL_MAX_CACHED_SUBXIDS / 2;

/*
* Collect XIDs of prepared transactions in an array. This includes only
* their top-level XIDs. We assume that StandbyRecoverPreparedTransactions
* has already been called, so we can find all the sub-transactions in
* pg_subtrans.
*/
PrescanPreparedTransactions(&prepared_xids, &n_prepared_xids);
qsort(prepared_xids, n_prepared_xids, sizeof(TransactionId), xidLogicalComparator);

/*
* Scan the CLOG, collecting in-progress XIDs into 'restored_xids'.
*/
elog(DEBUG1, "scanning CLOG between %u and %u for in-progress XIDs", from, till);
restored_xids = (TransactionId *) palloc(max_xcnt * sizeof(TransactionId));
n_restored_xids = 0;
next_prepared_idx = 0;
for (TransactionId xid = from; xid != till;)
{
XLogRecPtr xidlsn;
XidStatus xidstatus;

xidstatus = TransactionIdGetStatus(xid, &xidlsn);

/*
* "Merge" the prepared transactions into the restored_xids array as
* we go. The prepared transactions array is sorted. This is mostly
* a sanity check to ensure that all the prpeared transactions are
* seen as in-progress. (There is a check after the loop that we didn't
* miss any.)
*/
if (next_prepared_idx < n_prepared_xids && xid == prepared_xids[next_prepared_idx])
{
/*
* This is a top-level transaction ID of a prepared transaction.
* Include it in the array.
*/

/* sanity check */
if (xidstatus != TRANSACTION_STATUS_IN_PROGRESS)
{
elog(LOG, "prepared transaction %u has unexpected status %X, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}

elog(DEBUG1, "XID %u: was next prepared xact (%d / %d)", xid, next_prepared_idx, n_prepared_xids);
next_prepared_idx++;
}
else if (xidstatus == TRANSACTION_STATUS_COMMITTED)
{
elog(DEBUG1, "XID %u: was committed", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_ABORTED)
{
elog(DEBUG1, "XID %u: was aborted", xid);
goto skip;
}
else if (xidstatus == TRANSACTION_STATUS_IN_PROGRESS)
{
/*
* In-progress transactions are included in the array.
*
* Except subtransactions of the prepared transactions. They are
* already set in pg_subtrans, and hence don't need to be tracked
* in the known-assigned XIDs array.
*/
if (n_prepared_xids > 0)
{
TransactionId parent = SubTransGetParent(xid);

if (TransactionIdIsValid(parent))
{
/*
* This is a subtransaction belonging to a prepared
* transaction.
*
* Sanity check that it is in the prepared XIDs array. It
* should be, because StandbyRecoverPreparedTransactions
* populated pg_subtrans, and no other XID should be set
* in it yet. (This also relies on the fact that
* StandbyRecoverPreparedTransactions sets the parent of
* each subxid to point directly to the top-level XID,
* rather than restoring the original subtransaction
* hierarchy.)
*/
if (bsearch(&parent, prepared_xids, next_prepared_idx,
sizeof(TransactionId), xidLogicalComparator) == NULL)
{
elog(LOG, "sub-XID %u has unexpected parent %u, cannot restore running-xacts from CLOG",
xid, parent);
Assert(false);
goto fail;
}
elog(DEBUG1, "XID %u: was a subtransaction of prepared xid %u", xid, parent);
goto skip;
}
}

/* include it in the array */
elog(DEBUG1, "XID %u: is in progress", xid);
}
else
{
/*
* SUB_COMMITTED is a transient state used at commit. We don't
* expect to see that here.
*/
elog(LOG, "XID %u has unexpected status %X in pg_xact, cannot restore running-xacts from CLOG",
xid, xidstatus);
Assert(false);
goto fail;
}

if (n_restored_xids >= max_xcnt)
{
/*
* Overflowed. We won't be able to install the RunningTransactions
* snapshot.
*/
elog(LOG, "too many running xacts to restore from the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
checkpoint->oldestXid, checkpoint->oldestActiveXid,
XidFromFullTransactionId(checkpoint->nextXid));
goto fail;
}

restored_xids[n_restored_xids++] = xid;

skip:
TransactionIdAdvance(xid);
continue;
}

/* sanity check */
if (next_prepared_idx != n_prepared_xids)
{
elog(LOG, "prepared transaction ID %u was not visited in the CLOG scan, cannot restore running-xacts from CLOG",
prepared_xids[next_prepared_idx]);
Assert(false);
goto fail;
}

elog(LOG, "restored %d running xacts by scanning the CLOG; oldestXid=%u oldestActiveXid=%u nextXid %u",
n_restored_xids, checkpoint->oldestXid, checkpoint->oldestActiveXid, XidFromFullTransactionId(checkpoint->nextXid));
*nxids = n_restored_xids;
*xids = restored_xids;
return true;

fail:
*nxids = 0;
*xids = NULL;
if (restored_xids)
pfree(restored_xids);
if (prepared_xids)
pfree(prepared_xids);
return false;
}

void
_PG_init(void)
{
Expand All @@ -288,6 +579,8 @@ _PG_init(void)

pg_init_extension_server();

restore_running_xacts_callback = RestoreRunningXactsFromClog;

/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the
Expand Down
4 changes: 3 additions & 1 deletion test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3824,7 +3824,9 @@ def stop_all(self) -> "EndpointFactory":

return self

def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
Expand Down
2 changes: 1 addition & 1 deletion test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(100):
for i in range(1000):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
Expand Down
Loading

0 comments on commit 73cd094

Please sign in to comment.