From b0fd22faa22b0602b46f5f23c9acfb76a391e2c6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 29 Mar 2022 17:51:31 +0800 Subject: [PATCH 01/12] fix empty segment cannot be merged --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 75b5a700c60..0b101330858 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1268,7 +1268,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const auto it = segments.find(segment->getRowKeyRange().getEnd()); // check legality if (it == segments.end()) - return {}; auto & cur_segment = it->second; if (cur_segment.get() != segment.get()) @@ -1604,6 +1603,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) // meet empty segment, try merge it if (segment_snap->getRows() == 0) { + // release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status. + segment_snap = nullptr; checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); continue; } From 27a5cf462cb1aaf91414fab0c36f42c86618f69b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 29 Mar 2022 17:51:48 +0800 Subject: [PATCH 02/12] avoid write index data for empty dmfile --- dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp | 7 +++++-- dbms/src/Storages/DeltaMerge/File/DMFileWriter.h | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 77aca915957..a4a5b49107d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -121,6 +121,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) void DMFileWriter::write(const Block & block, const BlockProperty & block_property) { + write_data = true; DMFile::PackStat stat; stat.rows = block.rows(); stat.not_clean = block_property.not_clean_rows; @@ -359,7 +360,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) dmfile->encryptionIndexPath(stream_name), false, write_limiter); - stream->minmaxes->write(*type, buf); + if (write_data) + stream->minmaxes->write(*type, buf); buf.sync(); bytes_written += buf.getMaterializedBytes(); } @@ -372,7 +374,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) write_limiter, dmfile->configuration->getChecksumAlgorithm(), dmfile->configuration->getChecksumFrameLength()); - stream->minmaxes->write(*type, *buf); + if (write_data) + stream->minmaxes->write(*type, *buf); buf->sync(); bytes_written += buf->getMaterializedBytes(); #ifndef NDEBUG diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 2865630eff2..2a0b43d589e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -249,6 +249,9 @@ class DMFileWriter FileProviderPtr file_provider; WriteLimiterPtr write_limiter; + + // use to avoid write index data for empty file + bool write_data = false; }; } // namespace DM From 8a103dc228b24ccf114bf1dd1f2622a0397581c6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 13:46:29 +0800 Subject: [PATCH 03/12] fix read empty index file --- dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h | 10 ++++++---- dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp | 6 +++--- dbms/src/Storages/DeltaMerge/File/DMFileWriter.h | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index c81954efbc7..31c01846ff1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -219,13 +219,16 @@ class DMFilePackFilter const auto file_name_base = DMFile::getFileNameBase(col_id); auto load = [&]() { + auto index_file_size = dmfile->colIndexSize(file_name_base); + if (index_file_size == 0) + return std::make_shared(*type); if (!dmfile->configuration) { auto index_buf = ReadBufferFromFileProvider( file_provider, dmfile->colIndexPath(file_name_base), dmfile->encryptionIndexPath(file_name_base), - std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), dmfile->colIndexSize(file_name_base)), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), read_limiter); index_buf.seek(dmfile->colIndexOffset(file_name_base)); return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base)); @@ -240,11 +243,10 @@ class DMFilePackFilter dmfile->configuration->getChecksumAlgorithm(), dmfile->configuration->getChecksumFrameLength()); index_buf->seek(dmfile->colIndexOffset(file_name_base)); - auto file_size = dmfile->colIndexSize(file_name_base); auto header_size = dmfile->configuration->getChecksumHeaderLength(); auto frame_total_size = dmfile->configuration->getChecksumFrameLength(); - auto frame_count = file_size / frame_total_size + (file_size % frame_total_size != 0); - return MinMaxIndex::read(*type, *index_buf, file_size - header_size * frame_count); + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); } }; MinMaxIndexPtr minmax_index; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index a4a5b49107d..a2bbf432e48 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -121,7 +121,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) void DMFileWriter::write(const Block & block, const BlockProperty & block_property) { - write_data = true; + is_empty_file = false; DMFile::PackStat stat; stat.rows = block.rows(); stat.not_clean = block_property.not_clean_rows; @@ -360,7 +360,7 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) dmfile->encryptionIndexPath(stream_name), false, write_limiter); - if (write_data) + if (!is_empty_file) stream->minmaxes->write(*type, buf); buf.sync(); bytes_written += buf.getMaterializedBytes(); @@ -374,7 +374,7 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) write_limiter, dmfile->configuration->getChecksumAlgorithm(), dmfile->configuration->getChecksumFrameLength()); - if (write_data) + if (!is_empty_file) stream->minmaxes->write(*type, *buf); buf->sync(); bytes_written += buf->getMaterializedBytes(); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 2a0b43d589e..b9868444162 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -251,7 +251,7 @@ class DMFileWriter WriteLimiterPtr write_limiter; // use to avoid write index data for empty file - bool write_data = false; + bool is_empty_file = true; }; } // namespace DM From 9278892e08dddfd6cc20274df269fd9c62a9f211 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 13:56:06 +0800 Subject: [PATCH 04/12] fix unnecessary wait when gc can be triggered by a lot of delete range --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0b101330858..0a00d258222 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1594,7 +1594,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) } assert(segment != nullptr); - if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr) + if (segment->hasAbandoned() || segment_snap == nullptr) continue; const auto segment_id = segment->segmentId(); @@ -1609,33 +1609,39 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) continue; } - // Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not. - // Because after we calculate StableProperty and compare it with this gc_safe_point, - // there is no need to recheck it again using the same gc_safe_point. - // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process, - // it's still worth to wait another gc_safe_point to check this segment again. - segment->setLastCheckGCSafePoint(gc_safe_point); - dm_context->min_version = gc_safe_point; - - // calculate StableProperty if needed - if (!segment->getStable()->isStablePropertyCached()) - segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); - try { // Check whether we should apply gc on this segment - const bool should_compact - = GC::shouldCompactStable( - segment, - gc_safe_point, - global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, - log) - || GC::shouldCompactDeltaWithStable( - *dm_context, - segment_snap, - segment_range, - global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc, - log); + bool should_compact = false; + if (GC::shouldCompactDeltaWithStable( + *dm_context, + segment_snap, + segment_range, + global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc, + log)) + { + should_compact = true; + } + else if (segment->getLastCheckGCSafePoint() < gc_safe_point) + { + // Avoid recheck this segment when gc_safe_point doesn't change regardless whether we trigger this segment's DeltaMerge or not. + // Because after we calculate StableProperty and compare it with this gc_safe_point, + // there is no need to recheck it again using the same gc_safe_point. + // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process, + // it's still worth to wait another gc_safe_point to check this segment again. + segment->setLastCheckGCSafePoint(gc_safe_point); + dm_context->min_version = gc_safe_point; + + // calculate StableProperty if needed + if (!segment->getStable()->isStablePropertyCached()) + segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); + + should_compact = GC::shouldCompactStable( + segment, + gc_safe_point, + global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, + log); + } bool finish_gc_on_segment = false; if (should_compact) { From 651dc7a9155c4b46b2bd76df70ef703d45d8d714 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 14:13:37 +0800 Subject: [PATCH 05/12] fix static analysis --- dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index a2bbf432e48..3701e9c6cca 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -129,17 +129,17 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column; - const ColumnVector * del_mark = !del_mark_column ? nullptr : (const ColumnVector *)del_mark_column.get(); + const ColumnVector * del_mark = !del_mark_column ? nullptr : static_cast *>(del_mark_column.get()); for (auto & cd : write_columns) { - auto & col = getByColumnId(block, cd.id).column; + const auto & col = getByColumnId(block, cd.id).column; writeColumn(cd.id, *cd.type, *col, del_mark); if (cd.id == VERSION_COLUMN_ID) stat.first_version = col->get64(0); else if (cd.id == TAG_COLUMN_ID) - stat.first_tag = (UInt8)(col->get64(0)); + stat.first_tag = static_cast(col->get64(0)); } if (!options.flags.isSingleFile()) From 5ff75b79d575645fe7e09e93ab81823f12a6b7f3 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 17:12:46 +0800 Subject: [PATCH 06/12] avoid pollute index cache for non read request --- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 1 + .../DeltaMerge/File/DMFileBlockInputStream.cpp | 1 + .../Storages/DeltaMerge/File/DMFilePackFilter.h | 17 +++++++++++++---- .../Storages/DeltaMerge/StableValueSpace.cpp | 13 +++++++------ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index f1e98b680ce..a3facf85042 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -37,6 +37,7 @@ void ColumnFileBig::calculateStat(const DMContext & context) auto pack_filter = DMFilePackFilter::loadFrom( file, index_cache, + false, {segment_range}, EMPTY_FILTER, {}, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index b062b391e62..23f5693a112 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -34,6 +34,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( dmfile, index_cache, + true, rowkey_ranges, rs_filter, read_packs, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 31c01846ff1..77c0b6868be 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -43,6 +43,7 @@ class DMFilePackFilter static DMFilePackFilter loadFrom( const DMFilePtr & dmfile, const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, const RowKeyRanges & rowkey_ranges, const RSOperatorPtr & filter, const IdSetPtr & read_packs, @@ -50,7 +51,7 @@ class DMFilePackFilter const ReadLimiterPtr & read_limiter, const DB::LoggerPtr & tracing_logger) { - auto pack_filter = DMFilePackFilter(dmfile, index_cache, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger); + auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger); pack_filter.init(); return pack_filter; } @@ -102,6 +103,7 @@ class DMFilePackFilter private: DMFilePackFilter(const DMFilePtr & dmfile_, const MinMaxIndexCachePtr & index_cache_, + bool set_cache_if_miss_, const RowKeyRanges & rowkey_ranges_, // filter by handle range const RSOperatorPtr & filter_, // filter by push down where clause const IdSetPtr & read_packs_, // filter by pack index @@ -110,6 +112,7 @@ class DMFilePackFilter const DB::LoggerPtr & tracing_logger) : dmfile(dmfile_) , index_cache(index_cache_) + , set_cache_if_miss(set_cache_if_miss_) , rowkey_ranges(rowkey_ranges_) , filter(filter_) , read_packs(read_packs_) @@ -212,6 +215,7 @@ class DMFilePackFilter const DMFilePtr & dmfile, const FileProviderPtr & file_provider, const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, ColId col_id, const ReadLimiterPtr & read_limiter) { @@ -250,13 +254,17 @@ class DMFilePackFilter } }; MinMaxIndexPtr minmax_index; - if (index_cache) + if (index_cache && set_cache_if_miss) { minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load); } else { - minmax_index = load(); + // try load from the cache first + if (index_cache) + minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base)); + if (!minmax_index) + minmax_index = load(); } indexes.emplace(col_id, RSIndex(type, minmax_index)); } @@ -269,12 +277,13 @@ class DMFilePackFilter if (!dmfile->isColIndexExist(col_id)) return; - loadIndex(param.indexes, dmfile, file_provider, index_cache, col_id, read_limiter); + loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter); } private: DMFilePtr dmfile; MinMaxIndexCachePtr index_cache; + bool set_cache_if_miss; RowKeyRanges rowkey_ranges; RSOperatorPtr filter; IdSetPtr read_packs; diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index e9d7d788a89..962d1e60d2f 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -54,6 +54,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang auto pack_filter = DMFilePackFilter::loadFrom( file, index_cache, + true, {range}, EMPTY_FILTER, {}, @@ -236,6 +237,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const auto pack_filter = DMFilePackFilter::loadFrom( file, context.db_context.getGlobalContext().getMinMaxIndexCache(), + false, {rowkey_range}, EMPTY_FILTER, {}, @@ -350,16 +352,15 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & size_t match_packs = 0; size_t total_match_rows = 0; size_t total_match_bytes = 0; - // Usually, this method will be called for some "cold" key ranges. Loading the index - // into cache may pollute the cache and make the hot index cache invalid. Set the - // index cache to nullptr so that the cache won't be polluted. - // TODO: We can use the cache if the index happens to exist in the cache, but - // don't refill the cache if the index does not exist. + // Usually, this method will be called for some "cold" key ranges. + // Loading the index into cache may pollute the cache and make the hot index cache invalid. + // So don't refill the cache if the index does not exist. for (auto & f : stable->files) { auto filter = DMFilePackFilter::loadFrom( f, - nullptr, + context.db_context.getGlobalContext().getMinMaxIndexCache(), + false, {range}, RSOperatorPtr{}, IdSetPtr{}, From 1316ade7d4f71615e79bb3ad267dec36026076f1 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 17:19:23 +0800 Subject: [PATCH 07/12] avoid rough set filter test to be influenced by background gc thread --- dbms/src/Debug/dbgFuncMisc.cpp | 66 ++++++++++++++++--- .../misc/timestamp_rough_set_filter.test | 18 ++--- .../expr/timestamp_filter.test | 22 +++---- 3 files changed, 77 insertions(+), 29 deletions(-) diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index ed0739879cf..1092a7c666f 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -22,29 +22,77 @@ namespace DB { +inline size_t getThreadIdForLog(const String & line) +{ + std::cout << "getThreadIdForLog " << line << std::endl; + auto sub_line = line.substr(line.find("thread_id=")); + std::regex rx(R"((0|[1-9][0-9]*))"); + std::smatch m; + if (regex_search(sub_line, m, rx)) + return std::stoi(m[1]); + else + return 0; +} + +// Usage example: +// The first argument is the key you want to search. +// For example, we want to search the key 'RSFilter exclude rate' in log file, and get the value following it. +// So we can use it as the first argument. +// But many kind of thread can print this keyword, +// so we can use the second argument to specify a keyword that may just be printed by a specific kind of thread. +// Here we use 'Rough set filter' to specify we just want to search read thread. +// And the complete command is the following: +// DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') +// TODO: this is still a too hack way to do test, but cannot think a better way now. void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Printer output) { - if (args.size() < 1) - throw Exception("Args not matched, should be: key", ErrorCodes::BAD_ARGUMENTS); + if (args.size() < 2) + throw Exception("Args not matched, should be: key, thread_hint", ErrorCodes::BAD_ARGUMENTS); String key = safeGet(typeid_cast(*args[0]).value); + // the candidate line must be printed by a thread which also print a line contains `thread_hint` + String thread_hint = safeGet(typeid_cast(*args[1]).value); auto log_path = context.getConfigRef().getString("logger.log"); std::ifstream file(log_path); - std::vector line_candidates; - String line; - while (std::getline(file, line)) + // get the lines containing `thread_hint` and `key` + std::vector thread_hint_line_candidates; + std::vector key_line_candidates; { - if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos)) - line_candidates.emplace_back(line); + String line; + while (std::getline(file, line)) + { + if ((line.find(thread_hint) != String::npos) && (line.find("DBGInvoke") == String::npos)) + thread_hint_line_candidates.emplace_back(line); + else if ((line.find(key) != String::npos) && (line.find("DBGInvoke") == String::npos)) + key_line_candidates.emplace_back(line); + } } - if (line_candidates.empty()) + // get target thread id + if (thread_hint_line_candidates.empty() || key_line_candidates.empty()) { output("Invalid"); return; } - auto & target_line = line_candidates.back(); + size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back()); + std::cout << "target_thread_id " << target_thread_id << std::endl; + if (target_thread_id == 0) + { + output("Invalid"); + return; + } + String target_line; + for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++) + { + if (getThreadIdForLog(*iter) == target_thread_id) + { + target_line = *iter; + break; + } + } + // try parse the first number following the key auto sub_line = target_line.substr(target_line.find(key)); + std::cout << "target_line " << sub_line << std::endl; std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))"); std::smatch m; if (regex_search(sub_line, m, rx)) diff --git a/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test b/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test index 799d770358e..d3961f3c2fb 100644 --- a/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test +++ b/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test @@ -84,14 +84,14 @@ │ 50 │ 2019-06-10 09:00:00.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ => DBGInvoke dag('select * from default.test where col_2 < cast_string_datetime(\'2019-06-10 04:00:00.00000\')',4,'encode_type:default,tz_name:America/Chicago') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -101,14 +101,14 @@ │ 50 │ 2019-06-10 09:00:00.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ => DBGInvoke dag('select * from default.test where col_2 > cast_string_datetime(\'2019-06-13 12:00:01.00000\')') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -118,14 +118,14 @@ │ 55 │ 2019-06-13 12:00:01.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ => DBGInvoke dag('select * from default.test where col_2 > cast_string_datetime(\'2019-06-13 20:00:01.00000\')',4,'encode_type:default,tz_offset:28800') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -135,14 +135,14 @@ │ 55 │ 2019-06-13 12:00:01.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ => DBGInvoke dag('select * from default.test where col_2 > cast_string_datetime(\'2019-06-13 07:00:01.00000\')',4,'encode_type:default,tz_name:America/Chicago') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -152,7 +152,7 @@ │ 55 │ 2019-06-13 12:00:01.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ diff --git a/tests/fullstack-test-dt/expr/timestamp_filter.test b/tests/fullstack-test-dt/expr/timestamp_filter.test index a80460d8c30..fc3f5c59ca1 100644 --- a/tests/fullstack-test-dt/expr/timestamp_filter.test +++ b/tests/fullstack-test-dt/expr/timestamp_filter.test @@ -44,7 +44,7 @@ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash | 1 | 2000-01-01 10:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ @@ -57,26 +57,26 @@ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash | 1 | 2000-01-01 10:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash'; select * from test.t where ts != '2000-01-01 10:00:00'; -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ ## Tests the direction between column and literal mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash'; select * from test.t where ts > '2000-01-01 10:00:01'; -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash'; select * from test.t where '2000-01-01 10:00:01' < ts; -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -86,7 +86,7 @@ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash +----+---------------------+ | 1 | 2000-01-01 10:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ @@ -96,7 +96,7 @@ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash +----+---------------------+ | 1 | 2000-01-01 10:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ @@ -109,14 +109,14 @@ mysql> SET time_zone = '+8:00'; set session tidb_isolation_read_engines='tiflash | 1 | 2000-01-01 18:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ mysql> SET time_zone = '+8:00'; set session tidb_isolation_read_engines='tiflash'; select * from test.t where ts != '2000-01-01 18:00:00'; -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -155,7 +155,7 @@ mysql> SET time_zone = '+0:00'; set session tidb_isolation_read_engines='tiflash | 1 | 2000-01-01 10:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ @@ -168,7 +168,7 @@ mysql> SET time_zone = '+8:00'; set session tidb_isolation_read_engines='tiflash | 1 | 2000-01-01 18:00:00 | +----+---------------------+ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ From 2997c411b3b949156a73d74761b04b9e77a1260b Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Wed, 30 Mar 2022 17:20:05 +0800 Subject: [PATCH 08/12] Update dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp Co-authored-by: Flowyi --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0a00d258222..615f679404f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1604,7 +1604,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segment_snap->getRows() == 0) { // release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status. - segment_snap = nullptr; + segment_snap = {}; checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); continue; } From 30941eea37a71cc622a789c7670bbf465777df21 Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Wed, 30 Mar 2022 17:36:19 +0800 Subject: [PATCH 09/12] Update dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index de08b26b3ba..95fcede02c1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -48,7 +48,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( dmfile, index_cache, - true, + /* set_cache_if_miss*/ true, rowkey_ranges, rs_filter, read_packs, From 90c9958f0cc46cac52a6dad3e0e4425596a7d37d Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Wed, 30 Mar 2022 17:36:26 +0800 Subject: [PATCH 10/12] Update dbms/src/Storages/DeltaMerge/StableValueSpace.cpp Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 96f8a4423f3..331f39ab9bd 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -360,7 +360,7 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & auto filter = DMFilePackFilter::loadFrom( f, context.db_context.getGlobalContext().getMinMaxIndexCache(), - false, + /*set_cache_if_miss*/ false, {range}, RSOperatorPtr{}, IdSetPtr{}, From 495fbe69a6b077f2384ef4aac84dfe4a52a24afc Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 17:44:20 +0800 Subject: [PATCH 11/12] format and some small fix --- dbms/src/Debug/dbgFuncMisc.cpp | 2 +- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 1 + dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index 1092a7c666f..75487ec2c6d 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -31,7 +31,7 @@ inline size_t getThreadIdForLog(const String & line) if (regex_search(sub_line, m, rx)) return std::stoi(m[1]); else - return 0; + return 0; } // Usage example: diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 2fe0a15e9c5..27d443a3c1e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1652,6 +1652,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment) { // Continue to check whether we need to apply more tasks on this segment + segment_snap = {}; checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); gc_segments_num++; finish_gc_on_segment = true; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 4ff87d3d769..ee59fb7bc1c 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -43,7 +43,7 @@ class DMFilePackFilter static DMFilePackFilter loadFrom( const DMFilePtr & dmfile, const MinMaxIndexCachePtr & index_cache, - bool set_cache_if_miss, + bool set_cache_if_miss, const RowKeyRanges & rowkey_ranges, const RSOperatorPtr & filter, const IdSetPtr & read_packs, @@ -103,7 +103,7 @@ class DMFilePackFilter private: DMFilePackFilter(const DMFilePtr & dmfile_, const MinMaxIndexCachePtr & index_cache_, - bool set_cache_if_miss_, + bool set_cache_if_miss_, const RowKeyRanges & rowkey_ranges_, // filter by handle range const RSOperatorPtr & filter_, // filter by push down where clause const IdSetPtr & read_packs_, // filter by pack index @@ -220,7 +220,7 @@ class DMFilePackFilter const DMFilePtr & dmfile, const FileProviderPtr & file_provider, const MinMaxIndexCachePtr & index_cache, - bool set_cache_if_miss, + bool set_cache_if_miss, ColId col_id, const ReadLimiterPtr & read_limiter) { From 290844e8daa796dd2df1ac6234acd8571d84dba3 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 30 Mar 2022 19:09:33 +0800 Subject: [PATCH 12/12] remove some debug message --- dbms/src/Debug/dbgFuncMisc.cpp | 3 --- dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp | 2 +- .../src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp | 2 +- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 4 ++-- .../query/misc/timestamp_rough_set_filter.test | 6 +++--- 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index 75487ec2c6d..8563aaf7433 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -24,7 +24,6 @@ namespace DB { inline size_t getThreadIdForLog(const String & line) { - std::cout << "getThreadIdForLog " << line << std::endl; auto sub_line = line.substr(line.find("thread_id=")); std::regex rx(R"((0|[1-9][0-9]*))"); std::smatch m; @@ -75,7 +74,6 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr return; } size_t target_thread_id = getThreadIdForLog(thread_hint_line_candidates.back()); - std::cout << "target_thread_id " << target_thread_id << std::endl; if (target_thread_id == 0) { output("Invalid"); @@ -92,7 +90,6 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr } // try parse the first number following the key auto sub_line = target_line.substr(target_line.find(key)); - std::cout << "target_line " << sub_line << std::endl; std::regex rx(R"([+-]?([0-9]+([.][0-9]*)?|[.][0-9]+))"); std::smatch m; if (regex_search(sub_line, m, rx)) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index a3facf85042..8fe9e3ff825 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -37,7 +37,7 @@ void ColumnFileBig::calculateStat(const DMContext & context) auto pack_filter = DMFilePackFilter::loadFrom( file, index_cache, - false, + /*set_cache_if_miss*/ false, {segment_range}, EMPTY_FILTER, {}, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 95fcede02c1..749423b0bfa 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -48,7 +48,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( dmfile, index_cache, - /* set_cache_if_miss*/ true, + /*set_cache_if_miss*/ true, rowkey_ranges, rs_filter, read_packs, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 331f39ab9bd..01cfdccfc38 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -54,7 +54,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang auto pack_filter = DMFilePackFilter::loadFrom( file, index_cache, - true, + /*set_cache_if_miss*/ true, {range}, EMPTY_FILTER, {}, @@ -237,7 +237,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const auto pack_filter = DMFilePackFilter::loadFrom( file, context.db_context.getGlobalContext().getMinMaxIndexCache(), - false, + /*set_cache_if_miss*/ false, {rowkey_range}, EMPTY_FILTER, {}, diff --git a/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test b/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test index d3961f3c2fb..2aa3085e5bb 100644 --- a/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test +++ b/tests/delta-merge-test/query/misc/timestamp_rough_set_filter.test @@ -54,7 +54,7 @@ => DBGInvoke dag('select * from default.test where col_2 < cast_string_datetime(\'2019-06-10 09:00:00.00000\')') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘ @@ -64,7 +64,7 @@ │ 50 │ 2019-06-10 09:00:00.00000 │ └────────────┴───────────────────────────┘ -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 0.00 │ └─────────────────────────────────────────────┘ @@ -74,7 +74,7 @@ # so '2019-06-10 17:00:00.00000'(tz_offset:28800) below is equal to '2019-06-10 09:00:00.00000' in UTC => DBGInvoke dag('select * from default.test where col_2 < cast_string_datetime(\'2019-06-10 17:00:00.00000\')',4,'encode_type:default,tz_offset:28800') -=> DBGInvoke search_log_for_key('RSFilter exclude rate') +=> DBGInvoke search_log_for_key('RSFilter exclude rate', 'Rough set filter') ┌─search_log_for_key("RSFilter exclude rate")─┐ │ 100.00 │ └─────────────────────────────────────────────┘