Skip to content

Commit

Permalink
Merge pull request #3136 from BohuTANG/dev-catalog-3098
Browse files Browse the repository at this point in the history
STORAGE-3098:  Catalog refactor
  • Loading branch information
BohuTANG authored Nov 29, 2021
2 parents 0508a66 + 1069a60 commit 7dae4cb
Show file tree
Hide file tree
Showing 136 changed files with 831 additions and 1,473 deletions.
48 changes: 43 additions & 5 deletions query/src/catalogs/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateDatabaseReq;
use common_meta_types::CreateTableReq;
use common_meta_types::DropDatabaseReq;
use common_meta_types::DropTableReply;
use common_meta_types::DropTableReq;
use common_meta_types::MetaId;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
Expand All @@ -30,7 +33,7 @@ use dyn_clone::DynClone;
use crate::catalogs::Database;
use crate::catalogs::Table;
use crate::catalogs::TableFunction;
use crate::datasources::table_func_engine::TableArgs;
use crate::table_functions::TableArgs;

/// Catalog is the global view of all the databases of the user.
/// The global view has many engine type: Local-Database(engine=Local), Remote-Database(engine=Remote)
Expand All @@ -39,6 +42,10 @@ use crate::datasources::table_func_engine::TableArgs;
/// and use the engine to create them.
#[async_trait::async_trait]
pub trait Catalog: DynClone + Send + Sync {
///
/// Database.
///

// Get the database by name.
async fn get_database(&self, db_name: &str) -> Result<Arc<dyn Database>>;

Expand All @@ -63,14 +70,48 @@ pub trait Catalog: DynClone + Send + Sync {
}
}

///
/// Table.
///

// Build a `Arc<dyn Table>` from `TableInfo`.
fn build_table(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>>;
fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>>;

// Get the table meta by meta id.
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc<TableMeta>)>;

// Get one table by db and table name.
async fn get_table(&self, db_name: &str, table_name: &str) -> Result<Arc<dyn Table>>;

async fn list_tables(&self, db_name: &str) -> Result<Vec<Arc<dyn Table>>>;

async fn create_table(&self, req: CreateTableReq) -> Result<()>;

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply>;

// Check a db.table is exists or not.
async fn exists_table(&self, db_name: &str, table_name: &str) -> Result<bool> {
match self.get_table(db_name, table_name).await {
Ok(_) => Ok(true),
Err(err) => {
if err.code() == ErrorCode::UnknownTableCode() {
Ok(false)
} else {
Err(err)
}
}
}
}

async fn upsert_table_option(
&self,
req: UpsertTableOptionReq,
) -> Result<UpsertTableOptionReply>;

///
/// Table function
///

// Get function by name.
fn get_table_function(
&self,
Expand All @@ -79,7 +120,4 @@ pub trait Catalog: DynClone + Send + Sync {
) -> Result<Arc<dyn TableFunction>> {
unimplemented!()
}

// Get the table meta by meta id.
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc<TableMeta>)>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ use common_dal::InMemoryData;
use common_infallible::RwLock;
use common_meta_api::MetaApi;

use crate::datasources::database_engine_registry::DatabaseEngineRegistry;
use crate::datasources::table_engine_registry::TableEngineRegistry;
use crate::storages::StorageFactory;

/// Datasource Context.
#[derive(Clone)]
pub struct DataSourceContext {
pub struct CatalogContext {
pub meta: Arc<dyn MetaApi>,
pub storage_factory: Arc<StorageFactory>,
pub in_memory_data: Arc<RwLock<InMemoryData<u64>>>,
pub table_engine_registry: Arc<TableEngineRegistry>,
pub database_engine_registry: Arc<DatabaseEngineRegistry>,
}
47 changes: 15 additions & 32 deletions query/src/catalogs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::CreateTableReq;
use common_meta_types::DropTableReply;
use common_meta_types::DropTableReq;
use dyn_clone::DynClone;

use crate::catalogs::Table;

#[async_trait::async_trait]
pub trait Database: DynClone + Sync + Send {
/// Database name.
fn name(&self) -> &str;
}

async fn init(&self) -> Result<()> {
Ok(())
}

// Get one table by db and table name.
async fn get_table(&self, db_name: &str, table_name: &str) -> Result<Arc<dyn Table>>;

async fn list_tables(&self, db_name: &str) -> Result<Vec<Arc<dyn Table>>>;

async fn create_table(&self, req: CreateTableReq) -> Result<()>;

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply>;
#[derive(Clone)]
pub struct DefaultDatabase {
db_name: String,
}

// Check a db.table is exists or not.
async fn exists_table(&self, db_name: &str, table_name: &str) -> Result<bool> {
match self.get_table(db_name, table_name).await {
Ok(_) => Ok(true),
Err(err) => {
if err.code() == ErrorCode::UnknownTableCode() {
Ok(false)
} else {
Err(err)
}
}
impl DefaultDatabase {
pub fn new(db_name: impl Into<String>) -> Self {
Self {
db_name: db_name.into(),
}
}
}

impl Database for DefaultDatabase {
fn name(&self) -> &str {
&self.db_name
}
}
107 changes: 72 additions & 35 deletions query/src/catalogs/impls/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
// limitations under the License.
//

use std::collections::HashMap;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::CreateDatabaseReply;
use common_meta_types::CreateDatabaseReq;
use common_meta_types::CreateTableReq;
use common_meta_types::DropDatabaseReq;
use common_meta_types::DropTableReply;
use common_meta_types::DropTableReq;
use common_meta_types::MetaId;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
Expand All @@ -35,10 +37,8 @@ use crate::catalogs::Database;
use crate::catalogs::Table;
use crate::catalogs::TableFunction;
use crate::configs::Config;
use crate::datasources::prelude_func_engines;
use crate::datasources::TableArgs;
use crate::datasources::TableFuncEngine;
use crate::datasources::TableFuncEngineRegistry;
use crate::table_functions::TableArgs;
use crate::table_functions::TableFunctionFactory;

/// Combine two catalogs together
/// - read/search like operations are always performed at
Expand All @@ -51,38 +51,38 @@ pub struct DatabaseCatalog {
/// bottom layer, writing goes here
mutable_catalog: Arc<dyn Catalog>,
/// table function engine factories
func_engine_registry: TableFuncEngineRegistry,
table_function_factory: Arc<TableFunctionFactory>,
}

impl DatabaseCatalog {
pub fn create(
immutable_catalog: Arc<dyn Catalog>,
mutable_catalog: Arc<dyn Catalog>,
func_engine_registry: HashMap<String, (u64, Arc<dyn TableFuncEngine>)>,
table_function_factory: Arc<TableFunctionFactory>,
) -> Self {
Self {
immutable_catalog,
mutable_catalog,
func_engine_registry,
table_function_factory,
}
}

pub async fn try_create_with_config(conf: Config) -> Result<DatabaseCatalog> {
let immutable_catalog = ImmutableCatalog::try_create_with_config(&conf).await?;
let mutable_catalog = MutableCatalog::try_create_with_config(conf).await?;
let func_engine_registry = prelude_func_engines();
let table_function_factory = TableFunctionFactory::create();
let res = DatabaseCatalog::create(
Arc::new(immutable_catalog),
Arc::new(mutable_catalog),
func_engine_registry,
Arc::new(table_function_factory),
);
Ok(res)
}
}

#[async_trait::async_trait]
impl Catalog for DatabaseCatalog {
async fn get_database(&self, db_name: &str) -> common_exception::Result<Arc<dyn Database>> {
async fn get_database(&self, db_name: &str) -> Result<Arc<dyn Database>> {
let r = self.immutable_catalog.get_database(db_name).await;
match r {
Err(e) => {
Expand All @@ -96,7 +96,7 @@ impl Catalog for DatabaseCatalog {
}
}

async fn list_databases(&self) -> common_exception::Result<Vec<Arc<dyn Database>>> {
async fn list_databases(&self) -> Result<Vec<Arc<dyn Database>>> {
let mut dbs = self.immutable_catalog.list_databases().await?;
let mut other = self.mutable_catalog.list_databases().await?;
dbs.append(&mut other);
Expand Down Expand Up @@ -125,20 +125,76 @@ impl Catalog for DatabaseCatalog {
self.mutable_catalog.drop_database(req).await
}

fn build_table(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let res = self.immutable_catalog.build_table(table_info);
fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let res = self.immutable_catalog.get_table_by_info(table_info);
match res {
Ok(t) => Ok(t),
Err(e) => {
if e.code() == ErrorCode::UnknownTable("").code() {
self.mutable_catalog.build_table(table_info)
self.mutable_catalog.get_table_by_info(table_info)
} else {
Err(e)
}
}
}
}

async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc<TableMeta>)> {
let res = self.immutable_catalog.get_table_meta_by_id(table_id).await;

if let Ok(x) = res {
Ok(x)
} else {
self.mutable_catalog.get_table_meta_by_id(table_id).await
}
}

async fn get_table(&self, db_name: &str, table_name: &str) -> Result<Arc<dyn Table>> {
let res = self.immutable_catalog.get_table(db_name, table_name).await;
match res {
Ok(v) => Ok(v),
Err(e) => {
if e.code() == ErrorCode::UnknownDatabaseCode() {
self.mutable_catalog.get_table(db_name, table_name).await
} else {
Err(e)
}
}
}
}

async fn list_tables(&self, db_name: &str) -> Result<Vec<Arc<dyn Table>>> {
let r = self.immutable_catalog.list_tables(db_name).await;
match r {
Ok(x) => Ok(x),
Err(e) => {
if e.code() == ErrorCode::UnknownDatabaseCode() {
self.mutable_catalog.list_tables(db_name).await
} else {
Err(e)
}
}
}
}

async fn create_table(&self, req: CreateTableReq) -> Result<()> {
self.mutable_catalog.create_table(req).await
}

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply> {
let r = self.immutable_catalog.drop_table(req.clone()).await;
match r {
Err(e) => {
if e.code() == ErrorCode::UnknownTableCode() {
self.mutable_catalog.drop_table(req).await
} else {
Err(e)
}
}
Ok(x) => Ok(x),
}
}

async fn upsert_table_option(
&self,
req: UpsertTableOptionReq,
Expand All @@ -152,25 +208,6 @@ impl Catalog for DatabaseCatalog {
func_name: &str,
tbl_args: TableArgs,
) -> Result<Arc<dyn TableFunction>> {
let (id, factory) = self.func_engine_registry.get(func_name).ok_or_else(|| {
ErrorCode::UnknownTable(format!("Unknown table function {}", func_name))
})?;

// table function belongs to no/every database
let func = factory.try_create("", func_name, *id, tbl_args)?;
Ok(func)
}

async fn get_table_meta_by_id(
&self,
table_id: MetaId,
) -> common_exception::Result<(TableIdent, Arc<TableMeta>)> {
let res = self.immutable_catalog.get_table_meta_by_id(table_id).await;

if let Ok(x) = res {
Ok(x)
} else {
self.mutable_catalog.get_table_meta_by_id(table_id).await
}
self.table_function_factory.get(func_name, tbl_args)
}
}
Loading

0 comments on commit 7dae4cb

Please sign in to comment.