[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect API #41843

Closed · wants to merge 30 commits

Commits (30, all by mingkangli-db)
bdb2be3  add support for fromJavaTimeStamptoTimeStamp (Jul 4, 2023)
2ed6130  retrigger workflow (Jul 4, 2023)
6f9c93a  remove requires in fromJavaTimestampToTimestamp (Jul 6, 2023)
adf23ce  Update PostgresDialect.scala (Jul 6, 2023)
9445d37  Update PostgresDialect.scala (Jul 6, 2023)
2e05df7  further clarify why clamping is needed in PostgresDialect.scala (Jul 7, 2023)
f2f77e8  fix style (Jul 7, 2023)
a2da4ff  Update PostgresDialect.scala (Jul 9, 2023)
304de8f  Update PostgresDialect.scala (Jul 10, 2023)
f29b4de  fix formatting (Jul 18, 2023)
d287b29  Merge branch 'master' into SPARK-44280 (Jul 18, 2023)
6041a43  format once more (Jul 18, 2023)
9471d6a  Update PostgresIntegrationSuite.scala (Jul 18, 2023)
40597af  Update PostgresIntegrationSuite.scala (Jul 19, 2023)
4b07018  Merge branch 'apache:master' into SPARK-44280 (Jul 19, 2023)
2916930  add fix attempt (Jul 20, 2023)
700fc72  refactor (Jul 20, 2023)
34ecd58  explicit import UTC (Jul 20, 2023)
c7c84d7  remove duplicate import (Jul 20, 2023)
bf88f33  import ZoneOffset (Jul 20, 2023)
a4ea4b5  Update PostgresDialect.scala (Jul 21, 2023)
ffb9ea9  Update PostgresDialect.scala (Jul 21, 2023)
18bb802  Merge branch 'apache:master' into SPARK-44280 (Jul 21, 2023)
ec204c4  Empty-Commit (Jul 21, 2023)
ace6182  Update PostgresIntegrationSuite.scala (Jul 21, 2023)
d1f5578  Update PostgresIntegrationSuite.scala (Jul 21, 2023)
d752feb  Update PostgresIntegrationSuite.scala (Jul 21, 2023)
fa52e05  address comments (Jul 25, 2023)
e3fd600  Merge branch 'SPARK-44280' of https://github.com/mingkangli-db/spark … (Jul 25, 2023)
30db0bb  Merge branch 'master' into SPARK-44280 (Jul 31, 2023)
Files changed

PostgresIntegrationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Connection, Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties

 import org.apache.spark.sql.Column
@@ -148,6 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     |('2013-04-05 18:01:02.123'),
     |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()

+    conn.prepareStatement("CREATE TABLE infinity_timestamp" +
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
+      " VALUES ('infinity'), ('-infinity');").executeUpdate()
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
@@ -432,4 +437,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
     assert(row(0).getString(1) == "fdasfasdf")
   }
+
+  test("SPARK-44280: infinity timestamp test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
Member:

Can we also use the write API to do a roundtrip here?

Contributor Author:

The goal here is that, upon reading one of the infinity timestamps from PostgreSQL, it is clamped to a reasonable value in Spark SQL instead of throwing an overflow error. In the other direction, since Spark SQL has no built-in "infinity" value, we can't write an infinity timestamp to PostgreSQL, so I don't think a roundtrip test is possible here.

+    val row = df.collect()
+
+    assert(row.length == 2)
+    val infinity = row(0).getAs[Timestamp]("timestamp_column")
+    val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    assert(infinity.getTime == maxTimestamp)
+    assert(negativeInfinity.getTime == minTimeStamp)
Comment on lines +448 to +452

`ts.getTime()` returns the epoch in milliseconds, and `LocalDateTime.toEpochSecond()` returns values in seconds.

Shouldn't this be

    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000

Otherwise, convertJavaTimestampToTimestamp() does not return the infinities as expected.
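For reference, the unit mismatch is easy to confirm in a Scala REPL. This is a standalone sketch, not code from the PR, and the rendered dates assume the JVM default time zone is UTC:

    import java.sql.Timestamp
    import java.time.{LocalDateTime, ZoneOffset}

    // toEpochSecond returns SECONDS since the epoch...
    val seconds = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
    // ...while the java.sql.Timestamp constructor expects MILLISECONDS,
    // so passing seconds directly lands in 1978 instead of year 9999.
    val wrong = new Timestamp(seconds)          // renders as 1978-01-11 ...
    val right = new Timestamp(seconds * 1000L)  // renders as 9999-12-31 23:59:59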

+  }
 }
JdbcUtils.scala

@@ -483,7 +483,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
-          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+          row.setLong(pos, DateTimeUtils.
+            fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
         } else {
           row.update(pos, null)
         }
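In effect, the reader path now passes every `java.sql.Timestamp` through the dialect hook before converting it to Spark's internal microsecond representation. Below is a simplified standalone sketch of that flow, using a hypothetical helper rather than the actual `JdbcUtils` code (`DateTimeUtils` is Spark's internal catalyst utility):

    import java.sql.{ResultSet, Timestamp}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.jdbc.JdbcDialect

    // Hypothetical helper mirroring the change above: the dialect hook runs
    // first (e.g. Postgres clamps its 'infinity' sentinels), then the usual
    // Java-timestamp-to-microseconds conversion.
    def readTimestampMicros(rs: ResultSet, pos: Int, dialect: JdbcDialect): Option[Long] = {
      val t: Timestamp = rs.getTimestamp(pos + 1)
      if (t == null) None
      else Some(DateTimeUtils.fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
    }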
JdbcDialect.scala

@@ -105,10 +105,21 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def getJDBCType(dt: DataType): Option[JdbcType] = None

+  /**
+   * Converts an instance of `java.sql.Timestamp` to a custom `java.sql.Timestamp` value.
+   * @param t represents a specific instant in time based on
+   *          the hybrid calendar which combines Julian and
+   *          Gregorian calendars.
+   * @return the timestamp value to convert to
+   * @throws IllegalArgumentException if t is null
+   */
+  @Since("3.5.0")
+  def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
+
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
-   * JDBC dialects should override this function to provide implementations that suite their
+   * JDBC dialects should override this function to provide implementations that suit their
    * JDBC drivers.
    * @param t Timestamp returned from JDBC driver getTimestamp method.
    * @return A LocalDateTime representing the same wall clock time as the timestamp in database.
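Any third-party dialect can override the new hook in the same way. Below is a minimal hypothetical dialect, for illustration only, with an invented `jdbc:mydb` URL prefix, that clamps pre-epoch timestamps to the epoch:

    import java.sql.Timestamp
    import org.apache.spark.sql.jdbc.JdbcDialect

    // Hypothetical dialect: clamp anything before 1970-01-01 to the epoch.
    // The default implementation above is the identity, so dialects that
    // don't override the hook are unaffected.
    case object EpochClampingDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
      override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp =
        if (t.getTime < 0L) new Timestamp(0L) else t
    }

Such a dialect would be registered with `JdbcDialects.registerDialect` before reading. The PostgresDialect change below is the PR's real use of the hook.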
PostgresDialect.scala

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.jdbc

 import java.sql.{Connection, SQLException, Timestamp, Types}
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale

@@ -281,4 +281,34 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }

+  /**
+   * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
+   * milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured
+   * at microsecond accuracy. For the "infinity values" in PostgreSQL (represented by
+   * big constants), we need to clamp them to avoid overflow. If it is not one of the
+   * infinity values, fall back to the default behavior.
+   */
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
+    // Variable names come from PostgreSQL "constant field docs":
+    // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
+    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
+    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
+    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
+    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
+
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    val time = t.getTime
Comment on lines +300 to +303

@yruslan (Mar 19, 2024):

@yaooqinn, @mingkangli-db, @cloud-fan

`ts.getTime()` returns the epoch in milliseconds, `new Timestamp(...)` takes a time in milliseconds, and `LocalDateTime.toEpochSecond()` returns values in seconds.

Shouldn't this be

    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000

i.e. `* 1000`? Otherwise:

    val tsLong = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
    val ts = new Timestamp(tsLong)

gives:

    tsLong: Long = 253402300799
    ts: java.sql.Timestamp = 1978-01-11 22:31:40.799

not 9999-12-31, and `time` is used later to create a new instance of Timestamp.

Member:

Nice catch! We shall fix this...

+    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
+      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
+      new Timestamp(maxTimestamp)
+    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
+      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
+      new Timestamp(minTimeStamp)
+    } else {
+      t
+    }
+  }
 }
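For completeness, the correction implied by the review above is to scale the clamping bounds from seconds to milliseconds before constructing the `Timestamp` values. This is a sketch of the suggested fix, not code merged in this PR:

    // Epoch SECONDS from LocalDateTime, scaled to the MILLISECONDS that the
    // java.sql.Timestamp constructor expects, per @yruslan's suggestion.
    val minTimestampMs =
      LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000L
    val maxTimestampMs =
      LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000L
    // new Timestamp(maxTimestampMs) then renders as 9999-12-31 rather than 1978.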