Skip to content

Commit

Permalink
[optimize](zonemap) skip zonemap if predicate does not support_zonemap (
Browse files Browse the repository at this point in the history
#28595)

* [optimize](zonemap) skip zonemap if predicate does not support_zonemap #27608 (#28506)
  • Loading branch information
xiaokang authored Dec 24, 2023
1 parent 6ea6ff5 commit db1da16
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 73 deletions.
13 changes: 13 additions & 0 deletions be/src/olap/block_column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class BlockColumnPredicate {
return true;
}

virtual bool support_zonemap() const { return true; }

virtual bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const {
LOG(FATAL) << "should not reach here";
return true;
Expand Down Expand Up @@ -118,6 +120,7 @@ class SingleColumnBlockPredicate : public BlockColumnPredicate {
uint16_t selected_size) const override;
void evaluate_and(vectorized::MutableColumns& block, uint16_t* sel, uint16_t selected_size,
bool* flags) const override;
bool support_zonemap() const override { return _predicate->support_zonemap(); }
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override;
bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
bool evaluate_and(const StringRef* dict_words, const size_t dict_num) const override;
Expand Down Expand Up @@ -148,6 +151,16 @@ class MutilColumnBlockPredicate : public BlockColumnPredicate {
}
}

bool support_zonemap() const override {
for (const auto* child_block_predicate : _block_column_predicate_vec) {
if (!child_block_predicate->support_zonemap()) {
return false;
}
}

return true;
}

void add_column_predicate(const BlockColumnPredicate* column_predicate) {
_block_column_predicate_vec.push_back(column_predicate);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ class ColumnPredicate {
virtual void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
bool* flags) const {}

virtual bool support_zonemap() const { return true; }

virtual bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const {
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/match_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class MatchPredicate : public ColumnPredicate {

const std::string& get_value() const { return _value; }

bool support_zonemap() const override { return false; }

//evaluate predicate on Bitmap
virtual Status evaluate(BitmapIndexIterator* iterator, uint32_t num_rows,
roaring::Roaring* roaring) const override {
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ struct OlapReaderStatistics {

int64_t rows_key_range_filtered = 0;
int64_t rows_stats_filtered = 0;
int64_t rows_stats_rp_filtered = 0;
int64_t rows_bf_filtered = 0;
int64_t rows_dict_filtered = 0;
// Including the number of rows filtered out according to the Delete information in the Tablet,
Expand All @@ -337,6 +338,10 @@ struct OlapReaderStatistics {
// the number of rows filtered by various column indexes.
int64_t rows_conditions_filtered = 0;
int64_t block_conditions_filtered_ns = 0;
int64_t block_conditions_filtered_bf_ns = 0;
int64_t block_conditions_filtered_zonemap_ns = 0;
int64_t block_conditions_filtered_zonemap_rp_ns = 0;
int64_t block_conditions_filtered_dict_ns = 0;

int64_t index_load_ns = 0;

Expand Down
167 changes: 95 additions & 72 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,90 +529,113 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
cids.insert(entry.first);
}

// first filter data by bloom filter index
// bloom filter index only use CondColumn
RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
for (auto& cid : cids) {
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
}
// get row ranges by bf index of this column,
RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter(
_opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
}
size_t pre_size = 0;

size_t pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
_opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count());

RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
// second filter data by zone map
for (auto& cid : cids) {
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_bf_ns);
// first filter data by bloom filter index
// bloom filter index only use CondColumn
RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
for (auto& cid : cids) {
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
}
// get row ranges by bf index of this column,
RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter(
_opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
}
// get row ranges by zone map of this column,
RowRanges column_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map(
_opts.col_id_to_predicates.at(cid).get(),
_opts.del_predicates_for_zone_map.count(cid) > 0
? &(_opts.del_predicates_for_zone_map.at(cid))
: nullptr,
&column_row_ranges));
// intersect different columns's row ranges to get final row ranges by zone map
RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
&zone_map_row_ranges);
}

std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
auto query_ctx = _opts.runtime_state->get_query_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate && _segment->can_apply_predicate_safely(
runtime_predicate->column_id(), runtime_predicate.get(),
*_schema, _opts.io_ctx.reader_type)) {
AndBlockColumnPredicate and_predicate;
auto single_predicate = new SingleColumnBlockPredicate(runtime_predicate.get());
and_predicate.add_column_predicate(single_predicate);

RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(
_column_iterators[runtime_predicate->column_id()]->get_row_ranges_by_zone_map(
&and_predicate, nullptr, &column_rp_row_ranges));
pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
_opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count());
}

{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns);
RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
// second filter data by zone map
for (auto& cid : cids) {
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
*_schema, _opts.io_ctx.reader_type)) {
continue;
}
// do not check zonemap if predicate does not support zonemap
if (!_opts.col_id_to_predicates.at(cid)->support_zonemap()) {
VLOG_DEBUG << "skip zonemap for column " << cid;
continue;
}
// get row ranges by zone map of this column,
RowRanges column_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map(
_opts.col_id_to_predicates.at(cid).get(),
_opts.del_predicates_for_zone_map.count(cid) > 0
? &(_opts.del_predicates_for_zone_map.at(cid))
: nullptr,
&column_row_ranges));
// intersect different columns's row ranges to get final row ranges by zone map
RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges,
RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
&zone_map_row_ranges);
}
}

pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);
_opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count());
pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);

/// Low cardinality optimization is currently not very stable, so to prevent data corruption,
/// we are temporarily disabling its use in data compaction.
if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
for (auto cid : cids) {
RowRanges tmp_row_ranges = RowRanges::create_single(num_rows());
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
_opts.col_id_to_predicates.at(cid).get(), &tmp_row_ranges));
RowRanges::ranges_intersection(dict_row_ranges, tmp_row_ranges, &dict_row_ranges);
std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_zonemap_ns);
auto query_ctx = _opts.runtime_state->get_query_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate &&
_segment->can_apply_predicate_safely(runtime_predicate->column_id(),
runtime_predicate.get(), *_schema,
_opts.io_ctx.reader_type)) {
AndBlockColumnPredicate and_predicate;
auto single_predicate = new SingleColumnBlockPredicate(runtime_predicate.get());
and_predicate.add_column_predicate(single_predicate);

RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()]
->get_row_ranges_by_zone_map(&and_predicate, nullptr,
&column_rp_row_ranges));

// intersect different columns's row ranges to get final row ranges by zone map
RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges,
&zone_map_row_ranges);
}
}

pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges,
size_t pre_size2 = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);
_opts.stats->rows_dict_filtered += (pre_size - condition_row_ranges->count());
_opts.stats->rows_stats_rp_filtered += (pre_size2 - condition_row_ranges->count());
_opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count());
}

{
SCOPED_RAW_TIMER(&_opts.stats->block_conditions_filtered_dict_ns);
/// Low cardinality optimization is currently not very stable, so to prevent data corruption,
/// we are temporarily disabling its use in data compaction.
if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
for (auto cid : cids) {
RowRanges tmp_row_ranges = RowRanges::create_single(num_rows());
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
_opts.col_id_to_predicates.at(cid).get(), &tmp_row_ranges));
RowRanges::ranges_intersection(dict_row_ranges, tmp_row_ranges, &dict_row_ranges);
}

pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges,
condition_row_ranges);
_opts.stats->rows_dict_filtered += (pre_size - condition_row_ranges->count());
}
}

return Status::OK();
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ Status OlapScanLocalState::_init_profile() {
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
_block_conditions_filtered_timer = ADD_TIMER(_segment_profile, "BlockConditionsFilteredTime");
_block_conditions_filtered_bf_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredBloomFilterTime");
_block_conditions_filtered_zonemap_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredZonemapTime");
_block_conditions_filtered_zonemap_rp_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredZonemapRuntimePredicateTime");
_block_conditions_filtered_dict_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredDictTime");

_rows_vec_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
Expand All @@ -87,6 +95,8 @@ Status OlapScanLocalState::_init_profile() {
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");

_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
_stats_rp_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsZonemapRuntimePredicateFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_dict_filtered_counter = ADD_COUNTER(_segment_profile, "RowsDictFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
std::map<int, PredicateFilterInfo> _filter_info;

RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
RuntimeProfile::Counter* _stats_rp_filtered_counter = nullptr;
RuntimeProfile::Counter* _bf_filtered_counter = nullptr;
RuntimeProfile::Counter* _dict_filtered_counter = nullptr;
RuntimeProfile::Counter* _del_filtered_counter = nullptr;
Expand All @@ -141,6 +142,10 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
RuntimeProfile::Counter* _block_init_seek_timer = nullptr;
RuntimeProfile::Counter* _block_init_seek_counter = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_bf_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_zonemap_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_zonemap_rp_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_dict_timer = nullptr;
RuntimeProfile::Counter* _first_read_timer = nullptr;
RuntimeProfile::Counter* _second_read_timer = nullptr;
RuntimeProfile::Counter* _first_read_seek_timer = nullptr;
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ Status NewOlapScanNode::_init_profile() {
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
_block_conditions_filtered_timer = ADD_TIMER(_segment_profile, "BlockConditionsFilteredTime");
_block_conditions_filtered_bf_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredBloomFilterTime");
_block_conditions_filtered_zonemap_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredZonemapTime");
_block_conditions_filtered_zonemap_rp_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredZonemapRuntimePredicateTime");
_block_conditions_filtered_dict_timer =
ADD_TIMER(_segment_profile, "BlockConditionsFilteredDictTime");

_rows_vec_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
Expand All @@ -152,7 +160,9 @@ Status NewOlapScanNode::_init_profile() {

_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");

_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsZonemapFiltered", TUnit::UNIT);
_stats_rp_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsZonemapRuntimePredicateFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_dict_filtered_counter = ADD_COUNTER(_segment_profile, "RowsDictFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/scan/new_olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class NewOlapScanNode : public VScanNode {
std::map<int, PredicateFilterInfo> _filter_info;

RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
RuntimeProfile::Counter* _stats_rp_filtered_counter = nullptr;
RuntimeProfile::Counter* _bf_filtered_counter = nullptr;
RuntimeProfile::Counter* _dict_filtered_counter = nullptr;
RuntimeProfile::Counter* _del_filtered_counter = nullptr;
Expand All @@ -168,6 +169,10 @@ class NewOlapScanNode : public VScanNode {
RuntimeProfile::Counter* _block_init_seek_timer = nullptr;
RuntimeProfile::Counter* _block_init_seek_counter = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_bf_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_zonemap_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_zonemap_rp_timer = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_dict_timer = nullptr;
RuntimeProfile::Counter* _first_read_timer = nullptr;
RuntimeProfile::Counter* _second_read_timer = nullptr;
RuntimeProfile::Counter* _first_read_seek_timer = nullptr;
Expand Down
Loading

0 comments on commit db1da16

Please sign in to comment.