diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp index 0d231bb2f90..a5ae1443df9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp @@ -129,7 +129,7 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, fap_ctx->cleanCheckpointIngestInfo(tmt, new_region->id()); } // Another FAP will not take place if this stage is not finished. - if (fap_ctx->tasks_trace->discardTask(new_region->id())) + if (fap_ctx->tasks_trace->leakingDiscardTask(new_region->id())) { LOG_ERROR(log, "FastAddPeer: find old fap task, region_id={}", new_region->id()); } diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 343ae9e04c5..166df50b682 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -337,9 +337,16 @@ FastAddPeerRes FastAddPeerImpl( UInt64 new_peer_id, UInt64 start_time) { + auto log = Logger::get("FastAddPeer"); try { - auto elapsed = fap_ctx->tasks_trace->queryElapsed(region_id); + auto maybe_elapsed = fap_ctx->tasks_trace->queryElapsed(region_id); + if unlikely (!maybe_elapsed.has_value()) + { + GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); + LOG_INFO(log, "FAP is canceled at beginning region_id={} new_peer_id={}", region_id, new_peer_id); + } + auto elapsed = maybe_elapsed.value(); // NOTE(review): the canceled branch above does not return, so value() throws std::bad_optional_access here when the task is gone - presumably the enclosing catch turns that into a failure result; confirm, or return a canceled result explicitly. GET_METRIC(tiflash_fap_task_duration_seconds, type_queue_stage).Observe(elapsed / 1000.0); GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Decrement(); auto res = FastAddPeerImplSelect(tmt, proxy_helper, region_id, new_peer_id); diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp index 77eac2b77a4..bc596b3d2e6 100644 ---
a/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommandsKVS.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp index 31ab5c98fb2..a820a16cb1c 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.cpp @@ -524,4 +524,10 @@ const RegionState & RegionMeta::getRegionState() const std::lock_guard lock(mutex); return region_state; } + +RegionState & RegionMeta::debugMutRegionState() +{ + std::lock_guard lock(mutex); + return region_state; +} } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h b/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h index 6a540d6aeb6..04c147d2b88 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionMeta.h @@ -122,6 +122,7 @@ class RegionMeta const raft_serverpb::MergeState & getMergeState() const; raft_serverpb::MergeState cloneMergeState() const; const RegionState & getRegionState() const; + RegionState & debugMutRegionState(); RegionMeta clone() const { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index 586732d6ac2..725a8d527cc 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -117,7 +117,7 @@ class Region : public std::enable_shared_from_this std::unique_lock lock; // A unique_lock so that we can safely remove committed data. }; -public: +public: // Simple Read and Write explicit Region(RegionMeta && meta_); explicit Region(RegionMeta && meta_, const TiFlashRaftProxyHelper *); ~Region(); @@ -129,12 +129,13 @@ class Region : public std::enable_shared_from_this // Directly drop all data in this Region object. 
void clearAllData(); - CommittedScanner createCommittedScanner(bool use_lock, bool need_value); - CommittedRemover createCommittedRemover(bool use_lock = true); + void mergeDataFrom(const Region & other); + RegionMeta & mutMeta() { return meta; } - std::tuple serialize(WriteBuffer & buf) const; - static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr); + // Assign data and meta by moving from `new_region`. + void assignRegion(Region && new_region); +public: // Stats RegionID id() const; ImutRegionRangePtr getRange() const; @@ -161,6 +162,9 @@ class Region : public std::enable_shared_from_this std::pair getRaftLogEagerGCRange() const; void updateRaftLogEagerIndex(UInt64 new_truncate_index); + std::tuple serialize(WriteBuffer & buf) const; + static RegionPtr deserialize(ReadBuffer & buf, const TiFlashRaftProxyHelper * proxy_helper = nullptr); + friend bool operator==(const Region & region1, const Region & region2) { std::shared_lock lock1(region1.mutex); @@ -169,15 +173,6 @@ class Region : public std::enable_shared_from_this return region1.meta == region2.meta && region1.data == region2.data; } - // Check if we can read by this index. - bool checkIndex(UInt64 index) const; - // Return for wait-index. - std::tuple waitIndex( - UInt64 index, - UInt64 timeout_ms, - std::function && check_running, - const LoggerPtr & log); - // Requires RegionMeta's lock UInt64 appliedIndex() const; // Requires RegionMeta's lock @@ -189,10 +184,33 @@ class Region : public std::enable_shared_from_this RegionVersion version() const; RegionVersion confVer() const; - RegionMetaSnapshot dumpRegionMetaSnapshot() const; + TableID getMappedTableID() const; + KeyspaceID getKeyspaceID() const; - // Assign data and meta by moving from `new_region`. - void assignRegion(Region && new_region); + /// get approx rows, bytes info about mem cache. 
+ std::pair getApproxMemCacheInfo() const; + void cleanApproxMemCacheInfo() const; + + // Check the raftstore cluster version of this region. + // Currently, all version in the same TiFlash store should be the same. + RaftstoreVer getClusterRaftstoreVer(); + RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; } + const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; } + +public: // Raft Read and Write + CommittedScanner createCommittedScanner(bool use_lock, bool need_value); + CommittedRemover createCommittedRemover(bool use_lock = true); + + // Check if we can read by this index. + bool checkIndex(UInt64 index) const; + // Return for wait-index. + std::tuple waitIndex( + UInt64 index, + UInt64 timeout_ms, + std::function && check_running, + const LoggerPtr & log); + + RegionMetaSnapshot dumpRegionMetaSnapshot() const; void tryCompactionFilter(Timestamp safe_point); @@ -202,36 +220,21 @@ class Region : public std::enable_shared_from_this raft_serverpb::MergeState cloneMergeState() const; const raft_serverpb::MergeState & getMergeState() const; - TableID getMappedTableID() const; - KeyspaceID getKeyspaceID() const; std::pair handleWriteRaftCmd( const WriteCmdsView & cmds, UInt64 index, UInt64 term, TMTContext & tmt); - /// get approx rows, bytes info about mem cache. - std::pair getApproxMemCacheInfo() const; - void cleanApproxMemCacheInfo() const; - - RegionMeta & mutMeta() { return meta; } - UInt64 getSnapshotEventFlag() const { return snapshot_event_flag; } // IngestSST will first be applied to the `temp_region`, then we need to // copy the key-values from `temp_region` and move forward the `index` and `term` void finishIngestSSTByDTFile(RegionPtr && temp_region, UInt64 index, UInt64 term); - // Check the raftstore cluster version of this region. - // Currently, all version in the same TiFlash store should be the same. 
- RaftstoreVer getClusterRaftstoreVer(); // Methods to handle orphan keys under raftstore v2. void beforePrehandleSnapshot(uint64_t region_id, std::optional deadline_index); void afterPrehandleSnapshot(int64_t ongoing); - RegionData::OrphanKeysInfo & orphanKeysInfo() { return data.orphan_keys_info; } - const RegionData::OrphanKeysInfo & orphanKeysInfo() const { return data.orphan_keys_info; } - - void mergeDataFrom(const Region & other); Region() = delete; diff --git a/dbms/src/Storages/KVStore/TMTContext.cpp b/dbms/src/Storages/KVStore/TMTContext.cpp index 409397b6e32..194c8954433 100644 --- a/dbms/src/Storages/KVStore/TMTContext.cpp +++ b/dbms/src/Storages/KVStore/TMTContext.cpp @@ -455,6 +455,11 @@ uint64_t TMTContext::readIndexWorkerTick() const return read_index_worker_tick_ms.load(std::memory_order_relaxed); } +void TMTContext::debugSetKVStore(const KVStorePtr & new_kvstore) +{ + kvstore = new_kvstore; +} + const std::string & IntoStoreStatusName(TMTContext::StoreStatus status) { static const std::string StoreStatusName[] = { diff --git a/dbms/src/Storages/KVStore/TMTContext.h b/dbms/src/Storages/KVStore/TMTContext.h index c7753b4f551..10a234384ed 100644 --- a/dbms/src/Storages/KVStore/TMTContext.h +++ b/dbms/src/Storages/KVStore/TMTContext.h @@ -78,7 +78,7 @@ class TMTContext : private boost::noncopyable public: const KVStorePtr & getKVStore() const; KVStorePtr & getKVStore(); - void debugSetKVStore(const KVStorePtr & new_kvstore) { kvstore = new_kvstore; } + void debugSetKVStore(const KVStorePtr &); const ManagedStorages & getStorages() const; ManagedStorages & getStorages(); diff --git a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h index e6d8e7e2c00..01dbe2ef92f 100644 --- a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h +++ b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h @@ -14,103 +14,347 @@ #pragma once +#include +#include +#include #include #include +#include +#include namespace DB { + +namespace 
AsyncTaskHelper +{ +// It's guaranteed a task is no longer accessible once canceled or has its result fetched. +// NotScheduled -> InQueue, Running, NotScheduled +// InQueue -> Running +// Running -> Finished, NotScheduled(canceled) +// Finished -> NotScheduled(fetched) +// NOTE: magic_enum can't work properly with enums in template classes due to compiler opts. +enum class TaskState +{ + NotScheduled, + InQueue, + Running, + Finished, +}; +} // namespace AsyncTaskHelper + +// Key should support `fmt::formatter`. template struct AsyncTasks { // We use a big queue to cache, to reduce add task failures. explicit AsyncTasks(uint64_t pool_size, uint64_t free_pool_size, uint64_t queue_size) : thread_pool(std::make_unique(pool_size, free_pool_size, queue_size)) + , log(DB::Logger::get()) {} - bool discardTask(Key k) + ~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); } + + using TaskState = AsyncTaskHelper::TaskState; + + struct CancelHandle; + using CancelHandlePtr = std::shared_ptr; + struct CancelHandle + { + CancelHandle() = default; + CancelHandle(const CancelHandle &) = delete; + + bool isCanceled() const { return inner.load(); } + + bool blockedWaitFor(std::chrono::duration timeout) + { + // The task could be canceled before running. + if (isCanceled()) + return true; + std::unique_lock lock(mut); + cv.wait_for(lock, timeout, [&]() { return isCanceled(); }); + return isCanceled(); + } + + static CancelHandlePtr genAlreadyCanceled() noexcept + { + auto h = std::make_shared(); + h->inner.store(true); + return h; + } + + private: + // Make this private to forbid called by multiple consumers. + void doCancel() + { + // Use lock here to prevent losing signal. 
+ std::scoped_lock lock(mut); + inner.store(true); + cv.notify_all(); + } + + friend struct AsyncTasks; + std::atomic_bool inner = false; + std::mutex mut; + std::condition_variable cv; + }; + + struct Elem + { + Elem(std::future && fut_, uint64_t start_ts_, std::shared_ptr && triggered_) + : fut(std::move(fut_)) + , start_ts(start_ts_) + , triggered(triggered_) + { + cancel = std::make_shared(); + } + Elem(const Elem &) = delete; + Elem(Elem &&) = default; + + std::future fut; + uint64_t start_ts; + std::shared_ptr cancel; + std::shared_ptr triggered; + }; + + /// Although not mandatory, we publicize the method to allow holding the handle at the beginning of the body of async task. + std::shared_ptr getCancelHandleFromExecutor(Key k) const + { + std::scoped_lock l(mtx); + auto it = tasks.find(k); + if unlikely (it == tasks.end()) + { + // When the invokable is running by some executor in the thread pool, + // it must have been registered into `tasks`. + // So the only case that an access for a non-existing task is that the task is already cancelled asyncly. + return CancelHandle::genAlreadyCanceled(); + } + return it->second.cancel; + } + + // Only unregister, no clean. + // Use `asyncCancelTask` if there is something to clean. + bool leakingDiscardTask(Key k) { std::scoped_lock l(mtx); - auto it = futures.find(k); - if (it != futures.end()) + auto it = tasks.find(k); + if (it != tasks.end()) { - futures.erase(it); - start_time.erase(k); + tasks.erase(it); return true; } return false; } - bool addTask(Key k, Func f) + // Safety: Throws if + // 1. The task not exist and `throw_if_noexist`. + // 2. Throw in `result_dropper`. + /// Usage: + /// 1. If the task is in `Finished` state + /// It's result will be cleaned with `result_dropper`. + /// 2. If the task is in `Running` state + /// Make sure the executor will do the clean. + /// 3. If the tasks is in `InQueue` state + /// The task will directly return when it's eventually run by a thread. + /// 4. 
If the tasks is in `NotScheduled` state + /// `throw_if_noexist` controls whether to throw. + /// NOTE: The task element will be removed after calling this function. + template + TaskState asyncCancelTask(Key k, ResultDropper result_dropper, bool throw_if_noexist) { + auto cancel_handle = getCancelHandleFromCaller(k, throw_if_noexist); + if (cancel_handle) + { + cancel_handle->doCancel(); + // Cancel logic should do clean itself + } + + auto state = queryState(k); + if (!throw_if_noexist && state == TaskState::NotScheduled) + return state; + if (state == TaskState::Finished) + { + result_dropper(); + } + + // `result_dropper` may remove the task by `fetchResult`. + leakingDiscardTask(k); + + return state; + } + + TaskState asyncCancelTask(Key k) + { + return asyncCancelTask( + k, + []() {}, + true); + } + + // Safety: Throws if + // 1. The task is not found, and throw_on_no_exist. + // 2. The cancel_handle is already set. + /// Usage: + /// 1. If the task is in `Finished`/`Running` state + /// Its result is fetched and discarded. The caller may do the remaining cleanup. + /// 2. If the task is in `InQueue` state + /// The task will directly return when it's eventually run by a thread. + /// 3. If the task is in `NotScheduled` state + /// It throws if `throw_on_no_exist` is set; otherwise `NotScheduled` is returned and nothing is canceled. + /// Returns: + /// 1. The TaskState before the task is canceled + /// 2. Exception + /// NOTE: The task element will be removed after calling this function. + [[nodiscard]] TaskState blockedCancelRunningTask(Key k, bool throw_on_no_exist = true) + { + auto cancel_handle = getCancelHandleFromCaller(k, throw_on_no_exist); // nullptr only if the task is absent and `throw_on_no_exist` is false + auto state = queryState(k); + if (!cancel_handle || state == TaskState::NotScheduled) + { + if (throw_on_no_exist) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't block wait a non-scheduled task"); + } + else + { + return TaskState::NotScheduled; // nothing was canceled + } + } + + // Only one thread can block cancel and wait.
+ RUNTIME_CHECK_MSG(!cancel_handle->isCanceled(), "Try block cancel running task twice"); + cancel_handle->doCancel(); + if (state == TaskState::InQueue) + { + leakingDiscardTask(k); + return state; + } + // Consider a one producer thread one consumer thread scene, the first task is running, + // and the second task is in queue. If we block cancel the second task here, deadlock will happen. + // So we only block on fetching running tasks, and users have to guarantee cancel checking. + fetchResult(k); + return state; + } + + using CancelFunc = std::function; + // Safety: Throws if + // 1. There is already a task registered with the same name and not canceled or fetched. + bool addTaskWithCancel(Key k, Func f, CancelFunc cf) + { + std::scoped_lock l(mtx); + RUNTIME_CHECK(!tasks.contains(k)); using P = std::packaged_task; std::shared_ptr

p = std::make_shared

(P(f)); + std::shared_ptr triggered = std::make_shared(false); + auto trigger_cloned = triggered; + auto elem = Elem(p->get_future(), getCurrentMillis(), std::move(trigger_cloned)); + auto cancel_handle = elem.cancel; - // TODO(fap) `start_time` may not be set immediately when calling `p`, will be fixed in another PR. - auto res = thread_pool->trySchedule([p]() { (*p)(); }, 0, 0); + auto running_mut = std::make_shared(); + // Task could not run unless registered in `tasks`. + std::scoped_lock caller_running_lock(*running_mut); + // The executor thread may outlive `AsyncTasks` in most cases, so we don't capture `this`. + auto job = [p, triggered, running_mut, cancel_handle, cf]() { + RUNTIME_CHECK(triggered != nullptr); + RUNTIME_CHECK(running_mut != nullptr); + RUNTIME_CHECK(cancel_handle != nullptr); + RUNTIME_CHECK(p != nullptr); + if (cancel_handle->isCanceled()) + { + cf(); + return; + } + std::scoped_lock worker_running_lock(*running_mut); + triggered->store(true); + // We can hold the cancel handle here to prevent it from destructing, but it is not necessary. 
+ (*p)(); + // We don't erase from `tasks` here, since we won't capture `this` + }; + auto res = thread_pool->trySchedule(job, 0, 0); + SYNC_FOR("after_AsyncTasks::addTask_scheduled"); if (res) { - std::scoped_lock l(mtx); - futures[k] = p->get_future(); - start_time[k] = getCurrentMillis(); + tasks.insert({k, std::move(elem)}); } + SYNC_FOR("before_AsyncTasks::addTask_quit"); return res; } - bool isScheduled(Key key) const + bool addTask(Key k, Func f) { - std::scoped_lock l(mtx); - return futures.contains(key); + return addTaskWithCancel(k, f, []() {}); } - bool isReady(Key key) const + TaskState unsafeQueryState(Key key) const { using namespace std::chrono_literals; - std::scoped_lock l(mtx); - auto it = futures.find(key); - if (it == futures.end()) - return false; - return it->second.wait_for(0ms) == std::future_status::ready; + auto it = tasks.find(key); + if (it == tasks.end()) + return TaskState::NotScheduled; + if (!it->second.triggered->load()) + return TaskState::InQueue; + if (it->second.fut.wait_for(0ms) == std::future_status::ready) + return TaskState::Finished; + return TaskState::Running; } - R fetchResult(Key key) + TaskState queryState(Key key) const { - std::unique_lock l(mtx); - auto it = futures.find(key); - RUNTIME_CHECK_MSG(it != futures.end(), "fetchResult meets empty key"); - auto fut = std::move(it->second); - futures.erase(it); - start_time.erase(key); - l.unlock(); - return fut.get(); + std::scoped_lock l(mtx); + return unsafeQueryState(key); } - uint64_t queryElapsed(Key key) + bool isScheduled(Key key) const { return queryState(key) != TaskState::NotScheduled; } + + bool isInQueue(Key key) const { return queryState(key) == TaskState::InQueue; } + + bool isRunning(Key key) const { return queryState(key) == TaskState::Running; } + + bool isReady(Key key) const { return queryState(key) == TaskState::Finished; } + + std::optional queryElapsed(Key key) const { std::scoped_lock l(mtx); - auto it2 = start_time.find(key); - 
RUNTIME_CHECK_MSG(it2 != start_time.end(), "queryElapsed meets empty key"); - return getCurrentMillis() - it2->second; + auto it = tasks.find(key); + if unlikely (it == tasks.end()) + { + return std::nullopt; + } + return getCurrentMillis() - it->second.start_ts; } - uint64_t queryStartTime(Key key) + std::optional queryStartTime(Key key) const { std::scoped_lock l(mtx); - auto it2 = start_time.find(key); - RUNTIME_CHECK_MSG(it2 != start_time.end(), "queryStartTime meets empty key"); - return it2->second; + auto it = tasks.find(key); + if unlikely (it == tasks.end()) + { + return std::nullopt; + } + return it->second.start_ts; + } + + // If a task is canceled, `fetchResult` may throw. + R fetchResult(Key key) + { + std::unique_lock l(mtx); + auto it = tasks.find(key); + RUNTIME_CHECK(it != tasks.end(), key); + std::future fut = std::move(it->second.fut); + tasks.erase(key); + l.unlock(); + RUNTIME_CHECK_MSG(fut.valid(), "no valid future"); + return fut.get(); } std::pair fetchResultAndElapsed(Key key) { std::unique_lock l(mtx); - auto it = futures.find(key); - auto fut = std::move(it->second); - auto it2 = start_time.find(key); - RUNTIME_CHECK_MSG(it != futures.end() && it2 != start_time.end(), "fetchResultAndElapsed meets empty key"); - auto start = it2->second; - futures.erase(it); - start_time.erase(it2); + auto it = tasks.find(key); + RUNTIME_CHECK(it != tasks.end(), key); + auto fut = std::move(it->second.fut); + auto start = it->second.start_ts; + tasks.erase(it); l.unlock(); auto elapsed = getCurrentMillis() - start; return std::make_pair(fut.get(), elapsed); @@ -118,6 +362,8 @@ struct AsyncTasks std::unique_ptr & inner() { return thread_pool; } + size_t count() const { std::scoped_lock l(mtx); return tasks.size(); } // Takes `mtx` like every other accessor of `tasks`; must not be called with `mtx` already held. + static uint64_t getCurrentMillis() { return std::chrono::duration_cast( @@ -126,9 +372,29 @@ struct AsyncTasks } protected: - std::unordered_map> futures; - std::unordered_map start_time; + std::shared_ptr getCancelHandleFromCaller(Key k, bool throw_if_noexist = true)
const + { + std::scoped_lock l(mtx); + auto it = tasks.find(k); + if (it == tasks.end()) + { + if (throw_if_noexist) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "getCancelHandleFromCaller can't find key"); + } + else + { + return nullptr; + } + } + return it->second.cancel; + } + +protected: + std::unordered_map tasks; + // TODO(fap) Use threadpool which supports purging from queue. std::unique_ptr thread_pool; mutable std::mutex mtx; + LoggerPtr log; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index f98791ec0d3..e40abd471e5 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -17,10 +17,9 @@ #include #include #include +#include #include -#include "region_kvstore_test.h" - extern std::shared_ptr root_of_kvstore_mem_trackers; namespace DB @@ -783,56 +782,5 @@ try } CATCH -TEST_F(RegionKVStoreTest, AsyncTasks) -{ - using namespace std::chrono_literals; - - using TestAsyncTasks = AsyncTasks, int>; - auto async_tasks = std::make_unique(1, 1, 2); - - int total = 5; - std::vector f(total, false); - bool initial_loop = true; - while (true) - { - SCOPE_EXIT({ initial_loop = false; }); - auto count = std::accumulate(f.begin(), f.end(), 0, [&](int a, bool b) -> int { return a + int(b); }); - if (count >= total) - { - break; - } - else - { - LOG_DEBUG(log, "finished {}/{}", count, total); - } - for (int i = 0; i < total; ++i) - { - if (!async_tasks->isScheduled(i)) - { - auto res = async_tasks->addTask(i, []() { - std::this_thread::sleep_for(200ms); - return 1; - }); - if (initial_loop) - ASSERT_EQ(res, i <= 1); - } - } - - for (int i = 0; i < total; ++i) - { - if (!f[i]) - { - if (async_tasks->isReady(i)) - { - auto r = async_tasks->fetchResult(i); - UNUSED(r); - f[i] = true; - } - } - } - std::this_thread::sleep_for(200ms); - } -} - } // namespace tests } // namespace 
DB diff --git a/dbms/src/Storages/KVStore/tests/gtest_proactive_flush.cpp b/dbms/src/Storages/KVStore/tests/gtest_proactive_flush.cpp index 356cc029ac4..da63e35592e 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_proactive_flush.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_proactive_flush.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "region_kvstore_test.h" +#include namespace DB { diff --git a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp index df791fc4ccc..4f15087d18f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp @@ -13,8 +13,7 @@ // limitations under the License. #include - -#include "region_kvstore_test.h" +#include namespace DB { diff --git a/dbms/src/Storages/KVStore/tests/gtests_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtests_async_tasks.cpp new file mode 100644 index 00000000000..c370da390a2 --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/gtests_async_tasks.cpp @@ -0,0 +1,312 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +namespace DB +{ +namespace tests +{ +TEST(AsyncTasksTest, AsyncTasksNormal) +{ + using namespace std::chrono_literals; + using TestAsyncTasks = AsyncTasks, void>; + + auto log = DB::Logger::get(); + LOG_INFO(log, "Cancel and addTask"); + // Cancel and addTask + { + auto async_tasks = std::make_unique(1, 1, 2); + auto m = std::make_shared(); + auto m2 = std::make_shared(); + int flag = 0; + std::unique_lock cl(*m); + async_tasks->addTask(1, [m, &flag, &async_tasks, &m2]() { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1); + std::scoped_lock rl2(*m2); + std::scoped_lock rl(*m); + if (cancel_handle->isCanceled()) + { + return; + } + flag = 1; + }); + async_tasks->asyncCancelTask(1); + ASSERT_FALSE(async_tasks->isScheduled(1)); + async_tasks->addTask(1, [&flag]() { flag = 2; }); + cl.unlock(); + std::scoped_lock rl2(*m2); + ASSERT_NO_THROW(async_tasks->fetchResult(1)); + ASSERT_EQ(flag, 2); + } + // Lifetime of tasks + LOG_INFO(log, "Lifetime of tasks"); + { + auto async_tasks = std::make_unique(1, 1, 1); + auto sp_after_sched = SyncPointCtl::enableInScope("after_AsyncTasks::addTask_scheduled"); + auto sp_before_quit = SyncPointCtl::enableInScope("before_AsyncTasks::addTask_quit"); + std::thread t1([&]() { + sp_after_sched.waitAndPause(); + ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::NotScheduled); + sp_after_sched.next(); + sp_after_sched.disable(); + }); + std::thread t2([&]() { + sp_before_quit.waitAndPause(); + ASSERT_EQ(async_tasks->unsafeQueryState(1), TestAsyncTasks::TaskState::InQueue); + sp_before_quit.next(); + sp_before_quit.disable(); + std::this_thread::sleep_for(50ms); + ASSERT_TRUE(async_tasks->isReady(1)); + }); + auto res = async_tasks->addTask(1, []() {}); + ASSERT_TRUE(res); + t1.join(); + t2.join(); + } + + // Cancel in queue + LOG_INFO(log, "Cancel in queue"); + { + auto async_tasks = std::make_unique(1, 1, 2); + bool finished = false; + bool canceled = false; + 
std::mutex mtx; + std::unique_lock cl(mtx); + + auto res1 = async_tasks->addTask(1, [&]() { + std::scoped_lock rl(mtx); + UNUSED(rl); + }); + ASSERT_TRUE(res1); + + auto res2 = async_tasks->addTaskWithCancel( + 2, + [&]() { finished = true; }, + [&]() { canceled = true; }); + ASSERT_TRUE(res2); + + async_tasks->asyncCancelTask(2); + cl.unlock(); + + int elapsed = 0; + while (true) + { + if (canceled) + { + break; + } + ++elapsed; + std::this_thread::sleep_for(50ms); + } + ASSERT_TRUE(elapsed < 10); + ASSERT_FALSE(finished); + } + + // Block cancel + LOG_INFO(log, "Block cancel"); + { + auto async_tasks = std::make_unique(2, 2, 10); + int total = 9; + std::atomic_int finished = 0; // Incremented from the pool threads and polled by this thread, so it must be atomic. + std::vector f(total, false); + for (int i = 0; i < total; i++) + { + auto res = async_tasks->addTask(i, [i, &async_tasks, &finished, log]() { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + while (true) + { + std::this_thread::sleep_for(100ms); + if (cancel_handle->isCanceled()) + { + break; + } + } + finished += 1; + }); + // Ensure task 0 is the first to be picked up + if (i == 0) + std::this_thread::sleep_for(10ms); + ASSERT_TRUE(res); + } + + while (finished < total) + { + std::this_thread::sleep_for(100ms); + for (int i = 0; i < total; i++) + { + if (f[i]) + continue; + if (async_tasks->blockedCancelRunningTask(i) == AsyncTaskHelper::TaskState::InQueue) + { + // Cancel in queue, should manually add `finished`.
+ finished += 1; + } + f[i] = true; + break; + } + } + + for (int i = 0; i < total; i++) + { + ASSERT_TRUE(f[i]); + } + ASSERT_EQ(async_tasks->count(), 0); + } + + // Cancel tasks in queue + LOG_INFO(log, "Cancel tasks in queue"); + { + auto async_tasks = std::make_unique(1, 1, 100); + + int total = 7; + std::atomic_int finished = 0; + for (int i = 0; i < total; i++) + { + auto res = async_tasks->addTaskWithCancel( + i, + [i, &async_tasks, &finished]() { + while (true) + { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + // Busy loop to take over cpu + if (cancel_handle->isCanceled()) + { + break; + } + } + finished.fetch_add(1); + }, + [&]() { finished.fetch_add(1); }); + // Ensure task 1 is the first to handle + if (i == 0) + std::this_thread::sleep_for(10ms); + ASSERT_TRUE(res); + } + + for (int i = 0; i < total; i++) + { + std::this_thread::sleep_for(100ms); + async_tasks->asyncCancelTask(i); + // Throw on double cancel + EXPECT_THROW(async_tasks->asyncCancelTask(i), Exception); + } + + int elapsed = 0; + while (true) + { + if (finished >= total) + { + break; + } + ++elapsed; + std::this_thread::sleep_for(100ms); + } + ASSERT_TRUE(elapsed < 50); + ASSERT_EQ(async_tasks->count(), 0); + } +} + +TEST(AsyncTasksTest, AsyncTasksCommon) +{ + using namespace std::chrono_literals; + + using TestAsyncTasks = AsyncTasks, int>; + auto async_tasks = std::make_unique(1, 1, 2); + + int total = 5; + int max_steps = 10; + int current_step = 0; + std::vector f(total, false); + std::vector s(total, false); + bool initial_loop = true; + while (true) + { + ASSERT(current_step < max_steps); + SCOPE_EXIT({ initial_loop = false; }); + auto count = std::accumulate(f.begin(), f.end(), 0, [&](int a, bool b) -> int { return a + int(b); }); + if (count >= total) + { + break; + } + + auto to_be_canceled = total - 1; + if (count == total - 1) + { + if (async_tasks->isScheduled(to_be_canceled)) + { + async_tasks->asyncCancelTask( + to_be_canceled, + []() {}, + true); + } 
+ // Otherwise, the task is not added. + } + + // Add tasks + for (int i = 0; i < total; ++i) + { + if (!async_tasks->isScheduled(i) && !s[i]) + { + auto res = async_tasks->addTask(i, [i, &async_tasks, to_be_canceled, &f]() { + if (i == to_be_canceled) + { + auto cancel_handle = async_tasks->getCancelHandleFromExecutor(i); + while (true) + { + if (cancel_handle->blockedWaitFor(100ms)) + { + f[to_be_canceled] = true; + break; + } + } + } + else + { + std::this_thread::sleep_for(100ms); + } + return 1; + }); + if (res) + s[i] = true; + // In the first loop, only the first task can run. + if (initial_loop) + ASSERT_EQ(res, i <= 1); + } + } + + // Fetch result + for (int i = 0; i < total; ++i) + { + if (!f[i]) + { + if (i == to_be_canceled) + continue; + if (async_tasks->isReady(i)) + { + auto r = async_tasks->fetchResult(i); + UNUSED(r); + f[i] = true; + } + } + } + std::this_thread::sleep_for(100ms); + } + + ASSERT_EQ(async_tasks->count(), 0); +} +} // namespace tests +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/tests/kvstore_helper.h b/dbms/src/Storages/KVStore/tests/kvstore_helper.h index b023335abb8..61d50b5564c 100644 --- a/dbms/src/Storages/KVStore/tests/kvstore_helper.h +++ b/dbms/src/Storages/KVStore/tests/kvstore_helper.h @@ -97,21 +97,27 @@ class KVStoreTestBase : public ::testing::Test ASSERT_EQ(kvstore->getStoreID(), store.id()); } - LOG_INFO(Logger::get("Test"), "Finished setup"); + LOG_INFO(log, "Finished setup"); } void TearDown() override { proxy_instance->clear(); } protected: KVStore & getKVS() { return *kvstore; } - KVStore & reloadKVSFromDisk() + void resetKVStoreStorage() { - kvstore.reset(); auto & global_ctx = TiFlashTestEnv::getGlobalContext(); global_ctx.tryReleaseWriteNodePageStorageForTest(); global_ctx.initializeWriteNodePageStorageIfNeed(*path_pool); + } + KVStore & reloadKVSFromDisk(bool with_reset = true) + { + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + kvstore.reset(); + if 
(with_reset) + resetKVStoreStorage(); kvstore = std::make_shared(global_ctx); - // only recreate kvstore and restore data from disk, don't recreate proxy instance + // Only recreate kvstore and restore data from disk, don't recreate proxy instance kvstore->restore(*path_pool, proxy_helper.get()); proxy_instance->reload(); global_ctx.getTMTContext().getRegionTable().clear(); diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.cpp b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.cpp index 170fe6e7191..b6443c2a8ae 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.cpp +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.cpp @@ -21,6 +21,10 @@ namespace DB::details String UniversalPageIdFormatHelper::format(const DB::UniversalPageId & value) { auto prefix = DB::UniversalPageIdFormat::getFullPrefix(value); + if (value.hasPrefix({UniversalPageIdFormat::KV_PREFIX}) || value.hasPrefix({UniversalPageIdFormat::RAFT_PREFIX})) + { + return fmt::format("0x{}", Redact::keyToHexString(value.data(), value.size())); + } return fmt::format( "0x{}.{}", Redact::keyToHexString(prefix.data(), prefix.size()), diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h index ae39ba1ef20..fd9e4451d3d 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h @@ -52,6 +52,7 @@ class UniversalPageId final UniversalPageId substr(size_t pos, size_t npos) const { return id.substr(pos, npos); } bool operator<(const UniversalPageId & rhs) const { return id < rhs.id; } bool hasPrefix(const String & str) const { return startsWith(id, str); } + bool hasPrefix(const char * str) const { return startsWith(id, str); } String toStr() const { return id; } const String & asStr() const { return id; } diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index b72d386d593..3b28680acbd 100644 --- 
a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -44,6 +44,7 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p , cur_offset(0) , log(Logger::get(remote_fname)) { + RUNTIME_CHECK(client_ptr != nullptr); RUNTIME_CHECK(initialize(), remote_fname); } diff --git a/dbms/src/Storages/KVStore/tests/gtest_json_binary.cpp b/dbms/src/TiDB/tests/gtest_json_binary.cpp similarity index 100% rename from dbms/src/Storages/KVStore/tests/gtest_json_binary.cpp rename to dbms/src/TiDB/tests/gtest_json_binary.cpp diff --git a/dbms/src/Storages/KVStore/tests/gtest_json_path_expr.cpp b/dbms/src/TiDB/tests/gtest_json_path_expr.cpp similarity index 100% rename from dbms/src/Storages/KVStore/tests/gtest_json_path_expr.cpp rename to dbms/src/TiDB/tests/gtest_json_path_expr.cpp