From 1b2933c2d78bd65b827e5f89f3ef8966f773ec38 Mon Sep 17 00:00:00 2001 From: Flowyi Date: Thu, 12 Sep 2019 17:46:33 +0800 Subject: [PATCH] DM: support rough index in Storage level. (#233) * DM: support rough set index in Storage level. * put delta_deletes into snapshot * rename sort_column to handle_column * remove useless code * address comments --- dbms/CMakeLists.txt | 2 + dbms/src/Storages/DeltaMerge/Chunk.cpp | 20 +- dbms/src/Storages/DeltaMerge/Chunk.h | 21 +- .../DeltaMerge/ChunkBlockInputStream.h | 36 +- dbms/src/Storages/DeltaMerge/DMContext.h | 4 +- .../DeltaMerge/DMSegmentThreadInputStream.h | 18 +- dbms/src/Storages/DeltaMerge/DeltaMerge.h | 96 ++--- .../Storages/DeltaMerge/DeltaMergeDefines.h | 9 +- .../Storages/DeltaMerge/DeltaMergeHelpers.h | 13 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 72 +--- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 27 +- dbms/src/Storages/DeltaMerge/DeltaPlace.h | 37 +- dbms/src/Storages/DeltaMerge/DeltaTree.h | 136 ++++--- .../Storages/DeltaMerge/DiskValueSpace.cpp | 19 +- dbms/src/Storages/DeltaMerge/DiskValueSpace.h | 4 +- .../DummyDeltaMergeBlockInputStream.h | 4 +- .../DummyDeltaMergeBlockOutputStream.h | 4 +- .../src/Storages/DeltaMerge/DummyValueSpace.h | 2 +- dbms/src/Storages/DeltaMerge/Filter/And.h | 49 +++ dbms/src/Storages/DeltaMerge/Filter/Equal.h | 29 ++ dbms/src/Storages/DeltaMerge/Filter/Greater.h | 30 ++ .../Storages/DeltaMerge/Filter/GreaterEqual.h | 30 ++ dbms/src/Storages/DeltaMerge/Filter/In.h | 49 +++ dbms/src/Storages/DeltaMerge/Filter/Less.h | 30 ++ .../Storages/DeltaMerge/Filter/LessEqual.h | 30 ++ dbms/src/Storages/DeltaMerge/Filter/Like.h | 25 ++ dbms/src/Storages/DeltaMerge/Filter/Not.h | 25 ++ .../src/Storages/DeltaMerge/Filter/NotEqual.h | 29 ++ dbms/src/Storages/DeltaMerge/Filter/NotIn.h | 49 +++ dbms/src/Storages/DeltaMerge/Filter/NotLike.h | 25 ++ dbms/src/Storages/DeltaMerge/Filter/Or.h | 43 +++ .../Storages/DeltaMerge/Filter/RSOperator.cpp | 39 ++ 
.../Storages/DeltaMerge/Filter/RSOperator.h | 111 ++++++ .../Storages/DeltaMerge/Filter/Unsupported.h | 35 ++ dbms/src/Storages/DeltaMerge/FilterHelper.h | 21 ++ dbms/src/Storages/DeltaMerge/Index/CMap.h | 14 + .../src/Storages/DeltaMerge/Index/Histogram.h | 16 + dbms/src/Storages/DeltaMerge/Index/MinMax.h | 322 +++++++++++++++++ .../Storages/DeltaMerge/Index/MinMaxIndex.cpp | 118 +++++++ .../Storages/DeltaMerge/Index/MinMaxIndex.h | 50 +++ dbms/src/Storages/DeltaMerge/Index/RSIndex.h | 39 ++ dbms/src/Storages/DeltaMerge/Index/RSResult.h | 67 ++++ .../Storages/DeltaMerge/Index/RoughCheck.h | 100 ++++++ .../DeltaMerge/Index/ValueComparison.h | 333 ++++++++++++++++++ dbms/src/Storages/DeltaMerge/Segment.cpp | 145 +++++--- dbms/src/Storages/DeltaMerge/Segment.h | 20 +- .../DeltaMerge/tests/gtest_dm_chunk.cpp | 8 +- .../DeltaMerge/tests/gtest_dm_delta_tree.cpp | 4 +- .../tests/gtest_dm_disk_value_space.cpp | 10 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 18 +- 50 files changed, 2110 insertions(+), 327 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/Filter/And.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Equal.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Greater.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/GreaterEqual.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/In.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Less.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/LessEqual.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Like.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Not.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/NotEqual.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/NotIn.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/NotLike.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/Or.h create mode 100644 dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Filter/RSOperator.h create 
mode 100644 dbms/src/Storages/DeltaMerge/Filter/Unsupported.h create mode 100644 dbms/src/Storages/DeltaMerge/FilterHelper.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/CMap.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/Histogram.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/MinMax.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp create mode 100644 dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/RSIndex.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/RSResult.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/RoughCheck.h create mode 100644 dbms/src/Storages/DeltaMerge/Index/ValueComparison.h diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index d4afbff8e06..286cc4cf37b 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -55,6 +55,8 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy) add_headers_and_sources(dbms src/Columns) add_headers_and_sources(dbms src/Storages) add_headers_and_sources(dbms src/Storages/DeltaMerge) +add_headers_and_sources(dbms src/Storages/DeltaMerge/Index) +add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter) add_headers_and_sources(dbms src/Storages/Distributed) add_headers_and_sources(dbms src/Storages/MergeTree) add_headers_and_sources(dbms src/Storages/Transaction) diff --git a/dbms/src/Storages/DeltaMerge/Chunk.cpp b/dbms/src/Storages/DeltaMerge/Chunk.cpp index 7878917b925..64a502db902 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/Chunk.cpp @@ -22,6 +22,15 @@ void Chunk::serialize(WriteBuffer & buf) const writeIntBinary(d.rows, buf); writeIntBinary(d.bytes, buf); writeStringBinary(d.type->getName(), buf); + if (d.minmax) + { + writePODBinary(true, buf); + d.minmax->write(*d.type, buf); + } + else + { + writePODBinary(false, buf); + } } } @@ -46,8 +55,11 @@ Chunk Chunk::deserialize(ReadBuffer & buf) readIntBinary(d.rows, buf); 
readIntBinary(d.bytes, buf); readStringBinary(type, buf); - d.type = DataTypeFactory::instance().get(type); + bool has_minmax; + readPODBinary(has_minmax, buf); + if (has_minmax) + d.minmax = MinMaxIndex::read(*d.type, buf); chunk.columns.emplace(d.col_id, d); @@ -108,9 +120,9 @@ BufferAndSize serializeColumn(const IColumn & column, const DataTypePtr & type, Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_data_page_id, WriteBatch & wb, const Block & block) { - auto & handle_col_data = getColumnVectorData(block, block.getPositionByName(dm_context.table_handle_define.name)); + auto & handle_col_data = getColumnVectorData(block, block.getPositionByName(dm_context.handle_column.name)); Chunk chunk(handle_col_data[0], handle_col_data[handle_col_data.size() - 1]); - for (const auto & col_define : dm_context.table_columns) + for (const auto & col_define : dm_context.store_columns) { auto col_id = col_define.id; const IColumn & column = *(block.getByName(col_define.name).column); @@ -122,6 +134,8 @@ Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_ d.rows = column.size(); d.bytes = size; d.type = col_define.type; + d.minmax = std::make_shared( + *col_define.type, column, static_cast &>(*block.getByName(TAG_COLUMN_NAME).column), 0, column.size()); wb.putPage(d.page_id, 0, buf, size); chunk.insert(d); diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h index 395210161a8..91efe4c3d68 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.h +++ b/dbms/src/Storages/DeltaMerge/Chunk.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -23,11 +24,12 @@ static constexpr size_t CHUNK_SERIALIZE_BUFFER_SIZE = 65536; struct ColumnMeta { - ColId col_id; - PageId page_id; - UInt32 rows; - UInt64 bytes; - DataTypePtr type; + ColId col_id; + PageId page_id; + UInt32 rows; + UInt64 bytes; + DataTypePtr type; + MinMaxIndexPtr minmax; }; using ColumnMetas = std::vector; @@ 
-36,7 +38,6 @@ class Chunk public: using ColumnMetaMap = std::unordered_map; - Chunk() : Chunk(0, 0) {} Chunk(Handle handle_first_, Handle handle_last_) : handle_start(handle_first_), handle_end(handle_last_), is_delete_range(false) {} explicit Chunk(const HandleRange & delete_range) : handle_start(delete_range.start), handle_end(delete_range.end), is_delete_range(true) @@ -78,6 +79,14 @@ class Chunk return it->second; } + const ColumnMeta * tryGetColumn(ColId col_id) const + { + auto it = columns.find(col_id); + if (unlikely(it == columns.end())) + return nullptr; + return &it->second; + } + const ColumnMetaMap & getMetas() const { return columns; } void insert(const ColumnMeta & c) diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h index 13bc1aedc3e..39d8b9cc388 100644 --- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h @@ -3,6 +3,7 @@ #include #include +#include namespace DB { @@ -12,9 +13,26 @@ namespace DM class ChunkBlockInputStream final : public IBlockInputStream { public: - ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, const PageReader & page_reader_) - : chunks(chunks_), read_columns(read_columns_), page_reader(page_reader_) + ChunkBlockInputStream(const Chunks & chunks_, + const RSOperatorPtr & filter, + const ColumnDefines & read_columns_, + const PageReader & page_reader_) + : chunks(chunks_), skip_chunks(chunks.size()), read_columns(read_columns_), page_reader(page_reader_) { + for (size_t i = 0; i < chunks.size(); ++i) + { + if (!filter) + { + skip_chunks[i] = 0; + continue; + } + auto & chunk = chunks[i]; + RSCheckParam param; + for (auto & [col_id, meta] : chunk.getMetas()) + param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax)); + + skip_chunks[i] = filter->roughCheck(param) == None; + } } String getName() const override { return "Chunk"; } @@ -22,18 +40,20 @@ class 
ChunkBlockInputStream final : public IBlockInputStream Block read() override { - if (!hasNextBlock()) + if (!hasNext()) return {}; return readChunk(chunks[chunk_index++], read_columns, page_reader); } - bool hasNextBlock() { return chunk_index < chunks.size(); } - HandlePair nextBlockHandle() { return chunks[chunk_index].getHandleFirstLast(); } - size_t nextBlockRows() { return chunks[chunk_index].getRows(); } - void skipNextBlock() { ++chunk_index; } + bool hasNext() { return chunk_index < chunks.size(); } + size_t nextRows() { return chunks[chunk_index].getRows(); } + bool shouldSkipNext() { return skip_chunks[chunk_index]; } + void skipNext() { ++chunk_index; } private: - Chunks chunks; + Chunks chunks; + std::vector skip_chunks; + size_t chunk_index = 0; ColumnDefines read_columns; PageReader page_reader; diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index e4065d07501..9ea551aad77 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -23,8 +23,8 @@ struct DMContext // The schema snapshot // We need a consistent snapshot of columns, copy ColumnsDefines - const ColumnDefines table_columns; - const ColumnDefine table_handle_define; + const ColumnDefines store_columns; + const ColumnDefine handle_column; const UInt64 min_version; diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 169b372e792..77ffcce11e5 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -13,17 +13,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream /// If handle_real_type_ is empty, means do not convert handle column back to real type. 
DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_, const SegmentStreamCreator & stream_creator_, - const ColumnDefines & columns_to_read_, - const String & handle_name_, - const DataTypePtr & handle_real_type_, - const Context & context_) + const ColumnDefines & columns_to_read_) : task_pool(task_pool_), stream_creator(stream_creator_), columns_to_read(columns_to_read_), header(toEmptyBlock(columns_to_read)), - handle_name(handle_name_), - handle_real_type(handle_real_type_), - context(context_), log(&Logger::get("DMSegmentThreadInputStream")) { } @@ -74,13 +68,6 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream Block res; for (auto & cd : columns_to_read) res.insert(original_block.getByName(cd.name)); - - if (handle_real_type && res.has(handle_name)) - { - auto pos = res.getPositionByName(handle_name); - convertColumn(res, pos, handle_real_type, context); - res.getByPosition(pos).type = handle_real_type; - } return res; } @@ -89,9 +76,6 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream SegmentStreamCreator stream_creator; ColumnDefines columns_to_read; Block header; - String handle_name; - DataTypePtr handle_real_type; - const Context & context; bool done = false; BlockInputStreamPtr cur_stream; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index 2dc33e3c9ad..694b72294a9 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -18,7 +18,7 @@ namespace DB namespace DM { -// Note that the columns in stable input stream and value space must exactly the same, include the name, type, and id. +// Note that the columns in stable input stream and value space must exactly the same, including name, type, and id. 
template class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream { @@ -37,11 +37,10 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream // How many rows we need to skip before writing stable rows into output. // == 0: None // > 0 : do skip - // < 0 : some rows are filtered out by handle_range, should not write into output. + // < 0 : some rows are filtered out by index, should not write into output. ssize_t stable_skip = 0; DeltaValueSpacePtr delta_value_space; - size_t delta_rows_limit; IndexIterator entry_it; IndexIterator entry_end; @@ -72,7 +71,6 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream stable_input_stream(stable_input_stream_), stable_input_stream_raw_ptr(stable_input_stream.get()), delta_value_space(delta_value_space_), - delta_rows_limit(delta_value_space->getRows()), entry_it(index_begin), entry_end(index_end), max_block_size(max_block_size_) @@ -155,20 +153,20 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream if constexpr (!c_delta_done) { - if (entry_it.getType() == DT_DEL) + auto tuple_id = entry_it.getValue(); + switch (entry_it.getType()) { - writeDeleteFromDelta(entry_it.getValue()); - } - else if (entry_it.getType() == DT_INS) - { - if (entry_it.getValue() < delta_rows_limit) + case DT_DEL: + writeDeleteFromDelta(1); + break; + case DT_INS: + if (handle_range.check(delta_value_space->getHandle(tuple_id))) { - writeInsertFromDelta(output_columns, entry_it.getValue()); + writeInsertFromDelta(output_columns, tuple_id); --output_write_limit; } - } - else - { + break; + default: throw Exception("Entry type " + DTTypeString(entry_it.getType()) + " is not supported, is end: " + DB::toString(entry_it == entry_end) + ", use_stable_rows: " + DB::toString(use_stable_rows) + ", stable_skip: " + DB::toString(stable_skip) + ", stable_done: " + DB::toString(stable_done) @@ -257,28 +255,18 @@ class DeltaMergeBlockInputStream final : public 
IProfilingBlockInputStream continue; } // Check whether we can skip next block entirely or not. - if (!stable_input_stream_raw_ptr->hasNextBlock()) + if (!stable_input_stream_raw_ptr->hasNext()) throw Exception("Unexpected end of block, need more rows to skip"); - auto [first_handle, last_handle] = stable_input_stream_raw_ptr->nextBlockHandle(); - size_t rows = stable_input_stream_raw_ptr->nextBlockRows(); - - if (handle_range.intersect(first_handle, last_handle)) + size_t rows = stable_input_stream_raw_ptr->nextRows(); + if (!stable_input_stream_raw_ptr->shouldSkipNext()) { - // We need to load next block. fillStableBlockIfNeeded(); - continue; } - - // Entirely skip block. - stable_input_stream_raw_ptr->skipNextBlock(); - stable_skip -= rows; - - // If the next block of stable is larger then handle_range, then we should stop reading from stable. - if (!handle_range.checkEnd(first_handle)) + else { - stable_done = true; - use_stable_rows = 0; + stable_input_stream_raw_ptr->skipNext(); + stable_skip -= rows; } } @@ -297,6 +285,9 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream } } + if (unlikely(!(stable_skip == 0 || use_stable_rows == 0))) + throw Exception("Algorithm broken!"); + while (use_stable_rows && output_write_limit) { if (curStableBlockRemaining()) @@ -305,7 +296,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream continue; } - if (!stable_input_stream_raw_ptr->hasNextBlock()) + if (!stable_input_stream_raw_ptr->hasNext()) { if constexpr (c_delta_done) { @@ -317,43 +308,34 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream throw Exception("Unexpected end of block, need more rows to write"); } - auto [first_handle, last_handle] = stable_input_stream_raw_ptr->nextBlockHandle(); - size_t next_block_rows = stable_input_stream_raw_ptr->nextBlockRows(); + size_t next_block_rows = stable_input_stream_raw_ptr->nextRows(); - if (handle_range.intersect(first_handle, last_handle)) + 
if (!stable_input_stream_raw_ptr->shouldSkipNext()) { - // Load next block, and then write to output in next round. fillStableBlockIfNeeded(); - continue; - } - - // Entirely skip block. - stable_input_stream_raw_ptr->skipNextBlock(); - // We skipped some rows, some of them are consumed by writing to output, the rest are recorded by stable_skip. - if (next_block_rows <= use_stable_rows) - { - use_stable_rows -= next_block_rows; } else { - stable_skip -= next_block_rows - use_stable_rows; - use_stable_rows = 0; - } - - // If the next block of stable is larger then handle_range, then we should stop reading from stable. - if (!handle_range.checkEnd(first_handle)) - { - stable_done = true; - use_stable_rows = 0; - break; + // Entirely skip block. + stable_input_stream_raw_ptr->skipNext(); + // We skipped some rows, some of them are consumed by writing to output, the rest are recorded by stable_skip. + if (next_block_rows <= use_stable_rows) + { + use_stable_rows -= next_block_rows; + } + else + { + stable_skip -= next_block_rows - use_stable_rows; + use_stable_rows = 0; + } } } } - inline void writeInsertFromDelta(MutableColumns & output_columns, UInt64 value_index) + inline void writeInsertFromDelta(MutableColumns & output_columns, UInt64 tuple_id) { for (size_t index = 0; index < num_columns; ++index) - delta_value_space->insertValue(*output_columns[index], index, value_index); + delta_value_space->insertValue(*output_columns[index], index, tuple_id); } inline void writeDeleteFromDelta(size_t n) { stable_skip += n; } @@ -362,7 +344,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream { auto prev_sid = entry_it.getSid(); if (entry_it.getType() == DT_DEL) - prev_sid += entry_it.getValue(); + prev_sid += 1; ++entry_it; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index eab2f157cb6..b108b7ed437 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ 
b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -72,7 +73,10 @@ struct ColumnDefine DataTypePtr type; String default_value; - explicit ColumnDefine(ColId id_ = 0, String name_ = "", DataTypePtr type_ = nullptr): id(id_), name(std::move(name_)), type(std::move(type_)) {} + explicit ColumnDefine(ColId id_ = 0, String name_ = "", DataTypePtr type_ = nullptr) + : id(id_), name(std::move(name_)), type(std::move(type_)) + { + } }; using ColumnDefines = std::vector; using ColumnDefineMap = std::unordered_map; @@ -95,6 +99,7 @@ static const DataTypePtr EXTRA_HANDLE_COLUMN_TYPE = DataTypeFactory::instance(). static const DataTypePtr VERSION_COLUMN_TYPE = DataTypeFactory::instance().get("UInt64"); static const DataTypePtr TAG_COLUMN_TYPE = DataTypeFactory::instance().get("UInt8"); +static const ColumnDefine EXTRA_HANDLE_COLUMN_DEFINE{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_TYPE}; static const ColumnDefine VERSION_COLUMN_DEFINE{VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE}; static const ColumnDefine TAG_COLUMN_DEFINE{TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE}; @@ -110,5 +115,7 @@ static constexpr Handle P_INF_HANDLE = MAX_INT64; // Used in range, indicating p static_assert(static_cast(static_cast(MIN_INT64)) == MIN_INT64, "Unsupported compiler!"); static_assert(static_cast(static_cast(MAX_INT64)) == MAX_INT64, "Unsupported compiler!"); +static constexpr UInt64 DEL_RANGE_POS_MARK = (1ULL << 63); + } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index 7ec652df85d..47676e1d643 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB { @@ -73,22 +74,20 @@ inline SortDescription getPkSort(const 
ColumnDefine & handle) return sort; } -using PermutationPtr = std::unique_ptr; -inline PermutationPtr sortBlockByPk(const ColumnDefine & handle, Block & block) +inline bool sortBlockByPk(const ColumnDefine & handle, Block & block, IColumn::Permutation & perm) { SortDescription sort = getPkSort(handle); if (isAlreadySorted(block, sort)) - return {}; + return false; - auto perm = std::make_unique(); - stableGetPermutation(block, sort, *perm); + stableGetPermutation(block, sort, perm); for (size_t i = 0; i < block.columns(); ++i) { auto & c = block.getByPosition(i); - c.column = c.column->permute(*perm, 0); + c.column = c.column->permute(perm, 0); } - return perm; + return true; } template diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5b92b5c8be0..8ebf1a9f685 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -47,13 +47,6 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, settings(settings_), log(&Logger::get("DeltaMergeStore")) { - // We use Int64 to store handle. 
- if (!table_handle_define.type->equals(*EXTRA_HANDLE_COLUMN_TYPE)) - { - table_handle_real_type = table_handle_define.type; - table_handle_define.type = EXTRA_HANDLE_COLUMN_TYPE; - } - table_columns.emplace_back(table_handle_define); table_columns.emplace_back(VERSION_COLUMN_DEFINE); table_columns.emplace_back(TAG_COLUMN_DEFINE); @@ -63,8 +56,6 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, if (col.name != table_handle_define.name && col.name != VERSION_COLUMN_NAME && col.name != TAG_COLUMN_NAME) table_columns.emplace_back(col); } - // update block header - header = genHeaderBlock(table_columns, table_handle_define, table_handle_real_type); DMContext dm_context = newDMContext(db_context, db_context.getSettingsRef()); if (!storage_pool.maxMetaPageId()) @@ -129,21 +120,22 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ DMContext dm_context = newDMContext(db_context, db_settings); Block block = to_write; - const auto & handle_define = table_handle_define; + // Add an extra handle column, if handle reused the original column data. + if (pkIsHandle()) { - // Transform handle column into Int64. - auto handle_pos = block.getPositionByName(handle_define.name); - if (!block.getByPosition(handle_pos).type->equals(*EXTRA_HANDLE_COLUMN_TYPE)) - { - FunctionToInt64::create(db_context)->execute(block, {handle_pos}, handle_pos); - block.getByPosition(handle_pos).type = EXTRA_HANDLE_COLUMN_TYPE; - } + auto handle_pos = block.getPositionByName(table_handle_define.name); + addColumnToBlock(block, // + EXTRA_HANDLE_COLUMN_ID, + EXTRA_HANDLE_COLUMN_NAME, + EXTRA_HANDLE_COLUMN_TYPE, + EXTRA_HANDLE_COLUMN_TYPE->createColumn()); + FunctionToInt64::create(db_context)->execute(block, {handle_pos}, block.columns() - 1); } { // Sort by handle & version in ascending order. 
SortDescription sort; - sort.emplace_back(handle_define.name, 1, 0); + sort.emplace_back(EXTRA_HANDLE_COLUMN_NAME, 1, 0); sort.emplace_back(VERSION_COLUMN_NAME, 1, 0); if (!isAlreadySorted(block, sort)) @@ -165,7 +157,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ LOG_DEBUG(log, msg); } - const auto & handle_data = getColumnVectorData(block, block.getPositionByName(handle_define.name)); + const auto & handle_data = getColumnVectorData(block, block.getPositionByName(EXTRA_HANDLE_COLUMN_NAME)); struct WriteAction { @@ -286,14 +278,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { - BlockInputStreamPtr stream = std::make_shared( // - read_task_pool, - stream_creator, - columns_to_read, - table_handle_define.name, - DataTypePtr{}, - db_context); - + BlockInputStreamPtr stream = std::make_shared(read_task_pool, stream_creator, columns_to_read); res.push_back(stream); } return res; @@ -367,12 +352,13 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, auto dm_context = newDMContext(db_context, db_settings); - SegmentStreamCreator stream_creator = [=](const SegmentReadTask & task) { + auto stream_creator = [=](const SegmentReadTask & task) { return task.segment->getInputStream(dm_context, task.read_snapshot, *storage_snapshot, columns_to_read, task.ranges, + {}, max_version, std::min(expected_block_size, DEFAULT_BLOCK_SIZE)); }; @@ -383,14 +369,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { - BlockInputStreamPtr stream = std::make_shared( // - read_task_pool, - stream_creator, - columns_to_read, - table_handle_define.name, - table_handle_real_type, - db_context); - + BlockInputStreamPtr stream = std::make_shared(read_task_pool, stream_creator, columns_to_read); res.push_back(stream); } return res; @@ -531,22 +510,6 @@ void 
DeltaMergeStore::check(const Context & db_context, const DB::Settings & db_ } } -Block DeltaMergeStore::genHeaderBlock(const ColumnDefines & raw_columns, - const ColumnDefine & handle_define, - const DataTypePtr & handle_real_type) -{ - ColumnDefines real_cols = raw_columns; - for (auto && col : real_cols) - { - if (col.id == handle_define.id) - { - if (handle_real_type) - col.type = handle_real_type; - } - } - return toEmptyBlock(real_cols); -} - void DeltaMergeStore::applyAlters(const AlterCommands & commands, const OptionTableInfoConstRef table_info, ColumnID & max_column_id_used, @@ -560,9 +523,6 @@ void DeltaMergeStore::applyAlters(const AlterCommands & commands, { applyAlter(command, table_info, max_column_id_used); } - - // Don't forget to update header - header = genHeaderBlock(table_columns, table_handle_define, table_handle_real_type); } namespace @@ -631,7 +591,7 @@ void DeltaMergeStore::applyAlter(const AlterCommand & command, const OptionTable } else if (command.type == AlterCommand::ADD_COLUMN) { - // we don't care about `after_column` in `table_columns` + // we don't care about `after_column` in `store_columns` /// If TableInfo from TiDB is not empty, we get column id from TiDB ColumnDefine define(0, command.column_name, command.data_type); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 78d3f57cf57..a386f739604 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -78,7 +78,7 @@ class DeltaMergeStore const ColumnDefines & getTableColumns() const { return table_columns; } const ColumnDefine & getHandle() const { return table_handle_define; } - const Block & getHeader() const { return header; } + Block getHeader() const { return toEmptyBlock(table_columns); } const Settings & getSettings() const { return settings; } void check(const Context & db_context, const DB::Settings & db_settings); @@ -86,11 +86,18 @@ class 
DeltaMergeStore private: DMContext newDMContext(const Context & db_context, const DB::Settings & db_settings) { - return DMContext{.db_context = db_context, - .storage_pool = storage_pool, - .table_columns = table_columns, - .table_handle_define = table_handle_define, - .min_version = min_version, + ColumnDefines store_columns = table_columns; + if (pkIsHandle()) + { + // Add an extra handle column. + store_columns.push_back(EXTRA_HANDLE_COLUMN_DEFINE); + } + + return DMContext{.db_context = db_context, + .storage_pool = storage_pool, + .store_columns = std::move(store_columns), + .handle_column = EXTRA_HANDLE_COLUMN_DEFINE, + .min_version = min_version, .not_compress = settings.not_compress_columns, .delta_limit_rows = db_settings.dm_segment_delta_limit_rows, @@ -99,6 +106,8 @@ class DeltaMergeStore .delta_cache_limit_bytes = db_settings.dm_segment_delta_cache_limit_bytes}; } + bool pkIsHandle() { return table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; } + bool afterInsertOrDelete(const Context & db_context, const DB::Settings & db_settings); bool shouldSplit(const SegmentPtr & segment, size_t segment_rows_setting); bool shouldMerge(const SegmentPtr & left, const SegmentPtr & right, size_t segment_rows_setting); @@ -109,10 +118,6 @@ class DeltaMergeStore const OptionTableInfoConstRef table_info, ColumnID & max_column_id_used); - static Block genHeaderBlock(const ColumnDefines & raw_columns, // - const ColumnDefine & handle_define, - const DataTypePtr & handle_real_type); - private: using SegmentSortedMap = std::map; @@ -122,8 +127,6 @@ class DeltaMergeStore String table_name; ColumnDefines table_columns; ColumnDefine table_handle_define; - DataTypePtr table_handle_real_type; - Block header; // an empty block header BackgroundProcessingPool & background_pool; BackgroundProcessingPool::TaskHandle gc_handle; diff --git a/dbms/src/Storages/DeltaMerge/DeltaPlace.h b/dbms/src/Storages/DeltaMerge/DeltaPlace.h index 8c210f16277..e06efe7f7f8 100644 --- 
a/dbms/src/Storages/DeltaMerge/DeltaPlace.h +++ b/dbms/src/Storages/DeltaMerge/DeltaPlace.h @@ -42,7 +42,7 @@ struct RidGenerator size_t delta_block_rows; size_t delta_block_pos = 0; // Whether this row's pk duplicates with the next one, if so, they share the same rid. - std::vector delta_block_dup_next; + std::vector delta_block_dup_next; // Whether current row's pk duplicates with the previous one. Used by Upsert. bool dup_prev = false; @@ -58,7 +58,7 @@ struct RidGenerator sort_desc(sort_desc_), num_sort_columns(sort_desc.size()), delta_block_rows(delta_block.rows()), - delta_block_dup_next(delta_block_rows, false) + delta_block_dup_next(delta_block_rows, 0) { stable_stream->readPrefix(); @@ -186,13 +186,13 @@ struct RidGenerator /** * Index the block which is already sorted by primary keys. The indexing is recorded into delta_tree. */ -template -void placeInsert(const BlockInputStreamPtr & stable, // - const Block & delta_block, - DeltaTree & delta_tree, - RowId delta_value_space_offset, - const PermutationPtr & row_id_ref, - const SortDescription & sort) +template +void placeInsert(const BlockInputStreamPtr & stable, // + const Block & delta_block, + DeltaTree & delta_tree, + RowId delta_value_space_offset, + const IColumn::Permutation & row_id_ref, + const SortDescription & sort) { auto block_rows = delta_block.rows(); if (!block_rows) @@ -208,20 +208,25 @@ void placeInsert(const BlockInputStreamPtr & stable, // for (size_t i = 0; i < block_rows; ++i) { auto [rid, dup] = rids[i]; - if (dup) - delta_tree.addDelete(rid); - if (row_id_ref) - delta_tree.addInsert(rid, delta_value_space_offset + (*row_id_ref)[i]); + UInt64 tuple_id; + if constexpr (use_row_id_ref) + tuple_id = delta_value_space_offset + row_id_ref[i]; else - delta_tree.addInsert(rid, delta_value_space_offset + i); + tuple_id = delta_value_space_offset + i; + + if (dup) + delta_tree.addDelete(rid, tuple_id); + delta_tree.addInsert(rid, tuple_id); } } +/// del_range_id: the pos of delete range 
action in value space. It is used to filter out irrelevant deletes. template void placeDelete(const BlockInputStreamPtr & stable, // const Block & delta_block, DeltaTree & delta_tree, - const SortDescription & sort) + const SortDescription & sort, + UInt64 del_range_id) { auto block_rows = delta_block.rows(); if (!block_rows) @@ -236,7 +241,7 @@ void placeDelete(const BlockInputStreamPtr & stable, // for (size_t i = 0; i < block_rows; ++i) { if (rids[i] >= 0) - delta_tree.addDelete(rids[i]); + delta_tree.addDelete(rids[i], del_range_id | DEL_RANGE_POS_MARK); } } diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.h b/dbms/src/Storages/DeltaMerge/DeltaTree.h index 1bb37d4d760..14888d4f8d6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaTree.h +++ b/dbms/src/Storages/DeltaMerge/DeltaTree.h @@ -41,7 +41,8 @@ inline std::string addrToHex(const void * addr) } /// DTMutation type available values. -using DT_TYPE = UInt16; +using DT_TYPE = UInt16; +using DT_VALUE = UInt64; static constexpr DT_TYPE DT_INS = 65535; static constexpr DT_TYPE DT_DEL = 65534; @@ -78,11 +79,10 @@ struct DTMutation /// DT_DEL : Delete /// DT_MULTI_MOD : modify chain /// otherwise, mutation is in DT_MOD mode, "type" is modify columnId. - UInt16 type = 0; - /// for DT_INS and DT_MOD, "value" is the value index in value space; - /// for DT_MULTI_MOD, "value" represents the chain pointer; - /// for DT_DEL, "value" is the consecutive deleting tuple count, e.g. 5 means 5 tuples got deleted starting from current position. 
- UInt64 value = 0; + DT_TYPE type = 0; + /// for DT_INS, DT_DEL and DT_MOD, "value" is the value index in value space + /// for DT_MULTI_MOD, "value" represents the chain pointer + DT_VALUE value = 0; inline bool isModify() const { return type != DT_INS && type != DT_DEL; } }; @@ -165,9 +165,9 @@ struct DTLeaf { const auto & m = mutations[i]; if (m.type == DT_INS) - delta += 1; + ++delta; else if (m.type == DT_DEL) - delta -= m.value; + --delta; } return delta; } @@ -191,9 +191,9 @@ struct DTLeaf return {i, delta}; } if (type(i) == DT_INS) - delta += 1; + ++delta; else if (type(i) == DT_DEL) - delta -= value(i); + --delta; } return {i, delta}; } @@ -479,9 +479,9 @@ class DTEntryIterator DTEntryIterator & operator++() { if (leaf->type(pos) == DT_INS) - delta += 1; + ++delta; else if (leaf->type(pos) == DT_DEL) - delta -= leaf->value(pos); + --delta; if (++pos >= leaf->count && leaf->next) { @@ -505,9 +505,9 @@ class DTEntryIterator } if (leaf->type(pos) == DT_INS) - delta -= 1; + --delta; else if (leaf->type(pos) == DT_DEL) - delta += leaf->value(pos); + ++delta; return *this; } @@ -527,33 +527,70 @@ class DTEntriesCopy : Allocator { using LeafPtr = DTLeaf *; - const size_t entry_count; - const Int64 delta; + const size_t entry_capacity; UInt64 * sids; DTMutation * mutations; + Int64 delta; + size_t entry_count; + public: - DTEntriesCopy(LeafPtr left_leaf, size_t entry_count_, Int64 delta_) - : entry_count(entry_count_), - delta(delta_), - sids(reinterpret_cast(this->alloc(sizeof(UInt64) * entry_count))), - mutations(reinterpret_cast(this->alloc(sizeof(DTMutation) * entry_count))) + DTEntriesCopy(const LeafPtr left_leaf, size_t entry_count_, UInt64 upsert_limit, UInt64 del_range_limit) + : entry_capacity(entry_count_), + sids(reinterpret_cast(this->alloc(sizeof(UInt64) * entry_capacity))), + mutations(reinterpret_cast(this->alloc(sizeof(DTMutation) * entry_capacity))), + delta(0) { - size_t offset = 0; - while (left_leaf) + size_t offset = 0; + LeafPtr cur_leaf = 
left_leaf; + while (cur_leaf) { - std::move(left_leaf->sids, left_leaf->sids + left_leaf->count, sids + offset); - std::move(left_leaf->mutations, left_leaf->mutations + left_leaf->count, mutations + offset); + // Only copy relevant entries. + for (size_t i = 0; i < cur_leaf->count; ++i) + { + auto type = cur_leaf->type(i); + auto value = cur_leaf->value(i); + switch (type) + { + case DT_INS: + { + if (value < upsert_limit) + { + *(sids + offset) = cur_leaf->sids[i]; + *(mutations + offset) = cur_leaf->mutations[i]; + ++offset; + + ++delta; + } + break; + } + case DT_DEL: + { + bool ok = (value & DEL_RANGE_POS_MARK) == 0 ? value < upsert_limit : (value & (~DEL_RANGE_POS_MARK)) < del_range_limit; + if (ok) + { + *(sids + offset) = cur_leaf->sids[i]; + *(mutations + offset) = cur_leaf->mutations[i]; + ++offset; + + --delta; + } + break; + } + default: + throw Exception("Unexpected type: " + DB::toString(type)); + } + } - offset += left_leaf->count; - left_leaf = left_leaf->next; + cur_leaf = cur_leaf->next; } + entry_count = offset; } ~DTEntriesCopy() { - this->free(sids, sizeof(UInt64) * entry_count); - this->free(mutations, sizeof(DTMutation) * entry_count); + this->free(sids, sizeof(UInt64) * entry_capacity); + this->free(mutations, sizeof(DTMutation) * entry_capacity); } class Iterator @@ -577,9 +614,9 @@ class DTEntriesCopy : Allocator Iterator & operator++() { if (entries->mutations[index].type == DT_INS) - delta += 1; + ++delta; else if (entries->mutations[index].type == DT_DEL) - delta -= entries->mutations[index].value; + --delta; ++index; @@ -591,9 +628,9 @@ class DTEntriesCopy : Allocator --index; if (entries->mutations[index].type == DT_INS) - delta -= 1; + --delta; else if (entries->mutations[index].type == DT_DEL) - delta += entries->mutations[index].value; + ++delta; return *this; } @@ -813,10 +850,9 @@ class DeltaTree } template - std::shared_ptr> getEntriesCopy() + std::shared_ptr> getEntriesCopy(UInt64 upsert_limit, UInt64 del_range_limit) { - Int64 
delta = isLeaf(root) ? as(Leaf, root)->getDelta() : as(Intern, root)->getDelta(); - return std::make_shared>(left_leaf, num_entries, delta); + return std::make_shared>(left_leaf, num_entries, upsert_limit, del_range_limit); } EntryIterator sidLowerBound(UInt64 sid) const @@ -840,7 +876,7 @@ class DeltaTree void addModify(const UInt64 rid, const UInt16 column_id, const UInt64 new_value_id); void addModify(const UInt64 rid, const RefTuple & tuple); - void addDelete(const UInt64 rid); + void addDelete(const UInt64 rid, const UInt64 tuple_id = 1); void addInsert(const UInt64 rid, const UInt64 tuple_id); }; @@ -1027,7 +1063,7 @@ void DT_CLASS::addModify(const UInt64 rid, const RefTuple & tuple) } DT_TEMPLATE -void DT_CLASS::addDelete(const UInt64 rid) +void DT_CLASS::addDelete(const UInt64 rid, const UInt64 tuple_id) { ++num_deletes; @@ -1037,9 +1073,6 @@ void DT_CLASS::addDelete(const UInt64 rid) std::tie(leaf, delta) = findLeftLeaf(rid); std::tie(pos, delta) = leaf->searchRid(rid, delta); - bool merge = false; - size_t merge_pos; - bool exists = pos != leaf->count && leaf->rid(pos, delta) == rid; if (exists && leaf->type(pos) == DT_DEL) { @@ -1048,9 +1081,6 @@ void DT_CLASS::addDelete(const UInt64 rid) EntryIterator leaf_end(this->end()); while (leaf_it != leaf_end && leaf_it.getRid() == rid && leaf_it.getType() == DT_DEL) { - merge = true; - merge_pos = leaf_it.getPos(); - ++leaf_it; } leaf = leaf_it.getLeaf(); @@ -1107,20 +1137,12 @@ void DT_CLASS::addDelete(const UInt64 rid) } } - if (merge) - { - /// Simply increase delete count at the last one of delete chain. 
- ++(leaf->mutations[merge_pos].value); - } - else - { - ++num_entries; + ++num_entries; - leaf->shiftEntries(pos, 1); - leaf->sids[pos] = rid - delta; - leaf->mutations[pos] = DTMutation(DT_DEL, 1); - ++(leaf->count); - } + leaf->shiftEntries(pos, 1); + leaf->sids[pos] = rid - delta; + leaf->mutations[pos] = DTMutation(DT_DEL, tuple_id); + ++(leaf->count); afterLeafUpdated(leaf); diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index 1edc9862469..1da00c5e342 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -66,11 +65,11 @@ void DiskValueSpace::restore(const OpContext & context) size_t total_rows = num_rows(); size_t cache_rows = rowsFromBack(chunks_to_cache); PageReader page_reader(context.data_storage); - Block cache_data = read(context.dm_context.table_columns, page_reader, total_rows - cache_rows, cache_rows); + Block cache_data = read(context.dm_context.store_columns, page_reader, total_rows - cache_rows, cache_rows); if (unlikely(cache_data.rows() != cache_rows)) throw Exception("The fragment rows from storage mismatch"); - for (const auto & col_define : context.dm_context.table_columns) + for (const auto & col_define : context.dm_context.store_columns) { ColumnWithTypeAndName & col = cache_data.getByName(col_define.name); cache[col_define.id] = (*std::move(col.column)).mutate(); @@ -158,7 +157,7 @@ AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext & context, Append PageReader page_reader(context.data_storage); // Load fragment chunks' data from disk. - compacted_block = read(context.dm_context.table_columns, // + compacted_block = read(context.dm_context.store_columns, // page_reader, in_storage_rows, cache_rows, @@ -173,7 +172,7 @@ AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext & context, Append else { // Use the cache. 
- for (const auto & col_define : context.dm_context.table_columns) + for (const auto & col_define : context.dm_context.store_columns) { auto new_col = col_define.type->createColumn(); new_col->reserve(compacted_rows); @@ -227,7 +226,7 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con throw Exception("cache should only be applied to this"); auto block_rows = update.block.rows(); - for (const auto & col_define : context.dm_context.table_columns) + for (const auto & col_define : context.dm_context.store_columns) { const ColumnWithTypeAndName & col = update.block.getByName(col_define.name); @@ -321,7 +320,7 @@ void DiskValueSpace::appendChunkWithCache(const OpContext & context, Chunk && ch && (write_rows >= context.dm_context.delta_cache_limit_rows || write_bytes >= context.dm_context.delta_cache_limit_bytes)) return; - for (const auto & col_define : context.dm_context.table_columns) + for (const auto & col_define : context.dm_context.store_columns) { const ColumnWithTypeAndName & col = block.getByName(col_define.name); @@ -562,7 +561,7 @@ bool DiskValueSpace::doFlushCache(const OpContext & context) { // Load fragment data from disk. PageReader page_reader(context.data_storage); - compacted = read(context.dm_context.table_columns, page_reader, in_storage_rows, cache_rows); + compacted = read(context.dm_context.store_columns, page_reader, in_storage_rows, cache_rows); if (unlikely(compacted.rows() != cache_rows)) throw Exception("The fragment rows from storage mismatch"); @@ -570,7 +569,7 @@ bool DiskValueSpace::doFlushCache(const OpContext & context) else { // Use the cache. 
- for (const auto & col_define : context.dm_context.table_columns) + for (const auto & col_define : context.dm_context.store_columns) { ColumnWithTypeAndName col(cache.at(col_define.id)->cloneResized(cache_rows), col_define.type, col_define.name, col_define.id); compacted.insert(col); @@ -622,7 +621,7 @@ bool DiskValueSpace::doFlushCache(const OpContext & context) ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const { - return std::make_shared(chunks, read_columns, page_reader); + return std::make_shared(chunks, RSOperatorPtr(), read_columns, page_reader); } size_t DiskValueSpace::num_rows() const diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h index 1b4f00de188..3e0b8b9ab6a 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h @@ -123,10 +123,10 @@ class DiskValueSpace size_t rows_end, size_t deletes_end) const; - ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const; - bool tryFlushCache(const OpContext & context, bool force = false); + ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const; + size_t num_rows() const; size_t num_rows(size_t chunks_offset, size_t chunk_length) const; size_t num_deletes() const; diff --git a/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockInputStream.h index db9cf64147b..41d64c0101f 100644 --- a/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockInputStream.h @@ -186,8 +186,8 @@ class DummyDeltaMergeBlockInputStream final : public IProfilingBlockInputStream if (unlikely(sid != entry_it.getSid())) throw Exception("Algorithm broken!"); - ignoreStableTuples(entry_it.getValue()); - sid += 
entry_it.getValue(); + ignoreStableTuples(1); + sid += 1; ++entry_it; skip_rows = (entry_it != entry_end ? entry_it.getSid() : UNLIMITED) - sid; diff --git a/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockOutputStream.h b/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockOutputStream.h index d7d13a477eb..446a7315450 100644 --- a/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/DummyDeltaMergeBlockOutputStream.h @@ -81,7 +81,7 @@ struct RidGenerator size_t modify_block_rows; size_t modify_block_pos = 0; // Whether this row's pk duplicates with the next one, if so, they share the same rid. - std::vector modify_block_dup_next; + std::vector modify_block_dup_next; // Whether current row's pk duplicates with the previous one. Used by Upsert. bool dup_prev = false; @@ -97,7 +97,7 @@ struct RidGenerator sort_desc(sort_desc_), num_sort_columns(sort_desc.size()), modify_block_rows(modify_block.rows()), - modify_block_dup_next(modify_block_rows, false) + modify_block_dup_next(modify_block_rows, 0) { stable_stream->readPrefix(); diff --git a/dbms/src/Storages/DeltaMerge/DummyValueSpace.h b/dbms/src/Storages/DeltaMerge/DummyValueSpace.h index c4214d5f4f3..1c1cf4d9762 100644 --- a/dbms/src/Storages/DeltaMerge/DummyValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DummyValueSpace.h @@ -45,7 +45,7 @@ class MemoryValueSpace { static constexpr size_t SPLIT_SIZE = 65536; - using DeleteMark = std::vector; + using DeleteMark = std::vector; using DeleteMarks = std::vector; String name; diff --git a/dbms/src/Storages/DeltaMerge/Filter/And.h b/dbms/src/Storages/DeltaMerge/Filter/And.h new file mode 100644 index 00000000000..8a76f15c8fb --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/And.h @@ -0,0 +1,49 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class And : public LogicalOp +{ +public: + explicit And(const RSOperators & children_) : LogicalOp(children_) + { + if (children.empty()) + throw 
Exception("Unexpected empty children"); + } + + String name() override { return "and"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + auto res = children[0]->roughCheck(param); + if (res == None) + return res; + for (size_t i = 1; i < children.size(); ++i) + { + res = res && children[i]->roughCheck(param); + if (res == None) + return res; + } + return res; + } + + RSOperatorPtr applyNot() override + { + RSOperators new_children; + for (auto & child : children) + new_children.push_back(child->applyNot()); + return createOr(new_children); + }; + + // TODO: override applyOptimize() +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Equal.h b/dbms/src/Storages/DeltaMerge/Filter/Equal.h new file mode 100644 index 00000000000..7f82e903aba --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Equal.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Equal : public ColCmpVal +{ +public: + Equal(const Attr & attr_, const Field & value_) : ColCmpVal(attr_, value_, 0) {} + + String name() override { return "equal"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return rsindex.minmax->checkEqual(value, rsindex.type); + } + + RSOperatorPtr applyNot() override { return createNotEqual(attr, value); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Greater.h b/dbms/src/Storages/DeltaMerge/Filter/Greater.h new file mode 100644 index 00000000000..c53483f5467 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Greater.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Greater : public ColCmpVal +{ +public: + Greater(const Attr & attr_, const Field & value_, int null_direction_) : ColCmpVal(attr_, value_, null_direction_) {} + + String name() 
override { return "greater"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return rsindex.minmax->checkGreater(value, rsindex.type, null_direction); + } + + RSOperatorPtr applyNot() override { return createLessEqual(attr, value, null_direction); }; + RSOperatorPtr switchDirection() override { return createLess(attr, value, null_direction); } +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/GreaterEqual.h b/dbms/src/Storages/DeltaMerge/Filter/GreaterEqual.h new file mode 100644 index 00000000000..cd9919288ab --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/GreaterEqual.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class GreaterEqual : public ColCmpVal +{ +public: + GreaterEqual(const Attr & attr_, const Field & value_, int null_direction) : ColCmpVal(attr_, value_, null_direction) {} + + String name() override { return "greater_equal"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return rsindex.minmax->checkGreaterEqual(value, rsindex.type, null_direction); + } + + RSOperatorPtr applyNot() override { return createLess(attr, value, null_direction); }; + RSOperatorPtr switchDirection() override { return createLessEqual(attr, value, null_direction); } +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/In.h b/dbms/src/Storages/DeltaMerge/Filter/In.h new file mode 100644 index 00000000000..721a92b6859 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/In.h @@ -0,0 +1,49 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class In : public RSOperator +{ + Attr attr; + Fields values; + +public: + In(const Attr & attr_, const Fields & values_) : attr(attr_), values(values_) + { 
+ if (unlikely(values.empty())) + throw Exception("Unexpected empty values"); + } + + String name() override { return "in"; } + String toString() override + { + String s = R"({"op":")" + name() + R"(","col":")" + attr.col_name + R"(","value":"[)"; + for (auto & v : values) + s += "\"" + applyVisitor(FieldVisitorToString(), v) + "\","; + s.pop_back(); + return s + "]}"; + }; + + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + // TODO optimize for IN + RSResult res = rsindex.minmax->checkEqual(values[0], rsindex.type); + for (size_t i = 1; i < values.size(); ++i) + res = res || rsindex.minmax->checkEqual(values[i], rsindex.type); + return res; + } + + RSOperatorPtr applyNot() override { return createNotIn(attr, values); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Less.h b/dbms/src/Storages/DeltaMerge/Filter/Less.h new file mode 100644 index 00000000000..b4958fa10f1 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Less.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Less : public ColCmpVal +{ +public: + Less(const Attr & attr_, const Field & value_, int null_direction) : ColCmpVal(attr_, value_, null_direction) {} + + String name() override { return "less"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return !rsindex.minmax->checkGreaterEqual(value, rsindex.type, null_direction); + } + + RSOperatorPtr applyNot() override { return createGreaterEqual(attr, value, null_direction); }; + RSOperatorPtr switchDirection() override { return createGreater(attr, value, null_direction); } +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/LessEqual.h b/dbms/src/Storages/DeltaMerge/Filter/LessEqual.h 
new file mode 100644 index 00000000000..ec9af35bfdb --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/LessEqual.h @@ -0,0 +1,30 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class LessEqual : public ColCmpVal +{ +public: + LessEqual(const Attr & attr_, const Field & value_, int null_direction) : ColCmpVal(attr_, value_, null_direction) {} + + String name() override { return "less_equal"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return !rsindex.minmax->checkGreater(value, rsindex.type, null_direction); + } + + RSOperatorPtr applyNot() override { return createGreater(attr, value, null_direction); }; + RSOperatorPtr switchDirection() override { return createGreaterEqual(attr, value, null_direction); } // mirror of "<=" is ">=", equality must be kept +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Like.h b/dbms/src/Storages/DeltaMerge/Filter/Like.h new file mode 100644 index 00000000000..1bb1aa18662 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Like.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Like : public ColCmpVal +{ +public: + Like(const Attr & attr_, const Field & value_) : ColCmpVal(attr_, value_, 0) {} + + String name() override { return "like"; } + + RSResult roughCheck(const RSCheckParam & /*param*/) override { return Some; } + + RSOperatorPtr applyNot() override { return createNotLike(attr, value); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Not.h b/dbms/src/Storages/DeltaMerge/Filter/Not.h new file mode 100644 index 00000000000..54d22e38b5d --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Not.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Not : public LogicalOp +{ +public: + Not(const RSOperatorPtr & child) : 
LogicalOp({child}) {} + + String name() override { return "not"; } + + RSResult roughCheck(const RSCheckParam & param) override { return !children[0]->roughCheck(param); } + + RSOperatorPtr applyNot() override { return children[0]; }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/NotEqual.h b/dbms/src/Storages/DeltaMerge/Filter/NotEqual.h new file mode 100644 index 00000000000..d1d5f284b47 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/NotEqual.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class NotEqual : public ColCmpVal +{ +public: + NotEqual(const Attr & attr_, const Field & value_) : ColCmpVal(attr_, value_, 0) {} + + String name() override { return "not_equal"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + return !rsindex.minmax->checkEqual(value, rsindex.type); + } + + RSOperatorPtr applyNot() override { return createEqual(attr, value); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/NotIn.h b/dbms/src/Storages/DeltaMerge/Filter/NotIn.h new file mode 100644 index 00000000000..5f7ecded413 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/NotIn.h @@ -0,0 +1,49 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class NotIn : public RSOperator +{ + Attr attr; + Fields values; + +public: + NotIn(const Attr & attr_, const Fields & values_) : attr(attr_), values(values_) + { + if (unlikely(values.empty())) + throw Exception("Unexpected empty values"); + } + + String name() override { return "not_in"; } + String toString() override + { + String s = R"({"op":")" + name() + R"(","col":")" + attr.col_name + R"(","value":"[)"; + for (auto & v : values) + s += "\"" + applyVisitor(FieldVisitorToString(), v) + "\","; + s.pop_back(); + return s + "]}"; + }; 
+ + RSResult roughCheck(const RSCheckParam & param) override + { + GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex); + // TODO optimize for IN + RSResult res = !rsindex.minmax->checkEqual(values[0], rsindex.type); + for (size_t i = 1; i < values.size(); ++i) + res = res && !rsindex.minmax->checkEqual(values[i], rsindex.type); + return res; + } + + RSOperatorPtr applyNot() override { return createIn(attr, values); }; // negating NOT IN yields IN +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/NotLike.h b/dbms/src/Storages/DeltaMerge/Filter/NotLike.h new file mode 100644 index 00000000000..e5bb64f5407 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/NotLike.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class NotLike : public ColCmpVal +{ +public: + NotLike(const Attr & attr_, const Field & value_) : ColCmpVal(attr_, value_, 0) {} + + String name() override { return "not_like"; } + + RSResult roughCheck(const RSCheckParam & /*param*/) override { return Some; } + + RSOperatorPtr applyNot() override { return createLike(attr, value); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Or.h b/dbms/src/Storages/DeltaMerge/Filter/Or.h new file mode 100644 index 00000000000..d6ce926013a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/Or.h @@ -0,0 +1,43 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Or : public LogicalOp +{ +public: + explicit Or(const RSOperators & children_) : LogicalOp(children_) + { + if (children.empty()) + throw Exception("Unexpected empty children"); + } + + String name() override { return "or"; } + + RSResult roughCheck(const RSCheckParam & param) override + { + auto res = children[0]->roughCheck(param); + for (size_t i = 1; i < children.size(); ++i) + res = res || children[i]->roughCheck(param); + return res; + } + + 
RSOperatorPtr applyNot() override + { + RSOperators new_children; + for (auto & child : children) + new_children.push_back(child->applyNot()); + return createAnd(new_children); + }; + + // TODO: override applyOptimize() +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp new file mode 100644 index 00000000000..a9d8afcec36 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace DM +{ + +// clang-format off +RSOperatorPtr createAnd(const RSOperators & children) { return std::make_shared(children); } +RSOperatorPtr createEqual(const Attr & attr, const Field & value) { return std::make_shared(attr, value); } +RSOperatorPtr createGreater(const Attr & attr, const Field & value, int null_direction) { return std::make_shared(attr, value, null_direction); } +RSOperatorPtr createGreaterEqual(const Attr & attr, const Field & value, int null_direction) { return std::make_shared(attr, value, null_direction); } +RSOperatorPtr createIn(const Attr & attr, const Fields & values) { return std::make_shared(attr, values); } +RSOperatorPtr createLess(const Attr & attr, const Field & value, int null_direction) { return std::make_shared(attr, value, null_direction); } +RSOperatorPtr createLessEqual(const Attr & attr, const Field & value, int null_direction) { return std::make_shared(attr, value, null_direction); } +RSOperatorPtr createLike(const Attr & attr, const Field & value) { return std::make_shared(attr, value); } +RSOperatorPtr createNot(const RSOperatorPtr & op) { return std::make_shared(op); } +RSOperatorPtr createNotEqual(const Attr & attr, const Field & value) { return std::make_shared(attr, value); } +RSOperatorPtr 
createNotIn(const Attr & attr, const Fields & values) { return std::make_shared(attr, values); } +RSOperatorPtr createNotLike(const Attr & attr, const Field & value) { return std::make_shared(attr, value); } +RSOperatorPtr createOr(const RSOperators & children) { return std::make_shared(children); } +RSOperatorPtr createUnsupported(const String & content, bool is_not) { return std::make_shared(content, is_not); } +// clang-format on +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h new file mode 100644 index 00000000000..c465835d419 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.h @@ -0,0 +1,111 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +namespace DM +{ + +class RSOperator; +using RSOperatorPtr = std::shared_ptr; +using RSOperators = std::vector; +using Fields = std::vector; + +static const RSOperatorPtr EMPTY_FILTER{}; + +struct RSCheckParam +{ + ColumnIndexes indexes; +}; + + +class RSOperator : public std::enable_shared_from_this +{ +protected: + RSOperators children; + + RSOperator() = default; + explicit RSOperator(const RSOperators & children_) : children(children_) {} + +public: + virtual ~RSOperator() = default; + + virtual String name() = 0; + virtual String toString() = 0; + + virtual RSResult roughCheck(const RSCheckParam & param) = 0; + + virtual RSOperatorPtr optimize() { return shared_from_this(); }; + virtual RSOperatorPtr switchDirection() { return shared_from_this(); }; + virtual RSOperatorPtr applyNot() = 0; +}; + +class ColCmpVal : public RSOperator +{ +protected: + Attr attr; + Field value; + int null_direction; + +public: + ColCmpVal(const Attr & attr_, const Field & value_, int null_direction_) : attr(attr_), value(value_), null_direction(null_direction_) + { + } + + String toString() override + { + return R"({"op":")" + name() + // + R"(","col":")" + 
attr.col_name + // + R"(","value":")" + applyVisitor(FieldVisitorToString(), value) + "\"}"; + } +}; + + +class LogicalOp : public RSOperator +{ +public: + explicit LogicalOp(const RSOperators & children_) : RSOperator(children_) {} + + String toString() override + { + String s = R"({"op":")" + name() + R"(","children":[)"; + for (auto & child : children) + s += child->toString() + ","; + s.pop_back(); + return s + "]}"; + } +}; + +#define GET_RSINDEX_FROM_PARAM_NOT_FOUND_RETURN_SOME(param, attr, rsindex) \ + auto it = param.indexes.find(attr.col_id); \ + if (it == param.indexes.end()) \ + return Some; \ + auto rsindex = it->second; \ + if (!rsindex.type->equals(*attr.type)) \ + return Some; + + +RSOperatorPtr createAnd(const RSOperators & children); +RSOperatorPtr createEqual(const Attr & attr, const Field & value); +RSOperatorPtr createGreater(const Attr & attr, const Field & value, int null_direction); +RSOperatorPtr createGreaterEqual(const Attr & attr, const Field & value, int null_direction); +RSOperatorPtr createIn(const Attr & attr, const Fields & values); +RSOperatorPtr createLess(const Attr & attr, const Field & value, int null_direction); +RSOperatorPtr createLessEqual(const Attr & attr, const Field & value, int null_direction); +RSOperatorPtr createLike(const Attr & attr, const Field & value); +RSOperatorPtr createNot(const RSOperatorPtr & op); +RSOperatorPtr createNotEqual(const Attr & attr, const Field & value); +RSOperatorPtr createNotIn(const Attr & attr, const Fields & values); +RSOperatorPtr createNotLike(const Attr & attr, const Field & values); +RSOperatorPtr createOr(const RSOperators & children); +RSOperatorPtr createUnsupported(const String & content, bool is_not); + + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h new file mode 100644 index 00000000000..d51167e4ce9 --- /dev/null +++ 
b/dbms/src/Storages/DeltaMerge/Filter/Unsupported.h @@ -0,0 +1,35 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class Unsupported : public RSOperator +{ + String content; + bool is_not; + +public: + Unsupported(const String & content_) : content(content_), is_not(false) {} + Unsupported(const String & content_, bool is_not_) : content(content_), is_not(is_not_) {} + + String name() override { return "unsupported"; } + String toString() override + { + return R"({"op":")" + name() + // + R"(","content":")" + content + // + R"(","is_not":")" + DB::toString(is_not) + "\"}"; + } + + RSResult roughCheck(const RSCheckParam & /*param*/) override { return Some; } + + RSOperatorPtr applyNot() override { return createUnsupported(content, !is_not); }; +}; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/FilterHelper.h b/dbms/src/Storages/DeltaMerge/FilterHelper.h new file mode 100644 index 00000000000..881cd9c468d --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/FilterHelper.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +namespace DB +{ +namespace DM +{ + +inline RSOperatorPtr withHanleRange(const RSOperatorPtr & filter, HandleRange handle_range) +{ + Attr handle_attr = {EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_TYPE}; + auto left = createGreaterEqual(handle_attr, Field(handle_range.start), -1); + auto right = createLess(handle_attr, Field(handle_range.end), -1); + auto handle_range_op = createAnd({left, right}); + return !filter ? 
handle_range_op : createAnd({handle_range_op, filter}); +} + +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/CMap.h b/dbms/src/Storages/DeltaMerge/Index/CMap.h new file mode 100644 index 00000000000..aa1bb3a0558 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/CMap.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace DB +{ +namespace DM +{ +class CMap : public EqualIndex +{ + +}; +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/Histogram.h b/dbms/src/Storages/DeltaMerge/Index/Histogram.h new file mode 100644 index 00000000000..72554a3074a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/Histogram.h @@ -0,0 +1,16 @@ +#pragma once + +namespace DB +{ +namespace DM +{ + + +class RSIndexNumber { + +}; + + + +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMax.h b/dbms/src/Storages/DeltaMerge/Index/MinMax.h new file mode 100644 index 00000000000..23db275bf53 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/MinMax.h @@ -0,0 +1,322 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace DM +{ + +struct MIN +{ + static constexpr auto is_min = true; + static constexpr auto name = "min"; +}; + +struct MAX +{ + static constexpr auto is_min = false; + static constexpr auto name = "max"; +}; + +static constexpr size_t NONE_EXIST = std::numeric_limits::max(); + +template +std::pair minmax(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) +{ + const auto & del_mark_data = del_mark.getData(); + + size_t batch_min_idx = NONE_EXIST; + size_t batch_max_idx = NONE_EXIST; + + auto & cast_column = static_cast(column); + for (size_t i = offset; i < offset + limit; ++i) + { + if (!del_mark_data[i]) + { + if (batch_min_idx == NONE_EXIST || cast_column.compareAt(i, batch_min_idx, cast_column, -1) < 0) + 
batch_min_idx = i; + if (batch_max_idx == NONE_EXIST || cast_column.compareAt(batch_max_idx, i, cast_column, -1) < 0) + batch_max_idx = i; + } + } + + return {batch_min_idx, batch_max_idx}; +} + +template +std::pair minmaxVec(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) +{ + const auto & del_mark_data = del_mark.getData(); + + size_t batch_min_idx = NONE_EXIST; + size_t batch_max_idx = NONE_EXIST; + + auto & col_data = static_cast &>(column).getData(); + for (size_t i = offset; i < offset + limit; ++i) + { + if (!del_mark_data[i]) + { + if (batch_min_idx == NONE_EXIST || col_data[i] < col_data[batch_min_idx]) + batch_min_idx = i; + if (batch_max_idx == NONE_EXIST || col_data[batch_max_idx] < col_data[i]) + batch_max_idx = i; + } + } + + return {batch_min_idx, batch_max_idx}; +} + +struct MinMaxValue; +using MinMaxValuePtr = std::shared_ptr; +using MinMaxValues = std::vector; + +struct MinMaxValue +{ + bool has_value = false; + + explicit MinMaxValue() = default; + explicit MinMaxValue(bool has_value_) : has_value(has_value_) {} + + virtual ~MinMaxValue() = default; + + virtual void merge(const MinMaxValue & other) = 0; + virtual MinMaxValuePtr clone() = 0; + virtual void write(const IDataType & type, WriteBuffer & buf) = 0; + + + virtual RSResult checkEqual(const Field & value, const DataTypePtr & type) = 0; + virtual RSResult checkGreater(const Field & value, const DataTypePtr & type) = 0; + virtual RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) = 0; +}; + +/// Number types. 
+template +struct MinMaxValueFixed : public MinMaxValue +{ + T min; + T max; + + MinMaxValueFixed() = default; + MinMaxValueFixed(bool has_value_, T min_, T max_) : MinMaxValue(has_value_), min(min_), max(max_) {} + MinMaxValueFixed(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + set(column, del_mark, offset, limit); + } + + void set(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + auto [min_idx, max_idx] = minmaxVec(column, del_mark, offset, limit); + if (min_idx != NONE_EXIST) + { + auto & col_data = static_cast &>(column).getData(); + + has_value = true; + min = col_data[min_idx]; + max = col_data[max_idx]; + } + } + + void merge(const MinMaxValue & other) override + { + auto o = static_cast &>(other); + if (!o.has_value) + return; + else if (!has_value) + { + min = o.min; + max = o.max; + } + else + { + min = std::min(min, o.min); + max = std::max(max, o.max); + } + } + + MinMaxValuePtr clone() override { return std::make_shared(has_value, min, max); } + + void write(const IDataType & type, WriteBuffer & buf) override + { + writePODBinary(has_value, buf); + auto col = type.createColumn(); + auto & col_data = typeid_cast *>(col.get())->getData(); + col_data.push_back(min); + col_data.push_back(max); + type.serializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 0, 2, true, {}); + } + + static MinMaxValuePtr read(const IDataType & type, ReadBuffer & buf) + { + auto v = std::make_shared>(); + readPODBinary(v->has_value, buf); + auto col = type.createColumn(); + auto & col_data = typeid_cast *>(col.get())->getData(); + type.deserializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 2, 0, true, {}); + v->min = col_data[0]; + v->max = col_data[1]; + return v; + } + + // clang-format off + RSResult checkEqual(const Field & value, const DataTypePtr & type) override { return 
RoughCheck::checkEqual(value, type, min, max); } + RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } + RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } + // clang-format on +}; + +/// String type only. +struct MinMaxValueString : public MinMaxValue +{ + std::string min; + std::string max; + + MinMaxValueString() = default; + MinMaxValueString(bool has_value_, const std::string & min_, const std::string & max_) : MinMaxValue(has_value_), min(min_), max(max_) + { + } + MinMaxValueString(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + set(column, del_mark, offset, limit); + } + + void set(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + auto [min_idx, max_idx] = minmax(column, del_mark, offset, limit); + if (min_idx != NONE_EXIST) + { + auto & cast_column = static_cast(column); + + has_value = true; + min = cast_column.getDataAt(min_idx).toString(); + max = cast_column.getDataAt(max_idx).toString(); + } + } + + void merge(const MinMaxValue & other) override + { + auto o = static_cast(other); + if (!o.has_value) + return; + else if (!has_value) + { + min = o.min; + max = o.max; + } + else + { + min = std::min(min, o.min); + max = std::max(max, o.max); + } + } + + MinMaxValuePtr clone() override { return std::make_shared(has_value, min, max); } + + void write(const IDataType & type, WriteBuffer & buf) override + { + writePODBinary(has_value, buf); + auto col = type.createColumn(); + auto str_col = typeid_cast(col.get()); + str_col->insertData(min.data(), min.size()); + str_col->insertData(max.data(), max.size()); + type.serializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 0, 2, true, {}); + } + + static MinMaxValuePtr read(const IDataType & type, ReadBuffer 
& buf) + { + auto v = std::make_shared(); + readPODBinary(v->has_value, buf); + auto col = type.createColumn(); + auto str_col = typeid_cast(col.get()); + type.deserializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 2, 0, true, {}); + v->min = str_col->getDataAt(0).toString(); + v->max = str_col->getDataAt(1).toString(); + return v; + } + + // clang-format off + RSResult checkEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkEqual(value, type, min, max); } + RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } + RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } + // clang-format on +}; + +/// Other types. +struct MinMaxValueDataGeneric : public MinMaxValue +{ + Field min; + Field max; + + MinMaxValueDataGeneric() = default; + MinMaxValueDataGeneric(bool has_value_, const Field & min_, const Field & max_) : MinMaxValue(has_value_), min(min_), max(max_) {} + MinMaxValueDataGeneric(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + set(column, del_mark, offset, limit); + } + + void set(const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) + { + auto [min_idx, max_idx] = minmax(column, del_mark, offset, limit); + if (min_idx != NONE_EXIST) + { + has_value = true; + column.get(min_idx, min); + column.get(max_idx, max); + } + } + + void merge(const MinMaxValue & other) override + { + auto o = static_cast(other); + if (!o.has_value) + return; + else if (!has_value) + { + min = o.min; + max = o.max; + } + else + { + min = std::min(min, o.min); + max = std::max(max, o.max); + } + } + + MinMaxValuePtr clone() override { return std::make_shared(has_value, min, max); } + + void write(const IDataType & type, WriteBuffer & buf) override + { + 
writePODBinary(has_value, buf); + auto col = type.createColumn(); + col->insert(min); + col->insert(max); + type.serializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 0, 2, true, {}); + } + + static MinMaxValuePtr read(const IDataType & type, ReadBuffer & buf) + { + auto v = std::make_shared(); + readPODBinary(v->has_value, buf); + auto col = type.createColumn(); + type.deserializeBinaryBulkWithMultipleStreams(*col, [&](const IDataType::SubstreamPath &) { return &buf; }, 2, 0, true, {}); + col->get(0, v->min); + col->get(1, v->max); + return v; + } + + // clang-format off + RSResult checkEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkEqual(value, type, min, max); } + RSResult checkGreater(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreater(value, type, min, max); } + RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type) override { return RoughCheck::checkGreaterEqual(value, type, min, max); } + // clang-format on +}; + + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp new file mode 100644 index 00000000000..ebf6e73e9d2 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp @@ -0,0 +1,118 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace DM +{ +MinMaxIndex::MinMaxIndex(const IDataType & type, const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit) +{ + const IColumn * column_ptr = &column; + if (column.isColumnNullable()) + { + auto & del_mark_data = del_mark.getData(); + auto & nullable_column = static_cast(column); + auto & null_mark_data = nullable_column.getNullMapColumn().getData(); + column_ptr = &nullable_column.getNestedColumn(); + + for (size_t i = offset; i < 
offset + limit; ++i) + { + if (!del_mark_data[i] && null_mark_data[i]) + { + has_null = true; + break; + } + } + } + +#define DISPATCH(TYPE) \ + if (typeid_cast(&type)) \ + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); \ + else + + FOR_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + if (typeid_cast(&type)) + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); + else if (typeid_cast(&type)) + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); + else if (typeid_cast(&type)) + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); + else if (typeid_cast *>(&type)) + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); + else if (typeid_cast *>(&type)) + minmax = std::make_shared>(*column_ptr, del_mark, offset, limit); + else if (typeid_cast(&type)) + minmax = std::make_shared(*column_ptr, del_mark, offset, limit); + else + minmax = std::make_shared(*column_ptr, del_mark, offset, limit); +} + +void MinMaxIndex::merge(const MinMaxIndex & other) +{ + has_null |= other.has_null; + minmax->merge(*(other.minmax)); +} + +void MinMaxIndex::write(const IDataType & type, WriteBuffer & buf) +{ + writePODBinary(has_null, buf); + minmax->write(type, buf); +} + +MinMaxIndexPtr MinMaxIndex::read(const IDataType & type, ReadBuffer & buf) +{ + auto v = std::make_shared(); + readPODBinary(v->has_null, buf); +#define DISPATCH(TYPE) \ + if (typeid_cast(&type)) \ + v->minmax = MinMaxValueFixed::read(type, buf); \ + else + FOR_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + if (typeid_cast(&type)) + v->minmax = MinMaxValueFixed::read(type, buf); + else if (typeid_cast(&type)) + v->minmax = MinMaxValueFixed::read(type, buf); + else if (typeid_cast(&type)) + v->minmax = MinMaxValueFixed::read(type, buf); + else if (typeid_cast *>(&type)) + v->minmax = MinMaxValueFixed::read(type, buf); + else if (typeid_cast *>(&type)) + v->minmax = MinMaxValueFixed::read(type, buf); + else if (typeid_cast(&type)) + v->minmax = 
MinMaxValueString::read(type, buf); + else + v->minmax = MinMaxValueDataGeneric::read(type, buf); + return v; +} + +RSResult MinMaxIndex::checkEqual(const Field & value, const DataTypePtr & type) +{ + if (has_null || value.isNull()) + return Some; + return minmax->checkEqual(value, type); +} +RSResult MinMaxIndex::checkGreater(const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) +{ + if (has_null || value.isNull()) + return Some; + return minmax->checkGreater(value, type); +} +RSResult MinMaxIndex::checkGreaterEqual(const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) +{ + if (has_null || value.isNull()) + return Some; + return minmax->checkGreaterEqual(value, type); +} + +} // namespace DM +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h new file mode 100644 index 00000000000..1cddee40276 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace DM +{ + +class MinMaxIndex; +using MinMaxIndexPtr = std::shared_ptr; +using MinMaxIndexes = std::vector; + +class MinMaxIndex +{ +private: + bool has_null = false; + MinMaxValuePtr minmax; + +public: + MinMaxIndex() = default; + MinMaxIndex(bool has_null_, const MinMaxValuePtr & minmax_) : has_null(has_null_), minmax(minmax_) {} + MinMaxIndex(const MinMaxIndex & other) : has_null(other.has_null), minmax(other.minmax->clone()) {} + MinMaxIndex(const IDataType & type, const IColumn & column, const ColumnVector & del_mark, size_t offset, size_t limit); + + void merge(const MinMaxIndex & other); + + void write(const IDataType & type, WriteBuffer & buf); + static MinMaxIndexPtr read(const IDataType & type, ReadBuffer & buf); + + // TODO: Use has_null and value.isNull to check. 
+ + RSResult checkBetween(const Field & value0, const Field & value1, const DataTypePtr & type, int nan_direction); + RSResult checkEqual(const Field & value, const DataTypePtr & type); + RSResult checkGreater(const Field & value, const DataTypePtr & type, int nan_direction); + RSResult checkGreaterEqual(const Field & value, const DataTypePtr & type, int nan_direction); +}; + + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/RSIndex.h b/dbms/src/Storages/DeltaMerge/Index/RSIndex.h new file mode 100644 index 00000000000..cf0ab22abe8 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/RSIndex.h @@ -0,0 +1,39 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +class EqualIndex; +using EqualIndexPtr = std::shared_ptr; + + +class EqualIndex +{ +public: + virtual ~EqualIndex() = default; +}; + +struct RSIndex +{ + DataTypePtr type; + MinMaxIndexPtr minmax; + EqualIndexPtr equal; + + RSIndex(const DataTypePtr & type_, const MinMaxIndexPtr & minmax_) : type(type_), minmax(minmax_) {} + + RSIndex(const DataTypePtr & type_, const MinMaxIndexPtr & minmax_, const EqualIndexPtr & equal_) + : type(type_), minmax(minmax_), equal(equal_) + { + } +}; + +using ColumnIndexes = std::unordered_map; + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/RSResult.h b/dbms/src/Storages/DeltaMerge/Index/RSResult.h new file mode 100644 index 00000000000..24e235652d3 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/RSResult.h @@ -0,0 +1,67 @@ +#pragma once + +#include + +namespace DB +{ + +namespace DM +{ + +struct Attr +{ + String col_name; + ColId col_id; + DataTypePtr type; +}; +using Atttrs = std::vector; + +enum class RSResult +{ + Unknown = 0, // Not checked yet + Some = 1, // Suspected (but may be empty or full) + None = 2, // Empty + All = 3, // Full +}; + +static constexpr RSResult Unknown = RSResult::Unknown; +static constexpr 
RSResult Some = RSResult::Some; +static constexpr RSResult None = RSResult::None; +static constexpr RSResult All = RSResult::All; + +inline RSResult operator!(RSResult v) +{ + if (unlikely(v == Unknown)) + throw Exception("Unexpected Unknown"); + if (v == All) + return None; + else if (v == None) + return All; + return v; +} + +inline RSResult operator||(RSResult v0, RSResult v1) +{ + if (unlikely(v0 == Unknown || v1 == Unknown)) + throw Exception("Unexpected Unknown"); + if (v0 == All || v1 == All) + return All; + if (v0 == Some || v1 == Some) + return Some; + return None; +} + +inline RSResult operator&&(RSResult v0, RSResult v1) +{ + if (unlikely(v0 == Unknown || v1 == Unknown)) + throw Exception("Unexpected Unknown"); + if (v0 == None || v1 == None) + return None; + if (v0 == All && v1 == All) + return All; + return Some; +} + +} // namespace DM + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h b/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h new file mode 100644 index 00000000000..ba5c0ebbd30 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ +namespace DM +{ + + +namespace RoughCheck +{ + +static constexpr int TRUE = 1; +static constexpr int FAILED = 0; + +template