Skip to content

Commit

Permalink
[#14165] DocDB: Rollback only the newest transaction in a deadlock
Browse files Browse the repository at this point in the history
Summary:
This revision modifies the deadlock detector to abort only the youngest transaction in a detected deadlock.

Previously, each deadlock detector would simply add a txn_id to the deadlock probe response. We now also include information about the status tablet and txn age.

Previously, upon receiving a probe response, the probe origin would abort the transaction for which the probe was started, which was guaranteed to be local. We now abort the youngest transaction, which may require a remote AbortTransaction RPC.

**Upgrade/Rollback safety:**
This revision adds a new "deadlock" field to ProbeTransactionDeadlockResponsePB which contains a new custom DeadlockedTxnInfo message type. This field is meant to replace the existing deadlocked_txn_ids field. The code in this revision updates both fields and handles both fields to ensure upgrade/downgrade safety without the use of any flags.
Jira: DB-3646

Test Plan: ybd --cxx-test pgwrapper_pg_wait_on_conflict-test --gtest_filter PgWaitQueuesTest.DeadlockResolvesYoungestTxn

Reviewers: bkolagani, pjain

Reviewed By: bkolagani

Subscribers: yql, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D31827
  • Loading branch information
robertsami committed Feb 13, 2024
1 parent 40b2a7e commit ad37035
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void testSnapshotReadDelayWrite() throws Exception {

/**
 * Runs the read-delay-write test at SERIALIZABLE isolation, after first calling
 * runWithFailOnConflict().
 *
 * NOTE(review): runWithFailOnConflict() is defined outside this hunk; presumably it switches the
 * test to the fail-on-conflict policy so the expectations do not depend on wait-queue deadlock
 * resolution -- confirm against its definition.
 */
@Test
public void testSerializableReadDelayWrite() throws Exception {
runWithFailOnConflict();
runReadDelayWriteTest(IsolationLevel.SERIALIZABLE);
}

Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
} else {
metadata_.start_time = read_point_.Now();
}
// TODO(wait-queues): Consider using metadata_.pg_txn_start_us here for consistency with
// wait queues. https://github.com/yugabyte/yugabyte-db/issues/20976
start_.store(manager_->clock()->Now().GetPhysicalValueMicros(), std::memory_order_release);
}

Expand Down
181 changes: 146 additions & 35 deletions src/yb/docdb/deadlock_detector.cc

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/yb/docdb/deadlock_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ using BlockerData = std::vector<BlockerTransactionInfo>;
using BlockerDataPtr = std::shared_ptr<BlockerData>;

struct WaiterData {
HybridTime wait_start_time;
BlockerDataPtr blockers;
HybridTime wait_start_time;
BlockerDataPtr blockers;
};

// WaiterInfoEntry stores the wait-for dependencies of a waiter transaction received from a
Expand Down Expand Up @@ -126,6 +126,7 @@ class TransactionStatusController {
virtual void RemoveInactiveTransactions(Waiters* waiters) = 0;
virtual bool IsAnySubtxnActive(const TransactionId& transaction_id,
const SubtxnSet& subtxn_set) = 0;
virtual std::optional<MicrosTime> GetTxnStart(const TransactionId& transaction_id) = 0;
virtual ~TransactionStatusController() = default;
};

Expand Down
11 changes: 11 additions & 0 deletions src/yb/tablet/transaction_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,17 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
return !aborted_subtxn_info->set().Contains(subtxn_set);
}

// Looks up `transaction_id` in managed_transactions_ while holding managed_mutex_.
// Returns the entry's first_touch() value when the transaction is present, and std::nullopt
// when it is not.
std::optional<MicrosTime> GetTxnStart(const TransactionId& transaction_id) override {
  std::lock_guard lock(managed_mutex_);
  auto txn_it = managed_transactions_.find(transaction_id);
  if (txn_it != managed_transactions_.end()) {
    return txn_it->first_touch();
  }
  return std::nullopt;
}

void Shutdown() {
deadlock_detection_poller_.Shutdown();
deadlock_detector_.Shutdown();
Expand Down
18 changes: 15 additions & 3 deletions src/yb/tserver/tserver_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,11 @@ message ProbeTransactionDeadlockRequestPB {

optional bytes tablet_id = 2;

optional bytes waiting_txn_id = 3;
optional bytes probe_origin_txn_id = 3;

optional bytes blocking_txn_id = 4;

optional bytes detector_uuid = 5;
optional bytes detector_id = 5;

optional fixed32 probe_num = 6;

Expand All @@ -373,7 +373,19 @@ message ProbeTransactionDeadlockResponsePB {
// If empty, no deadlock was detected. Otherwise, includes reverse ordered list of the deadlock
// participants detected by the probe recipient. The probe sender should append its own txn to
// this list before responding to its own probe sender.
repeated bytes deadlocked_txn_ids = 3;
repeated bytes deadlocked_txn_ids = 3; // deprecated in favor of deadlock field below

// Per-transaction entry of the "deadlock" list returned in a probe response.
message DeadlockedTxnInfo {
// Id of the deadlocked transaction.
optional bytes id = 1;

// NOTE(review): presumably the status tablet of the transaction, which would be needed to send
// a remote abort -- confirm against deadlock_detector.cc.
optional bytes tablet_id = 2;

// NOTE(review): presumably the id of the deadlock detector that reported this transaction --
// confirm against deadlock_detector.cc.
optional bytes detector_id = 3;

// NOTE(review): presumably the transaction start time in microseconds (per the "_us" suffix),
// used to pick the youngest transaction -- confirm against deadlock_detector.cc.
optional fixed64 txn_start_us = 4;
}

repeated DeadlockedTxnInfo deadlock = 4;
}

message GetTransactionStatusAtParticipantRequestPB {
Expand Down
85 changes: 83 additions & 2 deletions src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "yb/util/monotime.h"
#include "yb/util/pb_util.h"
#include "yb/util/random_util.h"
#include "yb/util/scope_exit.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_thread_holder.h"
#include "yb/util/tsan_util.h"
Expand Down Expand Up @@ -64,6 +65,7 @@ DECLARE_uint64(TEST_inject_process_update_resp_delay_ms);
DECLARE_uint64(TEST_delay_rpc_status_req_callback_ms);
DECLARE_int32(TEST_txn_participant_inject_delay_on_start_shutdown_ms);
DECLARE_string(ysql_pg_conf_csv);
DECLARE_uint64(transaction_heartbeat_usec);

using namespace std::literals;

Expand Down Expand Up @@ -1037,8 +1039,12 @@ TEST_F(
// re-run conflict resolution periodically, this deadlock wouldn't be detected in the current
// implementation.
ASSERT_OK(conn3.Execute("UPDATE foo SET v2=v2+1000 WHERE k=1"));
ASSERT_FALSE(
conn3.Execute("UPDATE foo SET v2=v2+1000 WHERE k=2").ok() && status_future.get().ok());
auto conn3_status = conn3.Execute("UPDATE foo SET v2=v2+1000 WHERE k=2");
ASSERT_STR_CONTAINS(conn3_status.ToUserMessage(true /*include_code*/), "Deadlock");
ASSERT_OK(conn1.CommitTransaction());

ASSERT_STR_CONTAINS(
status_future.get().ToString(), "could not serialize access due to concurrent update");
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock)) {
Expand All @@ -1051,6 +1057,7 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock))
"INSERT INTO foo SELECT generate_series(0, $0), 0", kNumKeys * 5));

for (int deadlock_idx = 1; deadlock_idx <= kNumKeys; ++deadlock_idx) {
LOG(INFO) << "Begin test of deadlock_idx " << deadlock_idx;
auto update_conn = ASSERT_RESULT(Connect());
ASSERT_OK(update_conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
ASSERT_OK(update_conn.Fetch("SELECT * FROM foo WHERE k=0 FOR UPDATE"));
Expand Down Expand Up @@ -1098,6 +1105,80 @@ TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(ParallelUpdatesDetectDeadlock))
LOG(INFO) << "Thread 0 failed to update " << s;
did_deadlock.CountDown();
}
LOG(INFO) << "End test of deadlock_idx " << deadlock_idx;
}
}

TEST_F(PgWaitQueuesTest, YB_DISABLE_TEST_IN_TSAN(DeadlockResolvesYoungestTxn)) {
  // Tests that in a large cyclic deadlock, the youngest transaction is always the *only*
  // transaction which is aborted.
  //
  // For every choice of last_locker_idx the test builds a cycle over kNumKeys keys:
  //   - kNumKeys - 1 worker threads each start a transaction and lock one distinct key.
  //   - The main thread then starts the last transaction and locks the one remaining key
  //     (last_locker_idx).
  //   - Every transaction then requests the lock on the next key modulo kNumKeys, closing the
  //     cycle.
  // The main thread's transaction is expected to fail with a "Deadlock" error, while every worker
  // is expected to obtain its second lock and commit successfully.
  constexpr int kNumKeys = 20;
  auto setup_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(setup_conn.Execute("CREATE TABLE foo (k INT PRIMARY KEY, v INT)"));
  ASSERT_OK(setup_conn.ExecuteFormat(
      "INSERT INTO foo SELECT generate_series(0, $0), 0", kNumKeys * 5));

  for (int last_locker_idx = 0; last_locker_idx < kNumKeys; ++last_locker_idx) {
    LOG(INFO) << "Begin test of last_locker_idx " << last_locker_idx;

    // did_deadlock:        main thread observed the deadlock error.
    // did_commit:          one count per worker that committed.
    // did_first_select:    one count per worker that acquired its first lock.
    // did_deadlock_select: main thread acquired its first lock.
    CountDownLatch did_deadlock(1);
    CountDownLatch did_commit(kNumKeys - 1);
    CountDownLatch did_first_select(kNumKeys - 1);
    CountDownLatch did_deadlock_select(1);
    TestThreadHolder thread_holder;
    for (int offset_idx = 1; offset_idx < kNumKeys; ++offset_idx) {
      auto key_idx = (last_locker_idx + offset_idx) % kNumKeys;
      thread_holder.AddThreadFunctor(
          [this, key_idx, &did_deadlock, &did_commit, &did_first_select, &did_deadlock_select] {
            LOG(INFO) << "Starting thread " << key_idx;
            auto conn = ASSERT_RESULT(Connect());
            ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
            ASSERT_OK(conn.FetchFormat("SELECT * FROM foo WHERE k=$0 FOR UPDATE", key_idx));
            did_first_select.CountDown();
            auto txn_id = ASSERT_RESULT(conn.FetchRow<Uuid>("SELECT yb_get_current_transaction()"));

            LOG(INFO) << "Thread " << key_idx
                      << " locked key " << key_idx
                      << " for txn " << yb::ToString(txn_id);
            // Do not request the next key until the main thread holds its first lock.
            ASSERT_TRUE(did_deadlock_select.WaitFor(20s * kTimeMultiplier));

            ASSERT_OK(conn.FetchFormat(
                "SELECT * FROM foo WHERE k=$0 FOR UPDATE", (key_idx + 1) % kNumKeys));
            LOG(INFO) << "Thread " << key_idx << " locked key " << (key_idx + 1) % kNumKeys;

            // Commit only after the main thread has seen the deadlock error. The result is
            // asserted, like every other latch wait in this test, so that a timeout is reported
            // instead of being silently ignored.
            ASSERT_TRUE(did_deadlock.WaitFor(30s * kTimeMultiplier));
            ASSERT_OK(conn.CommitTransaction());
            LOG(INFO) << "Thread " << key_idx << " committed";
            did_commit.CountDown();
          });
    }

    // Start the main transaction only after every worker holds its first lock, plus an extra
    // delay, so that it begins after all worker transactions.
    ASSERT_TRUE(did_first_select.WaitFor(10s * kTimeMultiplier));
    std::this_thread::sleep_for(500ms * kTimeMultiplier);

    LOG(INFO) << "Starting deadlock conn " << last_locker_idx;
    auto conn = ASSERT_RESULT(Connect());
    ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
    ASSERT_OK(conn.FetchFormat("SELECT * FROM foo WHERE k=$0 FOR UPDATE", last_locker_idx));
    // NOTE(review): presumably waits two heartbeat intervals so the coordinator has seen this
    // transaction before the cycle is closed -- confirm.
    std::this_thread::sleep_for(2us * FLAGS_transaction_heartbeat_usec);
    did_deadlock_select.CountDown();
    auto txn_id = ASSERT_RESULT(conn.FetchRow<Uuid>("SELECT yb_get_current_transaction()"));

    LOG(INFO) << "Deadlocker thread " << last_locker_idx
              << " locked key " << last_locker_idx
              << " for txn " << yb::ToString(txn_id);

    // Requesting the next key closes the cycle; this transaction must be the one that fails.
    auto s = conn.FetchFormat(
        "SELECT * FROM foo WHERE k=$0 FOR UPDATE", (last_locker_idx + 1) % kNumKeys);
    ASSERT_NOK(s);
    ASSERT_STR_CONTAINS(s.status().ToUserMessage(true /*include_code*/), "Deadlock");
    did_deadlock.CountDown();
    LOG(INFO) << "Finished deadlocker thread";

    // All workers must have committed, i.e. none of them was aborted.
    ASSERT_TRUE(did_commit.WaitFor(30s * kTimeMultiplier));
    LOG(INFO) << "End test of last_locker_idx " << last_locker_idx;

    thread_holder.JoinAll();
  }
}

Expand Down

0 comments on commit ad37035

Please sign in to comment.