From dfd86a8725b436f304dff81fd19bf6dac2907e59 Mon Sep 17 00:00:00 2001
From: PHILO-HE
Date: Tue, 14 Dec 2021 15:23:41 +0800
Subject: [PATCH] [NSE-609] Complete to_date expression support (#622)

* Initial commit

* Remove useless code

* Call gandiva func that can check validation of input

* Change arrow branch for test [revert this commit at last]

* Fallback if time parser policy is not 'corrected'

* Fix a unit test failure

* Revert "Change arrow branch for test [revert this commit at last]"

This reverts commit d2bf64c0d5415d45efed9b15600a96ebceaccdc5.
---
 .../expression/ColumnarBinaryExpression.scala |  6 ++
 .../ColumnarDateTimeExpressions.scala         | 78 ++++++++++++++++++-
 .../com/intel/oap/misc/DateTimeSuite.scala    | 15 ++--
 3 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala
index c97a7ce73..897df4e03 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryExpression.scala
@@ -30,12 +30,14 @@ import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
+
 import scala.collection.mutable.ListBuffer
 
 import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateDiff
 import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateSub
 import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp
 import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarFromUnixTime
+import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarGetTimestamp
 
 /**
  * A version of add that supports columnar processing for longs.
@@ -104,6 +106,10 @@ object ColumnarBinaryExpression {
       new ColumnarDateDiff(left, right)
     case a: UnixTimestamp =>
       new ColumnarUnixTimestamp(left, right)
+    // To match GetTimestamp (a private class).
+    case _ if (original.isInstanceOf[ToTimestamp] && original.dataType == TimestampType) =>
+      // Convert a string to Timestamp. Default timezone is used.
+      new ColumnarGetTimestamp(left, right, None)
     case a: FromUnixTime =>
       new ColumnarFromUnixTime(left, right)
     case d: DateSub =>
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala
index dfae74549..4f4884654 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarDateTimeExpressions.scala
@@ -24,8 +24,9 @@ import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestam
 import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented
 import org.apache.arrow.gandiva.expression.TreeBuilder
 import org.apache.arrow.gandiva.expression.TreeNode
-import org.apache.arrow.vector.types.DateUnit
+import org.apache.arrow.vector.types.{DateUnit, TimeUnit}
 import org.apache.arrow.vector.types.pojo.ArrowType
+
 import org.apache.spark.sql.catalyst.expressions.CheckOverflow
 import org.apache.spark.sql.catalyst.expressions.CurrentDate
 import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp
@@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.DayOfMonth
 import org.apache.spark.sql.catalyst.expressions.DayOfWeek
 import org.apache.spark.sql.catalyst.expressions.DayOfYear
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.FromUnixTime
 import org.apache.spark.sql.catalyst.expressions.Hour
 import org.apache.spark.sql.catalyst.expressions.MakeDate
 import org.apache.spark.sql.catalyst.expressions.MakeTimestamp
@@ -45,15 +47,16 @@ import org.apache.spark.sql.catalyst.expressions.Month
 import org.apache.spark.sql.catalyst.expressions.Now
 import org.apache.spark.sql.catalyst.expressions.Second
 import org.apache.spark.sql.catalyst.expressions.SecondsToTimestamp
+import org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression
+import org.apache.spark.sql.catalyst.expressions.ToTimestamp
 import org.apache.spark.sql.catalyst.expressions.UnixDate
 import org.apache.spark.sql.catalyst.expressions.UnixMicros
 import org.apache.spark.sql.catalyst.expressions.UnixMillis
 import org.apache.spark.sql.catalyst.expressions.UnixSeconds
 import org.apache.spark.sql.catalyst.expressions.UnixTimestamp
-import org.apache.spark.sql.catalyst.expressions.FromUnixTime
 import org.apache.spark.sql.catalyst.expressions.Year
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, ShortType, StringType, TimestampType}
+import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, TimestampType}
 import org.apache.spark.sql.util.ArrowUtils
 
 object ColumnarDateTimeExpressions {
@@ -495,6 +498,75 @@ object ColumnarDateTimeExpressions {
     }
   }
 
+  // The data type is TimestampType.
+  // Internally, a timestamp is stored as the number of microseconds from the Unix epoch.
+  case class ColumnarGetTimestamp(leftChild: Expression,
+                                  rightChild: Expression,
+                                  timeZoneId: Option[String] = None)
+      extends ToTimestamp with ColumnarExpression {
+
+    override def left: Expression = leftChild
+    override def right: Expression = rightChild
+    override def canEqual(that: Any): Boolean = true
+    // The functions below are consistent with Spark's GetTimestamp.
+    override val downScaleFactor = 1
+    override def dataType: DataType = TimestampType
+    override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+      copy(timeZoneId = Option(timeZoneId))
+    override def failOnError: Boolean = SQLConf.get.ansiEnabled
+
+    buildCheck()
+
+    def buildCheck(): Unit = {
+      val parserPolicy = SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)
+      // TODO: support the "exception" time parser policy.
+      if (!parserPolicy.equalsIgnoreCase("corrected")) {
+        throw new UnsupportedOperationException(
+          s"$parserPolicy is NOT a supported time parser policy")
+      }
+
+      val supportedTypes = List(StringType)
+      if (supportedTypes.indexOf(left.dataType) == -1) {
+        throw new UnsupportedOperationException(
+          s"${left.dataType} is not supported in ColumnarGetTimestamp.")
+      }
+      if (left.dataType == StringType) {
+        right match {
+          case literal: ColumnarLiteral =>
+            val format = literal.value.toString
+            // TODO: support other formats.
+            if (!format.equals("yyyy-MM-dd")) {
+              throw new UnsupportedOperationException(
+                s"$format is not supported in ColumnarGetTimestamp.")
+            }
+          case _ =>
+        }
+      }
+    }
+
+    override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
+      // Use the default timeZoneId. Pass a specific timeZoneId if needed in the future.
+      val outType = CodeGeneration.getResultType(TimestampType)
+      val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
+      // Convert to milliseconds first, then to microseconds.
+      val intermediateType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)
+
+      right match {
+        case literal: ColumnarLiteral =>
+          val format = literal.value.toString
+          if (format.equals("yyyy-MM-dd")) {
+            val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check",
+              Lists.newArrayList(leftNode), intermediateType)
+            ConverterUtils.convertTimestampToMicro(funcNode, intermediateType)
+          } else {
+            // TODO: add support for other formats.
+            throw new UnsupportedOperationException(
+              s"$format is not supported in ColumnarGetTimestamp.")
+          }
+      }
+    }
+  }
+
   class ColumnarFromUnixTime(left: Expression, right: Expression)
       extends FromUnixTime(left, right)
       with ColumnarExpression {
diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala
index abe99bb3b..20678943a 100644
--- a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala
+++ b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala
@@ -23,11 +23,11 @@ import java.util.Locale
 import java.util.TimeZone
 
 import com.intel.oap.execution.ColumnarConditionProjectExec
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.ColumnarProjectExec
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
 class DateTimeSuite extends QueryTest with SharedSparkSession {
@@ -751,6 +751,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  // We already have ColumnarGetTimestamp, which extends ToTimestamp, the parent class of GetTimestamp.
   test("datetime function - to_date with format") {
     // todo GetTimestamp IS PRIVATE ?
     withTempView("dates") {
@@ -758,11 +759,13 @@ class DateTimeSuite extends QueryTest with SharedSparkSession {
           .map(s => Tuple1(s)).toDF("time")
       dates.createOrReplaceTempView("dates")
 
-      val frame = sql("SELECT to_date(time, 'yyyy-MM-dd') FROM dates")
-      frame.explain()
-      frame.show()
-      assert(frame.queryExecution.executedPlan.find(p => p
-        .isInstanceOf[ColumnarConditionProjectExec]).isDefined)
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+        val frame = sql("SELECT to_date(time, 'yyyy-MM-dd') FROM dates")
+        frame.explain()
+        frame.show()
+        assert(frame.queryExecution.executedPlan.find(p => p
+          .isInstanceOf[ColumnarConditionProjectExec]).isDefined)
+      }
     }
   }
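
Note for reviewers: a minimal sketch of how this code path is expected to be exercised end to end, mirroring the updated DateTimeSuite test. The session setup below is illustrative only and not part of this patch; the OAP columnar plugin must be on the classpath for ColumnarGetTimestamp to actually be chosen, and "spark.sql.legacy.timeParserPolicy" is the config key behind SQLConf.LEGACY_TIME_PARSER_POLICY.

    import org.apache.spark.sql.SparkSession

    // Hypothetical local session; real runs need the native SQL engine
    // (Gandiva-backed) plugin enabled for the columnar path to kick in.
    val spark = SparkSession.builder()
      .appName("to_date-columnar-sketch")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // ColumnarGetTimestamp only supports the "corrected" parser policy and
    // the "yyyy-MM-dd" format; buildCheck() rejects anything else.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "corrected")

    Seq("2021-12-14", "2021-12-15").toDF("time").createOrReplaceTempView("dates")

    // to_date(time, 'yyyy-MM-dd') is resolved through GetTimestamp (a ToTimestamp
    // subclass), which ColumnarBinaryExpression now maps to ColumnarGetTimestamp.
    spark.sql("SELECT to_date(time, 'yyyy-MM-dd') FROM dates").show()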