Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support explain syntax #546

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ debug/

# JetBrains IDE config directory
.idea/
*.iml

# VSCode IDE config directory
.vscode/
Expand Down
6 changes: 5 additions & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ impl Instance {
};
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
sql_handler: SqlHandler::new(
table_engine,
catalog_manager.clone(),
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
Expand Down
5 changes: 5 additions & 0 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ impl Instance {
Statement::ShowTables(stmt) => {
self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await
}
Statement::Explain(stmt) => {
self.sql_handler
.execute(SqlRequest::Explain(Box::new(stmt)))
.await
}
Statement::DescribeTable(stmt) => {
self.sql_handler
.execute(SqlRequest::DescribeTable(stmt))
Expand Down
12 changes: 10 additions & 2 deletions src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ impl Instance {
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();

let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone());
let sql_handler = SqlHandler::new(
mock_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
);
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
Expand Down Expand Up @@ -123,7 +127,11 @@ impl Instance {
);
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
sql_handler: SqlHandler::new(
table_engine,
catalog_manager.clone(),
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
Expand Down
18 changes: 15 additions & 3 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

use catalog::CatalogManagerRef;
use common_query::Output;
use query::sql::{describe_table, show_databases, show_tables};
use query::query_engine::QueryEngineRef;
use query::sql::{describe_table, explain, show_databases, show_tables};
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::{EngineContext, TableEngineRef, TableReference};
use table::requests::*;
Expand All @@ -39,19 +41,26 @@ pub enum SqlRequest {
ShowDatabases(ShowDatabases),
ShowTables(ShowTables),
DescribeTable(DescribeTable),
Explain(Box<Explain>),
}

// Handler to execute SQL except query
pub struct SqlHandler {
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
}

impl SqlHandler {
pub fn new(table_engine: TableEngineRef, catalog_manager: CatalogManagerRef) -> Self {
pub fn new(
table_engine: TableEngineRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Self {
Self {
table_engine,
catalog_manager,
query_engine,
}
}

Expand All @@ -70,6 +79,9 @@ impl SqlHandler {
SqlRequest::DescribeTable(stmt) => {
describe_table(stmt, self.catalog_manager.clone()).context(error::ExecuteSqlSnafu)
}
SqlRequest::Explain(stmt) => explain(stmt, self.query_engine.clone())
.await
.context(error::ExecuteSqlSnafu),
}
}

Expand Down Expand Up @@ -216,7 +228,7 @@ mod tests {
);
let factory = QueryEngineFactory::new(catalog_list.clone());
let query_engine = factory.query_engine();
let sql_handler = SqlHandler::new(table_engine, catalog_list);
let sql_handler = SqlHandler::new(table_engine, catalog_list, query_engine.clone());

let stmt = match query_engine.sql_to_statement(sql).unwrap() {
Statement::Insert(i) => i,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/sql/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl SqlHandler {
return ConstraintNotSupportedSnafu {
constraint: format!("{:?}", c),
}
.fail()
.fail();
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use query::QueryEngineFactory;
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
Expand Down Expand Up @@ -121,5 +122,9 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
.await
.unwrap(),
);
SqlHandler::new(mock_engine, catalog_manager)

let catalog_list = catalog::local::new_memory_catalog_list().unwrap();
let factory = QueryEngineFactory::new(catalog_list);

SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
}
19 changes: 18 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use snafu::prelude::*;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::create::Partitions;
use sql::statements::explain::Explain;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;

Expand Down Expand Up @@ -275,6 +276,17 @@ impl Instance {
.context(AlterTableSnafu)
}

/// Handle explain expr
pub async fn handle_explain(&self, sql: &str, explain_stmt: Explain) -> Result<Output> {
if let Some(dist_instance) = &self.dist_instance {
dist_instance
.handle_sql(sql, Statement::Explain(explain_stmt))
.await
} else {
Ok(Output::AffectedRows(0))
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Handle batch inserts
pub async fn handle_inserts(&self, insert_expr: &[InsertExpr]) -> Result<Output> {
let mut success = 0;
Expand Down Expand Up @@ -634,8 +646,13 @@ impl SqlQueryHandler for Instance {
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::Explain(explain_stmt) => self
.handle_explain(query, explain_stmt)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query }),
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail()
return server_error::NotSupportedSnafu { feat: query }.fail();
}
}
.map_err(BoxedError::new)
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse,
TableName, TableRoute,
};
use query::sql::{describe_table, show_databases, show_tables};
use query::sql::{describe_table, explain, show_databases, show_tables};
use query::{QueryEngineFactory, QueryEngineRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
Expand Down Expand Up @@ -143,6 +143,9 @@ impl DistInstance {
.context(error::ExecuteSqlSnafu { sql }),
Statement::DescribeTable(stmt) => describe_table(stmt, self.catalog_manager.clone())
.context(error::ExecuteSqlSnafu { sql }),
Statement::Explain(stmt) => explain(Box::new(stmt), self.query_engine.clone())
.await
.context(error::ExecuteSqlSnafu { sql }),
_ => unreachable!(),
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/query/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use datatypes::arrow::datatypes::DataType;
use snafu::ResultExt;
use sql::statements::explain::Explain;
use sql::statements::query::Query;
use sql::statements::statement::Statement;

Expand Down Expand Up @@ -53,6 +54,18 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {

Ok(LogicalPlan::DfPlan(result))
}

/// Converts EXPLAIN statement to logical plan.
pub fn explain_to_plan(&self, explain: Explain) -> Result<LogicalPlan> {
let result = self
.sql_to_rel
.sql_statement_to_plan(explain.inner.clone())
.context(error::PlanSqlSnafu {
sql: explain.to_string(),
})?;

Ok(LogicalPlan::DfPlan(result))
}
}

impl<'a, S> Planner for DfPlanner<'a, S>
Expand All @@ -63,6 +76,7 @@ where
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
match statement {
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Explain(explain) => self.explain_to_plan(explain),
Statement::ShowTables(_)
| Statement::ShowDatabases(_)
| Statement::ShowCreateTable(_)
Expand Down
8 changes: 8 additions & 0 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ use datatypes::vectors::{Helper, StringVector};
use once_cell::sync::Lazy;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use sql::statements::statement::Statement;

use crate::error::{self, Result};
use crate::QueryEngineRef;

const SCHEMAS_COLUMN: &str = "Schemas";
const TABLES_COLUMN: &str = "Tables";
Expand Down Expand Up @@ -138,6 +141,11 @@ pub fn show_tables(stmt: ShowTables, catalog_manager: CatalogManagerRef) -> Resu
Ok(Output::RecordBatches(records))
}

pub async fn explain(stmt: Box<Explain>, query_engine: QueryEngineRef) -> Result<Output> {
let plan = query_engine.statement_to_plan(Statement::Explain(*stmt))?;
query_engine.execute(&plan).await
}

pub fn describe_table(stmt: DescribeTable, catalog_manager: CatalogManagerRef) -> Result<Output> {
let catalog = stmt.catalog_name.as_str();
let schema = stmt.schema_name.as_str();
Expand Down
63 changes: 62 additions & 1 deletion src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu, TokenizerSnafu,
};
use crate::statements::describe::DescribeTable;
use crate::statements::explain::Explain;
use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowKind, ShowTables};
use crate::statements::statement::Statement;
use crate::statements::table_idents_to_full_name;
Expand Down Expand Up @@ -258,7 +259,16 @@ impl<'a> ParserContext<'a> {
}

fn parse_explain(&mut self) -> Result<Statement> {
todo!()
let explain_statement =
self.parser
.parse_explain(false)
.with_context(|_| error::UnexpectedSnafu {
sql: self.sql,
expected: "a query statement",
actual: self.peek_token_as_string(),
})?;

Ok(Statement::Explain(Explain::try_from(explain_statement)?))
}

// Report unexpected token
Expand Down Expand Up @@ -328,6 +338,7 @@ impl<'a> ParserContext<'a> {
mod tests {
use std::assert_matches::assert_matches;

use sqlparser::ast::{Query as SpQuery, Statement as SpStatement};
use sqlparser::dialect::GenericDialect;

use super::*;
Expand Down Expand Up @@ -471,4 +482,54 @@ mod tests {
})
);
}

#[test]
pub fn test_explain() {
let sql = "EXPLAIN select * from foo";
let result = ParserContext::create_with_dialect(sql, &GenericDialect {});
let stmts = result.unwrap();
assert_eq!(1, stmts.len());

let select = sqlparser::ast::Select {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
distinct: false,
top: None,
projection: vec![sqlparser::ast::SelectItem::Wildcard],
from: vec![sqlparser::ast::TableWithJoins {
relation: sqlparser::ast::TableFactor::Table {
name: sqlparser::ast::ObjectName(vec![sqlparser::ast::Ident::new("foo")]),
alias: None,
args: vec![],
with_hints: vec![],
},
joins: vec![],
}],
lateral_views: vec![],
selection: None,
group_by: vec![],
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
};

let sp_statement = SpStatement::Query(Box::new(SpQuery {
with: None,
body: sqlparser::ast::SetExpr::Select(Box::new(select)),
order_by: vec![],
limit: None,
offset: None,
fetch: None,
lock: None,
}));

let explain = Explain::try_from(SpStatement::Explain {
describe_alias: false,
analyze: false,
verbose: false,
statement: Box::new(sp_statement),
})
.unwrap();

assert_eq!(stmts[0], Statement::Explain(explain))
}
}
1 change: 1 addition & 0 deletions src/sql/src/statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod alter;
pub mod create;
pub mod describe;
pub mod explain;
pub mod insert;
pub mod query;
pub mod show;
Expand Down
Loading