Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement table_info() for DistTable (#536) #557

Merged
merged 2 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ pub enum Error {
source: meta_client::error::Error,
},

#[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
InvalidSchemaInCatalog {
#[snafu(display("Invalid table info in catalog, source: {}", source))]
InvalidTableInfoInCatalog {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
Expand Down Expand Up @@ -233,7 +233,7 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),
}
}
Expand Down
22 changes: 12 additions & 10 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,7 @@ impl RemoteCatalogManager {
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
schema.register_table(table_key.table_name.to_string(), table_ref)?;
info!("Registered table {}", &table_key.table_name);
if table_value.id > max_table_id {
info!("Max table id: {} -> {}", max_table_id, table_value.id);
max_table_id = table_value.id;
}
max_table_id = max_table_id.max(table_value.table_id());
table_num += 1;
}
info!(
Expand Down Expand Up @@ -311,9 +308,10 @@ impl RemoteCatalogManager {
..
} = table_key;

let table_id = table_value.table_id();

let TableGlobalValue {
id,
meta,
table_info,
regions_id_map,
..
} = table_value;
Expand All @@ -322,14 +320,17 @@ impl RemoteCatalogManager {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id: *id,
table_id,
};
match self
.engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,),
table_info: format!(
"{}.{}.{}, id:{}",
catalog_name, schema_name, table_name, table_id
),
})? {
Some(table) => {
info!(
Expand All @@ -344,6 +345,7 @@ impl RemoteCatalogManager {
catalog_name, schema_name, table_name
);

let meta = &table_info.meta;
let schema = meta
.schema
.clone()
Expand All @@ -353,7 +355,7 @@ impl RemoteCatalogManager {
schema: meta.schema.clone(),
})?;
let req = CreateTableRequest {
id: *id,
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
Expand All @@ -371,7 +373,7 @@ impl RemoteCatalogManager {
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, id
&catalog_name, &schema_name, &table_name, table_id
),
})
}
Expand Down
30 changes: 23 additions & 7 deletions src/common/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableMeta, TableId, TableVersion};
use table::metadata::{RawTableInfo, TableId, TableVersion};

use crate::consts::{
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
Expand Down Expand Up @@ -128,15 +128,18 @@ impl TableGlobalKey {
/// table id, table meta(schema...), region id allocation across datanodes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableGlobalValue {
/// Table id is the same across all datanodes.
pub id: TableId,
/// Id of datanode that created the global table info kv. only for debugging.
pub node_id: u64,
// TODO(LFC): Maybe remove it?
/// Allocation of region ids across all datanodes.
pub regions_id_map: HashMap<u64, Vec<u32>>,
// TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way.
pub meta: RawTableMeta,
pub table_info: RawTableInfo,
}

impl TableGlobalValue {
pub fn table_id(&self) -> TableId {
self.table_info.ident.table_id
}
}

/// Table regional info that varies between datanode, so it contains a `node_id` field.
Expand Down Expand Up @@ -279,6 +282,7 @@ define_catalog_value!(
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use table::metadata::{RawTableMeta, TableIdent, TableType};

use super::*;

Expand Down Expand Up @@ -339,11 +343,23 @@ mod tests {
region_numbers: vec![1],
};

let table_info = RawTableInfo {
ident: TableIdent {
table_id: 42,
version: 1,
},
name: "table_1".to_string(),
desc: Some("blah".to_string()),
catalog_name: "catalog_1".to_string(),
schema_name: "schema_1".to_string(),
meta,
table_type: TableType::Base,
};

let value = TableGlobalValue {
id: 42,
node_id: 0,
regions_id_map: HashMap::from([(0, vec![1, 2, 3])]),
meta,
table_info,
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableGlobalValue::parse(&serialized).unwrap();
Expand Down
13 changes: 6 additions & 7 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu};
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
Expand Down Expand Up @@ -276,17 +276,16 @@ impl SchemaProvider for FrontendSchemaProvider {
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
.context(InvalidCatalogValueSnafu)?;

let table = Arc::new(DistTable {
let table = Arc::new(DistTable::new(
table_name,
schema: Arc::new(
val.meta
.schema
Arc::new(
val.table_info
.try_into()
.context(InvalidSchemaInCatalogSnafu)?,
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
),
table_routes,
datanode_clients,
});
));
Ok(Some(table as _))
})
})
Expand Down
18 changes: 15 additions & 3 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sql::statements::create::Partitions;
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::Value as SqlValue;
use table::metadata::RawTableMeta;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};

use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
Expand Down Expand Up @@ -274,11 +274,23 @@ fn create_table_global_value(
created_on: DateTime::default(),
};

let table_info = RawTableInfo {
ident: TableIdent {
table_id: table_route.table.id as u32,
version: 0,
},
name: table_name.table_name.clone(),
desc: create_table.desc.clone(),
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
meta,
table_type: TableType::Base,
};

Ok(TableGlobalValue {
id: table_route.table.id as u32,
node_id,
regions_id_map: HashMap::new(),
meta,
table_info,
})
}

Expand Down
69 changes: 59 additions & 10 deletions src/frontend/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ pub(crate) mod scan;

#[derive(Clone)]
pub struct DistTable {
pub(crate) table_name: TableName,
pub(crate) schema: SchemaRef,
pub(crate) table_routes: Arc<TableRoutes>,
pub(crate) datanode_clients: Arc<DatanodeClients>,
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
}

#[async_trait]
Expand All @@ -68,11 +68,11 @@ impl Table for DistTable {
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
self.table_info.meta.schema.clone()
}

fn table_info(&self) -> TableInfoRef {
unimplemented!()
self.table_info.clone()
}

async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
Expand Down Expand Up @@ -133,6 +133,20 @@ impl Table for DistTable {
}

impl DistTable {
pub(crate) fn new(
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
table_name,
table_info,
table_routes,
datanode_clients,
}
}

// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
fn find_regions(
&self,
Expand Down Expand Up @@ -477,6 +491,7 @@ mod test {
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
use tempdir::TempDir;

Expand All @@ -496,11 +511,22 @@ mod test {
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();

let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let table = DistTable {
table_name: table_name.clone(),
schema,
table_info: Arc::new(table_info),
table_routes: table_routes.clone(),
datanode_clients: Arc::new(DatanodeClients::new()),
};
Expand Down Expand Up @@ -862,9 +888,20 @@ mod test {
insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await;
}

let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
DistTable {
table_name,
schema,
table_info: Arc::new(table_info),
table_routes,
datanode_clients,
}
Expand Down Expand Up @@ -968,9 +1005,21 @@ mod test {
ConcreteDataType::int32_datatype(),
true,
)]));
let table_name = TableName::new("greptime", "public", "foo");
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
let table = DistTable {
table_name: TableName::new("greptime", "public", "foo"),
schema,
table_name,
table_info: Arc::new(table_info),
table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))),
datanode_clients: Arc::new(DatanodeClients::new()),
};
Expand Down
3 changes: 1 addition & 2 deletions src/meta-srv/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ async fn fetch_tables(
}
let tv = tv.unwrap();

let table_id = tv.id as u64;
let tr_key = TableRouteKey::with_table_global_key(table_id, &tk);
let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk);
let tr = get_table_route_value(kv_store, &tr_key).await?;

tables.push((tv, tr));
Expand Down