Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor physical match #1285

Merged
merged 7 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/executor/operator/physical_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

module physical_flush;

import stl;
import txn;
import query_context;
Expand All @@ -25,8 +27,7 @@ import operator_state;
import logger;
import bg_task;
import third_party;

module physical_flush;
import status;

namespace infinity {

Expand Down Expand Up @@ -65,10 +66,16 @@ void PhysicalFlush::FlushData(QueryContext *query_context, OperatorState *operat

// FlushLog has no real implementation yet: it builds a NotSupport status
// carrying the text "Flush log", writes that status' message to the error
// log, and then passes the status to RecoverableError.
// Neither query_context nor operator_state is read by this function.
// NOTE(review): RecoverableError presumably raises so the query is aborted
// with this status — confirm against its definition.
void PhysicalFlush::FlushLog(QueryContext *query_context, OperatorState *operator_state) {
    Status unsupported = Status::NotSupport("Flush log");
    LOG_ERROR(unsupported.message());
    RecoverableError(unsupported);
}

// FlushBuffer has no real implementation yet: it builds a NotSupport status
// carrying the text "Flush buffer", writes that status' message to the error
// log, and then passes the status to RecoverableError.
// Neither query_context nor operator_state is read by this function.
// NOTE(review): RecoverableError presumably raises so the query is aborted
// with this status — confirm against its definition.
void PhysicalFlush::FlushBuffer(QueryContext *query_context, OperatorState *operator_state) {
    Status unsupported = Status::NotSupport("Flush buffer");
    LOG_ERROR(unsupported.message());
    RecoverableError(unsupported);
}

} // namespace infinity
144 changes: 65 additions & 79 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
module;

#include <cassert>
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -50,7 +49,7 @@ import logical_type;
import search_options;
import status;
import index_defines;
import search_driver;

import query_node;
import query_builder;
import doc_iterator;
Expand Down Expand Up @@ -537,8 +536,8 @@ struct FilterQueryNode final : public QueryNode {
return MakeUnique<FilterIterator<DocIterator>>(common_query_filter_, std::move(search_iter));
}
std::unique_ptr<EarlyTerminateIterator>
CreateEarlyTerminateSearch(const TableEntry *table_entry, IndexReader &index_reader, Scorer *scorer, EarlyTermAlg early_term_alg) const override {
auto search_iter = query_tree_->CreateEarlyTerminateSearch(table_entry, index_reader, scorer, early_term_alg);
CreateEarlyTerminateSearch(const TableEntry *table_entry, IndexReader &index_reader, Scorer *scorer, EarlyTermAlgo early_term_algo) const override {
auto search_iter = query_tree_->CreateEarlyTerminateSearch(table_entry, index_reader, scorer, early_term_algo);
if (!search_iter) {
return nullptr;
}
Expand Down Expand Up @@ -572,12 +571,12 @@ void ASSERT_FLOAT_EQ(float bar, u32 i, float a, float b) {
void ExecuteFTSearch(UniquePtr<EarlyTerminateIterator> &et_iter,
FullTextScoreResultHeap &result_heap,
u32 &blockmax_loop_cnt,
EarlyTermAlg early_term_alg) {
EarlyTermAlgo early_term_algo) {
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter == nullptr)
return;
switch (early_term_alg) {
case EarlyTermAlg::kBMM: {
switch (early_term_algo) {
case EarlyTermAlgo::kBMM: {
while (true) {
auto [id, et_score] = et_iter->BlockNextWithThreshold(result_heap.GetScoreThreshold());
if (id == INVALID_ROWID) [[unlikely]] {
Expand All @@ -593,7 +592,7 @@ void ExecuteFTSearch(UniquePtr<EarlyTerminateIterator> &et_iter,
}
break;
}
case EarlyTermAlg::kBMW:
case EarlyTermAlgo::kBMW:
default: {
while (true) {
++blockmax_loop_cnt;
Expand All @@ -619,44 +618,36 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
auto execute_start_time = std::chrono::high_resolution_clock::now();
// 1. build QueryNode tree
// 1.1 populate column2analyzer
Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(txn, base_table_ref_);
// Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(base_table_ref_.get());
query_builder.Init(index_reader_);
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
LOG_DEBUG(fmt::format("PhysicalMatch 0: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
const Map<String, String> &column2analyzer = query_builder.GetColumn2Analyzer();

// 1.2 parse options into map, populate default_field
SearchOptions search_ops(match_expr_->options_text_);
const String &default_field = search_ops.options_["default_field"];
const String &block_max_option = search_ops.options_["block_max"];
bool use_ordinary_iter = false;
bool use_block_max_iter = false;
const String &threshold = search_ops.options_["threshold"];
const float begin_threshold = strtof(threshold.c_str(), nullptr);
EarlyTermAlg early_term_alg = EarlyTermAlg::kBMW;
if (block_max_option.empty() or block_max_option == "true" or block_max_option == "bmw") {
use_block_max_iter = true;
early_term_alg = EarlyTermAlg::kBMW;
} else if (block_max_option == "bmm") {
use_block_max_iter = true;
early_term_alg = EarlyTermAlg::kBMM;
} else if (block_max_option == "false") {
use_ordinary_iter = true;
} else if (block_max_option == "compare") {
use_ordinary_iter = true;
use_block_max_iter = true;
} else {
Status status = Status::SyntaxError("block_max option must be empty, true, false or compare");
LOG_ERROR(status.message());
RecoverableError(status);
}
// 1.3 build filter
SearchDriver driver(column2analyzer, default_field);
UniquePtr<QueryNode> query_tree = driver.ParseSingleWithFields(match_expr_->fields_, match_expr_->matching_text_);
if (query_tree == nullptr) {
Status status = Status::ParseMatchExprFailed(match_expr_->fields_, match_expr_->matching_text_);
LOG_ERROR(status.message());
RecoverableError(status);

switch(early_term_algo_) {
case EarlyTermAlgo::kBMM: {
use_block_max_iter = true;
break;
}
case EarlyTermAlgo::kNaive: {
use_ordinary_iter = true;
break;
}
case EarlyTermAlgo::kCompare: {
use_ordinary_iter = true;
use_block_max_iter = true;
break;
}
case EarlyTermAlgo::kBMW:
default: {
use_block_max_iter = true;
break;
}
}

auto finish_parse_query_tree_time = std::chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -692,50 +683,39 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
TimeDurationType blockmax_duration_2 = {};
TimeDurationType blockmax_duration_3 = {};
assert(common_query_filter_);
full_text_query_context.query_tree_ = MakeUnique<FilterQueryNode>(common_query_filter_.get(), std::move(query_tree));
full_text_query_context.query_tree_ = MakeUnique<FilterQueryNode>(common_query_filter_.get(), std::move(query_tree_));

if (use_block_max_iter) {
et_iter = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
et_iter = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
// et_iter is nullptr if fulltext index is present but there's no data
if (et_iter != nullptr && begin_threshold > 0.0f)
et_iter->UpdateScoreThreshold(begin_threshold);
if (et_iter != nullptr && begin_threshold_ > 0.0f)
et_iter->UpdateScoreThreshold(begin_threshold_);
}
if (use_ordinary_iter) {
doc_iterator = query_builder.CreateSearch(full_text_query_context);
}
if (use_block_max_iter and use_ordinary_iter) {
et_iter_2 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
et_iter_3 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_alg);
if (et_iter_2 != nullptr && begin_threshold > 0.0f)
et_iter_2->UpdateScoreThreshold(begin_threshold);
if (et_iter_3 != nullptr && begin_threshold > 0.0f)
et_iter_3->UpdateScoreThreshold(begin_threshold);
et_iter_2 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
et_iter_3 = query_builder.CreateEarlyTerminateSearch(full_text_query_context, early_term_algo_);
if (et_iter_2 != nullptr && begin_threshold_ > 0.0f)
et_iter_2->UpdateScoreThreshold(begin_threshold_);
if (et_iter_3 != nullptr && begin_threshold_ > 0.0f)
et_iter_3->UpdateScoreThreshold(begin_threshold_);
}

// 3 full text search
u32 top_n = 0;
if (auto iter_n_option = search_ops.options_.find("topn"); iter_n_option != search_ops.options_.end()) {
int top_n_option = std::stoi(iter_n_option->second);
if (top_n_option <= 0) {
Status status = Status::SyntaxError("topn must be a positive integer");
LOG_ERROR(status.message());
RecoverableError(status);
}
top_n = top_n_option;
} else {
top_n = DEFAULT_FULL_TEXT_OPTION_TOP_N;
}

auto finish_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_duration = finish_query_builder_time - finish_parse_query_tree_time;
LOG_DEBUG(fmt::format("PhysicalMatch Part 2: Build Query iterator time: {} ms", query_builder_duration.count()));
if (use_block_max_iter) {
blockmax_score_result = MakeUniqueForOverwrite<float[]>(top_n);
blockmax_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, blockmax_score_result.get(), blockmax_row_id_result.get());
blockmax_score_result = MakeUniqueForOverwrite<float[]>(top_n_);
blockmax_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, blockmax_score_result.get(), blockmax_row_id_result.get());
#ifdef INFINITY_DEBUG
auto blockmax_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt, early_term_alg);
ExecuteFTSearch(et_iter, result_heap, blockmax_loop_cnt, early_term_algo_);
result_heap.Sort();
blockmax_result_count = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
Expand All @@ -746,9 +726,9 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
if (use_ordinary_iter) {
RowID iter_row_id = doc_iterator.get() == nullptr ? INVALID_ROWID : (doc_iterator->PrepareFirstDoc(), doc_iterator->Doc());
if (iter_row_id != INVALID_ROWID) [[likely]] {
ordinary_score_result = MakeUniqueForOverwrite<float[]>(top_n);
ordinary_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, ordinary_score_result.get(), ordinary_row_id_result.get());
ordinary_score_result = MakeUniqueForOverwrite<float[]>(top_n_);
ordinary_row_id_result = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, ordinary_score_result.get(), ordinary_row_id_result.get());
#ifdef INFINITY_DEBUG
auto ordinary_begin_ts = std::chrono::high_resolution_clock::now();
#endif
Expand All @@ -769,32 +749,32 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
}
}
if (use_ordinary_iter and use_block_max_iter) {
blockmax_score_result_2 = MakeUniqueForOverwrite<float[]>(top_n);
blockmax_row_id_result_2 = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap(top_n, blockmax_score_result_2.get(), blockmax_row_id_result_2.get());
blockmax_score_result_2 = MakeUniqueForOverwrite<float[]>(top_n_);
blockmax_row_id_result_2 = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap(top_n_, blockmax_score_result_2.get(), blockmax_row_id_result_2.get());
#ifdef INFINITY_DEBUG
auto blockmax_begin_ts = std::chrono::high_resolution_clock::now();
#endif
ExecuteFTSearch(et_iter_2, result_heap, blockmax_loop_cnt_2, early_term_alg);
ExecuteFTSearch(et_iter_2, result_heap, blockmax_loop_cnt_2, early_term_algo_);
result_heap.Sort();
blockmax_result_count_2 = result_heap.GetResultSize();
#ifdef INFINITY_DEBUG
auto blockmax_end_ts = std::chrono::high_resolution_clock::now();
blockmax_duration_2 = blockmax_end_ts - blockmax_begin_ts;
{
auto blockmax_score_result_3 = MakeUniqueForOverwrite<float[]>(top_n);
auto blockmax_row_id_result_3 = MakeUniqueForOverwrite<RowID[]>(top_n);
FullTextScoreResultHeap result_heap_3(top_n, blockmax_score_result_3.get(), blockmax_row_id_result_3.get());
auto blockmax_score_result_3 = MakeUniqueForOverwrite<float[]>(top_n_);
auto blockmax_row_id_result_3 = MakeUniqueForOverwrite<RowID[]>(top_n_);
FullTextScoreResultHeap result_heap_3(top_n_, blockmax_score_result_3.get(), blockmax_row_id_result_3.get());
auto blockmax_begin_ts_3 = std::chrono::high_resolution_clock::now();
u32 blockmax_loop_cnt_3 = 0;
ExecuteFTSearch(et_iter_3, result_heap_3, blockmax_loop_cnt_3, early_term_alg);
ExecuteFTSearch(et_iter_3, result_heap_3, blockmax_loop_cnt_3, early_term_algo_);
result_heap_3.Sort();
auto blockmax_end_ts_3 = std::chrono::high_resolution_clock::now();
if (blockmax_loop_cnt_3 != blockmax_loop_cnt_2) {
assert(false);
}
assert(result_heap_3.GetResultSize() == result_heap.GetResultSize());
for (u32 i = 0; i < top_n; ++i) {
for (u32 i = 0; i < top_n_; ++i) {
assert(blockmax_score_result_2[i] == blockmax_score_result_3[i]);
assert(blockmax_row_id_result_2[i] == blockmax_row_id_result_3[i]);
}
Expand Down Expand Up @@ -915,11 +895,17 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
PhysicalMatch::PhysicalMatch(u64 id,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<MatchExpression> match_expr,
IndexReader index_reader,
UniquePtr<QueryNode>&& query_tree,
float begin_threshold,
EarlyTermAlgo early_term_algo,
u32 top_n,
const SharedPtr<CommonQueryFilter> &common_query_filter,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kMatch, nullptr, nullptr, id, load_metas), table_index_(match_table_index),
base_table_ref_(std::move(base_table_ref)), match_expr_(std::move(match_expr)), common_query_filter_(common_query_filter) {}
base_table_ref_(std::move(base_table_ref)), match_expr_(std::move(match_expr)), index_reader_(index_reader), query_tree_(std::move(query_tree)),
begin_threshold_(begin_threshold), early_term_algo_(early_term_algo), top_n_(top_n), common_query_filter_(common_query_filter) {}

PhysicalMatch::~PhysicalMatch() = default;

Expand Down
13 changes: 13 additions & 0 deletions src/executor/operator/physical_match.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import internal_types;
import data_type;
import common_query_filter;
import logger;
import column_index_reader;
import query_node;
import early_terminate_iterator;

namespace infinity {

Expand All @@ -40,6 +43,11 @@ public:
explicit PhysicalMatch(u64 id,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<MatchExpression> match_expr,
IndexReader index_reader,
UniquePtr<QueryNode>&& query_tree,
float begin_threshold,
EarlyTermAlgo early_term_algo,
u32 top_n,
const SharedPtr<CommonQueryFilter> &common_query_filter,
u64 match_table_index,
SharedPtr<Vector<LoadMeta>> load_metas);
Expand Down Expand Up @@ -81,6 +89,11 @@ private:
u64 table_index_ = 0;
SharedPtr<BaseTableRef> base_table_ref_;
SharedPtr<MatchExpression> match_expr_;
IndexReader index_reader_;
UniquePtr<QueryNode> query_tree_;
float begin_threshold_;
EarlyTermAlgo early_term_algo_{EarlyTermAlgo::kBMW};
u32 top_n_{1};

// for filter
SharedPtr<CommonQueryFilter> common_query_filter_;
Expand Down
5 changes: 5 additions & 0 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,11 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildMatch(const SharedPtr<LogicalN
return MakeUnique<PhysicalMatch>(logical_match->node_id(),
logical_match->base_table_ref_,
logical_match->match_expr_,
logical_match->index_reader_,
std::move(logical_match->query_tree_),
logical_match->begin_threshold_,
logical_match->early_term_algo_,
logical_match->top_n_,
logical_match->common_query_filter_,
logical_match->TableIndex(),
logical_operator->load_metas());
Expand Down
Loading
Loading