Skip to content

Commit

Permalink
[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect…
Browse files Browse the repository at this point in the history
… API

### What changes were proposed in this pull request?

This PR fixes an overflow issue upon reading an "infinity" timestamp from PostgreSQL. It addresses the issue by adding a new function, convertJavaTimestampToTimestamp, to the JDBCDialects API, and overriding it in PostgresDialect.scala to clamp the special "infinity" timestamps to the minimum and maximum timestamp values that Spark can represent.

### Why are the changes needed?

The pre-existing default behavior of timestamp conversion potentially triggers an overflow due to these special values (i.e., the executor would crash if you select a column that contains infinity timestamps in PostgreSQL). By integrating this new function, we can mitigate such issues, enabling more versatile and robust timestamp value conversions across various JDBC-based connectors.

### Does this PR introduce _any_ user-facing change?

A new function, convertJavaTimestampToTimestamp, is added to the JDBCDialects API to allow JDBC dialects to override the default behavior of converting Java timestamps.

### How was this patch tested?

An integration test was added in PostgresIntegrationSuite.scala to verify it can handle +infinity and -infinity timestamps in PostgreSQL.

Closes apache#41843 from mingkangli-db/SPARK-44280.

Authored-by: Mingkang Li <mingkang.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
mingkangli-db authored and ragnarok56 committed Mar 2, 2024
1 parent 0b5bb10 commit 4a6262e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Connection, Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.LocalDateTime
import java.time.{LocalDateTime, ZoneOffset}
import java.util.Properties

import org.apache.spark.sql.Column
Expand Down Expand Up @@ -148,6 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
|('2013-04-05 18:01:02.123'),
|('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()

conn.prepareStatement("CREATE TABLE infinity_timestamp" +
"(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
" VALUES ('infinity'), ('-infinity');").executeUpdate()

conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
"type not_null_text)").executeUpdate()
Expand Down Expand Up @@ -432,4 +437,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
assert(row(0).getString(1) == "fdasfasdf")
}

test("SPARK-44280: infinity timestamp test") {
  // Reads the two rows inserted in dataPreparation: 'infinity' and '-infinity'.
  val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
  val rows = df.collect()
  assert(rows.length == 2)

  val infinity = rows(0).getAs[Timestamp]("timestamp_column")
  val negativeInfinity = rows(1).getAs[Timestamp]("timestamp_column")

  // Timestamp.getTime is measured in *milliseconds* since the epoch.
  // The original expectations used toEpochSecond (seconds), which is off by a
  // factor of 1000 from what getTime returns; compare against epoch millis.
  val minTimestamp =
    LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
  val maxTimestamp =
    LocalDateTime.of(9999, 12, 31, 23, 59, 59).toInstant(ZoneOffset.UTC).toEpochMilli

  // PostgreSQL 'infinity' / '-infinity' must be clamped by PostgresDialect to
  // the max/min representable timestamps rather than overflowing.
  assert(infinity.getTime == maxTimestamp)
  assert(negativeInfinity.getTime == minTimestamp)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
(rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
row.setLong(pos, DateTimeUtils.
fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
} else {
row.update(pos, null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,21 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def getJDBCType(dt: DataType): Option[JdbcType] = None

/**
 * Converts a `java.sql.Timestamp` read from a JDBC driver into the
 * `java.sql.Timestamp` value that will be passed on to Spark's internal
 * conversion (`DateTimeUtils.fromJavaTimestamp`). Dialects may override this
 * to rewrite driver-specific sentinel values (e.g. PostgreSQL's "infinity"
 * timestamps) into values that are safe to convert without overflow.
 * @param t timestamp returned by the JDBC driver's `getTimestamp`, which
 *          represents an instant on the hybrid calendar that combines the
 *          Julian and Gregorian calendars.
 * @return the (possibly adjusted) timestamp value to convert. The default
 *         implementation is the identity function and performs no null
 *         check — NOTE(review): an `IllegalArgumentException` on null was
 *         previously documented here, but this default does not throw it;
 *         overriding dialects would have to enforce that themselves.
 */
@Since("3.5.0")
def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t

/**
* Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
* value stored in a remote database.
* JDBC dialects should override this function to provide implementations that suite their
* JDBC dialects should override this function to provide implementations that suit their
* JDBC drivers.
* @param t Timestamp returned from JDBC driver getTimestamp method.
* @return A LocalDateTime representing the same wall clock time as the timestamp in database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.jdbc

import java.sql.{Connection, SQLException, Timestamp, Types}
import java.time.LocalDateTime
import java.time.{LocalDateTime, ZoneOffset}
import java.util
import java.util.Locale

Expand Down Expand Up @@ -281,4 +281,34 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
}
s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
}

/**
 * PostgreSQL represents 'infinity' and '-infinity' timestamps with sentinel
 * millisecond values near Long.MaxValue / Long.MinValue (see the PostgreSQL
 * JDBC driver's constant-field documentation). Converting those sentinels to
 * Spark's microsecond-precision timestamps would overflow, so we need to clamp
 * them to the minimum and maximum timestamps Spark can safely represent
 * (0001-01-01 00:00:00 UTC and 9999-12-31 23:59:59 UTC). Any other value is
 * passed through unchanged.
 * @param t timestamp returned by the PostgreSQL JDBC driver's `getTimestamp`.
 * @return the clamped timestamp for the infinity sentinels, otherwise `t`.
 */
override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
  // Sentinel values (in milliseconds) from the PostgreSQL "constant field docs":
  // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
  val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
  val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
  val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
  val POSTGRESQL_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L

  // java.sql.Timestamp is constructed from *milliseconds* since the epoch.
  // The previous code passed toEpochSecond (seconds) to the constructor, which
  // clamped to instants 1000x too close to the epoch; use epoch millis instead.
  val minTimestamp =
    LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
  val maxTimestamp =
    LocalDateTime.of(9999, 12, 31, 23, 59, 59).toInstant(ZoneOffset.UTC).toEpochMilli

  t.getTime match {
    case POSTGRESQL_DATE_POSITIVE_INFINITY | POSTGRESQL_DATE_POSITIVE_SMALLER_INFINITY =>
      new Timestamp(maxTimestamp)
    case POSTGRESQL_DATE_NEGATIVE_INFINITY | POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY =>
      new Timestamp(minTimestamp)
    case _ => t
  }
}
}

0 comments on commit 4a6262e

Please sign in to comment.