Skip to content

Commit

Permalink
fix: When Replacing column mask, old name->id->value should be comp…
Browse files Browse the repository at this point in the history
…letely deleted (#16328)

* fix: When Replacing column mask, old `name->id->value` should be completely deleted

Before this commit, only the `name->id` is removed when replacing column
mask, this would cause column mask records leaked as a junk in
meta-service.

Other changes: simplify data mask and background job API implementation
with KVPbApi routines.

* refactor: simplify data-mask get/drop arg and return value
  • Loading branch information
drmingdrmer authored Aug 27, 2024
1 parent 41cf22e commit af5fe4c
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 385 deletions.
143 changes: 42 additions & 101 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use chrono::Utc;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::BackgroundJobAlreadyExists;
use databend_common_meta_app::app_error::UnknownBackgroundJob;
use databend_common_meta_app::background::background_job_id_ident::BackgroundJobId;
use databend_common_meta_app::background::BackgroundJobIdIdent;
use databend_common_meta_app::background::BackgroundJobIdent;
use databend_common_meta_app::background::BackgroundJobInfo;
Expand All @@ -44,7 +41,6 @@ use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::ConditionResult::Eq;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MatchSeq::Any;
use databend_common_meta_types::MetaError;
Expand All @@ -57,17 +53,15 @@ use log::debug;
use crate::background_api::BackgroundApi;
use crate::deserialize_struct;
use crate::fetch_id;
use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::send_txn;
use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_seq;
use crate::txn_op_put;
use crate::util::deserialize_u64;
use crate::util::txn_op_put_pb;
use crate::util::txn_replace_exact;

/// BackgroundApi is implemented upon kvapi::KVApi.
/// Thus every type that impl kvapi::KVApi impls BackgroundApi.
Expand All @@ -87,61 +81,46 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
trials.next().unwrap()?.await;

// Get db mask by name to ensure absence
let (seq, id) = get_u64_value(self, name_key).await?;
debug!(seq = seq, id = id, name_key :? =(name_key); "create_background_job");
let job_id = self.get_pb(name_key).await?;

debug!(res :? = job_id, name_key :? =(name_key); "get existing, when create_background_job");

if seq > 0 {
if let Some(seq_id) = job_id {
return if req.if_not_exists {
Ok(CreateBackgroundJobReply { id })
Ok(CreateBackgroundJobReply { id: *seq_id.data })
} else {
Err(KVAppError::AppError(AppError::BackgroundJobAlreadyExists(
BackgroundJobAlreadyExists::new(
name_key.name(),
format!("create background job: {:?}", req.job_name),
),
)))
Err(
AppError::BackgroundJobAlreadyExists(name_key.exist_error(func_name!()))
.into(),
)
};
}
};

let id = fetch_id(self, IdGenerator::background_job_id()).await?;
let id_key = BackgroundJobIdIdent::new(name_key.tenant(), id);
let id = BackgroundJobId::new(id);
let id_key = BackgroundJobIdIdent::new_generic(name_key.tenant(), id);

debug!(
id :? =(&id_key),
name_key :? =(name_key);
"new backgroundjob id"
);
debug!(id :? =(&id_key),name_key :? =(name_key); "{}", func_name!());

{
let mut txn = TxnRequest::default();

let meta: BackgroundJobInfo = req.job_info.clone();
let condition = vec![txn_cond_seq(name_key, Eq, 0)];
let if_then = vec![
txn_op_put(name_key, serialize_u64(id)?), // name -> background_job_id
txn_op_put(&id_key, serialize_struct(&meta)?), // id -> meta
];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;
txn_replace_exact(&mut txn, name_key, 0, &id)?; // name -> background_job_id
txn.if_then.push(txn_op_put_pb(&id_key, &meta)?); // id -> meta

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
name :? =(name_key),
id :? =(&id_key),
succ = succ;
"create_background_job"
);
debug!(name :? =(name_key),id :? =(&id_key),succ = succ;"{}", func_name!());

if succ {
break id;
}
}
};

Ok(CreateBackgroundJobReply { id })
Ok(CreateBackgroundJobReply { id: *id })
}

// TODO(zhihanz): needs to drop both background job and related background tasks, also needs to gracefully shutdown running queries
Expand Down Expand Up @@ -197,10 +176,15 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {

let name_key = &req.name;

let (id_ident, seq_joq) =
get_background_job_or_error(self, name_key, format!("get_: {:?}", name_key)).await?;
let (seq_id, seq_job) = self
.get_id_and_value(name_key)
.await?
.ok_or_else(|| AppError::from(name_key.unknown_error("get_background_job")))?;

Ok(GetBackgroundJobReply::new(id_ident, seq_joq))
Ok(GetBackgroundJobReply::new(
seq_id.data.into_t_ident(name_key.tenant()),
seq_job,
))
}

#[fastrace::trace]
Expand Down Expand Up @@ -298,69 +282,26 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
}
}

async fn get_background_job_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &BackgroundJobIdent,
) -> Result<BackgroundJobIdIdent, KVAppError> {
let (id_seq, id) = get_u64_value(kv_api, name_key).await?;
assert_background_job_exist(id_seq, name_key)?;

Ok(BackgroundJobIdIdent::new(name_key.tenant(), id))
}

async fn get_background_job_or_error(
async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_ident: &BackgroundJobIdent,
_msg: impl Display,
) -> Result<kvapi::Pair<BackgroundJobIdIdent>, KVAppError> {
let id_ident = get_background_job_id(kv_api, name_ident).await?;
mutation: F,
) -> Result<UpdateBackgroundJobReply, KVAppError> {
debug!(req :? =(name_ident); "BackgroundApi: {}", func_name!());

let seq_job = kv_api
.get_pb(&id_ident)
let (seq_id, mut seq_meta) = kv_api
.get_id_and_value(name_ident)
.await?
.ok_or_else(|| unknown_background_job(name_ident))?;
.ok_or_else(|| AppError::from(name_ident.unknown_error("update_background_job")))?;

Ok((id_ident, seq_job))
}

/// Return OK if a db_id or db_meta exists by checking the seq.
///
/// Otherwise returns UnknownBackgroundJob error
pub fn assert_background_job_exist(
seq: u64,
name_ident: &BackgroundJobIdent,
) -> Result<(), AppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
let err = unknown_background_job(name_ident);
Err(err)
} else {
Ok(())
}
}

pub fn unknown_background_job(name_ident: &BackgroundJobIdent) -> AppError {
AppError::UnknownBackgroundJob(UnknownBackgroundJob::new(
name_ident.job_name(),
format!("{:?}", name_ident),
))
}

async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name: &BackgroundJobIdent,
mutation: F,
) -> Result<UpdateBackgroundJobReply, KVAppError> {
debug!(req :? =(name); "BackgroundApi: {}", func_name!());
let (id_ident, mut seq_job) =
get_background_job_or_error(kv_api, name, "update_background_job").await?;
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());

let should_update = mutation(&mut seq_job.data);
let should_update = mutation(&mut seq_meta.data);
if !should_update {
return Ok(UpdateBackgroundJobReply::new(id_ident.clone()));
}

let req = UpsertPB::update_exact(id_ident.clone(), seq_job);
let req = UpsertPB::update_exact(id_ident.clone(), seq_meta);
let resp = kv_api.upsert_pb(&req).await?;

assert!(resp.is_changed());
Expand Down
21 changes: 15 additions & 6 deletions src/meta/api/src/data_mask_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DropDatamaskReply;
use databend_common_meta_app::data_mask::DropDatamaskReq;
use databend_common_meta_app::data_mask::GetDatamaskReply;
use databend_common_meta_app::data_mask::GetDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskId;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;

use crate::kv_app_error::KVAppError;

Expand All @@ -28,7 +29,15 @@ pub trait DatamaskApi: Send + Sync {
req: CreateDatamaskReq,
) -> Result<CreateDatamaskReply, KVAppError>;

async fn drop_data_mask(&self, req: DropDatamaskReq) -> Result<DropDatamaskReply, KVAppError>;
/// On success, returns the dropped id and data mask.
/// Returning None, means nothing is removed.
async fn drop_data_mask(
&self,
name_ident: &DataMaskNameIdent,
) -> Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, KVAppError>;

async fn get_data_mask(&self, req: GetDatamaskReq) -> Result<GetDatamaskReply, KVAppError>;
async fn get_data_mask(
&self,
name_ident: &DataMaskNameIdent,
) -> Result<Option<SeqV<DatamaskMeta>>, MetaError>;
}
Loading

0 comments on commit af5fe4c

Please sign in to comment.