diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index c7b8737b7a753..b06ca71b04ecc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -54,7 +54,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
 
   private def run(cardinality: Int, func: String): Unit = {
     codegenBenchmark(s"$func of timestamp", cardinality) {
-      doBenchmark(cardinality, s"$func(cast(id as timestamp))")
+      doBenchmark(cardinality, s"$func(timestamp_seconds(id))")
     }
   }
 
@@ -64,7 +64,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
     val N = 10000000
     runBenchmark("datetime +/- interval") {
       val benchmark = new Benchmark("datetime +/- interval", N, output = output)
-      val ts = "cast(id as timestamp)"
+      val ts = "timestamp_seconds(id)"
       val dt = s"cast($ts as date)"
       benchmark.addCase("date + interval(m)") { _ =>
         doBenchmark(N, s"$dt + interval 1 month")
@@ -105,7 +105,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       benchmark.run()
     }
     runBenchmark("Extract components") {
-      run(N, "cast to timestamp", "cast(id as timestamp)")
+      run(N, "cast to timestamp", "timestamp_seconds(id)")
       run(N, "year")
       run(N, "quarter")
       run(N, "month")
@@ -124,7 +124,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "current_timestamp", "current_timestamp")
     }
     runBenchmark("Date arithmetic") {
-      val dateExpr = "cast(cast(id as timestamp) as date)"
+      val dateExpr = "cast(timestamp_seconds(id) as date)"
      run(N, "cast to date", dateExpr)
       run(N, "last_day", s"last_day($dateExpr)")
       run(N, "next_day", s"next_day($dateExpr, 'TU')")
@@ -133,31 +133,31 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "add_months", s"add_months($dateExpr, 10)")
     }
     runBenchmark("Formatting dates") {
-      val dateExpr = "cast(cast(id as timestamp) as date)"
+      val dateExpr = "cast(timestamp_seconds(id) as date)"
       run(N, "format date", s"date_format($dateExpr, 'MMM yyyy')")
     }
     runBenchmark("Formatting timestamps") {
       run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
     }
     runBenchmark("Convert timestamps") {
-      val timestampExpr = "cast(id as timestamp)"
+      val timestampExpr = "timestamp_seconds(id)"
       run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
       run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
     }
     runBenchmark("Intervals") {
-      val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")
+      val (start, end) = ("timestamp_seconds(id)", "timestamp_seconds(id+8640000)")
       run(N, "cast interval", start, end)
       run(N, "datediff", s"datediff($start, $end)")
       run(N, "months_between", s"months_between($start, $end)")
       run(1000000, "window", s"window($start, 100, 10, 1)")
     }
     runBenchmark("Truncation") {
-      val timestampExpr = "cast(id as timestamp)"
+      val timestampExpr = "timestamp_seconds(id)"
       Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE", "SECOND",
         "WEEK", "QUARTER").foreach { level =>
         run(N, s"date_trunc $level", s"date_trunc('$level', $timestampExpr)")
       }
-      val dateExpr = "cast(cast(id as timestamp) as date)"
+      val dateExpr = "cast(timestamp_seconds(id) as date)"
       Seq("year", "yyyy", "yy", "mon", "month", "mm").foreach { level =>
         run(N, s"trunc $level", s"trunc('$level', $dateExpr)")
       }
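The hunks above swap the SQL string `cast(id as timestamp)` for `timestamp_seconds(id)`. A minimal sketch of the equivalence, not part of the patch and assuming only a local SparkSession (the view name `range_tbl` is made up for illustration): `timestamp_seconds` interprets its argument as seconds since the Unix epoch, which is what the numeric-to-timestamp cast produced for integral `id`, so the benchmarked inputs stay the same.

```scala
import org.apache.spark.sql.SparkSession

object TimestampSecondsSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Same data source the benchmarks use: a range of long ids.
    spark.range(0, 3).createOrReplaceTempView("range_tbl") // hypothetical view name

    // Both expressions yield seconds-since-epoch timestamps (rendered in the session time zone).
    spark.sql(
      """SELECT id,
        |       cast(id as timestamp) AS via_cast,
        |       timestamp_seconds(id) AS via_timestamp_seconds
        |FROM range_tbl""".stripMargin).show(truncate = false)

    spark.stop()
  }
}
```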
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
index 287854dc3646c..8372698fb47ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ExtractBenchmark.scala
@@ -59,10 +59,10 @@ object ExtractBenchmark extends SqlBasedBenchmark {
   }
 
   private def castExpr(from: String): String = from match {
-    case "timestamp" => "cast(id as timestamp)"
-    case "date" => "cast(cast(id as timestamp) as date)"
-    case "interval" => "(cast(cast(id as timestamp) as date) - date'0001-01-01') + " +
-      "(cast(id as timestamp) - timestamp'1000-01-01 01:02:03.123456')"
+    case "timestamp" => "timestamp_seconds(id)"
+    case "date" => "cast(timestamp_seconds(id) as date)"
+    case "interval" => "(cast(timestamp_seconds(id) as date) - date'0001-01-01') + " +
+      "(timestamp_seconds(id) - timestamp'1000-01-01 01:02:03.123456')"
     case other => throw new IllegalArgumentException(
       s"Unsupported column type $other. Valid column types are 'timestamp' and 'date'")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index b3f65d40ad95b..9ade8b14f59b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
@@ -332,11 +332,11 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
         val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
         val df = spark.range(numRows).selectExpr(columns: _*)
-          .withColumn("value", monotonically_increasing_id().cast(TimestampType))
+          .withColumn("value", timestamp_seconds(monotonically_increasing_id()))
         withTempTable("orcTable", "parquetTable") {
           saveAsTable(df, dir)
 
-          Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
+          Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr =>
             val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
               .replace("value AND value", "value")
             filterPushDownBenchmark(numRows, title, whereExpr)
@@ -348,8 +348,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
             filterPushDownBenchmark(
               numRows,
               s"Select $percent% timestamp stored as $fileType rows " +
-                s"(value < CAST(${numRows * percent / 100} AS timestamp))",
-              s"value < CAST(${numRows * percent / 100} as timestamp)",
+                s"(value < timestamp_seconds(${numRows * percent / 100}))",
+              s"value < timestamp_seconds(${numRows * percent / 100})",
               selectExpr
             )
           }
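The FilterPushdownBenchmark hunks make the same switch through the DataFrame API rather than a SQL string. A sketch of that form, not part of the patch and assuming a local SparkSession with illustrative column names: the `value` column is built with `timestamp_seconds(monotonically_increasing_id())` instead of casting the id to `TimestampType`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.types.TimestampType

object TimestampSecondsColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark.range(0, 5)
      // Old form previously benchmarked: implicit numeric-to-timestamp cast.
      .withColumn("value_cast", monotonically_increasing_id().cast(TimestampType))
      // New form used by the patch: explicit seconds-since-epoch constructor.
      .withColumn("value_ts", timestamp_seconds(monotonically_increasing_id()))

    df.show(truncate = false)
    spark.stop()
  }
}
```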
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala
index caf3387875813..704227e4b4db4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/InExpressionBenchmark.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions.{array, struct}
+import org.apache.spark.sql.functions.{array, struct, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -128,15 +128,15 @@ object InExpressionBenchmark extends SqlBasedBenchmark {
 
   private def runTimestampBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
     val name = s"$numItems timestamps"
-    val values = (1 to numItems).map(m => s"CAST('1970-01-01 01:00:00.$m' AS timestamp)")
-    val df = spark.range(0, numRows).select($"id".cast(TimestampType))
+    val values = (1 to numItems).map(m => s"timestamp'1970-01-01 01:00:00.$m'")
+    val df = spark.range(0, numRows).select(timestamp_seconds($"id").as("id"))
     runBenchmark(name, df, values, numRows, minNumIters)
   }
 
   private def runDateBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
     val name = s"$numItems dates"
-    val values = (1 to numItems).map(n => 1970 + n).map(y => s"CAST('$y-01-01' AS date)")
-    val df = spark.range(0, numRows).select($"id".cast(TimestampType).cast(DateType))
+    val values = (1 to numItems).map(n => 1970 + n).map(y => s"date'$y-01-01'")
+    val df = spark.range(0, numRows).select(timestamp_seconds($"id").cast(DateType).as("id"))
     runBenchmark(name, df, values, numRows, minNumIters)
   }
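The InExpressionBenchmark hunks also move the IN-list values to typed literals (`timestamp'...'`, `date'...'`) while the probe column is generated with `timestamp_seconds`. A short sketch showing both spellings together, not part of the patch and assuming a local SparkSession with a hypothetical `probe` view:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.timestamp_seconds
import org.apache.spark.sql.types.DateType

object InListLiteralSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Probe columns built the same way the benchmark builds its test data.
    spark.range(0, 10)
      .select(timestamp_seconds($"id").as("ts"), timestamp_seconds($"id").cast(DateType).as("dt"))
      .createOrReplaceTempView("probe") // hypothetical view name

    // Typed literals replace the old CAST('...' AS timestamp) / CAST('...' AS date) spellings.
    spark.sql(
      """SELECT *
        |FROM probe
        |WHERE ts IN (timestamp'1970-01-01 00:00:01', timestamp'1970-01-01 00:00:02')
        |   OR dt IN (date'1970-01-01')""".stripMargin).show(truncate = false)

    spark.stop()
  }
}
```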