Skip to content

Commit

Permalink
[#6424] Fix post-split compaction to be async
Browse files Browse the repository at this point in the history
Summary: Previously we were blocking on post-split tablet compaction, so tablet open could not complete. This diff modifies post-split compaction to be asynchronous.

Test Plan: `ybd --cxx-test integration-tests_tablet-split-itest --gtest_filter TabletSplitITest.SplitTabletIsAsync`

Reviewers: timur

Reviewed By: timur

Subscribers: bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D9951
  • Loading branch information
robertsami committed Dec 14, 2020
1 parent 49a035d commit b408754
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/yb/docdb/docdb-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3703,7 +3703,7 @@ TEST_P(DocDBTestWrapper, SetHybridTimeFilter) {
if (j == 0) {
ASSERT_OK(FlushRocksDbAndWait());
} else if (j == 1) {
ForceRocksDBCompact(rocksdb());
ASSERT_OK(ForceRocksDBCompact(rocksdb()));
}
}

Expand Down
20 changes: 8 additions & 12 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,21 +719,17 @@ bool HasRunningCompaction(rocksdb::DB* db) {
return running_compactions > 0;
}

// Requests a full compaction of `db` and blocks until RocksDB reports neither
// pending nor running compactions. If the request itself fails, the status is
// logged as a warning and the function returns without waiting.
void ForceRocksDBCompact(rocksdb::DB* db) {
  const auto submit_status = ForceFullRocksDBCompactAsync(db);
  if (!submit_status.ok()) {
    LOG(WARNING) << submit_status;
    return;
  }
  // Poll every 10ms until the compaction queue has drained.
  for (;;) {
    const bool still_compacting = HasPendingCompaction(db) || HasRunningCompaction(db);
    if (!still_compacting) {
      break;
    }
    std::this_thread::sleep_for(10ms);
  }
}

Status ForceFullRocksDBCompactAsync(rocksdb::DB* db) {
// Requests a manual compaction of the full key range of `db`, then polls until
// RocksDB reports neither pending nor running compactions.
//
// The deadline is computed before CompactRange() is issued, so the time spent
// inside CompactRange() counts against `timeout`.
//
// Returns:
//   - the CompactRange() error (with a prefix) if the request fails;
//   - TimedOut if compactions are still pending/running once the deadline has passed;
//   - OK once no compaction is pending or running.
Status ForceRocksDBCompact(rocksdb::DB* db, const MonoDelta timeout) {
  const auto expiration = MonoTime::Now() + timeout;
  RETURN_NOT_OK_PREPEND(
      db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr),
      "Compact range failed:");
  // The deadline is deliberately NOT part of the loop condition: if it were, the loop would
  // simply exit on expiry and fall through to Status::OK(), hiding the timeout from the caller.
  while (HasPendingCompaction(db) || HasRunningCompaction(db)) {
    if (MonoTime::Now() > expiration) {
      return STATUS(TimedOut, "Timed out waiting for manual compaction to complete");
    }
    std::this_thread::sleep_for(10ms);
  }
  return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ bool HasPendingCompaction(rocksdb::DB* db);
bool HasRunningCompaction(rocksdb::DB* db);

// Request RocksDB compaction and wait until it completes or `timeout` elapses.
void ForceRocksDBCompact(rocksdb::DB* db);
// Request RocksDB compaction and return immediately.
CHECKED_STATUS ForceFullRocksDBCompactAsync(rocksdb::DB* db);
CHECKED_STATUS ForceRocksDBCompact(
rocksdb::DB* db,
const MonoDelta timeout = MonoDelta::FromMilliseconds(50));

// Initialize the RocksDB 'options'.
// The 'statistics' object provided by the caller will be used by RocksDB to maintain the stats for
Expand Down
28 changes: 25 additions & 3 deletions src/yb/integration-tests/tablet-split-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// under the License.
//

#include <chrono>
#include <thread>
#include "yb/client/client-test-util.h"
#include "yb/client/error.h"
#include "yb/client/ql-dml-test-base.h"
Expand All @@ -27,6 +29,7 @@
#include "yb/docdb/doc_key.h"
#include "yb/docdb/docdb_rocksdb_util.h"

#include "yb/gutil/dynamic_annotations.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/test_workload.h"

Expand Down Expand Up @@ -67,6 +70,7 @@ DECLARE_int32(cleanup_split_tablets_interval_sec);
DECLARE_bool(TEST_skip_deleting_split_tablets);
DECLARE_int32(replication_factor);
DECLARE_int32(tablet_split_limit_per_table);
DECLARE_bool(TEST_pause_before_post_split_compation);

namespace yb {

Expand Down Expand Up @@ -222,7 +226,7 @@ class TabletSplitITest : public client::TransactionTestBase<MiniCluster> {
size_t num_replicas_online = 0);

// Wait for all peers to complete post-split compaction.
void WaitForTestTableTabletsCompactionFinish();
void WaitForTestTableTabletsCompactionFinish(MonoDelta timeout);

// Returns all tablet peers in the cluster which are marked as being in
// TABLET_DATA_SPLIT_COMPLETED state. In most of the test cases below, this corresponds to the
Expand Down Expand Up @@ -420,11 +424,11 @@ void TabletSplitITest::WaitForTabletSplitCompletion(
DumpTableLocations(catalog_manager(), client::kTableName);
}

void TabletSplitITest::WaitForTestTableTabletsCompactionFinish() {
void TabletSplitITest::WaitForTestTableTabletsCompactionFinish(MonoDelta timeout) {
for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) {
ASSERT_OK(WaitFor([&peer] {
return peer->tablet()->metadata()->has_been_fully_compacted();
}, 20s * kTimeMultiplier, "Wait for post tablet split compaction to be completed"));
}, timeout * kTimeMultiplier, "Wait for post tablet split compaction to be completed"));
}
}

Expand Down Expand Up @@ -647,6 +651,24 @@ TEST_P(TabletSplitITestWithIsolationLevel, SplitSingleTablet) {
ASSERT_OK(CheckPostSplitTabletReplicasData(kNumRows * 2));
}

// Verifies that a tablet split completes without waiting for post-split compaction:
// while the pause flag is set, the split still finishes and the child tablets are
// not marked as fully compacted; once the flag is cleared, compaction completes.
TEST_F(TabletSplitITest, SplitTabletIsAsync) {
constexpr auto kNumRows = 500;

// Hold the post-split compaction task before it triggers the compaction.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_before_post_split_compation) = true;

// This must return even though compaction is paused.
ASSERT_OK(CreateSingleTabletAndSplit(kNumRows));

// Right after the split, no child tablet should report a completed full compaction.
for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) {
EXPECT_FALSE(peer->tablet()->metadata()->has_been_fully_compacted());
}
// Give a compaction that ignored the pause flag time to finish, then re-check.
std::this_thread::sleep_for(1s * kTimeMultiplier);
for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) {
EXPECT_FALSE(peer->tablet()->metadata()->has_been_fully_compacted());
}
// Release the pause and wait for every child tablet to become fully compacted.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_before_post_split_compation) = false;
ASSERT_NO_FATALS(WaitForTestTableTabletsCompactionFinish(5s * kTimeMultiplier));
}

TEST_F(TabletSplitITest, ParentTabletCleanup) {
constexpr auto kNumRows = 500;

Expand Down
14 changes: 4 additions & 10 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3037,23 +3037,17 @@ bool Tablet::MightHaveNonRelevantData() {
}

void Tablet::ForceRocksDBCompactInTest() {
if (regular_db_) {
docdb::ForceRocksDBCompact(regular_db_.get());
}
if (intents_db_) {
CHECK_OK(intents_db_->Flush(rocksdb::FlushOptions()));
docdb::ForceRocksDBCompact(intents_db_.get());
}
CHECK_OK(ForceFullRocksDBCompact());
}

Status Tablet::ForceFullRocksDBCompactAsync() {
Status Tablet::ForceFullRocksDBCompact() {
if (regular_db_) {
RETURN_NOT_OK(docdb::ForceFullRocksDBCompactAsync(regular_db_.get()));
RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get()));
}
if (intents_db_) {
RETURN_NOT_OK_PREPEND(
intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed");
RETURN_NOT_OK(docdb::ForceFullRocksDBCompactAsync(intents_db_.get()));
RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get()));
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {

void ForceRocksDBCompactInTest();

CHECKED_STATUS ForceFullRocksDBCompactAsync();
CHECKED_STATUS ForceFullRocksDBCompact();

docdb::DocDB doc_db() const { return { regular_db_.get(), intents_db_.get(), &key_bounds_ }; }

Expand Down
37 changes: 36 additions & 1 deletion src/yb/tserver/ts_tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
#include "yb/tserver/ts_tablet_manager.h"

#include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <boost/optional/optional.hpp>
Expand Down Expand Up @@ -178,6 +180,9 @@ DEFINE_test_flag(int32, apply_tablet_split_inject_delay_ms, 0,
DEFINE_test_flag(bool, skip_deleting_split_tablets, false,
"Skip deleting tablets which have been split.");

DEFINE_test_flag(bool, pause_before_post_split_compation, false,
"Pause before triggering post split compaction.");

namespace {

constexpr int kDbCacheSizeUsePercentage = -1;
Expand Down Expand Up @@ -218,6 +223,17 @@ DEFINE_int32(read_pool_max_queue_size, 128,
"is used to run multiple read operations, that are part of the same tablet rpc, "
"in parallel.");


DEFINE_int32(post_split_trigger_compaction_pool_max_threads, 1,
"The maximum number of threads allowed for post_split_trigger_compaction_pool_. This "
"pool is used to run compactions on tablets after they have been split and still "
"contain irrelevant data from the tablet they were sourced from.");
DEFINE_int32(post_split_trigger_compaction_pool_max_queue_size, 16,
"The maximum number of tasks that can be held in the pool for "
"post_split_trigger_compaction_pool_. This pool is used to run compactions on tablets "
"after they have been split and still contain irrelevant data from the tablet they "
"were sourced from.");

DEFINE_test_flag(int32, sleep_after_tombstoning_tablet_secs, 0,
"Whether we sleep in LogAndTombstone after calling DeleteTabletData.");

Expand Down Expand Up @@ -280,6 +296,9 @@ METRIC_DEFINE_histogram(server, ts_bootstrap_time, "TServer Bootstrap Time",
"Time that the tablet server takes to bootstrap all of its tablets.",
10000000, 2);

THREAD_POOL_METRICS_DEFINE(
server, post_split_trigger_compaction_pool, "Thread pool for tablet compaction jobs.");

using consensus::ConsensusMetadata;
using consensus::ConsensusStatePB;
using consensus::RaftConfigPB;
Expand Down Expand Up @@ -453,6 +472,12 @@ TSTabletManager::TSTabletManager(FsManager* fs_manager,
.set_max_queue_size(FLAGS_read_pool_max_queue_size)
.set_metrics(std::move(read_metrics))
.Build(&read_pool_));
CHECK_OK(ThreadPoolBuilder("compaction")
.set_max_threads(FLAGS_post_split_trigger_compaction_pool_max_threads)
.set_max_queue_size(FLAGS_post_split_trigger_compaction_pool_max_queue_size)
.set_metrics(THREAD_POOL_METRICS_INSTANCE(
server_->metric_entity(), post_split_trigger_compaction_pool))
.Build(&post_split_trigger_compaction_pool_));

int64_t block_cache_size_bytes = FLAGS_db_block_cache_size_bytes;
int64_t total_ram_avail = MemTracker::GetRootTracker()->limit();
Expand Down Expand Up @@ -1511,10 +1536,17 @@ void TSTabletManager::OpenTablet(const RaftGroupMetadataPtr& meta,

if (tablet->MightHaveNonRelevantData()) {
WARN_NOT_OK(
tablet->ForceFullRocksDBCompactAsync(), "Failed to submit compaction for split tablet");
post_split_trigger_compaction_pool_->SubmitFunc(
std::bind(&TSTabletManager::CompactPostSplitTablet, this, tablet)),
"Failed to submit compaction for post-split tablet.");
}
}

// Task body submitted to post_split_trigger_compaction_pool_ from OpenTablet() for tablets
// that might still hold non-relevant data. Runs a full RocksDB compaction on `tablet`;
// a failure is only logged as a warning, not propagated.
void TSTabletManager::CompactPostSplitTablet(tablet::TabletPtr tablet) {
// Test hook: pauses here when TEST_pause_before_post_split_compation is set
// (presumably until the flag is cleared -- confirm against TEST_PAUSE_IF_FLAG).
TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compation);
WARN_NOT_OK(tablet->ForceFullRocksDBCompact(), "Failed to compact post-split tablet.");
}

void TSTabletManager::StartShutdown() {
{
std::lock_guard<RWMutex> lock(mutex_);
Expand Down Expand Up @@ -1600,6 +1632,9 @@ void TSTabletManager::CompleteShutdown() {
if (append_pool_) {
append_pool_->Shutdown();
}
if (post_split_trigger_compaction_pool_) {
post_split_trigger_compaction_pool_->Shutdown();
}

{
std::lock_guard<RWMutex> l(mutex_);
Expand Down
5 changes: 5 additions & 0 deletions src/yb/tserver/ts_tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,8 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table

void CleanupSplitTablets();

void CompactPostSplitTablet(tablet::TabletPtr tablet);

const CoarseTimePoint start_time_;

FsManager* const fs_manager_;
Expand Down Expand Up @@ -561,6 +563,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table
// Thread pool for read ops, that are run in parallel, shared between all tablets.
std::unique_ptr<ThreadPool> read_pool_;

// Thread pool for manually triggering compactions for tablets created from a split.
std::unique_ptr<ThreadPool> post_split_trigger_compaction_pool_;

std::unique_ptr<rpc::Poller> tablets_cleaner_;

// Used for scheduling flushes
Expand Down

0 comments on commit b408754

Please sign in to comment.