Skip to content

Commit

Permalink
[#9219] docdb - Ignore intents from aborted subtransactions during re…
Browse files Browse the repository at this point in the history
…ads and writes of the same transaction

Summary:
This revision adds support for client-side tracking of aborted subtransactions, transmission of this
metadata to docdb, and consideration of this metadata when processing intents for reads and writes.
This revision *does not* ensure that aborted intents are not applied after transaction commit, so
"ROLLBACK TO SAVEPOINT" cannot yet be safely used and remains guarded by default.

This revision also implements some basic java test coverage of some of the savepoint functionality
implemented so far, but is by no means exhaustive. These tests were meant to merely demonstrate
basic functionality that has been implemented in previous revisions and newly in this one.

Test Plan:
ybd --java-test 'org.yb.pgsql.TestPgSavepoints'
ybd --cxx-test util_uint_set-test

Reviewers: mbautin, sergei

Reviewed By: sergei

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D12331
  • Loading branch information
robertsami committed Aug 2, 2021
1 parent d989326 commit c8eb974
Show file tree
Hide file tree
Showing 21 changed files with 608 additions and 54 deletions.
137 changes: 137 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSavepoints.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.util.YBTestRunnerNonTsanOnly;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

import static org.yb.AssertionWrappers.*;

/**
 * Basic coverage of savepoint behavior (SAVEPOINT / ROLLBACK TO / RELEASE SAVEPOINT) within a
 * single transaction. Every test works on a two-column table {@code t (k INT, v INT)} and checks
 * which rows are visible to the transaction that wrote them.
 */
@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgSavepoints extends BasePgSQLTest {
  private static final Logger LOG = LoggerFactory.getLogger(TestPgSavepoints.class);

  /** Creates the table {@code t} used by every test, using the shared base-class connection. */
  private void createTable() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.execute("CREATE TABLE t (k INT, v INT)");
    }
  }

  /**
   * Looks up the row of {@code t} with the given key on the given connection.
   *
   * @param c connection (and therefore transaction) through which the row is read
   * @param k value of column {@code k} to look up
   * @return the value at column index 1 of the matching row (column {@code v}, as the tests in
   *         this class expect), or an empty {@link OptionalInt} if no row matches
   * @throws SQLException if the query fails
   */
  private OptionalInt getSingleValue(Connection c, int k) throws SQLException {
    String query = String.format("SELECT * FROM t WHERE k = %d", k);
    // Both the statement and its result set are closed here; previously the statement leaked.
    try (Statement statement = c.createStatement();
         ResultSet rs = statement.executeQuery(query)) {
      if (!rs.next()) {
        return OptionalInt.empty();
      }
      Row row = Row.fromResultSet(rs);
      LOG.info(row.toString());
      assertFalse("Found more than one result", rs.next());
      return OptionalInt.of(row.getInt(1));
    }
  }

  /** Adds the tserver flag that enables savepoint support on top of the inherited flags. */
  @Override
  protected Map<String, String> getTServerFlags() {
    // TODO(savepoints) -- enable by default.
    Map<String, String> flags = super.getTServerFlags();
    flags.put("enable_pg_savepoints", "true");
    return flags;
  }

  /** Rows written before and after a savepoint are both visible while no rollback happened. */
  @Test
  public void testSavepointCreation() throws Exception {
    createTable();

    try (Connection conn = getConnectionBuilder()
                               .withAutoCommit(AutoCommit.DISABLED)
                               .connect();
         Statement statement = conn.createStatement()) {
      statement.execute("INSERT INTO t VALUES (1, 2)");
      statement.execute("SAVEPOINT a");
      statement.execute("INSERT INTO t VALUES (3, 4)");
      statement.execute("SAVEPOINT b");

      assertEquals(OptionalInt.of(2), getSingleValue(conn, 1));
      assertEquals(OptionalInt.of(4), getSingleValue(conn, 3));
    }
  }

  /** A row written after a savepoint disappears once the transaction rolls back to it. */
  @Test
  public void testSavepointRollback() throws Exception {
    createTable();

    try (Connection conn = getConnectionBuilder()
                               .withAutoCommit(AutoCommit.DISABLED)
                               .connect();
         Statement statement = conn.createStatement()) {
      statement.execute("INSERT INTO t VALUES (1, 2)");
      statement.execute("SAVEPOINT a");
      statement.execute("INSERT INTO t VALUES (3, 4)");
      statement.execute("ROLLBACK TO a");

      assertEquals(OptionalInt.of(2), getSingleValue(conn, 1));
      assertEquals(OptionalInt.empty(), getSingleValue(conn, 3));
    }
  }

  /** An UPDATE targeting a row from a rolled-back subtransaction must not match anything. */
  @Test
  public void testSavepointUpdateAbortedRow() throws Exception {
    createTable();

    try (Connection conn = getConnectionBuilder()
                               .withAutoCommit(AutoCommit.DISABLED)
                               .connect();
         Statement statement = conn.createStatement()) {
      statement.execute("INSERT INTO t VALUES (1, 2)");
      statement.execute("SAVEPOINT a");
      statement.execute("INSERT INTO t VALUES (3, 4)");
      statement.execute("ROLLBACK TO a");
      statement.execute("UPDATE t SET v = 5 WHERE k = 3");
      assertEquals(0, statement.getUpdateCount());

      assertEquals(OptionalInt.of(2), getSingleValue(conn, 1));
      assertEquals(OptionalInt.empty(), getSingleValue(conn, 3));
    }
  }

  /**
   * Writes made under a savepoint that was later released are still discarded when the
   * transaction rolls back to an enclosing savepoint.
   */
  @Test
  public void testAbortsIntentOfReleasedSavepoint() throws Exception {
    createTable();

    try (Connection conn = getConnectionBuilder()
                               .withAutoCommit(AutoCommit.DISABLED)
                               .connect();
         Statement statement = conn.createStatement()) {
      statement.execute("SAVEPOINT a");
      statement.execute("SAVEPOINT b");
      statement.execute("INSERT INTO t VALUES (3, 4)");
      statement.execute("RELEASE SAVEPOINT b");
      statement.execute("ROLLBACK TO a");

      assertEquals(OptionalInt.empty(), getSingleValue(conn, 3));
    }
  }

}
10 changes: 6 additions & 4 deletions src/postgres/src/backend/parser/gram.y
Original file line number Diff line number Diff line change
Expand Up @@ -10824,17 +10824,19 @@ TransactionStmt:
}
| ROLLBACK opt_transaction TO SAVEPOINT ColId
{
/* TODO(9219) -- conditionally enable once client supports aborted savepoints */
parser_ybc_signal_unsupported(@1, "ROLLBACK <transaction>", 1125);
if (!YBSavepointsEnabled()) {
parser_ybc_signal_unsupported(@1, "ROLLBACK <transaction>", 1125);
}
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_ROLLBACK_TO;
n->savepoint_name = $5;
$$ = (Node *)n;
}
| ROLLBACK opt_transaction TO ColId
{
/* TODO(9219) -- conditionally enable once client supports aborted savepoints */
parser_ybc_signal_unsupported(@1, "ROLLBACK <transaction>", 1125);
if (!YBSavepointsEnabled()) {
parser_ybc_signal_unsupported(@1, "ROLLBACK <transaction>", 1125);
}
TransactionStmt *n = makeNode(TransactionStmt);
n->kind = TRANS_STMT_ROLLBACK_TO;
n->savepoint_name = $4;
Expand Down
6 changes: 6 additions & 0 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,14 @@ message TransactionMetadataPB {
optional fixed64 metadata_write_time = 6;
}

// Wire representation of a set of aborted subtransactions, carried as a list of uint32 values.
// NOTE(review): the element encoding is whatever AbortedSubTransactionSet::ToPB/FromPB produce
// and consume -- confirm against yb/util/uint_set.h before interpreting the values directly.
message AbortedSubTransactionSetPB {
repeated uint32 set = 1;
}

// Subtransaction state attached to a transactional request.
message SubTransactionMetadataPB {
// Id of the subtransaction the request belongs to.
optional uint32 subtransaction_id = 1;

// Subtransactions that have been aborted.
optional AbortedSubTransactionSetPB aborted = 2;
}

// See ReadHybridTime for explanation of this message.
Expand Down
13 changes: 12 additions & 1 deletion src/yb/common/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,22 @@ std::ostream& operator<<(std::ostream& out, const TransactionMetadata& metadata)

// Serializes this metadata into *dest: the current subtransaction id plus the aborted set.
void SubTransactionMetadata::ToPB(SubTransactionMetadataPB* dest) const {
  dest->set_subtransaction_id(subtransaction_id);
  // mutable_aborted() materializes the nested message so the set can be written into it.
  auto* aborted_pb = dest->mutable_aborted();
  aborted.ToPB(aborted_pb->mutable_set());
}

// Builds a SubTransactionMetadata from its protobuf form. Returns an error status if the aborted
// set cannot be decoded.
Result<SubTransactionMetadata> SubTransactionMetadata::FromPB(
    const SubTransactionMetadataPB& source) {
  // Fall back to the minimum subtransaction id when the sender did not set one.
  const SubTransactionId decoded_id =
      source.has_subtransaction_id() ? source.subtransaction_id() : kMinSubTransactionId;
  auto decoded_aborted =
      VERIFY_RESULT(AbortedSubTransactionSet::FromPB(source.aborted().set()));
  // Designated initializers are kept on purpose: any member not listed here takes its default
  // member initializer, evaluated after subtransaction_id has been set.
  return SubTransactionMetadata {
    .subtransaction_id = decoded_id,
    .aborted = std::move(decoded_aborted),
  };
}

bool SubTransactionMetadata::IsDefaultState() const {
DCHECK(subtransaction_id >= kMinSubTransactionId);
return subtransaction_id == kMinSubTransactionId;
return subtransaction_id == kMinSubTransactionId && aborted.IsEmpty();
}

std::ostream& operator<<(std::ostream& out, const SubTransactionMetadata& metadata) {
Expand Down
62 changes: 43 additions & 19 deletions src/yb/common/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "yb/util/result.h"
#include "yb/util/strongly_typed_bool.h"
#include "yb/util/strongly_typed_uuid.h"
#include "yb/util/tostring.h"
#include "yb/util/uint_set.h"
#include "yb/util/uuid.h"

namespace rocksdb {
Expand Down Expand Up @@ -205,15 +207,56 @@ class RequestScope {
int64_t request_id_;
};

using AbortedSubTransactionSet = UnsignedIntSet<SubTransactionId>;

// Represents all metadata tracked about subtransaction state by the client in support of postgres
// savepoints. Can be serialized and deserialized to/from SubTransactionMetadataPB. This should be
// sent by the client on any transactional read/write requests where a savepoint has been created,
// and finally on transaction commit.
struct SubTransactionMetadata {
// Id of the current subtransaction; kMinSubTransactionId until changed.
SubTransactionId subtransaction_id = kMinSubTransactionId;
// Set of subtransactions that have been aborted.
AbortedSubTransactionSet aborted;
// Tracks the highest observed subtransaction_id. Used during "ROLLBACK TO s" to abort from s to
// the highest live subtransaction_id.
// Defaults to whatever subtransaction_id was initialized to.
// NOTE(review): ToPB() as shown in transaction.cc does not serialize this field -- confirm it is
// intended to be client-side only.
SubTransactionId highest_subtransaction_id = subtransaction_id;

// Writes this metadata into *dest.
void ToPB(SubTransactionMetadataPB* dest) const;

// Reconstructs metadata from its protobuf form; the Result carries an error on failure.
static Result<SubTransactionMetadata> FromPB(
const SubTransactionMetadataPB& source);

std::string ToString() const {
return YB_STRUCT_TO_STRING(subtransaction_id, highest_subtransaction_id, aborted);
}

// Returns true if this is the default state, i.e. default subtransaction_id. This indicates
// whether the client has interacted with savepoints at all in the context of a session. If true,
// the client could, for example, skip sending subtransaction-related metadata in RPCs.
// NOTE(review): the definition in transaction.cc appears to also require `aborted` to be empty,
// which would make the former TODO about tracking aborted state obsolete -- confirm.
bool IsDefaultState() const;
};

std::ostream& operator<<(std::ostream& out, const SubTransactionMetadata& metadata);

// Bundles the identity of a transaction with its subtransaction state and the status manager used
// while performing an operation on its behalf.
struct TransactionOperationContext {
// Context without explicit subtransaction state: `subtransaction` is default-constructed.
// txn_status_manager_ must be non-null; it is dereferenced and stored as a reference.
TransactionOperationContext(
const TransactionId& transaction_id_, TransactionStatusManager* txn_status_manager_)
: transaction_id(transaction_id_),
txn_status_manager(*(DCHECK_NOTNULL(txn_status_manager_))) {}

// Context carrying subtransaction state, which is moved into this object.
// txn_status_manager_ must be non-null; it is dereferenced and stored as a reference.
TransactionOperationContext(
const TransactionId& transaction_id_,
SubTransactionMetadata&& subtransaction_,
TransactionStatusManager* txn_status_manager_)
: transaction_id(transaction_id_),
subtransaction(std::move(subtransaction_)),
txn_status_manager(*(DCHECK_NOTNULL(txn_status_manager_))) {}

bool transactional() const;

TransactionId transaction_id;
SubTransactionMetadata subtransaction;
// Held by reference: the manager must outlive this context.
TransactionStatusManager& txn_status_manager;
};

Expand Down Expand Up @@ -264,25 +307,6 @@ inline bool operator!=(const TransactionMetadata& lhs, const TransactionMetadata

std::ostream& operator<<(std::ostream& out, const TransactionMetadata& metadata);

struct SubTransactionMetadata {
SubTransactionId subtransaction_id = kMinSubTransactionId;

void ToPB(SubTransactionMetadataPB* dest) const;

std::string ToString() const {
return Format("{ subtransaction_id: $0 }", subtransaction_id);
}

// Returns true if this is the default state, i.e. default subtransaction_id. This indicates
// whether the client has interacted with savepoints at all in the context of a session. If true,
// the client could, for example, skip sending subtransaction-related metadata in RPCs.
// TODO(savepoints) -- update behavior and comment to track default aborted subtransaction state
// as well.
bool IsDefaultState() const;
};

std::ostream& operator<<(std::ostream& out, const SubTransactionMetadata& metadata);

MonoDelta TransactionRpcTimeout();
CoarseTimePoint TransactionRpcDeadline();

Expand Down
1 change: 1 addition & 0 deletions src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
auto transaction_id = VERIFY_RESULT(FullyDecodeTransactionId(
Slice(existing_value.data(), TransactionId::StaticSize())));

// TODO(savepoints) - if the intent corresponds to an aborted subtransaction, ignore.
if (!context_->IgnoreConflictsWith(transaction_id)) {
conflicts_.insert(transaction_id);
}
Expand Down
40 changes: 36 additions & 4 deletions src/yb/docdb/docdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ Result<DetermineKeysToLockResult> DetermineKeysToLock(
return result;
}

// Collapse keys_locked into a unique set of keys with intent_types representing the union of
// intent_types originally present. In other words, suppose keys_locked is originally the following:
// [
// (k1, {kWeakRead, kWeakWrite}),
// (k1, {kStrongRead}),
// (k2, {kWeakRead}),
// (k3, {kStrongRead}),
// (k2, {kStrongWrite}),
// ]
// Then after calling FilterKeysToLock we will have:
// [
// (k1, {kWeakRead, kWeakWrite, kStrongRead}),
// (k2, {kWeakRead}),
// (k3, {kStrongRead, kStrongWrite}),
// ]
// Note that only keys which appear in order in keys_locked will be collapsed in this manner.
void FilterKeysToLock(LockBatchEntries *keys_locked) {
if (keys_locked->empty()) {
return;
Expand Down Expand Up @@ -624,10 +640,23 @@ inline bool IsEndOfSubKeys(const Slice& key) {
(key.size() == 1 || key[1] == ValueTypeAsChar::kHybridTime);
}

// Enumerates weak intents generated by the given key by invoking the provided callback with each
// weak intent stored in encoded_key_buffer. On return, *encoded_key_buffer contains the
// corresponding strong intent, for which the callback has not yet been called. It is expected
// that the caller would do so.
// Enumerates weak intent keys generated by considering specified prefixes of the given key and
// invoking the provided callback with each combination considered, stored in encoded_key_buffer.
// On return, *encoded_key_buffer contains the corresponding strong intent, for which the callback
// has not yet been called. It is left to the caller to use the final state of encoded_key_buffer.
//
// The prefixes of the key considered are as follows:
// 1. Up to and including the whole hash key.
// 2. Up to and including the whole range key, or if partial_range_key_intents is
// PartialRangeKeyIntents::kTrue, then enumerate the prefix up to the end of each component of
// the range key separately.
// 3. Up to and including each subkey component, separately.
//
// In any case, we stop short of enumerating the last intent key generated based on the above, as
// this represents the strong intent key and will be stored in encoded_key_buffer at the end of this
// call.
//
// The beginning of each intent key will also include any cotable_id or PgTableOid, if present.
Status EnumerateWeakIntents(
Slice key,
const EnumerateIntentsCallback& functor,
Expand Down Expand Up @@ -720,6 +749,7 @@ Status EnumerateWeakIntents(
// Range components.
auto range_key_start = key.cdata();
while (VERIFY_RESULT(ConsumePrimitiveValueFromKey(&key))) {
// Append the consumed primitive value to encoded_key_buffer.
encoded_key_buffer->AppendRawBytes(range_key_start, key.cdata() - range_key_start);
// We always need kGroupEnd at the end to make this a valid encoded DocKey.
encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);
Expand All @@ -745,8 +775,10 @@ Status EnumerateWeakIntents(
// ConsumePrimitiveValueFromKey, which returned false.
encoded_key_buffer->AppendValueType(ValueType::kGroupEnd);

// Subkey components.
auto subkey_start = key.cdata();
while (VERIFY_RESULT(SubDocKey::DecodeSubkey(&key))) {
// Append the consumed value to encoded_key_buffer.
encoded_key_buffer->AppendRawBytes(subkey_start, key.cdata() - subkey_start);
if (key.empty() || *key.cdata() == ValueTypeAsChar::kHybridTime) {
// This was the last subkey.
Expand Down
11 changes: 8 additions & 3 deletions src/yb/docdb/intent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@ Result<DecodedIntentKey> DecodeIntentKey(const Slice &encoded_intent_key) {

int doc_ht_size = 0;
RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(intent_prefix, &doc_ht_size));
if (intent_prefix.size() < doc_ht_size + 3) {
// There should always be 3 bytes present before the start of the doc_ht:
// 1. ValueType::kIntentTypeSet
// 2. the corresponding value for ValueType::kIntentTypeSet
// 3. ValueType::kHybridTime
constexpr int kBytesBeforeDocHt = 3;
if (intent_prefix.size() < doc_ht_size + kBytesBeforeDocHt) {
return STATUS_FORMAT(
Corruption, "Intent key is too short: $0 bytes", encoded_intent_key.size());
}
intent_prefix.remove_suffix(doc_ht_size + 3);
intent_prefix.remove_suffix(doc_ht_size + kBytesBeforeDocHt);
RETURN_NOT_OK(result.doc_ht.FullyDecodeFrom(
Slice(intent_prefix.data() + intent_prefix.size() + 3, doc_ht_size)));
Slice(intent_prefix.data() + intent_prefix.size() + kBytesBeforeDocHt, doc_ht_size)));
auto* prefix_end = intent_prefix.end();

if (prefix_end[2] != ValueTypeAsChar::kHybridTime)
Expand Down
Loading

0 comments on commit c8eb974

Please sign in to comment.