Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compatibility mysql insert and select #4883

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/actions/test_stateless_cluster_linux/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ runs:
run: |
chmod +x ./target/debug/databend-*

- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver

- name: Run Stateless Tests with Cluster mode
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/test_stateless_cluster_macos/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ runs:
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver

- name: Run Stateless Tests with Cluster mode
shell: bash
Expand Down
5 changes: 5 additions & 0 deletions .github/actions/test_stateless_standalone_linux/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ runs:
run: |
chmod +x ./target/debug/databend-*

- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver

- name: Run Stateless Tests with Standalone mode, with embedded meta-store
shell: bash
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/test_stateless_standalone_macos/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ runs:
- name: Install python dependences
shell: bash
run: |
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy
pip3 install --user boto3 "moto[all]" yapf shfmt-py mysql-connector pymysql sqlalchemy clickhouse_driver

- name: Set up file as executable
shell: bash
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/ast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ common-functions = { path = "../functions" }
# TODO (andylokandy): Use the version from crates.io once
# https://github.com/brendanzab/codespan/pull/331 is released.
codespan-reporting = { git = "https://github.com/brendanzab/codespan", rev = "c84116f5" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }

# Crates.io dependencies
async-trait = "0.1.53"
Expand Down
2 changes: 1 addition & 1 deletion common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ tonic = "=0.6.2"

# Github dependencies
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }
2 changes: 1 addition & 1 deletion common/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ serde_json = "1.0.79"
sha1 = "0.10.1"
sha2 = "0.10.2"
simdutf8 = "0.1.4"
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }
strength_reduce = "0.2.3"
twox-hash = "1.6.2"
uuid = { version = "0.8.2", features = ["v4"] }
Expand Down
2 changes: 1 addition & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ common-tracing = { path = "../common/tracing" }
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
opensrv-clickhouse = { git = "https://github.com/datafuselabs/opensrv", rev = "9690be9", package = "opensrv-clickhouse" }
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "967477f1", package = "opensrv-mysql" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "f88fb32" }
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "818c0f9" }

# Crates.io dependencies
ahash = "0.7.6"
Expand Down
56 changes: 31 additions & 25 deletions query/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ pub async fn clickhouse_handler_get(
.map_err(InternalServerError)
}

fn try_parse_insert_formatted(sql: &str) -> Result<Option<(Format, Vec<DfStatement>)>> {
if let Ok((statements, _)) = DfParser::parse_sql(sql) {
fn try_parse_insert_formatted(
sql: &str,
typ: SessionType,
) -> Result<Option<(Format, Vec<DfStatement>)>> {
if let Ok((statements, _)) = DfParser::parse_sql(sql, typ) {
if statements.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -178,30 +181,33 @@ pub async fn clickhouse_handler_post(
let mut sql = params.query;

// Insert into format sql
let (plan, input_stream) =
if let Some((format, statements)) = try_parse_insert_formatted(&sql).map_err(BadRequest)? {
let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

let input_stream = match format {
Format::NDJson => build_ndjson_stream(&plan, body).await.map_err(BadRequest)?,
};
(plan, Some(input_stream))
} else {
// Other sql
let body = body.into_string().await.map_err(BadRequest)?;
let sql = format!("{}\n{}", sql, body);
let (statements, _) = DfParser::parse_sql(&sql).map_err(BadRequest)?;

let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

(plan, None)
let (plan, input_stream) = if let Some((format, statements)) =
try_parse_insert_formatted(&sql, ctx.get_current_session().get_type())
.map_err(BadRequest)?
{
let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

let input_stream = match format {
Format::NDJson => build_ndjson_stream(&plan, body).await.map_err(BadRequest)?,
};
(plan, Some(input_stream))
} else {
// Other sql
let body = body.into_string().await.map_err(BadRequest)?;
let sql = format!("{}\n{}", sql, body);
let (statements, _) =
DfParser::parse_sql(&sql, ctx.get_current_session().get_type()).map_err(BadRequest)?;

let plan = PlanParser::build_plan(statements, ctx.clone())
.await
.map_err(InternalServerError)?;
ctx.attach_query_str(&sql);

(plan, None)
};

execute(ctx, plan, input_stream)
.await
Expand Down
7 changes: 6 additions & 1 deletion query/src/sessions/session_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ pub enum SessionType {
FlightRPC,
HTTPAPI(String),
Test,
Fuzz,
}

impl SessionType {
pub fn is_user_session(&self) -> bool {
!matches!(self, SessionType::HTTPAPI(_) | SessionType::Test)
!matches!(
self,
SessionType::HTTPAPI(_) | SessionType::Test | SessionType::Fuzz
)
}
}

Expand All @@ -43,6 +47,7 @@ impl fmt::Display for SessionType {
SessionType::Test => "Test".to_string(),
SessionType::FlightRPC => "FlightRPC".to_string(),
SessionType::HTTPAPI(usage) => format!("HTTPAPI({})", usage),
SessionType::Fuzz => "Fuzz".to_string(),
};
write!(f, "{}", name)
}
Expand Down
4 changes: 2 additions & 2 deletions query/src/sql/plan_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ pub struct PlanParser;

impl PlanParser {
pub async fn parse(ctx: Arc<QueryContext>, query: &str) -> Result<PlanNode> {
let (statements, _) = DfParser::parse_sql(query)?;
let (statements, _) = DfParser::parse_sql(query, ctx.get_current_session().get_type())?;
PlanParser::build_plan(statements, ctx).await
}

pub async fn parse_with_hint(
query: &str,
ctx: Arc<QueryContext>,
) -> (Result<PlanNode>, Vec<DfHint>) {
match DfParser::parse_sql(query) {
match DfParser::parse_sql(query, ctx.get_current_session().get_type()) {
Err(cause) => (Err(cause), vec![]),
Ok((statements, hints)) => (PlanParser::build_plan(statements, ctx).await, hints),
}
Expand Down
34 changes: 22 additions & 12 deletions query/src/sql/sql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use sqlparser::ast::Value;
use sqlparser::dialect::keywords::Keyword;
use sqlparser::dialect::Dialect;
use sqlparser::dialect::GenericDialect;
use sqlparser::dialect::MySqlDialect;
use sqlparser::dialect::SnowflakeDialect;
use sqlparser::parser::Parser;
use sqlparser::parser::ParserError;
Expand All @@ -32,6 +33,7 @@ use sqlparser::tokenizer::Tokenizer;
use sqlparser::tokenizer::Whitespace;

use super::statements::DfShowRoles;
use crate::sessions::SessionType;
use crate::sql::statements::DfShowEngines;
use crate::sql::statements::DfShowMetrics;
use crate::sql::statements::DfShowProcessList;
Expand All @@ -55,12 +57,6 @@ pub struct DfParser<'a> {
}

impl<'a> DfParser<'a> {
/// Parse the specified tokens
pub fn new(sql: &'a str) -> Result<Self, ParserError> {
let dialect = &GenericDialect {};
DfParser::new_with_dialect(sql, dialect)
}

/// Parse the specified tokens with dialect
pub fn new_with_dialect(sql: &'a str, dialect: &'a dyn Dialect) -> Result<Self, ParserError> {
let mut tokenizer = Tokenizer::new(dialect, sql);
Expand All @@ -73,12 +69,26 @@ impl<'a> DfParser<'a> {
}

/// Parse a SQL statement and produce a set of statements with dialect
pub fn parse_sql(sql: &'a str) -> Result<(Vec<DfStatement<'a>>, Vec<DfHint>), ErrorCode> {
let dialect = &GenericDialect {};
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
pub fn parse_sql(
sql: &'a str,
typ: SessionType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Databend's default SQL dialect is MySqlDialect, what would the problem be?

Copy link
Member

@BohuTANG BohuTANG Apr 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we would not need a SessionType there

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use a ClickHouse client to connect to a Databend server or a ClickHouse server, SQL like

select “number” will return the column value

But if you use a MySQL client, it will return a string

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/suites/0_stateless/00_dummy/00_0000_dummy_select_1.sql

like this test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double quotes are not SQL standard (not allowed by Snowflake, Pg, ClickHouse); they are MySQL-specific...

So we need this special SessionType

) -> Result<(Vec<DfStatement<'a>>, Vec<DfHint>), ErrorCode> {
match typ {
SessionType::MySQL => {
let dialect = &MySqlDialect {};
b41sh marked this conversation as resolved.
Show resolved Hide resolved
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
}
_ => {
let dialect = &GenericDialect {};
let start = Instant::now();
let result = DfParser::parse_sql_with_dialect(sql, dialect)?;
histogram!(super::metrics::METRIC_PARSER_USEDTIME, start.elapsed());
Ok(result)
}
}
}

/// Parse a SQL statement and produce a set of statements
Expand Down
16 changes: 12 additions & 4 deletions query/src/sql/statements/analyzer_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sqlparser::ast::Value;

use crate::procedures::ContextFunction;
use crate::sessions::QueryContext;
use crate::sessions::SessionType;
use crate::sql::statements::analyzer_value_expr::ValueExprAnalyzer;
use crate::sql::statements::AnalyzableStatement;
use crate::sql::statements::AnalyzedResult;
Expand All @@ -61,7 +62,11 @@ impl ExpressionAnalyzer {
// Build RPN for expr. Because async function unsupported recursion
for rpn_item in &ExprRPNBuilder::build(self.context.clone(), expr).await? {
match rpn_item {
ExprRPNItem::Value(v) => Self::analyze_value(v, &mut stack)?,
ExprRPNItem::Value(v) => Self::analyze_value(
v,
&mut stack,
self.context.get_current_session().get_type(),
)?,
ExprRPNItem::Identifier(v) => self.analyze_identifier(v, &mut stack)?,
ExprRPNItem::QualifiedIdentifier(v) => self.analyze_identifiers(v, &mut stack)?,
ExprRPNItem::Function(v) => self.analyze_function(v, &mut stack)?,
Expand Down Expand Up @@ -94,8 +99,8 @@ impl ExpressionAnalyzer {
}
}

fn analyze_value(value: &Value, args: &mut Vec<Expression>) -> Result<()> {
args.push(ValueExprAnalyzer::analyze(value)?);
fn analyze_value(value: &Value, args: &mut Vec<Expression>, typ: SessionType) -> Result<()> {
args.push(ValueExprAnalyzer::analyze(value, typ)?);
Ok(())
}

Expand Down Expand Up @@ -205,7 +210,10 @@ impl ExpressionAnalyzer {
let mut parameters = Vec::with_capacity(info.parameters.len());

for parameter in &info.parameters {
match ValueExprAnalyzer::analyze(parameter)? {
match ValueExprAnalyzer::analyze(
parameter,
self.context.get_current_session().get_type(),
)? {
Expression::Literal { value, .. } => {
parameters.push(value);
}
Expand Down
18 changes: 17 additions & 1 deletion query/src/sql/statements/analyzer_value_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,32 @@ use common_planners::Expression;
use sqlparser::ast::DateTimeField;
use sqlparser::ast::Value;

use crate::sessions::SessionType;

pub struct ValueExprAnalyzer;

impl ValueExprAnalyzer {
pub fn analyze(value: &Value) -> Result<Expression> {
pub fn analyze(value: &Value, typ: SessionType) -> Result<Expression> {
match value {
Value::Null => Self::analyze_null_value(),
Value::Boolean(value) => Self::analyze_bool_value(value),
Value::Number(value, _) => Self::analyze_number_value(value, None),
Value::HexStringLiteral(value) => Self::analyze_number_value(value, Some(16)),
Value::SingleQuotedString(value) => Self::analyze_string_value(value),
Value::DoubleQuotedString(value) => {
// Only MySQL dialect Support insert SQL like this:
// INSERT INTO t VALUES("val");
// https://github.com/datafuselabs/databend/issues/4861
if let SessionType::MySQL = typ {
TCeason marked this conversation as resolved.
Show resolved Hide resolved
Self::analyze_string_value(value)
} else {
Result::Err(ErrorCode::SyntaxException(format!(
"Unsupported value expression: {}, type: {:?}",
value,
Value::DoubleQuotedString(value.to_string())
)))
}
}
Value::Interval {
leading_precision: Some(_),
..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ impl JoinedSchemaAnalyzer {

if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let (statements, _) = DfParser::parse_sql(query.as_str())?;
let (statements, _) =
DfParser::parse_sql(query.as_str(), self.ctx.get_current_session().get_type())?;
if statements.len() == 1 {
if let DfStatement::Query(subquery) = &statements[0] {
if let AnalyzedResult::SelectQuery(state) =
Expand Down
Loading