Skip to content

Commit

Permalink
[fix](compaction) fix mismatch between segment key and value column r…
Browse files Browse the repository at this point in the history
…ows during compaction (apache#37960)

When a block is split across 3 segments, the old code handles only 2 of them
and the last one overflows.
  • Loading branch information
luwei16 committed Aug 13, 2024
1 parent e19f603 commit ac0841b
Showing 1 changed file with 24 additions and 29 deletions.
53 changes: 24 additions & 29 deletions be/src/olap/rowset/vertical_beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,35 +95,30 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else {
// value columns
uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written();
VLOG_NOTICE << "num_rows_written: " << num_rows_written
<< ", _cur_writer_idx: " << _cur_writer_idx;
uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
// init if it's first value column write in current segment
if (_cur_writer_idx == 0 && num_rows_written == 0) {
VLOG_NOTICE << "init first value column segment writer";
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
}
// when splitting segment, need to make rows align between key columns and value columns
size_t start_offset = 0, limit = num_rows;
if (num_rows_written + num_rows >= num_rows_key_group &&
_cur_writer_idx < _segment_writers.size() - 1) {
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(
block, 0, num_rows_key_group - num_rows_written));
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
start_offset = num_rows_key_group - num_rows_written;
limit = num_rows - start_offset;
++_cur_writer_idx;
// switch to next writer
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
num_rows_written = 0;
num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
}
if (limit > 0) {
RETURN_IF_ERROR(
_segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit));
DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <=
_segment_writers[_cur_writer_idx]->row_count());
int64_t left = num_rows;
while (left > 0) {
uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written();
VLOG_NOTICE << "num_rows_written: " << num_rows_written
<< ", _cur_writer_idx: " << _cur_writer_idx;
uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
CHECK_LE(num_rows_written, num_rows_key_group);
// init if it's first value column write in current segment
if (num_rows_written == 0) {
VLOG_NOTICE << "init first value column segment writer";
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
}

int64_t to_write = num_rows_written + left >= num_rows_key_group
? num_rows_key_group - num_rows_written
: left;
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, num_rows - left,
to_write));
left -= to_write;
CHECK_GE(left, 0);

if (left > 0) {
++_cur_writer_idx;
}
}
}
if (is_key) {
Expand Down

0 comments on commit ac0841b

Please sign in to comment.