From 9f2cc7fe3730025d1fd0a56517cebde362e3ab65 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Wed, 2 Feb 2022 09:34:56 -0800 Subject: [PATCH] [#9468] YSQL: Ensure clients don't see serialization errors in READ COMMITTED isolation (Part-3) Summary: In this third part, we ensure that we don't throw kConflict errors to external ysql clients when using READ COMMITTED isolation level. We do this by - (1) Re-executing a statement when kConflict is seen: this is done by leveraging savepoints. An internal savepoint is created before execution of every statement, which is rolled back to on facing a kConflict. This helps get rid of any provisional writes that where written by the statement before the conflict and hence are no longer valid. The statement is retried indefinitely until statement timeout with configurable exponential backoff. This gives a feeling that pessimistic locking is also in place. Note that we also lazily rely only on the statement timeout to get rid of deadlocks, without proactively detecting them with a distributed deadlock detection algorithm. That will be come in as a separate improvement with pessimistic locking. (2) Using the highest priority for READ COMMITTED txns: this helps ensure that no other txns can abort a READ COMMITTED txn. Even other READ COMMITTED txns can't. Test Plan: Jenkins: urgent Enabled Postgres's existing eval-plan-qual isolation test with appropriate modifications to disable cases that require features yet to be implemented on YB. Added a bunch of new tests from the functional spec as well: src/test/isolation/specs/yb_pb_eval-plan-qual.spec src/test/isolation/specs/yb_read_committed_insert.spec src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec Reviewers: mihnea, alex, rsami, mtakahara Reviewed By: rsami, mtakahara Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D15383 --- .../org/yb/pgsql/TestPgIsolationRegress.java | 8 + .../TestPgRegressTransactionSavepoints.java | 9 + .../yb/pgsql/TestPgTransparentRestarts.java | 20 +- .../src/backend/access/transam/xact.c | 93 ++++++-- .../src/backend/executor/nodeLockRows.c | 10 +- src/postgres/src/backend/tcop/postgres.c | 145 +++++++++--- src/postgres/src/include/access/xact.h | 7 +- .../expected/yb-skip-locked-after-update.out | 2 +- .../expected/yb_pg_eval-plan-qual.out | 177 +++++++++++++++ .../expected/yb_read_committed_insert.out | 48 ++++ ...read_committed_test_internal_savepoint.out | 14 ++ ..._committed_update_and_explicit_locking.out | 21 ++ .../src/test/isolation/isolationtester.c | 16 ++ .../yb-modification-followed-by-lock.spec | 4 +- .../specs/yb-skip-locked-after-update.spec | 6 +- .../isolation/specs/yb_pg_eval-plan-qual.spec | 207 ++++++++++++++++++ .../specs/yb_read_committed_insert.spec | 32 +++ ...ead_committed_test_internal_savepoint.spec | 40 ++++ ...committed_update_and_explicit_locking.spec | 48 ++++ .../test/isolation/yb_pg_isolation_schedule | 14 ++ src/yb/client/transaction.cc | 16 +- src/yb/client/transaction.h | 6 + src/yb/common/consistent_read_point.cc | 1 - src/yb/common/consistent_read_point.h | 12 - src/yb/common/row_mark.cc | 8 +- src/yb/common/row_mark.h | 4 +- src/yb/common/transaction.proto | 1 + src/yb/common/ybc_util.cc | 4 + src/yb/common/ybc_util.h | 1 + src/yb/docdb/conflict_resolution.cc | 8 +- src/yb/docdb/intent.cc | 1 + src/yb/yql/pggate/pg_session.cc | 31 ++- src/yb/yql/pggate/pg_session.h | 7 +- src/yb/yql/pggate/pg_txn_manager.cc | 63 ++++-- src/yb/yql/pggate/pg_txn_manager.h | 13 +- src/yb/yql/pggate/pggate.cc | 8 +- src/yb/yql/pggate/pggate.h | 3 +- src/yb/yql/pggate/ybc_pggate.cc | 8 +- src/yb/yql/pggate/ybc_pggate.h | 3 +- src/yb/yql/pgwrapper/libpq_utils.cc | 2 + 40 files changed, 1003 insertions(+), 118 deletions(-) create mode 100644 src/postgres/src/test/isolation/expected/yb_pg_eval-plan-qual.out create mode 100644 src/postgres/src/test/isolation/expected/yb_read_committed_insert.out create mode 100644 src/postgres/src/test/isolation/expected/yb_read_committed_test_internal_savepoint.out create mode 100644 src/postgres/src/test/isolation/expected/yb_read_committed_update_and_explicit_locking.out create mode 100644 src/postgres/src/test/isolation/specs/yb_pg_eval-plan-qual.spec create mode 100644 src/postgres/src/test/isolation/specs/yb_read_committed_insert.spec create mode 100644 src/postgres/src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec create mode 100644 src/postgres/src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgIsolationRegress.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgIsolationRegress.java index 85750b6d40f3..1a2ffe1f4681 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgIsolationRegress.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgIsolationRegress.java @@ -14,6 +14,7 @@ package org.yb.pgsql; import java.util.Collections; +import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; @@ -22,6 +23,13 @@ @RunWith(value=YBTestRunnerNonTsanOnly.class) public class TestPgIsolationRegress extends BasePgSQLTest { + @Override + protected Map getTServerFlags() { + Map flagMap = super.getTServerFlags(); + flagMap.put("yb_enable_read_committed_isolation", "true"); + return flagMap; + } + private void runIsolationRegressTest() throws Exception { runPgRegressTest( PgRegressBuilder.PG_ISOLATION_REGRESS_DIR /* inputDir */, "yb_pg_isolation_schedule", diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressTransactionSavepoints.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressTransactionSavepoints.java index d3ed33a16bc3..9ec34efffdff 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressTransactionSavepoints.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgRegressTransactionSavepoints.java @@ -12,6 +12,7 @@ // package org.yb.pgsql; +import java.util.Collections; import java.util.Map; import org.junit.Test; @@ -43,4 +44,12 @@ protected Map getTServerFlags() { public void testPgRegressTransaction() throws Exception { runPgRegressTest("yb_transaction_savepoints_schedule"); } + + @Test + public void testPgRegressTransactionWithReadCommitted() throws Exception { + restartClusterWithFlags(Collections.emptyMap(), + Collections.singletonMap("TEST_inject_sleep_before_applying_intents_ms", + "100")); + runPgRegressTest("yb_transaction_savepoints_schedule"); + } } diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java index 5578683c583a..2287783d4fa8 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgTransparentRestarts.java @@ -21,10 +21,8 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -942,6 +940,7 @@ public List getRunnableThreads(ConnectionBuilder cb, Future executi int selectsFirstOpConflictDetected = 0; int txnsSucceeded = 0; int selectsWithAbortError = 0; + int commitOfTxnThatRequiresRestart = 0; boolean resultsAlwaysMatched = true; // We never expect SNAPSHOT ISOLATION/ READ COMMITTED transaction to result in "conflict" @@ -967,7 +966,19 @@ public List getRunnableThreads(ConnectionBuilder cb, Future executi if (Thread.interrupted()) return; // Skips all post-loop checks List rows2 = getRowList(executeQuery(stmt)); ++numCompletedOps; - selectTxnConn.commit(); + try { + selectTxnConn.commit(); + } catch (Exception ex) { + // TODO(Piyush): Once #11514 is fixed, we won't have to handle this rare + // occurrence. + if (ex.getMessage().contains( + "Illegal state: Commit of transaction that requires restart is not " + + "allowed")){ + commitOfTxnThatRequiresRestart++; + } else { + throw ex; + } + } assertTrue("Two SELECTs done within same transaction mismatch" + ", " + isolation + " transaction isolation breach!", rows1.equals(rows2) || (isolation == IsolationLevel.READ_COMMITTED)); @@ -1006,7 +1017,8 @@ public List getRunnableThreads(ConnectionBuilder cb, Future executi " selectsSecondOpRestartRequired=" + selectsSecondOpRestartRequired + " selectsFirstOpConflictDetected=" + selectsFirstOpConflictDetected + " txnsSucceeded=" + txnsSucceeded + - " selectsWithAbortError=" + selectsWithAbortError); + " selectsWithAbortError=" + selectsWithAbortError + + " commitOfTxnThatRequiresRestart=" + commitOfTxnThatRequiresRestart); if (expectReadRestartErrors) { assertTrue(selectsFirstOpRestartRequired > 0 && selectsSecondOpRestartRequired > 0); diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index f2a4200ba93a..f9e06726363a 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -795,6 +795,14 @@ GetCurrentTransactionNestLevel(void) return s->nestingLevel; } +const char* +GetCurrentTransactionName(void) +{ + TransactionState s = CurrentTransactionState; + + return s->name; +} + /* * TransactionIdIsCurrentTransactionId @@ -1905,19 +1913,22 @@ YBInitializeTransaction(void) if (YBTransactionsEnabled()) { HandleYBStatus(YBCPgBeginTransaction()); - HandleYBStatus(YBCPgSetTransactionIsolationLevel(XactIsoLevel)); + + int pg_isolation_level = XactIsoLevel; + + if (pg_isolation_level == XACT_READ_UNCOMMITTED) + pg_isolation_level = XACT_READ_COMMITTED; + + if ((pg_isolation_level == XACT_READ_COMMITTED) && !IsYBReadCommitted()) + pg_isolation_level = XACT_REPEATABLE_READ; + + HandleYBStatus(YBCPgSetTransactionIsolationLevel(pg_isolation_level)); HandleYBStatus(YBCPgEnableFollowerReads(YBReadFromFollowersEnabled(), YBFollowerReadStalenessMs())); HandleYBStatus(YBCPgSetTransactionReadOnly(XactReadOnly)); HandleYBStatus(YBCPgSetTransactionDeferrable(XactDeferrable)); } } -void -YBMaybeResetTransactionReadPoint(void) -{ - HandleYBStatus(YBCPgMaybeResetTransactionReadPoint()); -} - /* * StartTransaction */ @@ -2906,8 +2917,7 @@ StartTransactionCommand(void) * * For READ COMMITTED isolation, we want to reset the read point to current ht time so that * the query works on a newer snapshot that will include all txns committed before this - * command. There is an exception when we don't pick a new read point: in case we reach here - * as part of a read restart retry, we just have to use the restart read point. + * command. * * Read restart handling per statement * ----------------------------------- @@ -2919,19 +2929,37 @@ StartTransactionCommand(void) * records with ht after the chosen read ht and is unsure if the records were committed before * the client issued read (as per real time), a kReadRestart will be received by postgres. * - * Read restart retries are handled transparently for every statement in the txn. In case - * we reach here and see that the read point exists and was restarted recently as part of a - * retry, we don't pick a new read point using current time. + * Read restart retries are handled transparently for every statement in the txn in + * yb_attempt_to_restart_on_error(). */ - if (YBTransactionsEnabled() && IsYBReadCommitted()) { + if (YBTransactionsEnabled() && IsYBReadCommitted()) + { /* * Reset field ybDataSentForCurrQuery (indicates whether any data was sent as part of the * current query). This helps track if automatic restart of a query is possible in * READ COMMITTED isolation level. */ s->ybDataSentForCurrQuery = false; - elog(DEBUG2, "Maybe resetting read point for statement in Read Committed txn"); - YBMaybeResetTransactionReadPoint(); + HandleYBStatus(YBCPgResetTransactionReadPoint()); + elog(DEBUG2, "Resetting read point for statement in Read Committed txn"); + + /* + * Create a new internal sub txn before any execution. This aids in rolling back any changes + * before restarting the statement. + * + * We don't rely on the name of the internal sub transaction for rolling back to it in + * yb_attempt_to_restart_on_error(). We just assert that the name of the current sub txn + * matches before calling RollbackAndReleaseCurrentSubTransaction() to restart the + * statement. + * + * Instead of calling BeginInternalSubTransaction(), we have copy-pasted necessary logic + * into a new function since BeginInternalSubTransaction() again calls + * CommitTransactionCommand() and StartTransactionCommand() which will result in recursion. + * We could have solved the recursion problem by plumbing a flag to skip calling + * BeginInternalSubTransaction() again, but it is simpler and less error-prone to just copy + * the minimal required logic. + */ + BeginInternalSubTransactionForReadCommittedStatement(); } break; @@ -4475,6 +4503,40 @@ BeginInternalSubTransaction(const char *name) StartTransactionCommand(); } +/* + * BeginInternalSubTransactionForReadCommittedStatement + * This is similar to BeginInternalSubTransaction() but doesn't call CommitTransactionCommand() + * and StartTransactionCommand(). It is okay to not call those since this method is called only + * in 2 specific cases (i.e., when starting a new statement in an already existing txn in + * READ COMMITED mode, or when rolling back to the internal sub txn while restarting a + * statement) and both cases satisfy the following property - + * CurrentTransactionState->blockState is TBLOCK_INPROGRESS, TBLOCK_IMPLICIT_INPROGRESS or + * TBLOCK_SUBINPROGRESS. + */ +void +BeginInternalSubTransactionForReadCommittedStatement() { + YBFlushBufferedOperations(); + TransactionState s = CurrentTransactionState; + + Assert(s->blockState == TBLOCK_SUBINPROGRESS || + s->blockState == TBLOCK_IMPLICIT_INPROGRESS || + s->blockState == TBLOCK_INPROGRESS); + + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start subtransactions during a parallel operation"))); + + /* Normal subtransaction start */ + PushTransaction(); + s = CurrentTransactionState; /* changed by push */ + + s->name = MemoryContextStrdup(TopTransactionContext, YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME); + + StartSubTransaction(); + s->blockState = TBLOCK_SUBINPROGRESS; +} + /* * ReleaseCurrentSubTransaction * @@ -5162,6 +5224,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->ybDataSentForCurrQuery = p->ybDataSentForCurrQuery; CurrentTransactionState = s; YBUpdateActiveSubTransaction(CurrentTransactionState); diff --git a/src/postgres/src/backend/executor/nodeLockRows.c b/src/postgres/src/backend/executor/nodeLockRows.c index b0c7ffdd43f3..67e9d6aca956 100644 --- a/src/postgres/src/backend/executor/nodeLockRows.c +++ b/src/postgres/src/backend/executor/nodeLockRows.c @@ -250,14 +250,14 @@ ExecLockRows(PlanState *pstate) case HeapTupleUpdated: /* - * TODO(Piyush): Right now in YB, READ COMMITTED isolation level maps to REPEATABLE READ and - * hence we should error out always. Once we implement READ COMMITTED in YB, we will have to - * add EvalQualPlan related handling specific to YB. + * TODO(Piyush): If handling using EvalPlanQual for READ COMMITTED in future, replace true + * with IsolationUsesXactSnapshot(). */ - if (true) // Replace with IsolationUsesXactSnapshot() once we truly support READ COMMITTED. + if (true) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("could not serialize access due to concurrent update"))); + errmsg("could not serialize access due to concurrent update"), + yb_txn_errcode(YBCGetTxnConflictErrorCode()))); if (ItemPointerIndicatesMovedPartitions(&hufd.ctid)) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), diff --git a/src/postgres/src/backend/tcop/postgres.c b/src/postgres/src/backend/tcop/postgres.c index e7b7d7bad5c8..9ef8e74ab524 100644 --- a/src/postgres/src/backend/tcop/postgres.c +++ b/src/postgres/src/backend/tcop/postgres.c @@ -4048,17 +4048,24 @@ yb_is_restart_possible(const ErrorData* edata, return false; } + elog(DEBUG1, "Error details: edata->message=%s edata->filename=%s edata->lineno=%d", + edata->message, edata->filename, edata->lineno); bool is_read_restart_error = YBCIsRestartReadError(edata->yb_txn_errcode); bool is_conflict_error = YBCIsTxnConflictError(edata->yb_txn_errcode); if (!is_read_restart_error && !is_conflict_error) { if (yb_debug_log_internal_restarts) - elog(LOG, "Restart isn't possible, code %d isn't a restart/conflict error", + elog(LOG, "Restart isn't possible, code %d isn't a read restart/conflict error", edata->yb_txn_errcode); return false; } - if (is_conflict_error && attempt >= YBCGetMaxWriteRestartAttempts()) + /* + * In case of READ COMMITTED, retries for kConflict are performed indefinitely until statement + * timeout is hit. + */ + if (!IsYBReadCommitted() && + (is_conflict_error && attempt >= YBCGetMaxWriteRestartAttempts())) { if (yb_debug_log_internal_restarts) elog(LOG, "Restart isn't possible, we're out of write restart attempts (%d)", @@ -4084,9 +4091,8 @@ yb_is_restart_possible(const ErrorData* edata, // We can perform kReadRestart retries in READ COMMITTED isolation level even if data has been // sent as part of the txn, but not as part of the current query. This is because we just have to // retry the query and not the whole transaction. - if ((XactIsoLevel != XACT_READ_COMMITTED && YBIsDataSent()) || - (XactIsoLevel == XACT_READ_COMMITTED && is_conflict_error && YBIsDataSent()) || - (XactIsoLevel == XACT_READ_COMMITTED && is_read_restart_error && YBIsDataSentForCurrQuery())) + if ((!IsYBReadCommitted() && YBIsDataSent()) || + (IsYBReadCommitted() && YBIsDataSentForCurrQuery())) { if (yb_debug_log_internal_restarts) elog(LOG, "Restart isn't possible, data was already sent. Txn error code=%d", @@ -4109,8 +4115,7 @@ yb_is_restart_possible(const ErrorData* edata, return false; } - // TODO(Piyush): Restart even in sub-transactions if in READ COMMITTED isolation. - if (IsSubTransaction()) { + if (IsSubTransaction() && !IsYBReadCommitted()) { if (yb_debug_log_internal_restarts) elog(LOG, "Restart isn't possible, savepoints have been used"); return false; @@ -4376,45 +4381,122 @@ yb_attempt_to_restart_on_error(int attempt, attempt))); } /* - * Cleanup the error, restart portal, restart txn and let the control - * flow continue + * Cleanup the error and restart portal. */ FlushErrorState(); - if (restart_data->portal_name) { + if (restart_data->portal_name) + { + elog(DEBUG1, "Restarting portal %s for retry", restart_data->portal_name); yb_restart_portal(restart_data->portal_name); } YBRestoreOutputBufferPosition(); - /* - * The txn might or might not have performed writes. Reset the state in - * either case to avoid checking/tracking if a write could have been - * performed. - */ - YBCRestartWriteTransaction(); - - if (YBCIsRestartReadError(edata->yb_txn_errcode)) - { - YBCRestartTransaction(false /* force_restart */); - } - else if (YBCIsTxnConflictError(edata->yb_txn_errcode)) + if (IsYBReadCommitted() && IsInTransactionBlock(true /* isTopLevel */)) { /* - * Recreate the YB state for the transaction. This call preserves the - * priority of the current YB transaction so that when we retry, we re-use - * the same priority. + * In this case the txn is not restarted, just the statement is restarted after rolling back + * to the internal savepoint registered at start of the statement. + */ + elog(DEBUG1, "Rolling back statement"); + + /* + * Presence of triggers pushes additional snapshots. Pop all of them. + */ + PopAllActiveSnapshots(); + + // TODO(Piyush): Perform pg_session_->InvalidateForeignKeyReferenceCache() and create tests + // that would fail without this. + + /* + * Rollback to the savepoint that was started in StartTransactionCommand() for READ COMMITTED + * isolation. */ - YBCRecreateTransaction(); - pg_usleep(yb_get_sleep_usecs_on_txn_conflict(attempt)); + + if (restart_data->portal_name) + { + Portal portal = GetPortalByName(restart_data->portal_name); + Assert(portal); + + /* + * Set the createSubid to the next internal sub txn that we are going to create after + * RollbackAndReleaseCurrentSubTransaction(). This is ensure + * RollbackAndReleaseCurrentSubTransaction() doesn't clean up the portal we had just + * restarted using yb_restart_portal(). + */ + portal->createSubid = GetCurrentSubTransactionId() + 1; + portal->activeSubid = portal->createSubid; + ResourceOwnerNewParent(portal->resowner, NULL); + } + + Assert(strcmp(GetCurrentTransactionName(), YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME) == 0); + RollbackAndReleaseCurrentSubTransaction(); + BeginInternalSubTransactionForReadCommittedStatement(); + + if (restart_data->portal_name) + { + Portal portal = GetPortalByName(restart_data->portal_name); + Assert(portal); + ResourceOwnerNewParent(portal->resowner, CurTransactionResourceOwner); + } + + if (YBCIsRestartReadError(edata->yb_txn_errcode)) + { + HandleYBStatus(YBCPgRestartReadPoint()); + } + else if (YBCIsTxnConflictError(edata->yb_txn_errcode)) + { + HandleYBStatus(YBCPgResetTransactionReadPoint()); + pg_usleep(yb_get_sleep_usecs_on_txn_conflict(attempt)); + } + else + { + /* + * We shouldn't really be able to reach here. If yb_is_restart_possible() + * was true, the error should have been either of kReadRestart/kConflict + */ + MemoryContextSwitchTo(error_context); + PG_RE_THROW(); + } } else { /* - * We shouldn't really be able to reach here. If yb_is_restart_possible() - * was true, the error should have been either of kReadRestart/kConflict + * In this case the txn is restarted, which can be done since we haven't executed even the + * first statement fully and no data has been sent to the client. + */ + elog(DEBUG1, "Restarting txn"); + + /* + * The txn might or might not have performed writes. Reset the state in + * either case to avoid checking/tracking if a write could have been + * performed. */ - MemoryContextSwitchTo(error_context); - PG_RE_THROW(); + YBCRestartWriteTransaction(); + + if (YBCIsRestartReadError(edata->yb_txn_errcode)) + { + YBCRestartTransaction(false /* force_restart */); + } + else if (YBCIsTxnConflictError(edata->yb_txn_errcode)) + { + /* + * Recreate the YB state for the transaction. This call preserves the + * priority of the current YB transaction so that when we retry, we re-use + * the same priority. + */ + YBCRecreateTransaction(); + pg_usleep(yb_get_sleep_usecs_on_txn_conflict(attempt)); + } + else + { + /* + * We shouldn't really be able to reach here. If yb_is_restart_possible() + * was true, the error should have been either of kReadRestart/kConflict + */ + MemoryContextSwitchTo(error_context); + PG_RE_THROW(); + } } } else { /* if we shouldn't restart - propagate the error */ @@ -4438,6 +4520,7 @@ yb_exec_query_wrapper(MemoryContext exec_context, bool retry = true; for (int attempt = 0; retry; ++attempt) { + elog(DEBUG2, "yb_exec_query_wrapper attempt %d for %s", attempt, restart_data->query_string); YBSaveOutputBufferPosition( !yb_is_begin_transaction(restart_data->command_tag)); PG_TRY(); diff --git a/src/postgres/src/include/access/xact.h b/src/postgres/src/include/access/xact.h index c7f2e77bdf41..a8b46f96a2e8 100644 --- a/src/postgres/src/include/access/xact.h +++ b/src/postgres/src/include/access/xact.h @@ -39,6 +39,8 @@ #define XACT_REPEATABLE_READ 2 #define XACT_SERIALIZABLE 3 +#define YB_READ_COMMITTED_INTERNAL_SUB_TXN_NAME "yb_internal_txn_for_read_committed" + extern int DefaultXactIsoLevel; extern PGDLLIMPORT int XactIsoLevel; @@ -372,11 +374,13 @@ extern TimestampTz GetCurrentStatementStartTimestamp(void); extern TimestampTz GetCurrentTransactionStopTimestamp(void); extern void SetCurrentStatementStartTimestamp(void); extern int GetCurrentTransactionNestLevel(void); +extern const char* GetCurrentTransactionName(void); extern bool TransactionIdIsCurrentTransactionId(TransactionId xid); extern void CommandCounterIncrement(void); extern void ForceSyncCommit(void); extern void YBInitializeTransaction(void); -extern void YBMaybeResetTransactionReadPoint(void); +extern void YBResetTransactionReadPoint(void); +extern void YBRestartReadPoint(void); extern void StartTransactionCommand(void); extern void YBCRestartWriteTransaction(void); extern void SetTxnWithPGRel(void); @@ -393,6 +397,7 @@ extern void ReleaseSavepoint(const char *name); extern void DefineSavepoint(const char *name); extern void RollbackToSavepoint(const char *name); extern void BeginInternalSubTransaction(const char *name); +extern void BeginInternalSubTransactionForReadCommittedStatement(); extern void ReleaseCurrentSubTransaction(void); extern void RollbackAndReleaseCurrentSubTransaction(void); extern bool IsSubTransaction(void); diff --git a/src/postgres/src/test/isolation/expected/yb-skip-locked-after-update.out b/src/postgres/src/test/isolation/expected/yb-skip-locked-after-update.out index 0d08b9a66ebe..4bc0c37b5ad7 100644 --- a/src/postgres/src/test/isolation/expected/yb-skip-locked-after-update.out +++ b/src/postgres/src/test/isolation/expected/yb-skip-locked-after-update.out @@ -51,7 +51,7 @@ id data status 2 bar NEW step s1b: UPDATE queue set status='OLD' WHERE id=1; step s1c: COMMIT; -step s2_sleep: SELECT pg_sleep(5); +step s2_sleep: SELECT pg_sleep(1); pg_sleep diff --git a/src/postgres/src/test/isolation/expected/yb_pg_eval-plan-qual.out b/src/postgres/src/test/isolation/expected/yb_pg_eval-plan-qual.out new file mode 100644 index 000000000000..86246b7464cc --- /dev/null +++ b/src/postgres/src/test/isolation/expected/yb_pg_eval-plan-qual.out @@ -0,0 +1,177 @@ +Parsed test spec with 3 sessions + +starting permutation: wx1 wx2 c1 c2 read +step wx1: UPDATE accounts SET balance = balance - 200 WHERE accountid = 'checking'; +step wx2: UPDATE accounts SET balance = balance + 450 WHERE accountid = 'checking'; +step c1: COMMIT; +step wx2: <... completed> +step c2: COMMIT; +step read: SELECT * FROM accounts ORDER BY accountid; +accountid balance + +checking 850 +savings 600 + +starting permutation: wy1 wy2 c1 c2 read +step wy1: UPDATE accounts SET balance = balance + 500 WHERE accountid = 'checking'; +step wy2: UPDATE accounts SET balance = balance + 1000 WHERE accountid = 'checking' AND balance < 1000; +step c1: COMMIT; +step wy2: <... completed> +step c2: COMMIT; +step read: SELECT * FROM accounts ORDER BY accountid; +accountid balance + +checking 1100 +savings 600 + +starting permutation: upsert1 upsert2 c1 c2 read +step upsert1: + WITH upsert AS + (UPDATE accounts SET balance = balance + 500 + WHERE accountid = 'savings' + RETURNING accountid) + INSERT INTO accounts SELECT 'savings', 500 + WHERE NOT EXISTS (SELECT 1 FROM upsert); + +step upsert2: + WITH upsert AS + (UPDATE accounts SET balance = balance + 1234 + WHERE accountid = 'savings' + RETURNING accountid) + INSERT INTO accounts SELECT 'savings', 1234 + WHERE NOT EXISTS (SELECT 1 FROM upsert); + +step c1: COMMIT; +step upsert2: <... completed> +step c2: COMMIT; +step read: SELECT * FROM accounts ORDER BY accountid; +accountid balance + +checking 600 +savings 2334 + +starting permutation: wx2 partiallock c2 c1 read +step wx2: UPDATE accounts SET balance = balance + 450 WHERE accountid = 'checking'; +step partiallock: + SELECT * FROM accounts a1, accounts a2 + WHERE a1.accountid = a2.accountid + FOR UPDATE OF a1; + +step c2: COMMIT; +step partiallock: <... completed> +accountid balance accountid balance + +savings 600 savings 600 +checking 1050 checking 1050 +step c1: COMMIT; +step read: SELECT * FROM accounts ORDER BY accountid; +accountid balance + +checking 1050 +savings 600 + +starting permutation: wx2 lockwithvalues c2 c1 read +step wx2: UPDATE accounts SET balance = balance + 450 WHERE accountid = 'checking'; +step lockwithvalues: + SELECT * FROM accounts a1, (values('checking'),('savings')) v(id) + WHERE a1.accountid = v.id + FOR UPDATE OF a1; + +step c2: COMMIT; +step lockwithvalues: <... completed> +accountid balance id + +checking 1050 checking +savings 600 savings +step c1: COMMIT; +step read: SELECT * FROM accounts ORDER BY accountid; +accountid balance + +checking 1050 +savings 600 + +starting permutation: updateforss readforss c1 c2 +step updateforss: + UPDATE table_a SET value = 'newTableAValue' WHERE id = 1; + UPDATE table_b SET value = 'newTableBValue' WHERE id = 1; + +step readforss: + SELECT ta.id AS ta_id, ta.value AS ta_value, + (SELECT ROW(tb.id, tb.value) + FROM table_b tb WHERE ta.id = tb.id) AS tb_row + FROM table_a ta + WHERE ta.id = 1 FOR UPDATE OF ta; + +step c1: COMMIT; +step readforss: <... completed> +ta_id ta_value tb_row + +1 newTableAValue (1,newTableBValue) +step c2: COMMIT; + +starting permutation: updateforcip updateforcip2 c1 c2 read_a +step updateforcip: + UPDATE table_a SET value = NULL WHERE id = 1; + +step updateforcip2: + UPDATE table_a SET value = COALESCE(value, (SELECT text 'newValue')) WHERE id = 1; + +step c1: COMMIT; +step updateforcip2: <... completed> +step c2: COMMIT; +step read_a: SELECT * FROM table_a ORDER BY id; +id value + +1 newValue + +starting permutation: updateforcip updateforcip3 c1 c2 read_a +step updateforcip: + UPDATE table_a SET value = NULL WHERE id = 1; + +step updateforcip3: + WITH d(val) AS (SELECT text 'newValue' FROM generate_series(1,1)) + UPDATE table_a SET value = COALESCE(value, (SELECT val FROM d)) WHERE id = 1; + +step c1: COMMIT; +step updateforcip3: <... completed> +step c2: COMMIT; +step read_a: SELECT * FROM table_a ORDER BY id; +id value + +1 newValue + +starting permutation: wrtwcte readwcte c1 c2 +step wrtwcte: UPDATE table_a SET value = 'tableAValue2' WHERE id = 1; +step readwcte: + WITH + cte1 AS ( + SELECT id FROM table_b WHERE value = 'tableBValue' + ), + cte2 AS ( + SELECT * FROM table_a + WHERE id = (SELECT id FROM cte1) + FOR UPDATE + ) + SELECT * FROM cte2; + +step c1: COMMIT; +step c2: COMMIT; +step readwcte: <... completed> +id value + +1 tableAValue2 + +starting permutation: wrtwcte multireadwcte c1 c2 +step wrtwcte: UPDATE table_a SET value = 'tableAValue2' WHERE id = 1; +step multireadwcte: + WITH updated AS ( + UPDATE table_a SET value = 'tableAValue3' WHERE id = 1 RETURNING id + ) + SELECT (SELECT id FROM updated) AS subid, * FROM updated; + +step c1: COMMIT; +step c2: COMMIT; +step multireadwcte: <... completed> +subid id + +1 1 diff --git a/src/postgres/src/test/isolation/expected/yb_read_committed_insert.out b/src/postgres/src/test/isolation/expected/yb_read_committed_insert.out new file mode 100644 index 000000000000..92af137c4fe8 --- /dev/null +++ b/src/postgres/src/test/isolation/expected/yb_read_committed_insert.out @@ -0,0 +1,48 @@ +Parsed test spec with 2 sessions + +starting permutation: update_k1_to_2 insert_k1 c2 select c1 +step update_k1_to_2: update test set k=2 where k=1; +step insert_k1: insert into test values (1, 1); +step c2: commit; +step insert_k1: <... completed> +step select: select * from test; +k v + +1 1 +2 1 +step c1: commit; + +starting permutation: update_k1_to_2 insert_k2 c2 r1 select +step update_k1_to_2: update test set k=2 where k=1; +step insert_k2: insert into test values (2, 1); +step c2: commit; +step insert_k2: <... completed> +error in steps c2 insert_k2: ERROR: duplicate key value violates unique constraint "test_pkey" +step r1: rollback; +step select: select * from test; +k v + +2 1 + +starting permutation: update_k1_to_2 insert_k1_on_conflict c2 select c1 +step update_k1_to_2: update test set k=2 where k=1; +step insert_k1_on_conflict: insert into test values (1, 1) on conflict (k) do update set v=100; +step c2: commit; +step insert_k1_on_conflict: <... completed> +step select: select * from test; +k v + +1 1 +2 1 +step c1: commit; + +starting permutation: update_k1_to_2 insert_k2_on_conflict c2 select c1 +step update_k1_to_2: update test set k=2 where k=1; +step insert_k2_on_conflict: insert into test values (2, 1) on conflict (k) do update set v=100; +step c2: commit; +step insert_k2_on_conflict: <... completed> +step select: select * from test; +k v + +2 100 +step c1: commit; diff --git a/src/postgres/src/test/isolation/expected/yb_read_committed_test_internal_savepoint.out b/src/postgres/src/test/isolation/expected/yb_read_committed_test_internal_savepoint.out new file mode 100644 index 000000000000..16750b32ab64 --- /dev/null +++ b/src/postgres/src/test/isolation/expected/yb_read_committed_test_internal_savepoint.out @@ -0,0 +1,14 @@ +Parsed test spec with 2 sessions + +starting permutation: update_k2_in_s2 update_k1_in_s1 update_k2_in_s1 c2 select c1 +step update_k2_in_s2: update test set v=40 where k=2; +step update_k1_in_s1: update test set v=10 where k=1; +step update_k2_in_s1: update test set v=20 where k=2; +step c2: commit; +step update_k2_in_s1: <... completed> +step select: select * from test; +k v + +1 10 +2 20 +step c1: commit; diff --git a/src/postgres/src/test/isolation/expected/yb_read_committed_update_and_explicit_locking.out b/src/postgres/src/test/isolation/expected/yb_read_committed_update_and_explicit_locking.out new file mode 100644 index 000000000000..7a1739dc1a90 --- /dev/null +++ b/src/postgres/src/test/isolation/expected/yb_read_committed_update_and_explicit_locking.out @@ -0,0 +1,21 @@ +Parsed test spec with 2 sessions + +starting permutation: insert_new_satisfying_row_k_5 delete_satisfying_row update_true_to_false update_true_to_true update_false_to_true pk_update update c2 select c1 +step insert_new_satisfying_row_k_5: insert into test values (5, 5); +step delete_satisfying_row: delete from test where k=3; +step update_true_to_false: update test set v=1 where k=1; +step update_true_to_true: update test set v=10 where k=2; +step update_false_to_true: update test set v=10 where k=4; +step pk_update: update test set k=10 where k=0; +step update: update test set v=100 where v>=5; +step c2: commit; +step update: <... completed> +step select: select * from test; +k v + +5 100 +1 1 +10 100 +4 100 +2 100 +step c1: commit; diff --git a/src/postgres/src/test/isolation/isolationtester.c b/src/postgres/src/test/isolation/isolationtester.c index d32dbb7d68c3..86697295bcbc 100644 --- a/src/postgres/src/test/isolation/isolationtester.c +++ b/src/postgres/src/test/isolation/isolationtester.c @@ -765,6 +765,22 @@ try_complete_step(Step *step, int flags) td *= USECS_PER_SEC; td += (int64) current_time.tv_usec - (int64) start_time.tv_usec; + /* Yugabyte specific logic: + * Since we don't use pg_locks, we can't determine if a session is blocked on another + * session using the PREP_WAITING function above. So, we instead assume that being blocked + * for >= 2 second means the session is waiting on another session. + * + * This is not a perfect check but good enough for now. + * + * TODO(Piyush): Replace this by a deterministic check when pessimistic locking is + * implemented and wait queue information is exposed via Pg. + */ + if (td > 2 * USECS_PER_SEC && !canceled) { + if (!(flags & STEP_RETRY)) + printf("step %s: %s \n", step->name, step->sql); + return true; + } + /* * After 180 seconds, try to cancel the query. * diff --git a/src/postgres/src/test/isolation/specs/yb-modification-followed-by-lock.spec b/src/postgres/src/test/isolation/specs/yb-modification-followed-by-lock.spec index 1f3cbbe0f2d3..c57df3f12ee5 100644 --- a/src/postgres/src/test/isolation/specs/yb-modification-followed-by-lock.spec +++ b/src/postgres/src/test/isolation/specs/yb-modification-followed-by-lock.spec @@ -14,14 +14,14 @@ teardown } session "s1" -setup { BEGIN; } +setup { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; } step "s1a" { SELECT * FROM queue; -- this is just to ensure we have picked the read point} step "s1b" { SELECT * FROM queue ORDER BY id FOR UPDATE LIMIT 1; } step "s1c" { COMMIT; } session "s2" -setup { BEGIN; } +setup { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; } step "s2a" { SELECT * FROM queue; -- this is just to ensure we have picked the read point} step "s2b" { UPDATE queue set status='OLD' WHERE id=1; } step "s2c" { COMMIT; } diff --git a/src/postgres/src/test/isolation/specs/yb-skip-locked-after-update.spec b/src/postgres/src/test/isolation/specs/yb-skip-locked-after-update.spec index 39127ed7c06a..bb7dafdf701e 100644 --- a/src/postgres/src/test/isolation/specs/yb-skip-locked-after-update.spec +++ b/src/postgres/src/test/isolation/specs/yb-skip-locked-after-update.spec @@ -21,7 +21,7 @@ teardown # 3. committed and applied. session "s1" -setup { BEGIN; } +setup { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; } step "s1a" { SELECT * FROM queue; -- this is just to ensure we have picked the read point} # UPDATE takes a kStrongRead + kStrongWrite intent on the sub doc key made of # (hash code, pk, status col). Also the value portion will have 'OLD'. @@ -30,14 +30,14 @@ step "s1c" { COMMIT; } session "s2" -setup { BEGIN; } +setup { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; } step "s2a" { SELECT * FROM queue; -- this is just to ensure we have picked the read point} # FOR UPDATE attempts to take a kStrongRead + kStrongWrite on the sub doc key made of # (hash code, pk, status col) with the value portion empty. But it will skip locking the first row # with id=1 due to the above UPDATE (if that executes earlier). step "s2b" { SELECT * FROM queue ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1; } step "s2c" { COMMIT; } -step "s2_sleep" { SELECT pg_sleep(5); } +step "s2_sleep" { SELECT pg_sleep(1); } # (1) case of pending update: # Ensure that the SELECT skips locking the row that has been implicitly locked in a conflicting diff --git a/src/postgres/src/test/isolation/specs/yb_pg_eval-plan-qual.spec b/src/postgres/src/test/isolation/specs/yb_pg_eval-plan-qual.spec new file mode 100644 index 000000000000..2d518711d648 --- /dev/null +++ b/src/postgres/src/test/isolation/specs/yb_pg_eval-plan-qual.spec @@ -0,0 +1,207 @@ +# Tests for the EvalPlanQual mechanism +# +# EvalPlanQual is used in READ COMMITTED isolation level to attempt to +# re-execute UPDATE and DELETE operations against rows that were updated +# by some concurrent transaction. + +setup +{ + CREATE TABLE accounts (accountid text PRIMARY KEY, balance numeric not null); + INSERT INTO accounts VALUES ('checking', 600), ('savings', 600); + + /* + * TODO: Currently alter on YB results in test failure because an online schema change + * marks the other sessions's txns as expired. To fix it, we will have to make changes to + * isolationtester.c and start other sessions only after setup is done. + * + * CREATE TABLE accounts_ext (accountid text PRIMARY KEY, balance numeric not null, other text); + * INSERT INTO accounts_ext VALUES ('checking', 600, 'other'), ('savings', 700, null); + * ALTER TABLE accounts_ext ADD COLUMN newcol int DEFAULT 42; + * ALTER TABLE accounts_ext ADD COLUMN newcol2 text DEFAULT NULL; + */ + + /* + * Commenting out tests with table inheritance since we don't support it yet. + * CREATE TABLE p (a int, b int, c int); + * CREATE TABLE c1 () INHERITS (p); + * CREATE TABLE c2 () INHERITS (p); + * CREATE TABLE c3 () INHERITS (p); + * INSERT INTO c1 SELECT 0, a / 3, a % 3 FROM generate_series(0, 9) a; + * INSERT INTO c2 SELECT 1, a / 3, a % 3 FROM generate_series(0, 9) a; + * INSERT INTO c3 SELECT 2, a / 3, a % 3 FROM generate_series(0, 9) a; + */ + + CREATE TABLE table_a (id integer, value text); + CREATE TABLE table_b (id integer, value text); + INSERT INTO table_a VALUES (1, 'tableAValue'); + INSERT INTO table_b VALUES (1, 'tableBValue'); + + CREATE TABLE jointest AS SELECT generate_series(1,10) AS id, 0 AS data; + CREATE INDEX ON jointest(id); +} + +teardown +{ + DROP TABLE accounts; + DROP TABLE table_a, table_b, jointest; + + /* + * DROP TABLE accounts_ext; + * DROP TABLE p CASCADE; + */ +} + +session "s1" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +# wx1 then wx2 checks the basic case of re-fetching up-to-date values +step "wx1" { UPDATE accounts SET balance = balance - 200 WHERE accountid = 'checking'; } +# wy1 then wy2 checks the case where quals pass then fail +step "wy1" { UPDATE accounts SET balance = balance + 500 WHERE accountid = 'checking'; } +# upsert tests are to check writable-CTE cases +step "upsert1" { + WITH upsert AS + (UPDATE accounts SET balance = balance + 500 + WHERE accountid = 'savings' + RETURNING accountid) + INSERT INTO accounts SELECT 'savings', 500 + WHERE NOT EXISTS (SELECT 1 FROM upsert); +} + +# tests with table p check inheritance cases: +# readp1/writep1/readp2 tests a bug where nodeLockRows did the wrong thing +# when the first updated tuple was in a non-first child table. +# writep2/returningp1 tests a memory allocation issue + +step "readp1" { SELECT tableoid::regclass, ctid, * FROM p WHERE b IN (0, 1) AND c = 0 FOR UPDATE; } +step "writep1" { UPDATE p SET b = -1 WHERE a = 1 AND b = 1 AND c = 0; } +step "writep2" { UPDATE p SET b = -b WHERE a = 1 AND c = 0; } +step "c1" { COMMIT; } + +# these tests are meant to exercise EvalPlanQualFetchRowMarks, +# ie, handling non-locked tables in an EvalPlanQual recheck + +step "partiallock" { + SELECT * FROM accounts a1, accounts a2 + WHERE a1.accountid = a2.accountid + FOR UPDATE OF a1; +} +step "lockwithvalues" { + SELECT * FROM accounts a1, (values('checking'),('savings')) v(id) + WHERE a1.accountid = v.id + FOR UPDATE OF a1; +} +step "partiallock_ext" { + SELECT * FROM accounts_ext a1, accounts_ext a2 + WHERE a1.accountid = a2.accountid + FOR UPDATE OF a1; +} + +# these tests exercise EvalPlanQual with a SubLink sub-select (which should be +# unaffected by any EPQ recheck behavior in the outer query); cf bug #14034 + +step "updateforss" { + UPDATE table_a SET value = 'newTableAValue' WHERE id = 1; + UPDATE table_b SET value = 'newTableBValue' WHERE id = 1; +} + +# these tests exercise EvalPlanQual with conditional InitPlans which +# have not been executed prior to the EPQ + +step "updateforcip" { + UPDATE table_a SET value = NULL WHERE id = 1; +} + +# these tests exercise mark/restore during EPQ recheck, cf bug #15032 + +step "selectjoinforupdate" { + set enable_nestloop to 0; + set enable_hashjoin to 0; + set enable_seqscan to 0; + explain (costs off) + select * from jointest a join jointest b on a.id=b.id for update; + select * from jointest a join jointest b on a.id=b.id for update; +} + + +session "s2" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "wx2" { UPDATE accounts SET balance = balance + 450 WHERE accountid = 'checking'; } +step "wy2" { UPDATE accounts SET balance = balance + 1000 WHERE accountid = 'checking' AND balance < 1000; } +step "upsert2" { + WITH upsert AS + (UPDATE accounts SET balance = balance + 1234 + WHERE accountid = 'savings' + RETURNING accountid) + INSERT INTO accounts SELECT 'savings', 1234 + WHERE NOT EXISTS (SELECT 1 FROM upsert); +} +step "wx2_ext" { UPDATE accounts_ext SET balance = balance + 450; } +step "readp2" { SELECT tableoid::regclass, ctid, * FROM p WHERE b IN (0, 1) AND c = 0 FOR UPDATE; } +step "returningp1" { + WITH u AS ( UPDATE p SET b = b WHERE a > 0 RETURNING * ) + SELECT * FROM u; +} +step "readforss" { + SELECT ta.id AS ta_id, ta.value AS ta_value, + (SELECT ROW(tb.id, tb.value) + FROM table_b tb WHERE ta.id = tb.id) AS tb_row + FROM table_a ta + WHERE ta.id = 1 FOR UPDATE OF ta; +} +step "updateforcip2" { + UPDATE table_a SET value = COALESCE(value, (SELECT text 'newValue')) WHERE id = 1; +} +step "updateforcip3" { + WITH d(val) AS (SELECT text 'newValue' FROM generate_series(1,1)) + UPDATE table_a SET value = COALESCE(value, (SELECT val FROM d)) WHERE id = 1; +} +step "wrtwcte" { UPDATE table_a SET value = 'tableAValue2' WHERE id = 1; } +step "wrjt" { UPDATE jointest SET data = 42 WHERE id = 7; } +step "c2" { COMMIT; } + +session "s3" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "read" { SELECT * FROM accounts ORDER BY accountid; } +step "read_ext" { SELECT * FROM accounts_ext ORDER BY accountid; } +step "read_a" { SELECT * FROM table_a ORDER BY id; } + +# this test exercises EvalPlanQual with a CTE, cf bug #14328 +step "readwcte" { + WITH + cte1 AS ( + SELECT id FROM table_b WHERE value = 'tableBValue' + ), + cte2 AS ( + SELECT * FROM table_a + WHERE id = (SELECT id FROM cte1) + FOR UPDATE + ) + SELECT * FROM cte2; +} + +# this test exercises a different CTE misbehavior, cf bug #14870 +step "multireadwcte" { + WITH updated AS ( + UPDATE table_a SET value = 'tableAValue3' WHERE id = 1 RETURNING id + ) + SELECT (SELECT id FROM updated) AS subid, * FROM updated; +} + +teardown { COMMIT; } + +permutation "wx1" "wx2" "c1" "c2" "read" +permutation "wy1" "wy2" "c1" "c2" "read" +permutation "upsert1" "upsert2" "c1" "c2" "read" +# permutation "readp1" "writep1" "readp2" "c1" "c2" +# permutation "writep2" "returningp1" "c1" "c2" +permutation "wx2" "partiallock" "c2" "c1" "read" +permutation "wx2" "lockwithvalues" "c2" "c1" "read" +# permutation "wx2_ext" "partiallock_ext" "c2" "c1" "read_ext" +permutation "updateforss" "readforss" "c1" "c2" +permutation "updateforcip" "updateforcip2" "c1" "c2" "read_a" +permutation "updateforcip" "updateforcip3" "c1" "c2" "read_a" +permutation "wrtwcte" "readwcte" "c1" "c2" +# TODO(Piyush): The below is commented out because explain gives a different plan on YB for unknown +# reason. Once that is fixed, the below test can be enabled. +# permutation "wrjt" "selectjoinforupdate" "c2" "c1" +permutation "wrtwcte" "multireadwcte" "c1" "c2" diff --git a/src/postgres/src/test/isolation/specs/yb_read_committed_insert.spec b/src/postgres/src/test/isolation/specs/yb_read_committed_insert.spec new file mode 100644 index 000000000000..a4df02564488 --- /dev/null +++ b/src/postgres/src/test/isolation/specs/yb_read_committed_insert.spec @@ -0,0 +1,32 @@ +# Tests for the Read Committed isolation + +setup +{ + create table test (k int primary key, v int); + insert into test values (1, 1); +} + +teardown +{ + DROP TABLE test; +} + +session "s1" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "insert_k1" { insert into test values (1, 1); } +step "insert_k2" { insert into test values (2, 1); } +step "insert_k1_on_conflict" { insert into test values (1, 1) on conflict (k) do update set v=100; } +step "insert_k2_on_conflict" { insert into test values (2, 1) on conflict (k) do update set v=100; } +step "select" { select * from test; } +step "r1" { rollback; } +step "c1" { commit; } + +session "s2" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "update_k1_to_2" { update test set k=2 where k=1; } +step "c2" { commit; } + +permutation "update_k1_to_2" "insert_k1" "c2" "select" "c1" +permutation "update_k1_to_2" "insert_k2" "c2" "r1" "select" +permutation "update_k1_to_2" "insert_k1_on_conflict" "c2" "select" "c1" +permutation "update_k1_to_2" "insert_k2_on_conflict" "c2" "select" "c1" \ No newline at end of file diff --git a/src/postgres/src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec b/src/postgres/src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec new file mode 100644 index 000000000000..3c48c1d8b56e --- /dev/null +++ b/src/postgres/src/test/isolation/specs/yb_read_committed_test_internal_savepoint.spec @@ -0,0 +1,40 @@ +# Tests for the Read Committed isolation + +setup +{ + create table test (k int primary key, v int); + insert into test values (1, 1), (2, 2); +} + +teardown +{ + DROP TABLE test; +} + +session "s1" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "update_k1_in_s1" { update test set v=10 where k=1; } +step "update_k2_in_s1" { update test set v=20 where k=2; } +step "select" { select * from test; } +step "c1" { commit; } + +session "s2" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "update_k2_in_s2" { update test set v=40 where k=2; } +step "c2" { commit; } + + + +# When session 1 faces a conflict while writing k=2, it will wait for session 2 to complete and then +# retry the write for k=2. Once the write for k=2 is done and session 1 commits, we should be able +# to see the update to k=1 as well. It should not be lost due to any reason. + +# Motivation for below test: +# -------------------------- +# There was an existing bug with the automatic per-statement retries for kReadRestart errors in +# READ COMMITTED isolation. A retry would restart the whole txn and then re-run the statement. But +# that results in removal of previous writes. Instead of restarting the whole txn, we should have +# set an internal savepoint before running the statement and rolled back to it. To ensure we are +# not restarting the txn, we are testing with kConflict errors instead of kReadRestart. + +permutation "update_k2_in_s2" "update_k1_in_s1" "update_k2_in_s1" "c2" "select" "c1" diff --git a/src/postgres/src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec b/src/postgres/src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec new file mode 100644 index 000000000000..1a6e96383e5f --- /dev/null +++ b/src/postgres/src/test/isolation/specs/yb_read_committed_update_and_explicit_locking.spec @@ -0,0 +1,48 @@ +# Tests for the Read Committed isolation + +setup +{ + create table test (k int primary key, v int); + insert into test values (0, 5), (1, 5), (2, 5), (3, 5), (4, 1); +} + +teardown +{ + DROP TABLE test; +} + +# The test checks semantics of UPDATE, SELECT FOR UPDATE, SELECT FOR NO KEY UPDATE, +# SELECT FOR KEY SHARE, and SELECT FOR SHARE with the following predicate - where v>=5; +# One of these statements will be run from session 1. + +# We check the semantics of the statement in session 1 against 6 cases - +# 1. Concurrent insertion of a new row that satisfies the predicate. +# 2. Concurrent deletion of an existing row that satisfies the predicate. +# 3. Concurrent update that changes the predicate from - +# i) true to false +# ii) true to true +# iii) false to true +# 4. Concurrent pk update of row that satifies predicate + +session "s1" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "update" { update test set v=100 where v>=5; } +step "select_for_update" { select * from test where v>=5 for update; } +step "select_for_no_key_update" { select * from test where v>=5 for no key update; } +step "select_for_key_share" { select * from test where v>=5 for key share; } +step "select_for_share" { select * from test where v>=5 for share; } +step "select" { select * from test; } +step "c1" { commit; } + +session "s2" +setup { BEGIN ISOLATION LEVEL READ COMMITTED; } +step "insert_new_satisfying_row_k_5" { insert into test values (5, 5); } +step "delete_satisfying_row" { delete from test where k=3; } +step "update_true_to_false" { update test set v=1 where k=1; } +step "update_true_to_true" { update test set v=10 where k=2; } +step "update_false_to_true" { update test set v=10 where k=4; } +step "pk_update" { update test set k=10 where k=0; } +step "c2" { commit; } + +permutation "insert_new_satisfying_row_k_5" "delete_satisfying_row" "update_true_to_false" + "update_true_to_true" "update_false_to_true" "pk_update" "update" "c2" "select" "c1" \ No newline at end of file diff --git a/src/postgres/src/test/isolation/yb_pg_isolation_schedule b/src/postgres/src/test/isolation/yb_pg_isolation_schedule index 02c73a647c1a..640746927830 100644 --- a/src/postgres/src/test/isolation/yb_pg_isolation_schedule +++ b/src/postgres/src/test/isolation/yb_pg_isolation_schedule @@ -1,4 +1,18 @@ +test: yb_pg_eval-plan-qual +test: yb_read_committed_update_and_explicit_locking +test: yb_read_committed_insert +test: yb_read_committed_test_internal_savepoint + +# Skip locked related tests from Pg test: yb_pg_skip-locked test: yb_pg_skip-locked-2 + +# Skip locked related tests newly added at YB test: yb-modification-followed-by-lock test: yb-skip-locked-after-update + +# TODO: +# 1. Test to ensure that new sub txns inherit ybDataSentForCurrQuery and parent txns +# inherit it back from nested sub txns once they are removed. +# 2. Test to ensure clean-up of partial intents using savepoints. +# 3. Test all skip locked specs with REPEATABLE READ isolation as well. diff --git a/src/yb/client/transaction.cc b/src/yb/client/transaction.cc index 4c4d05126f51..4e55eabc2808 100644 --- a/src/yb/client/transaction.cc +++ b/src/yb/client/transaction.cc @@ -250,6 +250,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { other->read_point_.MoveFrom(&read_point_); other->read_point_.Restart(); other->metadata_.isolation = metadata_.isolation; + // TODO(Piyush): Do we need the below? If yes, prove with a test case and add it. + // other->metadata_.priority = metadata_.priority; if (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { other->metadata_.start_time = other->read_point_.GetReadTime().read; } else { @@ -426,7 +428,16 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } } else { const TransactionError txn_err(status); - if (txn_err.value() != TransactionErrorCode::kSkipLocking) { + // We don't abort the txn in case of a kSkipLocking error to make further progress. + // READ COMMITTED isolation retries errors of kConflict and kReadRestart by restarting + // statements instead of the whole txn and hence should avoid aborting the txn in this case + // too. + bool avoid_abort = + (txn_err.value() == TransactionErrorCode::kSkipLocking) || + (metadata_.isolation == IsolationLevel::READ_COMMITTED && + (txn_err.value() == TransactionErrorCode::kReadRestartRequired || + txn_err.value() == TransactionErrorCode::kConflict)); + if (!avoid_abort) { auto state = state_.load(std::memory_order_acquire); VLOG_WITH_PREFIX(4) << "Abort desired, state: " << AsString(state); if (state == TransactionState::kRunning) { @@ -741,7 +752,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { void SetReadTimeIfNeeded(bool do_it) { if (!read_point_.GetReadTime() && do_it && - metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION) { + (metadata_.isolation == IsolationLevel::SNAPSHOT_ISOLATION || + metadata_.isolation == IsolationLevel::READ_COMMITTED)) { read_point_.SetCurrentReadTime(); } } diff --git a/src/yb/client/transaction.h b/src/yb/client/transaction.h index fad261027409..7d6fd09548ad 100644 --- a/src/yb/client/transaction.h +++ b/src/yb/client/transaction.h @@ -36,6 +36,12 @@ class HybridTime; class Trace; +enum TxnPriorityRequirement { + kLowerPriorityRange, + kHigherPriorityRange, + kHighestPriority +}; + namespace client { using Waiter = boost::function; diff --git a/src/yb/common/consistent_read_point.cc b/src/yb/common/consistent_read_point.cc index 4b3ac2d4b777..bb9809913d0a 100644 --- a/src/yb/common/consistent_read_point.cc +++ b/src/yb/common/consistent_read_point.cc @@ -95,7 +95,6 @@ void ConsistentReadPoint::Restart() { std::lock_guard lock(mutex_); local_limits_ = std::move(restarts_); read_time_.read = restart_read_ht_; - recently_restarted_read_point_ = true; } void ConsistentReadPoint::Defer() { diff --git a/src/yb/common/consistent_read_point.h b/src/yb/common/consistent_read_point.h index 6aa899adfab4..f2ffeb4d3b3a 100644 --- a/src/yb/common/consistent_read_point.h +++ b/src/yb/common/consistent_read_point.h @@ -85,14 +85,6 @@ class ConsistentReadPoint { // Sets in transaction limit. void SetInTxnLimit(HybridTime value) EXCLUDES(mutex_); - bool RecentlyRestartedReadPoint() const { - return recently_restarted_read_point_; - } - - void UnSetRecentlyRestartedReadPoint() { - recently_restarted_read_point_ = false; - } - private: void UpdateLimitsMapUnlocked( const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) REQUIRES(mutex_); @@ -114,10 +106,6 @@ class ConsistentReadPoint { // Restarts that happen during a consistent read. Used to initialise local_limits for restarted // read. HybridTimeMap restarts_ GUARDED_BY(mutex_); - // This field is useful in READ COMMITTED isolation to indicate that the read point has already - // been restarted as part of a transparent read restart retry and we need not pick a new read - // point based on current time in StartTransactionCommand(). - bool recently_restarted_read_point_ = false; }; } // namespace yb diff --git a/src/yb/common/row_mark.cc b/src/yb/common/row_mark.cc index dc7cdf905aa7..1fcf643fa625 100644 --- a/src/yb/common/row_mark.cc +++ b/src/yb/common/row_mark.cc @@ -41,11 +41,11 @@ bool IsValidRowMarkType(RowMarkType row_mark_type) { } } -bool RowMarkNeedsPessimisticLock(RowMarkType row_mark_type) { +bool RowMarkNeedsHigherPriority(RowMarkType row_mark_type) { /* - * Currently, using pessimistic locking for all supported row marks except the key share lock. - * This is because key share locks are used for foreign keys and we don't want pessimistic - * locking there. + * Currently, using higher priority for all supported row marks except the key share lock. + * This is because key share locks are used for foreign keys and we don't want higher priority + * there. */ return IsValidRowMarkType(row_mark_type) && row_mark_type != RowMarkType::ROW_MARK_KEYSHARE; diff --git a/src/yb/common/row_mark.h b/src/yb/common/row_mark.h index afa0bf7dba26..002e763e4328 100644 --- a/src/yb/common/row_mark.h +++ b/src/yb/common/row_mark.h @@ -40,11 +40,11 @@ RowMarkType GetStrongestRowMarkType(std::initializer_list row_mark_ bool IsValidRowMarkType(RowMarkType row_mark_type); /* - * Returns whether an operation with this row mark should try to use pessimistic locking. + * Returns whether an operation with this row mark should try to use a higher priority txn. * Currently txn layer will use a best-effort approach, by setting the txn priority to highest if * this is a new txn (first operation within a transaction). */ -bool RowMarkNeedsPessimisticLock(RowMarkType row_mark_type); +bool RowMarkNeedsHigherPriority(RowMarkType row_mark_type); } // namespace yb diff --git a/src/yb/common/transaction.proto b/src/yb/common/transaction.proto index b28810e64b9a..e98cb8fcdc71 100644 --- a/src/yb/common/transaction.proto +++ b/src/yb/common/transaction.proto @@ -20,6 +20,7 @@ enum IsolationLevel { NON_TRANSACTIONAL = 0; SNAPSHOT_ISOLATION = 1; SERIALIZABLE_ISOLATION = 2; + READ_COMMITTED = 3; } enum TransactionStatus { diff --git a/src/yb/common/ybc_util.cc b/src/yb/common/ybc_util.cc index 6a212a5b09e4..e0e383c34279 100644 --- a/src/yb/common/ybc_util.cc +++ b/src/yb/common/ybc_util.cc @@ -264,6 +264,10 @@ bool YBCIsTxnSkipLockingError(uint16_t txn_errcode) { return txn_errcode == to_underlying(TransactionErrorCode::kSkipLocking); } +uint16_t YBCGetTxnConflictErrorCode() { + return to_underlying(TransactionErrorCode::kConflict); +} + YBCStatus YBCInit(const char* argv0, YBCPAllocFn palloc_fn, YBCCStringToTextWithLenFn cstring_to_text_with_len_fn) { diff --git a/src/yb/common/ybc_util.h b/src/yb/common/ybc_util.h index 23f349987921..1b30e6005045 100644 --- a/src/yb/common/ybc_util.h +++ b/src/yb/common/ybc_util.h @@ -73,6 +73,7 @@ bool YBCIsRestartReadError(uint16_t txn_errcode); bool YBCIsTxnConflictError(uint16_t txn_errcode); bool YBCIsTxnSkipLockingError(uint16_t txn_errcode); +uint16_t YBCGetTxnConflictErrorCode(); void YBCResolveHostname(); diff --git a/src/yb/docdb/conflict_resolution.cc b/src/yb/docdb/conflict_resolution.cc index 36199860bcd5..6cd108961578 100644 --- a/src/yb/docdb/conflict_resolution.cc +++ b/src/yb/docdb/conflict_resolution.cc @@ -748,7 +748,13 @@ class ConflictResolverContextBase : public ConflictResolverContext { return STATUS(InternalError, "Skip locking since entity is already locked", TransactionError(TransactionErrorCode::kSkipLocking)); } - if (our_priority < their_priority) { + + // READ COMMITTED txns require a guarantee that no txn abort it. They can handle facing a + // kConflict due to another txn's conflicting intent, but can't handle aborts. To ensure + // these guarantees - + // 1. all READ COMMITTED txns are given kHighestPriority and + // 2. a kConflict is raised even if their_priority equals our_priority. + if (our_priority <= their_priority) { return MakeConflictStatus( our_transaction_id, transaction.id, "higher priority", GetConflictsMetric()); } diff --git a/src/yb/docdb/intent.cc b/src/yb/docdb/intent.cc index 64d408774106..1661a6145866 100644 --- a/src/yb/docdb/intent.cc +++ b/src/yb/docdb/intent.cc @@ -129,6 +129,7 @@ IntentTypeSet GetStrongIntentTypeSet( } switch (level) { + case IsolationLevel::READ_COMMITTED: case IsolationLevel::SNAPSHOT_ISOLATION: return IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); case IsolationLevel::SERIALIZABLE_ISOLATION: diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index 9a08ef107608..775ff9b189ca 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -404,18 +404,25 @@ Status PgSession::RunHelper::Apply(std::shared_ptr op, read_only = read_only && pending_ops_.empty(); } } - bool pessimistic_lock_required = false; + + TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange; if (op->type() == YBOperation::Type::PGSQL_READ) { const PgsqlReadRequestPB& read_req = down_cast(op.get())->request(); auto row_mark_type = GetRowMarkTypeFromPB(read_req); read_only = read_only && !IsValidRowMarkType(row_mark_type); - pessimistic_lock_required = RowMarkNeedsPessimisticLock(row_mark_type); + if (RowMarkNeedsHigherPriority((RowMarkType) row_mark_type)) { + txn_priority_requirement = kHigherPriorityRange; + } + } + + if (pg_session_.GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) { + txn_priority_requirement = kHighestPriority; } auto session = VERIFY_RESULT(pg_session_.GetSession( transactional_, IsReadOnlyOperation(read_only), - IsPessimisticLockRequired(pessimistic_lock_required), + txn_priority_requirement, IsCatalogOperation(op->IsYsqlCatalogOp()))); if (!yb_session_) { yb_session_ = session->shared_from_this(); @@ -871,6 +878,10 @@ void PgSession::DropBufferedOperations() { buffered_txn_ops_.clear(); } +PgIsolationLevel PgSession::GetIsolationLevel() { + return pg_txn_manager_->GetIsolationLevel(); +} + Status PgSession::FlushBufferedOperationsImpl(const Flusher& flusher) { auto ops = std::move(buffered_ops_); auto txn_ops = std::move(buffered_txn_ops_); @@ -912,12 +923,12 @@ Result PgSession::ShouldHandleTransactionally(const client::YBPgsqlOp& op) Result PgSession::GetSession(IsTransactionalSession transactional, IsReadOnlyOperation read_only_op, - IsPessimisticLockRequired pessimistic_lock_required, + TxnPriorityRequirement txn_priority_requirement, IsCatalogOperation is_catalog_op) { if (transactional) { YBSession* txn_session = VERIFY_RESULT(pg_txn_manager_->GetTransactionalSession()); RETURN_NOT_OK(pg_txn_manager_->BeginWriteTransactionIfNecessary(read_only_op, - pessimistic_lock_required)); + txn_priority_requirement)); VLOG(2) << __PRETTY_FUNCTION__ << ": read_only_op=" << read_only_op << ", returning transactional session: " << txn_session; @@ -952,7 +963,15 @@ Status PgSession::ApplyOperation(client::YBSession *session, Status PgSession::FlushOperations(PgsqlOpBuffer ops, IsTransactionalSession transactional) { DCHECK(ops.size() > 0 && ops.size() <= FLAGS_ysql_session_max_batch_size); - auto session = VERIFY_RESULT(GetSession(transactional, IsReadOnlyOperation::kFalse)); + + TxnPriorityRequirement txn_priority_requirement = kLowerPriorityRange; + if (GetIsolationLevel() == PgIsolationLevel::READ_COMMITTED) { + txn_priority_requirement = kHighestPriority; + } + + auto session = VERIFY_RESULT(GetSession( + transactional, IsReadOnlyOperation::kFalse, + txn_priority_requirement)); if (session != session_.get()) { DCHECK(transactional); session->SetInTxnLimit(HybridTime(clock_->Now().ToUint64())); diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 7edaabd5ed0f..f46812e2cbb9 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -21,6 +21,7 @@ #include "yb/client/client_fwd.h" #include "yb/client/session.h" +#include "yb/client/transaction.h" #include "yb/common/pg_types.h" #include "yb/common/transaction.h" @@ -37,6 +38,7 @@ #include "yb/yql/pggate/pg_gate_fwd.h" #include "yb/yql/pggate/pg_env.h" #include "yb/yql/pggate/pg_tabledesc.h" +#include "yb/yql/pggate/pg_txn_manager.h" namespace yb { namespace pggate { @@ -104,7 +106,6 @@ class RowIdentifier { }; YB_STRONGLY_TYPED_BOOL(IsTransactionalSession); -YB_STRONGLY_TYPED_BOOL(IsPessimisticLockRequired); YB_STRONGLY_TYPED_BOOL(IsReadOnlyOperation); YB_STRONGLY_TYPED_BOOL(IsCatalogOperation); @@ -213,6 +214,8 @@ class PgSession : public RefCountedThreadSafe { // Drop all pending buffered operations. Buffering mode remain unchanged. void DropBufferedOperations(); + PgIsolationLevel GetIsolationLevel(); + // Run (apply + flush) the given operation to read and write database content. // Template is used here to handle all kind of derived operations // (shared_ptr, shared_ptr) @@ -373,7 +376,7 @@ class PgSession : public RefCountedThreadSafe { Result GetSession( IsTransactionalSession transactional, IsReadOnlyOperation read_only_op, - IsPessimisticLockRequired pessimistic_lock_required = IsPessimisticLockRequired::kFalse, + TxnPriorityRequirement txn_priority_requirement, IsCatalogOperation is_catalog_op = IsCatalogOperation::kFalse); // Flush buffered write operations from the given buffer. diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index 268ef543e39a..f6d13d113136 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -28,6 +28,7 @@ #include "yb/util/debug-util.h" #include "yb/util/format.h" +#include "yb/util/logging.h" #include "yb/util/random_util.h" #include "yb/util/shared_mem.h" #include "yb/util/status.h" @@ -195,6 +196,10 @@ Status PgTxnManager::SetIsolationLevel(int level) { return Status::OK(); } +PgIsolationLevel PgTxnManager::GetIsolationLevel() { + return pg_isolation_level_; +} + Status PgTxnManager::SetReadOnly(bool read_only) { read_only_ = read_only; VLOG(2) << __func__ << " set to " << read_only_ << " from " << GetStackTrace(); @@ -246,22 +251,29 @@ void PgTxnManager::StartNewSession() { updated_read_time_for_follower_reads_ = false; } -uint64_t PgTxnManager::GetPriority(const NeedsPessimisticLocking needs_pessimistic_locking) { +uint64_t PgTxnManager::GetPriority(TxnPriorityRequirement txn_priority_requirement) { + + VLOG_WITH_FUNC(1) << "txn_priority_requirement=" << txn_priority_requirement; + if (use_saved_priority_) { return saved_priority_; } - // Use high priority for transactions that need pessimistic locking. - if (needs_pessimistic_locking) { + if (txn_priority_requirement == kHighestPriority) { + return txn_priority_highpri_upper_bound; + } + + if (txn_priority_requirement == kHigherPriorityRange) { return RandomUniformInt(txn_priority_highpri_lower_bound, txn_priority_highpri_upper_bound); } + return RandomUniformInt(txn_priority_regular_lower_bound, txn_priority_regular_upper_bound); } -Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op, - bool needs_pessimistic_locking) { +Status PgTxnManager::BeginWriteTransactionIfNecessary( + bool read_only_op, TxnPriorityRequirement txn_priority_requirement) { if (ddl_txn_) { VLOG_TXN_STATE(2); return Status::OK(); @@ -283,7 +295,9 @@ Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op, const IsolationLevel docdb_isolation = (pg_isolation_level_ == PgIsolationLevel::SERIALIZABLE) && !read_only_ ? IsolationLevel::SERIALIZABLE_ISOLATION - : IsolationLevel::SNAPSHOT_ISOLATION; + : (pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED + ? IsolationLevel::READ_COMMITTED + : IsolationLevel::SNAPSHOT_ISOLATION); const bool defer = read_only_ && deferrable_; VLOG_TXN_STATE(2) << "DocDB isolation level: " << IsolationLevel_Name(docdb_isolation); @@ -298,7 +312,9 @@ Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op, txn_->isolation(), IsolationLevel_Name(docdb_isolation), pg_isolation_level_, read_only_); } - } else if (read_only_op && docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION) { + } else if (read_only_op && + (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION || + docdb_isolation == IsolationLevel::READ_COMMITTED)) { if (defer) { // This call is idempotent, meaning it has no effect after the first call. session_->DeferReadPoint(); @@ -333,9 +349,10 @@ Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op, txn_ = std::make_shared(GetOrCreateTransactionManager()); } - txn_->SetPriority(GetPriority(NeedsPessimisticLocking(needs_pessimistic_locking))); + txn_->SetPriority(GetPriority(txn_priority_requirement)); - if (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION) { + if (docdb_isolation == IsolationLevel::SNAPSHOT_ISOLATION || + docdb_isolation == IsolationLevel::READ_COMMITTED) { txn_->InitWithReadPoint(docdb_isolation, std::move(*session_->read_point())); } else { DCHECK_EQ(docdb_isolation, IsolationLevel::SERIALIZABLE_ISOLATION); @@ -351,8 +368,12 @@ Status PgTxnManager::BeginWriteTransactionIfNecessary(bool read_only_op, } Status PgTxnManager::SetActiveSubTransaction(SubTransactionId id) { + auto txn_priority_requirement = kLowerPriorityRange; + if (pg_isolation_level_ == PgIsolationLevel::READ_COMMITTED) + txn_priority_requirement = kHighestPriority; + RETURN_NOT_OK(BeginWriteTransactionIfNecessary( - false /* read_only_op */, false /* needs_pessimistic_locking */)); + false /* read_only_op */, txn_priority_requirement)); SCHECK( txn_, InternalError, "Attempted to set active subtransaction on uninitialized transaciton."); txn_->SetActiveSubTransaction(id); @@ -391,20 +412,32 @@ Status PgTxnManager::RestartTransaction() { } /* This is called at the start of each statement in READ COMMITTED isolation level */ -Status PgTxnManager::MaybeResetTransactionReadPoint() { +Status PgTxnManager::ResetTransactionReadPoint() { CHECK_NOTNULL(session_); // If a txn_ has been created, session_->read_point() returns the read point stored in txn_. ConsistentReadPoint* rp = session_->read_point(); - if (rp->RecentlyRestartedReadPoint()) { - rp->UnSetRecentlyRestartedReadPoint(); - return Status::OK(); - } rp->SetCurrentReadTime(); VLOG(1) << "Setting current ht as read point " << rp->GetReadTime(); return Status::OK(); } +/* This is called when a read committed transaction wants to restart its read point */ +Status PgTxnManager::RestartReadPoint() { + CHECK_NOTNULL(session_); + + // If a txn_ has been created, session_->read_point() returns the read point stored in txn_. + ConsistentReadPoint* rp = session_->read_point(); + + if (!rp->IsRestartRequired()) { + return STATUS(IllegalState, "Restart of read point that does not require restart"); + } + rp->Restart(); + + VLOG(1) << "Restarting read point to " << rp->GetReadTime(); + return Status::OK(); +} + Status PgTxnManager::CommitTransaction() { if (!txn_in_progress_) { VLOG_TXN_STATE(2) << "No transaction in progress, nothing to commit."; diff --git a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index be42c4a799f1..9437425473f8 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -19,6 +19,7 @@ #include #include "yb/client/client_fwd.h" +#include "yb/client/transaction.h" #include "yb/client/transaction_manager.h" #include "yb/client/async_initializer.h" #include "yb/common/clock.h" @@ -63,10 +64,12 @@ class PgTxnManager : public RefCountedThreadSafe { CHECKED_STATUS BeginTransaction(); CHECKED_STATUS RecreateTransaction(); CHECKED_STATUS RestartTransaction(); - CHECKED_STATUS MaybeResetTransactionReadPoint(); + CHECKED_STATUS ResetTransactionReadPoint(); + CHECKED_STATUS RestartReadPoint(); CHECKED_STATUS CommitTransaction(); void AbortTransaction(); CHECKED_STATUS SetIsolationLevel(int isolation); + PgIsolationLevel GetIsolationLevel(); CHECKED_STATUS SetReadOnly(bool read_only); CHECKED_STATUS EnableFollowerReads(bool enable_follower_reads, int32_t staleness); CHECKED_STATUS SetDeferrable(bool deferrable); @@ -79,8 +82,8 @@ class PgTxnManager : public RefCountedThreadSafe { std::shared_future> GetDdlTxnMetadata() const; - CHECKED_STATUS BeginWriteTransactionIfNecessary(bool read_only_op, - bool needs_pessimistic_locking = false); + CHECKED_STATUS BeginWriteTransactionIfNecessary( + bool read_only_op, TxnPriorityRequirement txn_priority_requirement); CHECKED_STATUS SetActiveSubTransaction(SubTransactionId id); @@ -93,7 +96,7 @@ class PgTxnManager : public RefCountedThreadSafe { bool ShouldUseFollowerReads() const { return updated_read_time_for_follower_reads_; } private: - YB_STRONGLY_TYPED_BOOL(NeedsPessimisticLocking); + YB_STRONGLY_TYPED_BOOL(NeedsHigherPriorityTxn); YB_STRONGLY_TYPED_BOOL(SavePriority); client::TransactionManager* GetOrCreateTransactionManager(); @@ -102,7 +105,7 @@ class PgTxnManager : public RefCountedThreadSafe { Status UpdateReadTimeForFollowerReadsIfRequired(); Status RecreateTransaction(SavePriority save_priority); - uint64_t GetPriority(NeedsPessimisticLocking needs_pessimistic_locking); + uint64_t GetPriority(TxnPriorityRequirement txn_priority_requirement); std::string TxnStateDebugStr() const; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index c2f4b6c8b280..484e42c1e4bb 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1407,8 +1407,12 @@ Status PgApiImpl::RestartTransaction() { return pg_txn_manager_->RestartTransaction(); } -Status PgApiImpl::MaybeResetTransactionReadPoint() { - return pg_txn_manager_->MaybeResetTransactionReadPoint(); +Status PgApiImpl::ResetTransactionReadPoint() { + return pg_txn_manager_->ResetTransactionReadPoint(); +} + +Status PgApiImpl::RestartReadPoint() { + return pg_txn_manager_->RestartReadPoint(); } Status PgApiImpl::CommitTransaction() { diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index db1df2de12bb..e3709f7aba6b 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -478,7 +478,8 @@ class PgApiImpl { CHECKED_STATUS BeginTransaction(); CHECKED_STATUS RecreateTransaction(); CHECKED_STATUS RestartTransaction(); - CHECKED_STATUS MaybeResetTransactionReadPoint(); + CHECKED_STATUS ResetTransactionReadPoint(); + CHECKED_STATUS RestartReadPoint(); CHECKED_STATUS CommitTransaction(); void AbortTransaction(); CHECKED_STATUS SetTransactionIsolationLevel(int isolation); diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index cc73a1097a63..ff47ddb5fbdd 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -908,8 +908,12 @@ YBCStatus YBCPgRestartTransaction() { return ToYBCStatus(pgapi->RestartTransaction()); } -YBCStatus YBCPgMaybeResetTransactionReadPoint() { - return ToYBCStatus(pgapi->MaybeResetTransactionReadPoint()); +YBCStatus YBCPgResetTransactionReadPoint() { + return ToYBCStatus(pgapi->ResetTransactionReadPoint()); +} + +YBCStatus YBCPgRestartReadPoint() { + return ToYBCStatus(pgapi->RestartReadPoint()); } YBCStatus YBCPgCommitTransaction() { diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 6009066b73d8..6a43477a7658 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -447,7 +447,8 @@ YBCStatus YBCPgExecSelect(YBCPgStatement handle, const YBCPgExecParameters *exec YBCStatus YBCPgBeginTransaction(); YBCStatus YBCPgRecreateTransaction(); YBCStatus YBCPgRestartTransaction(); -YBCStatus YBCPgMaybeResetTransactionReadPoint(); +YBCStatus YBCPgResetTransactionReadPoint(); +YBCStatus YBCPgRestartReadPoint(); YBCStatus YBCPgCommitTransaction(); void YBCPgAbortTransaction(); YBCStatus YBCPgSetTransactionIsolationLevel(int isolation); diff --git a/src/yb/yql/pgwrapper/libpq_utils.cc b/src/yb/yql/pgwrapper/libpq_utils.cc index 2eb5a7d1c5e0..18d2bd26508e 100644 --- a/src/yb/yql/pgwrapper/libpq_utils.cc +++ b/src/yb/yql/pgwrapper/libpq_utils.cc @@ -275,6 +275,8 @@ CHECKED_STATUS PGConn::StartTransaction(IsolationLevel isolation_level) { switch (isolation_level) { case IsolationLevel::NON_TRANSACTIONAL: return Status::OK(); + case IsolationLevel::READ_COMMITTED: + return Execute("START TRANSACTION ISOLATION LEVEL READ COMMITTED"); case IsolationLevel::SNAPSHOT_ISOLATION: return Execute("START TRANSACTION ISOLATION LEVEL REPEATABLE READ"); case IsolationLevel::SERIALIZABLE_ISOLATION: