Skip to content

Commit

Permalink
[Enhance] Push down predicate on value column of unique table to base…
Browse files Browse the repository at this point in the history
… rowset (#5022)
  • Loading branch information
yangzhg authored Dec 6, 2020
1 parent 6021d6f commit b9dabc3
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 31 deletions.
12 changes: 10 additions & 2 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,16 @@ Status OlapScanner::_init_params(const std::vector<OlapScanRange*>& key_ranges,
// TODO(zc)
_params.profile = _profile;
_params.runtime_state = _runtime_state;

if (_aggregation) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
(_params.rs_readers.size() == 1 &&
_params.rs_readers[0]->rowset()->start_version() == 0 &&
!_params.rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) ||
(_params.rs_readers.size() == 2 &&
_params.rs_readers[1]->rowset()->rowset_meta()->num_rows() == 0 &&
_params.rs_readers[1]->rowset()->start_version() == 2 &&
!_params.rs_readers[1]->rowset()->rowset_meta()->is_segments_overlapping());
if (_aggregation || single_version) {
_params.return_columns = _return_columns;
} else {
for (size_t i = 0; i < _tablet->num_key_columns(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class StorageReadOptions {
// used to fiter rows in row block
// TODO(hkp): refactor the column predicate framework
// to unify Conditions and ColumnPredicate
const std::vector<ColumnPredicate*>* column_predicates = nullptr;
std::vector<ColumnPredicate*> column_predicates;

// REQUIRED (null is not allowed)
OlapReaderStatistics* stats = nullptr;
Expand Down
16 changes: 12 additions & 4 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ void Reader::close() {
for (auto pred : _col_predicates) {
delete pred;
}
for (auto pred : _value_col_predicates) {
delete pred;
}
}

OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
Expand Down Expand Up @@ -365,6 +368,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.load_bf_columns = &_load_bf_columns;
_reader_context.conditions = &_conditions;
_reader_context.predicates = &_col_predicates;
_reader_context.value_predicates = &_value_col_predicates;
_reader_context.lower_bound_keys = &_keys_param.start_keys;
_reader_context.is_lower_keys_included = &_is_lower_keys_included;
_reader_context.upper_bound_keys = &_keys_param.end_keys;
Expand Down Expand Up @@ -575,7 +579,13 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
DCHECK_EQ(OLAP_SUCCESS, _conditions.append_condition(condition));
ColumnPredicate* predicate = _parse_to_predicate(condition);
if (predicate != nullptr) {
_col_predicates.push_back(predicate);
if (_tablet->tablet_schema()
.column(_tablet->field_index(condition.column_name))
.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
} else {
_col_predicates.push_back(predicate);
}
}
}
}
Expand Down Expand Up @@ -685,9 +695,6 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
return nullptr;
}
const TabletColumn& column = _tablet->tablet_schema().column(index);
if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
return nullptr;
}
ColumnPredicate* predicate = nullptr;
if (condition.condition_op == "*=" && condition.condition_values.size() == 1) {
predicate = _new_eq_pred(column, index, condition.condition_values[0]);
Expand Down Expand Up @@ -820,6 +827,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {
} else if (condition.condition_op == "is") {
predicate = new NullPredicate(index, condition.condition_values[0] == "null");
}

return predicate;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class Reader {
std::vector<bool> _is_upper_keys_included;
Conditions _conditions;
std::vector<ColumnPredicate*> _col_predicates;
std::vector<ColumnPredicate*> _value_col_predicates;
DeleteHandler _delete_handler;

OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, MemPool* mem_pool,
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/alpha_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,22 +310,24 @@ OLAPStatus AlphaRowset::init() {
if (segment_group_meta.zone_maps_size() != 0) {
size_t zone_maps_size = segment_group_meta.zone_maps_size();
// after 0.12.10 the value column in duplicate table also has zone map.
size_t expect_zone_maps_num = _schema->keys_type() == KeysType::DUP_KEYS
// after 0.14 the value column in duplicate table also has zone map.
size_t expect_zone_maps_num = _schema->keys_type() != KeysType::AGG_KEYS
? _schema->num_columns()
: _schema->num_key_columns();
if ((_schema->keys_type() != KeysType::DUP_KEYS &&
if ((_schema->keys_type() == KeysType::AGG_KEYS &&
expect_zone_maps_num != zone_maps_size) ||
(_schema->keys_type() == KeysType::DUP_KEYS &&
(_schema->keys_type() != KeysType::AGG_KEYS &&
expect_zone_maps_num < zone_maps_size)) {
LOG(ERROR) << "column pruning size is error."
LOG(ERROR) << "column pruning size is error. "
<< "KeysType=" << KeysType_Name(_schema->keys_type()) << ", "
<< "zone_maps_size=" << zone_maps_size << ", "
<< "num_key_columns=" << _schema->num_key_columns() << ", "
<< "num_columns=" << _schema->num_columns();
return OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR;
}
// Before 0.12.10, the zone map columns number in duplicate table is the same with the key column numbers,
// but after 0.12.10 we build zone map for the value column, so when first start the two number is not the same,
// Before 0.12.10, the zone map columns number in duplicate/unique table is the same with the key column numbers,
// but after 0.12.10 we build zone map for duplicate table value column, after 0.14 we build zone map for unique
// table value column, so when first start the two number is not the same,
// it causes start failed. When `expect_zone_maps_num > zone_maps_size` it may be the first start after upgrade
if (expect_zone_maps_num > zone_maps_size) {
LOG(WARNING)
Expand Down
16 changes: 14 additions & 2 deletions be/src/olap/rowset/alpha_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,22 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context
continue;
}
} else {
std::vector<ColumnPredicate*> predicates;
if (read_context->predicates != nullptr) {
predicates.insert(predicates.end(), read_context->predicates->begin(),
read_context->predicates->end());
}
// if unique table with rowset [0-x] or [0-1] [2-y] [...],
// value column predicates can be pushdown on rowset [0-x] or [2-y]
if (read_context->value_predicates != nullptr && _rowset->keys_type() == UNIQUE_KEYS &&
(_rowset->start_version() == 0 || _rowset->start_version() == 2)) {
predicates.insert(predicates.end(), read_context->value_predicates->begin(),
read_context->value_predicates->end());
}
new_column_data->set_read_params(
*read_context->return_columns, *read_context->seek_columns,
*read_context->load_bf_columns, *read_context->conditions,
*read_context->predicates, use_index_stream_cache, read_context->runtime_state);
*read_context->load_bf_columns, *read_context->conditions, predicates,
use_index_stream_cache, read_context->runtime_state);
// filter
if (new_column_data->rowset_pruning_filter()) {
_stats->rows_stats_filtered += new_column_data->num_rows();
Expand Down
14 changes: 13 additions & 1 deletion be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,19 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
read_context->delete_handler->get_delete_conditions_after_version(
_rowset->end_version(), &read_options.delete_conditions);
}
read_options.column_predicates = read_context->predicates;
if (read_context->predicates != nullptr) {
read_options.column_predicates.insert(read_options.column_predicates.end(),
read_context->predicates->begin(),
read_context->predicates->end());
}
// if unique table with rowset [0-x] or [0-1] [2-y] [...],
// value column predicates can be pushdown on rowset [0-x] or [2-y]
if (read_context->value_predicates != nullptr && _rowset->keys_type() == UNIQUE_KEYS &&
(_rowset->start_version() == 0 || _rowset->start_version() == 2)) {
read_options.column_predicates.insert(read_options.column_predicates.end(),
read_context->value_predicates->begin(),
read_context->value_predicates->end());
}
read_options.use_page_cache = read_context->use_page_cache;

// create iterator for each segment
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "gen_cpp/olap_file.pb.h"
#include "gutil/macros.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_schema.h"

namespace doris {

Expand All @@ -36,7 +37,6 @@ class Rowset;
using RowsetSharedPtr = std::shared_ptr<Rowset>;
class RowsetFactory;
class RowsetReader;
class TabletSchema;

// the rowset state transfer graph:
// ROWSET_UNLOADED <--|
Expand Down Expand Up @@ -159,6 +159,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
bool delete_flag() const { return rowset_meta()->delete_flag(); }
int64_t num_segments() const { return rowset_meta()->num_segments(); }
void to_rowset_pb(RowsetMetaPB* rs_meta) { return rowset_meta()->to_rowset_pb(rs_meta); }
inline KeysType keys_type() { return _schema->keys_type(); }

// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ struct RowsetReaderContext {
// column name -> column predicate
// adding column_name for predicate to make use of column selectivity
const std::vector<ColumnPredicate*>* predicates = nullptr;
// value column predicate in UNIQUE table
const std::vector<ColumnPredicate*>* value_predicates = nullptr;
const std::vector<RowCursor*>* lower_bound_keys = nullptr;
const std::vector<bool>* is_lower_keys_included = nullptr;
const std::vector<RowCursor*>* upper_bound_keys = nullptr;
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ OLAPStatus SegmentGroup::add_zone_maps_for_linked_schema_change(
<< zonemap_col_num << " vs. " << schema_mapping.size();

for (size_t i = 0; i < zonemap_col_num; ++i) {
// in duplicated table update from 0.11 to 0.12, zone map index may be missed and may not a new column.
if (_schema->keys_type() == DUP_KEYS && schema_mapping[i].ref_column != -1 &&
// in duplicate/unique table update from 0.11 to 0.12, zone map index may be missed and may not a new column.
if (_schema->keys_type() != AGG_KEYS && schema_mapping[i].ref_column != -1 &&
schema_mapping[i].ref_column >= zone_map_fields.size()) {
// the sequence of columns in _zone_maps and _schema must be consistent, so here
// process should not add missed zonemap and we break the loop.
Expand Down Expand Up @@ -729,7 +729,7 @@ const TabletSchema& SegmentGroup::get_tablet_schema() {
}

int SegmentGroup::get_num_zone_map_columns() {
if (_schema->keys_type() == KeysType::DUP_KEYS) {
if (_schema->keys_type() != KeysType::AGG_KEYS) {
return _schema->num_columns();
}
return _schema->num_key_columns();
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ struct ColumnWriterOptions {
bool need_zone_map = false;
bool need_bitmap_index = false;
bool need_bloom_filter = false;
std::string to_string() {
std::stringstream ss;
ss << std::boolalpha << "meta=" << meta->DebugString()
<< ", data_page_size=" << data_page_size
<< ", compression_min_space_saving = " << compression_min_space_saving
<< ", need_zone_map=" << need_zone_map << ", need_bitmap_index=" << need_bitmap_index
<< ", need_bloom_filter" << need_bloom_filter;
return ss.str();
}
};

class BitmapIndexWriter;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ SegmentIterator::~SegmentIterator() {

Status SegmentIterator::init(const StorageReadOptions& opts) {
_opts = opts;
if (opts.column_predicates != nullptr) {
_col_predicates = *(opts.column_predicates);
if (!opts.column_predicates.empty()) {
_col_predicates = opts.column_predicates;
}
return Status::OK();
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused))

_init_column_meta(opts.meta, &column_id, column);

// now we create zone map for key columns
// now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
// and not support zone map for array type.
opts.need_zone_map = column.is_key() || _tablet_schema->keys_type() == KeysType::DUP_KEYS;
if (column.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
opts.need_zone_map = false;
} else {
opts.need_zone_map =
column.is_key() || _tablet_schema->keys_type() != KeysType::AGG_KEYS;
}
opts.need_bloom_filter = column.is_bf_column();
opts.need_bitmap_index = column.has_bitmap_index();
Expand Down
16 changes: 8 additions & 8 deletions be/test/olap/rowset/segment_v2/segment_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {

OlapReaderStatistics stats;
StorageReadOptions read_opts;
read_opts.column_predicates = &predicates;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand All @@ -331,7 +331,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {

OlapReaderStatistics stats;
StorageReadOptions read_opts;
read_opts.column_predicates = &predicates;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand Down Expand Up @@ -383,7 +383,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {

OlapReaderStatistics stats;
StorageReadOptions read_opts;
read_opts.column_predicates = &predicates;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand Down Expand Up @@ -1026,7 +1026,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {

StorageReadOptions read_opts;
OlapReaderStatistics stats;
read_opts.column_predicates = &column_predicates;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand All @@ -1048,7 +1048,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {

StorageReadOptions read_opts;
OlapReaderStatistics stats;
read_opts.column_predicates = &column_predicates;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand All @@ -1070,7 +1070,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {

StorageReadOptions read_opts;
OlapReaderStatistics stats;
read_opts.column_predicates = &column_predicates;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand All @@ -1094,7 +1094,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {

StorageReadOptions read_opts;
OlapReaderStatistics stats;
read_opts.column_predicates = &column_predicates;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand All @@ -1117,7 +1117,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {

StorageReadOptions read_opts;
OlapReaderStatistics stats;
read_opts.column_predicates = &column_predicates;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;

std::unique_ptr<RowwiseIterator> iter;
Expand Down

0 comments on commit b9dabc3

Please sign in to comment.