Skip to content

Commit

Permalink
[yugabyte#22099] docdb: Postpone TransactionParticipant's min_running…
Browse files Browse the repository at this point in the history
…_ht Initialization

Summary:
**Issue:**

Initialized `min_running_ht` in `TransactionParticipant` indicates completed transaction loading by `TransactionLoader`.
Transaction loading starts during tablet bootstrap. However, there's a chance Transaction loading finishes before tablet bootstrap completes, which means `min_running_ht` could be initialized during the bootstrap. This could lead to unexpected behavior when `TransactionParticipant::MinRunningHybridTime()` is called during bootstrap.
A recent code change D33131 has exposed by introducing calls to `TransactionParticipant::MinRunningHybridTime()` in various functions that are likely to be executed during the bootstrap WAL reply process. And  early `min_running_ht` initialization triggers unexpected code execution and caused segmentation fault.(See [[ yugabyte#21877 | yugabyte#21877 ]] for more details)

**Fix:**

To address this, `min_running_ht` initialization now happens within the `LoadFinished` function, and it guarantees:
* Successful Transaction Loading: At this point, all transactions for the tablet have been loaded successfully.
* Local Bootstrap Completion: Once start_latch_.Wait() completes, it means `TransactionParticipant::Start()` has been called. This ensures the local bootstrap process has finished successfully.

This ensure `min_running_ht` is initialized at a safer point in the startup process.
Jira: DB-11029

Test Plan:
Unit test WIP
To validate the effectiveness of the fix, we re-ran the stress tests that originally exposed issue [[ yugabyte#21877 | yugabyte#21877 ]]. The tests completed successfully with no segmentation faults observed.

Reviewers: esheng, sergei

Reviewed By: sergei

Subscribers: slingam, rthallam, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34389
  • Loading branch information
yusong-yan committed May 1, 2024
1 parent 3b7e0ab commit 138b81a
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 26 deletions.
46 changes: 46 additions & 0 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests);
DECLARE_uint64(aborted_intent_cleanup_ms);
DECLARE_uint64(max_clock_skew_usec);
DECLARE_uint64(transaction_heartbeat_usec);
DECLARE_bool(TEST_load_transactions_sync);
DECLARE_uint64(TEST_inject_sleep_before_applying_intents_ms);
DECLARE_bool(TEST_skip_process_apply);
DECLARE_bool(TEST_skip_remove_intent);

namespace yb {
namespace client {
Expand Down Expand Up @@ -1782,6 +1786,48 @@ TEST_F_EX(QLTransactionTest, GCLogsAfterTransactionalWritesStop, QLTransactionTe
thread_holder.Stop();
}

class QLTransactionTestSingleTS : public QLTransactionTest {
protected:
void SetUp() override {
mini_cluster_opt_.num_tablet_servers = 1;
QLTransactionTest::SetUp();
}
};

TEST_F_EX(QLTransactionTest, TransactionsEarlyLoadedTest, QLTransactionTestSingleTS) {
auto txn_1 = CreateTransaction();
ASSERT_OK(WriteRow(
CreateSession(txn_1),
/* key = */ 0,
/* value = */ 0,
WriteOpType::INSERT,
Flush::kTrue));
auto txn_2 = CreateTransaction();
ASSERT_OK(WriteRow(
CreateSession(txn_2),
/* key = */ 100,
/* value = */ 100,
WriteOpType::INSERT,
Flush::kTrue));

// Skip applying and removing intent before stoping the cluster.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = true;
txn_1->Abort();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = true;
ASSERT_OK(txn_2->CommitFuture().get());

cluster_->StopSync();

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_remove_intent) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_inject_sleep_before_applying_intents_ms) = 1000;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_load_transactions_sync) = true;
// Replay WAL [txn_1(ABORTED), txn_2(COMMITTED)]
ASSERT_OK(cluster_->Start());
CheckAllTabletsRunning();
AssertNoRunningTransactions();
}

TEST_F(QLTransactionTest, DeleteTableDuringWrite) {
DisableApplyingIntents();
ASSERT_NO_FATALS(WriteData());
Expand Down
6 changes: 6 additions & 0 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ DEFINE_test_flag(uint64, inject_sleep_before_applying_write_batch_ms, 0,
DEFINE_test_flag(uint64, inject_sleep_before_applying_intents_ms, 0,
"Sleep before applying intents to docdb after transaction commit");

DEFINE_test_flag(bool, skip_remove_intent, false,
"If true, remove intent will be skipped");

DECLARE_bool(TEST_invalidate_last_change_metadata_op);

using namespace std::placeholders;
Expand Down Expand Up @@ -2025,6 +2028,9 @@ Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApply
template <class Ids>
Status Tablet::RemoveIntentsImpl(
const RemoveIntentsData& data, RemoveReason reason, const Ids& ids) {
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_remove_intent))) {
return Status::OK();
}
auto scoped_read_operation = CreateScopedRWOperationNotBlockingRocksDbShutdownStart();
RETURN_NOT_OK(scoped_read_operation);

Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/transaction_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ class TransactionLoader::Executor {
intents_iterator_.Reset();

RETURN_NOT_OK(CheckForShutdown());
context().CompleteLoad([this] {
loader_.state_ = TransactionLoaderState::kCompleted;
});

loader_.state_ = TransactionLoaderState::kCompleted;

{
// We need to lock and unlock the mutex here to avoid missing a notification in WaitLoaded
// and WaitAllLoaded. The waiting loop in those functions is equivalent to the following,
Expand Down
1 change: 0 additions & 1 deletion src/yb/tablet/transaction_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class TransactionLoaderContext {

virtual TransactionStatusResolver& AddStatusResolver() = 0;
virtual const std::string& LogPrefix() const = 0;
virtual void CompleteLoad(const std::function<void()>& functor) = 0;
virtual void LoadTransaction(
TransactionMetadata&& metadata,
TransactionalBatchData&& last_batch_data,
Expand Down
57 changes: 35 additions & 22 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ DEFINE_test_flag(double, txn_participant_error_on_load, 0.0,
"Probability that the participant would error on call to SetDB before launching "
"the transaction loader thread.");

DEFINE_test_flag(bool, skip_process_apply, false,
"If true, ProcessApply will be skipped");

DEFINE_test_flag(bool, load_transactions_sync, false,
"If true, the test will block until the loader has finished loading all txns.");

namespace yb {
namespace tablet {

Expand Down Expand Up @@ -815,6 +821,9 @@ class TransactionParticipant::Impl
}

Status ProcessApply(const TransactionApplyData& data) {
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_skip_process_apply))) {
return Status::OK();
}
VLOG_WITH_PREFIX(2) << "Apply: " << data.ToString();

RETURN_NOT_OK(loader_.WaitLoaded(data.transaction_id));
Expand Down Expand Up @@ -1024,6 +1033,10 @@ class TransactionParticipant::Impl
return STATUS_FORMAT(InternalError, "Flag TEST_txn_participant_error_on_load set.");
}
loader_.Start(pending_op_counter_blocking_rocksdb_shutdown_start, db_);
if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_TEST_load_transactions_sync))) {
RETURN_NOT_OK(loader_.WaitAllLoaded());
std::this_thread::sleep_for(500ms);
}
return Status::OK();
}

Expand Down Expand Up @@ -1398,13 +1411,6 @@ class TransactionParticipant::Impl
>
> Transactions;

void CompleteLoad(const std::function<void()>& functor) override {
MinRunningNotifier min_running_notifier(&applier_);
std::lock_guard lock(mutex_);
functor();
TransactionsModifiedUnlocked(&min_running_notifier);
}

void LoadFinished(Status load_status) EXCLUDES(status_resolvers_mutex_) override {
// The start_latch will be hit either from a CountDown from Start, or from Shutdown, so make
// sure that at the end of Load, we unblock shutdown.
Expand Down Expand Up @@ -1442,24 +1448,29 @@ class TransactionParticipant::Impl
std::this_thread::sleep_for(10ms);
}

if (!pending_applies.empty()) {
LOG_WITH_PREFIX(INFO)
{
LOG_IF_WITH_PREFIX(INFO, !pending_applies.empty())
<< __func__ << ": starting " << pending_applies.size() << " pending applies";
MinRunningNotifier min_running_notifier(&applier_);
std::lock_guard lock(mutex_);
size_t idx = 0;
for (const auto& p : pending_applies) {
auto it = transactions_.find(p.first);
if (it == transactions_.end()) {
LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
continue;
}
if (!pending_applies.empty()) {
size_t idx = 0;
for (const auto& p : pending_applies) {
auto it = transactions_.find(p.first);
if (it == transactions_.end()) {
LOG_WITH_PREFIX(INFO) << "Unknown transaction for pending apply: " << AsString(p.first);
continue;
}

TransactionApplyData apply_data;
apply_data.transaction_id = p.first;
apply_data.commit_ht = p.second.commit_ht;
(**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
++idx;
TransactionApplyData apply_data;
apply_data.transaction_id = p.first;
apply_data.commit_ht = p.second.commit_ht;
(**it).SetApplyData(p.second.state, &apply_data, &operations[idx]);
++idx;
}
}
transactions_loaded_ = true;
TransactionsModifiedUnlocked(&min_running_notifier);
}

{
Expand All @@ -1481,7 +1492,7 @@ class TransactionParticipant::Impl

void TransactionsModifiedUnlocked(MinRunningNotifier* min_running_notifier) REQUIRES(mutex_) {
metric_transactions_running_->set_value(transactions_.size());
if (auto res = loader_.Completed(); !res.ok() || !(*res)) {
if (!transactions_loaded_) {
return;
}

Expand Down Expand Up @@ -2169,6 +2180,8 @@ class TransactionParticipant::Impl

std::shared_ptr<MemTracker> mem_tracker_ GUARDED_BY(mutex_);

bool transactions_loaded_ GUARDED_BY(mutex_) = false;

bool pending_applied_notified_ = false;
std::mutex pending_applies_mutex_;
std::vector<std::pair<TabletId, TransactionId>> pending_applies_
Expand Down

0 comments on commit 138b81a

Please sign in to comment.