Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](cloud) Accelerate creating table by batching RPC In cloud #36786

Merged
merged 7 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");

BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");

// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
8 changes: 8 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ class MetaServiceImpl : public cloud::MetaService {
void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
IndexResponse* response, ::google::protobuf::Closure* done) override;

void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
CheckKVResponse* response, ::google::protobuf::Closure* done) override;

void prepare_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) override;
Expand Down Expand Up @@ -426,6 +429,11 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::drop_index, controller, request, response, done);
}

void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
CheckKVResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::check_kv, controller, request, response, done);
}

void prepare_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
98 changes: 97 additions & 1 deletion cloud/src/meta-service/meta_service_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "meta_service.h"

namespace doris::cloud {

using check_create_table_type = std::function<const std::tuple<
const ::google::protobuf::RepeatedField<int64_t>, std::string,
std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
// ATTN: xxx_id MUST NOT be reused
//
// UNKNOWN
Expand Down Expand Up @@ -67,6 +69,11 @@ static TxnErrorCode index_exists(Transaction* txn, const std::string& instance_i
return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
}

static TxnErrorCode check_recycle_key_exist(Transaction* txn, const std::string& key) {
std::string val;
return txn->get(key, &val);
}

void MetaServiceImpl::prepare_index(::google::protobuf::RpcController* controller,
const IndexRequest* request, IndexResponse* response,
::google::protobuf::Closure* done) {
Expand Down Expand Up @@ -614,4 +621,93 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
}
}

void check_create_table(std::string instance_id, std::shared_ptr<TxnKv> txn_kv,
const CheckKVRequest* request, CheckKVResponse* response,
MetaServiceCode* code, std::string* msg,
deardeng marked this conversation as resolved.
Show resolved Hide resolved
check_create_table_type get_check_info) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
*code = cast_as<ErrCategory::READ>(err);
*msg = "failed to create txn";
return;
}
auto& [keys, hint, key_func] = get_check_info(request);
if (keys.empty()) {
*code = MetaServiceCode::INVALID_ARGUMENT;
*msg = "empty keys";
return;
}

for (auto id : keys) {
auto key = key_func(instance_id, id);
err = check_recycle_key_exist(txn.get(), key);
deardeng marked this conversation as resolved.
Show resolved Hide resolved
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
continue;
} else if (err == TxnErrorCode::TXN_OK) {
// find not match, prepare commit
*code = MetaServiceCode::ALREADY_EXISTED;
deardeng marked this conversation as resolved.
Show resolved Hide resolved
*msg = "prepare and commit rpc not match, recycle key remained";
return;
} else {
// err != TXN_OK, fdb read err
*code = cast_as<ErrCategory::READ>(err);
*msg = "ms read key error";
deardeng marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
LOG_INFO("check {} success request={}", hint, request->ShortDebugString());
deardeng marked this conversation as resolved.
Show resolved Hide resolved
return;
deardeng marked this conversation as resolved.
Show resolved Hide resolved
}
deardeng marked this conversation as resolved.
Show resolved Hide resolved

void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
const CheckKVRequest* request, CheckKVResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(check_kv);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
if (!request->has_op()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "op not given";
return;
}
if (!request->has_check_keys()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty check keys";
return;
}
RPC_RATE_LIMIT(check_kv);
switch (request->op()) {
case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
check_create_table(instance_id, txn_kv_, request, response, &code, &msg,
[](const CheckKVRequest* request) {
return std::make_tuple(
request->check_keys().index_ids(), "index",
[](std::string instance_id, int64_t id) {
return recycle_index_key({std::move(instance_id), id});
});
});
break;
}
case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
check_create_table(
instance_id, txn_kv_, request, response, &code, &msg,
[](const CheckKVRequest* request) {
return std::make_tuple(
request->check_keys().partition_ids(), "partition",
[](std::string instance_id, int64_t id) {
return recycle_partition_key({std::move(instance_id), id});
});
});
break;
}
default:
DCHECK(false);
deardeng marked this conversation as resolved.
Show resolved Hide resolved
};
}

} // namespace doris::cloud
64 changes: 64 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5241,6 +5241,70 @@ TEST(MetaServiceTest, PartitionRequest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
req.add_index_ids(index_id);
// ------------Test check partition-----------
// Normal
req.set_db_id(1);
req.set_table_id(table_id + 1);
req.add_index_ids(index_id + 1);
req.add_partition_ids(partition_id + 1);
meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
meta_service->commit_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
CheckKVRequest req_check;
CheckKVResponse res_check;
meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
ASSERT_EQ(res_check.status().code(), MetaServiceCode::INVALID_ARGUMENT);
res_check.Clear();
req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
CheckKeyInfos check_keys_pb;
check_keys_pb.add_table_ids(table_id + 1);
check_keys_pb.add_index_ids(index_id + 1);
check_keys_pb.add_partition_ids(partition_id + 1);
req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
res_check.Clear();
// AbNomal not commit
req.Clear();
req.set_db_id(1);
req.set_table_id(table_id + 2);
req.add_index_ids(index_id + 2);
req.add_partition_ids(partition_id + 2);
meta_service->prepare_partition(&ctrl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
req_check.Clear();
req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT);
check_keys_pb.Clear();
check_keys_pb.add_table_ids(table_id + 2);
check_keys_pb.add_index_ids(index_id + 2);
check_keys_pb.add_partition_ids(partition_id + 2);
req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
ASSERT_EQ(res_check.status().code(), MetaServiceCode::ALREADY_EXISTED);

// ------------Test check index-----------
// Normal
IndexRequest req_index;
IndexResponse res_index;
req_index.set_db_id(1);
req_index.set_table_id(table_id + 3);
req_index.add_index_ids(index_id + 3);
meta_service->prepare_index(&ctrl, &req_index, &res_index, nullptr);
ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
meta_service->commit_index(&ctrl, &req_index, &res_index, nullptr);
ASSERT_EQ(res_index.status().code(), MetaServiceCode::OK);
req_check.Clear();
res_check.Clear();
req_check.set_op(CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT);
check_keys_pb.Clear();
check_keys_pb.add_table_ids(table_id + 3);
check_keys_pb.add_index_ids(index_id + 3);
req_check.mutable_check_keys()->CopyFrom(check_keys_pb);
meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr);
ASSERT_EQ(res_check.status().code(), MetaServiceCode::OK);
res_check.Clear();

// ------------Test drop partition------------
reset_meta_service();
req.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2891,6 +2891,10 @@ public static int metaServiceRpcRetryTimes() {
"streamload route policy in cloud mode, availale options are public-private and empty string"})
public static String streamload_redirect_policy = "";

@ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
"create table in cloud mode, check recycler key remained, default true"})
public static boolean check_create_table_recycle_key_remained = true;

//==========================================================================
// end of cloud config
//==========================================================================
Expand Down
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause);
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
} else if (alterClause instanceof AddPartitionLikeClause) {
if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
Expand Down
19 changes: 17 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -3240,8 +3240,23 @@ public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlExceptio
getInternalCatalog().createTableAsSelect(stmt);
}

public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
getInternalCatalog().addPartition(db, tableName, addPartitionClause);
/**
* Adds a partition to a table
*
* @param db
* @param tableName
* @param addPartitionClause clause in the CreateTableStmt
* @param isCreateTable this call is for creating table
* @param generatedPartitionId the preset partition id for the partition to add
* @param writeEditLog whether to write an edit log for this addition
* @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added.
* @throws DdlException
*/
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
boolean isCreateTable, long generatedPartitionId,
boolean writeEditLog) throws DdlException {
return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
isCreateTable, generatedPartitionId, writeEditLog);
}

public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
Expand Down
Loading
Loading