[SPARK-44280][SQL] Add convertJavaTimestampToTimestamp in JDBCDialect API #41843
PostgresIntegrationSuite.scala:

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Connection, Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties

 import org.apache.spark.sql.Column
@@ -148,6 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()

+    conn.prepareStatement("CREATE TABLE infinity_timestamp" +
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
+      " VALUES ('infinity'), ('-infinity');").executeUpdate()
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
@@ -432,4 +437,18 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
     assert(row(0).getString(1) == "fdasfasdf")
   }
+
+  test("SPARK-44280: infinity timestamp test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
+    val row = df.collect()
+
+    assert(row.length == 2)
+    val infinity = row(0).getAs[Timestamp]("timestamp_column")
+    val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    assert(infinity.getTime == maxTimestamp)
+    assert(negativeInfinity.getTime == minTimeStamp)
+  }
 }
```

Review comment on lines +448 to +452:

Shouldn't this be

```scala
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000
```

Otherwise, `getTime` (which returns epoch milliseconds) is being compared against values in epoch seconds.
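To make the reviewer's point concrete, here is a minimal, self-contained Scala sketch (not part of the PR; `UnitMismatchDemo` is a name invented for this illustration) showing the unit mismatch: `LocalDateTime.toEpochSecond` returns epoch seconds, while the `java.sql.Timestamp` constructor and `getTime` work in epoch milliseconds.

```scala
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

object UnitMismatchDemo extends App {
  // toEpochSecond returns SECONDS since the Unix epoch.
  val seconds = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)

  // The Timestamp constructor expects MILLISECONDS since the epoch, so passing
  // the raw epoch-second value (~2.5e11) lands in January 1978, not year 9999.
  println(new Timestamp(seconds))        // wrong: a date in January 1978
  println(new Timestamp(seconds * 1000)) // intended instant (printed in the JVM default zone)
}
```

Note that the test above still passes as written, because the test and the dialect make the same seconds-for-milliseconds substitution on both sides of the assertion; the reviewer's point is that the clamped value is semantically a date in 1978, not 9999-12-31 23:59:59.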
PostgresDialect.scala:

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.jdbc

 import java.sql.{Connection, SQLException, Timestamp, Types}
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale

@@ -281,4 +281,34 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
+
+  /**
+   * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
+   * milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured
+   * at microsecond accuracy. For the "infinity values" in PostgreSQL (represented by
+   * big constants), we need to clamp them to avoid overflow. If it is not one of the
+   * infinity values, fall back to the default behavior.
+   */
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
+    // Variable names come from PostgreSQL "constant field docs":
+    // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
+    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
+    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
+    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
+    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
+
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    val time = t.getTime
+    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
+      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
+      new Timestamp(maxTimestamp)
+    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
+      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
+      new Timestamp(minTimeStamp)
+    } else {
+      t
+    }
+  }
 }
```

Review comment on lines +300 to +303:

@yaooqinn, @mingkangli-db, @cloud-fan

Shouldn't this be

```scala
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000
```

Otherwise, e.g.:

```scala
val tsLong = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
val ts = new Timestamp(tsLong)
```

gives a timestamp in January 1978 (the epoch-second value gets interpreted as milliseconds), not 9999-12-31 23:59:59.

Reply:

Nice catch! We shall fix this...
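For reference, here is a standalone sketch of the clamping logic with the reviewer's suggested `* 1000` applied. This illustrates the proposed fix under the review's assumption and is not the code as finally merged; the sentinel constants are copied from the diff above, `ClampInfinityDemo` is an invented name, and the `*_SMALLER_INFINITY` sentinels are omitted for brevity.

```scala
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

object ClampInfinityDemo extends App {
  // Sentinel millisecond values the PostgreSQL JDBC driver reports for
  // 'infinity' and '-infinity' (copied from the diff above).
  val PostgresPositiveInfinity = 9223372036825200000L
  val PostgresNegativeInfinity = -9223372036832400000L

  // Clamp bounds in MILLISECONDS: the * 1000 the reviewer asked for converts
  // the epoch seconds returned by toEpochSecond into epoch milliseconds.
  val minMillis = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) * 1000
  val maxMillis = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) * 1000

  def clamp(t: Timestamp): Timestamp = t.getTime match {
    case PostgresPositiveInfinity => new Timestamp(maxMillis)
    case PostgresNegativeInfinity => new Timestamp(minMillis)
    case _                        => t
  }

  println(clamp(new Timestamp(PostgresPositiveInfinity))) // clamped to 9999-12-31 23:59:59 UTC
  println(clamp(new Timestamp(0L)))                       // ordinary value passes through
}
```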
Review comment:

Can we also use the write API to play a roundtrip here?

Reply:

The goal here is that, upon reading one of the infinity timestamps in PostgreSQL, it is cast to a reasonable value in Spark SQL instead of throwing an overflow error. In the other direction, since there are no built-in "infinity" values in Spark SQL, we can't write an infinity timestamp value to PostgreSQL, so I don't think a roundtrip test would be possible here.
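To illustrate the asymmetry described above, here is a hypothetical read/write pair (the JDBC URL, `roundtrip_copy` table name, and `RoundtripSketch` object are placeholders invented for this sketch, not from the PR): the read side maps 'infinity' to a clamped finite timestamp, and writing that DataFrame back emits an ordinary finite timestamp, so the PostgreSQL 'infinity' marker cannot survive a roundtrip.

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

object RoundtripSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val url = "jdbc:postgresql://localhost:5432/test" // placeholder URL

  // Read: rows stored as 'infinity' / '-infinity' arrive as the clamped
  // finite timestamps produced by convertJavaTimestampToTimestamp.
  val df = spark.read.jdbc(url, "infinity_timestamp", new Properties)

  // Write: the clamped values go back out as plain finite timestamps.
  // Spark SQL has no timestamp value that serializes to PostgreSQL
  // 'infinity', so the original sentinel cannot be reproduced.
  df.write.mode("append").jdbc(url, "roundtrip_copy", new Properties)
}
```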