From 03194a219caf64f7faf92fa4a0949a0e6293bb58 Mon Sep 17 00:00:00 2001
From: Flowyi
Date: Tue, 27 Aug 2019 15:26:57 +0800
Subject: [PATCH] Atomic read/write on DeltaMergeStore level (#172)

* Atomic read/write on DeltaMergeStore level
* Bug fix
* bugfix 2
* bug fix
* fix compile error
* remove useless code
* address comments
* fix compile error
---
 dbms/src/Common/EventRecorder.h               |   9 +-
 dbms/src/Common/ProfileEvents.cpp             |  10 +-
 dbms/src/Storages/DeltaMerge/Chunk.cpp        |   8 +-
 dbms/src/Storages/DeltaMerge/Chunk.h          |   5 +-
 .../DeltaMerge/ChunkBlockInputStream.h        |   8 +-
 .../DeltaMerge/DMSegmentThreadInputStream.h   |  15 +-
 dbms/src/Storages/DeltaMerge/DeltaMerge.h     |   9 +-
 .../Storages/DeltaMerge/DeltaMergeStore.cpp   | 153 ++++--
 .../src/Storages/DeltaMerge/DeltaMergeStore.h |   8 +-
 dbms/src/Storages/DeltaMerge/DeltaPlace.h     |   7 +-
 .../Storages/DeltaMerge/DiskValueSpace.cpp    |  94 ++--
 dbms/src/Storages/DeltaMerge/DiskValueSpace.h |  49 +-
 dbms/src/Storages/DeltaMerge/Segment.cpp      | 456 ++++++++++--------
 dbms/src/Storages/DeltaMerge/Segment.h        | 114 +++--
 .../Storages/DeltaMerge/SegmentReadTaskPool.h |  40 +-
 dbms/src/Storages/DeltaMerge/StoragePool.h    |  21 +-
 .../tests/gtest_dm_disk_value_space.cpp       |   7 +-
 .../DeltaMerge/tests/gtest_dm_segment.cpp     |  26 +-
 dbms/src/Storages/Page/PageStorage.cpp        |  37 +-
 dbms/src/Storages/Page/PageStorage.h          |  45 +-
 dbms/src/Storages/StorageDeltaMerge.cpp       |   6 +
 dbms/src/Storages/StorageDeltaMerge.h         |   2 +
 dbms/src/Storages/Transaction/Datum.h         |   2 +
 dbms/src/Storages/Transaction/ExtraCFData.h   |   2 +
 24 files changed, 676 insertions(+), 457 deletions(-)

diff --git a/dbms/src/Common/EventRecorder.h b/dbms/src/Common/EventRecorder.h
index b95b3536182..f203bcb7d03 100644
--- a/dbms/src/Common/EventRecorder.h
+++ b/dbms/src/Common/EventRecorder.h
@@ -3,7 +3,7 @@
 #include
 #include
-
+/// This class is NOT thread-safe!
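// The destructor added below makes EventRecorder a classic RAII guard: if submit() is
// never reached on some code path (early return, exception), the event is still
// reported on scope exit. A minimal stand-alone sketch of the same pattern, with
// simplified stand-ins (recordEvent and the event names are hypothetical, not the
// real ProfileEvents API):

#include <chrono>
#include <cstdint>
#include <cstdio>

inline void recordEvent(const char * name, uint64_t value) // stand-in for ProfileEvents::increment
{
    std::printf("%s += %llu\n", name, static_cast<unsigned long long>(value));
}

class ScopedRecorder
{
public:
    explicit ScopedRecorder(const char * event_) : event(event_), start(std::chrono::steady_clock::now()) {}
    ~ScopedRecorder()
    {
        if (!done)
            submit(); // auto-submit on any exit path
    }
    void submit()
    {
        done = true;
        recordEvent(event, 1);
        recordEvent("elapsed_ns",
                    std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - start).count());
    }

private:
    const char * event;
    std::chrono::steady_clock::time_point start;
    bool done = false;
};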
class EventRecorder { public: @@ -12,8 +12,14 @@ class EventRecorder watch.start(); } + ~EventRecorder() + { + if (!done) submit(); + } + inline void submit() { + done = true; ProfileEvents::increment(event); ProfileEvents::increment(event_elapsed, watch.elapsed()); } @@ -23,4 +29,5 @@ class EventRecorder ProfileEvents::Event event_elapsed; Stopwatch watch; + bool done = false; }; \ No newline at end of file diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index 6bc34ce02b3..08e3e1b9ee8 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -171,8 +171,14 @@ \ M(DMWriteBlock) \ M(DMWriteBlockNS) \ - M(DMAppendDelta) \ - M(DMAppendDeltaNS) \ + M(DMAppendDeltaPrepare) \ + M(DMAppendDeltaPrepareNS) \ + M(DMAppendDeltaCommitMemory) \ + M(DMAppendDeltaCommitMemoryNS) \ + M(DMAppendDeltaCommitDisk) \ + M(DMAppendDeltaCommitDiskNS) \ + M(DMAppendDeltaCleanUp) \ + M(DMAppendDeltaCleanUpNS) \ M(DMPlace) \ M(DMPlaceNS) \ M(DMPlaceUpsert) \ diff --git a/dbms/src/Storages/DeltaMerge/Chunk.cpp b/dbms/src/Storages/DeltaMerge/Chunk.cpp index 1f0f720c98e..43eea3105c9 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/Chunk.cpp @@ -143,7 +143,7 @@ void deserializeColumn(IColumn & column, const ColumnMeta & meta, const Page & p void readChunkData(MutableColumns & columns, const Chunk & chunk, const ColumnDefines & column_defines, - PageStorage & storage, + const PageReader & page_reader, size_t rows_offset, size_t rows_limit) { @@ -177,11 +177,11 @@ void readChunkData(MutableColumns & columns, col.insertRangeFrom(*tmp_col, rows_offset, rows_limit); } }; - storage.read(page_ids, page_handler); + page_reader.read(page_ids, page_handler); } -Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, PageStorage & data_storage) +Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, const PageReader & page_reader) { if (read_column_defines.empty()) return {}; @@ -196,7 +196,7 @@ Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, if (chunk.getRows()) { // Read from storage - readChunkData(columns, chunk, read_column_defines, data_storage, 0, chunk.getRows()); + readChunkData(columns, chunk, read_column_defines, page_reader, 0, chunk.getRows()); } Block res; diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h index 1d1803bf786..0e0b70d963c 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.h +++ b/dbms/src/Storages/DeltaMerge/Chunk.h @@ -101,6 +101,7 @@ class Chunk size_t rows = 0; }; +// TODO: use list instead of vector, so that DiskValueSpace won't need to do copy during Segment#getReadSnapshot. 
using Chunks = std::vector; using GenPageId = std::function; @@ -116,12 +117,12 @@ Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_ void readChunkData(MutableColumns & columns, const Chunk & chunk, const ColumnDefines & column_defines, - PageStorage & storage, + const PageReader & page_reader, size_t rows_offset, size_t rows_limit); -Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, PageStorage & data_storage); +Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, const PageReader & page_reader); } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h index e0260a7fb7a..f83c9310eb7 100644 --- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h @@ -11,8 +11,8 @@ namespace DM class ChunkBlockInputStream final : public IBlockInputStream { public: - ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, PageStorage & data_storage_) - : chunks(chunks_), read_columns(read_columns_), data_storage(data_storage_) + ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, const PageReader & page_reader_) + : chunks(chunks_), read_columns(read_columns_), page_reader(page_reader_) { } @@ -36,7 +36,7 @@ class ChunkBlockInputStream final : public IBlockInputStream { if (!hasNextBlock()) return {}; - return readChunk(chunks[chunk_index++], read_columns, data_storage); + return readChunk(chunks[chunk_index++], read_columns, page_reader); } bool hasNextBlock() { return chunk_index < chunks.size(); } @@ -48,7 +48,7 @@ class ChunkBlockInputStream final : public IBlockInputStream Chunks chunks; size_t chunk_index = 0; ColumnDefines read_columns; - PageStorage & data_storage; + PageReader page_reader; Block header; }; diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index b71d80c703c..3643f37f28b 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -12,17 +12,19 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream public: /// If handle_real_type_ is empty, means do not convert handle column back to real type. DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_, + const SegmentStreamCreator & stream_creator_, const ColumnDefines & columns_to_read_, const String & handle_name_, const DataTypePtr & handle_real_type_, const Context & context_) : task_pool(task_pool_), + stream_creator(stream_creator_), columns_to_read(columns_to_read_), header(createHeader(columns_to_read)), handle_name(handle_name_), handle_real_type(handle_real_type_), context(context_), - log(&Logger::get("SegmentReadTaskPool")) + log(&Logger::get("DMSegmentThreadInputStream")) { } @@ -38,12 +40,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream { if (!cur_stream) { - std::tie(cur_segment_id, cur_stream) = task_pool->nextTask(); - if (!cur_stream) // we are done. 
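// The change below moves stream creation out of the pool: reader threads now pull a
// SegmentReadTask and build the input stream themselves via a creator callback. A
// stand-alone sketch of that consumption pattern (Task, TaskPool and makeStream are
// simplified stand-ins, not the real classes):

#include <functional>
#include <memory>
#include <mutex>
#include <queue>

struct Task
{
    int segment_id;
};
using TaskPtr = std::shared_ptr<Task>;

class TaskPool
{
public:
    // Threads race on nextTask(); an empty pool signals "all segments consumed".
    TaskPtr nextTask()
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (tasks.empty())
            return {};
        TaskPtr t = tasks.front();
        tasks.pop();
        return t;
    }
    void addTask(TaskPtr t)
    {
        std::lock_guard<std::mutex> lock(mutex);
        tasks.push(std::move(t));
    }

private:
    std::mutex mutex;
    std::queue<TaskPtr> tasks;
};

using StreamCreator = std::function<int(const Task &)>; // returns a placeholder "stream"

inline void readLoop(TaskPool & pool, const StreamCreator & make_stream)
{
    while (TaskPtr task = pool.nextTask())
    {
        int stream = make_stream(*task); // the reader, not the pool, creates the stream
        (void)stream;                    // ... drain the stream, then fetch the next task ...
    }
}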
+            auto task = task_pool->nextTask();
+            if (!task)
             {
                 done = true;
                 return {};
             }
+
+            cur_segment_id = task->segment->segmentId();
+            cur_stream     = stream_creator(*task);
             LOG_DEBUG(log, "Start to read segment [" + DB::toString(cur_segment_id) + "]");
         }
@@ -57,7 +62,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
             }
             else
             {
                 cur_stream = {};
                 LOG_DEBUG(log, "Finish reading segment [" + DB::toString(cur_segment_id) + "]");
+                cur_segment_id = 0; // reset only after logging, so the real id is printed
             }
         }
@@ -80,6 +86,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream

 private:
     SegmentReadTaskPoolPtr task_pool;
+    SegmentStreamCreator   stream_creator;
     ColumnDefines          columns_to_read;
     Block                  header;
     String                 handle_name;
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h
index c92ce411a84..2dc33e3c9ad 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h
@@ -41,6 +41,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
     ssize_t stable_skip = 0;

     DeltaValueSpacePtr delta_value_space;
+    size_t             delta_rows_limit;
     IndexIterator      entry_it;
     IndexIterator      entry_end;
@@ -71,6 +72,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
           stable_input_stream(stable_input_stream_),
           stable_input_stream_raw_ptr(stable_input_stream.get()),
           delta_value_space(delta_value_space_),
+          delta_rows_limit(delta_value_space->getRows()),
           entry_it(index_begin),
           entry_end(index_end),
           max_block_size(max_block_size_)
@@ -159,8 +161,11 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
         }
         else if (entry_it.getType() == DT_INS)
         {
-            writeInsertFromDelta(output_columns, entry_it.getValue());
-            --output_write_limit;
+            if (entry_it.getValue() < delta_rows_limit)
+            {
+                writeInsertFromDelta(output_columns, entry_it.getValue());
+                --output_write_limit;
+            }
         }
         else
         {
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 74af5d234ff..8ce3f06a612 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -13,6 +13,10 @@ namespace ProfileEvents
{
extern const Event DMWriteBlock;
extern const Event DMWriteBlockNS;
+extern const Event DMAppendDeltaCommitDisk;
+extern const Event DMAppendDeltaCommitDiskNS;
+extern const Event DMAppendDeltaCleanUp;
+extern const Event DMAppendDeltaCleanUpNS;
} // namespace ProfileEvents

namespace DB
{
@@ -86,9 +90,32 @@ DeltaMergeStore::~DeltaMergeStore()
     background_pool.removeTask(gc_handle);
 }

+inline Block getSubBlock(const Block & block, size_t offset, size_t limit)
+{
+    if (!offset && limit == block.rows())
+    {
+        return block;
+    }
+    else
+    {
+        Block sub_block;
+        for (const auto & c : block)
+        {
+            auto column = c.column->cloneEmpty();
+            column->insertRangeFrom(*c.column, offset, limit);
+
+            auto sub_col      = c.cloneEmpty();
+            sub_col.column    = std::move(column);
+            sub_col.column_id = c.column_id;
+            sub_block.insert(std::move(sub_col));
+        }
+        return sub_block;
+    }
+}
+
 void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_settings, const Block & to_write)
 {
-    EventRecorder recorder(ProfileEvents::DMWriteBlock, ProfileEvents::DMWriteBlockNS);
+    EventRecorder write_block_recorder(ProfileEvents::DMWriteBlock, ProfileEvents::DMWriteBlockNS);

     const size_t rows = to_write.rows();
     if (rows == 0)
@@ -140,6 +167,9 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
         SegmentPtr segment;
         size_t     offset;
         size_t     limit;
+
+        BlockOrDelete update = {};
+        AppendTaskPtr task   = {};
     };

     std::vector<WriteAction> actions;
@@ -164,59 +194,59 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
                                : std::lower_bound(handle_data.cbegin() + offset, handle_data.cend(), range.end);
             size_t limit = end_pos - (handle_data.cbegin() + offset);

-            actions.emplace_back(WriteAction{segment, offset, limit});
+            actions.emplace_back(WriteAction{.segment = segment, .offset = offset, .limit = limit});

             offset += limit;
         }
     }

+    auto               op_context = OpContext::createForLogStorage(dm_context);
+    AppendWriteBatches wbs;
+
+    // Prepare updates' information.
     for (auto & action : actions)
     {
-        LOG_DEBUG(log,
-                  "Insert block. Segment range " + action.segment->getRange().toString() + //
-                      ", block range " + rangeToString(action.offset, action.offset + action.limit));
-        auto range_end   = action.segment->getRange().end;
-        auto new_segment = write_segment(dm_context, action.segment, block, action.offset, action.limit);
-        if (new_segment)
-        {
-            std::unique_lock lock(mutex);
-
-            segments[range_end] = new_segment;
-        }
+        action.update = getSubBlock(block, action.offset, action.limit);
+        action.task   = action.segment->createAppendTask(op_context, wbs, action.update);
     }

-    // This should be called by background thread.
-    afterInsertOrDelete(db_context, db_settings);
-
-    recorder.submit();
-}
+    // Commit updates to disk.
+    {
+        EventRecorder recorder(ProfileEvents::DMAppendDeltaCommitDisk, ProfileEvents::DMAppendDeltaCommitDiskNS);
+        dm_context.storage_pool.log().write(wbs.data);
+        dm_context.storage_pool.meta().write(wbs.meta);
+    }

-SegmentPtr DeltaMergeStore::write_segment(DMContext & dm_context, //
-                                          const SegmentPtr & segment,
-                                          const Block &      block,
-                                          size_t             offset,
-                                          size_t             limit)
-{
-    if (!offset && limit == block.rows())
+    // Commit updates in memory.
+    for (auto & action : actions)
     {
-        Block block_copy = block;
-        return segment->write(dm_context, std::move(block_copy));
+        action.segment->applyAppendTask(op_context, action.task, action.update);
     }
-    else
+
+    // Flush delta if needed.
+    for (auto & action : actions)
     {
-        Block sub_block;
-        for (const auto & c : block)
+        const auto & segment = action.segment;
+        const auto   range   = segment->getRange();
+        // TODO: Do flush by background threads.
+        if (segment->shouldFlush(dm_context))
         {
-            auto column = c.column->cloneEmpty();
-            column->insertRangeFrom(*c.column, offset, limit);
-
-            auto sub_col      = c.cloneEmpty();
-            sub_col.column    = std::move(column);
-            sub_col.column_id = c.column_id;
-            sub_block.insert(std::move(sub_col));
+            auto new_segment = action.segment->flush(dm_context);
+            {
+                std::unique_lock lock(mutex); // mutating `segments` requires the exclusive lock
+                segments[range.end] = new_segment;
+            }
         }
-        return segment->write(dm_context, std::move(sub_block));
     }
+
+    // Clean up deleted data on disk.
+    {
+        EventRecorder recorder(ProfileEvents::DMAppendDeltaCleanUp, ProfileEvents::DMAppendDeltaCleanUpNS);
+        dm_context.storage_pool.log().write(wbs.removed_data);
+    }
+
+    // TODO: Should only check the updated segments.
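// The ordering above is the crux of the atomic append: (1) persist the new data pages,
// (2) persist the metadata that references them (the commit point), (3) apply the
// change in memory, (4) only then delete replaced pages, so readers pinning an older
// view never lose pages. A stand-alone sketch of the protocol with simplified
// stand-ins (WriteBatchStub/StorageStub are hypothetical, not the real PageStorage API):

#include <string>
#include <vector>

struct WriteBatchStub
{
    std::vector<std::string> ops;
};

struct StorageStub
{
    void write(const WriteBatchStub &) { /* durable, atomic batch write */ }
};

struct AppendBatches
{
    WriteBatchStub data;         // new pages
    WriteBatchStub meta;         // metadata referencing the new pages
    WriteBatchStub removed_data; // pages replaced by this append
};

inline void commitAppend(StorageStub & log_storage, StorageStub & meta_storage, AppendBatches & wbs)
{
    log_storage.write(wbs.data);  // crash here: orphan pages only, old meta still consistent
    meta_storage.write(wbs.meta); // crash after this: the append is fully visible
    // ... apply the append tasks to the in-memory chunk lists here ...
    log_storage.write(wbs.removed_data); // safe now: no consistent view references these pages
}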
+ afterInsertOrDelete(db_context, db_settings); } BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, @@ -224,28 +254,36 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const ColumnDefines & columns_to_read, size_t num_streams) { - SegmentReadTasks tasks; + SegmentReadTasks tasks; + StorageSnapshotPtr storage_snapshot; + { std::shared_lock lock(mutex); + storage_snapshot = std::make_shared(storage_pool); + for (const auto & [handle, segment] : segments) { (void)handle; - tasks.emplace_back(SegmentReadTask(segment, {segment->getRange()})); + tasks.emplace_back(std::make_shared(segment, segment->getReadSnapshot(), HandleRanges{segment->getRange()})); } } - auto dm_context = newDMContext(db_context, db_settings); - auto stream_creator = [=](const SegmentReadTask & task) { return task.segment->getInputStreamRaw(dm_context, columns_to_read); }; + auto dm_context = newDMContext(db_context, db_settings); + + auto stream_creator = [=](const SegmentReadTask & task) { + return task.segment->getInputStreamRaw(dm_context, task.read_snapshot, *storage_snapshot, columns_to_read); + }; size_t final_num_stream = std::min(num_streams, tasks.size()); - auto read_task_pool = std::make_shared(std::move(tasks), stream_creator); + auto read_task_pool = std::make_shared(std::move(tasks)); BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { BlockInputStreamPtr stream = std::make_shared( // read_task_pool, + stream_creator, columns_to_read, table_handle_define.name, DataTypePtr{}, @@ -264,11 +302,15 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, UInt64 max_version, size_t expected_block_size) { - SegmentReadTasks tasks; + SegmentReadTasks tasks; + StorageSnapshotPtr storage_snapshot; { std::shared_lock lock(mutex); + /// FIXME: the creation of storage_snapshot is not atomic! 
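// One way the FIXME above could be addressed (a sketch, not the actual fix in this
// patch): take the storage snapshot and copy the segment list inside the same
// critical section, so both describe one logical point in time. All types below are
// simplified stand-ins for StoragePool/Segment.

#include <map>
#include <memory>
#include <shared_mutex>
#include <utility>
#include <vector>

struct SegmentStub
{
    int id;
};
struct StorageSnapshotStub
{
    // pins a version of the log/data/meta page storages
};

class StoreStub
{
public:
    std::pair<std::shared_ptr<StorageSnapshotStub>, std::vector<std::shared_ptr<SegmentStub>>> snapshotForRead()
    {
        std::shared_lock lock(mutex); // blocks segment-map writers, not other readers
        auto storage_snapshot = std::make_shared<StorageSnapshotStub>();
        std::vector<std::shared_ptr<SegmentStub>> segs;
        segs.reserve(segments.size());
        for (const auto & [range_end, segment] : segments)
        {
            (void)range_end;
            segs.push_back(segment);
        }
        return {storage_snapshot, segs};
    }

private:
    std::shared_mutex mutex;
    std::map<int, std::shared_ptr<SegmentStub>> segments;
};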
+ storage_snapshot = std::make_shared(storage_pool); + auto range_it = sorted_ranges.begin(); auto seg_it = segments.upper_bound(range_it->start); @@ -286,10 +328,13 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, auto & seg_range = seg_it->second->getRange(); if (req_range.intersect(seg_range)) { - if (tasks.empty() || tasks.back().segment != seg_it->second) - tasks.emplace_back(seg_it->second); + if (tasks.empty() || tasks.back()->segment != seg_it->second) + { + auto segment = seg_it->second; + tasks.emplace_back(std::make_shared(segment, segment->getReadSnapshot())); + } - tasks.back().addRange(req_range); + tasks.back()->addRange(req_range); if (req_range.end < seg_range.end) { @@ -317,19 +362,25 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, auto dm_context = newDMContext(db_context, db_settings); - auto stream_creator = [=](const SegmentReadTask & task) { - return task.segment->getInputStream( - dm_context, columns_to_read, task.ranges, max_version, std::min(expected_block_size, DEFAULT_BLOCK_SIZE)); + SegmentStreamCreator stream_creator = [=](const SegmentReadTask & task) { + return task.segment->getInputStream(dm_context, + task.read_snapshot, + *storage_snapshot, + columns_to_read, + task.ranges, + max_version, + std::min(expected_block_size, DEFAULT_BLOCK_SIZE)); }; size_t final_num_stream = std::min(num_streams, tasks.size()); - auto read_task_pool = std::make_shared(std::move(tasks), stream_creator); + auto read_task_pool = std::make_shared(std::move(tasks)); BlockInputStreams res; for (size_t i = 0; i < final_num_stream; ++i) { BlockInputStreamPtr stream = std::make_shared( // read_task_pool, + stream_creator, columns_to_read, table_handle_define.name, table_handle_real_type, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 41af6658057..865168e18ca 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -19,6 +19,8 @@ static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1; class DeltaMergeStore { + using OpContext = DiskValueSpace::OpContext; + public: struct Settings { @@ -89,12 +91,6 @@ class DeltaMergeStore void split(DMContext & dm_context, const SegmentPtr & segment); void merge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right); - SegmentPtr write_segment(DMContext & dm_context, // - const SegmentPtr & segment, - const Block & block, - size_t offset, - size_t limit); - private: using SegmentSortedMap = std::map; diff --git a/dbms/src/Storages/DeltaMerge/DeltaPlace.h b/dbms/src/Storages/DeltaMerge/DeltaPlace.h index 7f4f30244a9..8c210f16277 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaPlace.h +++ b/dbms/src/Storages/DeltaMerge/DeltaPlace.h @@ -202,14 +202,9 @@ void placeInsert(const BlockInputStreamPtr & stable, // using Rids = std::vector>; Rids rids(block_rows); - for (size_t i = 0; i < block_rows; ++i) { + for (size_t i = 0; i < block_rows; ++i) rids[i] = rid_gen.nextForUpsert(); - if (rids[i].first == 262140) - { - rids[i].first +=0; - } - } for (size_t i = 0; i < block_rows; ++i) { auto [rid, dup] = rids[i]; diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index db67c81ffb4..29109d133ef 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -63,9 +63,10 @@ void DiskValueSpace::restore(const OpContext & context) } // Load cache into memory. 
-    size_t total_rows = num_rows();
-    size_t cache_rows = rowsFromBack(chunks_to_cache);
-    Block  cache_data = read(context.dm_context.table_columns, context.data_storage, total_rows - cache_rows, cache_rows);
+    size_t     total_rows = num_rows();
+    size_t     cache_rows = rowsFromBack(chunks_to_cache);
+    PageReader page_reader(context.data_storage);
+    Block      cache_data = read(context.dm_context.table_columns, page_reader, total_rows - cache_rows, cache_rows);

     if (unlikely(cache_data.rows() != cache_rows))
         throw Exception("The fragment rows from storage mismatch");
@@ -82,12 +83,13 @@ void DiskValueSpace::restore(const OpContext & context)
     tryFlushCache(context);
 }

-DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext & context, const BlockOrDelete & block_or_delete) const
+
+AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext & context, AppendWriteBatches & wbs, const BlockOrDelete & update) const
 {
     auto task = std::make_unique<AppendTask>();

-    auto & append_block = block_or_delete.block;
-    auto & delete_range = block_or_delete.delete_range;
+    auto & append_block = update.block;
+    auto & delete_range = update.delete_range;

     const bool   is_delete   = !append_block;
     const size_t block_bytes = is_delete ? 0 : blockBytes(append_block);
@@ -108,7 +110,7 @@ DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext &
         && (cache_bytes + block_bytes) < context.dm_context.delta_cache_limit_bytes)
     {
         // Simply put the newly appended block into cache.
-        Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, task->data_write_batch, append_block);
+        Chunk chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wbs.data, append_block);

         task->append_cache      = true;
         task->remove_chunk_back = 0;
@@ -126,9 +128,8 @@ DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext &
     if (!cache_chunks)
     {
         // There are no caches; simply writing the new block or delete_range is enough.
-        Chunk chunk = is_delete
-            ? delete_chunk
-            : prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, task->data_write_batch, append_block);
+        Chunk chunk
+            = is_delete ? delete_chunk : prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wbs.data, append_block);

         serializeChunks(buf, chunks.begin(), chunks.end(), &chunk);
         task->append_chunks.emplace_back(std::move(chunk));
@@ -145,7 +146,7 @@ DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext &
         for (const auto & [col_id, col_meta] : old_chunk.getMetas())
         {
             (void)col_id;
-            task->data_remove_write_batch.delPage(col_meta.page_id);
+            wbs.removed_data.delPage(col_meta.page_id);
         }
     }

@@ -153,9 +154,12 @@ DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext &
     size_t compacted_rows = cache_rows + append_rows;
     if (cache.empty())
     {
+        // TODO: Currently in write thread, we do not use snapshot read. This may need refactoring later.
+
+        PageReader page_reader(context.data_storage);
         // Load fragment chunks' data from disk.
         compacted_block = read(context.dm_context.table_columns, //
-                               context.data_storage,
+                               page_reader,
                                in_storage_rows,
                                cache_rows,
                                compacted_rows);
@@ -187,8 +191,7 @@ DiskValueSpace::AppendTaskPtr DiskValueSpace::createAppendTask(const OpContext &
         }
     }

-    Chunk compacted_chunk
-        = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, task->data_write_batch, compacted_block);
+    Chunk compacted_chunk = prepareChunkDataWrite(context.dm_context, context.gen_data_page_id, wbs.data, compacted_block);

     if (is_delete)
         serializeChunks(buf, chunks.begin(), chunks.begin() + (chunks.size() - cache_chunks), &compacted_chunk, &delete_chunk);
@@ -202,39 +205,55 @@
     }

     auto data_size = buf.count(); // Must be read before tryGetReadBuffer().
-    task->meta_write_batch.putPage(page_id, 0, buf.tryGetReadBuffer(), data_size);
+    wbs.meta.putPage(page_id, 0, buf.tryGetReadBuffer(), data_size);

     return task;
 }

-void DiskValueSpace::applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & block_or_delete)
+DiskValueSpacePtr DiskValueSpace::applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update)
 {
-    if (task->remove_chunk_back)
-        chunks.resize(chunks.size() - task->remove_chunk_back);
+    DiskValueSpace * instance = this;
+    if (task->remove_chunk_back > 0)
+    {
+        // If any chunks got removed, then we create a new instance,
+        // so that readers holding a reference to an instance always see a constant set of chunks.
+        instance = new DiskValueSpace(should_cache, page_id, chunks);
+    }
+
+    if (task->remove_chunk_back > 0)
+        instance->chunks.resize(chunks.size() - task->remove_chunk_back);

     for (auto & chunk : task->append_chunks)
-        chunks.emplace_back(std::move(chunk));
+        instance->chunks.emplace_back(std::move(chunk));

     if (task->append_cache)
     {
+        if (unlikely(instance != this))
+            throw Exception("cache should only be applied to this");
+
+        auto block_rows = update.block.rows();
         for (const auto & col_define : context.dm_context.table_columns)
         {
-            const ColumnWithTypeAndName & col = block_or_delete.block.getByName(col_define.name);
+            const ColumnWithTypeAndName & col = update.block.getByName(col_define.name);

-            auto it = cache.find(col_define.id);
-            if (it == cache.end())
-                cache.emplace(col_define.id, col_define.type->createColumn());
+            auto it = instance->cache.find(col_define.id);
+            if (it == instance->cache.end())
+                instance->cache.emplace(col_define.id, col_define.type->createColumn());

-            cache[col_define.id]->insertRangeFrom(*col.column, 0, block_rows);
+            instance->cache[col_define.id]->insertRangeFrom(*col.column, 0, block_rows);
         }
-        ++cache_chunks;
+        ++instance->cache_chunks;
     }
     else
     {
-        cache.clear();
-        cache_chunks = 0;
+        instance->cache.clear();
+        instance->cache_chunks = 0;
    }
+
+    if (instance != this)
+        return DiskValueSpacePtr(instance);
+    else
+        return {};
 }

 Chunks DiskValueSpace::writeChunks(const OpContext & context, const BlockInputStreamPtr & input_stream)
@@ -321,7 +340,7 @@ void DiskValueSpace::appendChunkWithCache(const OpContext & context, Chunk && ch
 }

 Block DiskValueSpace::read(const ColumnDefines & read_column_defines,
-                           PageStorage & data_storage,
+                           const PageReader & page_reader,
                            size_t rows_offset,
                            size_t rows_limit,
                            std::optional reserve_rows_) const
@@ -361,7 +380,7 @@ Block DiskValueSpace::read(const ColumnDefines & read_column_defines,
             if (rows_end_in_chunk > rows_start_in_chunk)
             {
                 readChunkData(
columns, cur_chunk, read_column_defines, data_storage, rows_start_in_chunk, rows_end_in_chunk - rows_start_in_chunk); + columns, cur_chunk, read_column_defines, page_reader, rows_start_in_chunk, rows_end_in_chunk - rows_start_in_chunk); already_read_rows += rows_end_in_chunk - rows_start_in_chunk; } @@ -403,7 +422,7 @@ Block DiskValueSpace::read(const ColumnDefines & read_column_defines, return res; } -Block DiskValueSpace::read(const ColumnDefines & read_column_defines, PageStorage & data_storage, size_t chunk_index) const +Block DiskValueSpace::read(const ColumnDefines & read_column_defines, const PageReader & page_reader, size_t chunk_index) const { if (read_column_defines.empty()) return {}; @@ -423,7 +442,7 @@ Block DiskValueSpace::read(const ColumnDefines & read_column_defines, PageStorag if (chunk_index < chunk_cache_start) { // Read from storage - readChunkData(columns, chunk, read_column_defines, data_storage, 0, chunk.getRows()); + readChunkData(columns, chunk, read_column_defines, page_reader, 0, chunk.getRows()); } else { @@ -457,7 +476,7 @@ Block DiskValueSpace::read(const ColumnDefines & read_column_defines, PageStorag } BlockOrDeletes DiskValueSpace::getMergeBlocks(const ColumnDefine & handle, - PageStorage & data_storage, + const PageReader & page_reader, size_t rows_begin, size_t deletes_begin, size_t rows_end, @@ -485,7 +504,7 @@ BlockOrDeletes DiskValueSpace::getMergeBlocks(const ColumnDefine & handle, if (chunk.isDeleteRange()) res.emplace_back(chunk.getDeleteRange()); if (block_rows_end != block_rows_start) - res.emplace_back(read({handle, VERSION_COLUMN_DEFINE}, data_storage, block_rows_start, block_rows_end - block_rows_start)); + res.emplace_back(read({handle, VERSION_COLUMN_DEFINE}, page_reader, block_rows_start, block_rows_end - block_rows_start)); block_rows_start = block_rows_end; } @@ -551,7 +570,8 @@ bool DiskValueSpace::doFlushCache(const OpContext & context) if (cache.empty()) { // Load fragment data from disk. 
- compacted = read(context.dm_context.table_columns, context.data_storage, in_storage_rows, cache_rows); + PageReader page_reader(context.data_storage); + compacted = read(context.dm_context.table_columns, page_reader, in_storage_rows, cache_rows); if (unlikely(compacted.rows() != cache_rows)) throw Exception("The fragment rows from storage mismatch"); @@ -613,9 +633,9 @@ bool DiskValueSpace::doFlushCache(const OpContext & context) return true; } -ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, PageStorage & data_storage) const +ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const { - return std::make_shared(chunks, read_columns, data_storage); + return std::make_shared(chunks, read_columns, page_reader); } size_t DiskValueSpace::num_rows() const diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h index 4ba2d7b4dce..1b4f00de188 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h @@ -22,6 +22,7 @@ namespace DM struct BlockOrDelete { + BlockOrDelete() = default; BlockOrDelete(Block && block_) : block(block_) {} BlockOrDelete(const HandleRange & delete_range_) : delete_range(delete_range_) {} @@ -30,6 +31,24 @@ struct BlockOrDelete }; using BlockOrDeletes = std::vector; +struct AppendWriteBatches +{ + WriteBatch data; + WriteBatch meta; + WriteBatch removed_data; +}; + +struct AppendTask +{ + bool append_cache; // If not append cache, then clear cache. + size_t remove_chunk_back; + Chunks append_chunks; +}; +using AppendTaskPtr = std::unique_ptr; + +class DiskValueSpace; +using DiskValueSpacePtr = std::shared_ptr; + class DiskValueSpace { public: @@ -62,21 +81,6 @@ class DiskValueSpace GenPageId gen_data_page_id; }; - struct AppendTask - { - /// The write order of the following wirte batch is critical! - - WriteBatch data_write_batch; - WriteBatch meta_write_batch; - - WriteBatch data_remove_write_batch; - - bool append_cache; // If not append cache, then clear cache. - size_t remove_chunk_back; - Chunks append_chunks; - }; - using AppendTaskPtr = std::unique_ptr; - DiskValueSpace(bool should_cache_, PageId page_id_); DiskValueSpace(bool should_cache_, PageId page_id_, const Chunks & chunks_); DiskValueSpace(const DiskValueSpace & other); @@ -84,9 +88,8 @@ class DiskValueSpace /// Called after the instance is created from existing metadata. void restore(const OpContext & context); - AppendTaskPtr createAppendTask(const OpContext & context, const BlockOrDelete & block_or_delete) const; - - void applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & block_or_delete); + AppendTaskPtr createAppendTask(const OpContext & context, AppendWriteBatches & wbs, const BlockOrDelete & update) const; + DiskValueSpacePtr applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update); /// Write the blocks from input_stream into underlying storage, the returned chunks can be added to /// specified value space instance by #setChunks or #appendChunkWithCache later. @@ -103,24 +106,24 @@ class DiskValueSpace /// Read the requested chunks' data and compact into a block. /// The columns of the returned block are guaranteed to be in order of read_columns. 
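// The read(rows_offset, rows_limit) declared below has to translate a global row
// window into per-chunk sub-ranges before calling readChunkData. A stand-alone
// sketch of that arithmetic (chunkWindows is a hypothetical helper, not part of
// this patch):

#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// For each chunk, returns {offset inside the chunk, number of rows to take}.
inline std::vector<std::pair<size_t, size_t>>
chunkWindows(const std::vector<size_t> & chunk_rows, size_t rows_offset, size_t rows_limit)
{
    std::vector<std::pair<size_t, size_t>> res(chunk_rows.size(), {0, 0});
    size_t chunk_begin = 0;
    for (size_t i = 0; i < chunk_rows.size(); ++i)
    {
        size_t chunk_end = chunk_begin + chunk_rows[i];
        size_t start     = std::max(rows_offset, chunk_begin);
        size_t end       = std::min(rows_offset + rows_limit, chunk_end);
        if (end > start)
            res[i] = {start - chunk_begin, end - start};
        chunk_begin = chunk_end;
    }
    return res;
}

// E.g. chunks of {100, 100, 100} with rows_offset = 150 and rows_limit = 100 yield
// {0,0}, {50,50}, {0,50}: skip chunk 0, read the second half of chunk 1 and the
// first half of chunk 2.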
Block read(const ColumnDefines & read_columns, - PageStorage & data_storage, + const PageReader & page_reader, size_t rows_offset, size_t rows_limit, std::optional reserve_rows = {}) const; /// Read the chunk data. /// The columns of the returned block are guaranteed to be in order of read_columns. - Block read(const ColumnDefines & read_columns, PageStorage & data_storage, size_t chunk_index) const; + Block read(const ColumnDefines & read_columns, const PageReader & page_reader, size_t chunk_index) const; /// The data of returned block is in insert order. BlockOrDeletes getMergeBlocks(const ColumnDefine & handle, - PageStorage & data_storage, + const PageReader & page_reader, size_t rows_begin, size_t deletes_begin, size_t rows_end, size_t deletes_end) const; - ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, PageStorage & data_storage) const; + ChunkBlockInputStreamPtr getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const; bool tryFlushCache(const OpContext & context, bool force = false); @@ -158,7 +161,5 @@ class DiskValueSpace Logger * log; }; -using DiskValueSpacePtr = std::shared_ptr; - } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8d0deee0d07..4f4d7fb05a0 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -18,14 +18,20 @@ namespace ProfileEvents { extern const Event DMWriteBlock; extern const Event DMWriteBlockNS; -extern const Event DMAppendDelta; -extern const Event DMAppendDeltaNS; extern const Event DMPlace; extern const Event DMPlaceNS; extern const Event DMPlaceUpsert; extern const Event DMPlaceUpsertNS; extern const Event DMPlaceDeleteRange; extern const Event DMPlaceDeleteRangeNS; +extern const Event DMAppendDeltaPrepare; +extern const Event DMAppendDeltaPrepareNS; +extern const Event DMAppendDeltaCommitMemory; +extern const Event DMAppendDeltaCommitMemoryNS; +extern const Event DMAppendDeltaCommitDisk; +extern const Event DMAppendDeltaCommitDiskNS; +extern const Event DMAppendDeltaCleanUp; +extern const Event DMAppendDeltaCleanUpNS; extern const Event DMSegmentSplit; extern const Event DMSegmentSplitNS; extern const Event DMSegmentGetSplitPoint; @@ -53,8 +59,6 @@ const Segment::Version Segment::CURRENT_VERSION = 1; const static size_t SEGMENT_BUFFER_SIZE = 128; // More than enough. const static size_t STABLE_CHUNK_ROWS = DEFAULT_BLOCK_SIZE; -using OpContext = DiskValueSpace::OpContext; - //========================================================================================== // Segment ser/deser //========================================================================================== @@ -74,8 +78,8 @@ SegmentPtr Segment::newSegment(DMContext & context, const HandleRange & range, P // Write metadata. 
segment->serialize(meta_wb); - segment->delta.setChunks({}, meta_wb, log_wb); - segment->stable.setChunks({}, meta_wb, data_wb); + segment->delta->setChunks({}, meta_wb, log_wb); + segment->stable->setChunks({}, meta_wb, data_wb); context.storage_pool.meta().write(meta_wb); @@ -104,8 +108,8 @@ SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) auto segment = std::make_shared(epoch, range, segment_id, next_segment_id, delta_id, stable_id); - segment->delta.restore(OpContext::createForLogStorage(context)); - segment->stable.restore(OpContext::createForDataStorage(context)); + segment->delta->restore(OpContext::createForLogStorage(context)); + segment->stable->restore(OpContext::createForDataStorage(context)); return segment; } @@ -118,8 +122,8 @@ void Segment::serialize(WriteBatch & wb) writeIntBinary(range.start, buf); writeIntBinary(range.end, buf); writeIntBinary(next_segment_id, buf); - writeIntBinary(delta.pageId(), buf); - writeIntBinary(stable.pageId(), buf); + writeIntBinary(delta->pageId(), buf); + writeIntBinary(stable->pageId(), buf); auto data_size = buf.count(); // Must be called before tryGetReadBuffer. wb.putPage(segment_id, 0, buf.tryGetReadBuffer(), data_size); @@ -129,76 +133,81 @@ void Segment::serialize(WriteBatch & wb) // Segment public APIs. //========================================================================================== -SegmentPtr Segment::write(DMContext & dm_context, BlockOrDelete && block_or_delete) +void Segment::write(DMContext & dm_context, const BlockOrDelete & update) { - if (block_or_delete.block) - LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] write rows: " + DB::toString(block_or_delete.block.rows())); + if (update.block) + LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] write rows: " + DB::toString(update.block.rows())); else - LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] delete range: " + block_or_delete.delete_range.toString()); - - OpContext opc = OpContext::createForLogStorage(dm_context); + LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] delete range: " + update.delete_range.toString()); - EventRecorder recorder(ProfileEvents::DMAppendDelta, ProfileEvents::DMAppendDeltaNS); + auto op_context = OpContext::createForLogStorage(dm_context); + AppendWriteBatches wbs; - DiskValueSpace::AppendTaskPtr task; + auto task = createAppendTask(op_context, wbs, update); { - // Create everything we need to do the update. - // We only need a shared lock because this operation won't do any modifications. - std::shared_lock lock(read_write_mutex); - task = delta.createAppendTask(opc, block_or_delete); + EventRecorder recorder(ProfileEvents::DMAppendDeltaCommitDisk, ProfileEvents::DMAppendDeltaCommitDiskNS); + dm_context.storage_pool.log().write(wbs.data); + dm_context.storage_pool.meta().write(wbs.meta); } - // Write data to disk. - dm_context.storage_pool.log().write(task->data_write_batch); - dm_context.storage_pool.meta().write(task->meta_write_batch); - dm_context.storage_pool.log().write(task->data_remove_write_batch); + applyAppendTask(op_context, task, update); { - // Update metadata in memory. - // Here we need a unique lock to do modifications in memory. 
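// The shared-lock/unique-lock split used here (prepare the append under a shared
// lock, apply it in memory under a short exclusive lock, keep all IO outside the
// exclusive section) in a stand-alone sketch; ValueSpaceStub is a simplified
// stand-in, not the real class:

#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <vector>

class ValueSpaceStub
{
public:
    // Preparation only reads the current chunk list, so a shared lock is enough
    // and concurrent readers are never blocked while write batches are built.
    size_t prepare() const
    {
        std::shared_lock lock(mutex);
        return chunks.size(); // plan the append against the current state
    }

    // The in-memory apply mutates the list; it takes the exclusive lock, but the
    // critical section is IO-free, so it stays short.
    void apply(int new_chunk)
    {
        std::unique_lock lock(mutex);
        chunks.push_back(new_chunk);
    }

private:
    mutable std::shared_mutex mutex;
    std::vector<int> chunks;
};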
- std::unique_lock lock(read_write_mutex); - - delta.applyAppendTask(opc, task, block_or_delete); + EventRecorder recorder(ProfileEvents::DMAppendDeltaCleanUp, ProfileEvents::DMAppendDeltaCleanUpNS); + dm_context.storage_pool.log().write(wbs.removed_data); } +} - recorder.submit(); +AppendTaskPtr Segment::createAppendTask(const OpContext & opc, AppendWriteBatches & wbs, const BlockOrDelete & update) +{ + if (update.block) + LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] write rows: " + DB::toString(update.block.rows())); + else + LOG_DEBUG(log, "Segment[" + DB::toString(segment_id) + "] delete range: " + update.delete_range.toString()); -#ifndef NDEBUG - check(dm_context, "After write"); -#endif + EventRecorder recorder(ProfileEvents::DMAppendDeltaPrepare, ProfileEvents::DMAppendDeltaPrepareNS); - if (shouldFlush(dm_context)) - { - auto res = flush(dm_context); -#ifndef NDEBUG - res->check(dm_context, "After delta merge"); -#endif - return res; - } - return {}; + // Create everything we need to do the update. + // We only need a shared lock because this operation won't do any modifications. + std::shared_lock lock(read_write_mutex); + return delta->createAppendTask(opc, wbs, update); +} + +void Segment::applyAppendTask(const OpContext & opc, const AppendTaskPtr & task, const BlockOrDelete & update) +{ + // Update metadata in memory. + // Here we need a unique lock to do modifications in memory. + std::unique_lock lock(read_write_mutex); + + EventRecorder recorder(ProfileEvents::DMAppendDeltaCommitMemory, ProfileEvents::DMAppendDeltaCommitMemoryNS); + + auto new_delta = delta->applyAppendTask(opc, task, update); + if (new_delta) + delta = new_delta; } void Segment::check(DMContext & dm_context, const String & when) { auto & handle = dm_context.table_handle_define; - size_t stable_rows = stable.num_rows(); - size_t delta_rows = delta.num_rows(); + size_t stable_rows = stable->num_rows(); + size_t delta_rows = delta->num_rows(); LOG_DEBUG(log, when + ": stable_rows:" + DB::toString(stable_rows) + ", delta_rows:" + DB::toString(delta_rows)); - auto snapshot = getReadSnapshot(dm_context, {handle}); + StorageSnapshot storage_snapshot(dm_context.storage_pool); + auto read_info = getReadInfo(dm_context, {delta, delta_rows}, storage_snapshot, {handle}); LOG_DEBUG(log, when + ": entries:" + DB::toString(delta_tree->numEntries()) + ", inserts:" + DB::toString(delta_tree->numInserts()) + ", deletes:" + DB::toString(delta_tree->numDeletes())); - auto stream = getPlacedStream(dm_context, + auto stream = getPlacedStream(storage_snapshot.data_reader, {range}, - snapshot.read_columns, - snapshot.delta_value_space, - snapshot.index_begin, - snapshot.index_end, + read_info.read_columns, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, DEFAULT_BLOCK_SIZE); size_t total_rows = 0; @@ -222,29 +231,39 @@ void Segment::check(DMContext & dm_context, const String & when) LOG_DEBUG(log, when + ": rows(raw): " + DB::toString(total_rows)); } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const HandleRanges & read_ranges, - UInt64 max_version, - size_t expected_block_size) +SegmentSnapshot Segment::getReadSnapshot() { - auto & handle = dm_context.table_handle_define; - auto snapshot = getReadSnapshot(dm_context, columns_to_read); - auto stream = getPlacedStream(dm_context, + std::unique_lock lock(read_write_mutex); + return {delta, delta->num_rows()}; +} + +BlockInputStreamPtr Segment::getInputStream(const 
DMContext & dm_context, + const SegmentSnapshot & segment_snap, + const StorageSnapshot & storage_snaps, + const ColumnDefines & columns_to_read, + const HandleRanges & read_ranges, + UInt64 max_version, + size_t expected_block_size) +{ + auto & handle = dm_context.table_handle_define; + auto read_info = getReadInfo(dm_context, segment_snap, storage_snaps, columns_to_read); + auto stream = getPlacedStream(storage_snaps.data_reader, read_ranges, - snapshot.read_columns, - snapshot.delta_value_space, - snapshot.index_begin, - snapshot.index_end, + read_info.read_columns, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, expected_block_size); - stream = std::make_shared>(stream, handle, max_version); + stream = std::make_shared>(stream, handle, max_version); return stream; } -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) +BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, + const SegmentSnapshot & segment_snap, + const StorageSnapshot & storage_snaps, + const ColumnDefines & columns_to_read) { - auto & handle = dm_context.table_handle_define; - auto & storage = dm_context.storage_pool; + auto & handle = dm_context.table_handle_define; ColumnDefines new_columns_to_read; new_columns_to_read.push_back(handle); @@ -255,20 +274,17 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, con new_columns_to_read.push_back(c); } - DiskValueSpacePtr delta_snap; { // Create a new delta vs, so that later read operations won't block write thread. std::unique_lock lock(read_write_mutex); - delta_snap = std::make_shared(delta); + delta_snap = std::make_shared(*segment_snap.delta); } - // auto delta_block = delta_snap->read(new_columns_to_read, storage.log(), 0, delta_snap->num_rows()); - // BlockInputStreamPtr delta_stream = std::make_shared(delta_block); - BlockInputStreamPtr delta_stream = delta_snap->getInputStream(new_columns_to_read, storage.log()); + BlockInputStreamPtr delta_stream = delta_snap->getInputStream(new_columns_to_read, storage_snaps.log_reader); delta_stream = std::make_shared(delta_stream, range, 0, false); - BlockInputStreamPtr stable_stream = stable.getInputStream(new_columns_to_read, storage.data()); + BlockInputStreamPtr stable_stream = stable->getInputStream(new_columns_to_read, storage_snaps.data_reader); stable_stream = std::make_shared(stable_stream, range, 0, true); BlockInputStreams streams; @@ -279,11 +295,14 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, con SegmentPair Segment::split(DMContext & dm_context) { + /// Currently split & merge are done after update segment, so snapshot is not needed. 
LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] start to split."); - auto snapshot = getReadSnapshot(dm_context, dm_context.table_columns); - Handle split_point = getSplitPoint(dm_context, snapshot); - auto res = doSplit(dm_context, snapshot, split_point); + StorageSnapshot storage_snapshot(dm_context.storage_pool); + + auto read_info = getReadInfo(dm_context, {delta, delta->num_rows()}, storage_snapshot, dm_context.table_columns); + Handle split_point = getSplitPoint(dm_context, storage_snapshot.data_reader, read_info); + auto res = doSplit(dm_context, storage_snapshot.data_reader, read_info, split_point); LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] done split."); @@ -294,10 +313,14 @@ SegmentPtr Segment::merge(DMContext & dm_context, const SegmentPtr & left, const { LOG_DEBUG(left->log, "Merge segment [" + DB::toString(left->segment_id) + "] with [" + DB::toString(right->segment_id) + "]"); - auto left_snapshot = left->getReadSnapshot(dm_context, dm_context.table_columns); - auto right_snapshot = right->getReadSnapshot(dm_context, dm_context.table_columns); + StorageSnapshot storage_snapshot(dm_context.storage_pool); - auto res = doMerge(dm_context, left, left_snapshot, right, right_snapshot); + auto left_read_info + = left->getReadInfo(dm_context, {left->delta, left->delta->num_rows()}, storage_snapshot, dm_context.table_columns); + auto right_read_info + = right->getReadInfo(dm_context, {right->delta, right->delta->num_rows()}, storage_snapshot, dm_context.table_columns); + + auto res = doMerge(dm_context, storage_snapshot.data_reader, left, left_read_info, right, right_read_info); LOG_DEBUG(left->log, "Done merge segment [" + DB::toString(left->segment_id) + "] with [" + DB::toString(right->segment_id) + "]"); @@ -305,13 +328,54 @@ SegmentPtr Segment::merge(DMContext & dm_context, const SegmentPtr & left, const } +bool Segment::shouldFlush(DMContext & dm_context) const +{ + return delta->num_rows() >= dm_context.delta_limit_rows || delta->num_bytes() >= dm_context.delta_limit_bytes; +} + +SegmentPtr Segment::flush(DMContext & dm_context) +{ + EventRecorder recorder(ProfileEvents::DMDeltaMerge, ProfileEvents::DMDeltaMergeNS); + + LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] start to merge delta."); + + auto & handle = dm_context.table_handle_define; + auto & columns = dm_context.table_columns; + auto min_version = dm_context.min_version; + + StorageSnapshot storage_snapshot(dm_context.storage_pool); + + auto read_info = getReadInfo(dm_context, {delta, delta->num_rows()}, storage_snapshot, columns); + auto data_stream = getPlacedStream(storage_snapshot.data_reader, + {range}, + read_info.read_columns, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, + STABLE_CHUNK_ROWS); + data_stream = std::make_shared>(data_stream, handle, min_version); + + SegmentPtr new_me = reset(dm_context, data_stream); + + // Force tcmalloc to return memory back to system. + // https://internal.pingcap.net/jira/browse/FLASH-41 + // TODO: Evaluate the cost of this. 
+ // MallocExtension::instance()->ReleaseFreeMemory(); + + LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] done merge delta."); + + recorder.submit(); + + return new_me; +} + Segment::Segment(UInt64 epoch_, const HandleRange & range_, PageId segment_id_, PageId next_segment_id_, PageId delta_id, PageId stable_id) : epoch(epoch_), range(range_), segment_id(segment_id_), next_segment_id(next_segment_id_), - delta(true, delta_id), - stable(false, stable_id), + delta(std::make_shared(true, delta_id)), + stable(std::make_shared(false, stable_id)), delta_tree(std::make_shared()), log(&Logger::get("Segment")) { @@ -329,8 +393,8 @@ Segment::Segment(UInt64 epoch_, range(range_), segment_id(segment_id_), next_segment_id(next_segment_id_), - delta(true, delta_id, delta_chunks_), - stable(false, stable_id, stable_chunks_), + delta(std::make_shared(true, delta_id, delta_chunks_)), + stable(std::make_shared(false, stable_id, stable_chunks_)), delta_tree(std::make_shared()), log(&Logger::get("Segment")) { @@ -351,13 +415,13 @@ size_t Segment::getEstimatedBytes() size_t Segment::delta_rows() { std::shared_lock lock(read_write_mutex); - return delta.num_rows(); + return delta->num_rows(); } size_t Segment::delta_deletes() { std::shared_lock lock(read_write_mutex); - return delta.num_deletes(); + return delta->num_deletes(); } //========================================================================================== @@ -365,36 +429,39 @@ size_t Segment::delta_deletes() //========================================================================================== template -ReadSnapshot Segment::getReadSnapshot(const DMContext & dm_context, const ColumnDefines & columns_to_read) +Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, + const SegmentSnapshot & segment_snap, + const StorageSnapshot & storage_snaps, + const ColumnDefines & columns_to_read) { auto new_columns_to_read = arrangeReadColumns(dm_context.table_handle_define, columns_to_read); // Create a new delta vs and delta index snapshot, so that later read/write operations won't block write thread. + // TODO: We don't need to do copy if chunks in DiskValueSpace is a list. DiskValueSpacePtr delta_snap; { // Synchronize between read/write threads. std::shared_lock lock(read_write_mutex); - delta_snap = std::make_shared(delta); + delta_snap = std::make_shared(*segment_snap.delta); } - auto & handle = dm_context.table_handle_define; - auto & storage = dm_context.storage_pool; + auto & handle = dm_context.table_handle_define; - const auto delta_block = delta_snap->read(new_columns_to_read, storage.log(), 0, delta_snap->num_rows()); - auto delta_snap_value_space = std::make_shared(handle, new_columns_to_read, delta_block); + const auto delta_block = delta_snap->read(new_columns_to_read, storage_snaps.log_reader, 0, segment_snap.delta_rows); + auto delta_value_space = std::make_shared(handle, new_columns_to_read, delta_block); DeltaIndexPtr delta_index; { // Synchronize between read/read threads. 
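// ensurePlace (called just below) is incremental: it only places the delta rows and
// delete ranges that arrived after the last placement, tracked by two watermarks
// (placed_delta_rows / placed_delta_deletes). The same idea in a stand-alone sketch:

#include <cstddef>

struct PlaceWatermarks
{
    size_t placed_rows    = 0;
    size_t placed_deletes = 0;
};

template <typename PlaceFn>
void ensurePlaced(PlaceWatermarks & wm, size_t delta_rows, size_t delta_deletes, PlaceFn && place)
{
    if (wm.placed_rows == delta_rows && wm.placed_deletes == delta_deletes)
        return; // the index already covers this snapshot; nothing to place

    // Place only the tail [wm.placed_rows, delta_rows) and the new delete ranges.
    place(wm.placed_rows, wm.placed_deletes, delta_rows, delta_deletes);

    wm.placed_rows    = delta_rows;
    wm.placed_deletes = delta_deletes;
}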
std::unique_lock lock(read_read_mutex); - delta_index = ensurePlace(dm_context, delta_snap, delta_snap_value_space); + delta_index = ensurePlace(dm_context, storage_snaps, delta_snap, delta_value_space); } auto index_begin = DeltaIndex::begin(delta_index); auto index_end = DeltaIndex::end(delta_index); return { - .delta_value_space = delta_snap_value_space, + .delta_value_space = delta_value_space, .index_begin = index_begin, .index_end = index_end, .read_columns = new_columns_to_read, @@ -427,7 +494,7 @@ ColumnDefines Segment::arrangeReadColumns(const ColumnDefine & handle, const Col } template -BlockInputStreamPtr Segment::getPlacedStream(const DMContext & dm_context, +BlockInputStreamPtr Segment::getPlacedStream(const PageReader & data_page_reader, const HandleRanges & read_ranges, const ColumnDefines & read_columns, const DeltaValueSpacePtr & delta_value_space, @@ -436,7 +503,7 @@ BlockInputStreamPtr Segment::getPlacedStream(const DMContext & dm_conte size_t expected_block_size) const { auto placed_stream_creator = [&](const HandleRange & read_range) { - auto stable_input_stream = stable.getInputStream(read_columns, dm_context.storage_pool.data()); + auto stable_input_stream = stable->getInputStream(read_columns, data_page_reader); return std::make_shared>( // 0, read_range, @@ -470,7 +537,8 @@ BlockInputStreamPtr Segment::getPlacedStream(const DMContext & dm_conte } -SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & snapshot, Handle split_point) const +SegmentPair +Segment::doSplit(DMContext & dm_context, const PageReader & data_page_reader, const ReadInfo & read_info, Handle split_point) const { EventRecorder recorder(ProfileEvents::DMSegmentSplit, ProfileEvents::DMSegmentSplitNS); @@ -487,12 +555,12 @@ SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & snapsh OpContext opc = OpContext::createForDataStorage(dm_context); { // Write my data - BlockInputStreamPtr my_data = getPlacedStream(dm_context, + BlockInputStreamPtr my_data = getPlacedStream(data_page_reader, {my_range}, - snapshot.read_columns, - snapshot.delta_value_space, - snapshot.index_begin, - snapshot.index_end, + read_info.read_columns, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, STABLE_CHUNK_ROWS); my_data = std::make_shared>(my_data, handle, min_version); auto tmp = DiskValueSpace::writeChunks(opc, my_data); @@ -501,12 +569,12 @@ SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & snapsh { // Write new segment's data - BlockInputStreamPtr other_data = getPlacedStream(dm_context, + BlockInputStreamPtr other_data = getPlacedStream(data_page_reader, {other_range}, - snapshot.read_columns, - snapshot.delta_value_space, - snapshot.index_begin, - snapshot.index_end, + read_info.read_columns, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, STABLE_CHUNK_ROWS); other_data = std::make_shared>(other_data, handle, min_version); auto tmp = DiskValueSpace::writeChunks(opc, other_data); @@ -521,10 +589,10 @@ SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & snapsh my_range, this->segment_id, other_segment_id, - this->delta.pageId(), - this->delta.getChunks(), - this->stable.pageId(), - this->stable.getChunks()); + this->delta->pageId(), + this->delta->getChunks(), + this->stable->pageId(), + this->stable->getChunks()); auto other = std::make_shared(INITIAL_EPOCH, // other_range, @@ -538,12 +606,12 @@ SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & 
snapsh WriteBatch data_wb; new_me->serialize(meta_wb); - new_me->delta.setChunks({}, meta_wb, log_wb); - new_me->stable.setChunks(std::move(my_new_stable_chunks), meta_wb, data_wb); + new_me->delta->setChunks({}, meta_wb, log_wb); + new_me->stable->setChunks(std::move(my_new_stable_chunks), meta_wb, data_wb); other->serialize(meta_wb); - other->delta.setChunks({}, meta_wb, log_wb); - other->stable.setChunks(std::move(other_new_stable_chunks), meta_wb, data_wb); + other->delta->setChunks({}, meta_wb, log_wb); + other->stable->setChunks(std::move(other_new_stable_chunks), meta_wb, data_wb); // Commit meta. storage_pool.meta().write(meta_wb); @@ -557,11 +625,12 @@ SegmentPair Segment::doSplit(DMContext & dm_context, const ReadSnapshot & snapsh return {new_me, other}; } -SegmentPtr Segment::doMerge(DMContext & dm_context, - const SegmentPtr & left, - const ReadSnapshot & left_snapshot, - const SegmentPtr & right, - const ReadSnapshot & right_snapshot) +SegmentPtr Segment::doMerge(DMContext & dm_context, + const PageReader & data_page_reader, + const SegmentPtr & left, + const ReadInfo & left_read_info, + const SegmentPtr & right, + const ReadInfo & right_read_info) { if (left->range.end != right->range.start || left->next_segment_id != right->segment_id) throw Exception("The ranges of merge segments are not consecutive: first end: " + DB::toString(left->range.end) @@ -575,21 +644,21 @@ SegmentPtr Segment::doMerge(DMContext & dm_context, Chunks new_stable_chunks; { - BlockInputStreamPtr left_data = left->getPlacedStream(dm_context, + BlockInputStreamPtr left_data = left->getPlacedStream(data_page_reader, {left->range}, - left_snapshot.read_columns, - left_snapshot.delta_value_space, - left_snapshot.index_begin, - left_snapshot.index_end, + left_read_info.read_columns, + left_read_info.delta_value_space, + left_read_info.index_begin, + left_read_info.index_end, STABLE_CHUNK_ROWS); left_data = std::make_shared>(left_data, handle, min_version); - BlockInputStreamPtr right_data = right->getPlacedStream(dm_context, + BlockInputStreamPtr right_data = right->getPlacedStream(data_page_reader, {right->range}, - right_snapshot.read_columns, - right_snapshot.delta_value_space, - right_snapshot.index_begin, - right_snapshot.index_end, + right_read_info.read_columns, + right_read_info.delta_value_space, + right_read_info.index_begin, + right_read_info.index_end, STABLE_CHUNK_ROWS); right_data = std::make_shared>(right_data, handle, min_version); @@ -607,36 +676,36 @@ SegmentPtr Segment::doMerge(DMContext & dm_context, merge_range, left->segment_id, right->next_segment_id, - left->delta.pageId(), - left->delta.getChunks(), - left->stable.pageId(), - left->stable.getChunks()); + left->delta->pageId(), + left->delta->getChunks(), + left->stable->pageId(), + left->stable->getChunks()); // right_copy is used to generate write batch. Because we cannot modify the content of original object. 
Segment right_copy(right->epoch, right->range, right->segment_id, right->next_segment_id, - right->delta.pageId(), - right->delta.getChunks(), - right->stable.pageId(), - right->stable.getChunks()); + right->delta->pageId(), + right->delta->getChunks(), + right->stable->pageId(), + right->stable->getChunks()); WriteBatch meta_wb; WriteBatch log_wb; WriteBatch data_wb; merged->serialize(meta_wb); - merged->delta.setChunks({}, meta_wb, log_wb); - merged->stable.setChunks(std::move(new_stable_chunks), meta_wb, data_wb); + merged->delta->setChunks({}, meta_wb, log_wb); + merged->stable->setChunks(std::move(new_stable_chunks), meta_wb, data_wb); - right_copy.delta.setChunks({}, meta_wb, log_wb); - right_copy.stable.setChunks({}, meta_wb, data_wb); + right_copy.delta->setChunks({}, meta_wb, log_wb); + right_copy.stable->setChunks({}, meta_wb, data_wb); // Remove other's meta data. meta_wb.delPage(left->segment_id); - meta_wb.delPage(left->delta.pageId()); - meta_wb.delPage(left->stable.pageId()); + meta_wb.delPage(left->delta->pageId()); + meta_wb.delPage(left->stable->pageId()); // Commit meta updates. storage_pool.meta().write(meta_wb); @@ -662,10 +731,10 @@ SegmentPtr Segment::reset(DMContext & dm_context, BlockInputStreamPtr & input_st range, segment_id, next_segment_id, - delta.pageId(), - delta.getChunks(), - stable.pageId(), - stable.getChunks()); + delta->pageId(), + delta->getChunks(), + stable->pageId(), + stable->getChunks()); WriteBatch meta_wb; WriteBatch log_wb; @@ -673,8 +742,8 @@ SegmentPtr Segment::reset(DMContext & dm_context, BlockInputStreamPtr & input_st // The order of following code is critical. - new_me->delta.setChunks({}, meta_wb, log_wb); - new_me->stable.setChunks(std::move(new_stable_chunks), meta_wb, data_wb); + new_me->delta->setChunks({}, meta_wb, log_wb); + new_me->stable->setChunks(std::move(new_stable_chunks), meta_wb, data_wb); // Commit meta updates. storage_pool.meta().write(meta_wb); @@ -686,66 +755,27 @@ SegmentPtr Segment::reset(DMContext & dm_context, BlockInputStreamPtr & input_st return new_me; } -bool Segment::shouldFlush(DMContext & dm_context, bool force) -{ - return force || delta.num_rows() >= dm_context.delta_limit_rows || delta.num_bytes() >= dm_context.delta_limit_bytes; -} - -SegmentPtr Segment::flush(DMContext & dm_context) -{ - EventRecorder recorder(ProfileEvents::DMDeltaMerge, ProfileEvents::DMDeltaMergeNS); - - LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] start to merge delta."); - - auto & handle = dm_context.table_handle_define; - auto & columns = dm_context.table_columns; - auto min_version = dm_context.min_version; - - auto snapshot = getReadSnapshot(dm_context, columns); - auto data_stream = getPlacedStream(dm_context, - {range}, - snapshot.read_columns, - snapshot.delta_value_space, - snapshot.index_begin, - snapshot.index_end, - STABLE_CHUNK_ROWS); - data_stream = std::make_shared>(data_stream, handle, min_version); - - SegmentPtr new_me = reset(dm_context, data_stream); - - // Force tcmalloc to return memory back to system. - // https://internal.pingcap.net/jira/browse/FLASH-41 - // TODO: Evaluate the cost of this. 
- // MallocExtension::instance()->ReleaseFreeMemory(); - - LOG_DEBUG(log, "Segment [" + DB::toString(segment_id) + "] done merge delta."); - - recorder.submit(); - - return new_me; -} - -DeltaIndexPtr -Segment::ensurePlace(const DMContext & dm_context, const DiskValueSpacePtr & to_place_delta, const DeltaValueSpacePtr & delta_value_space) +DeltaIndexPtr Segment::ensurePlace(const DMContext & dm_context, + const StorageSnapshot & storage_snapshot, + const DiskValueSpacePtr & to_place_delta, + const DeltaValueSpacePtr & delta_value_space) { EventRecorder recorder(ProfileEvents::DMPlace, ProfileEvents::DMPlaceNS); - auto & handle = dm_context.table_handle_define; - auto & storage = dm_context.storage_pool; - size_t delta_rows = to_place_delta->num_rows(); size_t delta_deletes = to_place_delta->num_deletes(); if (placed_delta_rows == delta_rows && placed_delta_deletes == delta_deletes) return delta_tree->getEntriesCopy>(); - auto blocks = to_place_delta->getMergeBlocks(handle, storage.log(), placed_delta_rows, placed_delta_deletes, delta_rows, delta_deletes); + auto blocks = to_place_delta->getMergeBlocks( + dm_context.table_handle_define, storage_snapshot.log_reader, placed_delta_rows, placed_delta_deletes, delta_rows, delta_deletes); for (auto & v : blocks) { if (!v.delete_range.none()) - placeDelete(dm_context, delta_value_space, v.delete_range); + placeDelete(dm_context, storage_snapshot.data_reader, delta_value_space, v.delete_range); else if (v.block) - placeUpsert(dm_context, delta_value_space, std::move(v.block)); + placeUpsert(dm_context, storage_snapshot.data_reader, delta_value_space, std::move(v.block)); } recorder.submit(); @@ -753,7 +783,10 @@ Segment::ensurePlace(const DMContext & dm_context, const DiskValueSpacePtr & to_ return delta_tree->getEntriesCopy>(); } -void Segment::placeUpsert(const DMContext & dm_context, const DeltaValueSpacePtr & delta_value_space, Block && block) +void Segment::placeUpsert(const DMContext & dm_context, + const PageReader & data_page_reader, + const DeltaValueSpacePtr & delta_value_space, + Block && block) { EventRecorder recorder(ProfileEvents::DMPlaceUpsert, ProfileEvents::DMPlaceUpsertNS); @@ -762,7 +795,7 @@ void Segment::placeUpsert(const DMContext & dm_context, const DeltaValueSpacePtr auto delta_index_end = delta_tree->end(); BlockInputStreamPtr merged_stream = getPlacedStream( // - dm_context, + data_page_reader, {range}, {handle, VERSION_COLUMN_DEFINE}, delta_value_space, @@ -777,7 +810,10 @@ void Segment::placeUpsert(const DMContext & dm_context, const DeltaValueSpacePtr recorder.submit(); } -void Segment::placeDelete(const DMContext & dm_context, const DeltaValueSpacePtr & delta_value_space, const HandleRange & delete_range) +void Segment::placeDelete(const DMContext & dm_context, + const PageReader & data_page_reader, + const DeltaValueSpacePtr & delta_value_space, + const HandleRange & delete_range) { EventRecorder recorder(ProfileEvents::DMPlaceDeleteRange, ProfileEvents::DMPlaceDeleteRangeNS); @@ -788,7 +824,7 @@ void Segment::placeDelete(const DMContext & dm_context, const DeltaValueSpacePtr Blocks delete_data; { BlockInputStreamPtr delete_stream = getPlacedStream( // - dm_context, + data_page_reader, {delete_range}, {handle, VERSION_COLUMN_DEFINE}, delta_value_space, @@ -811,7 +847,7 @@ void Segment::placeDelete(const DMContext & dm_context, const DeltaValueSpacePtr for (const auto & block : delete_data) { BlockInputStreamPtr merged_stream = getPlacedStream( // - dm_context, + data_page_reader, {range}, {handle, 
VERSION_COLUMN_DEFINE},
            delta_value_space,
@@ -825,17 +861,17 @@ void Segment::placeDelete(const DMContext & dm_context, const DeltaValueSpacePtr
     recorder.submit();
 }

-Handle Segment::getSplitPoint(DMContext & dm_context, const ReadSnapshot & snapshot)
+Handle Segment::getSplitPoint(DMContext & dm_context, const PageReader & data_page_reader, const ReadInfo & read_info)
 {
     EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS);

     auto & handle = dm_context.table_handle_define;
-    auto   stream = getPlacedStream(dm_context,
+    auto   stream = getPlacedStream(data_page_reader,
                                     {range},
                                     {dm_context.table_handle_define},
-                                    snapshot.delta_value_space,
-                                    snapshot.index_begin,
-                                    snapshot.index_end,
+                                    read_info.delta_value_space,
+                                    read_info.index_begin,
+                                    read_info.index_end,
                                     DEFAULT_BLOCK_SIZE);

     stream->readPrefix();
@@ -865,13 +901,13 @@ Handle Segment::getSplitPoint(DMContext & dm_context, const ReadSnapshot & snaps
 size_t Segment::estimatedRows()
 {
     // Not 100% accurate.
-    return stable.num_rows() + delta_tree->numInserts() - delta_tree->numDeletes() + (delta.num_rows() - placed_delta_rows);
+    return stable->num_rows() + delta_tree->numInserts() - delta_tree->numDeletes() + (delta->num_rows() - placed_delta_rows);
 }

 size_t Segment::estimatedBytes()
 {
-    size_t stable_bytes = stable.num_bytes();
-    return stable_bytes + delta.num_bytes() - (stable_bytes / stable.num_rows()) * delta_tree->numDeletes();
+    size_t stable_bytes = stable->num_bytes();
+    return stable_bytes + delta->num_bytes() - (stable_bytes / stable->num_rows()) * delta_tree->numDeletes();
 }

 } // namespace DM
diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h
index d946bc5c4a4..abe29be53d2 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.h
+++ b/dbms/src/Storages/DeltaMerge/Segment.h
@@ -46,19 +46,21 @@ struct DeltaValueSpace
         return (*handle_column)[value_id];
     }

+    inline size_t getRows() { return handle_column->size(); }
+
     Columns                        columns;
     ColumnRawPtrs                  columns_ptr;
     PaddedPODArray<Handle> const * handle_column;
 };
 using DeltaValueSpacePtr = std::shared_ptr<DeltaValueSpace>;

-struct ReadSnapshot
+/// A structure that stores the information needed to consistently read a segment instance.
+struct SegmentSnapshot
 {
-    DeltaValueSpacePtr   delta_value_space;
-    DeltaIndex::Iterator index_begin;
-    DeltaIndex::Iterator index_end;
+    DiskValueSpacePtr delta;
+    size_t            delta_rows;

-    ColumnDefines read_columns;
+    SegmentSnapshot(const DiskValueSpacePtr & delta_, size_t delta_rows_) : delta{delta_}, delta_rows(delta_rows_) {}
 };

 /// A segment contains many rows of a table. A table is split into segments by consecutive ranges.
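The two snapshots above are the heart of this patch: a reader first pins the segment's delta chunks (SegmentSnapshot) and the page versions of the underlying storages (StorageSnapshot), then reads through both. The sketch below is editorial, not part of the patch; it assumes a segment, a dm_context and a table_columns list as in the tests further down, and uses the new getInputStream signature declared in the next hunk.

    // Minimal sketch of the snapshot-read pattern (assumed names: segment, dm_context, table_columns).
    SegmentSnapshot segment_snap = segment->getReadSnapshot();   // pin the delta chunks
    StorageSnapshot storage_snap(dm_context->storage_pool);      // pin log/data/meta page versions
    auto in = segment->getInputStream(*dm_context,
                                      segment_snap,
                                      storage_snap,
                                      table_columns,
                                      {HandleRange::newAll()},
                                      std::numeric_limits<UInt64>::max(), // read all row versions
                                      DEFAULT_BLOCK_SIZE);
    in->readPrefix();
    while (Block block = in->read())
    {
        // Concurrent writes create new page versions, so every block read here
        // still reflects the state at the time the snapshots were taken.
    }
    in->readSuffix();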
@@ -73,27 +75,49 @@ class Segment : private boost::noncopyable
     using Version = UInt32;
     static const Version CURRENT_VERSION;

-    static SegmentPtr newSegment(DMContext & context, const HandleRange & range_, PageId segment_id_, PageId next_segment_id_);
-    static SegmentPtr restoreSegment(DMContext & context, PageId segment_id);
-
-    void serialize(WriteBatch & wb);
+    using OpContext = DiskValueSpace::OpContext;

-    const HandleRange & getRange() { return range; }
-
-    SegmentPtr write(DMContext & dm_context, BlockOrDelete && block_or_delete);
+    struct ReadInfo
+    {
+        DeltaValueSpacePtr   delta_value_space;
+        DeltaIndex::Iterator index_begin;
+        DeltaIndex::Iterator index_end;

-    BlockInputStreamPtr getInputStream(const DMContext &   dm_context,
-                                       const ColumnDefines & columns_to_read,
-                                       const HandleRanges &  read_ranges,
-                                       UInt64                max_version,
-                                       size_t                expected_block_size);
+        ColumnDefines read_columns;
+    };

-    BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read);
+    static SegmentPtr newSegment(DMContext & context, const HandleRange & range_, PageId segment_id_, PageId next_segment_id_);
+    static SegmentPtr restoreSegment(DMContext & context, PageId segment_id);

-    SegmentPair split(DMContext & dm_context);
+    void serialize(WriteBatch & wb);

+    /// Write an update. This function only guarantees atomicity at the segment level.
+    void write(DMContext & dm_context, const BlockOrDelete & update);
+
+    /// Use #createAppendTask and #applyAppendTask to build atomicity at a higher level.
+    AppendTaskPtr createAppendTask(const OpContext & context, AppendWriteBatches & wbs, const BlockOrDelete & update);
+    void          applyAppendTask(const OpContext & context, const AppendTaskPtr & task, const BlockOrDelete & update);
+
+    SegmentSnapshot     getReadSnapshot();
+    BlockInputStreamPtr getInputStream(const DMContext &      dm_context,
+                                       const SegmentSnapshot & segment_snap,
+                                       const StorageSnapshot & storage_snaps,
+                                       const ColumnDefines &   columns_to_read,
+                                       const HandleRanges &    read_ranges,
+                                       UInt64                  max_version,
+                                       size_t                  expected_block_size);
+    BlockInputStreamPtr getInputStreamRaw(const DMContext &      dm_context,
+                                          const SegmentSnapshot & segment_snap,
+                                          const StorageSnapshot & storage_snaps,
+                                          const ColumnDefines &   columns_to_read);
+
+    SegmentPair split(DMContext & dm_context);
     static SegmentPtr merge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right);

+    bool shouldFlush(DMContext & dm_context) const;
+    /// Flush delta into stable. i.e. delta merge.
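+    /// Returns a new segment object which replaces the current one; the current object itself is not modified.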
+ SegmentPtr flush(DMContext & dm_context); + size_t getEstimatedRows(); size_t getEstimatedBytes(); @@ -119,26 +143,31 @@ class Segment : private boost::noncopyable void check(DMContext & dm_context, const String & when); - String simpleInfo() { return "{" + DB::toString(segment_id) + ":" + range.toString() + "}"; } + String simpleInfo() const { return "{" + DB::toString(segment_id) + ":" + range.toString() + "}"; } - String info() + String info() const { return "{id:" + DB::toString(segment_id) + ", next: " + DB::toString(next_segment_id) + ", epoch: " + DB::toString(epoch) + ", range: " + range.toString() + "}"; } + const HandleRange & getRange() const { return range; } + size_t delta_rows(); size_t delta_deletes(); private: template - ReadSnapshot getReadSnapshot(const DMContext & dm_context, const ColumnDefines & columns_to_read); + ReadInfo getReadInfo(const DMContext & dm_context, + const SegmentSnapshot & segment_snap, + const StorageSnapshot & storage_snaps, + const ColumnDefines & columns_to_read); template static ColumnDefines arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); template - BlockInputStreamPtr getPlacedStream(const DMContext & dm_context, + BlockInputStreamPtr getPlacedStream(const PageReader & data_page_reader, const HandleRanges & read_ranges, const ColumnDefines & read_columns, const DeltaValueSpacePtr & delta_value_space, @@ -148,31 +177,36 @@ class Segment : private boost::noncopyable /// Split this segment into two. /// Generates two new segment objects, the current object is not modified. - SegmentPair doSplit(DMContext & dm_context, const ReadSnapshot & snapshot, Handle split_point) const; + SegmentPair doSplit(DMContext & dm_context, const PageReader & data_page_reader, const ReadInfo & read_info, Handle split_point) const; /// Merge this segment and the other into one. /// Generates a new segment object, the current object is not modified. - static SegmentPtr doMerge(DMContext & dm_context, - const SegmentPtr & left, - const ReadSnapshot & left_snapshot, - const SegmentPtr & right, - const ReadSnapshot & right_snapshot); + static SegmentPtr doMerge(DMContext & dm_context, + const PageReader & data_page_reader, + const SegmentPtr & left, + const ReadInfo & left_snapshot, + const SegmentPtr & right, + const ReadInfo & right_snapshot); /// Reset the content of this segment. /// Generates a new segment object, the current object is not modified. SegmentPtr reset(DMContext & dm_context, BlockInputStreamPtr & input_stream) const; - bool shouldFlush(DMContext & dm_context, bool force = false); - - /// Flush delta into stable. i.e. delta merge. - SegmentPtr flush(DMContext & dm_context); /// Make sure that all delta chunks have been placed. - DeltaIndexPtr - ensurePlace(const DMContext & dm_context, const DiskValueSpacePtr & to_place_delta, const DeltaValueSpacePtr & delta_value_space); + DeltaIndexPtr ensurePlace(const DMContext & dm_context, + const StorageSnapshot & storage_snapshot, + const DiskValueSpacePtr & to_place_delta, + const DeltaValueSpacePtr & delta_value_space); /// Reference the inserts/updates by delta tree. - void placeUpsert(const DMContext & dm_context, const DeltaValueSpacePtr & delta_value_space, Block && block); + void placeUpsert(const DMContext & dm_context, + const PageReader & data_page_reader, + const DeltaValueSpacePtr & delta_value_space, + Block && block); /// Reference the deletes by delta tree. 
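+    /// The rows covered by the delete range are read out first, then recorded as deletes in the delta tree.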
- void placeDelete(const DMContext & dm_context, const DeltaValueSpacePtr & delta_value_space, const HandleRange & delete_range); + void placeDelete(const DMContext & dm_context, + const PageReader & data_page_reader, + const DeltaValueSpacePtr & delta_value_space, + const HandleRange & delete_range); - Handle getSplitPoint(DMContext & dm_context, const ReadSnapshot & snapshot); + Handle getSplitPoint(DMContext & dm_context, const PageReader & data_page_reader, const ReadInfo & read_info); size_t estimatedRows(); size_t estimatedBytes(); @@ -183,8 +217,8 @@ class Segment : private boost::noncopyable const PageId segment_id; const PageId next_segment_id; - DiskValueSpace delta; - DiskValueSpace stable; + DiskValueSpacePtr delta; + DiskValueSpacePtr stable; DeltaTreePtr delta_tree; size_t placed_delta_rows = 0; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 9782fc12177..d3718288852 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -12,41 +12,43 @@ namespace DM struct SegmentReadTask { - SegmentPtr segment; - HandleRanges ranges; + SegmentPtr segment; + SegmentSnapshot read_snapshot; + HandleRanges ranges; - SegmentReadTask() = default; - explicit SegmentReadTask(const SegmentPtr & segment_) : segment(segment_) {} - SegmentReadTask(const SegmentPtr & segment_, const HandleRanges & ranges_) : segment(segment_), ranges(ranges_) {} + explicit SegmentReadTask(const SegmentPtr & segment_, const SegmentSnapshot & read_snapshot_) + : segment(segment_), read_snapshot(read_snapshot_) + { + } + + SegmentReadTask(const SegmentPtr & segment_, // + const SegmentSnapshot & read_snapshot_, + const HandleRanges & ranges_) + : segment(segment_), read_snapshot(read_snapshot_), ranges(ranges_) + { + } void addRange(const HandleRange & range) { ranges.push_back(range); } }; -using SegmentReadTasks = std::vector; +using SegmentReadTaskPtr = std::shared_ptr; +using SegmentReadTasks = std::vector; +using SegmentStreamCreator = std::function; class SegmentReadTaskPool : private boost::noncopyable { public: - using StreamCreator = std::function; - SegmentReadTaskPool(SegmentReadTasks && tasks_, StreamCreator creator_) : tasks(std::move(tasks_)), creator(creator_) {} + SegmentReadTaskPool(SegmentReadTasks && tasks_) : tasks(std::move(tasks_)) {} - std::pair nextTask() + SegmentReadTaskPtr nextTask() { - SegmentReadTask * task; - { - std::lock_guard lock(mutex); - - if (index == tasks.size()) - return {0, {}}; - task = &(tasks[index++]); - } - return {task->segment->segmentId(), creator(*task)}; + std::lock_guard lock(mutex); + return index == tasks.size() ? 
SegmentReadTaskPtr() : tasks[index++]; } private: SegmentReadTasks tasks; size_t index = 0; - StreamCreator creator; std::mutex mutex; }; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 84f31a1aef5..e58f62317d0 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -12,7 +12,7 @@ namespace DM static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60); -class StoragePool +class StoragePool : private boost::noncopyable { public: using Clock = std::chrono::system_clock; @@ -21,7 +21,6 @@ class StoragePool using Seconds = std::chrono::seconds; explicit StoragePool(const String & path); - StoragePool(const StoragePool &) = delete; PageId maxLogPageId() { return max_log_page_id; } PageId maxDataPageId() { return max_data_page_id; } @@ -51,5 +50,23 @@ class StoragePool std::mutex mutex; }; +const static PageStorage::SnapshotPtr EMPTY_PS_SNAP_PTR = {}; + +class StorageSnapshot +{ +public: + StorageSnapshot(StoragePool & storage, bool snapshot_read = true) + : log_reader(storage.log(), snapshot_read ? storage.log().getSnapshot() : EMPTY_PS_SNAP_PTR), + data_reader(storage.data(), snapshot_read ? storage.data().getSnapshot() : EMPTY_PS_SNAP_PTR), + meta_reader(storage.meta(), snapshot_read ? storage.meta().getSnapshot() : EMPTY_PS_SNAP_PTR) + { + } + + PageReader log_reader; + PageReader data_reader; + PageReader meta_reader; +}; +using StorageSnapshotPtr = std::shared_ptr; + } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index ecb5a3c6a90..3e9456b69b3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -1,6 +1,7 @@ #include "dm_basic_include.h" #include + #include #include @@ -86,7 +87,8 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead) { // read using `getInputStream` - BlockInputStreamPtr in = delta.getInputStream(table_columns, dm_context->storage_pool.log()); + PageReader page_reader(dm_context->storage_pool.log()); + BlockInputStreamPtr in = delta.getInputStream(table_columns, page_reader); size_t num_rows_read = 0; while (Block block = in->read()) { @@ -103,7 +105,8 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead) // read using `read` of offset && limit const size_t read_offset = 15; const size_t num_rows_expect = 20; - Block block = delta.read(table_columns, dm_context->storage_pool.log(), read_offset, num_rows_expect); + PageReader page_reader(dm_context->storage_pool.log()); + Block block = delta.read(table_columns, page_reader, read_offset, num_rows_expect); // check the order of cols is the same as read_columns const Names colnames = block.getNames(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 43d7847ea4d..5b8f304ba9f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -89,8 +89,10 @@ TEST_F(Segment_test, WriteRead) { // read written data auto in = segment->getInputStream(/* dm_context= */ *dm_context, - /*columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, + /* columns_to_read= */ table_columns, + /* read_ranges= */ 
{HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); size_t num_rows_read = 0; @@ -114,8 +116,10 @@ TEST_F(Segment_test, WriteRead) { // read after delete range auto in = segment->getInputStream(/* dm_context= */ *dm_context, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, /* columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* read_ranges= */ {HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); in->readPrefix(); @@ -148,8 +152,10 @@ TEST_F(Segment_test, Split) { // read written data auto in = segment->getInputStream(/* dm_context= */ *dm_context, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, /*columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* read_ranges= */ {HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); size_t num_rows_read = 0; @@ -181,8 +187,10 @@ TEST_F(Segment_test, Split) { { auto in = segment->getInputStream(/* dm_context= */ *dm_context, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, /* columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* read_ranges= */ {HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); in->readPrefix(); @@ -194,8 +202,10 @@ TEST_F(Segment_test, Split) } { auto in = new_segment->getInputStream(/* dm_context= */ *dm_context, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, /*columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* read_ranges= */ {HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); in->readPrefix(); @@ -221,8 +231,10 @@ TEST_F(Segment_test, Split) { size_t num_rows_read = 0; auto in = segment->getInputStream(/* dm_context= */ *dm_context, + /* segment_snap= */ segment->getReadSnapshot(), + /* storage_snap= */ {dm_context->storage_pool}, /* columns_to_read= */ table_columns, - {HandleRange::newAll()}, + /* read_ranges= */ {HandleRange::newAll()}, /* max_version= */ std::numeric_limits::max(), /* expected_block_size= */ 1024); in->readPrefix(); diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 2a75c570cc0..376a93b5330 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -54,7 +54,7 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file, PageStorage::PageStorage(const String & storage_path_, const Config & config_) : storage_path(storage_path_), config(config_), - version_set(), + versioned_page_entries(), page_file_log(&Poco::Logger::get("PageFile")), log(&Poco::Logger::get("PageStorage")) { @@ -74,13 +74,12 @@ PageStorage::PageStorage(const String & storage_path_, const Config & config_) write_file = page_file; } // apply edit to new version - version_set.apply(edit); + versioned_page_entries.apply(edit); } #else - auto snapshot = version_set.getSnapshot(); + auto snapshot = versioned_page_entries.getSnapshot(); - typename PageEntryMapVersionSet::BuilderType builder( - snapshot->version(), true, log); // If there are invalid ref-pairs, just ignore that + typename PageEntriesVersionSet::BuilderType builder(snapshot->version(), true, log); // If there are invalid ref-pairs, just ignore that for (auto 
& page_file : page_files) { PageEntriesEdit edit; @@ -94,19 +93,19 @@ PageStorage::PageStorage(const String & storage_path_, const Config & config_) // apply edit to new version builder.apply(edit); } - version_set.restore(builder.build()); + versioned_page_entries.restore(builder.build()); #endif } PageId PageStorage::getMaxId() { std::lock_guard write_lock(write_mutex); - return version_set.getSnapshot()->version()->maxId(); + return versioned_page_entries.getSnapshot()->version()->maxId(); } PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -162,21 +161,21 @@ void PageStorage::write(const WriteBatch & wb) std::lock_guard lock(write_mutex); getWriter().write(wb, edit); - // Apply changes into version_set(generate a new version) + // Apply changes into versioned_page_entries(generate a new version) // If there are RefPages to non-exist Pages, just put the ref pair to new version // instead of throwing exception. Or we can't open PageStorage since we have already // persist the invalid ref pair into PageFile. - version_set.apply(edit); + versioned_page_entries.apply(edit); } PageStorage::SnapshotPtr PageStorage::getSnapshot() { - return version_set.getSnapshot(); + return versioned_page_entries.getSnapshot(); } Page PageStorage::read(PageId page_id, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -192,7 +191,7 @@ Page PageStorage::read(PageId page_id, SnapshotPtr snapshot) PageMap PageStorage::read(const std::vector & page_ids, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -223,9 +222,9 @@ PageMap PageStorage::read(const std::vector & page_ids, SnapshotPtr snap return page_map; } -void PageStorage::read(const std::vector & page_ids, PageHandler & handler, SnapshotPtr snapshot) +void PageStorage::read(const std::vector & page_ids, const PageHandler & handler, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -255,7 +254,7 @@ void PageStorage::read(const std::vector & page_ids, PageHandler & handl void PageStorage::traverse(const std::function & acceptor, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -297,7 +296,7 @@ void PageStorage::traversePageEntries( // const std::function & acceptor, SnapshotPtr snapshot) { - if (snapshot == nullptr) + if (!snapshot) { snapshot = this->getSnapshot(); } @@ -397,8 +396,8 @@ bool PageStorage::gc() } std::set live_files; - /// Here we have to apply edit to version_set and generate a new version, then return all files that are in used - live_files = version_set.gcApply(gc_file_entries_edit); + /// Here we have to apply edit to versioned_page_entries and generate a new version, then return all files that are in used + live_files = versioned_page_entries.gcApply(gc_file_entries_edit); { // Remove obsolete files' reader cache that are not used by any version diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index cb74dd2b542..5f5129a82e2 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -44,6 +44,13 @@ class PageStorage size_t merge_hint_low_used_file_num = 10; }; +#ifdef DELTA_VERSION_SET + using VersionedPageEntries = PageEntriesVersionSetWithDelta; +#else + using VersionedPageEntries = PageEntriesVersionSet; +#endif + + using SnapshotPtr 
= VersionedPageEntries::SnapshotPtr;
     using WriterPtr     = std::unique_ptr;
     using ReaderPtr     = std::shared_ptr;
     using OpenReadFiles = std::map;
@@ -55,18 +62,13 @@ class PageStorage

     void write(const WriteBatch & write_batch);

-#ifdef DELTA_VERSION_SET
-    using SnapshotPtr = PageEntriesVersionSetWithDelta::SnapshotPtr;
-#else
-    using SnapshotPtr = PageEntryMapVersionSet::SnapshotPtr;
-#endif
     SnapshotPtr getSnapshot();

-    PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = nullptr);
-    Page      read(PageId page_id, SnapshotPtr snapshot = nullptr);
-    PageMap   read(const std::vector<PageId> & page_ids, SnapshotPtr snapshot = nullptr);
-    void      read(const std::vector<PageId> & page_ids, PageHandler & handler, SnapshotPtr snapshot = nullptr);
-    void      traverse(const std::function & acceptor, SnapshotPtr snapshot = nullptr);
+    PageEntry getEntry(PageId page_id, SnapshotPtr snapshot = {});
+    Page      read(PageId page_id, SnapshotPtr snapshot = {});
+    PageMap   read(const std::vector<PageId> & page_ids, SnapshotPtr snapshot = {});
+    void      read(const std::vector<PageId> & page_ids, const PageHandler & handler, SnapshotPtr snapshot = {});
+    void      traverse(const std::function & acceptor, SnapshotPtr snapshot = {});
     void      traversePageEntries(const std::function & acceptor, SnapshotPtr snapshot);

     bool gc();
@@ -91,11 +93,7 @@ class PageStorage
     String storage_path;
     Config config;

-#ifdef DELTA_VERSION_SET
-    PageEntriesVersionSetWithDelta version_set;
-#else
-    PageEntryMapVersionSet version_set;
-#endif
+    VersionedPageEntries versioned_page_entries;

     PageFile  write_file;
     WriterPtr write_file_writer;
@@ -110,4 +108,21 @@ class PageStorage
     std::mutex gc_mutex; // A mutex used to protect gc
 };

+class PageReader
+{
+public:
+    /// Not snapshot read.
+    explicit PageReader(PageStorage & storage_) : storage(storage_), snap() {}
+    /// Snapshot read.
+    PageReader(PageStorage & storage_, const PageStorage::SnapshotPtr & snap_) : storage(storage_), snap(snap_) {}
+
+    Page    read(PageId page_id) const { return storage.read(page_id, snap); }
+    PageMap read(const std::vector<PageId> & page_ids) const { return storage.read(page_ids, snap); }
+    void    read(const std::vector<PageId> & page_ids, const PageHandler & handler) const { storage.read(page_ids, handler, snap); }
+
+private:
+    PageStorage &            storage;
+    PageStorage::SnapshotPtr snap;
+};
+
 } // namespace DB
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 05c97909481..dc6301dd8ec 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -96,6 +96,12 @@ StorageDeltaMerge::StorageDeltaMerge(const std::string & path_,
         global_context, path, name, table_column_defines, handle_column_define, DeltaMergeStore::Settings());
 }

+void StorageDeltaMerge::drop()
+{
+    // Reclaim memory.
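+    // Dropping a table can free a large amount of memory; force tcmalloc to
+    // return it to the system (see https://internal.pingcap.net/jira/browse/FLASH-41).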
MallocExtension::instance()->ReleaseFreeMemory();
+}
+
 Block StorageDeltaMerge::buildInsertBlock(bool is_import, const Block & old_block)
 {
     Block block = old_block;
diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h
index 9a3039ea2d5..94d75ba0fb6 100644
--- a/dbms/src/Storages/StorageDeltaMerge.h
+++ b/dbms/src/Storages/StorageDeltaMerge.h
@@ -23,6 +23,8 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ
     String getName() const override { return "DeltaMerge"; }
     String getTableName() const override { return name; }

+    void drop() override;
+
     BlockInputStreams read(const Names & column_names,
                            const SelectQueryInfo & query_info,
                            const Context & context,
diff --git a/dbms/src/Storages/Transaction/Datum.h b/dbms/src/Storages/Transaction/Datum.h
index 4c20ffb3d9f..eaab616dd07 100644
--- a/dbms/src/Storages/Transaction/Datum.h
+++ b/dbms/src/Storages/Transaction/Datum.h
@@ -1,5 +1,7 @@
 #pragma once

+#include
+
 #include
 #include
diff --git a/dbms/src/Storages/Transaction/ExtraCFData.h b/dbms/src/Storages/Transaction/ExtraCFData.h
index 124bc9b9226..e9dce947d9b 100644
--- a/dbms/src/Storages/Transaction/ExtraCFData.h
+++ b/dbms/src/Storages/Transaction/ExtraCFData.h
@@ -1,5 +1,7 @@
 #pragma once

+#include
+
 #include

 namespace DB
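The PageReader introduced in PageStorage.h above is the piece that threads snapshots through every read path. A minimal usage sketch, editorial and not part of the patch; storage_pool and page_id are assumed names:

    // Without a snapshot, the reader always sees the latest version of the pages.
    PageReader latest_reader(storage_pool.log());

    // With a snapshot, the reader keeps seeing one fixed version, regardless of
    // concurrent writes or gc rounds that happen afterwards.
    PageReader snapshot_reader(storage_pool.log(), storage_pool.log().getSnapshot());
    Page page = snapshot_reader.read(page_id);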