[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect API #41843

Status: Closed. Wants to merge 30 commits.

Changes from 5 commits.

Commits:
bdb2be3  add support for fromJavaTimeStamptoTimeStamp (mingkangli-db, Jul 4, 2023)
2ed6130  retrigger workflow (mingkangli-db, Jul 4, 2023)
6f9c93a  remove requires in fromJavaTimestampToTimestamp (mingkangli-db, Jul 6, 2023)
adf23ce  Update PostgresDialect.scala (mingkangli-db, Jul 6, 2023)
9445d37  Update PostgresDialect.scala (mingkangli-db, Jul 6, 2023)
2e05df7  further clarify why clamping is needed in PostgresDialect.scala (mingkangli-db, Jul 7, 2023)
f2f77e8  fix style (mingkangli-db, Jul 7, 2023)
a2da4ff  Update PostgresDialect.scala (mingkangli-db, Jul 9, 2023)
304de8f  Update PostgresDialect.scala (mingkangli-db, Jul 10, 2023)
f29b4de  fix formatting (mingkangli-db, Jul 18, 2023)
d287b29  Merge branch 'master' into SPARK-44280 (mingkangli-db, Jul 18, 2023)
6041a43  format once more (mingkangli-db, Jul 18, 2023)
9471d6a  Update PostgresIntegrationSuite.scala (mingkangli-db, Jul 18, 2023)
40597af  Update PostgresIntegrationSuite.scala (mingkangli-db, Jul 19, 2023)
4b07018  Merge branch 'apache:master' into SPARK-44280 (mingkangli-db, Jul 19, 2023)
2916930  add fix attempt (mingkangli-db, Jul 20, 2023)
700fc72  refactor (mingkangli-db, Jul 20, 2023)
34ecd58  explicit import UTC (mingkangli-db, Jul 20, 2023)
c7c84d7  remove duplicate import (mingkangli-db, Jul 20, 2023)
bf88f33  import ZoneOffset (mingkangli-db, Jul 20, 2023)
a4ea4b5  Update PostgresDialect.scala (mingkangli-db, Jul 21, 2023)
ffb9ea9  Update PostgresDialect.scala (mingkangli-db, Jul 21, 2023)
18bb802  Merge branch 'apache:master' into SPARK-44280 (mingkangli-db, Jul 21, 2023)
ec204c4  Empty-Commit (mingkangli-db, Jul 21, 2023)
ace6182  Update PostgresIntegrationSuite.scala (mingkangli-db, Jul 21, 2023)
d1f5578  Update PostgresIntegrationSuite.scala (mingkangli-db, Jul 21, 2023)
d752feb  Update PostgresIntegrationSuite.scala (mingkangli-db, Jul 21, 2023)
fa52e05  address comments (mingkangli-db, Jul 25, 2023)
e3fd600  Merge branch 'SPARK-44280' of https://github.com/mingkangli-db/spark … (mingkangli-db, Jul 25, 2023)
30db0bb  Merge branch 'master' into SPARK-44280 (mingkangli-db, Jul 31, 2023)
PostgresIntegrationSuite.scala
@@ -148,6 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
|('2013-04-05 18:01:02.123'),
|('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()

conn.prepareStatement(s"CREATE TABLE infinity_timestamp" +
"(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP").executeUpdate();
conn.prepareStatement(s"INSERT INTO infinity_timestamp (timestamp_column)" +
" VALUES ('infinity'), ('-infinity');").executeUpdate()

conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
"type not_null_text)").executeUpdate()
@@ -432,4 +437,15 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
assert(row(0).getString(1) == "fdasfasdf")
}

test("SPARK-44280: infinity timestamp test") {
val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
Member:
Can we also use the write API to play a roundtrip here?

mingkangli-db (Contributor, Author):
The goal here is that, upon reading one of the infinity timestamps from PostgreSQL, it is cast to a reasonable value in Spark SQL instead of throwing an overflow error. In the other direction, since there are no built-in "infinity" values in Spark SQL, we can't write an infinity timestamp value back to PostgreSQL, so I don't think a roundtrip test is possible here.

val row = df.collect()
assert(row.length == 2)

val infinity = row(0).getAs[Timestamp]("timestamp_column")
val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
assert(infinity.getTime == Long.MaxValue)
assert(negativeInfinity.getTime == Long.MinValue)
}
}
JdbcUtils.scala
@@ -484,7 +484,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
(rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
-        row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+        row.setLong(pos, dialect.convertJavaTimestampToTimestamp(t))
} else {
row.update(pos, null)
}
JdbcDialect.scala
@@ -105,10 +105,29 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def getJDBCType(dt: DataType): Option[JdbcType] = None

  /**
   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
   * 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
   * a local timestamp in the Proleptic Gregorian calendar from those fields, and binds
   * the timestamp to the system time zone. The resulting instant is converted to
   * microseconds since the epoch.
   * JDBC dialects can override this function to provide implementations that suit their
   * JDBC drivers (e.g. if there are special "infinity" values that would overflow).
   *
   * @param t represents a specific instant in time based on the hybrid calendar
   *          which combines Julian and Gregorian calendars.
   * @return The number of micros since epoch from `java.sql.Timestamp`.
   * @throws IllegalArgumentException if t is null
   */
  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
    DateTimeUtils.fromJavaTimestamp(t)
  }

/**
* Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
* value stored in a remote database.
-   * JDBC dialects should override this function to provide implementations that suite their
+   * JDBC dialects should override this function to provide implementations that suit their
* JDBC drivers.
* @param t Timestamp returned from JDBC driver getTimestamp method.
* @return A LocalDateTime representing the same wall clock time as the timestamp in database.
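As an aside, a minimal sketch of how a third-party dialect could override the new hook (MyDialect, its URL prefix, and its sentinel value are all hypothetical; the real override added by this PR is the PostgresDialect change below):

    import java.sql.Timestamp
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.jdbc.JdbcDialect

    // Hypothetical dialect: clamps one made-up driver sentinel and otherwise
    // delegates to the default conversion.
    private object MyDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

      override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
        val MY_DRIVER_INFINITY_MILLIS = 9000000000000000000L // assumed sentinel
        if (t.getTime == MY_DRIVER_INFINITY_MILLIS) Long.MaxValue
        else DateTimeUtils.fromJavaTimestamp(t)
      }
    }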
PostgresDialect.scala
@@ -25,6 +25,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -281,4 +282,28 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
}
s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
}

  /**
   * PostgreSQL has four special "infinity values" that we need to clamp to avoid overflow.
   * If the value is not one of these infinity values, fall back to the default behavior.
   */
override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
// Variable names come from PostgreSQL "constant field docs":
// https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
val POSTGRESQL_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L

val time = t.getTime
Contributor:
So the range of java Timestamp is larger than Spark SQL timestamp?

Contributor:
What is the Spark SQL timestamp range?

Contributor:
From Long.MinValue microseconds before UTC epoch to Long.MaxValue microseconds after UTC epoch.

Contributor:
Then it is the same range as java Timestamp.

Contributor:
Then how does the overflow happen? Because the calendar is different?

mingkangli-db (Contributor, Author), Jul 7, 2023:
The problem is that java.sql timestamps are measured with millisecond accuracy (from Long.MinValue milliseconds to Long.MaxValue milliseconds, see here), while Spark timestamps are measured with microsecond accuracy. So we get an overflow exception when Math.multiplyExact is called to multiply by 1000 in Java.

The stack trace would look something like this:
    at java.lang.Math.multiplyExact(Math.java:892)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:xxx)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestampNoRebase(DateTimeUtils.scala:xxx)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:xxx)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$15(JdbcUtils.scala:xxx)
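A minimal sketch (illustrative, not from the PR) reproducing that overflow; the constant is PostgreSQL's 'infinity' sentinel in epoch milliseconds, quoted later in this diff:

    // Converting PostgreSQL's 'infinity' sentinel from millis to micros
    // overflows a Long, which is exactly what multiplyExact guards against.
    object OverflowDemo extends App {
      val posInfinityMillis = 9223372036825200000L // 'infinity' as epoch millis
      try Math.multiplyExact(posInfinityMillis, 1000L)
      catch { case e: ArithmeticException => println(s"overflow: ${e.getMessage}") }
    }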

Contributor:
To simplify, Long.MaxValue should be the min value in microseconds to not overflow, is that right @mingkangli-db?

mingkangli-db (Contributor, Author), Jul 7, 2023:
Yes, if you mean that Long.MaxValue itself is the maximum value in microseconds that can be stored without causing overflow. I added some comments in PostgresDialect.scala; hopefully this makes it clearer.
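For concreteness, a quick boundary check (illustrative, not from the PR):

    // The largest java.sql.Timestamp millisecond value that still converts
    // to microseconds without overflow is Long.MaxValue / 1000.
    val maxSafeMillis = Long.MaxValue / 1000        // 9223372036854775 ms
    Math.multiplyExact(maxSafeMillis, 1000L)        // ok: 9223372036854775000
    // Math.multiplyExact(maxSafeMillis + 1, 1000L) // throws ArithmeticException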


if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
time == POSTGRESQL_DATE_POSITIVE_SMALLER_INFINITY) {
Long.MaxValue
Contributor:
If we query an infinite timestamp column in pgsql, what does pgsql display?

mingkangli-db (Contributor, Author):
It will be displayed as "+infinity" or "-infinity". See here: "The values infinity and -infinity are specially represented inside the system and will be displayed unchanged."
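A small illustrative check of that rendering (assumes a live PostgreSQL connection named conn; not part of the PR):

    // Casting to text shows exactly how PostgreSQL renders the special values.
    val rs = conn.prepareStatement(
      "SELECT 'infinity'::timestamp::text, '-infinity'::timestamp::text").executeQuery()
    rs.next()
    println(rs.getString(1)) // infinity
    println(rs.getString(2)) // -infinity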

} else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
Long.MinValue
} else {
DateTimeUtils.fromJavaTimestamp(t)
}
}
}