Skip to content

Commit

Permalink
[SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

[SPARK-44280](https://issues.apache.org/jira/browse/SPARK-44280) added a new API, `convertJavaTimestampToTimestamp`, which is called only for plain timestamps.

This PR makes it work for timestamps in arrays.

### Why are the changes needed?

data consistency/correctness

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#45435 from yaooqinn/SPARK-47324.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
yaooqinn authored and HyukjinKwon committed Mar 11, 2024
1 parent afbebfb commit 72a95bc
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import java.text.SimpleDateFormat
import java.time.{LocalDateTime, ZoneOffset}
import java.util.Properties

import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -149,9 +148,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
|('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()

conn.prepareStatement("CREATE TABLE infinity_timestamp" +
"(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
" VALUES ('infinity'), ('-infinity');").executeUpdate()
"(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP, timestamp_array TIMESTAMP[])")
.executeUpdate()
conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column, timestamp_array)" +
" VALUES ('infinity', ARRAY[TIMESTAMP 'infinity']), " +
"('-infinity', ARRAY[TIMESTAMP '-infinity'])")
.executeUpdate()

conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
Expand Down Expand Up @@ -447,10 +449,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(row.length == 2)
val infinity = row(0).getAs[Timestamp]("timestamp_column")
val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
val infinitySeq = row(0).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)

assert(infinity.getTime == maxTimestamp)
assert(negativeInfinity.getTime == minTimeStamp)
assert(infinitySeq.head.getTime == maxTimestamp)
assert(negativeInfinitySeq.head.getTime == minTimeStamp)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Timestamp}
import java.time.{Instant, LocalDate}
import java.util
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -414,7 +415,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case DecimalType.Fixed(p, s) =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
val decimal =
nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
nullSafeConvert[JBigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
row.update(pos, decimal)

case DoubleType =>
Expand Down Expand Up @@ -508,37 +509,22 @@ object JdbcUtils extends Logging with SQLConfHelper {

case ArrayType(et, _) =>
val elementConversion = et match {
case TimestampType =>
(array: Object) =>
array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
nullSafeConvert(timestamp, fromJavaTimestamp)
}
case TimestampType => arrayConverter[Timestamp] {
(t: Timestamp) => fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t))
}

case TimestampNTZType =>
(array: Object) =>
array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
nullSafeConvert(timestamp, (t: java.sql.Timestamp) =>
localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t)))
}
arrayConverter[Timestamp] {
(t: Timestamp) => localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t))
}

case StringType =>
(array: Object) =>
// some underling types are not String such as uuid, inet, cidr, etc.
array.asInstanceOf[Array[java.lang.Object]]
.map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))

case DateType =>
(array: Object) =>
array.asInstanceOf[Array[java.sql.Date]].map { date =>
nullSafeConvert(date, fromJavaDate)
}
arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))

case DateType => arrayConverter[Date](fromJavaDate)

case dt: DecimalType =>
(array: Object) =>
array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
nullSafeConvert[java.math.BigDecimal](
decimal, d => Decimal(d, dt.precision, dt.scale))
}
arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))

case LongType if metadata.contains("binarylong") =>
throw QueryExecutionErrors.unsupportedArrayElementTypeBasedOnBinaryError(dt)
Expand All @@ -552,7 +538,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
(rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[java.sql.Array](
input = rs.getArray(pos + 1),
array => new GenericArrayData(elementConversion.apply(array.getArray)))
array => new GenericArrayData(elementConversion(array.getArray)))
row.update(pos, array)

case _ => throw QueryExecutionErrors.unsupportedJdbcTypeError(dt.catalogString)
Expand All @@ -566,6 +552,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

// Builds a converter for JDBC array columns: casts the raw JDBC-returned array to
// Array[T] and applies `elementConvert` to each element, with `nullSafeConvert`
// preserving nulls (null elements map to null rather than being converted).
private def arrayConverter[T](elementConvert: T => Any): Any => Any = (array: Any) => {
  array.asInstanceOf[Array[T]].map(e => nullSafeConvert(e, elementConvert))
}

// A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
// `PreparedStatement`. The last argument `Int` means the index for the value to be set
// in the SQL statement and also used for the value in `Row`.
Expand Down

0 comments on commit 72a95bc

Please sign in to comment.