Skip to content

Commit

Permalink
use mget_databases replace for
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Sep 10, 2024
1 parent f74c975 commit ee2fc51
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 83 deletions.
6 changes: 6 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_meta_app::schema::catalog_id_ident::CatalogId;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_id_ident::DictionaryId;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::index_id_ident::IndexId;
Expand Down Expand Up @@ -231,6 +232,11 @@ pub trait SchemaApi: Send + Sync {

async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String, KVAppError>;

async fn mget_databases(
&self,
db_names: Vec<DatabaseNameIdent>,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>, MetaError>;

async fn get_table_copied_file_info(
Expand Down
21 changes: 21 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ use crate::list_keys;
use crate::list_u64_value;
use crate::meta_txn_error::MetaTxnError;
use crate::name_id_value_api::NameIdValueApi;
use crate::name_id_value_api::NameIdValueApiCompat;
use crate::name_value_api::NameValueApi;
use crate::send_txn;
use crate::serialize_struct;
Expand Down Expand Up @@ -1869,6 +1870,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
Ok(seq_meta.data.database_name().to_string())
}

#[logcall::logcall]
#[fastrace::trace]
async fn mget_databases(
&self,
db_names: Vec<DatabaseNameIdent>,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
debug!(req :? =(&db_names); "SchemaApi: {}", func_name!());
let res = self.mget_id_value_compat(db_names.into_iter()).await?;
let res = res
.map(|(name_ident, database_id, meta)| {
Arc::new(DatabaseInfo {
database_id,
name_ident,
meta,
})
})
.collect::<Vec<Arc<DatabaseInfo>>>();
Ok(res)
}

#[logcall::logcall]
#[fastrace::trace]
async fn mget_database_names_by_ids(
Expand Down
8 changes: 8 additions & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -223,6 +224,13 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
// Get the db name by meta id.
async fn get_db_name_by_id(&self, db_ids: MetaId) -> Result<String>;

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>>;

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
33 changes: 33 additions & 0 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -338,6 +339,38 @@ impl Catalog for DatabaseCatalog {
}
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let sys_dbs = self.immutable_catalog.list_databases(tenant).await?;
let sys_db_names: Vec<_> = sys_dbs
.iter()
.map(|sys_db| sys_db.get_db_info().name_ident.database_name())
.collect();

let mut mut_db_names: Vec<_> = Vec::new();
for db_name in db_names {
if !sys_db_names.contains(&db_name.database_name()) {
mut_db_names.push(db_name.clone());
}
}

let mut dbs = self
.immutable_catalog
.mget_databases(tenant, db_names)
.await?;

let other = self
.mutable_catalog
.mget_databases(tenant, &mut_db_names)
.await?;

dbs.extend(other);
Ok(dbs)
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
18 changes: 18 additions & 0 deletions src/query/service/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_catalog::catalog::Catalog;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -232,6 +233,23 @@ impl Catalog for ImmutableCatalog {
}
}

async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let mut res: Vec<Arc<dyn Database>> = vec![];
for db_name in db_names {
let db_name = db_name.database_name();
if db_name == "system" {
res.push(self.sys_db.clone());
} else if db_name == "information_schema" {
res.push(self.info_schema_db.clone());
}
}
Ok(res)
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
15 changes: 15 additions & 0 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,21 @@ impl Catalog for MutableCatalog {
Ok(res)
}

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let dbs = self.ctx.meta.mget_databases(db_names.to_vec()).await?;

dbs.iter().try_fold(vec![], |mut acc, item| {
let db = self.build_db_instance(item)?;
acc.push(db);
Ok(acc)
})
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_function::TableFunction;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -305,6 +306,14 @@ impl Catalog for SessionCatalog {
self.inner.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.inner.mget_databases(tenant, db_names).await
}

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -208,6 +209,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

async fn mget_database_names_by_ids(
&self,
tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -957,6 +958,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/hive/hive/src/hive_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -385,6 +386,16 @@ impl Catalog for HiveCatalog {
))
}

async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in HIVE catalog",
))
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
10 changes: 10 additions & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -295,6 +296,15 @@ impl Catalog for IcebergCatalog {
"Cannot get db name by id in ICEBERG catalog",
))
}
async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in ICEBERG catalog",
))
}

async fn mget_database_names_by_ids(
&self,
Expand Down
50 changes: 25 additions & 25 deletions src/query/storages/system/src/columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -291,39 +292,38 @@ pub(crate) async fn dump_tables(
}
} else {
let catalog_dbs = visibility_checker.get_visibility_database();
// None means has global level privileges
if let Some(catalog_dbs) = catalog_dbs {
for (catalog_name, dbs) in catalog_dbs {
if catalog_name == CATALOG_DEFAULT {
let mut catalog_db_ids = vec![];
for (db_name, db_id) in dbs {
if let Some(db_name) = db_name {
let db_id = catalog
.get_database(&tenant, db_name)
.await?
.get_db_info()
.database_id
.db_id;
final_dbs.push((db_name.to_string(), db_id));
}
if let Some(db_id) = db_id {
catalog_db_ids.push(*db_id);
}
}
match catalog
let mut catalog_db_names = vec![];
catalog_db_names.extend(
dbs.iter()
.filter_map(|(db_name, _)| *db_name)
.map(|db_name| db_name.to_string()),
);
catalog_db_ids.extend(dbs.iter().filter_map(|(_, db_id)| *db_id));
if let Ok(databases) = catalog
.mget_database_names_by_ids(&tenant, &catalog_db_ids)
.await
{
Ok(databases) => {
for (i, db) in databases.into_iter().flatten().enumerate() {
final_dbs.push((db.to_string(), catalog_db_ids[i]));
}
}
Err(err) => {
let msg =
format!("Failed to get database: {}, {}", catalog.name(), err);
warn!("{}", msg);
}
catalog_db_names.extend(databases.into_iter().flatten());
} else {
let msg = format!("Failed to get database name by id: {}", catalog.name());
warn!("{}", msg);
}
let db_idents = catalog_db_names
.iter()
.map(|name| DatabaseNameIdent::new(&tenant, name))
.collect::<Vec<DatabaseNameIdent>>();
let dbs: Vec<(String, u64)> = catalog
.mget_databases(&tenant, &db_idents)
.await?
.iter()
.map(|db| (db.name().to_string(), db.get_db_info().database_id.db_id))
.collect();
final_dbs.extend(dbs);
}
}
} else {
Expand Down
Loading

0 comments on commit ee2fc51

Please sign in to comment.