Skip to content

Commit

Permalink
[FLASH-296] [DM] Use Background Threads To Run Delta Merge Operations (
Browse files Browse the repository at this point in the history
…pingcap#266)

* Support merge delta in background.
* Avoid slowing down queries after a delete range.
* Add a new SQL statement, `manage table tb delete rows 100`, to test delete range.
  • Loading branch information
flowbehappy authored and JaySon-Huang committed Oct 23, 2019
1 parent d91fdae commit 2c77f37
Show file tree
Hide file tree
Showing 34 changed files with 1,697 additions and 963 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Common/Allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
buf = ::realloc(buf, new_size);

if (nullptr == buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + DB::toString(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + DB::toString(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);

if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ BlockIO InterpreterManageQuery::execute()
manageable_storage->checkStatus(context);
return {};
}
case ManageOperation::Enum ::DeleteRows:
{
manageable_storage->deleteRows(context, ast.rows);
return {};
}
}
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ struct Settings
M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\
M(SettingUInt64, delta_merge_size, 10000000, "The delta rows limit in memory. After that delta rows will be flushed.")\
\
M(SettingUInt64, dm_segment_rows, 1048576, "")\
M(SettingUInt64, dm_segment_limit_rows, 1048576, "")\
M(SettingUInt64, dm_segment_delta_limit_rows, 104857, "")\
M(SettingUInt64, dm_segment_delta_limit_bytes, 67108864, "")\
M(SettingUInt64, dm_segment_delta_cache_limit_rows, 26214, "")\
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Parsers/ASTManageQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ namespace ManageOperation
Flush,
Status,
Check,
DeleteRows,
};

inline const char * toString(UInt64 op)
{
static const char * data[] = {"Flush", "Status", "Check"};
return op < 3 ? data[op] : "Unknown operation";
static const char * data[] = {"Flush", "Status", "Check", "Delete Rows"};
return op < 4 ? data[op] : "Unknown operation";
}
}

Expand All @@ -31,6 +32,8 @@ class ASTManageQuery : public IAST

ManageOperation::Enum operation;

size_t rows = 0;

/** Get the text that identifies this element. */
String getID() const override
{
Expand All @@ -51,6 +54,8 @@ class ASTManageQuery : public IAST
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " "
<< (settings.hilite ? hilite_keyword : "") << ManageOperation::toString(operation)
<< (settings.hilite ? hilite_none : "");
if(operation == ManageOperation::Enum::DeleteRows)
settings.ostr << " " << rows;
}
};
}
3 changes: 2 additions & 1 deletion dbms/src/Parsers/ParserInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_values("VALUES");
ParserKeyword s_format("FORMAT");
ParserKeyword s_select("SELECT");
ParserKeyword s_selraw("SELRAW");
ParserKeyword s_with("WITH");
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
Expand Down Expand Up @@ -137,7 +138,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (data < end && *data == '\n')
++data;
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
else if (s_select.ignore(pos, expected) || s_selraw.ignore(pos, expected) || s_with.ignore(pos,expected))
{
pos = before_select;
ParserSelectWithUnionQuery select_p;
Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Parsers/ParserManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_flush("FLUSH");
ParserKeyword s_status("STATUS");
ParserKeyword s_check("CHECK");
ParserKeyword s_delete_rows("DELETE ROWS");

ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
Expand All @@ -37,25 +38,34 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}

auto query = std::make_shared<ASTManageQuery>();
node = query;

if (database)
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;

if (s_flush.ignore(pos, expected))
operation = ManageOperation::Enum::Flush;
else if (s_status.ignore(pos, expected))
operation = ManageOperation::Enum::Status;
else if (s_check.ignore(pos, expected))
operation = ManageOperation::Enum::Check;
else if (s_delete_rows.ignore(pos, expected))
{
operation = ManageOperation::Enum::DeleteRows;
ParserNumber num;
ASTPtr rows;
if (!num.parse(pos, rows, expected))
return false;
query->rows = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*rows).value);
}
else
return false;

auto query = std::make_shared<ASTManageQuery>();
node = query;

if (database)
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;

query->operation = operation;

return true;
}
}
} // namespace DB
13 changes: 12 additions & 1 deletion dbms/src/Storages/DeltaMerge/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id
}

void serializeChunks(
WriteBuffer & buf, Chunks::const_iterator begin, Chunks ::const_iterator end, const Chunk * extra1, const Chunk * extra2)
WriteBuffer & buf, Chunks::const_iterator begin, Chunks::const_iterator end, const Chunk * extra1, const Chunk * extra2)
{
auto size = (UInt64)(end - begin);
if (extra1)
Expand All @@ -133,6 +133,17 @@ void serializeChunks(
extra2->serialize(buf);
}

/// Serialize the chunks in [begin, end) followed by every chunk of `extra_chunks`.
/// The combined chunk count is written first (via writeIntBinary), then each chunk
/// is serialized into `buf` in that order.
void serializeChunks(WriteBuffer & buf, Chunks::const_iterator begin, Chunks::const_iterator end, const Chunks & extra_chunks)
{
    const auto total_chunks = static_cast<UInt64>(end - begin) + extra_chunks.size();
    writeIntBinary(total_chunks, buf);

    // Chunks from the iterator range come first ...
    for (auto it = begin; it != end; ++it)
        it->serialize(buf);

    // ... followed by the additional chunks.
    for (const auto & extra : extra_chunks)
        extra.serialize(buf);
}

Chunks deserializeChunks(ReadBuffer & buf)
{
Chunks chunks;
Expand Down
16 changes: 11 additions & 5 deletions dbms/src/Storages/DeltaMerge/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Chunk
// Binary version of chunk
using Version = UInt32;
static const Version CURRENT_VERSION;

public:
using ColumnMetaMap = std::unordered_map<ColId, ColumnMeta>;

Expand Down Expand Up @@ -122,11 +123,16 @@ using GenPageId = std::function<PageId()>;
Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);

void serializeChunks(WriteBuffer & buf,
Chunks::const_iterator begin,
Chunks ::const_iterator end,
const Chunk * extra1 = nullptr,
const Chunk * extra2 = nullptr);
void serializeChunks(WriteBuffer & buf,
Chunks::const_iterator begin,
Chunks::const_iterator end,
const Chunk * extra1 = nullptr,
const Chunk * extra2 = nullptr);
/// Serialize the chunks in [begin, end) followed by all chunks in `extra_chunks` into `buf`.
void serializeChunks(WriteBuffer & buf, //
                     Chunks::const_iterator begin,
                     Chunks::const_iterator end,
                     const Chunks & extra_chunks);

Chunks deserializeChunks(ReadBuffer & buf);

Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_data_page_id, WriteBatch & wb, const Block & block);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ChunkBlockInputStream final : public IBlockInputStream
const ColumnDefines & read_columns_,
const PageReader & page_reader_,
const RSOperatorPtr & filter)
: chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
: chunks(std::move(chunks_)), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
{
if (filter)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ struct DMContext

const NotCompress & not_compress;

// The row limit of a segment.
const size_t segment_limit_rows;

// The threshold of delta.
const size_t delta_limit_rows;
const size_t delta_limit_bytes;
Expand Down
18 changes: 11 additions & 7 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
/// If handle_real_type_ is empty, the handle column is not converted back to its real type.
DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_,
const SegmentStreamCreator & stream_creator_,
const AfterSegmentRead & after_segment_read_,
const ColumnDefines & columns_to_read_)
: task_pool(task_pool_),
stream_creator(stream_creator_),
after_segment_read(after_segment_read_),
columns_to_read(columns_to_read_),
header(toEmptyBlock(columns_to_read)),
log(&Logger::get("DMSegmentThreadInputStream"))
Expand All @@ -41,9 +43,9 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
return {};
}

cur_segment_id = task->segment->segmentId();
cur_stream = stream_creator(*task);
LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment_id) + "]");
cur_segment = task->segment;
cur_stream = stream_creator(*task);
LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment->segmentId()) + "]");
}

Block res = cur_stream->read();
Expand All @@ -56,9 +58,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
}
else
{
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment_id) + "]");
cur_segment_id = 0;
cur_stream = {};
after_segment_read(cur_segment);
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment->segmentId()) + "]");
cur_segment = {};
cur_stream = {};
}
}
}
Expand All @@ -74,12 +77,13 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
private:
SegmentReadTaskPoolPtr task_pool;
SegmentStreamCreator stream_creator;
AfterSegmentRead after_segment_read;
ColumnDefines columns_to_read;
Block header;

bool done = false;
BlockInputStreamPtr cur_stream;
UInt64 cur_segment_id;
SegmentPtr cur_segment;

Logger * log;
};
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream

if constexpr (!c_delta_done)
{
auto tuple_id = entry_it.getValue();
auto value = entry_it.getValue();
switch (entry_it.getType())
{
case DT_DEL:
writeDeleteFromDelta(1);
writeDeleteFromDelta(value);
break;
case DT_INS:
writeInsertFromDelta(output_columns, tuple_id);
writeInsertFromDelta(output_columns, value);
--output_write_limit;
break;
default:
Expand Down Expand Up @@ -242,15 +242,15 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
if (!stable_input_stream_raw_ptr->hasNext())
throw Exception("Unexpected end of block, need more rows to skip");

size_t rows = stable_input_stream_raw_ptr->nextRows();
if (!stable_input_stream_raw_ptr->shouldSkipNext())
ssize_t rows = stable_input_stream_raw_ptr->nextRows();
if (stable_skip > rows || stable_input_stream_raw_ptr->shouldSkipNext())
{
fillStableBlockIfNeeded();
stable_input_stream_raw_ptr->skipNext();
stable_skip -= rows;
}
else
{
stable_input_stream_raw_ptr->skipNext();
stable_skip -= rows;
fillStableBlockIfNeeded();
}
}

Expand Down Expand Up @@ -328,7 +328,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
{
auto prev_sid = entry_it.getSid();
if (entry_it.getType() == DT_DEL)
prev_sid += 1;
prev_sid += entry_it.getValue();

++entry_it;

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ using ColumnMap = std::unordered_map<ColId, ColumnPtr>;
using MutableColumnMap = std::unordered_map<ColId, MutableColumnPtr>;
using LockGuard = std::lock_guard<std::mutex>;

static const UInt64 INITIAL_EPOCH = 5; // Following TiDB, and I have no idea why 5 is chosen.
static const UInt64 INITIAL_EPOCH = 0;

// TODO maybe we should use those variables instead of macros?
#define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name
Expand Down Expand Up @@ -129,8 +129,9 @@ static constexpr Handle P_INF_HANDLE = MAX_INT64; // Used in range, indicating p
static_assert(static_cast<Int64>(static_cast<UInt64>(MIN_INT64)) == MIN_INT64, "Unsupported compiler!");
static_assert(static_cast<Int64>(static_cast<UInt64>(MAX_INT64)) == MAX_INT64, "Unsupported compiler!");

static constexpr UInt64 DEL_RANGE_POS_MARK = (1ULL << 63);
static constexpr bool DM_RUN_CHECK = true;

#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

} // namespace DM
} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ DeltaMergeStore::WriteActions prepareWriteActions(const Block &
: std::lower_bound(handle_data.cbegin() + offset, handle_data.cend(), range.end);
size_t limit = end_pos - (handle_data.cbegin() + offset);

actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = offset, .limit = limit});
actions.emplace_back(segment, offset, limit);

offset += limit;
}
Expand All @@ -54,10 +54,7 @@ DeltaMergeStore::WriteActions prepareWriteActions(const HandleRange &
{
(void)handle_;
if (segment->getRange().intersect(delete_range))
{
// TODO maybe more precise on `action.update`
actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = 0, .limit = 0, .update = delete_range});
}
actions.emplace_back(segment, delete_range);
}

return actions;
Expand Down
Loading

0 comments on commit 2c77f37

Please sign in to comment.