Skip to content

Commit

Permalink
[fix](schema-change) Fix bug that schema change may return -102 error (
Browse files Browse the repository at this point in the history
…#7808)

When using linked schema change, we need to check if all rowsets are of the same type,
ALPHA or BETA. otherwise, we need to use direct schema change to convert the data.
  • Loading branch information
morningman committed Jan 21, 2022
1 parent ed39ff1 commit 0efef1b
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 19 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ OLAPStatus CollectIterator::next(const RowCursor** row, bool* delete_flag) {

CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader)
: _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {
if (LIKELY(rs_reader->type() == RowsetReader::BETA)) {
if (LIKELY(rs_reader->type() == RowsetTypePB::BETA_ROWSET)) {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v2;
} else {
_refresh_current_row = &Level0Iterator::_refresh_current_row_v1;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/alpha_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class AlphaRowsetReader : public RowsetReader {

int64_t filtered_rows() override;

RowsetReaderType type() const override { return RowsetReaderType::ALPHA; }
RowsetTypePB type() const override { return RowsetTypePB::ALPHA_ROWSET; }

private:
OLAPStatus _init_merge_ctxs(RowsetReaderContext* read_context);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/alpha_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class AlphaRowsetWriter : public RowsetWriter {

RowsetId rowset_id() override { return _rowset_writer_context.rowset_id; }

RowsetTypePB type() const override { return RowsetTypePB::ALPHA_ROWSET; }

private:
OLAPStatus _init();

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BetaRowsetReader : public RowsetReader {
return _stats->rows_del_filtered + _stats->rows_conditions_filtered;
}

RowsetReaderType type() const override { return RowsetReaderType::BETA; }
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

private:
RowsetReaderContext* _context;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class BetaRowsetWriter : public RowsetWriter {

RowsetId rowset_id() override { return _context.rowset_id; }

RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

private:
template <typename RowType>
OLAPStatus _add_row(const RowType& row);
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <unordered_map>

#include "gen_cpp/olap_file.pb.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader_context.h"
#include "vec/core/block.h"
Expand All @@ -37,8 +38,6 @@ using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;

class RowsetReader {
public:
enum RowsetReaderType { ALPHA, BETA };

virtual ~RowsetReader() {}

// reader init
Expand All @@ -63,7 +62,7 @@ class RowsetReader {

virtual int64_t filtered_rows() = 0;

virtual RowsetReaderType type() const = 0;
virtual RowsetTypePB type() const = 0;
};

} // namespace doris
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H

#include "gen_cpp/olap_file.pb.h"
#include "gen_cpp/types.pb.h"
#include "gutil/macros.h"
#include "olap/column_mapping.h"
Expand Down Expand Up @@ -67,6 +68,8 @@ class RowsetWriter {

virtual RowsetId rowset_id() = 0;

virtual RowsetTypePB type() const = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
31 changes: 21 additions & 10 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,16 +933,27 @@ bool RowBlockMerger::_pop_heap() {
OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
OLAPStatus status = new_rowset_writer->add_rowset_for_linked_schema_change(
rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
<< ", base_tablet=" << base_tablet->full_name()
<< ", version=" << new_rowset_writer->version().first << "-"
<< new_rowset_writer->version().second;
}
return status;

// In some cases, there may be more than one type of rowset in a tablet,
// in which case the conversion cannot be done directly by linked schema change,
// but requires direct schema change to rewrite the data.
if (rowset_reader->type() != new_rowset_writer->type()) {
LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id() << " in base tablet " << base_tablet->tablet_id()
<< " is not same as type " << new_rowset_writer->type() << ", use direct schema change.";
SchemaChangeDirectly scd(_row_block_changer, _mem_tracker);
return scd.process(rowset_reader, new_rowset_writer, new_tablet, base_tablet);
} else {
OLAPStatus status = new_rowset_writer->add_rowset_for_linked_schema_change(
rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
<< ", base_tablet=" << base_tablet->full_name()
<< ", version=" << new_rowset_writer->version().first << "-"
<< new_rowset_writer->version().second;
}
return status;
}
}

SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/olap/vcollect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ OLAPStatus VCollectIterator::next(Block* block) {

VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader)
: LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) {
DCHECK_EQ(RowsetReader::BETA, rs_reader->type());
DCHECK_EQ(RowsetTypePB::BETA_ROWSET, rs_reader->type());
_block = _schema.create_block(_reader->_return_columns);
_ref.block = &_block;
_ref.row_pos = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/test/util/threadpool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
class ThreadPoolTestTokenTypes : public ThreadPoolTest,
public testing::WithParamInterface<ThreadPool::ExecutionMode> {};

INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
INSTANTIATE_TEST_SUITE_P(Tokens, ThreadPoolTestTokenTypes,
::testing::Values(ThreadPool::ExecutionMode::SERIAL,
ThreadPool::ExecutionMode::CONCURRENT));

Expand Down
2 changes: 1 addition & 1 deletion thirdparty/vars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ LEVELDB_MD5SUM="298b5bddf12c675d6345784261302252"

# brpc
BRPC_DOWNLOAD="https://github.com/apache/incubator-brpc/archive/refs/tags/1.0.0.tar.gz"
BRPC_NAME="1.0.0.tar.gz"
BRPC_NAME="incubator-brpc-1.0.0.tar.gz"
BRPC_SOURCE="incubator-brpc-1.0.0"
BRPC_MD5SUM="73b201192a10107628e3af5ccd643676"

Expand Down

0 comments on commit 0efef1b

Please sign in to comment.