Skip to content

Commit

Permalink
feat: Implement table_info() for DistTable (GreptimeTeam#536) (Gr…
Browse files Browse the repository at this point in the history
…eptimeTeam#557)

* feat: Implement `table_info()` for `DistTable` (GreptimeTeam#536)

* Update src/catalog/src/error.rs

Co-authored-by: Yingwen <1405012107@qq.com>

Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: Yingwen <1405012107@qq.com>
  • Loading branch information
3 people authored and paomian committed Oct 19, 2023
1 parent c88068b commit 4a36e82
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 42 deletions.
6 changes: 3 additions & 3 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ pub enum Error {
source: meta_client::error::Error,
},

#[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
InvalidSchemaInCatalog {
#[snafu(display("Invalid table info in catalog, source: {}", source))]
InvalidTableInfoInCatalog {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
Expand Down Expand Up @@ -233,7 +233,7 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),
}
}
Expand Down
22 changes: 12 additions & 10 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,7 @@ impl RemoteCatalogManager {
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
schema.register_table(table_key.table_name.to_string(), table_ref)?;
info!("Registered table {}", &table_key.table_name);
if table_value.id > max_table_id {
info!("Max table id: {} -> {}", max_table_id, table_value.id);
max_table_id = table_value.id;
}
max_table_id = max_table_id.max(table_value.table_id());
table_num += 1;
}
info!(
Expand Down Expand Up @@ -311,9 +308,10 @@ impl RemoteCatalogManager {
..
} = table_key;

let table_id = table_value.table_id();

let TableGlobalValue {
id,
meta,
table_info,
regions_id_map,
..
} = table_value;
Expand All @@ -322,14 +320,17 @@ impl RemoteCatalogManager {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id: *id,
table_id,
};
match self
.engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,),
table_info: format!(
"{}.{}.{}, id:{}",
catalog_name, schema_name, table_name, table_id
),
})? {
Some(table) => {
info!(
Expand All @@ -344,6 +345,7 @@ impl RemoteCatalogManager {
catalog_name, schema_name, table_name
);

let meta = &table_info.meta;
let schema = meta
.schema
.clone()
Expand All @@ -353,7 +355,7 @@ impl RemoteCatalogManager {
schema: meta.schema.clone(),
})?;
let req = CreateTableRequest {
id: *id,
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
Expand All @@ -371,7 +373,7 @@ impl RemoteCatalogManager {
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, id
&catalog_name, &schema_name, &table_name, table_id
),
})
}
Expand Down
30 changes: 23 additions & 7 deletions src/common/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableMeta, TableId, TableVersion};
use table::metadata::{RawTableInfo, TableId, TableVersion};

use crate::consts::{
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
Expand Down Expand Up @@ -128,15 +128,18 @@ impl TableGlobalKey {
/// table id, table meta(schema...), region id allocation across datanodes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableGlobalValue {
/// Table id is the same across all datanodes.
pub id: TableId,
/// Id of datanode that created the global table info kv. only for debugging.
pub node_id: u64,
// TODO(LFC): Maybe remove it?
/// Allocation of region ids across all datanodes.
pub regions_id_map: HashMap<u64, Vec<u32>>,
// TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way.
pub meta: RawTableMeta,
pub table_info: RawTableInfo,
}

impl TableGlobalValue {
    /// Returns the table id stored in the identifier of the embedded `table_info`.
    pub fn table_id(&self) -> TableId {
        let ident = &self.table_info.ident;
        ident.table_id
    }
}

/// Table regional info that varies between datanode, so it contains a `node_id` field.
Expand Down Expand Up @@ -279,6 +282,7 @@ define_catalog_value!(
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use table::metadata::{RawTableMeta, TableIdent, TableType};

use super::*;

Expand Down Expand Up @@ -339,11 +343,23 @@ mod tests {
region_numbers: vec![1],
};

let table_info = RawTableInfo {
ident: TableIdent {
table_id: 42,
version: 1,
},
name: "table_1".to_string(),
desc: Some("blah".to_string()),
catalog_name: "catalog_1".to_string(),
schema_name: "schema_1".to_string(),
meta,
table_type: TableType::Base,
};

let value = TableGlobalValue {
id: 42,
node_id: 0,
regions_id_map: HashMap::from([(0, vec![1, 2, 3])]),
meta,
table_info,
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableGlobalValue::parse(&serialized).unwrap();
Expand Down
13 changes: 6 additions & 7 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu};
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
Expand Down Expand Up @@ -276,17 +276,16 @@ impl SchemaProvider for FrontendSchemaProvider {
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
.context(InvalidCatalogValueSnafu)?;

let table = Arc::new(DistTable {
let table = Arc::new(DistTable::new(
table_name,
schema: Arc::new(
val.meta
.schema
Arc::new(
val.table_info
.try_into()
.context(InvalidSchemaInCatalogSnafu)?,
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
),
table_routes,
datanode_clients,
});
));
Ok(Some(table as _))
})
})
Expand Down
18 changes: 15 additions & 3 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sql::statements::create::Partitions;
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::Value as SqlValue;
use table::metadata::RawTableMeta;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};

use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
Expand Down Expand Up @@ -274,11 +274,23 @@ fn create_table_global_value(
created_on: DateTime::default(),
};

let table_info = RawTableInfo {
ident: TableIdent {
table_id: table_route.table.id as u32,
version: 0,
},
name: table_name.table_name.clone(),
desc: create_table.desc.clone(),
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta,
table_type: TableType::Base,
};

Ok(TableGlobalValue {
id: table_route.table.id as u32,
node_id,
regions_id_map: HashMap::new(),
meta,
table_info,
})
}

Expand Down
69 changes: 59 additions & 10 deletions src/frontend/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ pub(crate) mod scan;

#[derive(Clone)]
pub struct DistTable {
pub(crate) table_name: TableName,
pub(crate) schema: SchemaRef,
pub(crate) table_routes: Arc<TableRoutes>,
pub(crate) datanode_clients: Arc<DatanodeClients>,
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
}

#[async_trait]
Expand All @@ -68,11 +68,11 @@ impl Table for DistTable {
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
self.table_info.meta.schema.clone()
}

fn table_info(&self) -> TableInfoRef {
unimplemented!()
self.table_info.clone()
}

async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
Expand Down Expand Up @@ -133,6 +133,20 @@ impl Table for DistTable {
}

impl DistTable {
pub(crate) fn new(
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
table_name,
table_info,
table_routes,
datanode_clients,
}
}

// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
fn find_regions(
&self,
Expand Down Expand Up @@ -477,6 +491,7 @@ mod test {
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
use tempdir::TempDir;

Expand All @@ -496,11 +511,22 @@ mod test {
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();

let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let table = DistTable {
table_name: table_name.clone(),
schema,
table_info: Arc::new(table_info),
table_routes: table_routes.clone(),
datanode_clients: Arc::new(DatanodeClients::new()),
};
Expand Down Expand Up @@ -862,9 +888,20 @@ mod test {
insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await;
}

let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
DistTable {
table_name,
schema,
table_info: Arc::new(table_info),
table_routes,
datanode_clients,
}
Expand Down Expand Up @@ -968,9 +1005,21 @@ mod test {
ConcreteDataType::int32_datatype(),
true,
)]));
let table_name = TableName::new("greptime", "public", "foo");
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
let table = DistTable {
table_name: TableName::new("greptime", "public", "foo"),
schema,
table_name,
table_info: Arc::new(table_info),
table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))),
datanode_clients: Arc::new(DatanodeClients::new()),
};
Expand Down
3 changes: 1 addition & 2 deletions src/meta-srv/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ async fn fetch_tables(
}
let tv = tv.unwrap();

let table_id = tv.id as u64;
let tr_key = TableRouteKey::with_table_global_key(table_id, &tk);
let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk);
let tr = get_table_route_value(kv_store, &tr_key).await?;

tables.push((tv, tr));
Expand Down

0 comments on commit 4a36e82

Please sign in to comment.