Skip to content

Commit

Permalink
chore: simplify create_data_mask
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 22, 2024
1 parent 9ef953d commit 9271890
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 80 deletions.
1 change: 0 additions & 1 deletion src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;
Expand Down
107 changes: 46 additions & 61 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;
Expand All @@ -54,7 +51,6 @@ use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::txn_cond_seq;
use crate::txn_op_del;
use crate::txn_op_put;

Expand All @@ -78,8 +74,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
let (seq, id) = get_u64_value(self, name_ident).await?;
debug!(seq = seq, id = id, name_key :? =(name_ident); "create_data_mask");

let mut condition = vec![];
let mut if_then = vec![];
let mut txn = TxnRequest::default();

if seq > 0 {
match req.create_option {
Expand All @@ -99,8 +94,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
false,
false,
func_name!(),
&mut condition,
&mut if_then,
&mut txn,
)
.await?;
}
Expand All @@ -126,20 +120,14 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
{
let meta: DatamaskMeta = req.clone().into();
let id_list = MaskpolicyTableIdList::default();
condition.push(txn_cond_seq(name_ident, Eq, seq));
if_then.extend( vec![
txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.extend( vec![
txn_op_put(name_ident, serialize_u64(id)?), // name -> db_id
txn_op_put(&id_ident, serialize_struct(&meta)?), // id -> meta
txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */
]);

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;
let (succ, _responses) = send_txn(self, txn).await?;

debug!(
name :? =(name_ident),
Expand All @@ -166,25 +154,19 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
loop {
trials.next().unwrap()?.await;

let mut condition = vec![];
let mut if_then = vec![];
let mut txn = TxnRequest::default();

construct_drop_mask_policy_operations(
self,
name_key,
req.if_exists,
true,
func_name!(),
&mut condition,
&mut if_then,
&mut txn,
)
.await?;
let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};
let (succ, _responses) = send_txn(self, txn_req).await?;

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
succ = succ;
Expand Down Expand Up @@ -253,35 +235,38 @@ pub fn assert_data_mask_exist(
async fn clear_table_column_mask_policy(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &DataMaskNameIdent,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
let id_list_key = MaskPolicyTableIdListIdent::new_from(name_ident.clone());

let (id_list_seq, id_list_opt): (_, Option<MaskpolicyTableIdList>) =
get_pb_value(kv_api, &id_list_key).await?;
if let Some(id_list) = id_list_opt {
condition.push(txn_cond_seq(&id_list_key, Eq, id_list_seq));
if_then.push(txn_op_del(&id_list_key));

// remove mask policy from table meta
for table_id in id_list.id_list.into_iter() {
let tbid = TableId { table_id };

let (tb_meta_seq, table_meta_opt): (_, Option<TableMeta>) =
get_pb_value(kv_api, &tbid).await?;
if let Some(mut table_meta) = table_meta_opt {
if let Some(column_mask_policy) = table_meta.column_mask_policy {
let new_column_mask_policy = column_mask_policy
.into_iter()
.filter(|(_, name)| name != name_ident.name())
.collect();

table_meta.column_mask_policy = Some(new_column_mask_policy);

condition.push(txn_cond_seq(&tbid, Eq, tb_meta_seq));
if_then.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));
}
let seq_id_list = kv_api.get_pb(&id_list_key).await?;

let Some(seq_id_list) = seq_id_list else {
return Ok(());
};

txn.condition
.push(txn_cond_eq_seq(&id_list_key, seq_id_list.seq));
txn.if_then.push(txn_op_del(&id_list_key));

// remove mask policy from table meta
for table_id in seq_id_list.data.id_list.into_iter() {
let tbid = TableId { table_id };

let (tb_meta_seq, table_meta_opt): (_, Option<TableMeta>) =
get_pb_value(kv_api, &tbid).await?;
if let Some(mut table_meta) = table_meta_opt {
if let Some(column_mask_policy) = table_meta.column_mask_policy {
let new_column_mask_policy = column_mask_policy
.into_iter()
.filter(|(_, name)| name != name_ident.name())
.collect();

table_meta.column_mask_policy = Some(new_column_mask_policy);

txn.condition.push(txn_cond_eq_seq(&tbid, tb_meta_seq));
txn.if_then
.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));
}
}
}
Expand All @@ -295,8 +280,7 @@ async fn construct_drop_mask_policy_operations(
drop_if_exists: bool,
if_delete: bool,
ctx: &str,
condition: &mut Vec<TxnCondition>,
if_then: &mut Vec<TxnOp>,
txn: &mut TxnRequest,
) -> Result<(), KVAppError> {
let result = get_data_mask_or_err(
kv_api,
Expand All @@ -320,13 +304,14 @@ async fn construct_drop_mask_policy_operations(

let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);

condition.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
if_then.push(txn_op_del(&id_ident));
txn.condition
.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
txn.if_then.push(txn_op_del(&id_ident));

if if_delete {
condition.push(txn_cond_eq_seq(name_key, id_seq));
if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, condition, if_then).await?;
txn.condition.push(txn_cond_eq_seq(name_key, id_seq));
txn.if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, txn).await?;
}

debug!(
Expand Down
1 change: 0 additions & 1 deletion src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_store::MetaStore;
use databend_common_meta_types::anyerror::func_name;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
use databend_storages_common_session::SessionState;
Expand Down
18 changes: 12 additions & 6 deletions src/query/ee/tests/it/background_service/job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ async fn test_one_shot_job() -> Result<()> {
let counter = Arc::new(AtomicUsize::new(0));
let job = TestJob {
counter: counter.clone(),
info: BackgroundJobInfo::new_compactor_job(
BackgroundJobParams::new_one_shot_job(),
UserIdentity::default(),
info: SeqV::new(
0,
BackgroundJobInfo::new_compactor_job(
BackgroundJobParams::new_one_shot_job(),
UserIdentity::default(),
),
),
finish_tx: scheduler.finish_tx.clone(),
};
Expand All @@ -107,9 +110,12 @@ async fn test_interval_job() -> Result<()> {
let counter = Arc::new(AtomicUsize::new(0));
let job = TestJob {
counter: counter.clone(),
info: BackgroundJobInfo::new_compactor_job(
BackgroundJobParams::new_interval_job(Duration::from_millis(10)),
UserIdentity::default(),
info: SeqV::new(
0,
BackgroundJobInfo::new_compactor_job(
BackgroundJobParams::new_interval_job(Duration::from_millis(10)),
UserIdentity::default(),
),
),
finish_tx: scheduler.finish_tx.clone(),
};
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
use databend_storages_common_session::SessionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
use databend_storages_common_table_meta::table_id_ranges::SYS_DB_ID_BEGIN;
Expand Down
9 changes: 0 additions & 9 deletions src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
use databend_storages_common_session::SessionState;
Expand All @@ -116,17 +115,9 @@ use databend_storages_common_session::TxnManagerRef;
use databend_storages_common_session::TxnState;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id;
use databend_storages_common_txn::TxnManagerRef;
use databend_storages_common_txn::TxnState;

use crate::catalog::Catalog;
use crate::catalog::StorageDescription;
use crate::catalogs::default::MutableCatalog;
use crate::catalogs::Catalog;
use crate::database::Database;
use crate::table::Table;
use crate::table_args::TableArgs;
use crate::table_function::TableFunction;
#[derive(Clone, Debug)]
pub struct SessionCatalog {
inner: MutableCatalog,
Expand Down

0 comments on commit 9271890

Please sign in to comment.