Skip to content

Commit

Permalink
[FLASH-575] Improve the performance of stream operations in DM (pingc…
Browse files Browse the repository at this point in the history
…ap#296)

* do handle range filter in

* add status support for DM

* compact continue insert

* bug fix, and temporary split big insert block

* enable split logical

* add store restore log

* fix lock in PageEntriesVersionSetWithDelta::listAllLiveFiles

* disable posix_fadvise in PageStorage

* Update default DM settings

* optimize DMVersionFilterBlockInputStream

* optimize DMVersionFilterBlockInputStream v2

* added some metrics

* add dm_insert_max_rows setting

* Fix compilation error of TiDB.h

* address comment

* fix broken unit tests

* fix unit tests of DeltaTree

* Bug fix: DiskValueSpace::createAppendTask remove data pages by mistake

* remove debug code
  • Loading branch information
flowbehappy authored and zanmato1984 committed Nov 1, 2019
1 parent 4fab794 commit f9bedb3
Show file tree
Hide file tree
Showing 25 changed files with 733 additions and 195 deletions.
11 changes: 6 additions & 5 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ struct Settings
M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\
M(SettingUInt64, delta_merge_size, 10000000, "The delta rows limit in memory. After that delta rows will be flushed.")\
\
M(SettingUInt64, dm_segment_limit_rows, 1048576, "")\
M(SettingUInt64, dm_segment_delta_limit_rows, 104857, "")\
M(SettingUInt64, dm_segment_delta_limit_bytes, 67108864, "")\
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 26214, "")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "")\
M(SettingUInt64, dm_segment_limit_rows, 65536, "Average rows of segments in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_limit_rows, 4096, "Max rows of segment's delta in DeltaMerge Engine ")\
M(SettingUInt64, dm_segment_delta_limit_bytes, 67108864, "Max bytes of segments' delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default '0' means no limit.")\
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
Expand Down
146 changes: 121 additions & 25 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,72 +22,158 @@ Block DMVersionFilterBlockInputStream<MODE>::readImpl()
Block cur_raw_block = raw_block;
size_t rows = cur_raw_block.rows();
filter.resize(rows);
size_t i = 0;

i = (rows - 1) / UNROLL_BATCH * UNROLL_BATCH;
const size_t batch_rows = (rows - 1) / UNROLL_BATCH * UNROLL_BATCH;

// The following is trying to unroll the filtering operations,
// so that optimizer could use vectorized optimization.
// The original logic can be seen in #checkWithNextIndex().

if constexpr (MODE == DM_VERSION_FILTER_MODE_MVCC)
{
for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] = (*version_col_data)[n + k + 1] > version_limit;
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data()) + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) = (*version_pos) > version_limit;

++filter_pos;
++version_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] |= (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
UInt8 * filter_pos = filter.data();
auto * handle_pos = const_cast<Handle *>(handle_col_data->data());
auto * next_handle_pos = handle_pos + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) |= (*handle_pos) != (*(next_handle_pos));

++filter_pos;
++handle_pos;
++next_handle_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] &= (*version_col_data)[n + k] <= version_limit;
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) &= (*version_pos) <= version_limit;

++filter_pos;
++version_pos;
}
}

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
filter[n + k] &= !(*delete_col_data)[n + k];
UInt8 * filter_pos = filter.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*filter_pos) &= !(*delete_pos);

++filter_pos;
++delete_pos;
}
}

// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] = (*version_col_data)[n + k + 1] > version_limit;
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] |= (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] &= (*version_col_data)[n + k] <= version_limit;
// }
//
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// filter[n + k] &= !(*delete_col_data)[n + k];
// }
}
else if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT)
{

for (size_t n = 0; n < i; n += UNROLL_BATCH)
{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * handle_pos = const_cast<Handle *>(handle_col_data->data());
auto * next_handle_pos = handle_pos + 1;
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] = (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
(*filter_pos) = (*handle_pos) != (*(next_handle_pos));

++filter_pos;
++handle_pos;
++next_handle_pos;
}
}
for (size_t n = 0; n < i; n += UNROLL_BATCH)

{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] &= !(*delete_col_data)[n + k];
(*filter_pos) &= !(*delete_pos);

++filter_pos;
++delete_pos;
}
}
for (size_t n = 0; n < i; n += UNROLL_BATCH)

{
for (size_t k = 0; k < UNROLL_BATCH; ++k)
UInt8 * filter_pos = filter.data();
auto * version_pos = const_cast<UInt64 *>(version_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
filter[n + k] |= (*version_col_data)[n] >= version_limit;
(*filter_pos) |= (*version_pos) >= version_limit;

++filter_pos;
++version_pos;
}
}

// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] = (*handle_col_data)[n + k] != (*handle_col_data)[n + k + 1];
// }
// }
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] &= !(*delete_col_data)[n + k];
// }
// }
// for (size_t n = 0; n < batch_rows; n += UNROLL_BATCH)
// {
// for (size_t k = 0; k < UNROLL_BATCH; ++k)
// {
// filter[n + k] |= (*version_col_data)[n] >= version_limit;
// }
// }
}
else
{
throw Exception("Unsupported mode");
}


for (; i < rows - 1; ++i)
for (size_t i = batch_rows; i < rows - 1; ++i)
filter[i] = checkWithNextIndex(i);

{
Expand Down Expand Up @@ -137,12 +223,22 @@ Block DMVersionFilterBlockInputStream<MODE>::readImpl()

const size_t passed_count = countBytesInFilter(filter);

++total_blocks;
total_rows += rows;
passed_rows += passed_count;

// This block is empty after filter, continue to process next block
if (passed_count == 0)
{
++complete_not_passed;
continue;
}

if (passed_count == rows)
{
++complete_passed;
return cur_raw_block;
}

for (size_t col_index = 0; col_index < cur_raw_block.columns(); ++col_index)
{
Expand Down
22 changes: 20 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <common/logger_useful.h>

#include <Columns/ColumnsCommon.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
Expand Down Expand Up @@ -27,16 +29,24 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
handle_col_pos(header.getPositionByName(handle_define.name)),
version_col_pos(header.getPositionByName(VERSION_COLUMN_NAME)),
delete_col_pos(header.getPositionByName(TAG_COLUMN_NAME)),
filter(65536)
filter(65536),
log(&Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
{
children.push_back(input);
}

~DMVersionFilterBlockInputStream()
{
LOG_DEBUG(log,
"Pass: " + DB::toString((Float64)passed_rows * 100 / total_rows, 2)
+ "%, complete pass: " + DB::toString((Float64)complete_passed * 100 / total_blocks, 2)
+ "%, complete not pass: " + DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2) + "%");
}

String getName() const override { return "DeltaMergeVersionFilter"; }
Block getHeader() const override { return header; }

protected:

Block readImpl() override;

inline UInt8 checkWithNextIndex(size_t i)
Expand Down Expand Up @@ -113,6 +123,14 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream
PaddedPODArray<Handle> const * handle_col_data = nullptr;
PaddedPODArray<UInt64> const * version_col_data = nullptr;
PaddedPODArray<UInt8> const * delete_col_data = nullptr;

size_t total_blocks = 0;
size_t total_rows = 0;
size_t passed_rows = 0;
size_t complete_passed = 0;
size_t complete_not_passed = 0;

Logger * log;
};
} // namespace DM
} // namespace DB
Loading

0 comments on commit f9bedb3

Please sign in to comment.