Skip to content

Commit

Permalink
ignore CompactLog command if the segment is flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Jun 24, 2022
1 parent decdbee commit f8f7206
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 35 deletions.
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan
bool DeltaValueSpace::flush(DMContext & context)
{
bool v = false;
// Other thread is flushing, just return.
if (!is_flushing.compare_exchange_strong(v, true))
{
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", info());
// other thread is flushing, just return.
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo());
return false;
}

SCOPE_EXIT({
bool v = true;
if (!is_flushing.compare_exchange_strong(v, false))
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class DeltaValueSpace
/// Note that those things can not be done at the same time.
std::atomic_bool is_updating = false;

/// Note that it's safe to do multiple flush concurrently but only one of them can succeed.
/// Note that it's safe to do multiple flush concurrently but only one of them can succeed,
/// and the other threads' work is just a waste of resources.
/// So we only allow one flush task to run at any time to avoid wasting resources.
std::atomic_bool is_flushing = false;

Expand Down
25 changes: 16 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed)
{
RowKeyRange cur_range = range;
while (!cur_range.none())
{
RowKeyRange segment_range;

// Keep trying until succeeded.
// Keep trying until succeeded if needed.
while (true)
{
SegmentPtr segment;
Expand All @@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
{
break;
}
else if (!try_until_succeed)
{
return false;
}
}

cur_range.setStart(segment_range.end);
}
return true;
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
Expand Down Expand Up @@ -1347,12 +1352,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3;
// Don't block write thread unless the data in delta layer is really too large.
// And read thread never do foreground flush so no need to special check for read thread.
if (thread_type == ThreadType::Write)
{
should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 100 || unsaved_bytes >= delta_cache_limit_bytes * 100;
}

bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) //
&& (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows
Expand Down Expand Up @@ -1410,6 +1409,10 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
else if (should_background_flush)
{
/// It's meaningless to add more flush tasks if the segment is flushing,
/// because only one flush task can proceed at any time.
/// And after the current flush task has finished,
/// it will call `checkSegmentUpdate` again to check whether there are more flush tasks to do.
if (!segment->isFlushing())
{
delta_last_try_flush_rows = delta_rows;
Expand Down Expand Up @@ -1511,7 +1514,11 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_compact = [&]() {
/// Only add background compact task when this segment is not flushing to reduce lock contention on the segment.
/// A compact task should be a really low priority task.
/// And if the segment is flushing,
/// we should avoid adding a background compact task to reduce lock contention on the segment and save disk throughput.
/// And after the current flush task completes,
/// it will call `checkSegmentUpdate` again to check whether there are other kinds of tasks to do.
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
/// Try to flush all data in `range` to disk and return whether the task succeeded.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
flushCache(dm_context, range);
return flushCache(dm_context, range, try_until_succeed);
}

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);
bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
///
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,9 @@ bool Segment::compactDelta(DMContext & dm_context)
CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_DeltaCompact};
GET_METRIC(tiflash_storage_subtask_count, type_delta_compact).Increment();
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_compact).Observe(watch.elapsedSeconds()); });
SCOPE_EXIT({
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_compact).Observe(watch.elapsedSeconds());
});

return delta->compact(dm_context);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class IManageableStorage : public IStorage

virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {}
virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; }

virtual BlockInputStreamPtr status() { return {}; }

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context)

void StorageDeltaMerge::flushCache(const Context & context)
{
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size));
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true);
}

void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush)
bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed)
{
getAndMaybeInitStore()->flushCache(context, range_to_flush);
return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed);
}

void StorageDeltaMerge::mergeDelta(const Context & context)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class StorageDeltaMerge

void flushCache(const Context & context) override;

void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override;
bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed = true) override;

/// Merge delta into the stable layer for all segments.
///
Expand Down
27 changes: 17 additions & 10 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> &
callback(region.first, region.second);
}

void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log)
bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed)
{
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
Expand All @@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
"tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored",
region.toString(),
table_id);
return;
return true;
}

try
Expand All @@ -151,14 +151,15 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
return storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
return true;
}

void KVStore::tryPersist(RegionID region_id)
Expand Down Expand Up @@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and perist mem data.
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if thhere is little data in mem, wait until time interval reached threshold.
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
Expand All @@ -381,11 +382,17 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(

if (check_sync_log())
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
}
return EngineStoreApplyRes::None;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable

void tryPersist(RegionID region_id);

static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log);
static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true);

size_t regionSize() const;
EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request,
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ void removeObsoleteDataInStorage(
auto rowkey_range
= DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize());
dm_storage->deleteRange(rowkey_range, context->getSettingsRef());
dm_storage->flushCache(*context, rowkey_range); // flush to disk
// flush to disk and keep trying until it succeeds
while (!dm_storage->flushCache(*context, rowkey_range))
{
}
}
catch (DB::Exception & e)
{
Expand Down

0 comments on commit f8f7206

Please sign in to comment.