Skip to content

Commit

Permalink
[SPARK-47501][SQL] Add convertDateToDate like the existing convertTimestampToTimestamp for JdbcDialect
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

Add convertDateToDate like the existing convertTimestampToTimestamp for JdbcDialect

### Why are the changes needed?

The date '±infinity' values cause overflows, just like the timestamp '±infinity' values addressed in #41843.

### Does this PR introduce _any_ user-facing change?

Yes. It fixes the overflow for PostgreSQL date '±infinity' values, aligning their handling with that of the '±infinity' timestamps.

### How was this patch tested?
new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #45638 from yaooqinn/SPARK-47501.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
yaooqinn authored and dongjoon-hyun committed Mar 21, 2024
1 parent 2b0e841 commit 32f3d4d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
"('-infinity', ARRAY[TIMESTAMP '-infinity'])")
.executeUpdate()

conn.prepareStatement("CREATE TABLE infinity_dates" +
"(id SERIAL PRIMARY KEY, date_column DATE, date_array DATE[])")
.executeUpdate()
conn.prepareStatement("INSERT INTO infinity_dates (date_column, date_array)" +
" VALUES ('infinity', ARRAY[DATE 'infinity']), " +
"('-infinity', ARRAY[DATE '-infinity'])")
.executeUpdate()

conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
"type not_null_text)").executeUpdate()
Expand Down Expand Up @@ -462,6 +470,22 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(negativeInfinitySeq.head.getTime == minTimeStamp)
}

test("SPARK-47501: infinity date test") {
  // Read the table seeded with DATE 'infinity' / '-infinity' rows and verify that
  // both the scalar column and the array column are clamped to Spark's date range.
  val rows = sqlContext.read.jdbc(jdbcUrl, "infinity_dates", new Properties).collect()

  assert(rows.length == 2)

  // Millisecond epoch values for the clamp targets (0001-01-01 and 9999-12-31).
  val minDate = -62135654400000L
  val maxDate = 253402156800000L

  val positiveInfinity = rows(0).getDate(1)
  val negativeInfinity = rows(1).getDate(1)
  assert(positiveInfinity.getTime == maxDate)
  assert(negativeInfinity.getTime == minDate)

  val positiveInfinityArray = rows(0).getAs[scala.collection.Seq[Date]]("date_array")
  val negativeInfinityArray = rows(1).getAs[scala.collection.Seq[Date]]("date_array")
  assert(positiveInfinityArray.head.getTime == maxDate)
  assert(negativeInfinityArray.head.getTime == minDate)
}

test("SPARK-47407: Support java.sql.Types.NULL for NullType") {
val df = spark.read.format("jdbc")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos + 1)
if (dateVal != null) {
row.setInt(pos, fromJavaDate(dateVal))
row.setInt(pos, fromJavaDate(dialect.convertDateToDate(dateVal)))
} else {
row.update(pos, null)
}
Expand Down Expand Up @@ -526,7 +526,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
case StringType =>
arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))

case DateType => arrayConverter[Date](fromJavaDate)
case DateType => arrayConverter[Date] {
(d: Date) => fromJavaDate(dialect.convertDateToDate(d))
}

case dt: DecimalType =>
arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ abstract class JdbcDialect extends Serializable with Logging {
@Since("3.5.0")
def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t

/**
 * Converts an instance of `java.sql.Date` to a custom `java.sql.Date` value.
 *
 * The default implementation is the identity function. Dialects can override it to
 * map driver-specific sentinel dates (e.g. PostgreSQL's date 'infinity'/'-infinity';
 * see `PostgresDialect`) onto values within the supported date range, mirroring
 * `convertJavaTimestampToTimestamp` for timestamps.
 *
 * @param d the date value returned from the JDBC ResultSet getDate method.
 * @return the date value after conversion
 */
@Since("4.0.0")
def convertDateToDate(d: Date): Date = d

/**
* Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
* value stored in a remote database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.{Connection, SQLException, Timestamp, Types}
import java.sql.{Connection, Date, SQLException, Timestamp, Types}
import java.time.{LocalDateTime, ZoneOffset}
import java.util
import java.util.Locale
Expand Down Expand Up @@ -307,24 +307,23 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
  // Sentinel millisecond values the PostgreSQL JDBC driver returns for
  // timestamp 'infinity' and timestamp '-infinity'. Values come from the
  // driver's "constant field values" docs:
  // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
  t.getTime match {
    case 9223372036825200000L =>
      // Clamp 'infinity' to the maximum timestamp Spark supports:
      // 9999-12-31T23:59:59.999999999Z.
      new Timestamp(LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999)
        .toInstant(ZoneOffset.UTC).toEpochMilli)
    case -9223372036832400000L =>
      // Clamp '-infinity' to the minimum timestamp Spark supports: 0001-01-01T00:00:00Z.
      new Timestamp(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
    case _ => t
  }
}

override def convertDateToDate(d: Date): Date = {
  // Sentinel millisecond values the PostgreSQL JDBC driver returns for
  // date 'infinity' and date '-infinity'.
  val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
  val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L

  val millis = d.getTime
  if (millis == POSTGRESQL_DATE_POSITIVE_INFINITY) {
    // Clamp 'infinity' to the maximum supported date: 9999-12-31.
    new Date(LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
  } else if (millis == POSTGRESQL_DATE_NEGATIVE_INFINITY) {
    // Clamp '-infinity' to the minimum supported date: 0001-01-01.
    new Date(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
  } else {
    d
  }
}
}

0 comments on commit 32f3d4d

Please sign in to comment.