From bdb2be3751abd25db4578cec33ba83721a9c20f5 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Mon, 3 Jul 2023 22:47:47 -0700
Subject: [PATCH 01/25] add support for fromJavaTimeStamptoTimeStamp

---
 .../sql/jdbc/PostgresIntegrationSuite.scala   | 16 ++++++++++++
 .../datasources/jdbc/JdbcUtils.scala          |  2 +-
 .../apache/spark/sql/jdbc/JdbcDialects.scala  | 22 +++++++++++++++-
 .../spark/sql/jdbc/PostgresDialect.scala      | 26 +++++++++++++++++++
 4 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index c539452bb9ae0..d8dedc5e65ff3 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -148,6 +148,11 @@
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
 
+    conn.prepareStatement(s"CREATE TABLE infinity_timestamp" +
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP").executeUpdate();
+    conn.prepareStatement(s"INSERT INTO infinity_timestamp (timestamp_column)" +
+      " VALUES ('infinity'), ('-infinity');").executeUpdate()
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
@@ -432,4 +437,15 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
     assert(row(0).getString(1) == "fdasfasdf")
   }
+
+  test("SPARK-44280: infinity timestamp test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
+    val row = df.collect()
+    assert(row.length == 2)
+
+    val infinity = row(0).getAs[Timestamp]("timestamp_column")
+    val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+    assert(infinity.getTime == Long.MaxValue)
+    assert(negativeInfinity.getTime == Long.MinValue)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d907ce6b100cf..dcc2c22fbd3d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -484,7 +484,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
-          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+          row.setLong(pos, dialect.convertJavaTimestampToTimestamp(t))
         } else {
           row.update(pos, null)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 93a311be2f867..fc4803c0ef024 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -105,10 +105,30 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
  def getJDBCType(dt: DataType): Option[JdbcType] = None
 
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
+   * a local timestamp in Proleptic Gregorian calendar from the fields, and binds
+   * the timestamp to the system time zone. The resulted instant is converted to
+   * microseconds since the epoch.
+   * JDBC dialects can override this function to provide implementations that suit their
+   * JDBC drivers (e.g. if there are special "infinity" values that would overflow)
+   *
+   * @param t represents a specific instant in time based on
+   *          the hybrid calendar which combines Julian and
+   *          Gregorian calendars.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   * @throws IllegalArgumentException if t is null
+   */
+  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+    require(t != null, "Timestamp must be non-null")
+    DateTimeUtils.fromJavaTimestamp(t)
+  }
+
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
-   * JDBC dialects should override this function to provide implementations that suite their
+   * JDBC dialects should override this function to provide implementations that suit their
    * JDBC drivers.
    * @param t Timestamp returned from JDBC driver getTimestamp method.
    * @return A LocalDateTime representing the same wall clock time as the timestamp in database.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index ab8b1a7e1a50d..dedb3ada7827c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -25,6 +25,7 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -281,4 +282,29 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
+
+  /**
+   * PostgreSQL has four special "infinity values" that we need clamp to avoid overflow.
+   * If it is not one of the infinity values, fall back to default behavior. */
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+
+    // variable names come from PostgreSQL "constant field docs":
+    // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
+    val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
+    val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
+    val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
+    val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
+
+    val time = t.getTime
+
+    if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
+      time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
+      Long.MaxValue
+    } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
+      time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
+      Long.MinValue
+    } else {
+      DateTimeUtils.fromJavaTimestamp(t)
+    }
+  }
 }
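Note on the API added in this first revision: the hook is an extension point, so any dialect can intercept driver-specific sentinel values before they reach Catalyst's microsecond representation. A minimal sketch of a custom dialect against the PATCH 01 signature (the dialect name, URL prefix, and sentinel constant are illustrative assumptions, not part of the patch):

    import java.sql.Timestamp
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

    case object ExampleDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:exampledb")

      // Hypothetical sentinel an imaginary driver returns for "no upper bound" rows.
      private val SENTINEL_MAX_MILLIS = 9223372036825200000L

      override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
        // Map the sentinel straight to Catalyst's maximum instead of converting it.
        if (t.getTime == SENTINEL_MAX_MILLIS) Long.MaxValue
        else DateTimeUtils.fromJavaTimestamp(t)
      }
    }

Once registered with JdbcDialects.registerDialect(ExampleDialect), the override runs for every TIMESTAMP column that JdbcUtils materializes.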
From 2ed61303847130699b3fce0edda7a5f62a097829 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Mon, 3 Jul 2023 22:57:18 -0700
Subject: [PATCH 02/25] retrigger workflow

---
 .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index fc4803c0ef024..d1eb9c0991f55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -121,7 +121,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    * @throws IllegalArgumentException if t is null
    */
   def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-    require(t != null, "Timestamp must be non-null")
+    require(t != null, "timestamp must be non-null")
     DateTimeUtils.fromJavaTimestamp(t)
   }
 
From 6f9c93af1226385ab17eea631a5d7333f453c92f Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Wed, 5 Jul 2023 17:40:05 -0700
Subject: [PATCH 03/25] remove requires in fromJavaTimestampToTimestamp

---
 .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d1eb9c0991f55..8e5bc6db03359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -121,7 +121,6 @@ abstract class JdbcDialect extends Serializable with Logging {
    * @throws IllegalArgumentException if t is null
    */
   def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-    require(t != null, "timestamp must be non-null")
     DateTimeUtils.fromJavaTimestamp(t)
   }
 
From adf23ce9cc98f847970539238ffd2089406a9e94 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 6 Jul 2023 06:50:37 -0700
Subject: [PATCH 04/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index dedb3ada7827c..99ba0a03b56e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -287,7 +287,6 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
    * PostgreSQL has four special "infinity values" that we need clamp to avoid overflow.
    * If it is not one of the infinity values, fall back to default behavior. */
   override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-
     // variable names come from PostgreSQL "constant field docs":
     // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
     val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
From 9445d371b5e97c116c8567e400a140134da1f2cb Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 6 Jul 2023 06:55:41 -0700
Subject: [PATCH 05/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 99ba0a03b56e2..ab6b539d1a65d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -287,7 +287,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
    * PostgreSQL has four special "infinity values" that we need clamp to avoid overflow.
    * If it is not one of the infinity values, fall back to default behavior. */
   override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-    // variable names come from PostgreSQL "constant field docs":
+    // Variable names come from PostgreSQL "constant field docs":
     // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
     val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
     val POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY = -185543533774800000L
From 2e05df7af37b2f8d2cecc4cc66b71dcb922ce1ff Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 6 Jul 2023 18:00:47 -0700
Subject: [PATCH 06/25] further clarify why clamping is needed in
 PostgresDialect.scala

---
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index ab6b539d1a65d..d3c26418c2d04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -284,7 +284,10 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   }
 
   /**
-   * PostgreSQL has four special "infinity values" that we need clamp to avoid overflow.
+   * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
+   * milliseconds to Long.MaxValue milliseconds), see here while Spark timestamps
+   * are measured at microseconds accuracy. For the “infinity values” in PostgreSQL
+   * (represented by big constants), we need clamp them to avoid overflow.
    * If it is not one of the infinity values, fall back to default behavior. */
   override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
     // Variable names come from PostgreSQL "constant field docs":
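To make the overflow concern in the new comment concrete: Catalyst stores TimestampType as microseconds in a Long, and Spark's DateTimeUtils.millisToMicros multiplies milliseconds by 1000 with Math.multiplyExact, so the near-Long.MaxValue millisecond encodings that pgjdbc uses for 'infinity' cannot pass through the default conversion. A standalone illustration (not part of the patch):

    val infinityMillis = 9223372036825200000L  // pgjdbc's millisecond encoding of 'infinity'

    infinityMillis * 1000L                     // plain arithmetic silently wraps to -29575808000
    Math.multiplyExact(infinityMillis, 1000L)  // throws ArithmeticException: long overflow

Hence the clamping above: the four sentinel values are matched by their raw millisecond value and mapped to Long.MinValue/Long.MaxValue before any multiplication happens.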
From f2f77e8c8a9c51463ec9864b0dda85fbbb79ea6f Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 7 Jul 2023 11:45:04 -0700
Subject: [PATCH 07/25] fix style

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d3c26418c2d04..750af321718d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -288,7 +288,8 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
    * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
    * milliseconds to Long.MaxValue milliseconds), see here while Spark timestamps
    * are measured at microseconds accuracy. For the “infinity values” in PostgreSQL
    * (represented by big constants), we need clamp them to avoid overflow.
-   * If it is not one of the infinity values, fall back to default behavior. */
+   * If it is not one of the infinity values, fall back to default behavior.
+   */
   override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
From a2da4ffbcff2086ae735e3a09f4f2f400e08ee8f Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Sun, 9 Jul 2023 16:53:37 -0700
Subject: [PATCH 08/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 750af321718d4..a5c0db6fb4ddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -282,7 +282,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
-
+
   /**
    * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
    * milliseconds to Long.MaxValue milliseconds), see here while Spark timestamps
From 304de8fa154734011df004599cb4e51a3e28ebc0 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Mon, 10 Jul 2023 00:38:16 -0700
Subject: [PATCH 09/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index a5c0db6fb4ddf..9e83910edac79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -282,7 +282,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
-
+
   /**
    * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
    * milliseconds to Long.MaxValue milliseconds), see here while Spark timestamps
From f29b4de3eb87ecf7259215c08195bb147136220a Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Tue, 18 Jul 2023 11:14:39 -0700
Subject: [PATCH 10/25] fix formatting

---
 .../org/apache/spark/sql/jdbc/PostgresDialect.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 9e83910edac79..a716a1b105f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -282,13 +282,13 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     }
     s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
   }
-
+
   /**
    * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
-   * milliseconds to Long.MaxValue milliseconds), see here while Spark timestamps
-   * are measured at microseconds accuracy. For the “infinity values” in PostgreSQL
-   * (represented by big constants), we need clamp them to avoid overflow.
-   * If it is not one of the infinity values, fall back to default behavior.
+   * milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured
+   * at microseconds accuracy. For the “infinity values” in PostgreSQL (represented by
+   * big constants), we need clamp them to avoid overflow. If it is not one of the infinity
+   * values, fall back to default behavior.
    */
   override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
From 6041a4337e508368147d4bbd48cbbe16a8c76cad Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Tue, 18 Jul 2023 13:01:02 -0700
Subject: [PATCH 11/25] format once more

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index a716a1b105f34..aa6ceb7540967 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -286,7 +286,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
   /**
    * java.sql timestamps are measured with millisecond accuracy (from Long.MinValue
    * milliseconds to Long.MaxValue milliseconds), while Spark timestamps are measured
-   * at microseconds accuracy. For the “infinity values” in PostgreSQL (represented by
+   * at microseconds accuracy. For the "infinity values" in PostgreSQL (represented by
    * big constants), we need clamp them to avoid overflow. If it is not one of the infinity
    * values, fall back to default behavior.
    */
From 9471d6a7e845a270136bb1ee1c2567c7d0afc61a Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Tue, 18 Jul 2023 16:33:29 -0700
Subject: [PATCH 12/25] Update PostgresIntegrationSuite.scala

---
 .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index d8dedc5e65ff3..42443148cc498 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -148,11 +148,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
 
-    conn.prepareStatement(s"CREATE TABLE infinity_timestamp" +
-      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP").executeUpdate();
-    conn.prepareStatement(s"INSERT INTO infinity_timestamp (timestamp_column)" +
+    conn.prepareStatement("CREATE TABLE infinity_timestamp" +
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
       " VALUES ('infinity'), ('-infinity');").executeUpdate()
-
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
From 40597af93fb49a025ab35b634b208ffdce9eb45e Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Tue, 18 Jul 2023 18:39:25 -0700
Subject: [PATCH 13/25] Update PostgresIntegrationSuite.scala

---
 .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 42443148cc498..cdb669f85907f 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -152,7 +152,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
     conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
       " VALUES ('infinity'), ('-infinity');").executeUpdate()
-
+
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
       "type not_null_text)").executeUpdate()
From 2916930590e2b506d0450457693a468e2f83cde2 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Wed, 19 Jul 2023 17:33:09 -0700
Subject: [PATCH 14/25] add fix attempt

---
 .../execution/datasources/jdbc/JdbcUtils.scala    |  3 ++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala  |  8 +++++---
 .../apache/spark/sql/jdbc/PostgresDialect.scala   | 15 ++++++++-------
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cad7c2c5c8e41..b54608d7afbea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -483,7 +483,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
-          row.setLong(pos, dialect.convertJavaTimestampToTimestamp(t))
+          row.setLong(pos, DateTimeUtils.
+            fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
         } else {
           row.update(pos, null)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 8e5bc6db03359..9f9ef2ed4a0d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -120,9 +120,11 @@ abstract class JdbcDialect extends Serializable with Logging {
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    * @throws IllegalArgumentException if t is null
    */
-  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-    DateTimeUtils.fromJavaTimestamp(t)
-  }
+//  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+//    DateTimeUtils.fromJavaTimestamp(t)
+//  }
+
+  def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
 
   /**
    * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index aa6ceb7540967..9c1ca2cb913e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, SQLException, Timestamp, Types}
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -290,7 +289,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
    * big constants), we need clamp them to avoid overflow. If it is not one of the infinity
    * values, fall back to default behavior.
    */
-  override def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
+  override def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = {
     // Variable names come from PostgreSQL "constant field docs":
     // https://jdbc.postgresql.org/documentation/publicapi/index.html?constant-values.html
     val POSTGRESQL_DATE_NEGATIVE_INFINITY = -9223372036832400000L
@@ -298,16 +297,18 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     val POSTGRESQL_DATE_POSITIVE_INFINITY = 9223372036825200000L
     val POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY = 185543533774800000L
 
-    val time = t.getTime
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
 
+    val time = t.getTime
     if (time == POSTGRESQL_DATE_POSITIVE_INFINITY ||
       time == POSTGRESQL_DATE_DATE_POSITIVE_SMALLER_INFINITY) {
-      Long.MaxValue
+      new Timestamp(maxTimestamp)
     } else if (time == POSTGRESQL_DATE_NEGATIVE_INFINITY ||
       time == POSTGRESQL_DATE_NEGATIVE_SMALLER_INFINITY) {
-      Long.MinValue
+      new Timestamp(minTimeStamp)
     } else {
-      DateTimeUtils.fromJavaTimestamp(t)
+      t
     }
   }
 }
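A detail of this fix attempt worth double-checking in review: LocalDateTime.toEpochSecond returns epoch seconds, while the java.sql.Timestamp(long) constructor expects epoch milliseconds, so the clamped bounds built here land near the epoch rather than in years 1 and 9999. A millisecond-based sketch of what the bounds were presumably intended to be (an assumption about intent, not what this commit does):

    import java.time.{LocalDateTime, ZoneOffset}

    // 0001-01-01T00:00:00Z and 9999-12-31T23:59:59Z in epoch MILLISECONDS.
    val minTimestampMillis =
      LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
    val maxTimestampMillis =
      LocalDateTime.of(9999, 12, 31, 23, 59, 59).toInstant(ZoneOffset.UTC).toEpochMilli

The integration test updated in the next commit compares getTime against the same toEpochSecond values, so it stays self-consistent either way.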
From 700fc72c7b29b84530a076ac10fdaa2ddc869b29 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 20 Jul 2023 12:46:58 -0700
Subject: [PATCH 15/25] refactor

---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala |  7 +++++--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala  | 14 +-------------
 2 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index cdb669f85907f..f7d6ddf94e104 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -445,7 +445,10 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
 
     val infinity = row(0).getAs[Timestamp]("timestamp_column")
     val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
-    assert(infinity.getTime == Long.MaxValue)
-    assert(negativeInfinity.getTime == Long.MinValue)
+    val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
+    val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
+
+    assert(infinity.getTime == maxTimestamp)
+    assert(negativeInfinity.getTime == minTimeStamp)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 9f9ef2ed4a0d3..7cc3c716e52cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -106,24 +106,12 @@ abstract class JdbcDialect extends Serializable with Logging {
   def getJDBCType(dt: DataType): Option[JdbcType] = None
 
   /**
-   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
-   * 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
-   * a local timestamp in Proleptic Gregorian calendar from the fields, and binds
-   * the timestamp to the system time zone. The resulted instant is converted to
-   * microseconds since the epoch.
-   * JDBC dialects can override this function to provide implementations that suit their
-   * JDBC drivers (e.g. if there are special "infinity" values that would overflow)
-   *
+   * Converts an instance of `java.sql.Timestamp` to a custom `java.sql.Timestamp` value.
    * @param t represents a specific instant in time based on
    *          the hybrid calendar which combines Julian and
    *          Gregorian calendars.
-   * @return The number of micros since epoch from `java.sql.Timestamp`.
    * @throws IllegalArgumentException if t is null
    */
-//  def convertJavaTimestampToTimestamp(t: Timestamp): Long = {
-//    DateTimeUtils.fromJavaTimestamp(t)
-//  }
-
   def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
 
   /**
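An aside on the four magic numbers kept in PostgresDialect: they correspond to the public constants on pgjdbc's org.postgresql.PGStatement (DATE_POSITIVE_INFINITY, DATE_NEGATIVE_INFINITY, and the two *_SMALLER_INFINITY variants); the literals are inlined here, presumably so that sql/core avoids a compile-time dependency on the driver. With pgjdbc on the compile classpath, an equivalent predicate could read (a sketch, assuming that dependency):

    import java.sql.Timestamp
    import org.postgresql.PGStatement

    def isPostgresInfinity(t: Timestamp): Boolean = {
      val millis = t.getTime
      millis == PGStatement.DATE_POSITIVE_INFINITY ||
        millis == PGStatement.DATE_NEGATIVE_INFINITY ||
        millis == PGStatement.DATE_POSITIVE_SMALLER_INFINITY ||
        millis == PGStatement.DATE_NEGATIVE_SMALLER_INFINITY
    }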
From 34ecd589c26fb9e113a8b4b5849570a651001885 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 20 Jul 2023 13:26:04 -0700
Subject: [PATCH 16/25] explicit import UTC

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 9c1ca2cb913e6..599b0008cf9d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, SQLException, Timestamp, Types}
 import java.time.{LocalDateTime, ZoneOffset}
+import java.time.ZoneOffset.UTC
 import java.util
 import java.util.Locale
 
From c7c84d71aa23f22d52a1983d83886e2b9fed23db Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 20 Jul 2023 14:57:04 -0700
Subject: [PATCH 17/25] remove duplicate import

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 599b0008cf9d0..9c1ca2cb913e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, SQLException, Timestamp, Types}
 import java.time.{LocalDateTime, ZoneOffset}
-import java.time.ZoneOffset.UTC
 import java.util
 import java.util.Locale
 
From bf88f330526859ad8de930cfef91303683d63812 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Thu, 20 Jul 2023 15:19:48 -0700
Subject: [PATCH 18/25] import ZoneOffset

---
 .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index f7d6ddf94e104..7ab5b5df57912 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Connection, Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.LocalDateTime
+import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties
 
 import org.apache.spark.sql.Column
From a4ea4b5614e69431fb2049c3cfb1baa50de9b483 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 02:10:16 -0700
Subject: [PATCH 19/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 9c1ca2cb913e6..52743550f2293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -56,7 +56,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     } else if (sqlType == Types.BIT && typeName == "bit" && size != 1) {
       Some(BinaryType)
     } else if (sqlType == Types.DOUBLE && typeName == "money") {
-      // money type seems to be broken but one workaround is to handle it as string.
+      // money type seems to be broken but one workaround is to handle it as strig.
       // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/100
       Some(StringType)
     } else if (sqlType == Types.OTHER) {
From ffb9ea947abe9d17c9d5b0ee34fbf94ef4c3f208 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 02:11:35 -0700
Subject: [PATCH 20/25] Update PostgresDialect.scala

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 52743550f2293..9c1ca2cb913e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -56,7 +56,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     } else if (sqlType == Types.BIT && typeName == "bit" && size != 1) {
       Some(BinaryType)
     } else if (sqlType == Types.DOUBLE && typeName == "money") {
-      // money type seems to be broken but one workaround is to handle it as strig.
+      // money type seems to be broken but one workaround is to handle it as string.
       // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/100
       Some(StringType)
     } else if (sqlType == Types.OTHER) {
From ec204c410ff49c32352cb1972e3190f3bdee1036 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 11:40:13 -0700
Subject: [PATCH 21/25] Empty-Commit

From ace618219f1cc22a004be96d0c78b47c08f19e93 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 12:57:00 -0700
Subject: [PATCH 22/25] Update PostgresIntegrationSuite.scala

---
 .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 7ab5b5df57912..f25612a2d29cf 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -12,7 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License..
  */
 
 package org.apache.spark.sql.jdbc
From d1f5578b0054eafe8f51934dbf2b61204ae2a943 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 12:57:14 -0700
Subject: [PATCH 23/25] Update PostgresIntegrationSuite.scala

---
 .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index f25612a2d29cf..7ab5b5df57912 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -12,7 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License..
+ * limitations under the License.
  */
 
 package org.apache.spark.sql.jdbc
From d752febafb3e199be0ccb254958f393b66ad5c37 Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Fri, 21 Jul 2023 13:45:34 -0700
Subject: [PATCH 24/25] Update PostgresIntegrationSuite.scala

---
 .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 7ab5b5df57912..90d6f6ae2fbfc 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -441,8 +441,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
   test("SPARK-44280: infinity timestamp test") {
     val df = sqlContext.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
     val row = df.collect()
-    assert(row.length == 2)
 
+    assert(row.length == 2)
     val infinity = row(0).getAs[Timestamp]("timestamp_column")
     val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
     val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
From fa52e052e273cf8efeb484e7dadad473d8f2a70a Mon Sep 17 00:00:00 2001
From: Mingkang Li
Date: Tue, 25 Jul 2023 10:37:09 -0700
Subject: [PATCH 25/25] address comments

---
 .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7cc3c716e52cf..fac3cc60d952a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -110,8 +110,10 @@ abstract class JdbcDialect extends Serializable with Logging {
    * @param t represents a specific instant in time based on
    *          the hybrid calendar which combines Julian and
    *          Gregorian calendars.
+   * @return the timestamp value to convert to
    * @throws IllegalArgumentException if t is null
    */
+  @Since("3.5.0")
   def convertJavaTimestampToTimestamp(t: Timestamp): Timestamp = t
 
   /**
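End-to-end, the series leaves the pipeline as follows: PostgresDialect.convertJavaTimestampToTimestamp swaps pgjdbc's sentinel timestamps for clamped java.sql.Timestamp bounds, and JdbcUtils then applies the usual DateTimeUtils.fromJavaTimestamp conversion to whatever the dialect returns. A reader-side sketch against the table created by the integration test (the SparkSession value and connection URL are placeholders):

    val df = spark.read.jdbc(
      "jdbc:postgresql://localhost:5432/postgres",  // placeholder URL; add credentials as needed
      "infinity_timestamp",
      new java.util.Properties())

    // 'infinity' and '-infinity' rows come back as the clamped minimum/maximum
    // timestamps instead of overflowing during the microsecond conversion.
    df.show(truncate = false)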