Skip to content

Commit

Permalink
feat: Add KVPbApi::get_id_and_value()
Browse files Browse the repository at this point in the history
`KVPbApi::get_id_and_value()` is used to fetch `name -> id`
and `id -> value` in one shot.

Add `DataId` to generalize the `id` of a value.
  • Loading branch information
drmingdrmer committed Aug 24, 2024
1 parent 16c04a1 commit fb06651
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 138 deletions.
26 changes: 18 additions & 8 deletions src/meta/api/src/crud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ where

if let CreateOption::Create = create_option {
if res.prev.is_some() {
return Err(ExistError::new(value.name(), "Exist when add").into());
return Err(ExistError::new(value.name().to_string(), "Exist when add").into());
}
}

Expand Down Expand Up @@ -168,7 +168,12 @@ where
if res.is_changed() {
Ok(res.result.seq())
} else {
Err(UnknownError::new(value.name(), match_seq, "NotFound when update").into())
Err(UnknownError::new_match_seq(
value.name().to_string(),
match_seq,
"NotFound when update",
)
.into())
}
}

Expand Down Expand Up @@ -216,8 +221,8 @@ where

let res = self.kv_api.upsert_pb(&upsert).await?;
res.removed_or_else(|e| {
UnknownError::new(
name,
UnknownError::new_match_seq(
name.to_string(),
seq,
format_args!("NotFound when remove, seq of existing record: {}", e.seq()),
)
Expand All @@ -237,13 +242,18 @@ where

let res = self.kv_api.get_pb(&ident).await?;

let seq_value = res.ok_or_else(|| UnknownError::new(name, seq, "NotFound when get"))?;
let seq_value = res.ok_or_else(|| {
UnknownError::new_match_seq(name.to_string(), seq, "NotFound when get")
})?;

match seq.match_seq(&seq_value) {
Ok(_) => Ok(seq_value),
Err(e) => {
Err(UnknownError::new(name, seq, format_args!("NotFound when get: {}", e)).into())
}
Err(e) => Err(UnknownError::new_match_seq(
name.to_string(),
seq,
format_args!("NotFound when get: {}", e),
)
.into()),
}
}

Expand Down
71 changes: 16 additions & 55 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::DatamaskAlreadyExists;
use databend_common_meta_app::app_error::UnknownDatamask;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskId;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
Expand All @@ -36,7 +32,6 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;
Expand Down Expand Up @@ -187,43 +182,15 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

let name_key = &req.name;

let (_seq_id, policy) = get_data_mask_or_err(
self,
name_key,
format!("drop_data_mask: {}", name_key.display()),
)
.await?;
let (_seq_id, policy) = self
.get_id_and_value(name_key)
.await?
.ok_or_else(|| AppError::from(name_key.unknown_error("get_data_mask")))?;

Ok(GetDatamaskReply { policy })
}
}

/// Returns (id_seq, id, data_mask_seq, data_mask)
async fn get_data_mask_or_err(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(SeqV<DataMaskId>, SeqV<DatamaskMeta>), KVAppError> {
let seq_id = kv_api.get_pb(name_key).await?.ok_or_else(|| {
AppError::from(UnknownDatamask::new(
name_key.name(),
format!("{}: {}", msg, name_key.data_mask_name()),
))
})?;

let id_ident = DataMaskIdIdent::new_generic(name_key.tenant(), seq_id.data.into_inner());

let seq_v = kv_api.get_pb(&id_ident).await?.ok_or_else(|| {
AppError::from(UnknownDatamask::new(
name_key.name(),
format!("{}: {}", msg, name_key.data_mask_name()),
))
})?;

// Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
Ok((seq_id.map(|id| id.into_inner()), seq_v))
}

async fn clear_table_column_mask_policy(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &DataMaskNameIdent,
Expand Down Expand Up @@ -274,27 +241,21 @@ async fn construct_drop_mask_policy_operations(
ctx: &str,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
let result = get_data_mask_or_err(
kv_api,
name_key,
format!("drop_data_mask: {}", name_key.display()),
)
.await;

let (seq_id, seq_meta) = match result {
Ok((seq_id, seq_meta)) => (seq_id, seq_meta),
Err(err) => {
if let KVAppError::AppError(AppError::UnknownDatamask(_)) = err {
if drop_if_exists {
return Ok(());
}
}

return Err(err);
let res = kv_api.get_id_and_value(name_key).await?;

let (seq_id, seq_meta) = match res {
Some((seq_id, seq_meta)) => (seq_id, seq_meta),
None => {
return if drop_if_exists {
Ok(())
} else {
let err = AppError::from(name_key.unknown_error("drop_data_mask"));
Err(err.into())
};
}
};

let id_ident = DataMaskIdIdent::new_generic(name_key.tenant(), seq_id.data);
let id_ident = seq_id.data.into_t_ident(name_key.tenant());

txn.condition.push(txn_cond_eq_seq(&id_ident, seq_meta.seq));
txn.if_then.push(txn_op_del(&id_ident));
Expand Down
32 changes: 32 additions & 0 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ mod upsert_pb;

use std::future::Future;

use databend_common_meta_app::data_id::DataId;
use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::KVApi;
Expand Down Expand Up @@ -90,6 +93,35 @@ pub trait KVPbApi: KVApi {
}
}

/// Query kvapi for a 2 level mapping: `name -> id -> value`.
///
/// `K` is the key type for `name -> id`.
/// `R2` is the level 2 resource type and the level 2 key type is `DataId<R2>`.
fn get_id_and_value<K, R2>(
&self,
key: &K,
) -> impl Future<Output = Result<Option<(SeqV<DataId<R2>>, SeqV<R2::ValueType>)>, Self::Error>> + Send
where
K: kvapi::Key<ValueType = DataId<R2>> + KeyWithTenant + Sync,
R2: TenantResource + Send + Sync,
R2::ValueType: FromToProto,
Self::Error: From<PbApiReadError<Self::Error>>,
{
async move {
let Some(seq_id) = self.get_pb(key).await? else {
return Ok(None);
};

let id_ident = seq_id.data.into_t_ident(key.tenant());

let Some(seq_v) = self.get_pb(&id_ident).await? else {
return Ok(None);
};

Ok(Some((seq_id, seq_v)))
}
}

/// Get protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
Expand Down
7 changes: 5 additions & 2 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use std::fmt::Display;
use databend_common_exception::ErrorCode;
use databend_common_meta_types::MatchSeq;

use crate::data_mask::data_mask_name_ident;
use crate::tenant_key::errors::UnknownError;

/// Output message for end users, with sensitive info stripped.
pub trait AppErrorMessage: Display {
fn message(&self) -> String {
Expand Down Expand Up @@ -1256,7 +1259,7 @@ pub enum AppError {
DatamaskAlreadyExists(#[from] DatamaskAlreadyExists),

#[error(transparent)]
UnknownDatamask(#[from] UnknownDatamask),
UnknownDataMask(#[from] UnknownError<data_mask_name_ident::Resource>),

#[error(transparent)]
BackgroundJobAlreadyExists(#[from] BackgroundJobAlreadyExists),
Expand Down Expand Up @@ -1848,7 +1851,7 @@ impl From<AppError> for ErrorCode {
AppError::IndexColumnIdNotFound(err) => ErrorCode::IndexColumnIdNotFound(err.message()),

AppError::DatamaskAlreadyExists(err) => ErrorCode::DatamaskAlreadyExists(err.message()),
AppError::UnknownDatamask(err) => ErrorCode::UnknownDatamask(err.message()),
AppError::UnknownDataMask(err) => ErrorCode::UnknownDatamask(err.message()),

AppError::BackgroundJobAlreadyExists(err) => {
ErrorCode::BackgroundJobAlreadyExists(err.message())
Expand Down
Loading

0 comments on commit fb06651

Please sign in to comment.