Skip to content

Commit

Permalink
Merge pull request #5907 from andylokandy/table
Browse files Browse the repository at this point in the history
feature(planner): support table statements in new planner
  • Loading branch information
mergify[bot] authored Jun 15, 2022
2 parents 8c8892d + 76252d9 commit c0a46ee
Show file tree
Hide file tree
Showing 27 changed files with 592 additions and 54 deletions.
11 changes: 6 additions & 5 deletions common/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ pub struct TableMeta {
pub engine: String,
pub engine_options: BTreeMap<String, String>,
pub options: BTreeMap<String, String>,
pub cluster_key: Option<String>,
// The vector of cluster keys.
pub current_cluster_key: Option<String>,
// All cluster keys had been used before.
pub cluster_keys: Vec<String>,
// The default cluster keys id.
pub default_cluster_key_id: Option<u32>,
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Default for TableMeta {
engine: "".to_string(),
engine_options: BTreeMap::new(),
options: BTreeMap::new(),
cluster_key: None,
current_cluster_key: None,
cluster_keys: vec![],
default_cluster_key_id: None,
created_on: Default::default(),
Expand All @@ -256,13 +256,14 @@ impl Default for TableMeta {
impl TableMeta {
pub fn push_cluster_key(mut self, cluster_key: String) -> Self {
self.cluster_keys.push(cluster_key.clone());
self.cluster_key = Some(cluster_key);
self.current_cluster_key = Some(cluster_key);
self.default_cluster_key_id = Some(self.cluster_keys.len() as u32 - 1);
self
}

pub fn cluster_key(&self) -> Option<(u32, String)> {
self.default_cluster_key_id.zip(self.cluster_key.clone())
self.default_cluster_key_id
.zip(self.current_cluster_key.clone())
}
}

Expand Down
4 changes: 1 addition & 3 deletions common/planners/src/plan_table_alter_cluster_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ use std::sync::Arc;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;

use crate::Expression;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct AlterTableClusterKeyPlan {
pub tenant: String,
pub catalog: String,
pub database: String,
pub table: String,
pub cluster_keys: Vec<Expression>,
pub cluster_keys: Vec<String>,
}

impl AlterTableClusterKeyPlan {
Expand Down
4 changes: 2 additions & 2 deletions common/proto-conv/src/table_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl FromToProto<pb::TableMeta> for mt::TableMeta {
engine: p.engine,
engine_options: p.engine_options,
options: p.options,
cluster_key: p.cluster_key,
current_cluster_key: p.cluster_key,
cluster_keys: p.cluster_keys,
default_cluster_key_id: p.default_cluster_key_id,
created_on: DateTime::<Utc>::from_pb(p.created_on)?,
Expand All @@ -157,7 +157,7 @@ impl FromToProto<pb::TableMeta> for mt::TableMeta {
engine: self.engine.clone(),
engine_options: self.engine_options.clone(),
options: self.options.clone(),
cluster_key: self.cluster_key.clone(),
cluster_key: self.current_cluster_key.clone(),
cluster_keys: self.cluster_keys.clone(),
default_cluster_key_id: self.default_cluster_key_id,
created_on: self.created_on.to_pb()?,
Expand Down
4 changes: 2 additions & 2 deletions common/proto-conv/tests/it/proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn new_table_info() -> mt::TableInfo {
engine: "44".to_string(),
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
cluster_key: Some("(a + 2, b)".to_string()),
current_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9),
Expand Down Expand Up @@ -331,7 +331,7 @@ fn test_load_old() -> anyhow::Result<()> {
engine: "44".to_string(),
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
cluster_key: Some("(a + 2, b)".to_string()),
current_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec![],
default_cluster_key_id: None,
created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9),
Expand Down
13 changes: 1 addition & 12 deletions query/src/interpreters/interpreter_cluster_key_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::Arc;
use common_exception::Result;
use common_meta_types::GrantObject;
use common_meta_types::UserPrivilegeType;
use common_planners::validate_clustering;
use common_planners::validate_expression;
use common_planners::AlterTableClusterKeyPlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
Expand Down Expand Up @@ -71,16 +69,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
.get_table(tenant.as_str(), &plan.database, &plan.table)
.await?;

let schema = table.schema();
let cluster_keys = plan.cluster_keys.clone();
// Let's validate the expressions firstly.
for expr in cluster_keys.iter() {
validate_expression(expr, &schema)?;
validate_clustering(expr)?;
}

let cluster_key_vec: Vec<String> = cluster_keys.iter().map(|e| e.column_name()).collect();
let cluster_key_str = format!("({})", cluster_key_vec.join(", "));
let cluster_key_str = format!("({})", plan.cluster_keys.join(", "));

table
.alter_table_cluster_keys(self.ctx.clone(), &self.plan.catalog, cluster_key_str)
Expand Down
53 changes: 53 additions & 0 deletions query/src/interpreters/interpreter_factory_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,29 @@ impl InterpreterFactoryV2 {
| DfStatement::List(_)
| DfStatement::DropStage(_)
| DfStatement::RemoveStage(_)
// TODO (andylokandy)
// | DfStatement::ShowDatabases(_)
// | DfStatement::ShowCreateDatabase(_)
// | DfStatement::ShowTables(_)
// | DfStatement::ShowCreateTable(_)
// | DfStatement::DescribeTable(_)
// | DfStatement::ShowTablesStatus(_)
| DfStatement::CreateTable(_)
| DfStatement::CreateView(_)
| DfStatement::AlterView(_)
| DfStatement::DropTable(_)
| DfStatement::UndropTable(_)
| DfStatement::AlterTable(_)
| DfStatement::RenameTable(_)
| DfStatement::TruncateTable(_)
| DfStatement::OptimizeTable(_)
| DfStatement::DropView(_)
| DfStatement::ShowMetrics(_)
| DfStatement::ShowProcessList(_)
| DfStatement::ShowSettings(_)
| DfStatement::CreateDatabase(_)
| DfStatement::DropDatabase(_)
| DfStatement::CreateUser(_)
| DfStatement::AlterDatabase(_)
)
}
Expand All @@ -73,6 +87,12 @@ impl InterpreterFactoryV2 {
Plan::ShowSettings => ShowSettingsInterpreter::try_create(ctx),

// Databases
Plan::ShowDatabases(show_databases) => {
ShowDatabasesInterpreter::try_create(ctx, *show_databases.clone())
}
Plan::ShowCreateDatabase(show_create_database) => {
ShowCreateDatabaseInterpreter::try_create(ctx, *show_create_database.clone())
}
Plan::CreateDatabase(create_database) => {
CreateDatabaseInterpreter::try_create(ctx, *create_database.clone())
}
Expand All @@ -84,9 +104,42 @@ impl InterpreterFactoryV2 {
}

// Tables
Plan::ShowTables(show_tables) => {
ShowTablesInterpreter::try_create(ctx, *show_tables.clone())
}
Plan::ShowCreateTable(show_create_table) => {
ShowCreateTableInterpreter::try_create(ctx, *show_create_table.clone())
}
Plan::DescribeTable(describe_table) => {
DescribeTableInterpreter::try_create(ctx, *describe_table.clone())
}
Plan::ShowTablesStatus(show_tables_status) => {
ShowTablesStatusInterpreter::try_create(ctx, *show_tables_status.clone())
}
Plan::CreateTable(create_table) => {
CreateTableInterpreter::try_create(ctx, *create_table.clone())
}
Plan::DropTable(drop_table) => {
DropTableInterpreter::try_create(ctx, *drop_table.clone())
}
Plan::UndropTable(undrop_table) => {
UndropTableInterpreter::try_create(ctx, *undrop_table.clone())
}
Plan::RenameTable(rename_table) => {
RenameTableInterpreter::try_create(ctx, *rename_table.clone())
}
Plan::AlterTableClusterKey(alter_table_cluster_key) => {
AlterTableClusterKeyInterpreter::try_create(ctx, *alter_table_cluster_key.clone())
}
Plan::DropTableClusterKey(drop_table_cluster_key) => {
DropTableClusterKeyInterpreter::try_create(ctx, *drop_table_cluster_key.clone())
}
Plan::TruncateTable(truncate_table) => {
TruncateTableInterpreter::try_create(ctx, *truncate_table.clone())
}
Plan::OptimizeTable(optimize_table) => {
OptimizeTableInterpreter::try_create(ctx, *optimize_table.clone())
}

// Views
Plan::CreateView(create_view) => {
Expand Down
Loading

0 comments on commit c0a46ee

Please sign in to comment.