From 60cab5a478621b6c189660ad196a0e22328fd8a7 Mon Sep 17 00:00:00 2001 From: zzzxl1993 <474696115@qq.com> Date: Mon, 8 Jul 2024 11:08:34 +0800 Subject: [PATCH] [opt](inverted index) mow support need_read_key_data --- .../rowset/segment_v2/segment_iterator.cpp | 52 +++++++- be/src/vec/exprs/vexpr.cpp | 2 + .../test_all_index_hit_fault_injection.out | 28 ++++ .../test_topn_fault_injection.out | 25 ++++ .../test_all_index_hit_fault_injection.groovy | 122 ++++++++++++++++++ .../test_topn_fault_injection.groovy | 117 +++++++++++++++++ 6 files changed, 339 insertions(+), 7 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_all_index_hit_fault_injection.out create mode 100644 regression-test/data/fault_injection_p0/test_topn_fault_injection.out create mode 100644 regression-test/suites/fault_injection_p0/test_all_index_hit_fault_injection.groovy create mode 100644 regression-test/suites/fault_injection_p0/test_topn_fault_injection.groovy diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 6a66ae6fc146b38..71499cabb82b458 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2026,11 +2026,12 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32 auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default( "segment_iterator._read_columns_by_index", "column_name", ""); if (debug_col_name.empty()) { - return Status::Error("{} does not need to read data"); + return Status::Error("does not need to read data"); } auto col_name = _opts.tablet_schema->column(cid).name(); if (debug_col_name.find(col_name) != std::string::npos) { - return Status::Error("{} does not need to read data"); + return Status::Error("does not need to read data, {}", + debug_col_name); } }) @@ -2225,9 +2226,27 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector& read_colu } for (auto cid : 
read_column_ids) { - if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) { + auto& colunm = (*mutable_columns)[cid]; + if (_no_need_read_key_data(cid, colunm, select_size)) { continue; } + if (_prune_column(cid, colunm, true, select_size)) { + continue; + } + + DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", { + auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default( + "segment_iterator._read_columns_by_index", "column_name", ""); + if (debug_col_name.empty()) { + return Status::Error("does not need to read data"); + } + auto col_name = _opts.tablet_schema->column(cid).name(); + if (debug_col_name.find(col_name) != std::string::npos) { + return Status::Error("does not need to read data, {}", + debug_col_name); + } + }) + RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, _current_return_columns[cid])); } @@ -2385,6 +2404,13 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { nrows_read_limit = std::min(static_cast(_opts.topn_limit), nrows_read_limit); } + DBUG_EXECUTE_IF("segment_iterator.topn_opt", { + if (nrows_read_limit != 1) { + return Status::Error("topn opt execute failed: {}", + nrows_read_limit); + } + }) + RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit)); _converted_column_ids.assign(_schema->columns().size(), 0); @@ -2837,7 +2863,10 @@ bool SegmentIterator::_no_need_read_key_data(ColumnId cid, vectorized::MutableCo if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) { return false; } - if (_opts.tablet_schema->keys_type() != KeysType::DUP_KEYS) { + + if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS || + (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS && + _opts.enable_unique_key_merge_on_write)))) { return false; } @@ -2895,11 +2924,20 @@ bool SegmentIterator::_can_opt_topn_reads() const { return false; } - if (!_col_predicates.empty() || 
!_col_preds_except_leafnode_of_andnode.empty()) { - return false; + std::set cids; + for (auto* pred : _col_predicates) { + cids.insert(pred->column_id()); + } + for (auto* pred : _col_preds_except_leafnode_of_andnode) { + cids.insert(pred->column_id()); } - return true; + uint32_t delete_sign_idx = _opts.tablet_schema->delete_sign_idx(); + bool result = std::ranges::all_of(cids.begin(), cids.end(), [delete_sign_idx](auto cid) { + return cid == delete_sign_idx; + }); + + return result; } } // namespace segment_v2 diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 64a4adfa6beeda9..bb6e48f60842cd5 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -604,6 +604,8 @@ bool VExpr::fast_execute(Block& block, const ColumnNumbers& arguments, size_t re size_t input_rows_count, const std::string& function_name) { std::string result_column_name = gen_predicate_result_sign(block, arguments, function_name); if (!block.has(result_column_name)) { + DBUG_EXECUTE_IF("segment_iterator.fast_execute", + { return Status::Error("fast_execute failed"); }) return false; } diff --git a/regression-test/data/fault_injection_p0/test_all_index_hit_fault_injection.out b/regression-test/data/fault_injection_p0/test_all_index_hit_fault_injection.out new file mode 100644 index 000000000000000..479263ef899a0ba --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_all_index_hit_fault_injection.out @@ -0,0 +1,28 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +120 + +-- !sql -- +41 + +-- !sql -- +53 + +-- !sql -- +55 + +-- !sql -- +97 + +-- !sql -- +210 + +-- !sql -- +48 + +-- !sql -- +61 + +-- !sql -- +92 + diff --git a/regression-test/data/fault_injection_p0/test_topn_fault_injection.out b/regression-test/data/fault_injection_p0/test_topn_fault_injection.out new file mode 100644 index 000000000000000..9cc3f4146b5bed0 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_topn_fault_injection.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +893964617 40.135.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964617 40.135.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + +-- !sql -- +893964672 26.1.0.0 GET /images/hm_bg.jpg HTTP/1.0 200 24736 + diff --git a/regression-test/suites/fault_injection_p0/test_all_index_hit_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_all_index_hit_fault_injection.groovy new file mode 100644 index 000000000000000..3bd884a5d87f4d2 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_all_index_hit_fault_injection.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_all_index_hit_fault_injection", "nonConcurrent") { + // names of the two tables under test (duplicate-key and merge-on-write unique-key) + def indexTbName1 = "test_all_index_hit_fault_injection_1" + def indexTbName2 = "test_all_index_hit_fault_injection_2" + + sql "DROP TABLE IF EXISTS ${indexTbName1}" + sql """ + CREATE TABLE ${indexTbName1} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + sql "DROP TABLE IF EXISTS ${indexTbName2}" + sql """ + CREATE TABLE ${indexTbName2} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + UNIQUE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + 
"disable_auto_compaction" = "true" + ); + """ + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if a check callback is declared, the default check condition is skipped, + // so this callback must check every condition itself + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + } + } + } + + try { + load_httplogs_data.call(indexTbName1, 'test_all_index_hit_fault_injection_1', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(indexTbName2, 'test_all_index_hit_fault_injection_2', 'true', 'json', 'documents-1000.json') + + sql "sync" + + try { + GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._read_columns_by_index", [column_name: "clientip,request"]) + GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.fast_execute") + + qt_sql """ select count() from ${indexTbName1} where (request match_phrase 'hm'); """ + qt_sql """ select count() from ${indexTbName1} where (request match_phrase 'hm' and clientip = '126.1.0.0'); """ + qt_sql """ select count() from ${indexTbName1} where (request match_phrase 'hm' and clientip = '126.1.0.0') or (request match_phrase 'bg' and clientip = '201.0.0.0'); """ + qt_sql """ select count() from ${indexTbName1} where (request match_phrase 'hm' and clientip = '126.1.0.0' or clientip = '247.37.0.0') or (request match_phrase 'bg' and clientip = '201.0.0.0' or 
clientip = '232.0.0.0'); """ + qt_sql """ select count() from ${indexTbName1} where (request match_phrase 'hm' and clientip in ('126.1.0.0', '247.37.0.0')) or (request match_phrase 'bg' and clientip in ('201.0.0.0', '232.0.0.0')); """ + + qt_sql """ select count() from ${indexTbName2} where (request match_phrase 'hm'); """ + qt_sql """ select count() from ${indexTbName2} where (request match_phrase 'hm' and clientip = '126.1.0.0'); """ + qt_sql """ select count() from ${indexTbName2} where (request match_phrase 'hm' and clientip = '126.1.0.0') or (request match_phrase 'bg' and clientip = '201.0.0.0'); """ + qt_sql """ select count() from ${indexTbName2} where (request match_phrase 'hm' and clientip = '126.1.0.0' or clientip = '247.37.0.0') or (request match_phrase 'bg' and clientip = '201.0.0.0' or clientip = '232.0.0.0'); """ + qt_sql """ select count() from ${indexTbName2} where (request match_phrase 'hm' and clientip in ('126.1.0.0', '247.37.0.0')) or (request match_phrase 'bg' and clientip in ('201.0.0.0', '232.0.0.0')); """ + + } finally { + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator._read_columns_by_index") + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator.fast_execute") + } + } finally { + } +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/test_topn_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_topn_fault_injection.groovy new file mode 100644 index 000000000000000..08a1ef0164deeec --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_topn_fault_injection.groovy @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_topn_fault_injection", "nonConcurrent") { + // names of the two tables under test (merge-on-write unique-key and duplicate-key) + def indexTbName1 = "test_topn_fault_injection1" + def indexTbName2 = "test_topn_fault_injection2" + + sql "DROP TABLE IF EXISTS ${indexTbName1}" + sql """ + CREATE TABLE ${indexTbName1} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + UNIQUE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true" + ); + """ + + sql "DROP TABLE IF EXISTS ${indexTbName2}" + sql """ + CREATE TABLE ${indexTbName2} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + INDEX request_idx (`request`) USING 
INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if a check callback is declared, the default check condition is skipped, + // so this callback must check every condition itself + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + } + } + } + + try { + load_httplogs_data.call(indexTbName1, 'test_topn_fault_injection1', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(indexTbName2, 'test_topn_fault_injection2', 'true', 'json', 'documents-1000.json') + + sql "sync" + + try { + GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.topn_opt") + + qt_sql """ select * from ${indexTbName1} where (request match_phrase 'hm') order by `@timestamp` limit 1; """ + qt_sql """ select * from ${indexTbName1} where (request match_phrase 'hm' and clientip match_phrase '1') order by `@timestamp` limit 1; """ + qt_sql """ select * from ${indexTbName1} where (request match_phrase 'hm' and clientip match_phrase '1') or (request match_phrase 'bg' and clientip match_phrase '2') order by `@timestamp` limit 1; """ + qt_sql """ 
select * from ${indexTbName1} where (request match_phrase 'hm' and clientip match_phrase '1' or clientip match_phrase '3') or (request match_phrase 'bg' and clientip match_phrase '2' or clientip match_phrase '4') order by `@timestamp` limit 1; """ + + qt_sql """ select * from ${indexTbName2} where (request match_phrase 'hm') order by `@timestamp` limit 1; """ + qt_sql """ select * from ${indexTbName2} where (request match_phrase 'hm' and clientip match_phrase '1') order by `@timestamp` limit 1; """ + qt_sql """ select * from ${indexTbName2} where (request match_phrase 'hm' and clientip match_phrase '1') or (request match_phrase 'bg' and clientip match_phrase '2') order by `@timestamp` limit 1; """ + qt_sql """ select * from ${indexTbName2} where (request match_phrase 'hm' and clientip match_phrase '1' or clientip match_phrase '3') or (request match_phrase 'bg' and clientip match_phrase '2' or clientip match_phrase '4') order by `@timestamp` limit 1; """ + } finally { + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator.topn_opt") + } + } finally { + } +} \ No newline at end of file