Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix empty segment cannot merge after gc and avoid write index data for empty dmfile #4500

Merged
merged 14 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 33 additions & 26 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
auto it = segments.find(segment->getRowKeyRange().getEnd());
// check legality
if (it == segments.end())

return {};
auto & cur_segment = it->second;
if (cur_segment.get() != segment.get())
Expand Down Expand Up @@ -1595,7 +1594,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
}

assert(segment != nullptr);
if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr)
if (segment->hasAbandoned() || segment_snap == nullptr)
continue;

const auto segment_id = segment->segmentId();
Expand All @@ -1604,37 +1603,45 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
// Encountered an empty segment; try to merge it.
if (segment_snap->getRows() == 0)
{
// release segment_snap before checkSegmentUpdate, otherwise this segment is still in update status.
segment_snap = nullptr;
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
continue;
}

// Avoid rechecking this segment while gc_safe_point is unchanged, regardless of whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interrupted by another process,
// it's still worth waiting for another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

try
{
// Check whether we should apply gc on this segment
const bool should_compact
= GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log)
|| GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log);
bool should_compact = false;
if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment_snap,
segment_range,
global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc,
log))
{
should_compact = true;
}
else if (segment->getLastCheckGCSafePoint() < gc_safe_point)
{
// Avoid rechecking this segment while gc_safe_point is unchanged, regardless of whether we trigger this segment's DeltaMerge or not.
// Because after we calculate StableProperty and compare it with this gc_safe_point,
// there is no need to recheck it again using the same gc_safe_point.
// On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interrupted by another process,
// it's still worth waiting for another gc_safe_point to check this segment again.
segment->setLastCheckGCSafePoint(gc_safe_point);
dm_context->min_version = gc_safe_point;

// calculate StableProperty if needed
if (!segment->getStable()->isStablePropertyCached())
segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle());

should_compact = GC::shouldCompactStable(
segment,
gc_safe_point,
global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc,
log);
}
bool finish_gc_on_segment = false;
if (should_compact)
{
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ class DMFilePackFilter
const auto file_name_base = DMFile::getFileNameBase(col_id);

auto load = [&]() {
auto index_file_size = dmfile->colIndexSize(file_name_base);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
if (!dmfile->configuration)
{
auto index_buf = ReadBufferFromFileProvider(
file_provider,
dmfile->colIndexPath(file_name_base),
dmfile->encryptionIndexPath(file_name_base),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), dmfile->colIndexSize(file_name_base)),
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
read_limiter);
index_buf.seek(dmfile->colIndexOffset(file_name_base));
return MinMaxIndex::read(*type, index_buf, dmfile->colIndexSize(file_name_base));
Expand All @@ -240,11 +243,10 @@ class DMFilePackFilter
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
index_buf->seek(dmfile->colIndexOffset(file_name_base));
auto file_size = dmfile->colIndexSize(file_name_base);
auto header_size = dmfile->configuration->getChecksumHeaderLength();
auto frame_total_size = dmfile->configuration->getChecksumFrameLength();
auto frame_count = file_size / frame_total_size + (file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, file_size - header_size * frame_count);
auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
}
};
MinMaxIndexPtr minmax_index;
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,25 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)

void DMFileWriter::write(const Block & block, const BlockProperty & block_property)
{
is_empty_file = false;
DMFile::PackStat stat;
stat.rows = block.rows();
stat.not_clean = block_property.not_clean_rows;
stat.bytes = block.bytes(); // This is bytes of pack data in memory.

auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column;

const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : (const ColumnVector<UInt8> *)del_mark_column.get();
const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : static_cast<const ColumnVector<UInt8> *>(del_mark_column.get());

for (auto & cd : write_columns)
{
auto & col = getByColumnId(block, cd.id).column;
const auto & col = getByColumnId(block, cd.id).column;
writeColumn(cd.id, *cd.type, *col, del_mark);

if (cd.id == VERSION_COLUMN_ID)
stat.first_version = col->get64(0);
else if (cd.id == TAG_COLUMN_ID)
stat.first_tag = (UInt8)(col->get64(0));
stat.first_tag = static_cast<UInt8>(col->get64(0));
}

if (!options.flags.isSingleFile())
Expand Down Expand Up @@ -359,7 +360,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
dmfile->encryptionIndexPath(stream_name),
false,
write_limiter);
stream->minmaxes->write(*type, buf);
if (!is_empty_file)
stream->minmaxes->write(*type, buf);
buf.sync();
bytes_written += buf.getMaterializedBytes();
}
Expand All @@ -372,7 +374,8 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
write_limiter,
dmfile->configuration->getChecksumAlgorithm(),
dmfile->configuration->getChecksumFrameLength());
stream->minmaxes->write(*type, *buf);
if (!is_empty_file)
stream->minmaxes->write(*type, *buf);
buf->sync();
bytes_written += buf->getMaterializedBytes();
#ifndef NDEBUG
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ class DMFileWriter

FileProviderPtr file_provider;
WriteLimiterPtr write_limiter;

// Used to avoid writing index data for an empty file.
bool is_empty_file = true;
};

} // namespace DM
Expand Down