From bf87a3a02861c9737c75ec45b2427485cfa06e68 Mon Sep 17 00:00:00 2001
From: JaySon
Date: Tue, 26 Nov 2019 21:21:47 -0600
Subject: [PATCH] add ReorganizeBlockInputStream to reorganize the boundaries of blocks (#330)

---
 dbms/src/Common/ProfileEvents.cpp             |   2 -
 .../Storages/DeltaMerge/DiskValueSpace.cpp    | 102 ++-------------
 .../DeltaMerge/ReorganizeBlockInputStream.h   | 119 ++++++++++++++++++
 3 files changed, 129 insertions(+), 94 deletions(-)
 create mode 100644 dbms/src/Storages/DeltaMerge/ReorganizeBlockInputStream.h

diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp
index d9642007deb..53edcb41565 100644
--- a/dbms/src/Common/ProfileEvents.cpp
+++ b/dbms/src/Common/ProfileEvents.cpp
@@ -197,8 +197,6 @@
     M(DMSegmentMergeNS) \
     M(DMFlushDeltaCache) \
     M(DMFlushDeltaCacheNS) \
-    M(DMWriteChunksWriteRows) \
-    M(DMWriteChunksCopyRows) \


 namespace ProfileEvents
diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
index 0c361b51927..f29a62bba78 100644
--- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
+++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include <Storages/DeltaMerge/ReorganizeBlockInputStream.h>
 #include
 #include
@@ -14,8 +15,6 @@
 namespace ProfileEvents
 {
 extern const Event DMFlushDeltaCache;
 extern const Event DMFlushDeltaCacheNS;
-extern const Event DMWriteChunksWriteRows;
-extern const Event DMWriteChunksCopyRows;
 } // namespace ProfileEvents

@@ -267,106 +266,25 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con
     return {};
 }

-namespace
-{
-size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
-{
-    assert(cur_block);
-    if (!next_block)
-        return 0;
-
-    auto cur_col = cur_block.getByName(pk_column_name).column;
-    const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
-    auto next_col = next_block.getByName(pk_column_name).column;
-    size_t cut_offset = 0;
-    for (/* */; cut_offset < next_col->size(); ++cut_offset)
-    {
-        const Int64 next_pk = next_col->getInt(cut_offset);
-        if (next_pk != last_curr_pk)
-        {
-            if constexpr (DM_RUN_CHECK)
-            {
-                if (unlikely(next_pk < last_curr_pk))
-                    throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
-                                        + " < " + toString(last_curr_pk),
-                                    ErrorCodes::LOGICAL_ERROR);
-            }
-            break;
-        }
-    }
-    return cut_offset;
-}
-} // namespace
-
 Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb)
 {
-    const String & pk_column_name = context.dm_context.handle_column.name;
-    if constexpr (DM_RUN_CHECK)
-    {
-        // Sanity check for existence of the pk column
-        assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64")));
-        Block header = sorted_input_stream->getHeader();
-        if (!header.has(pk_column_name))
-        {
-            throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
-        }
-    }
+    // Clean up the boundaries between blocks.
+    const String & pk_column_name = context.dm_context.handle_column.name;
+    ReorganizeBlockInputStream stream(sorted_input_stream, pk_column_name);

-    // TODO: investigate which way is better for scan: written by chunks vs written by columns.
     Chunks chunks;
-    Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream);
-    Block next_block;
     while (true)
     {
-        if (!cur_block)
+        Block block = stream.read();
+        if (!block)
             break;
-
-        next_block = ::DB::DM::readNextBlock(sorted_input_stream);
-
-        const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
-        if (cut_offset != 0)
-        {
-            const size_t next_block_nrows = next_block.rows();
-            for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
-            {
-                auto & cur_col_with_name = cur_block.getByPosition(col_idx);
-                auto & next_col_with_name = next_block.getByPosition(col_idx);
-                auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
-                cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
-                if (cut_offset != next_block_nrows)
-                {
-                    // TODO: we can track the valid range instead of copying data.
-                    size_t nrows_to_copy = next_block_nrows - cut_offset;
-                    ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy);
-                    // Pop the front `cut_offset` elems from `next_col_with_name`
-                    assert(next_block_nrows == next_col_with_name.column->size());
-                    MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
-                    cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
-                    next_col_with_name.column = cutted_next_column->getPtr();
-                }
-            }
-            if (cut_offset != next_block_nrows)
-            {
-                // We merged some rows into `cur_block`; write it out as a chunk.
-                Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
-                ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
-                chunks.emplace_back(std::move(chunk));
-                cur_block = next_block;
-            }
-            // else all rows of `next_block` were merged into `cur_block`; continue to check whether more blocks should be merged.
-        }
-        else
-        {
-            // There is no pk overlap between `cur_block` and `next_block`; just write `cur_block`.
-            Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
-            ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
-            chunks.emplace_back(std::move(chunk));
-            cur_block = next_block;
-        }
+        Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block);
+        chunks.push_back(std::move(chunk));
     }
+
     if constexpr (DM_RUN_CHECK)
     {
-        // Sanity check
+        // Sanity check: the boundaries of different chunks must not overlap.
         if (chunks.size() > 1)
         {
             for (size_t i = 1; i < chunks.size(); ++i)
diff --git a/dbms/src/Storages/DeltaMerge/ReorganizeBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ReorganizeBlockInputStream.h
new file mode 100644
index 00000000000..558dcc0841d
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/ReorganizeBlockInputStream.h
@@ -0,0 +1,119 @@
+#pragma once
+
+#include <DataStreams/IBlockInputStream.h>
+#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
+
+namespace DB
+{
+namespace DM
+{
+
+/// Reorganize the boundaries of blocks so that rows with the same pk never straddle two blocks.
+/// Note that the child must be a sorted input stream with an increasing pk column.
+class ReorganizeBlockInputStream final : public IBlockInputStream
+{
+public:
+    ReorganizeBlockInputStream(BlockInputStreamPtr child, String pk_column_name_)
+        : sorted_input_stream(std::move(child)), pk_column_name(std::move(pk_column_name_))
+    {
+        assert(sorted_input_stream != nullptr);
+        cur_block = {};
+        next_block = ::DB::DM::readNextBlock(sorted_input_stream);
+        if constexpr (DM_RUN_CHECK)
+        {
+            // Sanity check for existence of the pk column
+            Block header = sorted_input_stream->getHeader();
+            if (!header.has(pk_column_name))
+            {
+                throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
+            }
+        }
+    }
+
+    String getName() const override { return "ReorganizeBlockBoundary"; }
+    Block getHeader() const override { return sorted_input_stream->getHeader(); }
+
+    Block read() override
+    {
+        cur_block = next_block;
+        if (!cur_block)
+            return cur_block;
+
+        while (true)
+        {
+            next_block = ::DB::DM::readNextBlock(sorted_input_stream);
+
+            const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
+            if (unlikely(cut_offset == 0))
+                // There is no pk overlap between `cur_block` and `next_block`, or `next_block` is empty; just return `cur_block`.
+                return cur_block;
+            else
+            {
+                const size_t next_block_nrows = next_block.rows();
+                for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
+                {
+                    auto & cur_col_with_name = cur_block.getByPosition(col_idx);
+                    auto & next_col_with_name = next_block.getByPosition(col_idx);
+                    auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
+                    cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
+
+                    if (cut_offset != next_block_nrows)
+                    {
+                        // TODO: we can track the valid range instead of copying data.
+                        size_t nrows_to_copy = next_block_nrows - cut_offset;
+                        // Pop the front `cut_offset` elems from `next_col_with_name`
+                        assert(next_block_nrows == next_col_with_name.column->size());
+                        MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
+                        cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
+                        next_col_with_name.column = cutted_next_column->getPtr();
+                    }
+                }
+                if (cut_offset != next_block_nrows)
+                {
+                    // We merged some rows into `cur_block`; return it.
+                    return cur_block;
+                }
+                // else all rows of `next_block` were merged into `cur_block`; continue to check whether more blocks should be merged.
+            }
+        }
+    }
+
+private:
+    static size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
+    {
+        assert(cur_block);
+        if (!next_block)
+            return 0;
+
+        auto cur_col = cur_block.getByName(pk_column_name).column;
+        const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
+        auto next_col = next_block.getByName(pk_column_name).column;
+        size_t cut_offset = 0;
+        for (/* */; cut_offset < next_col->size(); ++cut_offset)
+        {
+            const Int64 next_pk = next_col->getInt(cut_offset);
+            if (next_pk != last_curr_pk)
+            {
+                if constexpr (DM_RUN_CHECK)
+                {
+                    if (unlikely(next_pk < last_curr_pk))
+                        throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
+                                            + " < " + toString(last_curr_pk),
+                                        ErrorCodes::LOGICAL_ERROR);
+                }
+                break;
+            }
+        }
+        return cut_offset;
+    }
+
+private:
+    BlockInputStreamPtr sorted_input_stream;
+    const String pk_column_name;
+
+    Block cur_block;
+    Block next_block;
+};
+
+} // namespace DM
+} // namespace DB
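
Usage sketch: ReorganizeBlockInputStream composes like any other IBlockInputStream, so the boundary-fixing logic is reusable outside writeChunks. In the sketch below, `input` stands for any pk-sorted BlockInputStreamPtr and `consume` for whatever the caller does with each block (writeChunks above uses prepareChunkDataWrite); both names are illustrative assumptions, not part of the patch:

    // `input` is assumed to be sorted by the handle (pk) column.
    ReorganizeBlockInputStream stream(input, pk_column_name);
    while (true)
    {
        Block block = stream.read();
        if (!block)
            break;
        // Each returned block ends on a pk boundary: rows sharing one pk
        // value never straddle two consecutive returned blocks.
        consume(block); // hypothetical stand-in for per-block handling
    }

Note that read() keeps merging rows from subsequent child blocks while the last pk keeps repeating, so a long run of identical pk values is accumulated into a single returned block; callers must not assume the child stream's block sizes are preserved.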