Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-575] Improve the performance of stream operations in DM #296

Merged
merged 19 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ struct Settings
M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\
M(SettingUInt64, delta_merge_size, 10000000, "The delta rows limit in memory. After that delta rows will be flushed.")\
\
M(SettingUInt64, dm_segment_limit_rows, 1048576, "")\
M(SettingUInt64, dm_segment_delta_limit_rows, 104857, "")\
M(SettingUInt64, dm_segment_delta_limit_bytes, 67108864, "")\
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 26214, "")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "")\
M(SettingUInt64, dm_segment_limit_rows, 65536, "Average rows of segments in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_limit_rows, 4096, "Max rows of segment's delta in DeltaMerge Engine ")\
M(SettingUInt64, dm_segment_delta_limit_bytes, 67108864, "Max bytes of segments' delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default '0' means no limit.")\
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
Expand Down
146 changes: 121 additions & 25 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,72 +22,158 @@ Block DMVersionFilterBlockInputStream<MODE>::readImpl()
Block cur_raw_block = raw_block;
size_t rows = cur_raw_block.rows();
filter.resize(rows);
size_t i = 0;

i = (rows - 1) / UNROLL_BATCH * UNROLL_BATCH;
const size_t batch_rows = (rows - 1) / UNROLL_BATCH * UNROLL_BATCH;

// The following tries to unroll the filtering operations
// so that the optimizer can apply vectorization.
// The original logic can be seen in #checkWithNextIndex().

if constexpr (MODE == DM_VERSION_FILTER_MODE_MVCC)
{
for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] = (*version_col_data)[n + k + 1] > version_limit;
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data()) + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) = (*version_pos) > version_limit;

++filter_pos;
++version_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] |= (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
UInt8 * filter_pos = filter.data();
auto * handle_pos = const_cast<Handle *>(handle_col_data->data());
auto * next_handle_pos = handle_pos + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) |= (*handle_pos) != (*(next_handle_pos));

++filter_pos;
++handle_pos;
++next_handle_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] &= (*version_col_data)[n + k] <= version_limit;
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) &= (*version_pos) <= version_limit;

++filter_pos;
++version_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] &= !(*delete_col_data)[n + k];
UInt8 * filter_pos = filter.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) &= !(*delete_pos);

++filter_pos;
++delete_pos;
}
}

// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] = (*version_col_data)[n + k + 1] > version_limit;
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] |= (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] &= (*version_col_data)[n + k] <= version_limit;
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] &= !(*delete_col_data)[n + k];
// }
}
else if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT)
{

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * handle_pos = const_cast<Handle *>(handle_col_data->data());
auto * next_handle_pos = handle_pos + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] = (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
(*filter_pos) = (*handle_pos) != (*(next_handle_pos));

++filter_pos;
++handle_pos;
++next_handle_pos;
}
}
for (size_t n = 0; n < i; n += UNROLL_BATCH)

{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] &= !(*delete_col_data)[n + k];
(*filter_pos) &= !(*delete_pos);

++filter_pos;
++delete_pos;
}
}
for (size_t n = 0; n < i; n += UNROLL_BATCH)

{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] |= (*version_col_data)[n] >= version_limit;
(*filter_pos) |= (*version_pos) >= version_limit;

++filter_pos;
++version_pos;
}
}

// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] = (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
// }
// }
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] &= !(*delete_col_data)[n + k];
// }
// }
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] |= (*version_col_data)[n] >= version_limit;
// }
// }
}
else
{
throw Exception("Unsupported mode");
}


for (; i < rows - 1; ++i)
for (size_t i = batch_rows; i < rows - 1; ++i)
filter[i] = checkWithNextIndex(i);

{
Expand Down Expand Up @@ -137,12 +223,22 @@ Block DMVersionFilterBlockInputStream<MODE>::readImpl()

const size_t passed_count = countBytesInFilter(filter);

++total_blocks;
total_rows += rows;
passed_rows += passed_count;

// This block is empty after filtering; continue with the next block.
if (passed_count == 0)
{
++complete_not_passed;
continue;
}

if (passed_count == rows)
{
++complete_passed;
return cur_raw_block;
}

for (size_t col_index = 0; col_index < cur_raw_block.columns(); ++col_index)
{
Expand Down
22 changes: 20 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <common/logger_useful.h>

#include <Columns/ColumnsCommon.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
Expand Down Expand Up @@ -27,16 +29,24 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
handle_col_pos(header.getPositionByName(handle_define.name)),
version_col_pos(header.getPositionByName(VERSION_COLUMN_NAME)),
delete_col_pos(header.getPositionByName(TAG_COLUMN_NAME)),
filter(65536)
filter(65536),
log(&Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
{
children.push_back(input);
}

/// Emits one debug log line summarising the filter counters held by this stream:
///  - "Pass": passed_rows as a percentage of total_rows;
///  - "complete pass": complete_passed as a percentage of total_blocks;
///  - "complete not pass": complete_not_passed as a percentage of total_blocks.
/// Each percentage is formatted with 2 decimal places.
~DMVersionFilterBlockInputStream()
{
    // When the totals are still zero, report 0% instead of dividing by zero
    // (which would yield NaN and log "nan%").
    const auto percent = [](size_t part, size_t whole) -> Float64 {
        return whole == 0 ? 0.0 : static_cast<Float64>(part) * 100 / whole;
    };

    LOG_DEBUG(log,
        "Pass: " + DB::toString(percent(passed_rows, total_rows), 2)
            + "%, complete pass: " + DB::toString(percent(complete_passed, total_blocks), 2)
            + "%, complete not pass: " + DB::toString(percent(complete_not_passed, total_blocks), 2) + "%");
}

/// Returns the fixed name of this stream: "DeltaMergeVersionFilter".
String getName() const override
{
    return "DeltaMergeVersionFilter";
}
/// Returns the `header` block stored in this stream.
Block getHeader() const override
{
    return header;
}

protected:

Block readImpl() override;

inline UInt8 checkWithNextIndex(size_t i)
Expand Down Expand Up @@ -113,6 +123,14 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
PaddedPODArray<Handle> const * handle_col_data = nullptr;
PaddedPODArray<UInt64> const * version_col_data = nullptr;
PaddedPODArray<UInt8> const * delete_col_data = nullptr;

size_t total_blocks = 0;
size_t total_rows = 0;
size_t passed_rows = 0;
size_t complete_passed = 0;
size_t complete_not_passed = 0;

Logger * log;
};
} // namespace DM
} // namespace DB
Loading