
Compatibility: MySQL-style INSERT and SELECT
Support MySQL-compatible double-quoted string literals, for example:

```
insert into t values("xxx");
select * from t where col="xxx";
```
TCeason committed Apr 16, 2022
1 parent d1a33ce commit 11c1772
Showing 33 changed files with 223 additions and 74 deletions.
5 changes: 5 additions & 0 deletions .github/actions/test_stateless_cluster_linux/action.yml
@@ -28,6 +28,11 @@ runs:
run: |
chmod +x ./target/debug/databend-*
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver
- name: Run Stateless Tests with Cluster mode
shell: bash
run: |
2 changes: 1 addition & 1 deletion .github/actions/test_stateless_cluster_macos/action.yml
@@ -33,7 +33,7 @@ runs:
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver
- name: Run Stateless Tests with Cluster mode
shell: bash
5 changes: 5 additions & 0 deletions .github/actions/test_stateless_standalone_linux/action.yml
@@ -28,6 +28,11 @@ runs:
run: |
chmod +x ./target/debug/databend-*
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver
- name: Run Stateless Tests with Standalone mode, with embedded meta-store
shell: bash
run: |
2 changes: 1 addition & 1 deletion .github/actions/test_stateless_standalone_macos/action.yml
@@ -28,7 +28,7 @@ runs:
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver
- name: Set up file as executable
shell: bash
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion common/ast/Cargo.toml
@@ -19,7 +19,7 @@ common-functions = { path = "../functions" }
# TODO (andylokandy): Use the version from crates.io once
# https://github.com/brendanzab/codespan/pull/331 is released.
codespan-reporting = { git = "https://github.com/brendanzab/codespan", rev = "c84116f5" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }

# Crates.io dependencies
async-trait = "0.1.53"
2 changes: 1 addition & 1 deletion common/exception/Cargo.toml
@@ -26,4 +26,4 @@ tonic = "=0.6.2"

# Github dependencies
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }
2 changes: 1 addition & 1 deletion common/functions/Cargo.toml
@@ -44,7 +44,7 @@ serde_json = "1.0.79"
sha1 = "0.10.1"
sha2 = "0.10.2"
simdutf8 = "0.1.4"
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }
strength_reduce = "0.2.3"
twox-hash = "1.6.2"
uuid = { version = "0.8.2", features = ["v4"] }
2 changes: 1 addition & 1 deletion query/Cargo.toml
@@ -54,7 +54,7 @@ common-tracing = { path = "../common/tracing" }
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
opensrv-clickhouse = { git = "https://github.com/datafuselabs/opensrv", rev = "9690be9", package = "opensrv-clickhouse" }
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "967477f1", package = "opensrv-mysql" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }

# Crates.io dependencies
ahash = "0.7.6"
56 changes: 31 additions & 25 deletions query/src/servers/http/clickhouse_handler.rs
@@ -127,8 +127,11 @@ pub async fn clickhouse_handler_get(
.map_err(InternalServerError)
}

fn try_parse_insert_formatted(sql: &str) -> Result<Option<(Format, Vec<DfStatement>)>> {
if let Ok((statements, _)) = DfParser::parse_sql(sql) {
fn try_parse_insert_formatted(
sql: &str,
typ: SessionType,
) -> Result<Option<(Format, Vec<DfStatement>)>> {
if let Ok((statements, _)) = DfParser::parse_sql(sql, typ) {
if statements.is_empty() {
return Ok(None);
}
@@ -178,30 +181,33 @@ pub async fn clickhouse_handler_post(
let mut sql = params.query;

// Insert into format sql
let (plan, input_stream) =
if let Some((format, statements)) = try_parse_insert_formatted(&sql).map_err(BadRequest)? {
let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

let input_stream = match format {
Format::NDJson => build_ndjson_stream(&plan, body).await.map_err(BadRequest)?,
};
(plan, Some(input_stream))
} else {
// Other sql
let body = body.into_string().await.map_err(BadRequest)?;
let sql = format!("{}\n{}", sql, body);
let (statements, _) = DfParser::parse_sql(&sql).map_err(BadRequest)?;

let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

(plan, None)
let (plan, input_stream) = if let Some((format, statements)) =
try_parse_insert_formatted(&sql, ctx.get_current_session().get_type())
.map_err(BadRequest)?
{
let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

let input_stream = match format {
Format::NDJson => build_ndjson_stream(&plan, body).await.map_err(BadRequest)?,
};
(plan, Some(input_stream))
} else {
// Other sql
let body = body.into_string().await.map_err(BadRequest)?;
let sql = format!("{}\n{}", sql, body);
let (statements, _) =
DfParser::parse_sql(&sql, ctx.get_current_session().get_type()).map_err(BadRequest)?;

let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

(plan, None)
};

execute(ctx, plan, input_stream)
.await
7 changes: 6 additions & 1 deletion query/src/sessions/session_type.rs
@@ -24,11 +24,15 @@ pub enum SessionType {
FlightRPC,
HTTPAPI(String),
Test,
Fuzz,
}

impl SessionType {
pub fn is_user_session(&self) -> bool {
!matches!(self, SessionType::HTTPAPI(_) | SessionType::Test)
!matches!(
self,
SessionType::HTTPAPI(_) | SessionType::Test | SessionType::Fuzz
)
}
}

@@ -43,6 +47,7 @@ impl fmt::Display for SessionType {
SessionType::Test => "Test".to_string(),
SessionType::FlightRPC => "FlightRPC".to_string(),
SessionType::HTTPAPI(usage) => format!("HTTPAPI({})", usage),
SessionType::Fuzz => "Fuzz".to_string(),
};
write!(f, "{}", name)
}
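The new `Fuzz` variant is grouped with the non-user session types. A minimal behavioral sketch, assuming `SessionType` is importable from `crate::sessions` as in the file above:

```rust
#[cfg(test)]
mod tests {
    // Assumed import path, mirroring the module shown above.
    use crate::sessions::SessionType;

    #[test]
    fn fuzz_is_not_a_user_session() {
        // Fuzz sessions, like Test and HTTPAPI, are excluded from user-session accounting.
        assert!(!SessionType::Fuzz.is_user_session());
        assert!(!SessionType::Test.is_user_session());
        // An interactive MySQL session still counts as a user session.
        assert!(SessionType::MySQL.is_user_session());
    }
}
```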
4 changes: 2 additions & 2 deletions query/src/sql/plan_parser.rs
@@ -35,15 +35,15 @@ pub struct PlanParser;

impl PlanParser {
pub async fn parse(ctx: Arc<QueryContext>, query: &str) -> Result<PlanNode> {
let (statements, _) = DfParser::parse_sql(query)?;
let (statements, _) = DfParser::parse_sql(query, ctx.get_current_session().get_type())?;
PlanParser::build_plan(statements, ctx).await
}

pub async fn parse_with_hint(
query: &str,
ctx: Arc<QueryContext>,
) -> (Result<PlanNode>, Vec<DfHint>) {
match DfParser::parse_sql(query) {
match DfParser::parse_sql(query, ctx.get_current_session().get_type()) {
Err(cause) => (Err(cause), vec![]),
Ok((statements, hints)) => (PlanParser::build_plan(statements, ctx).await, hints),
}
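With the session type threaded through the context, callers of `PlanParser::parse` pick up the right dialect automatically. A hedged usage sketch; the crate paths and the table `t` are assumptions for illustration, not part of this commit:

```rust
use std::sync::Arc;

// Assumed crate paths, mirroring the imports used elsewhere in this commit.
use common_exception::Result;
use common_planners::PlanNode;

use crate::sessions::QueryContext;
use crate::sql::PlanParser;

// For a context created by a MySQL session, the double-quoted literal below is
// parsed as a string; for other session types it would be read as an identifier.
async fn plan_mysql_query(ctx: Arc<QueryContext>) -> Result<PlanNode> {
    PlanParser::parse(ctx, r#"SELECT * FROM t WHERE col = "xxx""#).await
}
```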
30 changes: 23 additions & 7 deletions query/src/sql/sql_parser.rs
@@ -24,6 +24,7 @@ use sqlparser::ast::Value;
use sqlparser::dialect::keywords::Keyword;
use sqlparser::dialect::Dialect;
use sqlparser::dialect::GenericDialect;
use sqlparser::dialect::MySqlDialect;
use sqlparser::dialect::SnowflakeDialect;
use sqlparser::parser::Parser;
use sqlparser::parser::ParserError;
@@ -32,6 +33,7 @@ use sqlparser::tokenizer::Tokenizer;
use sqlparser::tokenizer::Whitespace;

use super::statements::DfShowRoles;
use crate::sessions::SessionType;
use crate::sql::statements::DfShowEngines;
use crate::sql::statements::DfShowMetrics;
use crate::sql::statements::DfShowProcessList;
@@ -57,7 +59,7 @@ pub struct DfParser<'a> {
impl<'a> DfParser<'a> {
/// Parse the specified tokens
pub fn new(sql: &'a str) -> Result<Self, ParserError> {
let dialect = &GenericDialect {};
let dialect = &MySqlDialect {};
DfParser::new_with_dialect(sql, dialect)
}

@@ -73,12 +75,26 @@ impl<'a> DfParser<'a> {
}

/// Parse a SQL statement and produce a set of statements with dialect
pub fn parse_sql(sql: &'a str) -> Result<(Vec<DfStatement<'a>>, Vec<DfHint>), ErrorCode> {
let dialect = &GenericDialect {};
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
pub fn parse_sql(
sql: &'a str,
typ: SessionType,
) -> Result<(Vec<DfStatement<'a>>, Vec<DfHint>), ErrorCode> {
match typ {
SessionType::MySQL => {
let dialect = &MySqlDialect {};
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
}
_ => {
let dialect = &GenericDialect {};
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
}
}
}

/// Parse a SQL statement and produce a set of statements
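The session type now selects the dialect: `SessionType::MySQL` uses `MySqlDialect`, everything else falls back to `GenericDialect`. A minimal call-site sketch; the import paths are assumptions based on the module above, and the table name is illustrative:

```rust
// Assumed imports, mirroring the module shown above.
use common_exception::ErrorCode;

use crate::sessions::SessionType;
use crate::sql::sql_parser::DfParser;

fn parse_with_mysql_dialect() -> Result<(), ErrorCode> {
    // Under the MySQL dialect, "xxx" is tokenized as a string literal
    // rather than a quoted identifier.
    let (statements, _hints) =
        DfParser::parse_sql(r#"SELECT * FROM t WHERE col = "xxx""#, SessionType::MySQL)?;
    assert_eq!(statements.len(), 1);
    Ok(())
}
```

The two match arms differ only in the dialect; the timing and metrics plumbing is identical in both.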
16 changes: 12 additions & 4 deletions query/src/sql/statements/analyzer_expr.rs
@@ -38,6 +38,7 @@ use sqlparser::ast::Value;

use crate::procedures::ContextFunction;
use crate::sessions::QueryContext;
use crate::sessions::SessionType;
use crate::sql::statements::analyzer_value_expr::ValueExprAnalyzer;
use crate::sql::statements::AnalyzableStatement;
use crate::sql::statements::AnalyzedResult;
@@ -61,7 +62,11 @@ impl ExpressionAnalyzer {
// Build RPN for expr. Because async function unsupported recursion
for rpn_item in &ExprRPNBuilder::build(self.context.clone(), expr).await? {
match rpn_item {
ExprRPNItem::Value(v) => Self::analyze_value(v, &mut stack)?,
ExprRPNItem::Value(v) => Self::analyze_value(
v,
&mut stack,
self.context.get_current_session().get_type(),
)?,
ExprRPNItem::Identifier(v) => self.analyze_identifier(v, &mut stack)?,
ExprRPNItem::QualifiedIdentifier(v) => self.analyze_identifiers(v, &mut stack)?,
ExprRPNItem::Function(v) => self.analyze_function(v, &mut stack)?,
@@ -94,8 +99,8 @@ impl ExpressionAnalyzer {
}
}

fn analyze_value(value: &Value, args: &mut Vec<Expression>) -> Result<()> {
args.push(ValueExprAnalyzer::analyze(value)?);
fn analyze_value(value: &Value, args: &mut Vec<Expression>, typ: SessionType) -> Result<()> {
args.push(ValueExprAnalyzer::analyze(value, typ)?);
Ok(())
}

@@ -205,7 +210,10 @@ impl ExpressionAnalyzer {
let mut parameters = Vec::with_capacity(info.parameters.len());

for parameter in &info.parameters {
match ValueExprAnalyzer::analyze(parameter)? {
match ValueExprAnalyzer::analyze(
parameter,
self.context.get_current_session().get_type(),
)? {
Expression::Literal { value, .. } => {
parameters.push(value);
}
15 changes: 14 additions & 1 deletion query/src/sql/statements/analyzer_value_expr.rs
@@ -19,16 +19,29 @@ use common_planners::Expression;
use sqlparser::ast::DateTimeField;
use sqlparser::ast::Value;

use crate::sessions::SessionType;

pub struct ValueExprAnalyzer;

impl ValueExprAnalyzer {
pub fn analyze(value: &Value) -> Result<Expression> {
pub fn analyze(value: &Value, typ: SessionType) -> Result<Expression> {
match value {
Value::Null => Self::analyze_null_value(),
Value::Boolean(value) => Self::analyze_bool_value(value),
Value::Number(value, _) => Self::analyze_number_value(value, None),
Value::HexStringLiteral(value) => Self::analyze_number_value(value, Some(16)),
Value::SingleQuotedString(value) => Self::analyze_string_value(value),
Value::DoubleQuotedString(value) => {
if let SessionType::MySQL = typ {
Self::analyze_string_value(value)
} else {
Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported value expression: {}, type: {:?}",
value,
Value::DoubleQuotedString(value.to_string())
)))
}
}
Value::Interval {
leading_precision: Some(_),
..
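The new `DoubleQuotedString` arm only accepts the literal for MySQL sessions; every other session type gets a `SyntaxException`. A small behavioral sketch, assuming the analyzer and `SessionType` are importable as shown above:

```rust
use sqlparser::ast::Value;

// Assumed import paths from this crate, mirroring the file above.
use crate::sessions::SessionType;
use crate::sql::statements::analyzer_value_expr::ValueExprAnalyzer;

fn double_quoted_literal_behavior() {
    let value = Value::DoubleQuotedString("xxx".to_string());

    // MySQL sessions treat "xxx" as a string literal, so analysis succeeds.
    assert!(ValueExprAnalyzer::analyze(&value, SessionType::MySQL).is_ok());

    // Any other session type rejects it with a SyntaxException.
    assert!(ValueExprAnalyzer::analyze(&value, SessionType::Test).is_err());
}
```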
@@ -105,7 +105,8 @@ impl JoinedSchemaAnalyzer {

if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let (statements, _) = DfParser::parse_sql(query.as_str())?;
let (statements, _) =
DfParser::parse_sql(query.as_str(), self.ctx.get_current_session().get_type())?;
if statements.len() == 1 {
if let DfStatement::Query(subquery) = &statements[0] {
if let AnalyzedResult::SelectQuery(state) =
30 changes: 22 additions & 8 deletions query/src/sql/statements/value_source.rs
@@ -23,12 +23,14 @@ use common_io::prelude::*;
use common_planners::Expression;
use sqlparser::ast::Expr;
use sqlparser::dialect::GenericDialect;
use sqlparser::dialect::MySqlDialect;
use sqlparser::parser::Parser;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Tokenizer;

use crate::pipelines::transforms::ExpressionExecutor;
use crate::sessions::QueryContext;
use crate::sessions::SessionType;
use crate::sql::statements::ExpressionAnalyzer;

pub struct ValueSource {
@@ -103,7 +105,7 @@ impl ValueSource {
analyzer: ExpressionAnalyzer,
ctx: Arc<QueryContext>,
) -> Result<DataBlock> {
let values = parse_exprs(bytes)?;
let values = parse_exprs(bytes, ctx.get_current_session().get_type())?;

let mut blocks = vec![];
for value in values {
@@ -150,11 +152,23 @@ async fn exprs_to_datablock(
executor.execute(&one_row_block)
}

fn parse_exprs(buf: &[u8]) -> std::result::Result<Vec<Vec<Expr>>, ParserError> {
let dialect = GenericDialect {};
let sql = std::str::from_utf8(buf).unwrap();
let mut tokenizer = Tokenizer::new(&dialect, sql);
let (tokens, position_map) = tokenizer.tokenize()?;
let mut parser = Parser::new(tokens, position_map, &dialect);
parser.parse_values()
fn parse_exprs(buf: &[u8], typ: SessionType) -> std::result::Result<Vec<Vec<Expr>>, ParserError> {
match typ {
SessionType::MySQL => {
let dialect = MySqlDialect {};
let sql = std::str::from_utf8(buf).unwrap();
let mut tokenizer = Tokenizer::new(&dialect, sql);
let (tokens, position_map) = tokenizer.tokenize()?;
let mut parser = Parser::new(tokens, position_map, &dialect);
parser.parse_values()
}
_ => {
let dialect = GenericDialect {};
let sql = std::str::from_utf8(buf).unwrap();
let mut tokenizer = Tokenizer::new(&dialect, sql);
let (tokens, position_map) = tokenizer.tokenize()?;
let mut parser = Parser::new(tokens, position_map, &dialect);
parser.parse_values()
}
}
}
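The two arms of `parse_exprs` duplicate the tokenize-and-parse steps and differ only in the dialect. A possible deduplicated shape is sketched below; it is not the committed code, and it assumes the fork's `Tokenizer::new` and `Parser::new` accept a `&dyn Dialect`, as the upstream crate does:

```rust
use sqlparser::ast::Expr;
use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect};
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::Tokenizer;

// Assumed import, mirroring the file above.
use crate::sessions::SessionType;

// Sketch only: pick the dialect once, then share the tokenize/parse path.
fn parse_exprs(buf: &[u8], typ: SessionType) -> Result<Vec<Vec<Expr>>, ParserError> {
    let mysql = MySqlDialect {};
    let generic = GenericDialect {};
    let dialect: &dyn Dialect = match typ {
        SessionType::MySQL => &mysql,
        _ => &generic,
    };
    let sql = std::str::from_utf8(buf).unwrap();
    let mut tokenizer = Tokenizer::new(dialect, sql);
    // The (tokens, position_map) pair matches the fork's tokenizer API used above.
    let (tokens, position_map) = tokenizer.tokenize()?;
    let mut parser = Parser::new(tokens, position_map, dialect);
    parser.parse_values()
}
```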