diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 132732d6989..8a69b7573e2 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -141,6 +141,19 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan bool DeltaValueSpace::flush(DMContext & context) { + bool v = false; + if (!is_flushing.compare_exchange_strong(v, true)) + { + // other thread is flushing, just return. + LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo()); + return false; + } + SCOPE_EXIT({ + bool v = true; + if (!is_flushing.compare_exchange_strong(v, false)) + throw Exception(simpleInfo() + " is expected to be flushing", ErrorCodes::LOGICAL_ERROR); + }); + LOG_FMT_DEBUG(log, "{}, Flush start", info()); /// We have two types of data needed to flush to disk: diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 8f14682caa8..04fb97b3004 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -77,6 +77,11 @@ class DeltaValueSpace /// Note that those things can not be done at the same time. std::atomic_bool is_updating = false; + /// Note that it's safe to do multiple flushes concurrently but only one of them can succeed, + /// and the other threads' work is just a waste of resources. + /// So we only allow one flush task running at any time to avoid wasting resources. 
+ std::atomic_bool is_flushing = false; + std::atomic last_try_flush_rows = 0; std::atomic last_try_flush_bytes = 0; std::atomic last_try_compact_column_files = 0; @@ -159,6 +164,8 @@ class DeltaValueSpace size_t getTotalCacheBytes() const; size_t getValidCacheRows() const; + bool isFlushing() const { return is_flushing; } + bool isUpdating() const { return is_updating; } bool tryLockUpdating() diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 195ed5c53c2..09f290e311c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings checkSegmentUpdate(dm_context, segment, ThreadType::Write); } -void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range) +bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed) { RowKeyRange cur_range = range; while (!cur_range.none()) { RowKeyRange segment_range; - // Keep trying until succeeded. + // Keep trying until succeeded if needed. while (true) { SegmentPtr segment; @@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa { break; } + else if (!try_until_succeed) + { + return false; + } } cur_range.setStart(segment_range.end); } + return true; } void DeltaMergeStore::mergeDeltaAll(const Context & context) @@ -1347,6 +1352,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const && (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows || delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes); bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3; + /// For write thread, we want to avoid foreground flush to block the process of apply raft command. 
+ /// So we increase the threshold of foreground flush for write thread. + if (thread_type == ThreadType::Write) + { + should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 10 || unsaved_bytes >= delta_cache_limit_bytes * 10; + } bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) // && (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows @@ -1404,9 +1415,16 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const } else if (should_background_flush) { - delta_last_try_flush_rows = delta_rows; - delta_last_try_flush_bytes = delta_bytes; - try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}}); + /// It's meaningless to add more flush tasks if the segment is flushing. + /// Because only one flush task can proceed at any time. + /// And after the current flush task finishes, + /// it will call `checkSegmentUpdate` again to check whether there are more flush tasks to do. + if (!segment->isFlushing()) + { + delta_last_try_flush_rows = delta_rows; + delta_last_try_flush_bytes = delta_bytes; + try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}}); + } } } @@ -1502,7 +1520,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const return false; }; auto try_bg_compact = [&]() { - if (should_compact) + /// Compact task should be a really low priority task. + /// And if the segment is flushing, + /// we should avoid adding a background compact task to reduce lock contention on the segment and save disk throughput. + /// And after the current flush task completes, + /// it will call `checkSegmentUpdate` again to check whether there are other kinds of tasks to do. 
+ if (should_compact && !segment->isFlushing()) { delta_last_try_compact_column_files = column_file_count; try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}}); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 705481ca107..57c2a42b807 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); - /// Force flush all data to disk. - void flushCache(const Context & context, const RowKeyRange & range) + /// Try to flush all data in `range` to disk and return whether the flush succeeded. + bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true) { auto dm_context = newDMContext(context, context.getSettingsRef()); - flushCache(dm_context, range); + return flushCache(dm_context, range, try_until_succeed); } - void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range); + bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true); /// Merge delta into the stable layer for all segments. 
/// diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index cccfc5091b9..8058329ae91 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -300,6 +300,8 @@ class Segment : private boost::noncopyable void drop(const FileProviderPtr & file_provider, WriteBatches & wbs); + bool isFlushing() const { return delta->isFlushing(); } + RowsAndBytes getRowsAndBytesInRange( DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index ebf84c592e4..2ff766a9c6d 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -68,7 +68,7 @@ class IManageableStorage : public IStorage virtual void flushCache(const Context & /*context*/) {} - virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {} + virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; } virtual BlockInputStreamPtr status() { return {}; } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 67d32c73a05..a6de4efb3ac 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context) void StorageDeltaMerge::flushCache(const Context & context) { - flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size)); + flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true); } -void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) +bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) { - 
getAndMaybeInitStore()->flushCache(context, range_to_flush); + return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed); } void StorageDeltaMerge::mergeDelta(const Context & context) diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 79ee225d237..9e4ab12ad4f 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -73,7 +73,7 @@ class StorageDeltaMerge void flushCache(const Context & context) override; - void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override; + bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) override; /// Merge delta into the stable layer for all segments. /// diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 318a04c6ed9..f9d6d01955e 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function & callback(region.first, region.second); } -void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log) +bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed) { auto table_id = region.getMappedTableID(); auto storage = tmt.getStorages().get(table_id); @@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi "tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored", region.toString(), table_id); - return; + return true; } try @@ -151,7 +151,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi region.getRange()->getMappedTableID(), storage->isCommonHandle(), storage->getRowKeyColumnSize()); - storage->flushCache(tmt.getContext(), rowkey_range); + return 
storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed); } catch (DB::Exception & e) { @@ -159,6 +159,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi if (e.code() != ErrorCodes::TABLE_IS_DROPPED) throw; } + return true; } void KVStore::tryPersist(RegionID region_id) @@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) { - // if rows or bytes more than threshold, flush cache and perist mem data. + // if rows or bytes more than threshold, try to flush cache and persist mem data. return true; } else { - // if thhere is little data in mem, wait until time interval reached threshold. + // if there is little data in mem, wait until time interval reached threshold. // use random period so that lots of regions will not be persisted at same time. auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); @@ -381,11 +382,17 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd( if (check_sync_log()) { - tryFlushRegionCacheInStorage(tmt, curr_region, log); - persistRegion(curr_region, region_task_lock, "compact raft log"); - curr_region.markCompactLog(); - curr_region.cleanApproxMemCacheInfo(); - return EngineStoreApplyRes::Persist; + if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false)) + { + persistRegion(curr_region, region_task_lock, "compact raft log"); + curr_region.markCompactLog(); + curr_region.cleanApproxMemCacheInfo(); + return EngineStoreApplyRes::Persist; + } + else + { + return EngineStoreApplyRes::None; + } } return EngineStoreApplyRes::None; } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 
bb45e65d18b..66e2fe32b75 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable void tryPersist(RegionID region_id); - static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log); + static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true); size_t regionSize() const; EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request, diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index c855d5b3226..5ae36a4bd64 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -230,7 +230,7 @@ void removeObsoleteDataInStorage( auto rowkey_range = DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize()); dm_storage->deleteRange(rowkey_range, context->getSettingsRef()); - dm_storage->flushCache(*context, rowkey_range); // flush to disk + dm_storage->flushCache(*context, rowkey_range, /*try_until_succeed*/ true); // flush to disk } catch (DB::Exception & e) {