Skip to content

Commit

Permalink
[cherry-pick](branch-3.0) Pick "[fix](rowset meta) Fix rowset meta si…
Browse files Browse the repository at this point in the history
…ze relation (#41022)" (#43414)

Pick #41022
  • Loading branch information
Yukang-Lian authored Nov 12, 2024
1 parent 47c4604 commit b5b8e49
Show file tree
Hide file tree
Showing 37 changed files with 400 additions and 92 deletions.
23 changes: 15 additions & 8 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,18 @@ Status CloudBaseCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size);
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down Expand Up @@ -270,17 +273,21 @@ Status CloudBaseCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size());
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());

//_compaction_succeed = true;
_state = CompactionState::SUCCESS;

DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size);
base_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_total_size);
base_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand All @@ -302,8 +309,8 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(cloud_tablet()->cumulative_layer_point());
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
25 changes: 17 additions & 8 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,19 @@ Status CloudCumulativeCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
Expand Down Expand Up @@ -201,10 +205,14 @@ Status CloudCumulativeCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size())
.tag("tablet_max_version", _tablet->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
Expand All @@ -213,8 +221,9 @@ Status CloudCumulativeCompaction::execute_compact() {
_state = CompactionState::SUCCESS;

DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
cumu_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(
_input_rowsets_total_size);
cumu_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand Down Expand Up @@ -243,8 +252,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(new_cumulative_point);
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
compaction_job->set_num_input_rowsets(_input_rowsets.size());
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
// if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
return output_rowset->start_version() == last_cumulative_point &&
(last_delete_version.first != -1 ||
output_rowset->data_disk_size() >= cloud_promotion_size(tablet) ||
output_rowset->total_disk_size() >= cloud_promotion_size(tablet) ||
satisfy_promotion_version)
? output_rowset->end_version() + 1
: last_cumulative_point;
Expand Down
26 changes: 17 additions & 9 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,19 @@ Status CloudFullCompaction::prepare_compact() {
for (auto& rs : _input_rowsets) {
_input_row_num += rs->num_rows();
_input_segments += rs->num_segments();
_input_rowsets_size += rs->data_disk_size();
_input_rowsets_data_size += rs->data_disk_size();
_input_rowsets_index_size += rs->index_disk_size();
_input_rowsets_total_size += rs->total_disk_size();
}
LOG_INFO("start CloudFullCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size);
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size);
return st;
}

Expand Down Expand Up @@ -162,16 +166,20 @@ Status CloudFullCompaction::execute_compact() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_data_size", _input_rowsets_data_size)
.tag("input_rowsets_index_size", _input_rowsets_index_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("output_rows", _output_rowset->num_rows())
.tag("output_segments", _output_rowset->num_segments())
.tag("output_data_size", _output_rowset->data_disk_size());
.tag("output_rowset_data_size", _output_rowset->data_disk_size())
.tag("output_rowset_index_size", _output_rowset->index_disk_size())
.tag("output_rowset_total_size", _output_rowset->total_disk_size());

_state = CompactionState::SUCCESS;

DorisMetrics::instance()->full_compaction_deltas_total->increment(_input_rowsets.size());
DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_size);
full_output_size << _output_rowset->data_disk_size();
DorisMetrics::instance()->full_compaction_bytes_total->increment(_input_rowsets_total_size);
full_output_size << _output_rowset->total_disk_size();

return Status::OK();
}
Expand All @@ -193,8 +201,8 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->set_output_cumulative_point(_output_rowset->end_version() + 1);
compaction_job->set_num_input_rows(_input_row_num);
compaction_job->set_num_output_rows(_output_rowset->num_rows());
compaction_job->set_size_input_rowsets(_input_rowsets_size);
compaction_job->set_size_output_rowsets(_output_rowset->data_disk_size());
compaction_job->set_size_input_rowsets(_input_rowsets_total_size);
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.wrong_compaction_data_size", {
compaction_job->set_size_input_rowsets(1);
compaction_job->set_size_output_rowsets(10000001);
Expand Down Expand Up @@ -345,7 +353,7 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("input_data_size", _input_rowsets_size)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
_tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
return Status::OK();
Expand Down
123 changes: 123 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
Expand All @@ -52,6 +53,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -769,6 +771,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
}
check_table_size_correctness(rs_meta);
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -1163,4 +1166,124 @@ Status CloudMetaMgr::remove_old_version_delete_bitmap(
return st;
}

void CloudMetaMgr::check_table_size_correctness(const RowsetMeta& rs_meta) {
if (!config::enable_table_size_correctness_check) {
return;
}
int64_t total_segment_size = get_segment_file_size(rs_meta);
int64_t total_inverted_index_size = get_inverted_index_file_szie(rs_meta);
if (rs_meta.data_disk_size() != total_segment_size ||
rs_meta.index_disk_size() != total_inverted_index_size ||
rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) {
LOG(WARNING) << "[Cloud table table size check failed]:"
<< " tablet id: " << rs_meta.tablet_id()
<< ", rowset id:" << rs_meta.rowset_id()
<< ", rowset data disk size:" << rs_meta.data_disk_size()
<< ", rowset real data disk size:" << total_segment_size
<< ", rowset index disk size:" << rs_meta.index_disk_size()
<< ", rowset real index disk size:" << total_inverted_index_size
<< ", rowset total disk size:" << rs_meta.total_disk_size()
<< ", rowset segment path:"
<< StorageResource().remote_segment_path(rs_meta.tablet_id(),
rs_meta.rowset_id().to_string(), 0);
DCHECK(false);
}
}

int64_t CloudMetaMgr::get_segment_file_size(const RowsetMeta& rs_meta) {
int64_t total_segment_size = 0;
const auto fs = const_cast<RowsetMeta&>(rs_meta).fs();
if (!fs) {
LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
}
for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) {
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
int64_t segment_file_size = 0;
auto st = fs->file_size(segment_path, &segment_file_size);
if (!st.ok()) {
segment_file_size = 0;
if (st.is<FILE_NOT_EXIST>()) {
LOG(INFO) << "cloud table size correctness check get segment size 0 because "
"file not exist! msg:"
<< st.msg() << ", segment path:" << segment_path;
} else {
LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:"
<< st.msg() << ", segment path:" << segment_path;
}
}
total_segment_size += segment_file_size;
}
return total_segment_size;
}

int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
int64_t total_inverted_index_size = 0;
const auto fs = const_cast<RowsetMeta&>(rs_meta).fs();
if (!fs) {
LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
}
if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
auto indices = rs_meta.tablet_schema()->indexes();
for (auto& index : indices) {
// only get file_size for inverted index
if (index.index_type() != IndexType::INVERTED) {
continue;
}
for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
int64_t file_size = 0;

std::string inverted_index_file_path =
InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_path),
index.index_id(), index.get_index_suffix());
auto st = fs->file_size(inverted_index_file_path, &file_size);
if (!st.ok()) {
file_size = 0;
if (st.is<FILE_NOT_EXIST>()) {
LOG(INFO) << "cloud table size correctness check get inverted index v1 "
"0 because file not exist! msg:"
<< st.msg()
<< ", inverted index path:" << inverted_index_file_path;
} else {
LOG(WARNING)
<< "cloud table size correctness check get inverted index v1 "
"size failed! msg:"
<< st.msg() << ", inverted index path:" << inverted_index_file_path;
}
}
total_inverted_index_size += file_size;
}
}
} else {
for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
int64_t file_size = 0;
std::string segment_path = StorageResource().remote_segment_path(
rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);

std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_path));
auto st = fs->file_size(inverted_index_file_path, &file_size);
if (!st.ok()) {
file_size = 0;
if (st.is<FILE_NOT_EXIST>()) {
LOG(INFO) << "cloud table size correctness check get inverted index v2 "
"0 because file not exist! msg:"
<< st.msg() << ", inverted index path:" << inverted_index_file_path;
} else {
LOG(WARNING) << "cloud table size correctness check get inverted index v2 "
"size failed! msg:"
<< st.msg()
<< ", inverted index path:" << inverted_index_file_path;
}
}
total_inverted_index_size += file_size;
}
}
return total_inverted_index_size;
}

} // namespace doris::cloud
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class CloudMetaMgr {
Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
void check_table_size_correctness(const RowsetMeta& rs_meta);
int64_t get_segment_file_size(const RowsetMeta& rs_meta);
int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
};

} // namespace cloud
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void CloudRowsetBuilder::update_tablet_stats() {
tablet->fetch_add_approximate_num_rowsets(1);
tablet->fetch_add_approximate_num_segments(_rowset->num_segments());
tablet->fetch_add_approximate_num_rows(_rowset->num_rows());
tablet->fetch_add_approximate_data_size(_rowset->data_disk_size());
tablet->fetch_add_approximate_data_size(_rowset->total_disk_size());
tablet->fetch_add_approximate_cumu_num_rowsets(1);
tablet->fetch_add_approximate_cumu_num_deltas(_rowset->num_segments());
tablet->write_count.fetch_add(1, std::memory_order_relaxed);
Expand Down
Loading

0 comments on commit b5b8e49

Please sign in to comment.