diff --git a/src/yb/docdb/docdb-test.cc b/src/yb/docdb/docdb-test.cc index befd2e26f778..a2d74cd648d0 100644 --- a/src/yb/docdb/docdb-test.cc +++ b/src/yb/docdb/docdb-test.cc @@ -3703,7 +3703,7 @@ TEST_P(DocDBTestWrapper, SetHybridTimeFilter) { if (j == 0) { ASSERT_OK(FlushRocksDbAndWait()); } else if (j == 1) { - ForceRocksDBCompact(rocksdb()); + ASSERT_OK(ForceRocksDBCompact(rocksdb())); } } diff --git a/src/yb/docdb/docdb_rocksdb_util.cc b/src/yb/docdb/docdb_rocksdb_util.cc index 3594059d65af..83262df852bb 100644 --- a/src/yb/docdb/docdb_rocksdb_util.cc +++ b/src/yb/docdb/docdb_rocksdb_util.cc @@ -719,21 +719,17 @@ bool HasRunningCompaction(rocksdb::DB* db) { return running_compactions > 0; } -void ForceRocksDBCompact(rocksdb::DB* db) { - auto s = ForceFullRocksDBCompactAsync(db); - if (s.ok()) { - while (HasPendingCompaction(db) || HasRunningCompaction(db)) { - std::this_thread::sleep_for(10ms); - } - } else { - LOG(WARNING) << s; - } -} - -Status ForceFullRocksDBCompactAsync(rocksdb::DB* db) { +Status ForceRocksDBCompact(rocksdb::DB* db, const MonoDelta timeout) { + auto expiration = MonoTime::Now() + timeout; RETURN_NOT_OK_PREPEND( db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr), "Compact range failed:"); + while ((HasPendingCompaction(db) || HasRunningCompaction(db)) && MonoTime::Now() < expiration) { + if (MonoTime::Now() > expiration) { + return STATUS(TimedOut, "Timed out waiting for manual compaction to complete"); + } + std::this_thread::sleep_for(10ms); + } return Status::OK(); } diff --git a/src/yb/docdb/docdb_rocksdb_util.h b/src/yb/docdb/docdb_rocksdb_util.h index 7d990440cc44..33ce504776fc 100644 --- a/src/yb/docdb/docdb_rocksdb_util.h +++ b/src/yb/docdb/docdb_rocksdb_util.h @@ -109,9 +109,9 @@ bool HasPendingCompaction(rocksdb::DB* db); bool HasRunningCompaction(rocksdb::DB* db); // Request RocksDB compaction and wait until it completes. -void ForceRocksDBCompact(rocksdb::DB* db); -// Request RocksDB compaction and return immediately. -CHECKED_STATUS ForceFullRocksDBCompactAsync(rocksdb::DB* db); +CHECKED_STATUS ForceRocksDBCompact( + rocksdb::DB* db, + const MonoDelta timeout = MonoDelta::FromMilliseconds(50)); // Initialize the RocksDB 'options'. // The 'statistics' object provided by the caller will be used by RocksDB to maintain the stats for diff --git a/src/yb/integration-tests/tablet-split-itest.cc b/src/yb/integration-tests/tablet-split-itest.cc index b744a67c43f0..f8edb49235c3 100644 --- a/src/yb/integration-tests/tablet-split-itest.cc +++ b/src/yb/integration-tests/tablet-split-itest.cc @@ -11,6 +11,8 @@ // under the License. // +#include +#include #include "yb/client/client-test-util.h" #include "yb/client/error.h" #include "yb/client/ql-dml-test-base.h" @@ -27,6 +29,7 @@ #include "yb/docdb/doc_key.h" #include "yb/docdb/docdb_rocksdb_util.h" +#include "yb/gutil/dynamic_annotations.h" #include "yb/integration-tests/mini_cluster.h" #include "yb/integration-tests/test_workload.h" @@ -67,6 +70,7 @@ DECLARE_int32(cleanup_split_tablets_interval_sec); DECLARE_bool(TEST_skip_deleting_split_tablets); DECLARE_int32(replication_factor); DECLARE_int32(tablet_split_limit_per_table); +DECLARE_bool(TEST_pause_before_post_split_compation); namespace yb { @@ -222,7 +226,7 @@ class TabletSplitITest : public client::TransactionTestBase { size_t num_replicas_online = 0); // Wait for all peers to complete post-split compaction. - void WaitForTestTableTabletsCompactionFinish(); + void WaitForTestTableTabletsCompactionFinish(MonoDelta timeout); // Returns all tablet peers in the cluster which are marked as being in // TABLET_DATA_SPLIT_COMPLETED state. In most of the test cases below, this corresponds to the @@ -420,11 +424,11 @@ void TabletSplitITest::WaitForTabletSplitCompletion( DumpTableLocations(catalog_manager(), client::kTableName); } -void TabletSplitITest::WaitForTestTableTabletsCompactionFinish() { +void TabletSplitITest::WaitForTestTableTabletsCompactionFinish(MonoDelta timeout) { for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) { ASSERT_OK(WaitFor([&peer] { return peer->tablet()->metadata()->has_been_fully_compacted(); - }, 20s * kTimeMultiplier, "Wait for post tablet split compaction to be completed")); + }, timeout * kTimeMultiplier, "Wait for post tablet split compaction to be completed")); } } @@ -647,6 +651,24 @@ TEST_P(TabletSplitITestWithIsolationLevel, SplitSingleTablet) { ASSERT_OK(CheckPostSplitTabletReplicasData(kNumRows * 2)); } +TEST_F(TabletSplitITest, SplitTabletIsAsync) { + constexpr auto kNumRows = 500; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_before_post_split_compation) = true; + + ASSERT_OK(CreateSingleTabletAndSplit(kNumRows)); + + for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) { + EXPECT_FALSE(peer->tablet()->metadata()->has_been_fully_compacted()); + } + std::this_thread::sleep_for(1s * kTimeMultiplier); + for (auto peer : ASSERT_RESULT(ListPostSplitChildrenTabletPeers())) { + EXPECT_FALSE(peer->tablet()->metadata()->has_been_fully_compacted()); + } + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_pause_before_post_split_compation) = false; + ASSERT_NO_FATALS(WaitForTestTableTabletsCompactionFinish(5s * kTimeMultiplier)); +} + TEST_F(TabletSplitITest, ParentTabletCleanup) { constexpr auto kNumRows = 500; diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 528104deeabf..088bd3841de3 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -3037,23 +3037,17 @@ bool Tablet::MightHaveNonRelevantData() { } void Tablet::ForceRocksDBCompactInTest() { - if (regular_db_) { - docdb::ForceRocksDBCompact(regular_db_.get()); - } - if (intents_db_) { - CHECK_OK(intents_db_->Flush(rocksdb::FlushOptions())); - docdb::ForceRocksDBCompact(intents_db_.get()); - } + CHECK_OK(ForceFullRocksDBCompact()); } -Status Tablet::ForceFullRocksDBCompactAsync() { +Status Tablet::ForceFullRocksDBCompact() { if (regular_db_) { - RETURN_NOT_OK(docdb::ForceFullRocksDBCompactAsync(regular_db_.get())); + RETURN_NOT_OK(docdb::ForceRocksDBCompact(regular_db_.get())); } if (intents_db_) { RETURN_NOT_OK_PREPEND( intents_db_->Flush(rocksdb::FlushOptions()), "Pre-compaction flush of intents db failed"); - RETURN_NOT_OK(docdb::ForceFullRocksDBCompactAsync(intents_db_.get())); + RETURN_NOT_OK(docdb::ForceRocksDBCompact(intents_db_.get())); } return Status::OK(); } diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 4534fd6cc4f0..dc3e82828c54 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -527,7 +527,7 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { void ForceRocksDBCompactInTest(); - CHECKED_STATUS ForceFullRocksDBCompactAsync(); + CHECKED_STATUS ForceFullRocksDBCompact(); docdb::DocDB doc_db() const { return { regular_db_.get(), intents_db_.get(), &key_bounds_ }; } diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index f07a75a373e1..58d2e89aa0c9 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -33,9 +33,11 @@ #include "yb/tserver/ts_tablet_manager.h" #include +#include #include #include #include +#include #include #include @@ -178,6 +180,9 @@ DEFINE_test_flag(int32, apply_tablet_split_inject_delay_ms, 0, DEFINE_test_flag(bool, skip_deleting_split_tablets, false, "Skip deleting tablets which have been split."); +DEFINE_test_flag(bool, pause_before_post_split_compation, false, + "Pause before triggering post split compaction."); + namespace { constexpr int kDbCacheSizeUsePercentage = -1; @@ -218,6 +223,17 @@ DEFINE_int32(read_pool_max_queue_size, 128, "is used to run multiple read operations, that are part of the same tablet rpc, " "in parallel."); + +DEFINE_int32(post_split_trigger_compaction_pool_max_threads, 1, + "The maximum number of threads allowed for post_split_trigger_compaction_pool_. This " + "pool is used to run compactions on tablets after they have been split and still " + "contain irrelevant data from the tablet they were sourced from."); +DEFINE_int32(post_split_trigger_compaction_pool_max_queue_size, 16, + "The maximum number of tasks that can be held in the pool for " + "post_split_trigger_compaction_pool_. This pool is used to run compactions on tablets " + "after they have been split and still contain irrelevant data from the tablet they " + "were sourced from."); + DEFINE_test_flag(int32, sleep_after_tombstoning_tablet_secs, 0, "Whether we sleep in LogAndTombstone after calling DeleteTabletData."); @@ -280,6 +296,9 @@ METRIC_DEFINE_histogram(server, ts_bootstrap_time, "TServer Bootstrap Time", "Time that the tablet server takes to bootstrap all of its tablets.", 10000000, 2); +THREAD_POOL_METRICS_DEFINE( + server, post_split_trigger_compaction_pool, "Thread pool for tablet compaction jobs."); + using consensus::ConsensusMetadata; using consensus::ConsensusStatePB; using consensus::RaftConfigPB; @@ -453,6 +472,12 @@ TSTabletManager::TSTabletManager(FsManager* fs_manager, .set_max_queue_size(FLAGS_read_pool_max_queue_size) .set_metrics(std::move(read_metrics)) .Build(&read_pool_)); + CHECK_OK(ThreadPoolBuilder("compaction") + .set_max_threads(FLAGS_post_split_trigger_compaction_pool_max_threads) + .set_max_queue_size(FLAGS_post_split_trigger_compaction_pool_max_queue_size) + .set_metrics(THREAD_POOL_METRICS_INSTANCE( + server_->metric_entity(), post_split_trigger_compaction_pool)) + .Build(&post_split_trigger_compaction_pool_)); int64_t block_cache_size_bytes = FLAGS_db_block_cache_size_bytes; int64_t total_ram_avail = MemTracker::GetRootTracker()->limit(); @@ -1511,10 +1536,17 @@ void TSTabletManager::OpenTablet(const RaftGroupMetadataPtr& meta, if (tablet->MightHaveNonRelevantData()) { WARN_NOT_OK( - tablet->ForceFullRocksDBCompactAsync(), "Failed to submit compaction for split tablet"); + post_split_trigger_compaction_pool_->SubmitFunc( + std::bind(&TSTabletManager::CompactPostSplitTablet, this, tablet)), + "Failed to submit compaction for post-split tablet."); } } +void TSTabletManager::CompactPostSplitTablet(tablet::TabletPtr tablet) { + TEST_PAUSE_IF_FLAG(TEST_pause_before_post_split_compation); + WARN_NOT_OK(tablet->ForceFullRocksDBCompact(), "Failed to compact post-split tablet."); +} + void TSTabletManager::StartShutdown() { { std::lock_guard lock(mutex_); @@ -1600,6 +1632,9 @@ void TSTabletManager::CompleteShutdown() { if (append_pool_) { append_pool_->Shutdown(); } + if (post_split_trigger_compaction_pool_) { + post_split_trigger_compaction_pool_->Shutdown(); + } { std::lock_guard l(mutex_); diff --git a/src/yb/tserver/ts_tablet_manager.h b/src/yb/tserver/ts_tablet_manager.h index 41ce0c7bbedb..a723782e8545 100644 --- a/src/yb/tserver/ts_tablet_manager.h +++ b/src/yb/tserver/ts_tablet_manager.h @@ -491,6 +491,8 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table void CleanupSplitTablets(); + void CompactPostSplitTablet(tablet::TabletPtr tablet); + const CoarseTimePoint start_time_; FsManager* const fs_manager_; @@ -561,6 +563,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table // Thread pool for read ops, that are run in parallel, shared between all tablets. std::unique_ptr read_pool_; + // Thread pool for manually triggering compactions for tablets created from a split. + std::unique_ptr post_split_trigger_compaction_pool_; + std::unique_ptr tablets_cleaner_; // Used for scheduling flushes