Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47501][SQL] Add convertDateToDate like the existing convertTimestampToTimestamp for JdbcDialect #45638

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
"('-infinity', ARRAY[TIMESTAMP '-infinity'])")
.executeUpdate()

conn.prepareStatement("CREATE TABLE infinity_dates" +
"(id SERIAL PRIMARY KEY, date_column DATE, date_array DATE[])")
.executeUpdate()
conn.prepareStatement("INSERT INTO infinity_dates (date_column, date_array)" +
" VALUES ('infinity', ARRAY[DATE 'infinity']), " +
"('-infinity', ARRAY[DATE '-infinity'])")
.executeUpdate()

conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
"type not_null_text)").executeUpdate()
Expand Down Expand Up @@ -462,6 +470,22 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(negativeInfinitySeq.head.getTime == minTimeStamp)
}

test("SPARK-47501: infinity date test") {
val df = sqlContext.read.jdbc(jdbcUrl, "infinity_dates", new Properties)
val row = df.collect()

assert(row.length == 2)
val infinity = row(0).getDate(1)
val negativeInfinity = row(1).getDate(1)
val infinitySeq = row(0).getAs[scala.collection.Seq[Date]]("date_array")
val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Date]]("date_array")
val minDate = -62135654400000L
val maxDate = 253402156800000L
assert(infinity.getTime == maxDate)
assert(negativeInfinity.getTime == minDate)
assert(infinitySeq.head.getTime == maxDate)
assert(negativeInfinitySeq.head.getTime == minDate)
}

test("SPARK-47407: Support java.sql.Types.NULL for NullType") {
val df = spark.read.format("jdbc")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos + 1)
if (dateVal != null) {
row.setInt(pos, fromJavaDate(dateVal))
row.setInt(pos, fromJavaDate(dialect.convertDateToDate(dateVal)))
} else {
row.update(pos, null)
}
Expand Down Expand Up @@ -526,7 +526,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
case StringType =>
arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))

case DateType => arrayConverter[Date](fromJavaDate)
case DateType => arrayConverter[Date] {
(d: Date) => fromJavaDate(dialect.convertDateToDate(d))
}

case dt: DecimalType =>
arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ abstract class JdbcDialect extends Serializable with Logging {
@Since("3.5.0")
def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t

/**
* Converts an instance of `java.sql.Date` to a custom `java.sql.Date` value.
* @param d the date value returned from JDBC ResultSet getDate method.
* @return the date value after conversion
*/
@Since("4.0.0")
def convertDateToDate(d: Date): Date = d
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for name consistency, should this be convertJavaDateToDate?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks. +1 for the above new name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for being so careful. And sorry for my carelessness

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix my mistake, I sent #45656. Please help review when you have some time


/**
* Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
* value stored in a remote database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.{Connection, SQLException, Timestamp, Types}
import java.sql.{Connection, Date, SQLException, Timestamp, Types}
import java.time.{LocalDateTime, ZoneOffset}
import java.util
import java.util.Locale
Expand Down Expand Up @@ -307,24 +307,23 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
// Variable names come from PostgreSQL "constant field docs":
// https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*_SMALLER_INFINITYs are for PgResultSet to identify date infinities internally, they are supplanted with the larger abs-values to clients

val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L

val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
val maxTimestamp =
LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999).toInstant(ZoneOffset.UTC).toEpochMilli

val time = t.getTime
if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
new Timestamp(maxTimestamp)
} else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
new Timestamp(minTimeStamp)
} else {
t
t.getTime match {
case 9223372036825200000L =>
new Timestamp(LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999999)
.toInstant(ZoneOffset.UTC).toEpochMilli)
case -9223372036832400000L =>
new Timestamp(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
case _ => t
}
}

override def convertDateToDate(d: Date): Date = {
d.getTime match {
case 9223372036825200000L =>
new Date(LocalDateTime.of(9999, 12, 31, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
case -9223372036832400000L =>
new Date(LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli)
case _ => d
}
}
}