diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index c539452bb9ae0..90d6f6ae2fbfc 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Connection, Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties
 
 import org.apache.spark.sql.Column
@@ -148,6 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
 
+    conn.prepareStatement("CREATE TABLE infinity_timestamp" +
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
+      " VALUES ('infinity'), ('-infinity');").executeUpdate()
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
@@ -432,4 +437,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
     assert(row(0).getString(1) == "fdasfasdf")
   }
+
+  test("SPARK-44280: infinity timestamp test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
+    val row = df.collect()
+
+    assert(row.length == 2)
+    val infinity = row(0).getAs[Timestamp]("timestamp_column")
+    val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    assert(infinity.getTime == maxTimestamp)
+    assert(negativeInfinity.getTime == minTimeStamp)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index daeecaeb90a52..448ef220829f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -483,7 +483,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
-          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+          row.setLong(pos, DateTimeUtils.
+            fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
         } else {
           row.update(pos, null)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 93a311be2f867..fac3cc60d952a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -105,10 +105,21 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def getJDBCType(dt: DataType): Option[JdbcType] = None
 
+  /**
+   * Converts an instance of `java.sql.Timestamp` to a custom `java.sql.Timestamp` value.
+   * @param t represents a specific instant in time based on
+   *          the hybrid calendar which combines Julian and
+   *          Gregorian calendars.
+   * @return the timestamp value to convert to
+   * @throws IllegalArgumentException if t is null
+   */
+  @Since("3.5.0")
+  def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
+
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
-   * JDBC dialects should override this function to provide implementations that suite their
+   * JDBC dialects should override this function to provide implementations that suit their
    * JDBC drivers.
    * @param t Timestamp returned from JDBC driver getTimestamp method.
    * @return A LocalDateTime representing the same wall clock time as the timestamp in database.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index ab8b1a7e1a50d..9c1ca2cb913e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, SQLException, Timestamp, Types}
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale
 
@@ -281,4 +281,34 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
+
+  /**
+   * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
+   * milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured
+   * at microsecond accuracy. For the "infinity values" in PostgreSQL (represented by
+   * big constants), we need to clamp them to avoid overflow. If the value is not one
+   * of the infinity values, fall back to the default behavior.
+   */
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
+    // Variable names come from PostgreSQL "constant field docs":
+    // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
+    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
+    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
+    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
+    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
+
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    val time = t.getTime
+    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
+      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
+      new Timestamp(maxTimestamp)
+    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
+      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
+      new Timestamp(minTimeStamp)
+    } else {
+      t
+    }
+  }
 }
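
Note (not part of the patch): the new convertJavaTimestampToTimestamp hook is called from the TIMESTAMP read path in JdbcUtils before DateTimeUtils.fromJavaTimestamp, so a dialect can remap driver-specific sentinel values before they are narrowed to Spark's microsecond-precision internal representation. The sketch below is a minimal, hypothetical example of a third-party dialect using the same hook; the dialect name, URL prefix, and sentinel constant are assumptions for illustration only.

  import java.sql.Timestamp

  import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

  // Illustrative only: a hypothetical dialect that clamps a driver-specific
  // "end of time" sentinel before Spark converts the value to microseconds.
  object MyDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

    // Assumed sentinel; a real driver would document its own constant.
    private val END_OF_TIME_MILLIS = Long.MaxValue

    override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
      if (t.getTime == END_OF_TIME_MILLIS) {
        // Map the sentinel to a representable maximum instead of overflowing.
        Timestamp.valueOf("9999-12-31 23:59:59")
      } else {
        t
      }
    }
  }

  // Registering the dialect makes Spark pick it for matching JDBC URLs:
  // JdbcDialects.registerDialect(MyDialect)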