Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-755] Fix rows not matching after update #346

Merged
merged 3 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ struct Settings
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 4000, "Max rows of cache in segment delta in DeltaMerge Engine")\
M(SettingUInt64, dm_segment_stable_chunk_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable chunk rows in DeltaMerge Engine")\
M(SettingUInt64, dm_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaMerge Engine. By default 0 means no limit.")\
M(SettingBool, dm_enable_rough_set_filter, true, "whether to parse where expression as Rough Set Index filter or not") \
M(SettingBool, dm_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not") \
M(SettingBool, dm_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaMerge Engine.")\
M(SettingBool, dm_enable_logical_split, true, "Enable logical split or not in DeltaMerge Engine.")\
\
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Parsers/ASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ASTInsertQuery : public IAST

bool is_import;
bool is_upsert;
bool is_delete;

/** Get the text that identifies this element. */
String getID() const override { return "InsertQuery_" + database + "_" + table; };
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Parsers/ParserInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
// TODO: support partition in sub query

ParserKeyword s_insert_into("INSERT INTO");
ParserKeyword s_upsert_into("UPSERT INTO");
ParserKeyword s_import_into("IMPORT INTO");
ParserKeyword s_insert_into("INSERT");
ParserKeyword s_upsert_into("UPSERT");
ParserKeyword s_import_into("IMPORT");
ParserKeyword s_delete("DELETE");
ParserKeyword s_into("INTO");
ParserKeyword s_table("TABLE");
ParserKeyword s_partition("PARTITION");
ParserKeyword s_function("FUNCTION");
Expand Down Expand Up @@ -61,6 +63,11 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!is_insert && !is_upsert && !is_import)
return false;

bool is_delete = s_delete.ignore(pos, expected);

if (!s_into.ignore(pos, expected))
return false;

s_table.ignore(pos, expected);

if (s_function.ignore(pos, expected))
Expand Down Expand Up @@ -170,6 +177,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->end = end;
query->is_import = is_import;
query->is_upsert = is_upsert;
query->is_delete = is_delete;

if (columns)
query->children.push_back(columns);
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class StoragePool;
struct DMContext : private boost::noncopyable
{
const Context & db_context;
String store_path;
const String store_path;
const PathPool & extra_paths;
StoragePool & storage_pool;
UInt64 hash_salt;
const UInt64 hash_salt;

// The schema snapshot
// We need a consistent snapshot of columns, copy ColumnsDefines
Expand All @@ -44,19 +44,22 @@ struct DMContext : private boost::noncopyable
// The expected stable chunk rows.
const size_t stable_chunk_rows;

const bool enable_logical_split;

DMContext(const Context & db_context_,
const String & store_path_,
const PathPool & extra_paths_,
StoragePool & storage_pool_,
UInt64 hash_salt_,
const UInt64 hash_salt_,
const ColumnDefines & store_columns_,
const ColumnDefine & handle_column_,
const UInt64 min_version_,
const NotCompress & not_compress_,
const size_t segment_limit_rows_,
const size_t delta_limit_rows_,
const size_t delta_cache_limit_rows_,
const size_t stable_chunk_rows_)
const size_t stable_chunk_rows_,
const bool enable_logical_split_)
: db_context(db_context_),
store_path(store_path_),
extra_paths(extra_paths_),
Expand All @@ -69,7 +72,8 @@ struct DMContext : private boost::noncopyable
segment_limit_rows(segment_limit_rows_),
delta_limit_rows(delta_limit_rows_),
delta_cache_limit_rows(delta_cache_limit_rows_),
stable_chunk_rows(stable_chunk_rows_)
stable_chunk_rows(stable_chunk_rows_),
enable_logical_split(enable_logical_split_)
{
}
};
Expand Down
37 changes: 29 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ template <class DeltaValueSpace, class IndexIterator>
class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<false>
{
static constexpr size_t UNLIMITED = std::numeric_limits<UInt64>::max();
// static constexpr size_t COPY_ROWS_LIMIT = 2048;

private:
using DeltaValueSpacePtr = std::shared_ptr<DeltaValueSpace>;
Expand All @@ -39,13 +38,11 @@ class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<fal
};

SkippableBlockInputStreamPtr stable_input_stream;
// ChunkBlockInputStreamPtr stable_input_stream;
// ChunkBlockInputStream * stable_input_stream_raw_ptr;

// How many rows we need to skip before writing stable rows into output.
// == 0: None
// > 0 : do skip
// < 0 : some rows are filtered out by index, should not write into output.
// > 0 : some rows are ignored by delta tree (index), should not write into output.
// < 0 : some rows are filtered out by stable filter, should not write into output.
ssize_t stable_skip = 0;

DeltaValueSpacePtr delta_value_space;
Expand Down Expand Up @@ -74,6 +71,13 @@ class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<fal
bool stable_done = false;
bool delta_done = false;

// How many times `read` is called.
size_t num_read = 0;

Handle last_handle = N_INF_HANDLE;
size_t last_handle_pos = 0;
size_t last_handle_read_num = 0;

public:
DeltaMergeBlockInputStream(const SkippableBlockInputStreamPtr & stable_input_stream_,
const DeltaValueSpacePtr & delta_value_space_,
Expand Down Expand Up @@ -144,6 +148,7 @@ class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<fal

Block read() override
{
++num_read;
if (finished())
return {};
while (!finished())
Expand Down Expand Up @@ -174,7 +179,25 @@ class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<fal
if (limit == max_block_size)
continue;

return header.cloneWithColumns(std::move(columns));
auto result = header.cloneWithColumns(std::move(columns));
if constexpr (DM_RUN_CHECK)
{
auto & handle_column = toColumnVectorData<Handle>(result.getByPosition(0).column);
for (size_t i = 0; i < handle_column.size(); ++i)
{
if (handle_column[i] < last_handle)
{
throw Exception("DeltaMerge return wrong result, current handle [" + DB::toString(handle_column[i]) + "]@read["
+ DB::toString(num_read) + "]@pos[" + DB::toString(i) + "] is expected >= last handle ["
+ DB::toString(last_handle) + "]@read[" + DB::toString(last_handle_read_num) + "]@pos["
+ DB::toString(last_handle_pos) + "]");
}
last_handle = handle_column[i];
last_handle_pos = i;
last_handle_read_num = num_read;
}
}
return result;
}
return {};
}
Expand Down Expand Up @@ -235,8 +258,6 @@ class DeltaMergeBlockInputStream final : public IBlockInputStream, Allocator<fal
for (size_t i = 0; i < num_columns; ++i)
{
columns[i] = header.safeGetByPosition(i).column->cloneEmpty();
// TODO: Should we do reserve?
// columns[i]->reserve(max_block_size);
}
}

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
db_settings.dm_segment_limit_rows,
db_settings.dm_segment_delta_limit_rows,
db_settings.dm_segment_delta_cache_limit_rows,
db_settings.dm_segment_stable_chunk_rows);
db_settings.dm_segment_stable_chunk_rows,
db_settings.dm_enable_logical_split);
return DMContextPtr(ctx);
}

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,12 +715,18 @@ DeltaValueSpacePtr DiskValueSpace::getValueSpace(const PageReader & page_read
auto & chunk = chunks[chunk_index];
if (chunk.isDeleteRange() || !chunk.getRows())
continue;
#if 0
// FIXME: Disable filter since we need to use all values to build DeltaTree.
auto & handle_meta = chunk.getColumn(EXTRA_HANDLE_COLUMN_ID);
auto [min_handle, max_handle] = handle_meta.minmax->getIntMinMax(0);
if (range.intersect(min_handle, max_handle))
mvs->addBlock(read(read_columns, page_reader, chunk_index), chunk.getRows());
else
mvs->addBlock({}, chunk.getRows());
#else
(void) range;
mvs->addBlock(read(read_columns, page_reader, chunk_index), chunk.getRows());
#endif

already_read_rows += chunk.getRows();
if (already_read_rows >= rows_limit)
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ namespace DM
const Segment::Version Segment::CURRENT_VERSION = 1;
const static size_t SEGMENT_BUFFER_SIZE = 128; // More than enough.

const static bool FORCE_SPLIT_PHYSICAL = false;

DMFilePtr writeIntoNewDMFile(DMContext & dm_context, //
const BlockInputStreamPtr & input_stream,
UInt64 file_id,
Expand Down Expand Up @@ -428,7 +426,9 @@ Segment::split(DMContext & dm_context, const SegmentSnapshot & segment_snap, con

SegmentPair res;

if (FORCE_SPLIT_PHYSICAL || segment_snap.stable->getChunks() <= 3 || segment_snap.delta->num_rows() > segment_snap.stable->getRows())
if (!dm_context.enable_logical_split //
|| segment_snap.stable->getChunks() <= 3 //
|| segment_snap.delta->num_rows() > segment_snap.stable->getRows())
res = doSplitPhysical(dm_context, segment_snap, storage_snap, wbs);
else
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ class Segment : private boost::noncopyable
const PageId next_segment_id;

DiskValueSpacePtr delta;
// DMFilePtr stable;
StableValueSpacePtr stable;

std::atomic_bool is_merge_delta = false;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void StorageDeltaMerge::drop()
}
}

Block StorageDeltaMerge::buildInsertBlock(bool is_import, const Block & old_block)
Block StorageDeltaMerge::buildInsertBlock(bool is_import, bool is_delete, const Block & old_block)
{
Block block = old_block;

Expand Down Expand Up @@ -211,9 +211,10 @@ Block StorageDeltaMerge::buildInsertBlock(bool is_import, const Block & old_bloc
auto column = TAG_COLUMN_TYPE->createColumn();
auto & column_data = typeid_cast<ColumnVector<UInt8> &>(*column).getData();
column_data.resize(rows);
UInt8 tag = is_delete ? 1 : 0;
for (size_t i = 0; i < rows; ++i)
{
column_data[i] = 0;
column_data[i] = tag;
}

addColumnToBlock(block, TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE, std::move(column));
Expand Down Expand Up @@ -280,7 +281,9 @@ class DMBlockOutputStream : public IBlockOutputStream
/// Create the output stream used to write blocks into this DeltaMerge table.
///
/// `query` must point to an ASTInsertQuery; its `is_import` / `is_delete` flags
/// are forwarded to `buildInsertBlock`, which decorates every incoming block
/// before it is handed to the store.
BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settings & settings)
{
    const auto & insert_query = typeid_cast<const ASTInsertQuery &>(*query);

    // Copy the flags by value. The decorator is stored inside the returned stream,
    // which may outlive the query AST; capturing `insert_query` by reference would
    // leave the stream reading through a dangling reference.
    const bool is_import = insert_query.is_import;
    const bool is_delete = insert_query.is_delete;

    BlockDecorator decorator = [this, is_import, is_delete](const Block & block) { //
        return buildInsertBlock(is_import, is_delete, block);
    };
    return std::make_shared<DMBlockOutputStream>(store, decorator, global_context, settings);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class StorageDeltaMerge : public ext::shared_ptr_helper<StorageDeltaMerge>, publ
const ASTPtr & primary_expr_ast_,
Context & global_context_);

Block buildInsertBlock(bool is_import, const Block & block);
Block buildInsertBlock(bool is_import, bool is_delete, const Block & block);

private:
void alterImpl(const AlterCommands & commands,
Expand Down