Skip to content

Commit

Permalink
[BACKPORT 2.20][#22099] docdb: Postpone TransactionParticipant's min_…
Browse files Browse the repository at this point in the history
…running_ht Initialization

Summary:
**Issue:**

Original commit: e414e3f / D34389
An initialized `min_running_ht` in `TransactionParticipant` indicates that `TransactionLoader` has completed transaction loading.
Transaction loading starts during tablet bootstrap. However, there's a chance Transaction loading finishes before tablet bootstrap completes, which means `min_running_ht` could be initialized during the bootstrap. This could lead to unexpected behavior when `TransactionParticipant::MinRunningHybridTime()` is called during bootstrap.
A recent code change, D33131, exposed this issue by introducing calls to `TransactionParticipant::MinRunningHybridTime()` in various functions that are likely to be executed during the bootstrap WAL replay process. Early `min_running_ht` initialization then triggers unexpected code execution and causes a segmentation fault. (See [[ #21877 | #21877 ]] for more details.)

**Fix:**

To address this, `min_running_ht` initialization now happens within the `LoadFinished` function, and it guarantees:
* Successful Transaction Loading: At this point, all transactions for the tablet have been loaded successfully.
* Local Bootstrap Completion: Once start_latch_.Wait() completes, it means `TransactionParticipant::Start()` has been called. This ensures the local bootstrap process has finished successfully.

This ensures that `min_running_ht` is initialized at a safer point in the startup process.
Jira: DB-11029

Test Plan: To validate the effectiveness of the fix, we re-ran the stress tests that originally exposed issue [[ #21877 | #21877 ]]. The tests completed successfully with no segmentation faults observed.

Reviewers: esheng, sergei, rthallam

Reviewed By: rthallam

Subscribers: ybase, rthallam, slingam

Differential Revision: https://phorge.dev.yugabyte.com/D38931
  • Loading branch information
yusong-yan committed Oct 24, 2024
1 parent 3e3f8e7 commit 513131c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 26 deletions.
46 changes: 46 additions & 0 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests);
DECLARE_uint64(aborted_intent_cleanup_ms);
DECLARE_uint64(max_clock_skew_usec);
DECLARE_uint64(transaction_heartbeat_usec);
DECLARE_bool(TEST_load_transactions_sync);
DECLARE_uint64(TEST_inject_sleep_before_applying_intents_ms);
DECLARE_bool(TEST_skip_process_apply);
DECLARE_bool(TEST_skip_remove_intent);

namespace yb {
namespace client {
Expand Down Expand Up @@ -1781,6 +1785,48 @@ TEST_F_EX(QLTransactionTest, GCLogsAfterTransactionalWritesStop, QLTransactionTe
thread_holder.Stop();
}

// Variant of the QLTransactionTest fixture that requests a single tablet server
// in the mini cluster options.
class QLTransactionTestSingleTS : public QLTransactionTest {
protected:
void SetUp() override {
// Adjust the option before delegating to the base fixture's SetUp().
// NOTE(review): presumably the base SetUp() builds the cluster from
// mini_cluster_opt_ - confirm against QLTransactionTest.
mini_cluster_opt_.num_tablet_servers = 1;
QLTransactionTest::SetUp();
}
};

// Regression test for #22099: restarts the cluster while one aborted and one committed
// transaction still have un-applied / un-removed intents, and forces transaction loading
// to complete synchronously (TEST_load_transactions_sync) before startup continues.
// The test passes when all tablets come up running and no transactions remain running.
TEST_F_EX(QLTransactionTest, TransactionsEarlyLoadedTest, QLTransactionTestSingleTS) {
// Write one row in each of two independent transactions.
auto txn_1 = CreateTransaction();
ASSERT_OK(WriteRow(
CreateSession(txn_1),
/* key = */ 0,
/* value = */ 0,
WriteOpType::INSERT,
Flush::kTrue));
auto txn_2 = CreateTransaction();
ASSERT_OK(WriteRow(
CreateSession(txn_2),
/* key = */ 100,
/* value = */ 100,
WriteOpType::INSERT,
Flush::kTrue));

// Skip applying and removing intents before stopping the cluster.
// The flag order matters: intent removal is disabled before txn_1 is aborted, and
// apply processing is disabled before txn_2 is committed.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = true;
txn_1->Abort();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = true;
ASSERT_OK(txn_2->CommitFuture().get());

cluster_->StopSync();

// Re-enable apply/remove for the restart, delay intent application by 1000 ms, and make
// the participant wait for the transaction loader to finish before continuing.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_inject_sleep_before_applying_intents_ms) = 1000;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_load_transactions_sync) = true;
// Replay WAL [txn_1(ABORTED), txn_2(COMMITTED)]
ASSERT_OK(cluster_->Start());
CheckAllTabletsRunning();
AssertNoRunningTransactions();
}

TEST_F(QLTransactionTest, DeleteTableDuringWrite) {
DisableApplyingIntents();
ASSERT_NO_FATALS(WriteData());
Expand Down
6 changes: 6 additions & 0 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ DEFINE_test_flag(uint64, inject_sleep_before_applying_write_batch_ms, 0,
DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0,
"Sleep before applying intents to docdb after transaction commit");

// Test-only switch: when set, Tablet::RemoveIntentsImpl returns Status::OK() immediately
// without performing any removal.
DEFINE_test_flag(bool, skip_remove_intent, false,
"If true, remove intent will be skipped");

DECLARE_bool(TEST_invalidate_last_change_metadata_op);

using namespace std::placeholders;
Expand Down Expand Up @@ -2023,6 +2026,9 @@ Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApply
template <class Ids>
Status Tablet::RemoveIntentsImpl(
const RemoveIntentsData& data, RemoveReason reason, const Ids& ids) {
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_remove_intent))) {
return Status::OK();
}
auto scoped_read_operation = CreateScopedRWOperationNotBlockingRocksDbShutdownStart();
RETURN_NOT_OK(scoped_read_operation);

Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/transaction_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ class TransactionLoader::Executor {
intents_iterator_.Reset();

RETURN_NOT_OK(CheckForShutdown());
context().CompleteLoad([this] {
loader_.state_ = TransactionLoaderState::kCompleted;
});

loader_.state_ = TransactionLoaderState::kCompleted;

{
// We need to lock and unlock the mutex here to avoid missing a notification in WaitLoaded
// and WaitAllLoaded. The waiting loop in those functions is equivalent to the following,
Expand Down
1 change: 0 additions & 1 deletion src/yb/tablet/transaction_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class TransactionLoaderContext {

virtual TransactionStatusResolver& AddStatusResolver() = 0;
virtual const std::string& LogPrefix() const = 0;
virtual void CompleteLoad(const std::function<void()>& functor) = 0;
virtual void LoadTransaction(
TransactionMetadata&& metadata,
TransactionalBatchData&& last_batch_data,
Expand Down
57 changes: 35 additions & 22 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ DEFINE_test_flag(double, txn_participant_error_on_load, 0.0,
"Probability that the participant would error on call to SetDB before launching "
"the transaction loader thread.");

// Test-only switch: when set, ProcessApply returns Status::OK() before doing any work.
DEFINE_test_flag(bool, skip_process_apply, false,
"If true, ProcessApply will be skipped");

// Test-only switch: when set, the code that starts the transaction loader waits for
// loader_.WaitAllLoaded() and then sleeps a further 500ms before returning.
DEFINE_test_flag(bool, load_transactions_sync, false,
"If true, the test will block until the loader has finished loading all txns.");

namespace yb {
namespace tablet {

Expand Down Expand Up @@ -772,6 +778,9 @@ class TransactionParticipant::Impl
}

Status ProcessApply(const TransactionApplyData& data) {
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_process_apply))) {
return Status::OK();
}
VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString();

RETURN_NOT_OK(loader_.WaitLoaded(data.transaction_id));
Expand Down Expand Up @@ -973,6 +982,10 @@ class TransactionParticipant::Impl
return STATUS_FORMAT(InternalError, "Flag TEST_txn_participant_error_on_load set.");
}
loader_.Start(pending_op_counter_blocking_rocksdb_shutdown_start, db_);
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_load_transactions_sync))) {
RETURN_NOT_OK(loader_.WaitAllLoaded());
std::this_thread::sleep_for(500ms);
}
return Status::OK();
}

Expand Down Expand Up @@ -1382,13 +1395,6 @@ class TransactionParticipant::Impl
>
> Transactions;

void CompleteLoad(const std::function<void()>& functor) override {
MinRunningNotifier min_running_notifier(&applier_);
std::lock_guard lock(mutex_);
functor();
TransactionsModifiedUnlocked(&min_running_notifier);
}

void LoadFinished(Status load_status) EXCLUDES(status_resolvers_mutex_) override {
// The start_latch will be hit either from a CountDown from Start, or from Shutdown, so make
// sure that at the end of Load, we unblock shutdown.
Expand Down Expand Up @@ -1426,24 +1432,29 @@ class TransactionParticipant::Impl
std::this_thread::sleep_for(10ms);
}

if (!pending_applies.empty()) {
LOG_WITH_PREFIX(INFO)
{
LOG_IF_WITH_PREFIX(INFO, !pending_applies.empty())
<< __func__ << ": starting " << pending_applies.size() << " pending applies";
MinRunningNotifier min_running_notifier(&applier_);
std::lock_guard lock(mutex_);
size_t idx = 0;
for (const auto& p : pending_applies) {
auto it = transactions_.find(p.first);
if (it == transactions_.end()) {
LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
continue;
}
if (!pending_applies.empty()) {
size_t idx = 0;
for (const auto& p : pending_applies) {
auto it = transactions_.find(p.first);
if (it == transactions_.end()) {
LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
continue;
}

TransactionApplyData apply_data;
apply_data.transaction_id = p.first;
apply_data.commit_ht = p.second.commit_ht;
(**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
++idx;
TransactionApplyData apply_data;
apply_data.transaction_id = p.first;
apply_data.commit_ht = p.second.commit_ht;
(**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
++idx;
}
}
transactions_loaded_ = true;
TransactionsModifiedUnlocked(&min_running_notifier);
}

{
Expand All @@ -1465,7 +1476,7 @@ class TransactionParticipant::Impl

void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
metric_transactions_running_->set_value(transactions_.size());
if (auto res = loader_.Completed(); !res.ok() || !(*res)) {
if (!transactions_loaded_) {
return;
}

Expand Down Expand Up @@ -2132,6 +2143,8 @@ class TransactionParticipant::Impl
std::unique_ptr<docdb::WaitQueue> wait_queue_;

std::shared_ptr<MemTracker> mem_tracker_ GUARDED_BY(mutex_);

bool transactions_loaded_ GUARDED_BY(mutex_) = false;
};

TransactionParticipant::TransactionParticipant(
Expand Down

0 comments on commit 513131c

Please sign in to comment.