Skip to content

Commit

Permalink
Raft: Add identifier to logger when wait index happens(release-6.5) (#…
Browse files Browse the repository at this point in the history
…8473) (#8475)

close #8076, close #8448
  • Loading branch information
ti-chi-bot authored Dec 20, 2023
1 parent a03e16f commit 170bc61
Show file tree
Hide file tree
Showing 13 changed files with 668 additions and 217 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ namespace DB
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_subtask_throughput_bytes, "Calculate the throughput of (maybe foreground) tasks of storage in bytes", Counter, /**/ \
F(type_delta_flush, {"type", "delta_flush"}), /**/ \
F(type_delta_compact, {"type", "delta_compact"}), /**/ \
F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \
F(type_write_to_disk, {"type", "write_to_disk"})) /**/ \
M(tiflash_storage_subtask_throughput_rows, "Calculate the throughput of (maybe foreground) tasks of storage in rows", Counter, /**/ \
F(type_delta_flush, {"type", "delta_flush"}), /**/ \
F(type_delta_compact, {"type", "delta_compact"}), /**/ \
F(type_write_to_cache, {"type", "write_to_cache"}), /**/ \
F(type_write_to_disk, {"type", "write_to_disk"})) /**/ \
M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
F(type_ingest, {"type", "ingest"}), /**/ \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ColumnFileFlushTask
size_t flush_version;

size_t flush_rows = 0;
size_t flush_bytes = 0;
size_t flush_deletes = 0;

public:
Expand All @@ -70,6 +71,7 @@ class ColumnFileFlushTask
inline Task & addColumnFile(ColumnFilePtr column_file)
{
flush_rows += column_file->getRows();
flush_bytes += column_file->getBytes();
flush_deletes += column_file->getDeletes();
return tasks.emplace_back(column_file);
}
Expand All @@ -78,6 +80,7 @@ class ColumnFileFlushTask

size_t getTaskNum() const { return tasks.size(); }
size_t getFlushRows() const { return flush_rows; }
size_t getFlushBytes() const { return flush_bytes; }
size_t getFlushDeletes() const { return flush_deletes; }

// Persist data in ColumnFileInMemory
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Functions/FunctionHelpers.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
Expand Down Expand Up @@ -320,6 +321,8 @@ bool DeltaValueSpace::flush(DMContext & context)
new_delta_index = cur_delta_index->cloneWithUpdates(delta_index_updates);
LOG_DEBUG(log, "Update index done, delta={}", simpleInfo());
}
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_flush).Increment(flush_task->getFlushBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_flush).Increment(flush_task->getFlushRows());

SYNC_FOR("after_DeltaValueSpace::flush|prepare_flush");

Expand All @@ -345,7 +348,7 @@ bool DeltaValueSpace::flush(DMContext & context)
if (new_delta_index)
delta_index = new_delta_index;

LOG_DEBUG(log, "Flush end, flush_tasks={} flush_rows={} flush_deletes={} delta={}", flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushDeletes(), info());
LOG_DEBUG(log, "Flush end, flush_tasks={} flush_rows={} flush_bytes={} flush_deletes={} delta={}", flush_task->getTaskNum(), flush_task->getFlushRows(), flush_task->getFlushBytes(), flush_task->getFlushDeletes(), info());
}
return true;
}
Expand Down Expand Up @@ -387,6 +390,9 @@ bool DeltaValueSpace::compact(DMContext & context)
log_storage_snap.reset(); // release the snapshot ASAP
}

GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_delta_compact).Increment(compaction_task->getTotalCompactBytes());
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_delta_compact).Increment(compaction_task->getTotalCompactRows());

{
std::scoped_lock lock(mutex);

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ void MinorCompaction::prepare(DMContext & context, WriteBatches & wbs, const Pag
}
Block compact_block = schema.cloneWithColumns(std::move(compact_columns));
auto compact_rows = compact_block.rows();
auto compact_bytes = compact_block.bytes();
auto compact_column_file = ColumnFileTiny::writeColumnFile(context, compact_block, 0, compact_rows, wbs, task.to_compact.front()->tryToTinyFile()->getSchema());
wbs.writeLogAndData();
task.result = compact_column_file;

total_compact_files += task.to_compact.size();
total_compact_rows += compact_rows;
total_compact_bytes += compact_bytes;
result_compact_files += 1;
}
}
Expand All @@ -72,7 +74,7 @@ bool MinorCompaction::commit(ColumnFilePersistedSetPtr & persisted_file_set, Wri

String MinorCompaction::info() const
{
return fmt::format("Compact end, total_compact_files={} result_compact_files={} total_compact_rows={}", total_compact_files, result_compact_files, total_compact_rows);
return fmt::format("Compact end, total_compact_files={} result_compact_files={} total_compact_rows={} total_compact_bytes={}", total_compact_files, result_compact_files, total_compact_rows, total_compact_bytes);
}
} // namespace DM
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

size_t total_compact_files = 0;
size_t total_compact_rows = 0;
size_t total_compact_bytes = 0;
size_t result_compact_files = 0;

public:
Expand Down Expand Up @@ -101,6 +102,10 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>

size_t getCompactionVersion() const { return current_compaction_version; }

// The stats about compaction. Only effective after `prepare` is called.
size_t getTotalCompactRows() const { return total_compact_rows; }
size_t getTotalCompactBytes() const { return total_compact_bytes; }

/// Create new column file by combining several small `ColumnFileTiny`s
void prepare(DMContext & context, WriteBatches & wbs, const PageReader & reader);

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
{
if (segment->writeToCache(*dm_context, block, offset, limit))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_cache).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_cache).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand All @@ -605,6 +607,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
// Write could fail, because other threads could already updated the instance. Like split/merge, merge delta.
if (segment->writeToDisk(*dm_context, write_column_file))
{
GET_METRIC(tiflash_storage_subtask_throughput_bytes, type_write_to_disk).Increment(alloc_bytes);
GET_METRIC(tiflash_storage_subtask_throughput_rows, type_write_to_disk).Increment(limit);
updated_segments.push_back(segment);
break;
}
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/Transaction/LearnerRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,12 @@ LearnerReadSnapshot doLearnerRead(
{
// Wait index timeout is disabled; or timeout is enabled but not happen yet, wait index for
// a specify Region.
auto [wait_res, time_cost] = region->waitIndex(index_to_wait, tmt.waitIndexTimeout(), [&tmt]() { return tmt.checkRunning(); });
if (wait_res != WaitIndexResult::Finished)
auto [wait_res, time_cost] = region->waitIndex(
index_to_wait,
tmt.waitIndexTimeout(),
[&tmt]() { return tmt.checkRunning(); },
log);
if (wait_res != WaitIndexStatus::Finished)
{
handle_wait_timeout_region(region_to_query.region_id, index_to_wait);
continue;
Expand Down
88 changes: 54 additions & 34 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,43 +526,63 @@ bool Region::checkIndex(UInt64 index) const
return meta.checkIndex(index);
}

std::tuple<WaitIndexResult, double> Region::waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running)
std::tuple<WaitIndexStatus, double> Region::waitIndex(
UInt64 index,
const UInt64 timeout_ms,
std::function<bool(void)> && check_running,
const LoggerPtr & log)
{
if (proxy_helper != nullptr)
if (proxy_helper == nullptr) // just for debug
return {WaitIndexStatus::Finished, 0};

if (meta.checkIndex(index))
{
if (!meta.checkIndex(index))
{
Stopwatch wait_index_watch;
LOG_DEBUG(log,
"{} need to wait learner index {}",
toString(),
index);
auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
auto elapsed_secs = wait_index_watch.elapsedSeconds();
switch (wait_idx_res)
{
case WaitIndexResult::Finished:
{
LOG_DEBUG(log,
"{} wait learner index {} done",
toString(false),
index);
return {wait_idx_res, elapsed_secs};
}
case WaitIndexResult::Terminated:
{
return {wait_idx_res, elapsed_secs};
}
case WaitIndexResult::Timeout:
{
ProfileEvents::increment(ProfileEvents::RaftWaitIndexTimeout);
LOG_WARNING(log, "{} wait learner index {} timeout", toString(false), index);
return {wait_idx_res, elapsed_secs};
}
}
}
// already satisfied
return {WaitIndexStatus::Finished, 0};
}

Stopwatch wait_index_watch;
const auto wait_idx_res = meta.waitIndex(index, timeout_ms, std::move(check_running));
const auto elapsed_secs = wait_index_watch.elapsedSeconds();
const auto & status = wait_idx_res.status;
switch (status)
{
case WaitIndexStatus::Finished:
{
const auto log_lvl = elapsed_secs < 1.0 ? Poco::Message::PRIO_DEBUG : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(
log,
log_lvl,
"{} wait learner index done, prev_index={} curr_index={} to_wait={} elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
case WaitIndexStatus::Terminated:
{
return {status, elapsed_secs};
}
case WaitIndexStatus::Timeout:
{
ProfileEvents::increment(ProfileEvents::RaftWaitIndexTimeout);
LOG_WARNING(
log,
"{} wait learner index timeout, prev_index={} curr_index={} to_wait={} state={}"
" elapsed_s={:.3f} timeout_s={:.3f}",
toString(false),
wait_idx_res.prev_index,
wait_idx_res.current_index,
index,
static_cast<Int32>(peerState()),
elapsed_secs,
timeout_ms / 1000.0);
return {status, elapsed_secs};
}
}
return {WaitIndexResult::Finished, 0};
}

UInt64 Region::version() const
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class Region : public std::enable_shared_from_this<Region>

bool checkIndex(UInt64 index) const;

// Return <WaitIndexResult, time cost(seconds)> for wait-index.
std::tuple<WaitIndexResult, double> waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running);
// Return <WaitIndexStatus, time cost(seconds)> for wait-index.
std::tuple<WaitIndexStatus, double> waitIndex(UInt64 index, UInt64 timeout_ms, std::function<bool(void)> && check_running, const LoggerPtr & log);

UInt64 appliedIndex() const;
UInt64 appliedIndexTerm() const;
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/Transaction/RegionMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,38 +163,41 @@ void RegionMeta::setPeerState(const raft_serverpb::PeerState peer_state_)
WaitIndexResult RegionMeta::waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running) const
{
std::unique_lock lock(mutex);
WaitIndexResult status = WaitIndexResult::Finished;
WaitIndexResult res;
res.prev_index = apply_state.applied_index();
if (timeout_ms != 0)
{
// wait for applied index with a timeout
auto timeout_timepoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
if (!cv.wait_until(lock, timeout_timepoint, [&] {
res.current_index = apply_state.applied_index();
if (!check_running())
{
status = WaitIndexResult::Terminated;
res.status = WaitIndexStatus::Terminated;
return true;
}
return doCheckIndex(index);
}))
{
// not terminated && not reach the `index` => timeout
status = WaitIndexResult::Timeout;
res.status = WaitIndexStatus::Timeout;
}
}
else
{
// wait infinitely
cv.wait(lock, [&] {
res.current_index = apply_state.applied_index();
if (!check_running())
{
status = WaitIndexResult::Terminated;
res.status = WaitIndexStatus::Terminated;
return true;
}
return doCheckIndex(index);
});
}

return status;
return res;
}

bool RegionMeta::checkIndex(UInt64 index) const
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/Transaction/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ struct RegionMergeResult;
class Region;
class MetaRaftCommandDelegate;
class RegionRaftCommandDelegate;
enum class WaitIndexResult
enum class WaitIndexStatus
{
Finished,
Terminated,
Terminated, // Read index is terminated due to upper layer.
Timeout,
};
struct WaitIndexResult
{
WaitIndexStatus status{WaitIndexStatus::Finished};
// the applied index before wait index
UInt64 prev_index = 0;
// the applied index when wait index finish
UInt64 current_index = 0;
};

struct RegionMetaSnapshot
{
Expand Down Expand Up @@ -102,7 +110,7 @@ class RegionMeta
// If `timeout_ms` == 0, it waits infinite except `check_running` return false.
// `timeout_ms` != 0 and not reaching `index` after waiting for `timeout_ms`, Return WaitIndexResult::Timeout.
// If `check_running` return false, returns WaitIndexResult::Terminated
WaitIndexResult waitIndex(UInt64 index, const UInt64 timeout_ms, std::function<bool(void)> && check_running) const;
WaitIndexResult waitIndex(UInt64 index, UInt64 timeout_ms, std::function<bool(void)> && check_running) const;
bool checkIndex(UInt64 index) const;

RegionMetaSnapshot dumpRegionMetaSnapshot() const;
Expand Down
Loading

0 comments on commit 170bc61

Please sign in to comment.