[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect API #41843
JdbcDialect.scala

```diff
@@ -105,10 +105,30 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def getJDBCType(dt: DataType): Option[JdbcType] = None
 
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
+   * a local timestamp in the Proleptic Gregorian calendar from the fields, and binds
+   * the timestamp to the system time zone. The resulting instant is converted to
+   * microseconds since the epoch.
+   * JDBC dialects can override this function to provide implementations that suit their
+   * JDBC drivers (e.g. if there are special "infinity" values that would overflow).
+   *
+   * @param t represents a specific instant in time based on
+   *          the hybrid calendar which combines Julian and
+   *          Gregorian calendars.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   * @throws IllegalArgumentException if t is null
+   */
+  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+    require(t != null, "timestamp must be non-null")
+    DateTimeUtils.fromJavaTimestamp(t)
+  }
+
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
-   * JDBC dialects should override this function to provide implementations that suite their
+   * JDBC dialects should override this function to provide implementations that suit their
    * JDBC drivers.
    * @param t Timestamp returned from JDBC driver getTimestamp method.
    * @return A LocalDateTime representing the same wall clock time as the timestamp in database.
```

Review thread on the `require(t != null, ...)` line:

Reviewer: Is this a user-facing error? If so, please add an error class to

mingkangli-db: It is not a user-facing error but a precondition: the timestamp passed into this function will never be null. I removed the null check because a similar function
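The default implementation delegates to `DateTimeUtils.fromJavaTimestamp`. For intuition, that conversion essentially scales the epoch milliseconds to microseconds and adds the sub-millisecond part of the nanos field. A minimal Java sketch of this idea (`toEpochMicros` is a hypothetical helper, not Spark's API; the real method also handles calendar-rebasing details this sketch ignores):

```java
import java.sql.Timestamp;

public class EpochMicrosSketch {
    // Hypothetical helper approximating what DateTimeUtils.fromJavaTimestamp computes:
    // epoch milliseconds scaled to microseconds, plus the sub-millisecond portion of
    // the nanos field. multiplyExact is what raises ArithmeticException on overflow.
    static long toEpochMicros(Timestamp t) {
        long micros = Math.multiplyExact(t.getTime(), 1000L);
        return Math.addExact(micros, (t.getNanos() / 1000) % 1000);
    }

    public static void main(String[] args) {
        Timestamp t = new Timestamp(1000L); // 1 second after the epoch
        t.setNanos(123456789);              // fractional second: 0.123456789 s
        // getTime() now reports 1123 ms; the remaining 456 micros come from nanos.
        System.out.println(toEpochMicros(t)); // 1123456
    }
}
```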
PostgresDialect.scala

```diff
@@ -25,6 +25,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -281,4 +282,29 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
+
+  /**
+   * PostgreSQL has four special "infinity values" that we need to clamp to avoid overflow.
+   * If it is not one of the infinity values, fall back to default behavior. */
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+    // variable names come from PostgreSQL "constant field docs":
+    // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
+    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
+    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
+    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
+    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
+
+    val time = t.getTime
+    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
+      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
+      Long.MaxValue
+    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
+      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
+      Long.MinValue
+    } else {
+      DateTimeUtils.fromJavaTimestamp(t)
+    }
+  }
 }
```

Review threads on this hunk:

Reviewer: nit: no new line is needed.

Reviewer: nit: Please update to this:

On the timestamp ranges and the overflow:

Reviewer: What is the Spark SQL timestamp range?

Reviewer: From Long.MinValue microseconds before UTC epoch to Long.MaxValue microseconds after UTC epoch.

Reviewer: Then it is the same range for java Timestamp.

Reviewer: Then how does the overflow happen? Because the calendar is different?

mingkangli-db: The problem is that java.sql timestamps are measured with millisecond accuracy (from Long.MinValue milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured with microsecond accuracy. So we would get an overflow exception when we call multiplyExact by 1000 in Java.

Reviewer: To simplify, Long.MaxValue should be the min value in microseconds to not overflow, is that right @mingkangli-db?

mingkangli-db: Yes, if you mean Long.MaxValue itself is the maximum value in microseconds that can be stored without causing overflow. I added some comments in PostgresDialect.scala; hopefully this makes it clearer.

On how PostgreSQL renders these values:

Reviewer: If we query an infinite timestamp column in pgsql, what does pgsql display?

mingkangli-db: It will be displayed as "+infinity" or "-infinity". From the PostgreSQL documentation: "The values infinity and -infinity are specially represented inside the system and will be displayed unchanged".
Final review thread:

Reviewer: Can we also use the write API to play a roundtrip here?

mingkangli-db: The goal here is that upon reading one of the infinity timestamps in PostgreSQL, it is cast into reasonable values in Spark SQL instead of throwing an overflow error. In the other direction, however, since there are no built-in "infinity" values in Spark SQL, we can't write an infinity timestamp value to PostgreSQL, so I don't think a roundtrip test would be possible here.
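The reply can be made concrete: clamping is lossy, so the driver's sentinel cannot be recovered from the clamped Spark value, which is another way to see why an exact roundtrip is impossible. A small Java sketch (class and helper names are ours; the constant comes from the PostgresDialect diff above):

```java
public class RoundTripLoss {
    // Scale a Spark-style microsecond value back to JDBC-style milliseconds.
    static long microsToMillis(long micros) {
        return micros / 1000L;
    }

    public static void main(String[] args) {
        long positiveInfinityMillis = 9223372036825200000L; // driver sentinel, from the diff
        long clampedMicros = Long.MAX_VALUE;                // what the dialect returns for it

        // Going back from micros to millis does not reproduce the sentinel,
        // so a written-back value could never round-trip as 'infinity'.
        long roundTripMillis = microsToMillis(clampedMicros);
        System.out.println(roundTripMillis == positiveInfinityMillis); // false
    }
}
```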