Skip to content

Commit

Permalink
refactor: Simplify table lock API. (#16413)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Sep 7, 2024
1 parent 7720978 commit 65bb320
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 283 deletions.
2 changes: 1 addition & 1 deletion src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let meta: BackgroundJobInfo = req.job_info.clone();

txn_replace_exact(&mut txn, name_key, 0, &id)?; // name -> background_job_id
txn.if_then.push(txn_op_put_pb(&id_key, &meta)?); // id -> meta
txn.if_then.push(txn_op_put_pb(&id_key, &meta, None)?); // id -> meta

let (succ, _responses) = send_txn(self, txn).await?;

Expand Down
6 changes: 3 additions & 3 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
let id_list = MaskpolicyTableIdList::default();
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
txn.if_then.extend(vec![
txn_op_put_pb(name_ident, &id)?, // name -> db_id
txn_op_put_pb(&id_ident, &meta)?, // id -> meta
txn_op_put_pb(&id_list_key, &id_list)?, // data mask name -> id_list
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
]);

let (succ, _responses) = send_txn(self, txn).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ where
txn.condition
.extend(vec![txn_cond_eq_seq(name_ident, current_id_seq)]);

txn.if_then.push(txn_op_put_pb(name_ident, &id)?); // (tenant, name) -> id
txn.if_then.push(txn_op_put_pb(&id_ident, value)?); // (id) -> value
txn.if_then.push(txn_op_put_pb(name_ident, &id, None)?); // (tenant, name) -> id
txn.if_then.push(txn_op_put_pb(&id_ident, value, None)?); // (id) -> value

// Add associated
let kvs = associated_records(id);
Expand Down
40 changes: 35 additions & 5 deletions src/meta/api/src/name_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::fmt;
use std::time::Duration;

use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant_key::errors::ExistError;
Expand All @@ -34,7 +35,8 @@ use crate::kv_pb_api::UpsertPB;
use crate::meta_txn_error::MetaTxnError;
use crate::send_txn;
use crate::txn_backoff::txn_backoff;
use crate::util::txn_replace_exact;
use crate::txn_cond_eq_seq;
use crate::util::txn_op_put_pb;

/// NameValueApi provide generic meta-service access pattern implementations for `name -> value` mapping.
///
Expand All @@ -49,6 +51,30 @@ where
N: fmt::Debug + Clone + Send + Sync + 'static,
{
/// Create a `name -> value` mapping.
async fn create_name_value(
&self,
name_ident: TIdent<R, N>,
value: R::ValueType,
ttl: Option<Duration>,
) -> Result<Result<(), ExistError<R, N>>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::insert(name_ident.clone(), value);
let upsert = if let Some(ttl) = ttl {
upsert.with_ttl(ttl)
} else {
upsert
};

let transition = self.upsert_pb(&upsert).await?;

if !transition.is_changed() {
return Ok(Err(name_ident.exist_error(func_name!())));
}
Ok(Ok(()))
}

/// Create a `name -> value` mapping, with `CreateOption` support
async fn create_name_value_with_create_option(
&self,
name_ident: TIdent<R, N>,
Expand All @@ -73,15 +99,18 @@ where

/// Update an existent `name -> value` mapping.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// The `update` function is called with the previous value
/// and should output the updated to write back,
/// with an optional time-to-last value.
///
/// `not_found` is called when the name does not exist.
/// And this function decide to:
/// - cancel update by returning `Ok(())`
/// - or return an error when the name does not exist.
async fn update_existent_name_value<E>(
&self,
name_ident: &TIdent<R, N>,
update: impl Fn(R::ValueType) -> Option<R::ValueType> + Send,
update: impl Fn(R::ValueType) -> Option<(R::ValueType, Option<Duration>)> + Send,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());
Expand All @@ -100,12 +129,13 @@ where
None => return Ok(not_found()),
};

let Some(updated) = updated else {
let Some((updated, ttl)) = updated else {
// update is cancelled
return Ok(Ok(()));
};

txn_replace_exact(&mut txn, name_ident, seq, &updated)?;
txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.push(txn_op_put_pb(name_ident, &updated, ttl)?);

let (succ, _responses) = send_txn(self, txn).await?;

Expand Down
Loading

0 comments on commit 65bb320

Please sign in to comment.