From 32f3d4dc03892f23bc073205335ad0d4174c213a Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 21 Mar 2024 07:48:41 -0700
Subject: [PATCH] [SPARK-47501][SQL] Add convertDateToDate like the existing
 convertTimestampToTimestamp for JdbcDialect
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

Add a `convertDateToDate` hook to `JdbcDialect`, mirroring the existing
`convertJavaTimestampToTimestamp` hook.

### Why are the changes needed?

Reading PostgreSQL DATE '±infinity' values causes the same kind of overflow
that TIMESTAMP '±infinity' values caused before the fix in #41843.

### Does this PR introduce _any_ user-facing change?

Yes. PostgreSQL DATE '±infinity' values are now clamped to the maximum and
minimum supported dates instead of overflowing, matching the existing
behavior for TIMESTAMP '±infinity'.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45638 from yaooqinn/SPARK-47501.

Authored-by: Kent Yao
Signed-off-by: Dongjoon Hyun
---
 .../sql/jdbc/PostgresIntegrationSuite.scala   | 24 ++++++++++++
 .../datasources/jdbc/JdbcUtils.scala          |  6 ++-
 .../apache/spark/sql/jdbc/JdbcDialects.scala  |  8 ++++
 .../spark/sql/jdbc/PostgresDialect.scala      | 37 +++++++++----------
 4 files changed, 54 insertions(+), 21 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 8d137ba88cb1e..a47e834a4b3c8 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -155,6 +155,14 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       "('-infinity', ARRAY[TIMESTAMP '-infinity'])")
       .executeUpdate()
 
+    conn.prepareStatement("CREATE TABLE infinity_dates" +
+      "(id SERIAL PRIMARY KEY, date_column DATE, date_array DATE[])")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_dates (date_column, date_array)" +
+      " VALUES ('infinity', ARRAY[DATE 'infinity']), " +
+      "('-infinity', ARRAY[DATE '-infinity'])")
+      .executeUpdate()
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
@@ -462,6 +470,22 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(negativeInfinitySeq.head.getTime == minTimeStamp)
   }
 
+  test("SPARK-47501: infinity date test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_dates", new Properties)
+    val row = df.collect()
+
+    assert(row.length == 2)
+    val infinity = row(0).getDate(1)
+    val negativeInfinity = row(1).getDate(1)
+    val infinitySeq = row(0).getAs[scala.collection.Seq[Date]]("date_array")
+    val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Date]]("date_array")
+    val minDate = -62135654400000L
+    val maxDate = 253402156800000L
+    assert(infinity.getTime == maxDate)
+    assert(negativeInfinity.getTime == minDate)
+    assert(infinitySeq.head.getTime == maxDate)
+    assert(negativeInfinitySeq.head.getTime == minDate)
+  }
 
   test("SPARK-47407: Support java.sql.Types.NULL for NullType") {
     val df = spark.read.format("jdbc")
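For context on the expected values in the new test: the PostgreSQL JDBC driver represents DATE 'infinity' and DATE '-infinity' as sentinel millisecond values, documented as `org.postgresql.PGStatement.DATE_POSITIVE_INFINITY` and `DATE_NEGATIVE_INFINITY` in the driver's constant field docs. Feeding those raw values into Catalyst's day-based `DateType` overflows an `Int`. The JDK-only sketch below reproduces the overflow; the sentinel literals are copied from the driver docs, everything else is illustrative. The test's `minDate`/`maxDate` also differ from the raw UTC clamp bounds because `java.sql.Date` values round-trip through the JVM time zone, which Spark's test builds pin to America/Los_Angeles.

```scala
import java.sql.Date

// A minimal, JDK-only sketch of why clamping is needed. The sentinel
// literals are the PostgreSQL JDBC driver's documented values for
// DATE 'infinity' / DATE '-infinity' (PGStatement.DATE_POSITIVE_INFINITY
// and PGStatement.DATE_NEGATIVE_INFINITY).
object InfinityDateOverflow {
  private val DatePositiveInfinity = 9223372036825200000L
  private val DateNegativeInfinity = -9223372036832400000L
  private val MillisPerDay = 24L * 60 * 60 * 1000

  def main(args: Array[String]): Unit = {
    // Catalyst's DateType stores days since the epoch in an Int. The raw
    // sentinel is ~107 billion days, far outside Int range, so a naive
    // fromJavaDate-style conversion overflows without the new dialect hook.
    val days = new Date(DatePositiveInfinity).getTime / MillisPerDay
    println(s"days since epoch: $days, fits in Int: ${days.isValidInt}")
    val negDays = new Date(DateNegativeInfinity).getTime / MillisPerDay
    println(s"days since epoch: $negDays, fits in Int: ${negDays.isValidInt}")
  }
}
```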
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 84d87f0082178..70fd9bd071e97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -404,7 +404,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
         val dateVal = rs.getDate(pos + 1)
         if (dateVal != null) {
-          row.setInt(pos, fromJavaDate(dateVal))
+          row.setInt(pos, fromJavaDate(dialect.convertDateToDate(dateVal)))
         } else {
           row.update(pos, null)
         }
@@ -526,7 +526,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case StringType =>
       arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))
 
-    case DateType => arrayConverter[Date](fromJavaDate)
+    case DateType => arrayConverter[Date] {
+      (d: Date) => fromJavaDate(dialect.convertDateToDate(d))
+    }
 
     case dt: DecimalType =>
       arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 845161c81ea54..ff23afbc3125a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -156,6 +156,14 @@ abstract class JdbcDialect extends Serializable with Logging {
   @Since("3.5.0")
   def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
 
+  /**
+   * Converts an instance of `java.sql.Date` to a custom `java.sql.Date` value.
+   * @param d the date value returned from the JDBC ResultSet getDate method.
+   * @return the date value after conversion
+   */
+  @Since("4.0.0")
+  def convertDateToDate(d: Date): Date = d
+
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
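The new hook follows the same extension pattern as `convertJavaTimestampToTimestamp`: `JdbcUtils` now routes every value read via `getDate` through `dialect.convertDateToDate` (both the scalar and the array path), and the base implementation is the identity function, so dialects that do not override it are unaffected. A hypothetical third-party dialect could use the hook like this (a sketch only; the `jdbc:mydb` URL prefix and the sentinel value are invented for illustration):

```scala
import java.sql.Date
import java.util.Locale

import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect that normalizes an imaginary driver's "no end date"
// sentinel before Spark converts the value to Catalyst days.
case class MyDbDialect() extends JdbcDialect {
  // Invented sentinel millis returned by the imaginary driver.
  private val ForeverSentinelMillis = 253402300799999L

  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:mydb")

  override def convertDateToDate(d: Date): Date =
    if (d.getTime == ForeverSentinelMillis) Date.valueOf("9999-12-31") else d
}
```

Such a dialect would then be activated with `JdbcDialects.registerDialect(MyDbDialect())`.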
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 5c949b28ba7c0..d8692a055ffe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, SQLException, Timestamp, Types}
+import java.sql.{Connection, Date, SQLException, Timestamp, Types}
 import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale
@@ -307,24 +307,23 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
   override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
     // Variable names come from PostgreSQL "constant field docs":
     // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
-    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
-    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
-    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
-    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
-
-    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
-    val maxTimestamp =
-      LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999).toInstant(ZoneOffset.UTC).toEpochMilli
-
-    val time = t.getTime
-    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
-      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
-      new Timestamp(maxTimestamp)
-    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
-      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
-      new Timestamp(minTimeStamp)
-    } else {
-      t
+    t.getTime match {
+      case 9223372036825200000L =>
+        new Timestamp(LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999)
+          .toInstant(ZoneOffset.UTC).toEpochMilli)
+      case -9223372036832400000L =>
+        new Timestamp(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
+      case _ => t
+    }
+  }
+
+  override def convertDateToDate(d: Date): Date = {
+    d.getTime match {
+      case 9223372036825200000L =>
+        new Date(LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
+      case -9223372036832400000L =>
+        new Date(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
+      case _ => d
     }
   }
 }
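To make the sentinel literals above easier to audit: both match arms of `convertDateToDate` clamp to UTC midnight of 0001-01-01 and 9999-12-31 respectively, while the timestamp variant clamps to the end of 9999-12-31. A JDK-only sketch that recomputes the clamp bounds, so the conversion can be sanity-checked without a PostgreSQL instance:

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Recomputes the bounds that PostgresDialect.convertDateToDate clamps to.
object ClampBoundsCheck {
  def main(args: Array[String]): Unit = {
    val maxDateMillis =
      LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
    val minDateMillis =
      LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
    println(maxDateMillis) // 253402214400000 -> 9999-12-31T00:00:00Z
    println(minDateMillis) // -62135596800000 -> 0001-01-01T00:00:00Z
  }
}
```

These raw bounds differ from the `minDate`/`maxDate` expected by the integration test because collected `java.sql.Date` values are rendered in the session time zone, as noted after the test above.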