From ef9c0bc01884b9bf1b1bd38b6ae8e4ae2d06b330 Mon Sep 17 00:00:00 2001 From: Xiaoying Wang Date: Fri, 12 Nov 2021 06:37:12 +0000 Subject: [PATCH] cte support for oracle --- .../connectorx/tests/test_oracle.py | 16 ++ connectorx-python/src/source_router.rs | 1 - connectorx/src/sql.rs | 172 +++++++++--------- 3 files changed, 106 insertions(+), 83 deletions(-) diff --git a/connectorx-python/connectorx/tests/test_oracle.py b/connectorx-python/connectorx/tests/test_oracle.py index 9c9407542b..1ab84ab554 100644 --- a/connectorx-python/connectorx/tests/test_oracle.py +++ b/connectorx-python/connectorx/tests/test_oracle.py @@ -258,3 +258,19 @@ def test_oracle_empty_result_on_some_partition(oracle_url: str) -> None: } ) assert_frame_equal(df, expected, check_names=True) + + +@pytest.mark.skipif(not os.environ.get("ORACLE_URL"), reason="Test oracle only when `ORACLE_URL` is set") +def test_oracle_cte(oracle_url: str) -> None: + query = "with test_cte (test_int, test_str) as (select test_int, test_char from test_table where test_float > 0) select test_int, test_str from test_cte" + df = read_sql(oracle_url, query, + partition_on="test_int", partition_num=3) + df.sort_values(by="TEST_INT", inplace=True, ignore_index=True) + expected = pd.DataFrame( + index=range(2), + data={ + "TEST_INT": pd.Series([1, 2], dtype="Int64"), + "TEST_STR": pd.Series(["str1 ", "str2 "], dtype="object"), + }, + ) + assert_frame_equal(df, expected, check_names=True) diff --git a/connectorx-python/src/source_router.rs b/connectorx-python/src/source_router.rs index edf2b78ecb..4defd18693 100644 --- a/connectorx-python/src/source_router.rs +++ b/connectorx-python/src/source_router.rs @@ -104,7 +104,6 @@ impl SourceConn { single_col_partition_query(query, col, lower, upper, &OracleDialect {})? } }; - println!("get partition query: {:?}", query); CXQuery::Wrapped(query) } } diff --git a/connectorx/src/sql.rs b/connectorx/src/sql.rs index a2d651fb4a..cf07c840b4 100644 --- a/connectorx/src/sql.rs +++ b/connectorx/src/sql.rs @@ -162,6 +162,17 @@ fn wrap_query( ) -> Statement { let with = query.with.clone(); query.with = None; + let alias = if tmp_tab_name.is_empty() { + None + } else { + Some(TableAlias { + name: Ident { + value: tmp_tab_name.into(), + quote_style: None, + }, + columns: vec![], + }) + }; Statement::Query(Box::new(Query { with, body: SetExpr::Select(Box::new(Select { @@ -172,13 +183,7 @@ fn wrap_query( relation: TableFactor::Derived { lateral: false, subquery: Box::new(query.clone()), - alias: Some(TableAlias { - name: Ident { - value: tmp_tab_name.into(), - quote_style: None, - }, - columns: vec![], - }), + alias, }, joins: vec![], }], @@ -227,8 +232,18 @@ impl QueryExt for Query { pub fn count_query(sql: &CXQuery, dialect: &T) -> CXQuery { trace!("Incoming query: {}", sql); + const COUNT_TMP_TAB_NAME: &str = "CXTMPTAB_COUNT"; + #[allow(unused_mut)] - let mut sql = match sql.map(|sql| Parser::parse_sql(dialect, sql)).result() { + let mut table_alias = COUNT_TMP_TAB_NAME; + + // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias + #[cfg(feature = "src_oracle")] + if dialect.type_id() == (OracleDialect {}.type_id()) { + table_alias = ""; + } + + let tsql = match sql.map(|sql| Parser::parse_sql(dialect, sql)).result() { Ok(ast) => { let projection = vec![SelectItem::UnnamedExpr(Expr::Function(Function { name: ObjectName(vec![Ident { @@ -253,7 +268,7 @@ pub fn count_query(sql: &CXQuery, dialect: &T) -> CXQuery { if ast.len() != 1 { @@ -274,19 +289,16 @@ pub fn count_query(sql: &CXQuery, dialect: &T) -> CXQuery { warn!("parser error: {:?}, manually compose query string", e); - format!("SELECT COUNT(*) FROM ({}) as CXTMPTAB_COUNT", sql.as_str()) + format!( + "SELECT COUNT(*) FROM ({}) as {}", + sql.as_str(), + COUNT_TMP_TAB_NAME + ) } }; - // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias - // Hard code "(subquery) alias" instead of output "(subquery) AS alias" - #[cfg(feature = "src_oracle")] - if dialect.type_id() == (OracleDialect {}.type_id()) { - sql = sql.replace(" AS", ""); - } - - debug!("Transformed count query: {}", sql); - CXQuery::Wrapped(sql) + debug!("Transformed count query: {}", tsql); + CXQuery::Wrapped(tsql) } #[throws(ConnectorXError)] @@ -343,9 +355,9 @@ pub fn limit1_query_oracle(sql: &CXQuery) -> CXQuery { }; ast_part = wrap_query(&mut query, vec![SelectItem::Wildcard], Some(selection), ""); - let sql = format!("{}", ast_part).replace(" AS", ""); - debug!("Transformed limit 1 query: {}", sql); - CXQuery::Wrapped(sql) + let tsql = format!("{}", ast_part); + debug!("Transformed limit 1 query: {}", tsql); + CXQuery::Wrapped(tsql) } #[throws(ConnectorXError)] @@ -390,8 +402,31 @@ pub fn single_col_partition_query( const PART_TMP_TAB_NAME: &str = "CXTMPTAB_PART"; #[allow(unused_mut)] - let mut tsql = match Parser::parse_sql(dialect, sql) { - Ok(mut ast) => { + let mut table_alias = PART_TMP_TAB_NAME; + #[allow(unused_mut)] + let mut cid = Box::new(Expr::CompoundIdentifier(vec![ + Ident { + value: PART_TMP_TAB_NAME.to_string(), + quote_style: None, + }, + Ident { + value: col.to_string(), + quote_style: None, + }, + ])); + + // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias + #[cfg(feature = "src_oracle")] + if dialect.type_id() == (OracleDialect {}.type_id()) { + table_alias = ""; + cid = Box::new(Expr::Identifier(Ident { + value: col.to_string(), + quote_style: None, + })); + } + + let tsql = match Parser::parse_sql(dialect, sql) { + Ok(ast) => { if ast.len() != 1 { throw!(ConnectorXError::SqlQueryNotSupported(sql.to_string())); } @@ -411,29 +446,11 @@ pub fn single_col_partition_query( let lb = Expr::BinaryOp { left: Box::new(Expr::Value(Value::Number(lower.to_string(), false))), op: BinaryOperator::LtEq, - right: Box::new(Expr::CompoundIdentifier(vec![ - Ident { - value: PART_TMP_TAB_NAME.to_string(), - quote_style: None, - }, - Ident { - value: col.to_string(), - quote_style: None, - }, - ])), + right: cid.clone(), }; let ub = Expr::BinaryOp { - left: Box::new(Expr::CompoundIdentifier(vec![ - Ident { - value: PART_TMP_TAB_NAME.to_string(), - quote_style: None, - }, - Ident { - value: col.to_string(), - quote_style: None, - }, - ])), + left: cid, op: BinaryOperator::Lt, right: Box::new(Expr::Value(Value::Number(upper.to_string(), false))), }; @@ -455,7 +472,7 @@ pub fn single_col_partition_query( &mut query, vec![SelectItem::Wildcard], Some(selection), - PART_TMP_TAB_NAME, + table_alias, ); format!("{}", ast_part) } @@ -465,13 +482,6 @@ pub fn single_col_partition_query( } }; - // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias - // Hard code "(subquery) alias" instead of output "(subquery) AS alias" - #[cfg(feature = "src_oracle")] - if dialect.type_id() == (OracleDialect {}.type_id()) { - tsql = tsql.replace(" AS", "") - } - debug!("Transformed single column partition query: {}", tsql); tsql } @@ -482,8 +492,31 @@ pub fn get_partition_range_query(sql: &str, col: &str, dialect: &T) const RANGE_TMP_TAB_NAME: &str = "CXTMPTAB_RANGE"; #[allow(unused_mut)] - let mut tsql = match Parser::parse_sql(dialect, sql) { - Ok(mut ast) => { + let mut table_alias = RANGE_TMP_TAB_NAME; + #[allow(unused_mut)] + let mut args = vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![ + Ident { + value: RANGE_TMP_TAB_NAME.to_string(), + quote_style: None, + }, + Ident { + value: col.to_string(), + quote_style: None, + }, + ]))]; + + // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias + #[cfg(feature = "src_oracle")] + if dialect.type_id() == (OracleDialect {}.type_id()) { + table_alias = ""; + args = vec![FunctionArg::Unnamed(Expr::Identifier(Ident { + value: col.to_string(), + quote_style: None, + }))]; + } + + let tsql = match Parser::parse_sql(dialect, sql) { + Ok(ast) => { if ast.len() != 1 { throw!(ConnectorXError::SqlQueryNotSupported(sql.to_string())); } @@ -500,16 +533,7 @@ pub fn get_partition_range_query(sql: &str, col: &str, dialect: &T) value: "min".to_string(), quote_style: None, }]), - args: vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![ - Ident { - value: RANGE_TMP_TAB_NAME.to_string(), - quote_style: None, - }, - Ident { - value: col.to_string(), - quote_style: None, - }, - ]))], + args: args.clone(), over: None, distinct: false, })), @@ -518,21 +542,12 @@ pub fn get_partition_range_query(sql: &str, col: &str, dialect: &T) value: "max".to_string(), quote_style: None, }]), - args: vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![ - Ident { - value: RANGE_TMP_TAB_NAME.to_string(), - quote_style: None, - }, - Ident { - value: col.to_string(), - quote_style: None, - }, - ]))], + args, over: None, distinct: false, })), ]; - ast_range = wrap_query(&mut query, projection, None, RANGE_TMP_TAB_NAME); + ast_range = wrap_query(&mut query, projection, None, table_alias); format!("{}", ast_range) } Err(e) => { @@ -544,13 +559,6 @@ pub fn get_partition_range_query(sql: &str, col: &str, dialect: &T) } }; - // HACK: Some dialect (e.g. Oracle) does not support "AS" for alias - // Hard code "(subquery) alias" instead of output "(subquery) AS alias" - #[cfg(feature = "src_oracle")] - if dialect.type_id() == (OracleDialect {}.type_id()) { - tsql = tsql.replace(" AS", "") - } - debug!("Transformed partition range query: {}", tsql); tsql }