Skip to content

Commit

Permalink
[yugabyte#9468] YSQL: Ensure clients don't see serialization errors i…
Browse files Browse the repository at this point in the history
…n READ COMMITTED isolation (Part-3)

Summary:
In this third part, we ensure that we don't throw kConflict errors to external
ysql clients when using READ COMMITTED isolation level. We do this by -

(1) Re-executing a statement when kConflict is seen: this is done by leveraging
savepoints. An internal savepoint is created before execution of every
statement, which is rolled back to on facing a kConflict. This helps get rid
of any provisional writes that where written by the statement before the
conflict and hence are no longer valid.

The statement is retried indefinitely until statement timeout with configurable
exponential backoff. This gives a feeling that pessimistic locking is also
in place. Note that we also lazily rely only on the statement timeout to get
rid of deadlocks, without proactively detecting them with a distributed
deadlock detection algorithm. That will be come in as a separate improvement
with pessimistic locking.

(2) Using the highest priority for READ COMMITTED txns: this helps ensure that
no other txns can abort a READ COMMITTED txn. Even other READ COMMITTED
txns can't.

Test Plan:
Jenkins: urgent

Enabled Postgres's existing eval-plan-qual isolation test with appropriate
modifications to disable cases that require features yet to be implemented on
YB.

Added a bunch of new tests from the functional spec as well:
src/test/isolation/specs/yb_pb_eval-plan-qual.spec
src/test/isolation/specs/yb_read_committed_insert.spec
src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec
src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec

Reviewers: mihnea, alex, rsami, mtakahara

Reviewed By: rsami, mtakahara

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D15383
  • Loading branch information
pkj415 authored and jayant anand committed Mar 8, 2022
1 parent 8d44fac commit 7dc868e
Show file tree
Hide file tree
Showing 40 changed files with 1,003 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.yb.pgsql;

import java.util.Collections;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -22,6 +23,13 @@
@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgIsolationRegress extends BasePgSQLTest {

@Override
protected Map<String, String> getTServerFlags() {
Map<String, String> flagMap = super.getTServerFlags();
flagMap.put("yb_enable_read_committed_isolation", "true");
return flagMap;
}

private void runIsolationRegressTest() throws Exception {
runPgRegressTest(
PgRegressBuilder.PG_ISOLATION_REGRESS_DIR /* inputDir */, "yb_pg_isolation_schedule",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
package org.yb.pgsql;

import java.util.Collections;
import java.util.Map;

import org.junit.Test;
Expand Down Expand Up @@ -43,4 +44,12 @@ protected Map<String, String> getTServerFlags() {
public void testPgRegressTransaction() throws Exception {
runPgRegressTest("yb_transaction_savepoints_schedule");
}

@Test
public void testPgRegressTransactionWithReadCommitted() throws Exception {
restartClusterWithFlags(Collections.emptyMap(),
Collections.singletonMap("TEST_inject_sleep_before_applying_intents_ms",
"100"));
runPgRegressTest("yb_transaction_savepoints_schedule");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -942,6 +940,7 @@ public List<Runnable> getRunnableThreads(ConnectionBuilder cb, Future<?> executi
int selectsFirstOpConflictDetected = 0;
int txnsSucceeded = 0;
int selectsWithAbortError = 0;
int commitOfTxnThatRequiresRestart = 0;
boolean resultsAlwaysMatched = true;

// We never expect SNAPSHOT ISOLATION/ READ COMMITTED transaction to result in "conflict"
Expand All @@ -967,7 +966,19 @@ public List<Runnable> getRunnableThreads(ConnectionBuilder cb, Future<?> executi
if (Thread.interrupted()) return; // Skips all post-loop checks
List<Row> rows2 = getRowList(executeQuery(stmt));
++numCompletedOps;
selectTxnConn.commit();
try {
selectTxnConn.commit();
} catch (Exception ex) {
// TODO(Piyush): Once #11514 is fixed, we won't have to handle this rare
// occurrence.
if (ex.getMessage().contains(
"Illegal state: Commit of transaction that requires restart is not " +
"allowed")){
commitOfTxnThatRequiresRestart++;
} else {
throw ex;
}
}
assertTrue("Two SELECTs done within same transaction mismatch" +
", " + isolation + " transaction isolation breach!",
rows1.equals(rows2) || (isolation == IsolationLevel.READ_COMMITTED));
Expand Down Expand Up @@ -1006,7 +1017,8 @@ public List<Runnable> getRunnableThreads(ConnectionBuilder cb, Future<?> executi
" selectsSecondOpRestartRequired=" + selectsSecondOpRestartRequired +
" selectsFirstOpConflictDetected=" + selectsFirstOpConflictDetected +
" txnsSucceeded=" + txnsSucceeded +
" selectsWithAbortError=" + selectsWithAbortError);
" selectsWithAbortError=" + selectsWithAbortError +
" commitOfTxnThatRequiresRestart=" + commitOfTxnThatRequiresRestart);

if (expectReadRestartErrors) {
assertTrue(selectsFirstOpRestartRequired > 0 && selectsSecondOpRestartRequired > 0);
Expand Down
93 changes: 78 additions & 15 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,14 @@ GetCurrentTransactionNestLevel(void)
return s->nestingLevel;
}

const char*
GetCurrentTransactionName(void)
{
TransactionState s = CurrentTransactionState;

return s->name;
}


/*
* TransactionIdIsCurrentTransactionId
Expand Down Expand Up @@ -1905,19 +1913,22 @@ YBInitializeTransaction(void)
if (YBTransactionsEnabled())
{
HandleYBStatus(YBCPgBeginTransaction());
HandleYBStatus(YBCPgSetTransactionIsolationLevel(XactIsoLevel));

int pg_isolation_level = XactIsoLevel;

if (pg_isolation_level == XACT_READ_UNCOMMITTED)
pg_isolation_level = XACT_READ_COMMITTED;

if ((pg_isolation_level == XACT_READ_COMMITTED) && !IsYBReadCommitted())
pg_isolation_level = XACT_REPEATABLE_READ;

HandleYBStatus(YBCPgSetTransactionIsolationLevel(pg_isolation_level));
HandleYBStatus(YBCPgEnableFollowerReads(YBReadFromFollowersEnabled(), YBFollowerReadStalenessMs()));
HandleYBStatus(YBCPgSetTransactionReadOnly(XactReadOnly));
HandleYBStatus(YBCPgSetTransactionDeferrable(XactDeferrable));
}
}

void
YBMaybeResetTransactionReadPoint(void)
{
HandleYBStatus(YBCPgMaybeResetTransactionReadPoint());
}

/*
* StartTransaction
*/
Expand Down Expand Up @@ -2906,8 +2917,7 @@ StartTransactionCommand(void)
*
* For READ COMMITTED isolation, we want to reset the read point to current ht time so that
* the query works on a newer snapshot that will include all txns committed before this
* command. There is an exception when we don't pick a new read point: in case we reach here
* as part of a read restart retry, we just have to use the restart read point.
* command.
*
* Read restart handling per statement
* -----------------------------------
Expand All @@ -2919,19 +2929,37 @@ StartTransactionCommand(void)
* records with ht after the chosen read ht and is unsure if the records were committed before
* the client issued read (as per real time), a kReadRestart will be received by postgres.
*
* Read restart retries are handled transparently for every statement in the txn. In case
* we reach here and see that the read point exists and was restarted recently as part of a
* retry, we don't pick a new read point using current time.
* Read restart retries are handled transparently for every statement in the txn in
* yb_attempt_to_restart_on_error().
*/
if (YBTransactionsEnabled() && IsYBReadCommitted()) {
if (YBTransactionsEnabled() && IsYBReadCommitted())
{
/*
* Reset field ybDataSentForCurrQuery (indicates whether any data was sent as part of the
* current query). This helps track if automatic restart of a query is possible in
* READ COMMITTED isolation level.
*/
s->ybDataSentForCurrQuery = false;
elog(DEBUG2, "Maybe resetting read point for statement in Read Committed txn");
YBMaybeResetTransactionReadPoint();
HandleYBStatus(YBCPgResetTransactionReadPoint());
elog(DEBUG2, "Resetting read point for statement in Read Committed txn");

/*
* Create a new internal sub txn before any execution. This aids in rolling back any changes
* before restarting the statement.
*
* We don't rely on the name of the internal sub transaction for rolling back to it in
* yb_attempt_to_restart_on_error(). We just assert that the name of the current sub txn
* matches before calling RollbackAndReleaseCurrentSubTransaction() to restart the
* statement.
*
* Instead of calling BeginInternalSubTransaction(), we have copy-pasted necessary logic
* into a new function since BeginInternalSubTransaction() again calls
* CommitTransactionCommand() and StartTransactionCommand() which will result in recursion.
* We could have solved the recursion problem by plumbing a flag to skip calling
* BeginInternalSubTransaction() again, but it is simpler and less error-prone to just copy
* the minimal required logic.
*/
BeginInternalSubTransactionForReadCommittedStatement();
}

break;
Expand Down Expand Up @@ -4475,6 +4503,40 @@ BeginInternalSubTransaction(const char *name)
StartTransactionCommand();
}

/*
* BeginInternalSubTransactionForReadCommittedStatement
* This is similar to BeginInternalSubTransaction() but doesn't call CommitTransactionCommand()
* and StartTransactionCommand(). It is okay to not call those since this method is called only
* in 2 specific cases (i.e., when starting a new statement in an already existing txn in
* READ COMMITED mode, or when rolling back to the internal sub txn while restarting a
* statement) and both cases satisfy the following property -
* CurrentTransactionState->blockState is TBLOCK_INPROGRESS, TBLOCK_IMPLICIT_INPROGRESS or
* TBLOCK_SUBINPROGRESS.
*/
void
BeginInternalSubTransactionForReadCommittedStatement() {
YBFlushBufferedOperations();
TransactionState s = CurrentTransactionState;

Assert(s->blockState == TBLOCK_SUBINPROGRESS ||
s->blockState == TBLOCK_IMPLICIT_INPROGRESS ||
s->blockState == TBLOCK_INPROGRESS);

if (IsInParallelMode())
ereport(ERROR,
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("cannot start subtransactions during a parallel operation")));

/* Normal subtransaction start */
PushTransaction();
s = CurrentTransactionState; /* changed by push */

s->name = MemoryContextStrdup(TopTransactionContext, YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME);

StartSubTransaction();
s->blockState = TBLOCK_SUBINPROGRESS;
}

/*
* ReleaseCurrentSubTransaction
*
Expand Down Expand Up @@ -5162,6 +5224,7 @@ PushTransaction(void)
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly;
s->parallelModeLevel = 0;
s->ybDataSentForCurrQuery = p->ybDataSentForCurrQuery;

CurrentTransactionState = s;
YBUpdateActiveSubTransaction(CurrentTransactionState);
Expand Down
10 changes: 5 additions & 5 deletions src/postgres/src/backend/executor/nodeLockRows.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ ExecLockRows(PlanState *pstate)

case HeapTupleUpdated:
/*
* TODO(Piyush): Right now in YB, READ COMMITTED isolation level maps to REPEATABLE READ and
* hence we should error out always. Once we implement READ COMMITTED in YB, we will have to
* add EvalQualPlan related handling specific to YB.
* TODO(Piyush): If handling using EvalPlanQual for READ COMMITTED in future, replace true
* with IsolationUsesXactSnapshot().
*/
if (true) // Replace with IsolationUsesXactSnapshot() once we truly support READ COMMITTED.
if (true)
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
errmsg("could not serialize access due to concurrent update"),
yb_txn_errcode(YBCGetTxnConflictErrorCode())));
if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
Expand Down
Loading

0 comments on commit 7dc868e

Please sign in to comment.