diff --git a/src/meta/api/src/background_api_impl.rs b/src/meta/api/src/background_api_impl.rs index 2e181639764b..33da68774732 100644 --- a/src/meta/api/src/background_api_impl.rs +++ b/src/meta/api/src/background_api_impl.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Display; - use chrono::Utc; use databend_common_meta_app::app_error::AppError; -use databend_common_meta_app::app_error::BackgroundJobAlreadyExists; -use databend_common_meta_app::app_error::UnknownBackgroundJob; +use databend_common_meta_app::background::background_job_id_ident::BackgroundJobId; use databend_common_meta_app::background::BackgroundJobIdIdent; use databend_common_meta_app::background::BackgroundJobIdent; use databend_common_meta_app::background::BackgroundJobInfo; @@ -44,7 +41,6 @@ use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::seq_value::SeqValue; -use databend_common_meta_types::ConditionResult::Eq; use databend_common_meta_types::InvalidReply; use databend_common_meta_types::MatchSeq::Any; use databend_common_meta_types::MetaError; @@ -57,17 +53,15 @@ use log::debug; use crate::background_api::BackgroundApi; use crate::deserialize_struct; use crate::fetch_id; -use crate::get_u64_value; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; use crate::kv_pb_api::UpsertPB; use crate::send_txn; use crate::serialize_struct; -use crate::serialize_u64; use crate::txn_backoff::txn_backoff; -use crate::txn_cond_seq; -use crate::txn_op_put; use crate::util::deserialize_u64; +use crate::util::txn_op_put_pb; +use crate::util::txn_replace_exact; /// BackgroundApi is implemented upon kvapi::KVApi. /// Thus every type that impl kvapi::KVApi impls BackgroundApi. @@ -87,53 +81,38 @@ impl> BackgroundApi for KV { trials.next().unwrap()?.await; // Get db mask by name to ensure absence - let (seq, id) = get_u64_value(self, name_key).await?; - debug!(seq = seq, id = id, name_key :? =(name_key); "create_background_job"); + let job_id = self.get_pb(name_key).await?; + + debug!(res :? = job_id, name_key :? =(name_key); "get existing, when create_background_job"); - if seq > 0 { + if let Some(seq_id) = job_id { return if req.if_not_exists { - Ok(CreateBackgroundJobReply { id }) + Ok(CreateBackgroundJobReply { id: *seq_id.data }) } else { - Err(KVAppError::AppError(AppError::BackgroundJobAlreadyExists( - BackgroundJobAlreadyExists::new( - name_key.name(), - format!("create background job: {:?}", req.job_name), - ), - ))) + Err( + AppError::BackgroundJobAlreadyExists(name_key.exist_error(func_name!())) + .into(), + ) }; - } + }; let id = fetch_id(self, IdGenerator::background_job_id()).await?; - let id_key = BackgroundJobIdIdent::new(name_key.tenant(), id); + let id = BackgroundJobId::new(id); + let id_key = BackgroundJobIdIdent::new_generic(name_key.tenant(), id); - debug!( - id :? =(&id_key), - name_key :? =(name_key); - "new backgroundjob id" - ); + debug!(id :? =(&id_key),name_key :? =(name_key); "{}", func_name!()); { + let mut txn = TxnRequest::default(); + let meta: BackgroundJobInfo = req.job_info.clone(); - let condition = vec![txn_cond_seq(name_key, Eq, 0)]; - let if_then = vec![ - txn_op_put(name_key, serialize_u64(id)?), // name -> background_job_id - txn_op_put(&id_key, serialize_struct(&meta)?), // id -> meta - ]; - - let txn_req = TxnRequest { - condition, - if_then, - else_then: vec![], - }; - let (succ, _responses) = send_txn(self, txn_req).await?; + txn_replace_exact(&mut txn, name_key, 0, &id)?; // name -> background_job_id + txn.if_then.push(txn_op_put_pb(&id_key, &meta)?); // id -> meta + + let (succ, _responses) = send_txn(self, txn).await?; - debug!( - name :? =(name_key), - id :? =(&id_key), - succ = succ; - "create_background_job" - ); + debug!(name :? =(name_key),id :? =(&id_key),succ = succ;"{}", func_name!()); if succ { break id; @@ -141,7 +120,7 @@ impl> BackgroundApi for KV { } }; - Ok(CreateBackgroundJobReply { id }) + Ok(CreateBackgroundJobReply { id: *id }) } // TODO(zhihanz): needs to drop both background job and related background tasks, also needs to gracefully shutdown running queries @@ -197,10 +176,15 @@ impl> BackgroundApi for KV { let name_key = &req.name; - let (id_ident, seq_joq) = - get_background_job_or_error(self, name_key, format!("get_: {:?}", name_key)).await?; + let (seq_id, seq_job) = self + .get_id_and_value(name_key) + .await? + .ok_or_else(|| AppError::from(name_key.unknown_error("get_background_job")))?; - Ok(GetBackgroundJobReply::new(id_ident, seq_joq)) + Ok(GetBackgroundJobReply::new( + seq_id.data.into_t_ident(name_key.tenant()), + seq_job, + )) } #[fastrace::trace] @@ -298,69 +282,26 @@ impl> BackgroundApi for KV { } } -async fn get_background_job_id( - kv_api: &(impl kvapi::KVApi + ?Sized), - name_key: &BackgroundJobIdent, -) -> Result { - let (id_seq, id) = get_u64_value(kv_api, name_key).await?; - assert_background_job_exist(id_seq, name_key)?; - - Ok(BackgroundJobIdIdent::new(name_key.tenant(), id)) -} - -async fn get_background_job_or_error( +async fn update_background_job bool>( kv_api: &(impl kvapi::KVApi + ?Sized), name_ident: &BackgroundJobIdent, - _msg: impl Display, -) -> Result, KVAppError> { - let id_ident = get_background_job_id(kv_api, name_ident).await?; + mutation: F, +) -> Result { + debug!(req :? =(name_ident); "BackgroundApi: {}", func_name!()); - let seq_job = kv_api - .get_pb(&id_ident) + let (seq_id, mut seq_meta) = kv_api + .get_id_and_value(name_ident) .await? - .ok_or_else(|| unknown_background_job(name_ident))?; + .ok_or_else(|| AppError::from(name_ident.unknown_error("update_background_job")))?; - Ok((id_ident, seq_job)) -} - -/// Return OK if a db_id or db_meta exists by checking the seq. -/// -/// Otherwise returns UnknownBackgroundJob error -pub fn assert_background_job_exist( - seq: u64, - name_ident: &BackgroundJobIdent, -) -> Result<(), AppError> { - if seq == 0 { - debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist"); - let err = unknown_background_job(name_ident); - Err(err) - } else { - Ok(()) - } -} - -pub fn unknown_background_job(name_ident: &BackgroundJobIdent) -> AppError { - AppError::UnknownBackgroundJob(UnknownBackgroundJob::new( - name_ident.job_name(), - format!("{:?}", name_ident), - )) -} - -async fn update_background_job bool>( - kv_api: &(impl kvapi::KVApi + ?Sized), - name: &BackgroundJobIdent, - mutation: F, -) -> Result { - debug!(req :? =(name); "BackgroundApi: {}", func_name!()); - let (id_ident, mut seq_job) = - get_background_job_or_error(kv_api, name, "update_background_job").await?; + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); - let should_update = mutation(&mut seq_job.data); + let should_update = mutation(&mut seq_meta.data); if !should_update { return Ok(UpdateBackgroundJobReply::new(id_ident.clone())); } - let req = UpsertPB::update_exact(id_ident.clone(), seq_job); + let req = UpsertPB::update_exact(id_ident.clone(), seq_meta); let resp = kv_api.upsert_pb(&req).await?; assert!(resp.is_changed()); diff --git a/src/meta/api/src/data_mask_api.rs b/src/meta/api/src/data_mask_api.rs index 87331179ca70..718c5100cb01 100644 --- a/src/meta/api/src/data_mask_api.rs +++ b/src/meta/api/src/data_mask_api.rs @@ -14,10 +14,11 @@ use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; -use databend_common_meta_app::data_mask::DropDatamaskReply; -use databend_common_meta_app::data_mask::DropDatamaskReq; -use databend_common_meta_app::data_mask::GetDatamaskReply; -use databend_common_meta_app::data_mask::GetDatamaskReq; +use databend_common_meta_app::data_mask::DataMaskId; +use databend_common_meta_app::data_mask::DataMaskNameIdent; +use databend_common_meta_app::data_mask::DatamaskMeta; +use databend_common_meta_types::MetaError; +use databend_common_meta_types::SeqV; use crate::kv_app_error::KVAppError; @@ -28,7 +29,15 @@ pub trait DatamaskApi: Send + Sync { req: CreateDatamaskReq, ) -> Result; - async fn drop_data_mask(&self, req: DropDatamaskReq) -> Result; + /// On success, returns the dropped id and data mask. + /// Returning None, means nothing is removed. + async fn drop_data_mask( + &self, + name_ident: &DataMaskNameIdent, + ) -> Result, SeqV)>, KVAppError>; - async fn get_data_mask(&self, req: GetDatamaskReq) -> Result; + async fn get_data_mask( + &self, + name_ident: &DataMaskNameIdent, + ) -> Result>, MetaError>; } diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 598a9d6c746e..fd0ae38802a0 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -13,16 +13,12 @@ // limitations under the License. use databend_common_meta_app::app_error::AppError; -use databend_common_meta_app::app_error::DatamaskAlreadyExists; use databend_common_meta_app::data_mask::CreateDatamaskReply; use databend_common_meta_app::data_mask::CreateDatamaskReq; +use databend_common_meta_app::data_mask::DataMaskId; use databend_common_meta_app::data_mask::DataMaskIdIdent; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; -use databend_common_meta_app::data_mask::DropDatamaskReply; -use databend_common_meta_app::data_mask::DropDatamaskReq; -use databend_common_meta_app::data_mask::GetDatamaskReply; -use databend_common_meta_app::data_mask::GetDatamaskReq; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; @@ -32,6 +28,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_types::MetaError; +use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnRequest; use fastrace::func_name; use log::debug; @@ -39,16 +36,14 @@ use log::debug; use crate::data_mask_api::DatamaskApi; use crate::fetch_id; use crate::get_pb_value; -use crate::get_u64_value; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; use crate::send_txn; -use crate::serialize_struct; -use crate::serialize_u64; use crate::txn_backoff::txn_backoff; use crate::txn_cond_eq_seq; -use crate::txn_op_del; -use crate::txn_op_put; +use crate::util::txn_delete_exact; +use crate::util::txn_op_put_pb; +use crate::util::txn_replace_exact; /// DatamaskApi is implemented upon kvapi::KVApi. /// Thus every type that impl kvapi::KVApi impls DatamaskApi. @@ -66,36 +61,35 @@ impl> DatamaskApi for KV { let id = loop { trials.next().unwrap()?.await; - // Get db mask by name to ensure absence - let (seq, id) = get_u64_value(self, name_ident).await?; - debug!(seq = seq, id = id, name_key :? =(name_ident); "create_data_mask"); - let mut txn = TxnRequest::default(); - if seq > 0 { + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "create_data_mask"); + + let mut curr_seq = 0; + + if let Some((seq_id, seq_meta)) = res { match req.create_option { CreateOption::Create => { - return Err(KVAppError::AppError(AppError::DatamaskAlreadyExists( - DatamaskAlreadyExists::new( - name_ident.name(), - format!("create data mask: {}", req.name.display()), - ), - ))); + return Err(AppError::DatamaskAlreadyExists( + name_ident.exist_error(func_name!()), + ) + .into()); + } + CreateOption::CreateIfNotExists => { + return Ok(CreateDatamaskReply { id: *seq_id.data }); } - CreateOption::CreateIfNotExists => return Ok(CreateDatamaskReply { id }), CreateOption::CreateOrReplace => { - construct_drop_mask_policy_operations( - self, - name_ident, - false, - false, - func_name!(), - &mut txn, - ) - .await?; + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + + clear_table_column_mask_policy(self, name_ident, &mut txn).await?; + + curr_seq = seq_id.seq; } }; - }; + } // Create data mask by inserting these record: // name -> id @@ -104,7 +98,8 @@ impl> DatamaskApi for KV { let id = fetch_id(self, IdGenerator::data_mask_id()).await?; - let id_ident = DataMaskIdIdent::new(name_ident.tenant(), id); + let id = DataMaskId::new(id); + let id_ident = DataMaskIdIdent::new_generic(name_ident.tenant(), id); let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); debug!( @@ -114,13 +109,13 @@ impl> DatamaskApi for KV { ); { - let meta: DatamaskMeta = req.clone().into(); + let meta: DatamaskMeta = req.data_mask_meta.clone(); let id_list = MaskpolicyTableIdList::default(); - txn.condition.push(txn_cond_eq_seq(name_ident, seq)); - txn.if_then.extend( vec![ - txn_op_put(name_ident, serialize_u64(id)?), // name -> db_id - txn_op_put(&id_ident, serialize_struct(&meta)?), // id -> meta - txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */ + txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq)); + txn.if_then.extend(vec![ + txn_op_put_pb(name_ident, &id)?, // name -> db_id + txn_op_put_pb(&id_ident, &meta)?, // id -> meta + txn_op_put_pb(&id_list_key, &id_list)?, // data mask name -> id_list ]); let (succ, _responses) = send_txn(self, txn).await?; @@ -138,13 +133,14 @@ impl> DatamaskApi for KV { } }; - Ok(CreateDatamaskReply { id }) + Ok(CreateDatamaskReply { id: *id }) } - async fn drop_data_mask(&self, req: DropDatamaskReq) -> Result { - debug!(req :? =(&req); "DatamaskApi: {}", func_name!()); - - let name_key = &req.name; + async fn drop_data_mask( + &self, + name_ident: &DataMaskNameIdent, + ) -> Result, SeqV)>, KVAppError> { + debug!(name_ident :? =(name_ident); "DatamaskApi: {}", func_name!()); let mut trials = txn_backoff(None, func_name!()); loop { @@ -152,42 +148,38 @@ impl> DatamaskApi for KV { let mut txn = TxnRequest::default(); - construct_drop_mask_policy_operations( - self, - name_key, - req.if_exists, - true, - func_name!(), - &mut txn, - ) - .await?; + let res = self.get_id_and_value(name_ident).await?; + debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!()); - let (succ, _responses) = send_txn(self, txn).await?; + let Some((seq_id, seq_meta)) = res else { + return Ok(None); + }; - debug!( - succ = succ; - "drop_data_mask" - ); + let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + + txn_delete_exact(&mut txn, name_ident, seq_id.seq); + txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); + + clear_table_column_mask_policy(self, name_ident, &mut txn).await?; + + let (succ, _responses) = send_txn(self, txn).await?; + debug!(succ = succ;"{}", func_name!()); if succ { - break; + return Ok(Some((seq_id, seq_meta))); } } - - Ok(DropDatamaskReply {}) } - async fn get_data_mask(&self, req: GetDatamaskReq) -> Result { - debug!(req :? =(&req); "DatamaskApi: {}", func_name!()); - - let name_key = &req.name; + async fn get_data_mask( + &self, + name_ident: &DataMaskNameIdent, + ) -> Result>, MetaError> { + debug!(req :? =(&name_ident); "DatamaskApi: {}", func_name!()); - let (_seq_id, policy) = self - .get_id_and_value(name_key) - .await? - .ok_or_else(|| AppError::from(name_key.unknown_error("get_data_mask")))?; + let res = self.get_id_and_value(name_ident).await?; - Ok(GetDatamaskReply { policy }) + Ok(res.map(|(_, seq_meta)| seq_meta)) } } @@ -195,7 +187,7 @@ async fn clear_table_column_mask_policy( kv_api: &(impl kvapi::KVApi + ?Sized), name_ident: &DataMaskNameIdent, txn: &mut TxnRequest, -) -> Result<(), KVAppError> { +) -> Result<(), MetaError> { let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone()); let seq_id_list = kv_api.get_pb(&id_list_key).await?; @@ -204,9 +196,7 @@ async fn clear_table_column_mask_policy( return Ok(()); }; - txn.condition - .push(txn_cond_eq_seq(&id_list_key, seq_id_list.seq)); - txn.if_then.push(txn_op_del(&id_list_key)); + txn_delete_exact(txn, &id_list_key, seq_id_list.seq); // remove mask policy from table meta for table_id in seq_id_list.data.id_list.into_iter() { @@ -214,64 +204,22 @@ async fn clear_table_column_mask_policy( let (tb_meta_seq, table_meta_opt): (_, Option) = get_pb_value(kv_api, &tbid).await?; - if let Some(mut table_meta) = table_meta_opt { - if let Some(column_mask_policy) = table_meta.column_mask_policy { - let new_column_mask_policy = column_mask_policy - .into_iter() - .filter(|(_, name)| name != name_ident.name()) - .collect(); - - table_meta.column_mask_policy = Some(new_column_mask_policy); - - txn.condition.push(txn_cond_eq_seq(&tbid, tb_meta_seq)); - txn.if_then - .push(txn_op_put(&tbid, serialize_struct(&table_meta)?)); - } - } - } - - Ok(()) -} -async fn construct_drop_mask_policy_operations( - kv_api: &(impl kvapi::KVApi + ?Sized), - name_key: &DataMaskNameIdent, - drop_if_exists: bool, - if_delete: bool, - ctx: &str, - txn: &mut TxnRequest, -) -> Result<(), KVAppError> { - let res = kv_api.get_id_and_value(name_key).await?; - - let (seq_id, seq_meta) = match res { - Some((seq_id, seq_meta)) => (seq_id, seq_meta), - None => { - return if drop_if_exists { - Ok(()) - } else { - let err = AppError::from(name_key.unknown_error("drop_data_mask")); - Err(err.into()) - }; - } - }; + let Some(mut table_meta) = table_meta_opt else { + continue; + }; - let id_ident = seq_id.data.into_t_ident(name_key.tenant()); + if let Some(column_mask_policy) = table_meta.column_mask_policy { + let new_column_mask_policy = column_mask_policy + .into_iter() + .filter(|(_, name)| name != name_ident.name()) + .collect(); - txn.condition.push(txn_cond_eq_seq(&id_ident, seq_meta.seq)); - txn.if_then.push(txn_op_del(&id_ident)); + table_meta.column_mask_policy = Some(new_column_mask_policy); - if if_delete { - txn.condition.push(txn_cond_eq_seq(name_key, seq_id.seq)); - txn.if_then.push(txn_op_del(name_key)); - clear_table_column_mask_policy(kv_api, name_key, txn).await?; + txn_replace_exact(txn, &tbid, tb_meta_seq, &table_meta)?; + } } - debug!( - name :? =(name_key), - seq_id :? =seq_id, - ctx = ctx; - "construct_drop_mask_policy_operations" - ); - Ok(()) } diff --git a/src/meta/api/src/kv_app_error.rs b/src/meta/api/src/kv_app_error.rs index 0c4132dcc1c0..516ed135b188 100644 --- a/src/meta/api/src/kv_app_error.rs +++ b/src/meta/api/src/kv_app_error.rs @@ -18,6 +18,7 @@ use databend_common_exception::ErrorCode; use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::app_error::TenantIsEmpty; use databend_common_meta_stoerr::MetaStorageError; +use databend_common_meta_types::InvalidArgument; use databend_common_meta_types::InvalidReply; use databend_common_meta_types::MetaAPIError; use databend_common_meta_types::MetaClientError; @@ -87,6 +88,13 @@ impl From for KVAppError { } } +impl From for KVAppError { + fn from(value: InvalidArgument) -> Self { + let network_error = MetaNetworkError::from(value); + Self::MetaError(MetaError::from(network_error)) + } +} + impl From for KVAppError { fn from(e: MetaAPIError) -> Self { let meta_err = MetaError::from(e); diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index df73754a096c..86c526e88c00 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -36,7 +36,6 @@ use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskIdIdent; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; -use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; @@ -2870,22 +2869,28 @@ impl SchemaApiTestSuite { let req = CreateDatamaskReq { create_option: CreateOption::CreateIfNotExists, name: DataMaskNameIdent::new(tenant.clone(), mask_name_1.to_string()), - args: vec![], - return_type: "".to_string(), - body: "".to_string(), - comment: None, - create_on: created_on, + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: None, + create_on: created_on, + update_on: None, + }, }; mt.create_data_mask(req).await?; let req = CreateDatamaskReq { create_option: CreateOption::CreateIfNotExists, name: DataMaskNameIdent::new(tenant.clone(), mask_name_2.to_string()), - args: vec![], - return_type: "".to_string(), - body: "".to_string(), - comment: None, - create_on: created_on, + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: None, + create_on: created_on, + update_on: None, + }, }; mt.create_data_mask(req).await?; } @@ -3063,12 +3068,9 @@ impl SchemaApiTestSuite { info!("--- drop mask policy check"); { - let req = DropDatamaskReq { - if_exists: true, - name: DataMaskNameIdent::new(tenant.clone(), mask_name_1), - }; - - mt.drop_data_mask(req).await?; + let name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_1); + let dropped = mt.drop_data_mask(&name_ident).await?; + assert!(dropped.is_some()); // check table meta let req = GetTableReq { @@ -3095,11 +3097,14 @@ impl SchemaApiTestSuite { let req = CreateDatamaskReq { create_option: CreateOption::CreateIfNotExists, name: name.clone(), - args: vec![], - return_type: "".to_string(), - body: "".to_string(), - comment: Some("before".to_string()), - create_on: created_on, + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: Some("before".to_string()), + create_on: created_on, + update_on: None, + }, }; mt.create_data_mask(req).await?; let old_id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?; @@ -3112,11 +3117,14 @@ impl SchemaApiTestSuite { let req = CreateDatamaskReq { create_option: CreateOption::CreateOrReplace, name: name.clone(), - args: vec![], - return_type: "".to_string(), - body: "".to_string(), - comment: Some("after".to_string()), - create_on: created_on, + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: Some("after".to_string()), + create_on: created_on, + update_on: None, + }, }; mt.create_data_mask(req).await?; diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index 661a9a40cafc..cc51fd7cce37 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -326,11 +326,35 @@ pub async fn send_txn( kv_api: &(impl kvapi::KVApi + ?Sized), txn_req: TxnRequest, ) -> Result<(bool, Vec), KVAppError> { + debug!("send txn: {}", txn_req); let tx_reply = kv_api.transaction(txn_req).await?; let (succ, responses) = txn_reply_to_api_result(tx_reply)?; Ok((succ, responses)) } +/// Add a delete operation by key and exact seq to [`TxnRequest`]. +pub fn txn_delete_exact(txn: &mut TxnRequest, key: &impl kvapi::Key, seq: u64) { + txn.condition.push(txn_cond_eq_seq(key, seq)); + txn.if_then.push(txn_op_del(key)); +} + +/// Add a replace operation by key and exact seq to [`TxnRequest`]. +pub fn txn_replace_exact( + txn: &mut TxnRequest, + key: &K, + seq: u64, + value: &K::ValueType, +) -> Result<(), InvalidArgument> +where + K: kvapi::Key, + K::ValueType: FromToProto + 'static, +{ + txn.condition.push(txn_cond_eq_seq(key, seq)); + txn.if_then.push(txn_op_put_pb(key, value)?); + + Ok(()) +} + /// Build a TxnCondition that compares the seq of a record. pub fn txn_cond_eq_seq(key: &impl kvapi::Key, seq: u64) -> TxnCondition { TxnCondition::eq_seq(key.to_string_key(), seq) @@ -345,6 +369,19 @@ pub fn txn_cond_seq(key: &impl kvapi::Key, op: ConditionResult, seq: u64) -> Txn } } +pub fn txn_op_put_pb(key: &K, value: &K::ValueType) -> Result +where + K: kvapi::Key, + K::ValueType: FromToProto + 'static, +{ + let p = value.to_pb().map_err(|e| InvalidArgument::new(e, ""))?; + + let mut buf = vec![]; + prost::Message::encode(&p, &mut buf).map_err(|e| InvalidArgument::new(e, ""))?; + + Ok(TxnOp::put(key.to_string_key(), buf)) +} + /// Build a txn operation that puts a record. pub fn txn_op_put(key: &impl kvapi::Key, value: Vec) -> TxnOp { TxnOp::put(key.to_string_key(), value) diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index ed57c873916c..d8f45fc20a76 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -17,7 +17,9 @@ use std::fmt::Display; use databend_common_exception::ErrorCode; use databend_common_meta_types::MatchSeq; +use crate::background::job_ident; use crate::data_mask::data_mask_name_ident; +use crate::tenant_key::errors::ExistError; use crate::tenant_key::errors::UnknownError; /// Output message for end users, with sensitive info stripped. @@ -79,38 +81,6 @@ impl CatalogAlreadyExists { } } -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[error("DatamaskAlreadyExists: `{name}` while `{context}`")] -pub struct DatamaskAlreadyExists { - name: String, - context: String, -} - -impl DatamaskAlreadyExists { - pub fn new(name: impl Into, context: impl Into) -> Self { - Self { - name: name.into(), - context: context.into(), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[error("BackgroundJobAlreadyExists: `{name}` while `{context}`")] -pub struct BackgroundJobAlreadyExists { - name: String, - context: String, -} - -impl BackgroundJobAlreadyExists { - pub fn new(name: impl Into, context: impl Into) -> Self { - Self { - name: name.into(), - context: context.into(), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("CreateDatabaseWithDropTime: `{db_name}` with drop_on")] pub struct CreateDatabaseWithDropTime { @@ -463,22 +433,6 @@ impl UnknownDatamask { } } -#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] -#[error("UnknownBackgroundJob: `{name}` while `{context}`")] -pub struct UnknownBackgroundJob { - name: String, - context: String, -} - -impl UnknownBackgroundJob { - pub fn new(name: impl Into, context: impl Into) -> Self { - Self { - name: name.into(), - context: context.into(), - } - } -} - #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] #[error("UnknownDatabaseId: `{db_id}` while `{context}`")] pub struct UnknownDatabaseId { @@ -1256,16 +1210,16 @@ pub enum AppError { IndexColumnIdNotFound(#[from] IndexColumnIdNotFound), #[error(transparent)] - DatamaskAlreadyExists(#[from] DatamaskAlreadyExists), + DatamaskAlreadyExists(#[from] ExistError), #[error(transparent)] UnknownDataMask(#[from] UnknownError), #[error(transparent)] - BackgroundJobAlreadyExists(#[from] BackgroundJobAlreadyExists), + BackgroundJobAlreadyExists(#[from] ExistError), #[error(transparent)] - UnknownBackgroundJob(#[from] UnknownBackgroundJob), + UnknownBackgroundJob(#[from] UnknownError), #[error(transparent)] UnmatchColumnDataType(#[from] UnmatchColumnDataType), @@ -1330,18 +1284,6 @@ impl AppErrorMessage for TenantIsEmpty { } } -impl AppErrorMessage for UnknownBackgroundJob { - fn message(&self) -> String { - format!("Unknown background job '{}'", self.name) - } -} - -impl AppErrorMessage for BackgroundJobAlreadyExists { - fn message(&self) -> String { - format!("Background job '{}' already exists", self.name) - } -} - impl AppErrorMessage for UnknownDatabase { fn message(&self) -> String { format!("Unknown database '{}'", self.db_name) @@ -1652,12 +1594,6 @@ impl AppErrorMessage for IndexColumnIdNotFound { } } -impl AppErrorMessage for DatamaskAlreadyExists { - fn message(&self) -> String { - format!("Datamask '{}' already exists", self.name) - } -} - impl AppErrorMessage for UnknownDatamask { fn message(&self) -> String { format!("Datamask '{}' does not exists", self.name) diff --git a/src/meta/app/src/background/background_job_id_ident.rs b/src/meta/app/src/background/background_job_id_ident.rs index 43582f72eca1..9d8dc74810a3 100644 --- a/src/meta/app/src/background/background_job_id_ident.rs +++ b/src/meta/app/src/background/background_job_id_ident.rs @@ -12,22 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::data_id::DataId; +use crate::tenant::ToTenant; use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; -pub type BackgroundJobIdIdent = TIdent; -pub type BackgroundJobIdIdentRaw = TIdentRaw; +pub type BackgroundJobId = DataId; + +pub type BackgroundJobIdIdent = TIdent; +pub type BackgroundJobIdIdentRaw = TIdentRaw; pub use kvapi_impl::Resource; impl BackgroundJobIdIdent { - pub fn job_id(&self) -> u64 { + pub fn new(tenant: impl ToTenant, job_id: u64) -> Self { + Self::new_generic(tenant, BackgroundJobId::new(job_id)) + } + + pub fn job_id(&self) -> BackgroundJobId { *self.name() } } impl BackgroundJobIdIdentRaw { - pub fn job_id(&self) -> u64 { + pub fn job_id(&self) -> BackgroundJobId { *self.name() } } diff --git a/src/meta/app/src/background/job_ident.rs b/src/meta/app/src/background/job_ident.rs index 00cd82fb80b2..f44c27cbdcf0 100644 --- a/src/meta/app/src/background/job_ident.rs +++ b/src/meta/app/src/background/job_ident.rs @@ -32,23 +32,24 @@ mod kvapi_impl { use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::Key; - use crate::background::BackgroundJobIdIdent; + use crate::background::background_job_id_ident::BackgroundJobId; use crate::background::BackgroundJobIdent; use crate::tenant_key::resource::TenantResource; + use crate::KeyWithTenant; pub struct Resource; impl TenantResource for Resource { const PREFIX: &'static str = "__fd_background_job"; const TYPE: &'static str = "BackgroundJobIdent"; const HAS_TENANT: bool = true; - type ValueType = BackgroundJobIdIdent; + type ValueType = BackgroundJobId; } - impl kvapi::Value for BackgroundJobIdIdent { + impl kvapi::Value for BackgroundJobId { type KeyType = BackgroundJobIdent; - fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { - [self.to_string_key()] + fn dependency_keys(&self, key: &Self::KeyType) -> impl IntoIterator { + [self.into_t_ident(key.tenant()).to_string_key()] } } diff --git a/src/meta/app/src/data_mask/mod.rs b/src/meta/app/src/data_mask/mod.rs index 6976e613f008..fe432724c8e6 100644 --- a/src/meta/app/src/data_mask/mod.rs +++ b/src/meta/app/src/data_mask/mod.rs @@ -23,7 +23,6 @@ use chrono::Utc; pub use data_mask_id_ident::DataMaskId; pub use data_mask_id_ident::DataMaskIdIdent; pub use data_mask_name_ident::DataMaskNameIdent; -use databend_common_meta_types::SeqV; pub use mask_policy_table_id_list_ident::MaskPolicyTableIdListIdent; use crate::schema::CreateOption; @@ -39,28 +38,11 @@ pub struct DatamaskMeta { pub update_on: Option>, } -impl From for DatamaskMeta { - fn from(p: CreateDatamaskReq) -> Self { - DatamaskMeta { - args: p.args.clone(), - return_type: p.return_type.clone(), - body: p.body.clone(), - comment: p.comment.clone(), - create_on: p.create_on, - update_on: None, - } - } -} - #[derive(Clone, Debug, PartialEq, Eq)] pub struct CreateDatamaskReq { pub create_option: CreateOption, pub name: DataMaskNameIdent, - pub args: Vec<(String, String)>, - pub return_type: String, - pub body: String, - pub comment: Option, - pub create_on: DateTime, + pub data_mask_meta: DatamaskMeta, } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] @@ -74,19 +56,11 @@ pub struct DropDatamaskReq { pub name: DataMaskNameIdent, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct DropDatamaskReply {} - #[derive(Clone, Debug, PartialEq, Eq)] pub struct GetDatamaskReq { pub name: DataMaskNameIdent, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct GetDatamaskReply { - pub policy: SeqV, -} - /// A list of table ids #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, Default, PartialEq)] pub struct MaskpolicyTableIdList { diff --git a/src/meta/app/src/tenant_key/errors.rs b/src/meta/app/src/tenant_key/errors.rs index 233aa927de55..dfee4267d489 100644 --- a/src/meta/app/src/tenant_key/errors.rs +++ b/src/meta/app/src/tenant_key/errors.rs @@ -21,13 +21,34 @@ use crate::app_error::AppErrorMessage; use crate::tenant_key::resource::TenantResource; /// Error occurred when a record already exists for a key. -#[derive(Clone, PartialEq, Eq, thiserror::Error)] +#[derive(thiserror::Error)] pub struct ExistError { name: N, ctx: String, _p: std::marker::PhantomData, } +impl Clone for ExistError +where N: Clone +{ + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + ctx: self.ctx.clone(), + _p: Default::default(), + } + } +} + +impl PartialEq for ExistError +where N: PartialEq +{ + fn eq(&self, other: &Self) -> bool { + self.name == other.name && self.ctx == other.ctx + } +} +impl Eq for ExistError where N: PartialEq {} + impl ExistError { pub fn new(name: N, ctx: impl ToString) -> Self { Self { @@ -73,6 +94,13 @@ where } } +impl AppErrorMessage for ExistError +where + R: TenantResource, + N: fmt::Display, +{ +} + /// Error occurred when a record not found for a key. #[derive(thiserror::Error)] pub struct UnknownError { diff --git a/src/meta/app/src/tenant_key/ident.rs b/src/meta/app/src/tenant_key/ident.rs index d10d5fbb0910..ff2184d806ab 100644 --- a/src/meta/app/src/tenant_key/ident.rs +++ b/src/meta/app/src/tenant_key/ident.rs @@ -20,6 +20,7 @@ use std::hash::Hasher; use crate::tenant::Tenant; use crate::tenant::ToTenant; +use crate::tenant_key::errors::ExistError; use crate::tenant_key::errors::UnknownError; use crate::tenant_key::raw::TIdentRaw; use crate::tenant_key::resource::TenantResource; @@ -138,6 +139,11 @@ impl TIdent { where N: Clone { UnknownError::new(self.name.clone(), ctx) } + + pub fn exist_error(&self, ctx: impl Display) -> ExistError + where N: Clone { + ExistError::new(self.name.clone(), ctx) + } } impl TIdent diff --git a/src/query/ee/src/data_mask/data_mask_handler.rs b/src/query/ee/src/data_mask/data_mask_handler.rs index 49751dcd1b2e..251557c1d657 100644 --- a/src/query/ee/src/data_mask/data_mask_handler.rs +++ b/src/query/ee/src/data_mask/data_mask_handler.rs @@ -17,11 +17,11 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_exception::Result; use databend_common_meta_api::DatamaskApi; +use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; -use databend_common_meta_app::data_mask::GetDatamaskReq; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_store::MetaStore; use databend_enterprise_data_mask_feature::data_mask_handler::DatamaskHandler; @@ -42,7 +42,14 @@ impl DatamaskHandler for RealDatamaskHandler { } async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()> { - let _ = meta_api.drop_data_mask(req).await?; + let dropped = meta_api.drop_data_mask(&req.name).await?; + if dropped.is_none() { + if req.if_exists { + // Ok + } else { + return Err(AppError::from(req.name.unknown_error("drop data mask")).into()); + } + } Ok(()) } @@ -53,12 +60,12 @@ impl DatamaskHandler for RealDatamaskHandler { tenant: &Tenant, name: String, ) -> Result { - let resp = meta_api - .get_data_mask(GetDatamaskReq { - name: DataMaskNameIdent::new(tenant, name), - }) - .await?; - Ok(resp.policy.data) + let name_ident = DataMaskNameIdent::new(tenant, name); + let seq_meta = meta_api + .get_data_mask(&name_ident) + .await? + .ok_or_else(|| AppError::from(name_ident.unknown_error("get data mask")))?; + Ok(seq_meta.data) } } diff --git a/src/query/sql/src/planner/plans/data_mask.rs b/src/query/sql/src/planner/plans/data_mask.rs index ff0e220d7cf7..c59001dcfe41 100644 --- a/src/query/sql/src/planner/plans/data_mask.rs +++ b/src/query/sql/src/planner/plans/data_mask.rs @@ -22,6 +22,7 @@ use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_meta_app::data_mask::CreateDatamaskReq; use databend_common_meta_app::data_mask::DataMaskNameIdent; +use databend_common_meta_app::data_mask::DatamaskMeta; use databend_common_meta_app::data_mask::DropDatamaskReq; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; @@ -45,16 +46,19 @@ impl From for CreateDatamaskReq { CreateDatamaskReq { create_option: p.create_option, name: DataMaskNameIdent::new(p.tenant.clone(), &p.name), - args: p - .policy - .args - .iter() - .map(|arg| (arg.arg_name.to_string(), arg.arg_type.to_string())) - .collect(), - return_type: p.policy.return_type.to_string(), - body: p.policy.body.to_string(), - comment: p.policy.comment, - create_on: Utc::now(), + data_mask_meta: DatamaskMeta { + args: p + .policy + .args + .iter() + .map(|arg| (arg.arg_name.to_string(), arg.arg_type.to_string())) + .collect(), + return_type: p.policy.return_type.to_string(), + body: p.policy.body.to_string(), + comment: p.policy.comment, + create_on: Utc::now(), + update_on: None, + }, } } }