Skip to content

Commit

Permalink
multi match function add
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Jul 19, 2024
1 parent 76c6d17 commit a926b4c
Show file tree
Hide file tree
Showing 19 changed files with 582 additions and 72 deletions.
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class InvertedIndexIterator;
class InvertedIndexQueryCacheHandle;
class InvertedIndexFileReader;
struct InvertedIndexQueryInfo;

class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexReader> {
public:
explicit InvertedIndexReader(
Expand Down Expand Up @@ -147,6 +146,7 @@ class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexRea
TabletIndex _index_meta;
bool _has_null = true;
};
using InvertedIndexReaderPtr = std::shared_ptr<InvertedIndexReader>;

class FullTextIndexReader : public InvertedIndexReader {
ENABLE_FACTORY_CREATOR(FullTextIndexReader);
Expand Down Expand Up @@ -380,6 +380,8 @@ class InvertedIndexIterator {
[[nodiscard]] const std::map<string, string>& get_index_properties() const;
[[nodiscard]] bool has_null() { return _reader->has_null(); };

const InvertedIndexReaderPtr& reader() { return _reader; }

private:
OlapReaderStatistics* _stats = nullptr;
RuntimeState* _runtime_state = nullptr;
Expand Down
152 changes: 125 additions & 27 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
for (auto& expr : _remaining_conjunct_roots) {
_calculate_pred_in_remaining_conjunct_root(expr);
}
_calculate_func_in_remaining_conjunct_root();

_column_predicate_info.reset(new ColumnPredicateInfo());
if (_schema->rowid_col_idx() > 0) {
Expand Down Expand Up @@ -560,6 +561,7 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
}
}
_col_preds_except_leafnode_of_andnode.clear();
compound_func_exprs.clear();
// 1. if all conditions in the compound hit the inverted index and there are no other expr to handle.
// 2. then there is no need to generate index_result_column.
if (_enable_common_expr_pushdown && _remaining_conjunct_roots.empty()) {
Expand Down Expand Up @@ -809,25 +811,32 @@ Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(
auto v_literal_expr = std::dynamic_pointer_cast<doris::vectorized::VLiteral>(expr);
_column_predicate_info->query_values.insert(v_literal_expr->value());
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED ||
node_type == TExprNodeType::IN_PRED) {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in";
node_type == TExprNodeType::IN_PRED || node_type == TExprNodeType::FUNCTION_CALL) {
std::string result_sign;
if (node_type == TExprNodeType::FUNCTION_CALL) {
result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX +
std::to_string(reinterpret_cast<uintptr_t>(expr.get()));
} else {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in";
} else {
_column_predicate_info->query_op = "not_in";
}
} else {
_column_predicate_info->query_op = "not_in";
_column_predicate_info->query_op = expr->fn().name.function_name;
}
} else {
_column_predicate_info->query_op = expr->fn().name.function_name;
result_sign = _gen_predicate_result_sign(_column_predicate_info.get());
}

// get child condition result in compound conditions
auto pred_result_sign = _gen_predicate_result_sign(_column_predicate_info.get());
_column_predicate_info.reset(new ColumnPredicateInfo());
VLOG_DEBUG << "_gen_predicate_result_sign " << pred_result_sign;
if (_rowid_result_for_index.count(pred_result_sign) > 0 &&
_rowid_result_for_index[pred_result_sign].first) {
auto apply_result = _rowid_result_for_index[pred_result_sign].second;
VLOG_DEBUG << "result_sign " << result_sign;
if (_rowid_result_for_index.count(result_sign) > 0 &&
_rowid_result_for_index[result_sign].first) {
auto apply_result = _rowid_result_for_index[result_sign].second;
_pred_except_leafnode_of_andnode_evaluate_result.push_back(apply_result);
}
} else if (node_type == TExprNodeType::COMPOUND_PRED) {
Expand Down Expand Up @@ -871,7 +880,7 @@ Status SegmentIterator::_execute_compound_fn(const std::string& function_name) {

bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
// no compound predicates push down, so no need to filter
if (_col_preds_except_leafnode_of_andnode.size() == 0) {
if (_col_preds_except_leafnode_of_andnode.empty() && compound_func_exprs.empty()) {
return false;
}
for (auto pred : _col_preds_except_leafnode_of_andnode) {
Expand All @@ -885,6 +894,14 @@ bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
return false;
}
}
for (const auto& func_expr_pair : compound_func_exprs) {
const auto& expr = func_expr_pair.first;
std::string pred_result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX +
std::to_string(reinterpret_cast<uintptr_t>(expr.get()));
if (!_rowid_result_for_index.contains(pred_result_sign)) {
return false;
}
}
return true;
}

Expand Down Expand Up @@ -992,6 +1009,16 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
}
}

for (const auto& func_expr_pair : compound_func_exprs) {
const auto& expr = func_expr_pair.first;
const auto& expr_ctx = func_expr_pair.second;
auto result = std::make_shared<roaring::Roaring>();
RETURN_IF_ERROR(execute_func_expr(expr, expr_ctx, result));
std::string result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX +
std::to_string(reinterpret_cast<uintptr_t>(expr.get()));
_rowid_result_for_index.emplace(result_sign, std::make_pair(true, *result));
}

return Status::OK();
}

Expand Down Expand Up @@ -1246,18 +1273,6 @@ Status SegmentIterator::_apply_inverted_index() {
std::vector<ColumnPredicate*> remaining_predicates;
std::set<const ColumnPredicate*> no_need_to_pass_column_predicate_set;

// TODO:Comment out this code before introducing range query functionality
/*for (const auto& entry : _opts.col_id_to_predicates) {
ColumnId column_id = entry.first;
auto pred = entry.second;
bool continue_apply = true;
RETURN_IF_ERROR(_apply_inverted_index_on_block_column_predicate(
column_id, pred.get(), no_need_to_pass_column_predicate_set, &continue_apply));
if (!continue_apply) {
break;
}
}*/

for (auto pred : _col_predicates) {
if (no_need_to_pass_column_predicate_set.count(pred) > 0) {
continue;
Expand Down Expand Up @@ -1293,6 +1308,23 @@ Status SegmentIterator::_apply_inverted_index() {
}
}

for (const auto& func_expr_pair : no_compound_func_exprs) {
const auto& expr = func_expr_pair.first;
const auto& expr_ctx = func_expr_pair.second;
auto result = std::make_shared<roaring::Roaring>();
RETURN_IF_ERROR(execute_func_expr(expr, expr_ctx, result));
_row_bitmap &= *result;
for (auto it = _remaining_conjunct_roots.begin(); it != _remaining_conjunct_roots.end();) {
if (*it == expr) {
std::erase_if(_common_expr_ctxs_push_down,
[&it](const auto& iter) { return iter->root() == *it; });
it = _remaining_conjunct_roots.erase(it);
} else {
++it;
}
}
}

// add a switch for inverted index filter
if (_opts.runtime_state &&
_opts.runtime_state->enable_common_expr_pushdown_for_inverted_index()) {
Expand Down Expand Up @@ -1427,6 +1459,18 @@ Status SegmentIterator::_init_inverted_index_iterators() {
return Status::OK();
}

Status SegmentIterator::_init_inverted_index_iterators(ColumnId cid) {
if (_inverted_index_iterators[cid] == nullptr) {
return _init_single_inverted_index_iterator.call([&] {
return _segment->new_inverted_index_iterator(
_opts.tablet_schema->column(cid),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid)),
_opts, &_inverted_index_iterators[cid]);
});
}
return Status::OK();
}

Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
rowid_t* rowid) {
if (_segment->_tablet_schema->keys_type() == UNIQUE_KEYS &&
Expand Down Expand Up @@ -2859,6 +2903,42 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
}
}

void SegmentIterator::_calculate_func_in_remaining_conjunct_root() {
for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) {
const auto& root_expr = root_expr_ctx->root();
if (root_expr == nullptr) {
continue;
}

std::stack<std::pair<vectorized::VExprSPtr, bool>> stack;
stack.emplace(root_expr, false);

while (!stack.empty()) {
const auto& [expr, has_compound_pred] = stack.top();
stack.pop();

bool current_has_compound_pred =
has_compound_pred || (expr->node_type() == TExprNodeType::COMPOUND_PRED);

if (expr->node_type() == TExprNodeType::FUNCTION_CALL &&
expr->can_push_down_to_index()) {
if (current_has_compound_pred) {
compound_func_exprs.emplace_back(expr, root_expr_ctx);
} else {
no_compound_func_exprs.emplace_back(expr, root_expr_ctx);
}
}

const auto& children = expr->children();
for (int32_t i = children.size() - 1; i >= 0; --i) {
if (!children[i]->children().empty()) {
stack.emplace(children[i], current_has_compound_pred);
}
}
}
}
}

bool SegmentIterator::_no_need_read_key_data(ColumnId cid, vectorized::MutableColumnPtr& column,
size_t nrows_read) {
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
Expand Down Expand Up @@ -2933,5 +3013,23 @@ bool SegmentIterator::_can_opt_topn_reads() {
return all_true;
}

Status SegmentIterator::execute_func_expr(const vectorized::VExprSPtr& expr,
const vectorized::VExprContextSPtr& expr_ctx,
std::shared_ptr<roaring::Roaring>& result) {
const auto& expr0 = expr->get_child(0);
if (!expr0 || expr0->node_type() != TExprNodeType::SLOT_REF) {
return Status::RuntimeError("cannot perform index filtering");
}

FuncExprParams params;
auto slot_expr = std::static_pointer_cast<vectorized::VSlotRef>(expr0);
params._column_id = _schema->column_id(slot_expr->column_id());
params._unique_id = _schema->unique_id(slot_expr->column_id());
params._column_name = _opts.tablet_schema->column(params._column_id).name();
params._segment_iterator = this;

return expr->eval_inverted_index(expr_ctx.get(), params, result);
}

} // namespace segment_v2
} // namespace doris
27 changes: 27 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ struct ColumnPredicateInfo {
int32_t column_id;
};

class SegmentIterator;
struct FuncExprParams {
ColumnId _column_id = 0;
uint32_t _unique_id = 0;
std::string _column_name;
SegmentIterator* _segment_iterator = nullptr;
};

class SegmentIterator : public RowwiseIterator {
public:
SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema);
Expand All @@ -123,6 +131,8 @@ class SegmentIterator : public RowwiseIterator {
std::vector<RowLocation>* block_row_locations) override;

const Schema& schema() const override { return *_schema; }
Segment& segment() { return *_segment; }
StorageReadOptions& storage_read_options() { return _opts; }
bool is_lazy_materialization_read() const override { return _lazy_materialization_read; }
uint64_t data_id() const override { return _segment->id(); }
RowsetId rowset_id() const { return _segment->rowset_id(); }
Expand All @@ -142,6 +152,11 @@ class SegmentIterator : public RowwiseIterator {
return updated;
}

std::vector<std::unique_ptr<InvertedIndexIterator>>& inverted_index_iterators() {
return _inverted_index_iterators;
}
[[nodiscard]] Status _init_inverted_index_iterators(ColumnId cid);

private:
Status _next_batch_internal(vectorized::Block* block);

Expand Down Expand Up @@ -310,6 +325,7 @@ class SegmentIterator : public RowwiseIterator {
bool _check_column_pred_all_push_down(const std::string& column_name, bool in_compound = false,
bool is_match = false);
void _calculate_pred_in_remaining_conjunct_root(const vectorized::VExprSPtr& expr);
void _calculate_func_in_remaining_conjunct_root();

// todo(wb) remove this method after RowCursor is removed
void _convert_rowcursor_to_short_key(const RowCursor& key, size_t num_keys) {
Expand Down Expand Up @@ -393,6 +409,10 @@ class SegmentIterator : public RowwiseIterator {
bool _check_all_predicates_passed_inverted_index_for_column(ColumnId cid,
bool default_return = false);

Status execute_func_expr(const vectorized::VExprSPtr& expr,
const vectorized::VExprContextSPtr& expr_ctx,
std::shared_ptr<roaring::Roaring>& result);

class BitmapRangeIterator;
class BackwardBitmapRangeIterator;

Expand Down Expand Up @@ -459,6 +479,11 @@ class SegmentIterator : public RowwiseIterator {
// make a copy of `_opts.column_predicates` in order to make local changes
std::vector<ColumnPredicate*> _col_predicates;
std::vector<ColumnPredicate*> _col_preds_except_leafnode_of_andnode;

using FuncExprPair = std::pair<vectorized::VExprSPtr, vectorized::VExprContextSPtr>;
std::vector<FuncExprPair> no_compound_func_exprs;
std::vector<FuncExprPair> compound_func_exprs;

vectorized::VExprContextSPtrs _common_expr_ctxs_push_down;
bool _enable_common_expr_pushdown = false;
std::vector<vectorized::VExprSPtr> _remaining_conjunct_roots;
Expand Down Expand Up @@ -505,6 +530,8 @@ class SegmentIterator : public RowwiseIterator {

std::unordered_map<int, std::unordered_map<std::string, bool>>
_column_predicate_inverted_index_status;

DorisCallOnce<Status> _init_single_inverted_index_iterator;
};

} // namespace segment_v2
Expand Down
25 changes: 23 additions & 2 deletions be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc,
}
VExpr::register_function_context(state, context);
_function_name = _fn.name.function_name;
_can_fast_execute = _function->can_fast_execute() && _children.size() == 2 &&
_children[0]->is_slot_ref() && _children[1]->is_literal();
_can_fast_execute = can_fast_execute();
_prepare_finished = true;
return Status::OK();
}
Expand Down Expand Up @@ -247,4 +246,26 @@ std::string VectorizedFnCall::debug_string(const std::vector<VectorizedFnCall*>&
out << "]";
return out.str();
}

bool VectorizedFnCall::can_push_down_to_index() const {
return _function->can_push_down_to_index();
}

bool VectorizedFnCall::can_fast_execute() const {
auto function_name = _function->get_name();
if (function_name == "eq" || function_name == "ne" || function_name == "lt" ||
function_name == "gt" || function_name == "le" || function_name == "ge") {
if (_children.size() == 2 && _children[0]->is_slot_ref() && _children[1]->is_literal()) {
return true;
}
}
return _function->can_push_down_to_index();
}

Status VectorizedFnCall::eval_inverted_index(VExprContext* context,
segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) {
return _function->eval_inverted_index(context->fn_context(_fn_context_index), params, result);
}

} // namespace doris::vectorized
7 changes: 6 additions & 1 deletion be/src/vec/exprs/vectorized_fn_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,19 @@ class VectorizedFnCall : public VExpr {
}
static std::string debug_string(const std::vector<VectorizedFnCall*>& exprs);

bool can_push_down_to_index() const override;
bool can_fast_execute() const override;
Status eval_inverted_index(VExprContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) override;

protected:
FunctionBasePtr _function;
bool _can_fast_execute = false;
std::string _expr_name;
std::string _function_name;

private:
Status _do_execute(doris::vectorized::VExprContext* context, doris::vectorized::Block* block,
int* result_column_id, std::vector<size_t>& args);
};

} // namespace doris::vectorized
Loading

0 comments on commit a926b4c

Please sign in to comment.