Skip to content

Commit

Permalink
Revert "[fix](read) remove logic of estimating count of rows to read …
Browse files Browse the repository at this point in the history
…in segment iterator to avoid wrong result of unique key (apache#29109) (apache#29110)"

This reverts commit 74b5ef0.

And fix bug: when avg_row_size > config::doris_scan_block_max_mb,
_opts.block_row_max will be set to 0, which will cause early EOF.
  • Loading branch information
jacktengg committed Jan 4, 2024
1 parent bde2c11 commit 8934a95
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
24 changes: 24 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc
_lazy_materialization_read(false),
_lazy_inited(false),
_inited(false),
_estimate_row_size(true),
_wait_times_estimate_row_size(10),
_pool(new ObjectPool) {}

Status SegmentIterator::init(const StorageReadOptions& opts) {
Expand Down Expand Up @@ -2051,6 +2053,13 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {

_current_batch_rows_read = 0;
uint32_t nrows_read_limit = _opts.block_row_max;
if (_wait_times_estimate_row_size > 0) {
// first time, read 100 rows to estimate average row size, to avoid oom caused by a single batch being too large.
// If no valid data is read for the first time, block_row_max is read each time thereafter.
// Avoid low performance when valid data cannot be read all the time
nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
_wait_times_estimate_row_size--;
}
RETURN_IF_ERROR(_read_columns_by_index(
nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
Expand Down Expand Up @@ -2190,6 +2199,9 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);

if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
_update_max_row(block);
}
return Status::OK();
}

Expand All @@ -2215,6 +2227,10 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);

if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
_update_max_row(block);
}

// reverse block row order
if (_opts.read_orderby_key_reverse) {
size_t num_rows = block->rows();
Expand Down Expand Up @@ -2365,6 +2381,14 @@ void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
}
}

// Estimate the average row size from the first non-empty batch and shrink
// _opts.block_row_max so one batch stays within the configured scan budget.
// Called at most once per iterator (guarded by _estimate_row_size).
void SegmentIterator::_update_max_row(const vectorized::Block* block) {
    _estimate_row_size = false;
    // Callers only invoke this when block->rows() > 0, so the division is safe.
    auto avg_row_size = block->bytes() / block->rows();

    // If a single row is larger than the scan budget, the integer division
    // yields 0; a block_row_max of 0 would make every subsequent batch read
    // zero rows and report a premature EOF. Clamp to at least 1 row.
    int block_row_max =
            std::max(1, static_cast<int>(config::doris_scan_block_max_mb / avg_row_size));
    _opts.block_row_max = std::min(block_row_max, _opts.block_row_max);
}

Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
DCHECK(_opts.record_rowids);
DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ class SegmentIterator : public RowwiseIterator {

void _convert_dict_code_for_predicate_if_necessary_impl(ColumnPredicate* predicate);

void _update_max_row(const vectorized::Block* block);

bool _check_apply_by_bitmap_index(ColumnPredicate* pred);
bool _check_apply_by_inverted_index(ColumnPredicate* pred, bool pred_in_compound = false);

Expand Down Expand Up @@ -383,6 +385,9 @@ class SegmentIterator : public RowwiseIterator {
// the actual init process is delayed to the first call to next_batch()
bool _lazy_inited;
bool _inited;
bool _estimate_row_size;
// Read up to 100 rows at a time while waiting for the estimated row size.
int _wait_times_estimate_row_size;

StorageReadOptions _opts;
// make a copy of `_opts.column_predicates` in order to make local changes
Expand Down

0 comments on commit 8934a95

Please sign in to comment.