diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index a3895973941f..24ab530f4ed4 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -185,8 +185,8 @@ pub enum Error { source: meta_client::error::Error, }, - #[snafu(display("Invalid table schema in catalog, source: {:?}", source))] - InvalidSchemaInCatalog { + #[snafu(display("Invalid table info in catalog, source: {}", source))] + InvalidTableInfoInCatalog { #[snafu(backtrace)] source: datatypes::error::Error, }, @@ -233,7 +233,7 @@ impl ErrorExt for Error { Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), Error::InvalidTableSchema { source, .. } => source.status_code(), - Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected, + Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected, Error::Internal { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 5fd814ec1c73..5c4ddd680eba 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -250,10 +250,7 @@ impl RemoteCatalogManager { let table_ref = self.open_or_create_table(&table_key, &table_value).await?; schema.register_table(table_key.table_name.to_string(), table_ref)?; info!("Registered table {}", &table_key.table_name); - if table_value.id > max_table_id { - info!("Max table id: {} -> {}", max_table_id, table_value.id); - max_table_id = table_value.id; - } + max_table_id = max_table_id.max(table_value.table_id()); table_num += 1; } info!( @@ -311,9 +308,10 @@ impl RemoteCatalogManager { .. } = table_key; + let table_id = table_value.table_id(); + let TableGlobalValue { - id, - meta, + table_info, regions_id_map, .. } = table_value; @@ -322,14 +320,17 @@ impl RemoteCatalogManager { catalog_name: catalog_name.clone(), schema_name: schema_name.clone(), table_name: table_name.clone(), - table_id: *id, + table_id, }; match self .engine .open_table(&context, request) .await .with_context(|_| OpenTableSnafu { - table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,), + table_info: format!( + "{}.{}.{}, id:{}", + catalog_name, schema_name, table_name, table_id + ), })? { Some(table) => { info!( @@ -344,6 +345,7 @@ impl RemoteCatalogManager { catalog_name, schema_name, table_name ); + let meta = &table_info.meta; let schema = meta .schema .clone() @@ -353,7 +355,7 @@ impl RemoteCatalogManager { schema: meta.schema.clone(), })?; let req = CreateTableRequest { - id: *id, + id: table_id, catalog_name: catalog_name.clone(), schema_name: schema_name.clone(), table_name: table_name.clone(), @@ -371,7 +373,7 @@ impl RemoteCatalogManager { .context(CreateTableSnafu { table_info: format!( "{}.{}.{}, id:{}", - &catalog_name, &schema_name, &table_name, id + &catalog_name, &schema_name, &table_name, table_id ), }) } diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index e36778c94640..ccfe3629691a 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -19,7 +19,7 @@ use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize, Serializer}; use snafu::{ensure, OptionExt, ResultExt}; -use table::metadata::{RawTableMeta, TableId, TableVersion}; +use table::metadata::{RawTableInfo, TableId, TableVersion}; use crate::consts::{ CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX, @@ -128,15 +128,18 @@ impl TableGlobalKey { /// table id, table meta(schema...), region id allocation across datanodes. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TableGlobalValue { - /// Table id is the same across all datanodes. - pub id: TableId, /// Id of datanode that created the global table info kv. only for debugging. pub node_id: u64, // TODO(LFC): Maybe remove it? /// Allocation of region ids across all datanodes. pub regions_id_map: HashMap>, - // TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way. - pub meta: RawTableMeta, + pub table_info: RawTableInfo, +} + +impl TableGlobalValue { + pub fn table_id(&self) -> TableId { + self.table_info.ident.table_id + } } /// Table regional info that varies between datanode, so it contains a `node_id` field. @@ -279,6 +282,7 @@ define_catalog_value!( mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema, Schema}; + use table::metadata::{RawTableMeta, TableIdent, TableType}; use super::*; @@ -339,11 +343,23 @@ mod tests { region_numbers: vec![1], }; + let table_info = RawTableInfo { + ident: TableIdent { + table_id: 42, + version: 1, + }, + name: "table_1".to_string(), + desc: Some("blah".to_string()), + catalog_name: "catalog_1".to_string(), + schema_name: "schema_1".to_string(), + meta, + table_type: TableType::Base, + }; + let value = TableGlobalValue { - id: 42, node_id: 0, regions_id_map: HashMap::from([(0, vec![1, 2, 3])]), - meta, + table_info, }; let serialized = serde_json::to_string(&value).unwrap(); let deserialized = TableGlobalValue::parse(&serialized).unwrap(); diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 0cd4e9a30e03..2e5d7b64d48b 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; -use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu}; +use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu}; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest, @@ -276,17 +276,16 @@ impl SchemaProvider for FrontendSchemaProvider { let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1)) .context(InvalidCatalogValueSnafu)?; - let table = Arc::new(DistTable { + let table = Arc::new(DistTable::new( table_name, - schema: Arc::new( - val.meta - .schema + Arc::new( + val.table_info .try_into() - .context(InvalidSchemaInCatalogSnafu)?, + .context(catalog_err::InvalidTableInfoInCatalogSnafu)?, ), table_routes, datanode_clients, - }); + )); Ok(Some(table as _)) }) }) diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index ced630b6180c..7d5764b7c529 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -37,7 +37,7 @@ use sql::statements::create::Partitions; use sql::statements::sql_value_to_value; use sql::statements::statement::Statement; use sqlparser::ast::Value as SqlValue; -use table::metadata::RawTableMeta; +use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; @@ -274,11 +274,23 @@ fn create_table_global_value( created_on: DateTime::default(), }; + let table_info = RawTableInfo { + ident: TableIdent { + table_id: table_route.table.id as u32, + version: 0, + }, + name: table_name.table_name.clone(), + desc: create_table.desc.clone(), + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + meta, + table_type: TableType::Base, + }; + Ok(TableGlobalValue { - id: table_route.table.id as u32, node_id, regions_id_map: HashMap::new(), - meta, + table_info, }) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 903f44032a55..8499972cb445 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -55,10 +55,10 @@ pub(crate) mod scan; #[derive(Clone)] pub struct DistTable { - pub(crate) table_name: TableName, - pub(crate) schema: SchemaRef, - pub(crate) table_routes: Arc, - pub(crate) datanode_clients: Arc, + table_name: TableName, + table_info: TableInfoRef, + table_routes: Arc, + datanode_clients: Arc, } #[async_trait] @@ -68,11 +68,11 @@ impl Table for DistTable { } fn schema(&self) -> SchemaRef { - self.schema.clone() + self.table_info.meta.schema.clone() } fn table_info(&self) -> TableInfoRef { - unimplemented!() + self.table_info.clone() } async fn insert(&self, request: InsertRequest) -> table::Result { @@ -133,6 +133,20 @@ impl Table for DistTable { } impl DistTable { + pub(crate) fn new( + table_name: TableName, + table_info: TableInfoRef, + table_routes: Arc, + datanode_clients: Arc, + ) -> Self { + Self { + table_name, + table_info, + table_routes, + datanode_clients, + } + } + // TODO(LFC): Finding regions now seems less efficient, should be further looked into. fn find_regions( &self, @@ -477,6 +491,7 @@ mod test { use sql::parser::ParserContext; use sql::statements::statement::Statement; use sqlparser::dialect::GenericDialect; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::TableRef; use tempdir::TempDir; @@ -496,11 +511,22 @@ mod test { ColumnSchema::new("b", ConcreteDataType::string_datatype(), true), ]; let schema = Arc::new(Schema::new(column_schemas.clone())); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .next_column_id(1) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(&table_name.table_name) + .meta(meta) + .build() + .unwrap(); let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))); let table = DistTable { table_name: table_name.clone(), - schema, + table_info: Arc::new(table_info), table_routes: table_routes.clone(), datanode_clients: Arc::new(DatanodeClients::new()), }; @@ -862,9 +888,20 @@ mod test { insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await; } + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .next_column_id(1) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(&table_name.table_name) + .meta(meta) + .build() + .unwrap(); DistTable { table_name, - schema, + table_info: Arc::new(table_info), table_routes, datanode_clients, } @@ -968,9 +1005,21 @@ mod test { ConcreteDataType::int32_datatype(), true, )])); + let table_name = TableName::new("greptime", "public", "foo"); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .next_column_id(1) + .build() + .unwrap(); + let table_info = TableInfoBuilder::default() + .name(&table_name.table_name) + .meta(meta) + .build() + .unwrap(); let table = DistTable { - table_name: TableName::new("greptime", "public", "foo"), - schema, + table_name, + table_info: Arc::new(table_info), table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))), datanode_clients: Arc::new(DatanodeClients::new()), }; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 02e65f8a0b42..11226fca1a88 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -184,8 +184,7 @@ async fn fetch_tables( } let tv = tv.unwrap(); - let table_id = tv.id as u64; - let tr_key = TableRouteKey::with_table_global_key(table_id, &tk); + let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk); let tr = get_table_route_value(kv_store, &tr_key).await?; tables.push((tv, tr));