This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-609] Complete to_date expression support #622

Merged · 7 commits · Dec 14, 2021
Changes from 6 commits
arrow-data-source/script/build_arrow.sh (2 changes: 1 addition & 1 deletion)
@@ -62,7 +62,7 @@ echo "ARROW_SOURCE_DIR=${ARROW_SOURCE_DIR}"
echo "ARROW_INSTALL_DIR=${ARROW_INSTALL_DIR}"
mkdir -p $ARROW_SOURCE_DIR
mkdir -p $ARROW_INSTALL_DIR
-git clone https://github.com/oap-project/arrow.git --branch arrow-4.0.0-oap $ARROW_SOURCE_DIR
+git clone https://github.com/philo-he/arrow.git --branch check_input $ARROW_SOURCE_DIR
Collaborator:
remove this change?

Collaborator (Author):
Just removed.

pushd $ARROW_SOURCE_DIR

cmake ./cpp \
@@ -30,12 +30,14 @@ import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

import scala.collection.mutable.ListBuffer

import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateDiff
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarDateSub
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarUnixTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarFromUnixTime
import com.intel.oap.expression.ColumnarDateTimeExpressions.ColumnarGetTimestamp

/**
* A version of add that supports columnar processing for longs.
@@ -104,6 +106,10 @@ object ColumnarBinaryExpression
new ColumnarDateDiff(left, right)
case a: UnixTimestamp =>
new ColumnarUnixTimestamp(left, right)
// To match GetTimestamp (a private class).
case _ if (original.isInstanceOf[ToTimestamp] && original.dataType == TimestampType) =>
// Convert a string to Timestamp. Default timezone is used.
new ColumnarGetTimestamp(left, right, None)
case a: FromUnixTime =>
new ColumnarFromUnixTime(left, right)
case d: DateSub =>
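A note on the GetTimestamp case above: Catalyst's GetTimestamp class is private, so the match cannot name it directly and instead tests the public parent ToTimestamp together with the declared result type. A minimal sketch of that pattern, assuming only public Spark types (the helper name is hypothetical, not part of the PR):

// Sketch only: recognizing the private GetTimestamp through its public parent.
import org.apache.spark.sql.catalyst.expressions.{Expression, ToTimestamp}
import org.apache.spark.sql.types.TimestampType

def isGetTimestampLike(e: Expression): Boolean = e match {
  // GetTimestamp cannot be matched by name from outside Spark; among the
  // ToTimestamp subclasses handled here, it is the one whose result type
  // is TimestampType (UnixTimestamp and ToUnixTimestamp return LongType).
  case t: ToTimestamp if t.dataType == TimestampType => true
  case _ => false
}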
@@ -24,8 +24,9 @@ import com.intel.oap.expression.ColumnarDateTimeExpressions.castDateFromTimestamp
import com.intel.oap.expression.ColumnarDateTimeExpressions.unimplemented
import org.apache.arrow.gandiva.expression.TreeBuilder
import org.apache.arrow.gandiva.expression.TreeNode
-import org.apache.arrow.vector.types.DateUnit
+import org.apache.arrow.vector.types.{DateUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.spark.sql.catalyst.expressions.CheckOverflow
import org.apache.spark.sql.catalyst.expressions.CurrentDate
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp
@@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.DayOfMonth
import org.apache.spark.sql.catalyst.expressions.DayOfWeek
import org.apache.spark.sql.catalyst.expressions.DayOfYear
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.FromUnixTime
import org.apache.spark.sql.catalyst.expressions.Hour
import org.apache.spark.sql.catalyst.expressions.MakeDate
import org.apache.spark.sql.catalyst.expressions.MakeTimestamp
@@ -45,15 +47,16 @@ import org.apache.spark.sql.catalyst.expressions.Month
import org.apache.spark.sql.catalyst.expressions.Now
import org.apache.spark.sql.catalyst.expressions.Second
import org.apache.spark.sql.catalyst.expressions.SecondsToTimestamp
import org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression
import org.apache.spark.sql.catalyst.expressions.ToTimestamp
import org.apache.spark.sql.catalyst.expressions.UnixDate
import org.apache.spark.sql.catalyst.expressions.UnixMicros
import org.apache.spark.sql.catalyst.expressions.UnixMillis
import org.apache.spark.sql.catalyst.expressions.UnixSeconds
import org.apache.spark.sql.catalyst.expressions.UnixTimestamp
-import org.apache.spark.sql.catalyst.expressions.FromUnixTime
import org.apache.spark.sql.catalyst.expressions.Year
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, ShortType, StringType, TimestampType}
+import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, TimestampType}
import org.apache.spark.sql.util.ArrowUtils

object ColumnarDateTimeExpressions {
@@ -495,6 +498,75 @@ object ColumnarDateTimeExpressions {
}
}

// The result data type is TimestampType.
// Internally, a timestamp is stored as the number of microseconds since the Unix epoch.
case class ColumnarGetTimestamp(leftChild: Expression,
rightChild: Expression,
timeZoneId: Option[String] = None)
extends ToTimestamp with ColumnarExpression {

override def left: Expression = leftChild
override def right: Expression = rightChild
override def canEqual(that: Any): Boolean = true
// The members below are consistent with Spark's GetTimestamp.
override val downScaleFactor = 1
override def dataType: DataType = TimestampType
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def failOnError: Boolean = SQLConf.get.ansiEnabled

buildCheck()

def buildCheck(): Unit = {
val parserPolicy = SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)
// TODO: support "exception" time parser policy.
if (!parserPolicy.equalsIgnoreCase("corrected")) {
throw new UnsupportedOperationException(
s"$parserPolicy is NOT a supported time parser policy");
}

val supportedTypes = List(StringType)
if (supportedTypes.indexOf(left.dataType) == -1) {
throw new UnsupportedOperationException(
s"${left.dataType} is not supported in ColumnarUnixTimestamp.")
}
if (left.dataType == StringType) {
right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
// TODO: support other formats.
if (!format.equals("yyyy-MM-dd")) {
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarGetTimestamp.")
}
case _ =>
}
}
}

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
// Use the default timeZoneId. Pass a specific timeZoneId if needed in the future.
val outType = CodeGeneration.getResultType(TimestampType)
val (leftNode, leftType) = left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
// The cast yields milliseconds, which are then converted to microseconds.
val intermediateType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)

right match {
case literal: ColumnarLiteral =>
val format = literal.value.toString
if (format.equals("yyyy-MM-dd")) {
val funcNode = TreeBuilder.makeFunction("castTIMESTAMP_with_validation_check",
Lists.newArrayList(leftNode), intermediateType)
ConverterUtils.convertTimestampToMicro(funcNode, intermediateType)
} else {
// TODO: add support for other formats.
throw new UnsupportedOperationException(
s"$format is not supported in ColumnarGetTimestamp.")
}
case _ =>
// buildCheck() only accepts a literal "yyyy-MM-dd" format, but keep the match
// exhaustive so a broken invariant fails clearly instead of with a MatchError.
throw new UnsupportedOperationException(
"Only a literal format is supported in ColumnarGetTimestamp.")
}
}
}

class ColumnarFromUnixTime(left: Expression, right: Expression)
extends FromUnixTime(left, right) with
ColumnarExpression {
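For reference, ColumnarGetTimestamp above maps a date string to Spark's internal timestamp representation (microseconds since the Unix epoch) by casting to milliseconds first and upscaling afterwards. A plain-Scala sketch of the same mapping, assuming UTC as the default time zone (the helper is illustrative, not PR code):

import java.time.{LocalDate, ZoneOffset}

// Sketch only: what the castTIMESTAMP_with_validation_check plus
// convertTimestampToMicro pipeline computes for the "yyyy-MM-dd" format,
// assuming a UTC default zone.
def dateStringToMicros(s: String): Long = {
  val millis = LocalDate.parse(s)   // validates the "yyyy-MM-dd" shape
    .atStartOfDay(ZoneOffset.UTC)
    .toInstant
    .toEpochMilli                   // the cast step yields milliseconds
  millis * 1000L                    // upscale milliseconds to microseconds
}
// dateStringToMicros("2009-07-30") == 1248912000000000L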
@@ -23,11 +23,11 @@ import java.util.Locale
import java.util.TimeZone

import com.intel.oap.execution.ColumnarConditionProjectExec

import org.apache.spark.SparkConf
import org.apache.spark.sql.ColumnarProjectExec
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.Row
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class DateTimeSuite extends QueryTest with SharedSparkSession {
@@ -751,18 +751,21 @@
}
}

// ColumnarGetTimestamp was added to extend ToTimestamp, the parent class of the
// private GetTimestamp, which this test exercises through to_date.
test("datetime function - to_date with format") {
withTempView("dates") {

val dates = Seq("2009-07-30", "2009-07-31", "2009-08-01")
.map(s => Tuple1(s)).toDF("time")
dates.createOrReplaceTempView("dates")

-val frame = sql("SELECT to_date(time, 'yyyy-MM-dd') FROM dates")
-frame.explain()
-frame.show()
-assert(frame.queryExecution.executedPlan.find(p => p
-.isInstanceOf[ColumnarConditionProjectExec]).isDefined)
+withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+  val frame = sql("SELECT to_date(time, 'yyyy-MM-dd') FROM dates")
+  frame.explain()
+  frame.show()
+  assert(frame.queryExecution.executedPlan.find(p => p
+    .isInstanceOf[ColumnarConditionProjectExec]).isDefined)
+}
}
}

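The test above only asserts that the columnar projection appears in the plan. A natural follow-up, not part of this PR, would be to also check the returned values inside the same withSQLConf block, e.g. with QueryTest's checkAnswer:

// Sketch only: a value check to complement the plan assertion.
import java.sql.Date
checkAnswer(
  frame,
  Seq("2009-07-30", "2009-07-31", "2009-08-01").map(s => Row(Date.valueOf(s))))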