diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index d993d6ddd..b24595313 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -365,6 +365,13 @@ object CometConf { .booleanConf .createWithDefault(false) + val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf( + "spark.comet.cast.stringToTimestamp") + .doc( + "Comet is not currently fully compatible with Spark when casting from String to Timestamp.") + .booleanConf + .createWithDefault(false) + } object ConfigHelpers { diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index d817ba5b6..b4b4c92eb 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -38,3 +38,9 @@ Comet currently delegates to Apache DataFusion for most cast operations, and thi guaranteed to be consistent with Spark. There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions. + +### Cast from String to Timestamp + +Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone +issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the +[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 57b15e2f5..e1e7a7117 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} @@ -584,7 +585,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY evalMode.toString } - castToProto(timeZoneId, dt, childExpr, evalModeStr) + val supportedCast = (child.dataType, dt) match { + case (DataTypes.StringType, DataTypes.TimestampType) + if !CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() => + // https://github.com/apache/datafusion-comet/issues/328 + withInfo(expr, s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled") + false + case _ => true + } + if (supportedCast) { + castToProto(timeZoneId, dt, childExpr, evalModeStr) + } else { + // no need to call withInfo here since it was called when determining + // the value for `supportedCast` + None + } } else { withInfo(expr, child) None diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8abd24598..669a85559 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -98,12 +98,22 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } ignore("cast string to date") { - castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DoubleType) + castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType) + } + + test("cast string to timestamp disabled by default") { + val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a") + castFallbackTest( + values.toDF("a"), + DataTypes.TimestampType, + "spark.comet.cast.stringToTimestamp is disabled") } ignore("cast string to timestamp") { - val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8) - castTest(values.toDF("a"), DataTypes.DoubleType) + withSQLConf((CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true")) { + val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8) + castTest(values.toDF("a"), DataTypes.TimestampType) + } } private def generateFloats(): DataFrame = { @@ -126,6 +136,24 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { Range(0, dataSize).map(_ => generateString(r, chars, maxLen)) } + private def castFallbackTest( + input: DataFrame, + toType: DataType, + expectedMessage: String): Unit = { + withTempPath { dir => + val data = roundtripParquet(input, dir).coalesce(1) + data.createOrReplaceTempView("t") + + withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) { + val df = data.withColumn("converted", col("a").cast(toType)) + df.collect() + val str = + new ExtendedExplainInfo().generateExtendedInfo(df.queryExecution.executedPlan) + assert(str.contains(expectedMessage)) + } + } + } + private def castTest(input: DataFrame, toType: DataType): Unit = { withTempPath { dir => val data = roundtripParquet(input, dir).coalesce(1)