add ReorganizeBlockInputStream for reorganizing the boundary of blocks (#…
JaySon-Huang authored and flowbehappy committed Nov 27, 2019
1 parent 3484309 commit bf87a3a
Showing 3 changed files with 129 additions and 94 deletions.
2 changes: 0 additions & 2 deletions dbms/src/Common/ProfileEvents.cpp
@@ -197,8 +197,6 @@
     M(DMSegmentMergeNS) \
     M(DMFlushDeltaCache) \
     M(DMFlushDeltaCacheNS) \
-    M(DMWriteChunksWriteRows) \
-    M(DMWriteChunksCopyRows) \


 namespace ProfileEvents
102 changes: 10 additions & 92 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -5,6 +5,7 @@
 #include <DataStreams/IProfilingBlockInputStream.h>
 #include <Storages/DeltaMerge/DeltaMergeHelpers.h>
 #include <Storages/DeltaMerge/DiskValueSpace.h>
+#include <Storages/DeltaMerge/ReorganizeBlockInputStream.h>
 #include <Storages/Page/PageStorage.h>

 #include <IO/WriteHelpers.h>
@@ -14,8 +15,6 @@ namespace ProfileEvents
 {
 extern const Event DMFlushDeltaCache;
 extern const Event DMFlushDeltaCacheNS;
-extern const Event DMWriteChunksWriteRows;
-extern const Event DMWriteChunksCopyRows;
 } // namespace ProfileEvents

 namespace DB
@@ -267,106 +266,25 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con
     return {};
 }

-namespace
-{
-size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
-{
-    assert(cur_block);
-    if (!next_block)
-        return 0;
-
-    auto cur_col = cur_block.getByName(pk_column_name).column;
-    const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
-    auto next_col = next_block.getByName(pk_column_name).column;
-    size_t cut_offset = 0;
-    for (/* */; cut_offset < next_col->size(); ++cut_offset)
-    {
-        const Int64 next_pk = next_col->getInt(cut_offset);
-        if (next_pk != last_curr_pk)
-        {
-            if constexpr (DM_RUN_CHECK)
-            {
-                if (unlikely(next_pk < last_curr_pk))
-                    throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
-                                        + " < " + toString(last_curr_pk),
-                                    ErrorCodes::LOGICAL_ERROR);
-            }
-            break;
-        }
-    }
-    return cut_offset;
-}
-} // namespace
-
 Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb)
 {
-    const String & pk_column_name = context.dm_context.handle_column.name;
-    if constexpr (DM_RUN_CHECK)
-    {
-        // Sanity check for existence of pk column
-        assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64")));
-        Block header = sorted_input_stream->getHeader();
-        if (!header.has(pk_column_name))
-        {
-            throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
-        }
-    }
+    // Cleaning up the boundary between blocks.
+    const String & pk_column_name = context.dm_context.handle_column.name;
+    ReorganizeBlockInputStream stream(sorted_input_stream, pk_column_name);

     // TODO: investigate which way is better for scan: written by chunks vs written by columns.
     Chunks chunks;
-    Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream);
-    Block next_block;
     while (true)
     {
-        if (!cur_block)
+        Block block = stream.read();
+        if (!block)
             break;

-        next_block = ::DB::DM::readNextBlock(sorted_input_stream);
-
-        const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
-        if (cut_offset != 0)
-        {
-            const size_t next_block_nrows = next_block.rows();
-            for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
-            {
-                auto & cur_col_with_name = cur_block.getByPosition(col_idx);
-                auto & next_col_with_name = next_block.getByPosition(col_idx);
-                auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
-                cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
-                if (cut_offset != next_block_nrows)
-                {
-                    // TODO: we can track the valid range instead of copying data.
-                    size_t nrows_to_copy = next_block_nrows - cut_offset;
-                    ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy);
-                    // Pop front `cut_offset` elems from `next_col_with_name`
-                    assert(next_block_nrows == next_col_with_name.column->size());
-                    MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
-                    cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
-                    next_col_with_name.column = cutted_next_column->getPtr();
-                }
-            }
-            if (cut_offset != next_block_nrows)
-            {
-                // We merge some rows to `cur_block`, make it as a chunk.
-                Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
-                ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
-                chunks.emplace_back(std::move(chunk));
-                cur_block = next_block;
-            }
-            // else we merge all rows from `next_block` to `cur_block`, continue to check if we should merge more blocks.
-        }
-        else
-        {
-            // There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`.
-            Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
-            ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
-            chunks.emplace_back(std::move(chunk));
-            cur_block = next_block;
-        }
+        Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block);
+        chunks.push_back(std::move(chunk));
     }

     if constexpr (DM_RUN_CHECK)
     {
-        // Sanity check
+        // Sanity check: the boundaries of different chunks must not overlap
         if (chunks.size() > 1)
         {
             for (size_t i = 1; i < chunks.size(); ++i)
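
The effect of the rewrite: `writeChunks` no longer splices blocks itself; it drains the reorganized stream and writes one chunk per block, so all rows sharing a pk land in the same chunk. Below is a minimal standalone sketch of the cut-and-merge idea on plain vectors of pk values — `findCutOffset`, `cur`, and `next` are illustrative names, not the storage-layer types:

#include <cassert>
#include <cstdint>
#include <vector>

using Int64 = std::int64_t;

// How many leading rows of `next` share the last pk of `cur`?
// This mirrors findCutOffsetInNextBlock, but on plain vectors.
static size_t findCutOffset(const std::vector<Int64> & cur, const std::vector<Int64> & next)
{
    assert(!cur.empty());
    if (next.empty())
        return 0;
    const Int64 last_cur_pk = cur.back();
    size_t cut_offset = 0;
    while (cut_offset < next.size() && next[cut_offset] == last_cur_pk)
        ++cut_offset;
    return cut_offset;
}

int main()
{
    // Two sorted "blocks"; pk 3 straddles the boundary between them.
    std::vector<Int64> cur = {1, 2, 3, 3};
    std::vector<Int64> next = {3, 3, 4, 5};

    const size_t cut = findCutOffset(cur, next); // == 2
    // Move the overlapping prefix of `next` into `cur`.
    cur.insert(cur.end(), next.begin(), next.begin() + cut);
    next.erase(next.begin(), next.begin() + cut);

    assert(cut == 2);
    assert((cur == std::vector<Int64>{1, 2, 3, 3, 3, 3})); // all pk==3 rows in one block
    assert((next == std::vector<Int64>{4, 5}));
    return 0;
}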
119 changes: 119 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReorganizeBlockInputStream.h
@@ -0,0 +1,119 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>

namespace DB
{
namespace DM
{

/// Reorganize the boundaries of blocks.
/// Note that `child` must be a sorted input stream with an increasing pk column.
class ReorganizeBlockInputStream final : public IBlockInputStream
{
public:
    ReorganizeBlockInputStream(BlockInputStreamPtr child, String pk_column_name_)
        : sorted_input_stream(std::move(child)), pk_column_name(std::move(pk_column_name_))
    {
        assert(sorted_input_stream != nullptr);
        cur_block = {};
        next_block = ::DB::DM::readNextBlock(sorted_input_stream);
        if constexpr (DM_RUN_CHECK)
        {
            // Sanity check for existence of pk column
            Block header = sorted_input_stream->getHeader();
            if (!header.has(pk_column_name))
            {
                throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
            }
        }
    }

    String getName() const override { return "ReorganizeBlockBoundary"; }
    Block getHeader() const override { return sorted_input_stream->getHeader(); }

    Block read() override
    {
        cur_block = next_block;
        if (!cur_block)
            return cur_block;

        while (true)
        {
            next_block = ::DB::DM::readNextBlock(sorted_input_stream);

            const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
            if (unlikely(cut_offset == 0))
                // There is no pk overlap between `cur_block` and `next_block`, or `next_block` is empty, just return `cur_block`.
                return cur_block;
            else
            {
                const size_t next_block_nrows = next_block.rows();
                for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
                {
                    auto & cur_col_with_name = cur_block.getByPosition(col_idx);
                    auto & next_col_with_name = next_block.getByPosition(col_idx);
                    auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
                    cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);

                    if (cut_offset != next_block_nrows)
                    {
                        // TODO: we can track the valid range instead of copying data.
                        size_t nrows_to_copy = next_block_nrows - cut_offset;
                        // Pop front `cut_offset` elems from `next_col_with_name`
                        assert(next_block_nrows == next_col_with_name.column->size());
                        MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
                        cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
                        next_col_with_name.column = cutted_next_column->getPtr();
                    }
                }
                if (cut_offset != next_block_nrows)
                {
                    // We merged some rows into `cur_block`; return it.
                    return cur_block;
                }
                // else we merged all rows from `next_block` into `cur_block`; continue to check whether we should merge more blocks.
            }
        }
    }

private:
    static size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
    {
        assert(cur_block);
        if (!next_block)
            return 0;

        auto cur_col = cur_block.getByName(pk_column_name).column;
        const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
        auto next_col = next_block.getByName(pk_column_name).column;
        size_t cut_offset = 0;
        for (/* */; cut_offset < next_col->size(); ++cut_offset)
        {
            const Int64 next_pk = next_col->getInt(cut_offset);
            if (next_pk != last_curr_pk)
            {
                if constexpr (DM_RUN_CHECK)
                {
                    if (unlikely(next_pk < last_curr_pk))
                        throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
                                            + " < " + toString(last_curr_pk),
                                        ErrorCodes::LOGICAL_ERROR);
                }
                break;
            }
        }
        return cut_offset;
    }

private:
    BlockInputStreamPtr sorted_input_stream;
    const String pk_column_name;

    Block cur_block;
    Block next_block;
};

} // namespace DM
} // namespace DB
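
For context, a minimal usage sketch of the new stream. `makeSortedStream()` and `consume()` are hypothetical stand-ins, and "_tidb_rowid" is assumed here as the handle column name; any child stream already sorted by the pk column works:

// Usage sketch (assumptions: makeSortedStream() and consume() are hypothetical).
BlockInputStreamPtr sorted = makeSortedStream();
ReorganizeBlockInputStream stream(sorted, /* pk_column_name_ = */ "_tidb_rowid");
while (Block block = stream.read())
{
    // Each returned block ends on a pk boundary: rows with equal pk values
    // never straddle two consecutive blocks.
    consume(block);
}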
