Skip to content

Commit

Permalink
refactor: add generic CrudMgr to implement various simple CRUD manager (
Browse files Browse the repository at this point in the history
#14988)

`CrudMgr` is a generic CRUD interface for meta data operations.

- It provide `add`, `update`, `get`, `remove` and `list` operations.

- The key space it operates on is defined by the type `TIdent<R>`
  which contains a `Tenant` and a `Name`.

- `R: TenantResource` contains essential information for such a CRUD
  manger, including key prefix, value type, and customizable error for
  unknown record and existing record.

- A `CrudMgr` instance can only access keys of exactly one [`Tenant`].
  • Loading branch information
drmingdrmer authored Mar 18, 2024
1 parent e4e0ebe commit beba993
Show file tree
Hide file tree
Showing 20 changed files with 382 additions and 183 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ databend-common-meta-types = { path = "../types" }
databend-common-proto-conv = { path = "../proto-conv" }

anyhow = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
enumflags2 = { workspace = true }
Expand Down
178 changes: 178 additions & 0 deletions src/meta/api/src/crud/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A generic CRUD interface for meta data operations.
use std::marker::PhantomData;
use std::sync::Arc;

use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant_key::TIdent;
use databend_common_meta_app::tenant_key::TenantResource;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::ValueWithName;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::NonEmptyString;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use futures::TryStreamExt;

use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;

/// A generic CRUD interface for meta data operations.
///
/// - It provide `add`, `update`, `get`, `remove` and `list` operations.
/// - The key space it operates on is defined by the type [`TIdent<R>`],
/// which contains a `Tenant` and a `Name`.
///
/// One `CrudMgr` instance can only access keys of exactly one [`Tenant`].
///
/// [`TIdent<R>`]: TIdent
pub struct CrudMgr<R> {
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
tenant: Tenant,
_p: PhantomData<R>,
}

impl<R> CrudMgr<R> {
/// Create a new `CrudMgr` instance providing CRUD access for a key space defined by `R`: [`TenantResource`].
pub fn create(
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
tenant: &NonEmptyString,
) -> Self {
CrudMgr {
kv_api,
tenant: Tenant::new_nonempty(tenant.clone()),
_p: Default::default(),
}
}

/// Create a structured key for the given name.
fn ident(&self, name: &str) -> TIdent<R> {
TIdent::new(self.tenant.clone(), name)
}
}

/// A shortcut
type ValueOf<R> = <TIdent<R> as kvapi::Key>::ValueType;

impl<R> CrudMgr<R>
where
R: TenantResource + Send + 'static,
R::UnknownError: From<MetaError>,
R::ExistError: From<MetaError>,
// As a kvapi::Key, the corresponding value contains a name.
ValueOf<R>: ValueWithName + FromToProto + Clone,
{
#[async_backtrace::framed]
#[minitrace::trace]
pub async fn add(
&self,
value: ValueOf<R>,
create_option: &CreateOption,
) -> Result<(), R::ExistError> {
let ident = self.ident(value.name());

let seq = MatchSeq::from(*create_option);
let upsert = UpsertPB::insert(ident, value.clone()).with(seq);

let res = self.kv_api.upsert_pb(&upsert).await?;

if let CreateOption::Create = create_option {
if res.prev.is_some() {
return Err(R::error_exist(
&self.tenant,
value.name(),
|| "Exist when add",
));
}
}

Ok(())
}

#[async_backtrace::framed]
#[minitrace::trace]
pub async fn update(
&self,
value: ValueOf<R>,
match_seq: MatchSeq,
) -> Result<u64, R::UnknownError> {
let ident = self.ident(value.name());
let upsert = UpsertPB::update(ident, value.clone()).with(match_seq);

let res = self.kv_api.upsert_pb(&upsert).await?;

match res.result {
Some(SeqV { seq: s, .. }) => Ok(s),
None => Err(R::error_unknown(
&self.tenant,
value.name(),
|| "NotFound when update",
)),
}
}

#[async_backtrace::framed]
#[minitrace::trace]
pub async fn remove(&self, name: &str, seq: MatchSeq) -> Result<(), R::UnknownError> {
let ident = self.ident(name);

let upsert = UpsertPB::delete(ident).with(seq);

let res = self.kv_api.upsert_pb(&upsert).await?;
res.removed_or_else(|e| {
R::error_unknown(&self.tenant, name, || format!("Exist when remove: {:?}", e))
})?;

Ok(())
}

#[async_backtrace::framed]
#[minitrace::trace]
pub async fn get(
&self,
name: &str,
seq: MatchSeq,
) -> Result<SeqV<ValueOf<R>>, R::UnknownError> {
let ident = self.ident(name);

let res = self.kv_api.get_pb(&ident).await?;

let seq_value =
res.ok_or_else(|| R::error_unknown(&self.tenant, name, || "NotFound when get"))?;

match seq.match_seq(&seq_value) {
Ok(_) => Ok(seq_value),
Err(e) => Err(R::error_unknown(&self.tenant, name, || e)),
}
}

#[async_backtrace::framed]
#[minitrace::trace]
pub async fn list(&self) -> Result<Vec<ValueOf<R>>, MetaError> {
let dir_name = DirName::new(self.ident("dummy"));

let values = self.kv_api.list_pb_values(&dir_name).await?;
let values = values.try_collect().await?;

Ok(values)
}
}
2 changes: 2 additions & 0 deletions src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub(crate) mod testing;
pub mod txn_backoff;
pub(crate) mod util;

pub mod crud;

pub use background_api::BackgroundApi;
pub use background_api_test_suite::BackgroundApiTestSuite;
pub use data_mask_api::DatamaskApi;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mod connection;
mod file_format;
mod network_policy;
mod network_policy_ident;
pub mod network_policy_ident;
mod ownership_info;
mod password_policy;
mod password_policy_ident;
Expand Down
36 changes: 35 additions & 1 deletion src/meta/app/src/principal/network_policy_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,59 @@
use crate::tenant_key::TIdent;

/// Defines the meta-service key for network policy.
pub type NetworkPolicyIdent = TIdent<kvapi_impl::Resource>;
pub type NetworkPolicyIdent = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use std::fmt::Display;

use databend_common_exception::ErrorCode;
use databend_common_meta_kvapi::kvapi;

use crate::principal::NetworkPolicy;
use crate::tenant::Tenant;
use crate::tenant_key::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_network_policies";
type ValueType = NetworkPolicy;
type UnknownError = ErrorCode;

fn error_unknown<D: Display>(
_tenant: &Tenant,
name: &str,
ctx: impl FnOnce() -> D,
) -> Self::UnknownError {
ErrorCode::UnknownNetworkPolicy(format!("Unknown network policy '{name}': {}", ctx()))
}

type ExistError = ErrorCode;

fn error_exist<D: Display>(
_tenant: &Tenant,
name: &str,
ctx: impl FnOnce() -> D,
) -> Self::ExistError {
ErrorCode::NetworkPolicyAlreadyExists(format!(
"Network policy '{name}' already exists: {}",
ctx()
))
}
}

impl kvapi::Value for NetworkPolicy {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
}
}

impl kvapi::ValueWithName for NetworkPolicy {
fn name(&self) -> &str {
&self.name
}
}
}

#[cfg(test)]
Expand Down
24 changes: 24 additions & 0 deletions src/meta/app/src/principal/password_policy_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,40 @@ use crate::tenant_key::TIdent;
pub type PasswordPolicyIdent = TIdent<kvapi_impl::Resource>;

mod kvapi_impl {
use std::fmt::Display;

use databend_common_exception::ErrorCode;
use databend_common_meta_kvapi::kvapi;

use crate::principal::PasswordPolicy;
use crate::tenant::Tenant;
use crate::tenant_key::TenantResource;

pub struct Resource;

impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_password_policies";
type ValueType = PasswordPolicy;

type UnknownError = ErrorCode;

fn error_unknown<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::UnknownError {
todo!()
}

type ExistError = ErrorCode;

fn error_exist<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::ExistError {
todo!()
}
}

impl kvapi::Value for PasswordPolicy {
Expand Down
23 changes: 23 additions & 0 deletions src/meta/app/src/principal/user_setting_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,38 @@ use crate::tenant_key::TIdent;
pub type SettingIdent = TIdent<kvapi_impl::Resource>;

mod kvapi_impl {
use std::fmt::Display;

use databend_common_exception::ErrorCode;
use databend_common_meta_kvapi::kvapi;

use crate::principal::UserSetting;
use crate::tenant::Tenant;
use crate::tenant_key::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_settings";
type ValueType = UserSetting;
type UnknownError = ErrorCode;

fn error_unknown<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::UnknownError {
todo!()
}

type ExistError = ErrorCode;

fn error_exist<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::ExistError {
todo!()
}
}

impl kvapi::Value for UserSetting {
Expand Down
24 changes: 24 additions & 0 deletions src/meta/app/src/principal/user_stage_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,39 @@ use crate::tenant_key::TIdent;
pub type StageIdent = TIdent<kvapi_impl::Resource>;

mod kvapi_impl {
use std::fmt::Display;

use databend_common_exception::ErrorCode;
use databend_common_meta_kvapi::kvapi;

use crate::principal::StageInfo;
use crate::tenant::Tenant;
use crate::tenant_key::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_stages";
type ValueType = StageInfo;

type UnknownError = ErrorCode;

fn error_unknown<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::UnknownError {
todo!()
}

type ExistError = ErrorCode;

fn error_exist<D: Display>(
_tenant: &Tenant,
_name: &str,
_ctx: impl FnOnce() -> D,
) -> Self::ExistError {
todo!()
}
}

impl kvapi::Value for StageInfo {
Expand Down
Loading

0 comments on commit beba993

Please sign in to comment.