[FLASH-668/477] Predicator push down (#329)
* simple parser

* multiple conditions, combined with operator "and" by default

* add more debugging info

* [FLASH-477] Make chunks in stable not overlap

* Add option dm_enable_rough_set_filter

* A quick impl for rough set filter ast parser

* Fix bug: compare Date/DateTime to String in rough set filter

* Turn off verbose logging info in restoreSegment

* Add event tracking

* Add more debugging info for ast

* address comments

* fix compile error under Mac OSX

* use dynamic_cast instead of static_cast
JaySon-Huang authored Nov 26, 2019
1 parent 60dc6f1 commit 3484309
Showing 30 changed files with 1,651 additions and 68 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
@@ -57,6 +57,7 @@ add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Index)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter)
add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/Transaction)
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
@@ -197,6 +197,8 @@
M(DMSegmentMergeNS) \
M(DMFlushDeltaCache) \
M(DMFlushDeltaCacheNS) \
M(DMWriteChunksWriteRows) \
M(DMWriteChunksCopyRows) \


namespace ProfileEvents
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -244,6 +244,7 @@ struct Settings
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 1024, "Max rows of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_delta_cache_limit_bytes, 16777216, "Max bytes of cache in segment's delta in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default '0' means no limit.")\
M(SettingBool, dm_enable_rough_set_filter, true, "whether to parse where expression as Rough Set Index filter or not") \
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
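A minimal sketch of how the new setting might be consulted at query time, assuming ClickHouse-style settings access on a DB::Context; parseRoughSetFilter and query_ast are invented names for illustration, not part of this commit:

// Hypothetical call site: only build a rough set filter when enabled.
RSOperatorPtr rs_filter;
if (db_context.getSettingsRef().dm_enable_rough_set_filter)
    rs_filter = parseRoughSetFilter(query_ast); // invented helper; the real parser lives under FilterParser/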
2 changes: 2 additions & 0 deletions dbms/src/Server/config.xml
@@ -316,6 +316,8 @@
<kvstore_path>/var/lib/clickhouse/kvstore</kvstore_path>
<regmap>/var/lib/clickhouse/regmap</regmap>
<pd_addr>http://127.0.0.1:13579</pd_addr>
<!-- specify which storage engine to use: tmt or dm -->
<storage_engine>tmt</storage_engine>
<disable_bg_flush>false</disable_bg_flush>
</raft>

22 changes: 15 additions & 7 deletions dbms/src/Storages/DeltaMerge/Chunk.h
@@ -108,6 +108,14 @@ class Chunk
void serialize(WriteBuffer & buf) const;
static Chunk deserialize(ReadBuffer & buf);

String info() const
{
if (likely(!is_delete_range))
return "Chunk[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + "]";
else
return "DeleteRange[" + DB::toString(handle_start) + "," + DB::toString(handle_end) + ")";
}

private:
Handle handle_start;
Handle handle_end;
@@ -123,15 +131,15 @@ using GenPageId = std::function<PageId()>;
Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);

void serializeChunks(WriteBuffer & buf,
                     Chunks::const_iterator begin,
                     Chunks::const_iterator end,
                     const Chunk * extra1 = nullptr,
                     const Chunk * extra2 = nullptr);
void serializeChunks(WriteBuffer & buf, //
                     Chunks::const_iterator begin,
                     Chunks::const_iterator end,
                     const Chunks & extr_chunks);

Chunks deserializeChunks(ReadBuffer & buf);
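The asymmetric brackets in info() above are deliberate: a data Chunk prints a closed interval (the handles of its first and last rows), while a DeleteRange prints a half-open range. A hypothetical rendering, with invented handle values:

// Chunk holding rows with handles 0..99   -> "Chunk[0,99]"
// DeleteRange covering handles [0, 100)   -> "DeleteRange[0,100)"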

9 changes: 9 additions & 0 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
@@ -32,6 +32,15 @@ class ChunkBlockInputStream final : public IBlockInputStream
}
}

~ChunkBlockInputStream()
{
size_t num_skipped = 0;
for (const auto & is_skip : skip_chunks)
num_skipped += is_skip;

LOG_TRACE(&Logger::get("ChunkBlockInputStream"), String("Skip: ") << num_skipped << " / " << chunks.size() << " chunks");
}
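Given the LOG_TRACE above, a stream whose rough set filter skipped 12 of 40 chunks (counts invented for illustration) would emit a trace line under the "ChunkBlockInputStream" logger like:

// Skip: 12 / 40 chunks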

String getName() const override { return "Chunk"; }
Block getHeader() const override { return toEmptyBlock(read_columns); }

16 changes: 1 addition & 15 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
@@ -77,7 +77,7 @@ class DMVersionFilterBlockInputStream : public IProfilingBlockInputStream

bool initNextBlock()
{
-        raw_block = readNextBlock();
+        raw_block = ::DB::DM::readNextBlock(children.back());
if (!raw_block)
{
handle_col_data = nullptr;
@@ -94,20 +94,6 @@
}
}

-    /// This method guarantees that the returned valid block is not empty.
-    Block readNextBlock()
-    {
-        while (true)
-        {
-            Block res = children.back()->read();
-            if (!res)
-                return {};
-            if (!res.rows())
-                continue;
-            return res;
-        }
-    }

private:
UInt64 version_limit;
Block header;
22 changes: 21 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
@@ -6,6 +6,7 @@
#include <Common/typeid_cast.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsConversion.h>
@@ -123,7 +124,12 @@ inline PaddedPODArray<T> const * getColumnVectorDataPtr(const Block & block, siz
return toColumnVectorDataPtr<T>(block.getByPosition(pos).column);
}

-inline void addColumnToBlock(Block & block, ColId col_id, const String & col_name, const DataTypePtr & col_type, const ColumnPtr & col, const Field & default_value = Field())
+inline void addColumnToBlock(Block & block,
+                             ColId col_id,
+                             const String & col_name,
+                             const DataTypePtr & col_type,
+                             const ColumnPtr & col,
+                             const Field & default_value = Field())
{
ColumnWithTypeAndName column(col, col_type, col_name, col_id, default_value);
block.insert(std::move(column));
@@ -137,6 +143,20 @@ inline Block toEmptyBlock(const ColumnDefines & columns)
return block;
}

/// This method guarantees that the returned valid block is not empty.
inline Block readNextBlock(const BlockInputStreamPtr & in)
{
while (true)
{
Block res = in->read();
if (!res)
return Block{};
if (!res.rows())
continue;
return res;
}
}
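A minimal usage sketch for this relocated helper; `in` (a BlockInputStreamPtr) and `process` are assumed for illustration, not part of this diff:

// Drain a stream; every block returned here is non-empty, so callers
// never need to special-case zero-row blocks.
while (Block block = ::DB::DM::readNextBlock(in))
    process(block);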

inline void convertColumn(Block & block, size_t pos, const DataTypePtr & to_type, const Context & context)
{
const IDataType * to_type_ptr = to_type.get();
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -323,6 +323,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
SegmentReadTasks tasks;
@@ -417,7 +418,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
task.read_snapshot,
*storage_snapshot,
task.ranges,
-    {},
+    filter,
max_version,
std::max(expected_block_size, STABLE_CHUNK_ROWS));
};
@@ -995,7 +996,7 @@ inline void setColumnDefineDefaultValue(const AlterCommand & command, ColumnDefi
time_t time = 0;
ReadBufferFromMemory buf(date.data(), date.size());
readDateTimeText(time, buf);
-        return toField(time);
+        return toField((Int64)time);
}
case TypeIndex::Decimal32:
{
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -189,6 +189,7 @@ class DeltaMergeStore : private boost::noncopyable
const HandleRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
const RSOperatorPtr & filter,
size_t expected_block_size = STABLE_CHUNK_ROWS);

/// Force flush all data to disk.
124 changes: 117 additions & 7 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -14,6 +14,8 @@ namespace ProfileEvents
{
extern const Event DMFlushDeltaCache;
extern const Event DMFlushDeltaCacheNS;
extern const Event DMWriteChunksWriteRows;
extern const Event DMWriteChunksCopyRows;
} // namespace ProfileEvents

namespace DB
@@ -265,19 +267,127 @@ DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, con
return {};
}

-Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb)
namespace
{
size_t findCutOffsetInNextBlock(const Block & cur_block, const Block & next_block, const String & pk_column_name)
{
assert(cur_block);
if (!next_block)
return 0;

auto cur_col = cur_block.getByName(pk_column_name).column;
const Int64 last_curr_pk = cur_col->getInt(cur_col->size() - 1);
auto next_col = next_block.getByName(pk_column_name).column;
size_t cut_offset = 0;
for (/* */; cut_offset < next_col->size(); ++cut_offset)
{
const Int64 next_pk = next_col->getInt(cut_offset);
if (next_pk != last_curr_pk)
{
if constexpr (DM_RUN_CHECK)
{
if (unlikely(next_pk < last_curr_pk))
throw Exception("InputStream is not sorted, pk in next block is smaller than current block: " + toString(next_pk)
+ " < " + toString(last_curr_pk),
ErrorCodes::LOGICAL_ERROR);
}
break;
}
}
return cut_offset;
}
} // namespace
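A worked illustration of findCutOffsetInNextBlock, with invented pk values:

// cur_block  pk column: [5, 6, 7, 7]  -> last_curr_pk == 7
// next_block pk column: [7, 7, 8, 9]
// The scan stops at the first pk != 7, so cut_offset == 2: the first two
// rows of next_block share pk 7 with the tail of cur_block and must be
// merged into it, so that all versions of one pk stay inside one chunk
// and chunks in stable never overlap.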

+Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb)
{
const String & pk_column_name = context.dm_context.handle_column.name;
if constexpr (DM_RUN_CHECK)
{
// Sanity check for existence of pk column
assert(EXTRA_HANDLE_COLUMN_TYPE->equals(*DataTypeFactory::instance().get("Int64")));
Block header = sorted_input_stream->getHeader();
if (!header.has(pk_column_name))
{
throw Exception("Try to write block to Chunk without pk column", ErrorCodes::LOGICAL_ERROR);
}
}

// TODO: investigate which way is better for scan: written by chunks vs written by columns.
Chunks chunks;
Block cur_block = ::DB::DM::readNextBlock(sorted_input_stream);
Block next_block;
while (true)
{
-        Block block = input_stream->read();
-        if (!block)
+        if (!cur_block)
            break;
-        if (!block.rows())
-            continue;
-        Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, block);
-        chunks.push_back(std::move(chunk));

next_block = ::DB::DM::readNextBlock(sorted_input_stream);

const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_name);
if (cut_offset != 0)
{
const size_t next_block_nrows = next_block.rows();
for (size_t col_idx = 0; col_idx != cur_block.columns(); ++col_idx)
{
auto & cur_col_with_name = cur_block.getByPosition(col_idx);
auto & next_col_with_name = next_block.getByPosition(col_idx);
auto * cur_col_raw = const_cast<IColumn *>(cur_col_with_name.column.get());
cur_col_raw->insertRangeFrom(*next_col_with_name.column, 0, cut_offset);
if (cut_offset != next_block_nrows)
{
// TODO: we can track the valid range instead of copying data.
size_t nrows_to_copy = next_block_nrows - cut_offset;
ProfileEvents::increment(ProfileEvents::DMWriteChunksCopyRows, nrows_to_copy);
// Pop front `cut_offset` elems from `next_col_with_name`
assert(next_block_nrows == next_col_with_name.column->size());
MutableColumnPtr cutted_next_column = next_col_with_name.column->cloneEmpty();
cutted_next_column->insertRangeFrom(*next_col_with_name.column, cut_offset, nrows_to_copy);
next_col_with_name.column = cutted_next_column->getPtr();
}
}
if (cut_offset != next_block_nrows)
{
// We merge some rows to `cur_block`, make it as a chunk.
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
chunks.emplace_back(std::move(chunk));
cur_block = next_block;
}
// else we merge all rows from `next_block` to `cur_block`, continue to check if we should merge more blocks.
}
else
{
// There is no pk overlap between `cur_block` and `next_block`, just write `cur_block`.
Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wb, cur_block);
ProfileEvents::increment(ProfileEvents::DMWriteChunksWriteRows, chunk.getRows());
chunks.emplace_back(std::move(chunk));
cur_block = next_block;
}
}
if constexpr (DM_RUN_CHECK)
{
// Sanity check
if (chunks.size() > 1)
{
for (size_t i = 1; i < chunks.size(); ++i)
{
const Chunk & prev = chunks[i - 1];
const Chunk & curr = chunks[i];
if (prev.isDeleteRange() || curr.isDeleteRange())
{
throw Exception("Unexpected DeleteRange in stable inputstream. prev:" + prev.info() + " curr: " + curr.info(),
ErrorCodes::LOGICAL_ERROR);
}

const HandlePair prev_handle = prev.getHandleFirstLast();
const HandlePair curr_handle = curr.getHandleFirstLast();
// pk should be increasing and no overlap between chunks
if (prev_handle.second >= curr_handle.first)
{
throw Exception("Overlap chunks between " + prev.info() + " and " + curr.info(), ErrorCodes::LOGICAL_ERROR);
}
}
}
}
return chunks;
}
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.h
@@ -148,9 +148,9 @@ class DiskValueSpace
AppendTaskPtr createAppendTask(const OpContext & context, WriteBatches & wbs, const BlockOrDelete & update) const;
DiskValueSpacePtr applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update);

-    /// Write the blocks from input_stream into underlying storage, the returned chunks can be added to
+    /// Write the blocks from sorted_input_stream into underlying storage, the returned chunks can be added to
    /// specified value space instance by #setChunks or #appendChunkWithCache later.
-    static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream, WriteBatch & wb);
+    static Chunks writeChunks(const OpContext & context, const BlockInputStreamPtr & sorted_input_stream, WriteBatch & wb);

static Chunk writeDelete(const OpContext & context, const HandleRange & delete_range);

@@ -186,6 +186,7 @@

DiskValueSpacePtr tryFlushCache(const OpContext & context, WriteBatch & remove_data_wb, bool force = false);

// TODO: getInputStream can be removed
ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const;

MutableColumnMap cloneCache();
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
@@ -33,7 +33,7 @@ RSOperatorPtr createNotEqual(const Attr & attr, const Field & value)
RSOperatorPtr createNotIn(const Attr & attr, const Fields & values) { return std::make_shared<NotIn>(attr, values); }
RSOperatorPtr createNotLike(const Attr & attr, const Field & value) { return std::make_shared<NotLike>(attr, value); }
RSOperatorPtr createOr(const RSOperators & children) { return std::make_shared<Or>(children); }
-RSOperatorPtr createUnsupported(const String & content, bool is_not) { return std::make_shared<Unsupported>(content, is_not); }
+RSOperatorPtr createUnsupported(const String & content, const String & reason, bool is_not) { return std::make_shared<Unsupported>(content, reason, is_not); }
// clang-format on
} // namespace DM
} // namespace DB
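A hedged sketch of a call site using the new reason argument; ast, func_name, and queryToString are invented stand-ins for illustration, not taken from this diff:

// Fall back to an Unsupported node, now recording why translation failed.
return createUnsupported(queryToString(ast), "unsupported function: " + func_name, /*is_not=*/false);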