Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng committed Jun 27, 2024
1 parent 520d6b2 commit 8b6ed86
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 95 deletions.
4 changes: 2 additions & 2 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_rowset("ms", "get_rowset");
BvarLatencyRecorderWithTag g_bvar_ms_drop_index("ms", "drop_index");
BvarLatencyRecorderWithTag g_bvar_ms_prepare_index("ms", "prepare_index");
BvarLatencyRecorderWithTag g_bvar_ms_commit_index("ms", "commit_index");
BvarLatencyRecorderWithTag g_bvar_ms_check_index("ms", "check_index");
BvarLatencyRecorderWithTag g_bvar_ms_prepare_partition("ms", "prepare_partition");
BvarLatencyRecorderWithTag g_bvar_ms_commit_partition("ms", "commit_partition");
BvarLatencyRecorderWithTag g_bvar_ms_drop_partition("ms", "drop_partition");
BvarLatencyRecorderWithTag g_bvar_ms_check_partition("ms", "check_partition");
BvarLatencyRecorderWithTag g_bvar_ms_get_tablet_stats("ms", "get_tablet_stats");
BvarLatencyRecorderWithTag g_bvar_ms_get_obj_store_info("ms", "get_obj_store_info");
BvarLatencyRecorderWithTag g_bvar_ms_alter_obj_store_info("ms", "alter_obj_store_info");
Expand Down Expand Up @@ -81,6 +79,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"
BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", "get_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", "set_cluster_status");

BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");

// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
Expand Down
3 changes: 1 addition & 2 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,9 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_rowset;
extern BvarLatencyRecorderWithTag g_bvar_ms_drop_index;
extern BvarLatencyRecorderWithTag g_bvar_ms_prepare_index;
extern BvarLatencyRecorderWithTag g_bvar_ms_commit_index;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_index;
extern BvarLatencyRecorderWithTag g_bvar_ms_prepare_partition;
extern BvarLatencyRecorderWithTag g_bvar_ms_commit_partition;
extern BvarLatencyRecorderWithTag g_bvar_ms_drop_partition;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_partition;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_tablet_stats;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_obj_store_info;
extern BvarLatencyRecorderWithTag g_bvar_ms_alter_obj_store_info;
Expand Down Expand Up @@ -176,6 +174,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
27 changes: 12 additions & 15 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ class MetaServiceImpl : public cloud::MetaService {
void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
IndexResponse* response, ::google::protobuf::Closure* done) override;

// RPC handler: for every id in request->index_ids(), looks up the recycle-index key of the
// caller's instance and fails the request if any such key is still present. Requires a
// non-empty index_ids list and a table_id; the outcome is reported through the response status.
void check_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
IndexResponse* response, ::google::protobuf::Closure* done) override;
// RPC handler: validates the instance id, `op` and `check_keys` of the request, then, depending
// on `op`, checks the index ids or the partition ids in `check_keys` for leftover recycle keys.
void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
CheckKVResponse* response, ::google::protobuf::Closure* done) override;

void prepare_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
Expand All @@ -160,10 +160,6 @@ class MetaServiceImpl : public cloud::MetaService {
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) override;

// RPC handler: for every id in request->partition_ids(), looks up the recycle-partition key of
// the caller's instance and fails the request if any such key is still present. Requires a
// non-empty partition_ids list and a table_id.
void check_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) override;

void get_tablet_stats(::google::protobuf::RpcController* controller,
const GetTabletStatsRequest* request, GetTabletStatsResponse* response,
::google::protobuf::Closure* done) override;
Expand Down Expand Up @@ -287,6 +283,13 @@ class MetaServiceImpl : public cloud::MetaService {
const AlterInstanceRequest* request,
std::function<std::pair<MetaServiceCode, std::string>(InstanceInfoPB*)> action);

// Callback consumed by check_create_table. Given the request, it returns a tuple of:
//   1. the ids to check,
//   2. a short label for the kind of id (used in the success log line),
//   3. a function that builds the key to look up from (instance_id, id).
using check_create_table_type = std::function<const std::tuple<
const ::google::protobuf::RepeatedField<int64_t>, std::string,
std::function<std::string(std::string, int64_t)>>(const CheckKVRequest* request)>;
// Shared implementation behind check_kv: obtains the ids and key builder from `get_check_info`,
// looks up the key of every id in a single transaction and reports the first problem through
// `code` / `msg`. `code` and `msg` are left untouched when every key is absent.
void check_create_table(std::string instance_id, const CheckKVRequest* request,
CheckKVResponse* response, MetaServiceCode* code, std::string* msg,
check_create_table_type get_check_info);

std::shared_ptr<TxnKv> txn_kv_;
std::shared_ptr<ResourceManager> resource_mgr_;
std::shared_ptr<RateLimiter> rate_limiter_;
Expand Down Expand Up @@ -433,9 +436,9 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::drop_index, controller, request, response, done);
}

void check_index(::google::protobuf::RpcController* controller, const IndexRequest* request,
IndexResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::check_index, controller, request, response, done);
// Proxy entry point for check_kv: hands the call to call_impl together with the
// cloud::MetaService::check_kv member pointer and the unmodified RPC arguments.
void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request,
CheckKVResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::check_kv, controller, request, response, done);
}

void prepare_partition(::google::protobuf::RpcController* controller,
Expand All @@ -456,12 +459,6 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::drop_partition, controller, request, response, done);
}

// Proxy entry point for check_partition: hands the call to call_impl together with the
// cloud::MetaService::check_partition member pointer and the unmodified RPC arguments.
void check_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::check_partition, controller, request, response, done);
}

void get_tablet_stats(::google::protobuf::RpcController* controller,
const GetTabletStatsRequest* request, GetTabletStatsResponse* response,
::google::protobuf::Closure* done) override {
Expand Down
140 changes: 66 additions & 74 deletions cloud/src/meta-service/meta_service_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,53 +327,6 @@ void MetaServiceImpl::drop_index(::google::protobuf::RpcController* controller,
}
}

// Verifies that no recycle-index key remains for any index id in the request.
//
// The request must carry a resolvable cloud_unique_id, a non-empty index_ids list and a
// table_id; otherwise INVALID_ARGUMENT is reported. For each index id the recycle-index key of
// the instance is looked up:
//   - key not found -> this id is fine, continue with the next one;
//   - key found     -> INVALID_ARGUMENT ("prepare and commit rpc not match, recycle key remained");
//   - any other err -> a read-category error derived from the KV error code.
//
// NOTE(review): RPC_PREPROCESS / RPC_RATE_LIMIT are macros defined elsewhere. This function
// assumes they declare `code`, `msg` and `instance_id` and publish them to `response` when the
// function returns -- confirm against the macro definitions.
void MetaServiceImpl::check_index(::google::protobuf::RpcController* controller,
                                  const IndexRequest* request, IndexResponse* response,
                                  ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(check_index);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        return;
    }
    RPC_RATE_LIMIT(check_index);

    if (request->index_ids().empty() || !request->has_table_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty index_ids or table_id";
        return;
    }

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        return;
    }

    for (auto index_id : request->index_ids()) {
        auto key = recycle_index_key({instance_id, index_id});
        err = check_recycle_key_exist(txn.get(), key);
        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            continue;
        }
        if (err == TxnErrorCode::TXN_OK) {
            // The recycle key is still there: prepare and commit did not pair up.
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "prepare and commit rpc not match, recycle key remained";
            return;
        }
        // The lookup itself failed. Report it as a read error rather than blaming the caller's
        // arguments, so clients can tell a storage problem from a bad request.
        code = cast_as<ErrCategory::READ>(err);
        msg = "ms read key error";
        return;
    }
    LOG_INFO("check index success request={}", request->ShortDebugString());
}

// Return TXN_OK if exists, TXN_KEY_NOT_FOUND if not exists, otherwise error
static TxnErrorCode partition_exists(Transaction* txn, const std::string& instance_id,
const PartitionRequest* req) {
Expand Down Expand Up @@ -666,51 +619,90 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
}
}

void MetaServiceImpl::check_partition(::google::protobuf::RpcController* controller,
const PartitionRequest* request, PartitionResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(check_partition);
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
RPC_RATE_LIMIT(check_partition)

if (request->partition_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty partition_ids or table_id";
return;
}

void MetaServiceImpl::check_create_table(std::string instance_id, const CheckKVRequest* request,
CheckKVResponse* response, MetaServiceCode* code,
std::string* msg, check_create_table_type get_check_info) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = "failed to create txn";
*code = cast_as<ErrCategory::READ>(err);
*msg = "failed to create txn";
return;
}
auto& [keys, hint, key_func] = get_check_info(request);

for (auto part_id : request->partition_ids()) {
auto key = recycle_partition_key({instance_id, part_id});
if (keys.empty()) {
*code = MetaServiceCode::INVALID_ARGUMENT;
*msg = "empty partition_ids";
return;
}

for (auto id : keys) {
auto key = key_func(instance_id, id);
err = check_recycle_key_exist(txn.get(), key);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
continue;
} else if (err == TxnErrorCode::TXN_OK) {
// find not match, prepare commit
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "prepare and commit rpc not match, recycle key remained";
*code = MetaServiceCode::INVALID_ARGUMENT;
*msg = "prepare and commit rpc not match, recycle key remained";
return;
} else {
// err != TXN_OK, fdb read err
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "ms read key error";
*code = MetaServiceCode::INVALID_ARGUMENT;
*msg = "ms read key error";
return;
}
}
LOG_INFO("check partition success request={}", request->ShortDebugString());
LOG_INFO("check {} success request={}", hint, request->ShortDebugString());
return;
}

// Generic "check KV" RPC.
//
// Validates that the request resolves to an instance id and carries both `op` and `check_keys`,
// then dispatches on `op`:
//   - CREATE_INDEX_AFTER_FE_COMMIT     -> check check_keys.index_ids against recycle-index keys;
//   - CREATE_PARTITION_AFTER_FE_COMMIT -> check check_keys.partition_ids against
//                                         recycle-partition keys.
// Any other op is rejected with INVALID_ARGUMENT, so an operation added to the proto but not
// handled here can never be reported as a successful check.
//
// NOTE(review): relies on RPC_PREPROCESS / RPC_RATE_LIMIT (defined elsewhere) to declare
// `code`, `msg` and `instance_id` -- confirm against the macro definitions.
void MetaServiceImpl::check_kv(::google::protobuf::RpcController* controller,
                               const CheckKVRequest* request, CheckKVResponse* response,
                               ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(check_kv);
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        return;
    }
    if (!request->has_op()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "op not given";
        return;
    }
    if (!request->has_check_keys()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty check keys";
        return;
    }
    RPC_RATE_LIMIT(check_kv);

    switch (request->op()) {
    case CheckKVRequest::CREATE_INDEX_AFTER_FE_COMMIT: {
        check_create_table(
                instance_id, request, response, &code, &msg, [](const CheckKVRequest* req) {
                    return std::make_tuple(req->check_keys().index_ids(), "index",
                                           [](std::string inst_id, int64_t id) {
                                               return recycle_index_key({inst_id, id});
                                           });
                });
        break;
    }
    case CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT: {
        check_create_table(
                instance_id, request, response, &code, &msg, [](const CheckKVRequest* req) {
                    return std::make_tuple(req->check_keys().partition_ids(), "partition",
                                           [](std::string inst_id, int64_t id) {
                                               return recycle_partition_key({inst_id, id});
                                           });
                });
        break;
    }
    default:
        // A debug-only assertion would be compiled out in release builds and let the request
        // succeed without checking anything; fail it explicitly instead.
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "unsupported op " + std::to_string(static_cast<int>(request->op()));
        break;
    }
}

} // namespace doris::cloud
27 changes: 25 additions & 2 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,28 @@ message GetRLTaskCommitAttachResponse {
optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
}

// Groups of ids carried by a CheckKV request (keys to check) or response (keys found bad).
// NOTE(review): the check_kv handler added in this commit reads only index_ids and
// partition_ids; db_id and table_ids are not consulted there -- confirm intended use.
message CheckKeyInfos {
repeated int64 db_id = 1;
repeated int64 table_ids = 2;
repeated int64 index_ids = 3;
repeated int64 partition_ids = 4;
}

// Request of the check_kv RPC. Both `check_keys` and `op` must be set; the meta-service
// rejects the request with INVALID_ARGUMENT otherwise.
message CheckKVRequest {
// Selects which ids of `check_keys` are verified.
enum Operation {
// Verify check_keys.index_ids: no recycle-index key may remain for any of them.
CREATE_INDEX_AFTER_FE_COMMIT = 1;
// Verify check_keys.partition_ids: no recycle-partition key may remain for any of them.
CREATE_PARTITION_AFTER_FE_COMMIT = 2;
}
optional string cloud_unique_id = 1; // For auth
optional CheckKeyInfos check_keys = 2;
optional Operation op = 3;
}

// Response of the check_kv RPC.
// NOTE(review): the handler added in this commit reports failures only through `status`;
// `bad_keys` is not populated by the code shown in this change -- confirm before relying on it.
message CheckKVResponse {
optional MetaServiceResponseStatus status = 1;
optional CheckKeyInfos bad_keys = 2;
}

service MetaService {
rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse);
rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse);
Expand All @@ -1440,11 +1462,9 @@ service MetaService {
rpc prepare_index(IndexRequest) returns (IndexResponse);
rpc commit_index(IndexRequest) returns (IndexResponse);
rpc drop_index(IndexRequest) returns (IndexResponse);
rpc check_index(IndexRequest) returns (IndexResponse);
rpc prepare_partition(PartitionRequest) returns (PartitionResponse);
rpc commit_partition(PartitionRequest) returns (PartitionResponse);
rpc drop_partition(PartitionRequest) returns (PartitionResponse);
rpc check_partition(PartitionRequest) returns (PartitionResponse);

rpc start_tablet_job(StartTabletJobRequest) returns (StartTabletJobResponse);
rpc finish_tablet_job(FinishTabletJobRequest) returns (FinishTabletJobResponse);
Expand Down Expand Up @@ -1484,6 +1504,9 @@ service MetaService {

// routine load progress
rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse);

// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
};

service RecyclerService {
Expand Down

0 comments on commit 8b6ed86

Please sign in to comment.