Skip to content

Commit

Permalink
cte support for oracle
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaoying committed Nov 12, 2021
1 parent 6ac1c7f commit ef9c0bc
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 83 deletions.
16 changes: 16 additions & 0 deletions connectorx-python/connectorx/tests/test_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,19 @@ def test_oracle_empty_result_on_some_partition(oracle_url: str) -> None:
}
)
assert_frame_equal(df, expected, check_names=True)


@pytest.mark.skipif(not os.environ.get("ORACLE_URL"), reason="Test oracle only when `ORACLE_URL` is set")
def test_oracle_cte(oracle_url: str) -> None:
    """Check that a query starting with a CTE (`WITH ... AS`) can be read from Oracle.

    The query is read with `partition_on="test_int"` and `partition_num=3`,
    the result is sorted by `TEST_INT`, and it is compared against the two
    expected rows.
    """
    query = "with test_cte (test_int, test_str) as (select test_int, test_char from test_table where test_float > 0) select test_int, test_str from test_cte"
    result = read_sql(
        oracle_url,
        query,
        partition_on="test_int",
        partition_num=3,
    )
    # Sort before comparing so the row order is fixed for the assertion.
    result = result.sort_values(by="TEST_INT", ignore_index=True)

    expected_columns = {
        "TEST_INT": pd.Series([1, 2], dtype="Int64"),
        "TEST_STR": pd.Series(["str1 ", "str2 "], dtype="object"),
    }
    expected = pd.DataFrame(index=range(2), data=expected_columns)
    assert_frame_equal(result, expected, check_names=True)
1 change: 0 additions & 1 deletion connectorx-python/src/source_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ impl SourceConn {
single_col_partition_query(query, col, lower, upper, &OracleDialect {})?
}
};
println!("get partition query: {:?}", query);
CXQuery::Wrapped(query)
}
}
Expand Down
172 changes: 90 additions & 82 deletions connectorx/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ fn wrap_query(
) -> Statement {
let with = query.with.clone();
query.with = None;
let alias = if tmp_tab_name.is_empty() {
None
} else {
Some(TableAlias {
name: Ident {
value: tmp_tab_name.into(),
quote_style: None,
},
columns: vec![],
})
};
Statement::Query(Box::new(Query {
with,
body: SetExpr::Select(Box::new(Select {
Expand All @@ -172,13 +183,7 @@ fn wrap_query(
relation: TableFactor::Derived {
lateral: false,
subquery: Box::new(query.clone()),
alias: Some(TableAlias {
name: Ident {
value: tmp_tab_name.into(),
quote_style: None,
},
columns: vec![],
}),
alias,
},
joins: vec![],
}],
Expand Down Expand Up @@ -227,8 +232,18 @@ impl QueryExt for Query {
pub fn count_query<T: Dialect>(sql: &CXQuery<String>, dialect: &T) -> CXQuery<String> {
trace!("Incoming query: {}", sql);

const COUNT_TMP_TAB_NAME: &str = "CXTMPTAB_COUNT";

#[allow(unused_mut)]
let mut sql = match sql.map(|sql| Parser::parse_sql(dialect, sql)).result() {
let mut table_alias = COUNT_TMP_TAB_NAME;

// HACK: Some dialects (e.g. Oracle) do not support "AS" for table aliases
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
table_alias = "";
}

let tsql = match sql.map(|sql| Parser::parse_sql(dialect, sql)).result() {
Ok(ast) => {
let projection = vec![SelectItem::UnnamedExpr(Expr::Function(Function {
name: ObjectName(vec![Ident {
Expand All @@ -253,7 +268,7 @@ pub fn count_query<T: Dialect>(sql: &CXQuery<String>, dialect: &T) -> CXQuery<St
.as_select_mut()
.ok_or_else(|| ConnectorXError::SqlQueryNotSupported(sql.to_string()))?;
select.sort_by = vec![];
wrap_query(&mut query, projection, None, "CXTMPTAB_COUNT")
wrap_query(&mut query, projection, None, table_alias)
}
CXQuery::Wrapped(ast) => {
if ast.len() != 1 {
Expand All @@ -274,19 +289,16 @@ pub fn count_query<T: Dialect>(sql: &CXQuery<String>, dialect: &T) -> CXQuery<St
}
Err(e) => {
warn!("parser error: {:?}, manually compose query string", e);
format!("SELECT COUNT(*) FROM ({}) as CXTMPTAB_COUNT", sql.as_str())
format!(
"SELECT COUNT(*) FROM ({}) as {}",
sql.as_str(),
COUNT_TMP_TAB_NAME
)
}
};

// HACK: Some dialect (e.g. Oracle) does not support "AS" for alias
// Hard code "(subquery) alias" instead of output "(subquery) AS alias"
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
sql = sql.replace(" AS", "");
}

debug!("Transformed count query: {}", sql);
CXQuery::Wrapped(sql)
debug!("Transformed count query: {}", tsql);
CXQuery::Wrapped(tsql)
}

#[throws(ConnectorXError)]
Expand Down Expand Up @@ -343,9 +355,9 @@ pub fn limit1_query_oracle(sql: &CXQuery<String>) -> CXQuery<String> {
};
ast_part = wrap_query(&mut query, vec![SelectItem::Wildcard], Some(selection), "");

let sql = format!("{}", ast_part).replace(" AS", "");
debug!("Transformed limit 1 query: {}", sql);
CXQuery::Wrapped(sql)
let tsql = format!("{}", ast_part);
debug!("Transformed limit 1 query: {}", tsql);
CXQuery::Wrapped(tsql)
}

#[throws(ConnectorXError)]
Expand Down Expand Up @@ -390,8 +402,31 @@ pub fn single_col_partition_query<T: Dialect>(
const PART_TMP_TAB_NAME: &str = "CXTMPTAB_PART";

#[allow(unused_mut)]
let mut tsql = match Parser::parse_sql(dialect, sql) {
Ok(mut ast) => {
let mut table_alias = PART_TMP_TAB_NAME;
#[allow(unused_mut)]
let mut cid = Box::new(Expr::CompoundIdentifier(vec![
Ident {
value: PART_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
]));

// HACK: Some dialects (e.g. Oracle) do not support "AS" for table aliases
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
table_alias = "";
cid = Box::new(Expr::Identifier(Ident {
value: col.to_string(),
quote_style: None,
}));
}

let tsql = match Parser::parse_sql(dialect, sql) {
Ok(ast) => {
if ast.len() != 1 {
throw!(ConnectorXError::SqlQueryNotSupported(sql.to_string()));
}
Expand All @@ -411,29 +446,11 @@ pub fn single_col_partition_query<T: Dialect>(
let lb = Expr::BinaryOp {
left: Box::new(Expr::Value(Value::Number(lower.to_string(), false))),
op: BinaryOperator::LtEq,
right: Box::new(Expr::CompoundIdentifier(vec![
Ident {
value: PART_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
])),
right: cid.clone(),
};

let ub = Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident {
value: PART_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
])),
left: cid,
op: BinaryOperator::Lt,
right: Box::new(Expr::Value(Value::Number(upper.to_string(), false))),
};
Expand All @@ -455,7 +472,7 @@ pub fn single_col_partition_query<T: Dialect>(
&mut query,
vec![SelectItem::Wildcard],
Some(selection),
PART_TMP_TAB_NAME,
table_alias,
);
format!("{}", ast_part)
}
Expand All @@ -465,13 +482,6 @@ pub fn single_col_partition_query<T: Dialect>(
}
};

// HACK: Some dialect (e.g. Oracle) does not support "AS" for alias
// Hard code "(subquery) alias" instead of output "(subquery) AS alias"
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
tsql = tsql.replace(" AS", "")
}

debug!("Transformed single column partition query: {}", tsql);
tsql
}
Expand All @@ -482,8 +492,31 @@ pub fn get_partition_range_query<T: Dialect>(sql: &str, col: &str, dialect: &T)
const RANGE_TMP_TAB_NAME: &str = "CXTMPTAB_RANGE";

#[allow(unused_mut)]
let mut tsql = match Parser::parse_sql(dialect, sql) {
Ok(mut ast) => {
let mut table_alias = RANGE_TMP_TAB_NAME;
#[allow(unused_mut)]
let mut args = vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![
Ident {
value: RANGE_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
]))];

// HACK: Some dialects (e.g. Oracle) do not support "AS" for table aliases
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
table_alias = "";
args = vec![FunctionArg::Unnamed(Expr::Identifier(Ident {
value: col.to_string(),
quote_style: None,
}))];
}

let tsql = match Parser::parse_sql(dialect, sql) {
Ok(ast) => {
if ast.len() != 1 {
throw!(ConnectorXError::SqlQueryNotSupported(sql.to_string()));
}
Expand All @@ -500,16 +533,7 @@ pub fn get_partition_range_query<T: Dialect>(sql: &str, col: &str, dialect: &T)
value: "min".to_string(),
quote_style: None,
}]),
args: vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![
Ident {
value: RANGE_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
]))],
args: args.clone(),
over: None,
distinct: false,
})),
Expand All @@ -518,21 +542,12 @@ pub fn get_partition_range_query<T: Dialect>(sql: &str, col: &str, dialect: &T)
value: "max".to_string(),
quote_style: None,
}]),
args: vec![FunctionArg::Unnamed(Expr::CompoundIdentifier(vec![
Ident {
value: RANGE_TMP_TAB_NAME.to_string(),
quote_style: None,
},
Ident {
value: col.to_string(),
quote_style: None,
},
]))],
args,
over: None,
distinct: false,
})),
];
ast_range = wrap_query(&mut query, projection, None, RANGE_TMP_TAB_NAME);
ast_range = wrap_query(&mut query, projection, None, table_alias);
format!("{}", ast_range)
}
Err(e) => {
Expand All @@ -544,13 +559,6 @@ pub fn get_partition_range_query<T: Dialect>(sql: &str, col: &str, dialect: &T)
}
};

// HACK: Some dialect (e.g. Oracle) does not support "AS" for alias
// Hard code "(subquery) alias" instead of output "(subquery) AS alias"
#[cfg(feature = "src_oracle")]
if dialect.type_id() == (OracleDialect {}.type_id()) {
tsql = tsql.replace(" AS", "")
}

debug!("Transformed partition range query: {}", tsql);
tsql
}
Expand Down

1 comment on commit ef9c0bc

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectorX TPC-H Scale@1 Benchmarks

Benchmark suite Current: ef9c0bc Previous: bd66558 Ratio
connectorx/tests/benchmarks.py::bench_mysql 0.06830433231021929 iter/sec (stddev: 0.5468539512086531) 0.06078909818294486 iter/sec (stddev: 0.11850730366963984) 0.89
connectorx/tests/benchmarks.py::bench_postgres 0.06828053754436762 iter/sec (stddev: 2.9811491351485713) 0.056167765383764584 iter/sec (stddev: 3.117850560349933) 0.82

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.