Skip to content

Commit

Permalink
feat: Disable cast string to timestamp by default (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Apr 29, 2024
1 parent a340748 commit e90d484
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
7 changes: 7 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,13 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf(
"spark.comet.cast.stringToTimestamp")
.doc(
"Comet is not currently fully compatible with Spark when casting from String to Timestamp.")
.booleanConf
.createWithDefault(false)

}

object ConfigHelpers {
Expand Down
6 changes: 6 additions & 0 deletions docs/source/user-guide/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ Comet currently delegates to Apache DataFusion for most cast operations, and thi
guaranteed to be consistent with Spark.

There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.

### Cast from String to Timestamp

Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the
[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
17 changes: 16 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo}
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
Expand Down Expand Up @@ -584,7 +585,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
// Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
evalMode.toString
}
castToProto(timeZoneId, dt, childExpr, evalModeStr)
val supportedCast = (child.dataType, dt) match {
case (DataTypes.StringType, DataTypes.TimestampType)
if !CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() =>
// https://github.com/apache/datafusion-comet/issues/328
withInfo(expr, s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled")
false
case _ => true
}
if (supportedCast) {
castToProto(timeZoneId, dt, childExpr, evalModeStr)
} else {
// no need to call withInfo here since it was called when determining
// the value for `supportedCast`
None
}
} else {
withInfo(expr, child)
None
Expand Down
34 changes: 31 additions & 3 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,22 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}

ignore("cast string to date") {
castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DoubleType)
castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType)
}

test("cast string to timestamp disabled by default") {
val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a")
castFallbackTest(
values.toDF("a"),
DataTypes.TimestampType,
"spark.comet.cast.stringToTimestamp is disabled")
}

ignore("cast string to timestamp") {
val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8)
castTest(values.toDF("a"), DataTypes.DoubleType)
withSQLConf((CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true")) {
val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8)
castTest(values.toDF("a"), DataTypes.TimestampType)
}
}

private def generateFloats(): DataFrame = {
Expand All @@ -126,6 +136,24 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Range(0, dataSize).map(_ => generateString(r, chars, maxLen))
}

private def castFallbackTest(
input: DataFrame,
toType: DataType,
expectedMessage: String): Unit = {
withTempPath { dir =>
val data = roundtripParquet(input, dir).coalesce(1)
data.createOrReplaceTempView("t")

withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
val df = data.withColumn("converted", col("a").cast(toType))
df.collect()
val str =
new ExtendedExplainInfo().generateExtendedInfo(df.queryExecution.executedPlan)
assert(str.contains(expectedMessage))
}
}
}

private def castTest(input: DataFrame, toType: DataType): Unit = {
withTempPath { dir =>
val data = roundtripParquet(input, dir).coalesce(1)
Expand Down

0 comments on commit e90d484

Please sign in to comment.