Skip to content

Commit

Permalink
[Feature](schema change) fix write and add regression test (apache#69)
Browse files Browse the repository at this point in the history
Co-authored-by: yixiutt <yixiu@selectdb.com>
  • Loading branch information
2 people authored and Lchangliang committed Jun 27, 2022
1 parent 3fe2a6b commit 17a9730
Show file tree
Hide file tree
Showing 13 changed files with 671 additions and 18 deletions.
2 changes: 0 additions & 2 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ void NodeChannel::open() {
request.set_index_id(_index_channel->_index_id);
request.set_txn_id(_parent->_txn_id);
request.set_allocated_schema(_parent->_schema->to_protobuf());
LOG(INFO) << "yixiu proto: " << _parent->_schema->to_protobuf()->DebugString();
for (auto& tablet : _all_tablets) {
auto ptablet = request.add_tablets();
ptablet->set_partition_id(tablet.partition_id);
Expand Down Expand Up @@ -683,7 +682,6 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
_tuple_desc_id = table_sink.tuple_id;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
LOG(INFO) << "yixiu thrift:" << apache::thrift::ThriftDebugString(table_sink.schema);
_partition = _pool->add(new OlapTablePartitionParam(_schema, table_sink.partition));
RETURN_IF_ERROR(_partition->init());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,8 @@ int64_t DeltaWriter::partition_id() const {
void DeltaWriter::_build_current_tablet_schema(const POlapTableSchemaParam& ptable_schema_param,
const TabletSchema& ori_tablet_schema) {
*_tablet_schema = ori_tablet_schema;
LOG(INFO) << "build current tablet schema";
LOG(INFO) << "yixiu: " << ptable_schema_param.DebugString();
//new tablet schema if new table
if (ptable_schema_param.columns_size() != 0 && ptable_schema_param.columns(0).unique_id() > 0) {
LOG(INFO) << "yixiu new table";
if (ptable_schema_param.columns_size() != 0 && ptable_schema_param.columns(0).unique_id() >= 0) {
_tablet_schema->build_current_tablet_schema(ptable_schema_param, ori_tablet_schema);
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, co
reader_params.version = dst_rowset_writer->version();
reader_params.tablet_schema = cur_tablet_schema;

const auto& schema = tablet->tablet_schema();
const auto& schema = *cur_tablet_schema;
reader_params.return_columns.resize(schema.num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class TabletReader {
std::set<uint32_t>* load_bf_columns);

TabletSharedPtr tablet() { return _tablet; }
const TabletSchema& tablet_schema() {return *_tablet_schema;}

std::unique_ptr<MemPool> _predicate_mem_pool;
std::set<uint32_t> _load_bf_columns;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ namespace doris {

Rowset::Rowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta)
: //_schema(schema),
_rowset_path_desc(rowset_path_desc),
: _rowset_path_desc(rowset_path_desc),
_rowset_meta(std::move(rowset_meta)),
_refs_by_reader(0),
_rowset_state_machine(RowsetStateMachine()) {
Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ class RowsetMeta {
void set_tablet_schema(const TabletSchema* tablet_schema) {
TabletSchemaPB* ts_pb = _rowset_meta_pb.mutable_tablet_schema();
tablet_schema->to_schema_pb(ts_pb);
LOG(INFO) << "yixiu rowset: " << ts_pb->DebugString();
CHECK(_schema == nullptr);
_schema = std::make_shared<TabletSchema>(*tablet_schema);
}
Expand All @@ -292,9 +291,7 @@ class RowsetMeta {
private:
friend class AlphaRowsetMeta;
bool _deserialize_from_pb(const std::string& value) {
auto ret = _rowset_meta_pb.ParseFromString(value);
LOG(INFO) << "yixiu rowset: " << _rowset_meta_pb.DebugString();
return ret;
return _rowset_meta_pb.ParseFromString(value);
}

bool _serialize_to_pb(std::string* value) {
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
}
LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << request.tablets().size()
<< ", timeout(s): " << request.load_channel_timeout_s();
LOG(INFO) << "yixiu schema: " << request.schema().DebugString();
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = new OlapTableSchemaParam();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_string_tag.resize(_stored_data_columns.size());

auto& tablet_schema = tablet()->tablet_schema();
auto& tablet_schema = *_tablet_schema;
for (auto idx : _agg_columns_idx) {
AggregateFunctionPtr function =
tablet_schema
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/olap/vcollect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class VCollectIterator {
// then merged with other rowset readers.
class LevelIterator {
public:
LevelIterator(TabletReader* reader) : _schema(reader->tablet()->tablet_schema()) {};
LevelIterator(TabletReader* reader) : _schema(reader->tablet_schema()) {};

virtual Status init() = 0;

Expand Down
4 changes: 2 additions & 2 deletions regression-test/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
// **Note**: default db will be create if not exist
defaultDb = "regression_test"

jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
jdbcUrl = "jdbc:mysql://192.168.0.21:8032/?"
jdbcUser = "root"
jdbcPassword = ""

feHttpAddress = "127.0.0.1:8030"
feHttpAddress = "192.168.0.21:8030"
feHttpUser = "root"
feHttpPassword = ""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test for schema change on an AGGREGATE KEY table:
// verifies that add-column / drop-column keep both old and new rows readable,
// that aggregation (SUM on `cost`) stays correct across the schema versions,
// and that cumulative compaction merges rowsets written with different schemas.
suite ("test_agg_keys_schema_change") {
    def tableName = "schema_change_agg_keys_regression_test"
    // NOTE(review): the BE HTTP endpoint is hard-coded (it also appears in
    // regression-conf.groovy) — it should come from the regression config.
    // TODO confirm against the framework before merging.
    def backendHttpAddress = "192.168.0.21:8041"

    try {
        sql """ DROP TABLE IF EXISTS ${tableName} """

        // Aggregate-key table exercising REPLACE, SUM, MAX, MIN, HLL_UNION
        // and BITMAP_UNION value columns.
        sql """
            CREATE TABLE ${tableName} (
                `user_id` LARGEINT NOT NULL COMMENT "用户id",
                `date` DATE NOT NULL COMMENT "数据灌入日期时间",
                `city` VARCHAR(20) COMMENT "用户所在城市",
                `age` SMALLINT COMMENT "用户年龄",
                `sex` TINYINT COMMENT "用户性别",
                `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
                `last_update_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次更新时间",
                `last_visit_date_not_null` DATETIME REPLACE NOT NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
                `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
                `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
                `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间",
                `hll_col` HLL HLL_UNION NOT NULL COMMENT "HLL列",
                `bitmap_col` Bitmap BITMAP_UNION NOT NULL COMMENT "bitmap列")
            AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
            PROPERTIES ( "replication_num" = "1" );
        """

        // Two rows per user key — SUM(cost) should aggregate to 2 per user.
        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', 'Beijing', 10, 1, '2020-01-01', '2020-01-01', '2020-01-01', 1, 30, 20, hll_hash(1), to_bitmap(1))
        """
        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2020-01-02', 1, 31, 19, hll_hash(2), to_bitmap(2))
        """
        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2020-01-02', 1, 31, 21, hll_hash(2), to_bitmap(2))
        """
        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(3), to_bitmap(3))
        """

        def result1 = sql """
            select * from ${tableName}
        """
        assertTrue(result1.size() == 2)
        assertTrue(result1[0].size() == 13)
        assertTrue(result1[0][8] == 2, "user id 1 cost should be 2")
        assertTrue(result1[1][8] == 2, "user id 2 cost should be 2")

        // --- add column ---
        sql """
            ALTER table ${tableName} ADD COLUMN new_column INT MAX default "1"
        """

        def result2 = sql """ SELECT * FROM ${tableName} WHERE user_id=2 """
        // BUGFIX: originally asserted on result1 (the pre-ALTER, user_id=1 row)
        // instead of the result2 query issued on the line above.
        assertTrue(result2[0][8] == 2, "user id 2 cost should be 2")

        // Full-width insert including the new column.
        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(4), to_bitmap(4), 2)
        """
        result2 = sql """ SELECT * FROM ${tableName} WHERE user_id=2 """
        assertTrue(result2[0][8] == 3, "user id 2 cost should be 3")

        // Insert naming only the pre-ALTER columns: new_column must take its default.
        sql """ INSERT INTO ${tableName} (`user_id`,`date`,`city`,`age`,`sex`,`last_visit_date`,`last_update_date`,
                `last_visit_date_not_null`,`cost`,`max_dwell_time`,`min_dwell_time`, `hll_col`, `bitmap_col`)
            VALUES
            (3, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(4), to_bitmap(4))
        """

        result2 = sql """ SELECT * FROM ${tableName} WHERE user_id=3 """
        assertTrue(result2.size() == 1)
        assertTrue(result2[0].size() == 14)
        assertTrue(result2[0][13] == 1, "new add column default value should be 1")

        // MAX aggregation on the new column: 2 > default 1.
        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(4), to_bitmap(4), 2)
        """
        def result3 = sql """ SELECT * FROM ${tableName} WHERE user_id = 3 """
        assertTrue(result3.size() == 1)
        assertTrue(result3[0].size() == 14)
        assertTrue(result3[0][13] == 2, "new add column value is set to 2")

        def result4 = sql """ select count(*) from ${tableName} """
        logger.info("result4.size:" + result4.size() + " result4[0].size:" + result4[0].size + " " + result4[0][0])
        assertTrue(result4.size() == 1)
        assertTrue(result4[0].size() == 1)
        assertTrue(result4[0][0] == 3, "total count is 3")

        // --- drop column ---
        sql """
            ALTER TABLE ${tableName} DROP COLUMN last_visit_date
        """
        def result5 = sql """ select * from ${tableName} where user_id = 3 """
        assertTrue(result5.size() == 1)
        assertTrue(result5[0].size() == 13)

        sql """ INSERT INTO ${tableName} VALUES
            (4, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(4), to_bitmap(4), 2)
        """
        def result6 = sql """ select * from ${tableName} where user_id = 4 """
        assertTrue(result6.size() == 1)
        assertTrue(result6[0].size() == 13)

        // Six identical loads for one key to pile up rowsets for compaction.
        for (int i = 0; i < 6; i++) {
            sql """ INSERT INTO ${tableName} VALUES
                (5, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', 1, 32, 20, hll_hash(5), to_bitmap(5), 2)
            """
        }

        // NOTE(review): fixed sleep waiting for the schema-change job — flaky by
        // construction; polling the job state would be more robust. TODO confirm.
        Thread.sleep(30 * 1000)

        // --- trigger cumulative compaction on every tablet ---
        String[][] tablets = sql """ show tablets from ${tableName}; """
        for (String[] tablet in tablets) {
            String tablet_id = tablet[0]
            logger.info("run compaction:" + tablet_id)
            StringBuilder sb = new StringBuilder();
            sb.append("curl -X POST http://")
            sb.append(backendHttpAddress)
            sb.append("/api/compaction/run?tablet_id=")
            sb.append(tablet_id)
            sb.append("&compact_type=cumulative")

            String command = sb.toString()
            def process = command.execute()
            def code = process.waitFor()
            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
            def out = process.getText()
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            // The trigger may legitimately report "already running", so the
            // exit code is logged but deliberately not asserted here.
            //assertEquals(code, 0)
        }

        // --- wait until every compaction reports finished ---
        for (String[] tablet in tablets) {
            boolean running = true
            do {
                Thread.sleep(1000)
                String tablet_id = tablet[0]
                StringBuilder sb = new StringBuilder();
                sb.append("curl -X GET http://")
                sb.append(backendHttpAddress)
                sb.append("/api/compaction/run_status?tablet_id=")
                sb.append(tablet_id)

                String command = sb.toString()
                def process = command.execute()
                def code = process.waitFor()
                def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
                def out = process.getText()
                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(code, 0)
                def compactionStatus = parseJson(out.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }

        // Data must survive compaction across schema versions.
        def result7 = sql """ select count(*) from ${tableName} """
        assertTrue(result7.size() == 1)
        assertTrue(result7[0][0] == 5)

        def result8 = sql """ SELECT * FROM ${tableName} WHERE user_id=2 """
        assertTrue(result8.size() == 1)
        assertTrue(result8[0].size() == 13)

        // Sum physical row counts over all remaining rowsets: compaction must
        // have collapsed the many small loads.
        int rowCount = 0
        for (String[] tablet in tablets) {
            String tablet_id = tablet[0]
            StringBuilder sb = new StringBuilder();
            sb.append("curl -X GET http://")
            sb.append(backendHttpAddress)
            sb.append("/api/compaction/show?tablet_id=")
            sb.append(tablet_id)
            String command = sb.toString()
            // wait for cleaning stale_rowsets
            def process = command.execute()
            def code = process.waitFor()
            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
            def out = process.getText()
            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            for (String rowset in (List<String>) tabletJson.rowsets) {
                // Rowset description format: "<version> <num_rows> ..." — field [1] is the row count.
                rowCount += Integer.parseInt(rowset.split(" ")[1])
            }
        }
        logger.info("size:" + rowCount)
        assertTrue(rowCount <= 8)
    } finally {
        //try_sql("DROP TABLE IF EXISTS ${tableName}")
    }

}
Loading

0 comments on commit 17a9730

Please sign in to comment.