Skip to content

Commit

Permalink
Refactor physical match (#1285)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Lots of check which should be done in query bounding phase, is done in
query execution phase. This need to refactor.

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN authored Jun 4, 2024
1 parent 863f785 commit f53d1dc
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 120 deletions.
11 changes: 9 additions & 2 deletions src/executor/operator/physical_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

module physical_flush;

import stl;
import txn;
import query_context;
Expand All @@ -25,8 +27,7 @@ import operator_state;
import logger;
import bg_task;
import third_party;

module physical_flush;
import status;

namespace infinity {

Expand Down Expand Up @@ -65,10 +66,16 @@ void PhysicalFlush::FlushData(QueryContext *query_context, OperatorState *operat

void PhysicalFlush::FlushLog(QueryContext *query_context, OperatorState *operator_state) {
// Generate the result
Status status = Status::NotSupport("Flush log");
LOG_ERROR(status.message());
RecoverableError(status);
}

void PhysicalFlush::FlushBuffer(QueryContext *query_context, OperatorState *operator_state) {
// Generate the result
Status status = Status::NotSupport("Flush buffer");
LOG_ERROR(status.message());
RecoverableError(status);
}

} // namespace infinity
144 changes: 65 additions & 79 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
module;

#include <cassert>
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -50,7 +49,7 @@ import logical_type;
import search_options;
import status;
import index_defines;
import search_driver;

import query_node;
import query_builder;
import doc_iterator;
Expand Down Expand Up @@ -537,8 +536,8 @@ struct FilterQueryNode final : public QueryNode {
return MakeUnique<FilterIterator<DocIterator>>(common_query_filter_, std::move(search_iter));
}
std::unique_ptr<EarlyTerminateIterator>
CreateEarlyTerminateSearch(const TableEntry *table_entry, IndexReader &index_reader, Scorer *scorer, EarlyTermAlg early_term_alg) const override {
auto search_iter = query_tree_->CreateEarlyTerminateSearch(table_entry, index_reader, scorer, early_term_alg);
CreateEarlyTerminateSearch(const TableEntry *table_entry, IndexReader &index_reader, Scorer *scorer, EarlyTermAlgo early_term_algo) const override {
auto search_iter = query_tree_->CreateEarlyTerminateSearch(table_entry, index_reader, scorer, early_term_algo);
if (!search_iter) {
return nullptr;
}
Expand Down Expand Up @@ -572,12 +571,12 @@ void ASSERT_FLOAT_EQ(float bar, u32 i, float a, float b) {
void ExecuteFTSearch(UniquePtr<EarlyTerminateIterator> &et_iter,
FullTextScoreResultHeap &result_heap,
u32 &blockmax_loop_cnt,
EarlyTermAlg early_term_alg) {
EarlyTermAlgo early_term_algo) {
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter == nullptr)
return;
switch (early_term_alg) {
case EarlyTermAlg::kBMM: {
switch (early_term_algo) {
case EarlyTermAlgo::kBMM: {
while (true) {
auto [id, et_score] = et_iter->BlockNextWithThreshold(result_heap.GetScoreThreshold());
if (id == INVALID_ROWID) [[unlikely]] {
Expand All @@ -593,7 +592,7 @@ void ExecuteFTSearch(UniquePtr<EarlyTerminateIterator> &et_iter,
}
break;
}
case EarlyTermAlg::kBMW:
case EarlyTermAlgo::kBMW:
default: {
while (true) {
++blockmax_loop_cnt;
Expand All @@ -619,44 +618,36 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
auto execute_start_time = std::chrono::high_resolution_clock::now();
// 1. build QueryNode tree
// 1.1 populate column2analyzer
Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(txn, base_table_ref_);
// Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(base_table_ref_.get());
query_builder.Init(index_reader_);
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
LOG_DEBUG(fmt::format("PhysicalMatch 0: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
const Map<String, String> &column2analyzer = query_builder.GetColumn2Analyzer();

// 1.2 parse options into map, populate default_field
SearchOptions search_ops(match_expr_->options_text_);
const String &default_field = search_ops.options_["default_field"];
const String &block_max_option = search_ops.options_["block_max"];
bool use_ordinary_iter = false;
bool use_block_max_iter = false;
const String &threshold = search_ops.options_["threshold"];
const float begin_threshold = strtof(threshold.c_str(), nullptr);
EarlyTermAlg early_term_alg = EarlyTermAlg::kBMW;
if (block_max_option.empty() or block_max_option == "true" or block_max_option == "bmw") {
use_block_max_iter = true;
early_term_alg = EarlyTermAlg::kBMW;
} else if (block_max_option == "bmm") {
use_block_max_iter = true;
early_term_alg = EarlyTermAlg::kBMM;
} else if (block_max_option == "false") {
use_ordinary_iter = true;
} else if (block_max_option == "compare") {
use_ordinary_iter = true;
use_block_max_iter = true;
} else {
Status status = Status::SyntaxError("block_max option must be empty, true, false or compare");
LOG_ERROR(status.message());
RecoverableError(status);
}
// 1.3 build filter
SearchDriver driver(column2analyzer, default_field);
UniquePtr<QueryNode> query_tree = driver.ParseSingleWithFields(match_expr_->fields_, match_expr_->matching_text_);
if (query_tree == nullptr) {
Status status = Status::ParseMatchExprFailed(match_expr_->fields_, match_expr_->matching_text_);
LOG_ERROR(status.message());
RecoverableError(status);

switch(early_term_algo_) {
case EarlyTermAlgo::kBMM: {
use_block_max_iter = true;
break;
}
case EarlyTermAlgo::kNaive: {
use_ordinary_iter = true;
break;
}
case EarlyTermAlgo::kCompare: {
use_ordinary_iter = true;
use_block_max_iter = true;
break;
}
case EarlyTermAlgo::kBMW:
default: {
use_block_max_iter = true;
break;
}
}

auto finish_parse_query_tree_time = std::chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -692,50 +683,39 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
TimeDurationType blockmax_duration_2 = {};
TimeDurationType blockmax_duration_3 = {};
assert(common_query_filter_);
full_text_query_context.query_tree_ = MakeUnique<FilterQueryNode>(common_query_filter_.get(), std::move(query_tree));
full_text_query_context.query_tree_ = MakeUnique<FilterQueryNode>(common_query_filter_.get(), std::move(query_tree_));

if (use_block_max_iter) {
et_iter = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
et_iter = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter != nullptr && begin_threshold > 0.0f)
et_iter->UpdateScoreThreshold(begin_threshold);
if (et_iter != nullptr && begin_threshold_ > 0.0f)
et_iter->UpdateScoreThreshold(begin_threshold_);
}
if (use_ordinary_iter) {
doc_iterator = query_builder.CreateSearch(full_text_query_context);
}
if (use_block_max_iter and use_ordinary_iter) {
et_iter_2 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
et_iter_3 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
if (et_iter_2 != nullptr && begin_threshold > 0.0f)
et_iter_2->UpdateScoreThreshold(begin_threshold);
if (et_iter_3 != nullptr && begin_threshold > 0.0f)
et_iter_3->UpdateScoreThreshold(begin_threshold);
et_iter_2 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
et_iter_3 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
if (et_iter_2 != nullptr && begin_threshold_ > 0.0f)
et_iter_2->UpdateScoreThreshold(begin_threshold_);
if (et_iter_3 != nullptr && begin_threshold_ > 0.0f)
et_iter_3->UpdateScoreThreshold(begin_threshold_);
}

// 3 full text search
u32 top_n = 0;
if (auto iter_n_option = search_ops.options_.find("topn"); iter_n_option != search_ops.options_.end()) {
int top_n_option = std::stoi(iter_n_option->second);
if (top_n_option <= 0) {
Status status = Status::SyntaxError("topn must be a positive integer");
LOG_ERROR(status.message());
RecoverableError(status);
}
top_n = top_n_option;
} else {
top_n = DEFAULT_FULL_TEXT_OPTION_TOP_N;
}

auto finish_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_duration = finish_query_builder_time - finish_parse_query_tree_time;
LOG_DEBUG(fmt::format("PhysicalMatch Part 2: Build Query iterator time: {} ms", query_builder_duration.count()));
if (use_block_max_iter) {
blockmax_score_result = MakeUniqueForOverwrite<float[]>(top_n);
blockmax_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, blockmax_score_result.get(), blockmax_row_id_result.get());
blockmax_score_result = MakeUniqueForOverwrite<float[]>(top_n_);
blockmax_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, blockmax_score_result.get(), blockmax_row_id_result.get());
#ifdef INFINITY_DEBUG
auto blockmax_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt, early_term_alg);
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt, early_term_algo_);
result_heap.Sort();
blockmax_result_count = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
Expand All @@ -746,9 +726,9 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
if (use_ordinary_iter) {
RowID iter_row_id = doc_iterator.get() == nullptr ? INVALID_ROWID : (doc_iterator->PrepareFirstDoc(), doc_iterator->Doc());
if (iter_row_id != INVALID_ROWID) [[likely]] {
ordinary_score_result = MakeUniqueForOverwrite<float[]>(top_n);
ordinary_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, ordinary_score_result.get(), ordinary_row_id_result.get());
ordinary_score_result = MakeUniqueForOverwrite<float[]>(top_n_);
ordinary_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, ordinary_score_result.get(), ordinary_row_id_result.get());
#ifdef INFINITY_DEBUG
auto ordinary_begin_ts = std::chrono::high_resolution_clock::now();
#endif
Expand All @@ -769,32 +749,32 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
}
}
if (use_ordinary_iter and use_block_max_iter) {
blockmax_score_result_2 = MakeUniqueForOverwrite<float[]>(top_n);
blockmax_row_id_result_2 = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, blockmax_score_result_2.get(), blockmax_row_id_result_2.get());
blockmax_score_result_2 = MakeUniqueForOverwrite<float[]>(top_n_);
blockmax_row_id_result_2 = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, blockmax_score_result_2.get(), blockmax_row_id_result_2.get());
#ifdef INFINITY_DEBUG
auto blockmax_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(et_iter_2, result_heap, blockmax_loop_cnt_2, early_term_alg);
ExecuteFTSearch(et_iter_2, result_heap, blockmax_loop_cnt_2, early_term_algo_);
result_heap.Sort();
blockmax_result_count_2 = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
auto blockmax_end_ts = std::chrono::high_resolution_clock::now();
blockmax_duration_2 = blockmax_end_ts - blockmax_begin_ts;
{
auto blockmax_score_result_3 = MakeUniqueForOverwrite<float[]>(top_n);
auto blockmax_row_id_result_3 = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap_3(top_n, blockmax_score_result_3.get(), blockmax_row_id_result_3.get());
auto blockmax_score_result_3 = MakeUniqueForOverwrite<float[]>(top_n_);
auto blockmax_row_id_result_3 = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap_3(top_n_, blockmax_score_result_3.get(), blockmax_row_id_result_3.get());
auto blockmax_begin_ts_3 = std::chrono::high_resolution_clock::now();
u32 blockmax_loop_cnt_3 = 0;
ExecuteFTSearch(et_iter_3, result_heap_3, blockmax_loop_cnt_3, early_term_alg);
ExecuteFTSearch(et_iter_3, result_heap_3, blockmax_loop_cnt_3, early_term_algo_);
result_heap_3.Sort();
auto blockmax_end_ts_3 = std::chrono::high_resolution_clock::now();
if (blockmax_loop_cnt_3 != blockmax_loop_cnt_2) {
assert(false);
}
assert(result_heap_3.GetResultSize() == result_heap.GetResultSize());
for (u32 i = 0; i < top_n; ++i) {
for (u32 i = 0; i < top_n_; ++i) {
assert(blockmax_score_result_2[i] == blockmax_score_result_3[i]);
assert(blockmax_row_id_result_2[i] == blockmax_row_id_result_3[i]);
}
Expand Down Expand Up @@ -915,11 +895,17 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
PhysicalMatch::PhysicalMatch(u64 id,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<MatchExpression> match_expr,
IndexReader index_reader,
UniquePtr<QueryNode>&& query_tree,
float begin_threshold,
EarlyTermAlgo early_term_algo,
u32 top_n,
const SharedPtr<CommonQueryFilter> &common_query_filter,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kMatch, nullptr, nullptr, id, load_metas), table_index_(match_table_index),
base_table_ref_(std::move(base_table_ref)), match_expr_(std::move(match_expr)), common_query_filter_(common_query_filter) {}
base_table_ref_(std::move(base_table_ref)), match_expr_(std::move(match_expr)), index_reader_(index_reader), query_tree_(std::move(query_tree)),
begin_threshold_(begin_threshold), early_term_algo_(early_term_algo), top_n_(top_n), common_query_filter_(common_query_filter) {}

PhysicalMatch::~PhysicalMatch() = default;

Expand Down
13 changes: 13 additions & 0 deletions src/executor/operator/physical_match.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import internal_types;
import data_type;
import common_query_filter;
import logger;
import column_index_reader;
import query_node;
import early_terminate_iterator;

namespace infinity {

Expand All @@ -40,6 +43,11 @@ public:
explicit PhysicalMatch(u64 id,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<MatchExpression> match_expr,
IndexReader index_reader,
UniquePtr<QueryNode>&& query_tree,
float begin_threshold,
EarlyTermAlgo early_term_algo,
u32 top_n,
const SharedPtr<CommonQueryFilter> &common_query_filter,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas);
Expand Down Expand Up @@ -81,6 +89,11 @@ private:
u64 table_index_ = 0;
SharedPtr<BaseTableRef> base_table_ref_;
SharedPtr<MatchExpression> match_expr_;
IndexReader index_reader_;
UniquePtr<QueryNode> query_tree_;
float begin_threshold_;
EarlyTermAlgo early_term_algo_{EarlyTermAlgo::kBMW};
u32 top_n_{1};

// for filter
SharedPtr<CommonQueryFilter> common_query_filter_;
Expand Down
5 changes: 5 additions & 0 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,11 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildMatch(const SharedPtr<LogicalN
return MakeUnique<PhysicalMatch>(logical_match->node_id(),
logical_match->base_table_ref_,
logical_match->match_expr_,
logical_match->index_reader_,
std::move(logical_match->query_tree_),
logical_match->begin_threshold_,
logical_match->early_term_algo_,
logical_match->top_n_,
logical_match->common_query_filter_,
logical_match->TableIndex(),
logical_operator->load_metas());
Expand Down
Loading

0 comments on commit f53d1dc

Please sign in to comment.