Skip to content

Commit

Permalink
Merge pull request #5296 from lichuang/user_pb_api_impl
Browse files Browse the repository at this point in the history
Feature: user api pb convert impl
  • Loading branch information
BohuTANG authored May 11, 2022
2 parents 3d3c514 + 1fa5ebb commit e6a76e2
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ common-exception = { path = "../exception" }
common-functions = { path = "../functions" }
common-meta-api = { path = "../meta/api" }
common-meta-types = { path = "../meta/types" }
common-proto-conv = { path = "../proto-conv" }
common-protos = { path = "../protos" }

async-trait = "0.1.53"
serde_json = "1.0.79"
Expand Down
3 changes: 3 additions & 0 deletions common/management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod cluster;
mod role;
mod serde;
mod stage;
mod udf;
mod user;
Expand All @@ -22,6 +23,8 @@ pub use cluster::ClusterApi;
pub use cluster::ClusterMgr;
pub use role::RoleApi;
pub use role::RoleMgr;
pub use serde::deserialize_struct;
pub use serde::serialize_struct;
pub use stage::StageApi;
pub use stage::StageMgr;
pub use udf::UdfApi;
Expand Down
18 changes: 18 additions & 0 deletions common/management/src/serde/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod pb_serde;

pub use pb_serde::deserialize_struct;
pub use pb_serde::serialize_struct;
55 changes: 55 additions & 0 deletions common/management/src/serde/pb_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use common_proto_conv::FromToProto;

pub fn serialize_struct<PB: common_protos::prost::Message, ErrFn, CtxFn, D>(
value: &impl FromToProto<PB>,
err_code_fn: ErrFn,
context_fn: CtxFn,
) -> Result<Vec<u8>>
where
ErrFn: FnOnce(String) -> ErrorCode + std::marker::Copy,
D: Display,
CtxFn: FnOnce() -> D + std::marker::Copy,
{
let p = value.to_pb().map_err_to_code(err_code_fn, context_fn)?;
let mut buf = vec![];
common_protos::prost::Message::encode(&p, &mut buf).map_err_to_code(err_code_fn, context_fn)?;
Ok(buf)
}

pub fn deserialize_struct<PB, T, ErrFn, CtxFn, D>(
buf: &[u8],
err_code_fn: ErrFn,
context_fn: CtxFn,
) -> Result<T>
where
PB: common_protos::prost::Message + Default,
T: FromToProto<PB>,
ErrFn: FnOnce(String) -> ErrorCode + std::marker::Copy,
D: Display,
CtxFn: FnOnce() -> D + std::marker::Copy,
{
let p: PB =
common_protos::prost::Message::decode(buf).map_err_to_code(err_code_fn, context_fn)?;
let v: T = FromToProto::from_pb(p).map_err_to_code(err_code_fn, context_fn)?;

Ok(v)
}
17 changes: 13 additions & 4 deletions common/management/src/stage/stage_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use common_base::base::escape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_api::KVApi;
use common_meta_types::IntoSeqV;
use common_meta_types::MatchSeq;
use common_meta_types::MatchSeqExt;
use common_meta_types::OkOrExist;
Expand All @@ -27,6 +26,8 @@ use common_meta_types::SeqV;
use common_meta_types::UpsertKVAction;
use common_meta_types::UserStageInfo;

use crate::serde::deserialize_struct;
use crate::serde::serialize_struct;
use crate::stage::StageApi;

static USER_STAGE_API_KEY_PREFIX: &str = "__fd_stages";
Expand Down Expand Up @@ -55,7 +56,11 @@ impl StageMgr {
impl StageApi for StageMgr {
async fn add_stage(&self, info: UserStageInfo) -> Result<u64> {
let seq = MatchSeq::Exact(0);
let val = Operation::Update(serde_json::to_vec(&info)?);
let val = Operation::Update(serialize_struct(
&info,
ErrorCode::IllegalUserStageFormat,
|| "",
)?);
let key = format!(
"{}/{}",
self.stage_prefix,
Expand Down Expand Up @@ -85,7 +90,10 @@ impl StageApi for StageMgr {
res.ok_or_else(|| ErrorCode::UnknownStage(format!("Unknown stage {}", name)))?;

match MatchSeq::from(seq).match_seq(&seq_value) {
Ok(_) => Ok(seq_value.into_seqv()?),
Ok(_) => Ok(SeqV::new(
seq_value.seq,
deserialize_struct(&seq_value.data, ErrorCode::IllegalUserStageFormat, || "")?,
)),
Err(_) => Err(ErrorCode::UnknownStage(format!("Unknown stage {}", name))),
}
}
Expand All @@ -95,7 +103,8 @@ impl StageApi for StageMgr {

let mut stage_infos = Vec::with_capacity(values.len());
for (_, value) in values {
let stage_info = serde_json::from_slice::<UserStageInfo>(&value.data)?;
let stage_info =
deserialize_struct(&value.data, ErrorCode::IllegalUserStageFormat, || "")?;
stage_infos.push(stage_info);
}
Ok(stage_infos)
Expand Down
16 changes: 9 additions & 7 deletions common/management/src/user/user_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ use std::sync::Arc;
use common_base::base::escape_for_key;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::ToErrorCode;
use common_meta_api::KVApi;
use common_meta_types::AuthInfo;
use common_meta_types::GrantObject;
use common_meta_types::IntoSeqV;
use common_meta_types::MatchSeq;
use common_meta_types::MatchSeqExt;
use common_meta_types::OkOrExist;
Expand All @@ -33,6 +31,8 @@ use common_meta_types::UserInfo;
use common_meta_types::UserOption;
use common_meta_types::UserPrivilegeSet;

use crate::serde::deserialize_struct;
use crate::serde::serialize_struct;
use crate::user::user_api::UserApi;

static USER_API_KEY_PREFIX: &str = "__fd_users";
Expand Down Expand Up @@ -63,7 +63,7 @@ impl UserMgr {
) -> common_exception::Result<u64> {
let user_key = format_user_key(&user_info.name, &user_info.hostname);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serde_json::to_vec(&user_info)?;
let value = serialize_struct(user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

let match_seq = match seq {
None => MatchSeq::GE(1),
Expand Down Expand Up @@ -95,7 +95,7 @@ impl UserApi for UserMgr {
let match_seq = MatchSeq::Exact(0);
let user_key = format_user_key(&user_info.name, &user_info.hostname);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serde_json::to_vec(&user_info)?;
let value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

let kv_api = self.kv_api.clone();
let upsert_kv = kv_api.upsert_kv(UpsertKVAction::new(
Expand All @@ -122,7 +122,10 @@ impl UserApi for UserMgr {
res.ok_or_else(|| ErrorCode::UnknownUser(format!("unknown user {}", user_key)))?;

match MatchSeq::from(seq).match_seq(&seq_value) {
Ok(_) => Ok(seq_value.into_seqv()?),
Ok(_) => Ok(SeqV::new(
seq_value.seq,
deserialize_struct(&seq_value.data, ErrorCode::IllegalUserInfoFormat, || "")?,
)),
Err(_) => Err(ErrorCode::UnknownUser(format!("unknown user {}", user_key))),
}
}
Expand All @@ -133,8 +136,7 @@ impl UserApi for UserMgr {

let mut r = vec![];
for (_key, val) in values {
let u = serde_json::from_slice::<UserInfo>(&val.data)
.map_err_to_code(ErrorCode::IllegalUserInfoFormat, || "")?;
let u = deserialize_struct(&val.data, ErrorCode::IllegalUserInfoFormat, || "")?;

r.push(SeqV::new(val.seq, u));
}
Expand Down
6 changes: 5 additions & 1 deletion common/management/tests/it/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use common_base::base::tokio;
use common_exception::ErrorCode;
use common_exception::Result;
use common_management::*;
use common_meta_api::KVApi;
Expand All @@ -36,7 +37,10 @@ async fn test_add_stage() -> Result<()> {
meta: _,
data: value,
}) => {
assert_eq!(value, serde_json::to_vec(&stage_info)?);
assert_eq!(
value,
serialize_struct(&stage_info, ErrorCode::IllegalUserStageFormat, || "")?
);
}
catch => panic!("GetKVActionReply{:?}", catch),
}
Expand Down
29 changes: 19 additions & 10 deletions common/management/tests/it/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,13 @@ mod add {
let test_user_name = "test_user";
let test_hostname = "localhost";
let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let v = serde_json::to_vec(&user_info)?;
let value = Operation::Update(serde_json::to_vec(&user_info)?);

let v = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;
let value = Operation::Update(serialize_struct(
&user_info,
ErrorCode::IllegalUserInfoFormat,
|| "",
)?);

let test_key = format!(
"__fd_users/tenant1/{}",
Expand Down Expand Up @@ -188,7 +193,7 @@ mod get {
);

let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let value = serde_json::to_vec(&user_info)?;
let value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

let mut kv = MockKV::new();
kv.expect_get_kv()
Expand All @@ -214,7 +219,7 @@ mod get {
);

let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let value = serde_json::to_vec(&user_info)?;
let value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

let mut kv = MockKV::new();
kv.expect_get_kv()
Expand Down Expand Up @@ -333,7 +338,10 @@ mod get_users {
let user_info = UserInfo::new(&name, &hostname, default_test_auth_info());
res.push((
"fake_key".to_string(),
SeqV::new(i, serde_json::to_vec(&user_info)?),
SeqV::new(
i,
serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?,
),
));
user_infos.push(SeqV::new(i, user_info));
}
Expand Down Expand Up @@ -489,7 +497,7 @@ mod update {
let test_seq = None;

let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let prev_value = serde_json::to_vec(&user_info)?;
let prev_value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

// get_kv should be called
let mut kv = MockKV::new();
Expand All @@ -503,7 +511,8 @@ mod update {

// and then, update_kv should be called
let new_user_info = UserInfo::new(test_user_name, test_hostname, new_test_auth_info(full));
let new_value_with_old_salt = serde_json::to_vec(&new_user_info)?;
let new_value_with_old_salt =
serialize_struct(&new_user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

kv.expect_upsert_kv()
.with(predicate::eq(UpsertKVAction::new(
Expand Down Expand Up @@ -574,7 +583,7 @@ mod update {
let test_seq = None;

let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let prev_value = serde_json::to_vec(&user_info)?;
let prev_value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

// - get_kv should be called
let mut kv = MockKV::new();
Expand Down Expand Up @@ -630,7 +639,7 @@ mod set_user_privileges {
let test_seq = None;

let mut user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());
let prev_value = serde_json::to_vec(&user_info)?;
let prev_value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

// - get_kv should be called
let mut kv = MockKV::new();
Expand All @@ -647,7 +656,7 @@ mod set_user_privileges {
user_info
.grants
.grant_privileges(&GrantObject::Global, privileges);
let new_value = serde_json::to_vec(&user_info)?;
let new_value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

kv.expect_upsert_kv()
.with(predicate::eq(UpsertKVAction::new(
Expand Down

0 comments on commit e6a76e2

Please sign in to comment.