Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(planner): support table statements in new planner #5907

Merged
merged 11 commits into from
Jun 15, 2022
11 changes: 6 additions & 5 deletions common/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ pub struct TableMeta {
pub engine: String,
pub engine_options: BTreeMap<String, String>,
pub options: BTreeMap<String, String>,
pub cluster_key: Option<String>,
// The vector of cluster keys.
pub current_cluster_key: Option<String>,
Copy link
Member

@zhyass zhyass Jun 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @andylokandy
I think default_cluster_key is better than current_cluster_key. Because we have a default_cluster_key_id already.
#6012

// All cluster keys had been used before.
pub cluster_keys: Vec<String>,
// The default cluster keys id.
pub default_cluster_key_id: Option<u32>,
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Default for TableMeta {
engine: "".to_string(),
engine_options: BTreeMap::new(),
options: BTreeMap::new(),
cluster_key: None,
current_cluster_key: None,
cluster_keys: vec![],
default_cluster_key_id: None,
created_on: Default::default(),
Expand All @@ -256,13 +256,14 @@ impl Default for TableMeta {
impl TableMeta {
pub fn push_cluster_key(mut self, cluster_key: String) -> Self {
self.cluster_keys.push(cluster_key.clone());
self.cluster_key = Some(cluster_key);
self.current_cluster_key = Some(cluster_key);
self.default_cluster_key_id = Some(self.cluster_keys.len() as u32 - 1);
self
}

pub fn cluster_key(&self) -> Option<(u32, String)> {
self.default_cluster_key_id.zip(self.cluster_key.clone())
self.default_cluster_key_id
.zip(self.current_cluster_key.clone())
}
}

Expand Down
4 changes: 1 addition & 3 deletions common/planners/src/plan_table_alter_cluster_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ use std::sync::Arc;
use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;

use crate::Expression;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct AlterTableClusterKeyPlan {
pub tenant: String,
pub catalog: String,
pub database: String,
pub table: String,
pub cluster_keys: Vec<Expression>,
pub cluster_keys: Vec<String>,
}

impl AlterTableClusterKeyPlan {
Expand Down
4 changes: 2 additions & 2 deletions common/proto-conv/src/table_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl FromToProto<pb::TableMeta> for mt::TableMeta {
engine: p.engine,
engine_options: p.engine_options,
options: p.options,
cluster_key: p.cluster_key,
current_cluster_key: p.cluster_key,
cluster_keys: p.cluster_keys,
default_cluster_key_id: p.default_cluster_key_id,
created_on: DateTime::<Utc>::from_pb(p.created_on)?,
Expand All @@ -157,7 +157,7 @@ impl FromToProto<pb::TableMeta> for mt::TableMeta {
engine: self.engine.clone(),
engine_options: self.engine_options.clone(),
options: self.options.clone(),
cluster_key: self.cluster_key.clone(),
cluster_key: self.current_cluster_key.clone(),
cluster_keys: self.cluster_keys.clone(),
default_cluster_key_id: self.default_cluster_key_id,
created_on: self.created_on.to_pb()?,
Expand Down
4 changes: 2 additions & 2 deletions common/proto-conv/tests/it/proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ fn new_table_info() -> mt::TableInfo {
engine: "44".to_string(),
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
cluster_key: Some("(a + 2, b)".to_string()),
current_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec!["(a + 2, b)".to_string()],
default_cluster_key_id: Some(0),
created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9),
Expand Down Expand Up @@ -331,7 +331,7 @@ fn test_load_old() -> anyhow::Result<()> {
engine: "44".to_string(),
engine_options: btreemap! {s("abc") => s("def")},
options: btreemap! {s("xyz") => s("foo")},
cluster_key: Some("(a + 2, b)".to_string()),
current_cluster_key: Some("(a + 2, b)".to_string()),
cluster_keys: vec![],
default_cluster_key_id: None,
created_on: Utc.ymd(2014, 11, 28).and_hms(12, 0, 9),
Expand Down
13 changes: 1 addition & 12 deletions query/src/interpreters/interpreter_cluster_key_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::Arc;
use common_exception::Result;
use common_meta_types::GrantObject;
use common_meta_types::UserPrivilegeType;
use common_planners::validate_clustering;
use common_planners::validate_expression;
use common_planners::AlterTableClusterKeyPlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
Expand Down Expand Up @@ -71,16 +69,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
.get_table(tenant.as_str(), &plan.database, &plan.table)
.await?;

let schema = table.schema();
let cluster_keys = plan.cluster_keys.clone();
// Let's validate the expressions firstly.
for expr in cluster_keys.iter() {
validate_expression(expr, &schema)?;
validate_clustering(expr)?;
}

let cluster_key_vec: Vec<String> = cluster_keys.iter().map(|e| e.column_name()).collect();
let cluster_key_str = format!("({})", cluster_key_vec.join(", "));
let cluster_key_str = format!("({})", plan.cluster_keys.join(", "));

table
.alter_table_cluster_keys(self.ctx.clone(), &self.plan.catalog, cluster_key_str)
Expand Down
53 changes: 53 additions & 0 deletions query/src/interpreters/interpreter_factory_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,29 @@ impl InterpreterFactoryV2 {
| DfStatement::List(_)
| DfStatement::DropStage(_)
| DfStatement::RemoveStage(_)
// TODO (andylokandy)
// | DfStatement::ShowDatabases(_)
// | DfStatement::ShowCreateDatabase(_)
// | DfStatement::ShowTables(_)
// | DfStatement::ShowCreateTable(_)
// | DfStatement::DescribeTable(_)
// | DfStatement::ShowTablesStatus(_)
| DfStatement::CreateTable(_)
| DfStatement::CreateView(_)
| DfStatement::AlterView(_)
| DfStatement::DropTable(_)
| DfStatement::UndropTable(_)
| DfStatement::AlterTable(_)
| DfStatement::RenameTable(_)
| DfStatement::TruncateTable(_)
| DfStatement::OptimizeTable(_)
| DfStatement::DropView(_)
| DfStatement::ShowMetrics(_)
| DfStatement::ShowProcessList(_)
| DfStatement::ShowSettings(_)
| DfStatement::CreateDatabase(_)
| DfStatement::DropDatabase(_)
| DfStatement::CreateUser(_)
| DfStatement::AlterDatabase(_)
)
}
Expand All @@ -73,6 +87,12 @@ impl InterpreterFactoryV2 {
Plan::ShowSettings => ShowSettingsInterpreter::try_create(ctx),

// Databases
Plan::ShowDatabases(show_databases) => {
ShowDatabasesInterpreter::try_create(ctx, *show_databases.clone())
}
Plan::ShowCreateDatabase(show_create_database) => {
ShowCreateDatabaseInterpreter::try_create(ctx, *show_create_database.clone())
}
Plan::CreateDatabase(create_database) => {
CreateDatabaseInterpreter::try_create(ctx, *create_database.clone())
}
Expand All @@ -84,9 +104,42 @@ impl InterpreterFactoryV2 {
}

// Tables
Plan::ShowTables(show_tables) => {
ShowTablesInterpreter::try_create(ctx, *show_tables.clone())
}
Plan::ShowCreateTable(show_create_table) => {
ShowCreateTableInterpreter::try_create(ctx, *show_create_table.clone())
}
Plan::DescribeTable(describe_table) => {
DescribeTableInterpreter::try_create(ctx, *describe_table.clone())
}
Plan::ShowTablesStatus(show_tables_status) => {
ShowTablesStatusInterpreter::try_create(ctx, *show_tables_status.clone())
}
Plan::CreateTable(create_table) => {
CreateTableInterpreter::try_create(ctx, *create_table.clone())
}
Plan::DropTable(drop_table) => {
DropTableInterpreter::try_create(ctx, *drop_table.clone())
}
Plan::UndropTable(undrop_table) => {
UndropTableInterpreter::try_create(ctx, *undrop_table.clone())
}
Plan::RenameTable(rename_table) => {
RenameTableInterpreter::try_create(ctx, *rename_table.clone())
}
Plan::AlterTableClusterKey(alter_table_cluster_key) => {
AlterTableClusterKeyInterpreter::try_create(ctx, *alter_table_cluster_key.clone())
}
Plan::DropTableClusterKey(drop_table_cluster_key) => {
DropTableClusterKeyInterpreter::try_create(ctx, *drop_table_cluster_key.clone())
}
Plan::TruncateTable(truncate_table) => {
TruncateTableInterpreter::try_create(ctx, *truncate_table.clone())
}
Plan::OptimizeTable(optimize_table) => {
OptimizeTableInterpreter::try_create(ctx, *optimize_table.clone())
}

// Views
Plan::CreateView(create_view) => {
Expand Down
Loading