Skip to content

Commit

Permalink
Support to unparse TimestampSecond and TimestampMicrosecond to St…
Browse files Browse the repository at this point in the history
…ring (apache#11120)

* support to unparse TimestampSecond and TimestampMicrosecond

* cargo fmt

* fmt
  • Loading branch information
goldmedal authored and findepi committed Jul 16, 2024
1 parent e81e963 commit 068778d
Showing 1 changed file with 74 additions and 66 deletions.
140 changes: 74 additions & 66 deletions datafusion/sql/src/unparser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
// under the License.

use core::fmt;
use std::sync::Arc;
use std::{fmt::Display, vec};

use arrow::datatypes::{Decimal128Type, Decimal256Type, DecimalType};
use arrow::util::display::array_value_to_string;
use arrow_array::types::{
ArrowTemporalType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType,
};
use arrow_array::{
Date32Array, Date64Array, PrimitiveArray, TimestampMillisecondArray,
TimestampNanosecondArray,
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow_array::{Date32Array, Date64Array, PrimitiveArray};
use arrow_schema::DataType;
use sqlparser::ast::Value::SingleQuotedString;
use sqlparser::ast::{
Expand Down Expand Up @@ -655,6 +654,47 @@ impl Unparser<'_> {
}
}

fn handle_timestamp<T: ArrowTemporalType>(
&self,
v: &ScalarValue,
tz: &Option<Arc<str>>,
) -> Result<ast::Expr>
where
i64: From<T::Native>,
{
let ts = if let Some(tz) = tz {
v.to_array()?
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.ok_or(internal_datafusion_err!(
"Failed to downcast type {v:?} to arrow array"
))?
.value_as_datetime_with_tz(0, tz.parse()?)
.ok_or(internal_datafusion_err!(
"Unable to convert {v:?} to DateTime"
))?
.to_string()
} else {
v.to_array()?
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.ok_or(internal_datafusion_err!(
"Failed to downcast type {v:?} to arrow array"
))?
.value_as_datetime(0)
.ok_or(internal_datafusion_err!(
"Unable to convert {v:?} to DateTime"
))?
.to_string()
};
Ok(ast::Expr::Cast {
kind: ast::CastKind::Cast,
expr: Box::new(ast::Expr::Value(SingleQuotedString(ts))),
data_type: ast::DataType::Timestamp(None, TimezoneInfo::None),
format: None,
})
}

fn handle_time<T: ArrowTemporalType>(&self, v: &ScalarValue) -> Result<ast::Expr>
where
i64: From<T::Native>,
Expand All @@ -677,15 +717,6 @@ impl Unparser<'_> {
})
}

fn timestamp_string_to_sql(&self, ts: String) -> Result<ast::Expr> {
Ok(ast::Expr::Cast {
kind: ast::CastKind::Cast,
expr: Box::new(ast::Expr::Value(SingleQuotedString(ts))),
data_type: ast::DataType::Timestamp(None, TimezoneInfo::None),
format: None,
})
}

/// DataFusion ScalarValues sometimes require a ast::Expr to construct.
/// For example ScalarValue::Date32(d) corresponds to the ast::Expr CAST('datestr' as DATE)
fn scalar_to_sql(&self, v: &ScalarValue) -> Result<ast::Expr> {
Expand Down Expand Up @@ -847,72 +878,26 @@ impl Unparser<'_> {
self.handle_time::<Time64NanosecondType>(v)
}
ScalarValue::Time64Nanosecond(None) => Ok(ast::Expr::Value(ast::Value::Null)),
ScalarValue::TimestampSecond(Some(_ts), _) => {
not_impl_err!("Unsupported scalar: {v:?}")
ScalarValue::TimestampSecond(Some(_ts), tz) => {
self.handle_timestamp::<TimestampSecondType>(v, tz)
}
ScalarValue::TimestampSecond(None, _) => {
Ok(ast::Expr::Value(ast::Value::Null))
}
ScalarValue::TimestampMillisecond(Some(_ts), tz) => {
let result = if let Some(tz) = tz {
v.to_array()?
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(internal_datafusion_err!(
"Unable to downcast to TimestampMillisecond from TimestampMillisecond scalar"
))?
.value_as_datetime_with_tz(0, tz.parse()?)
.ok_or(internal_datafusion_err!(
"Unable to convert TimestampMillisecond to DateTime"
))?.to_string()
} else {
v.to_array()?
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or(internal_datafusion_err!(
"Unable to downcast to TimestampMillisecond from TimestampMillisecond scalar"
))?
.value_as_datetime(0)
.ok_or(internal_datafusion_err!(
"Unable to convert TimestampMillisecond to NaiveDateTime"
))?.to_string()
};
self.timestamp_string_to_sql(result)
self.handle_timestamp::<TimestampMillisecondType>(v, tz)
}
ScalarValue::TimestampMillisecond(None, _) => {
Ok(ast::Expr::Value(ast::Value::Null))
}
ScalarValue::TimestampMicrosecond(Some(_ts), _) => {
not_impl_err!("Unsupported scalar: {v:?}")
ScalarValue::TimestampMicrosecond(Some(_ts), tz) => {
self.handle_timestamp::<TimestampMicrosecondType>(v, tz)
}
ScalarValue::TimestampMicrosecond(None, _) => {
Ok(ast::Expr::Value(ast::Value::Null))
}
ScalarValue::TimestampNanosecond(Some(_ts), tz) => {
let result = if let Some(tz) = tz {
v.to_array()?
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.ok_or(internal_datafusion_err!(
"Unable to downcast to TimestampNanosecond from TimestampNanosecond scalar"
))?
.value_as_datetime_with_tz(0, tz.parse()?)
.ok_or(internal_datafusion_err!(
"Unable to convert TimestampNanosecond to DateTime"
))?.to_string()
} else {
v.to_array()?
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.ok_or(internal_datafusion_err!(
"Unable to downcast to TimestampNanosecond from TimestampNanosecond scalar"
))?
.value_as_datetime(0)
.ok_or(internal_datafusion_err!(
"Unable to convert TimestampNanosecond to NaiveDateTime"
))?.to_string()
};
self.timestamp_string_to_sql(result)
self.handle_timestamp::<TimestampNanosecondType>(v, tz)
}
ScalarValue::TimestampNanosecond(None, _) => {
Ok(ast::Expr::Value(ast::Value::Null))
Expand Down Expand Up @@ -1079,6 +1064,7 @@ mod tests {

use arrow::datatypes::{Field, Schema};
use arrow_schema::DataType::Int8;

use datafusion_common::TableReference;
use datafusion_expr::{
case, col, cube, exists, grouping_set, interval_datetime_lit,
Expand Down Expand Up @@ -1246,6 +1232,17 @@ mod tests {
Expr::Literal(ScalarValue::Date32(Some(-1))),
r#"CAST('1969-12-31' AS DATE)"#,
),
(
Expr::Literal(ScalarValue::TimestampSecond(Some(10001), None)),
r#"CAST('1970-01-01 02:46:41' AS TIMESTAMP)"#,
),
(
Expr::Literal(ScalarValue::TimestampSecond(
Some(10001),
Some("+08:00".into()),
)),
r#"CAST('1970-01-01 10:46:41 +08:00' AS TIMESTAMP)"#,
),
(
Expr::Literal(ScalarValue::TimestampMillisecond(Some(10001), None)),
r#"CAST('1970-01-01 00:00:10.001' AS TIMESTAMP)"#,
Expand All @@ -1257,6 +1254,17 @@ mod tests {
)),
r#"CAST('1970-01-01 08:00:10.001 +08:00' AS TIMESTAMP)"#,
),
(
Expr::Literal(ScalarValue::TimestampMicrosecond(Some(10001), None)),
r#"CAST('1970-01-01 00:00:00.010001' AS TIMESTAMP)"#,
),
(
Expr::Literal(ScalarValue::TimestampMicrosecond(
Some(10001),
Some("+08:00".into()),
)),
r#"CAST('1970-01-01 08:00:00.010001 +08:00' AS TIMESTAMP)"#,
),
(
Expr::Literal(ScalarValue::TimestampNanosecond(Some(10001), None)),
r#"CAST('1970-01-01 00:00:00.000010001' AS TIMESTAMP)"#,
Expand Down

0 comments on commit 068778d

Please sign in to comment.