From 1c64fb11447fbb90cd09d650aad42b70fd7411b0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 Oct 2019 22:21:53 +0300 Subject: [PATCH 01/16] Support the interval type in schemas --- .../execution/datasources/parquet/ParquetSchemaConverter.scala | 2 +- .../sql/execution/datasources/v2/parquet/ParquetTable.scala | 2 ++ .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 8ce8a86d2f026..af0ed56bab554 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -171,7 +171,7 @@ class ParquetToSparkSchemaConverter( case FIXED_LEN_BYTE_ARRAY => originalType match { case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) - case INTERVAL => typeNotImplemented() + case INTERVAL => CalendarIntervalType case _ => illegalType() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala index 2ad64b1aa5244..cc91d0ac3e6a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -58,6 +58,8 @@ case class ParquetTable( case udt: UserDefinedType[_] => supportsDataType(udt.sqlType) + case _: CalendarIntervalType => true + case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 39590b063f0af..18219165dd1ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -114,12 +114,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession | required fixed_len_byte_array(32) i(DECIMAL(32,0)); | required int64 j(TIMESTAMP_MILLIS); | required int64 k(TIMESTAMP_MICROS); + | required fixed_len_byte_array(12) l(INTERVAL); |} """.stripMargin) val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0), - TimestampType, TimestampType) + TimestampType, TimestampType, CalendarIntervalType) withTempPath { location => val path = new Path(location.getCanonicalPath) From 076958528b0f5ed2c0b1834256207dd376104b9a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 Oct 2019 22:32:31 +0300 Subject: [PATCH 02/16] Support the interval type in ColumnarBatchRow --- .../org/apache/spark/sql/vectorized/ColumnarBatch.java | 2 ++ .../execution/datasources/parquet/ParquetIOSuite.scala | 9 ++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java index a2feac869ece6..46c717fe9d5ab 100644 --- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java @@ -173,6 +173,8 @@ public InternalRow copy() { row.setInt(i, getInt(i)); } else if (dt instanceof TimestampType) { row.setLong(i, getLong(i)); + } else if (dt instanceof CalendarIntervalType) { + row.update(i, getInterval(i)); } else { throw new RuntimeException("Not implemented. " + dt); } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 18219165dd1ec..133569885387a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} @@ -38,7 +37,6 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} @@ -49,7 +47,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport // with an empty configuration (it is after all not intended to be used in this way?) 
@@ -736,7 +734,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val dataTypes = Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType) + FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType, CalendarIntervalType) val constantValues = Seq( @@ -750,7 +748,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession 0.75D, Decimal("1234.23456"), DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), - DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))) + DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")), + CalendarInterval.fromString("interval 1 month 2 microsecond")) dataTypes.zip(constantValues).foreach { case (dt, v) => val schema = StructType(StructField("pcol", dt) :: Nil) From cfcecf149b38723eb094553f3cc508924c2bf068 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 Oct 2019 22:42:20 +0300 Subject: [PATCH 03/16] Fix imports --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 133569885387a..873887fa16ed0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} @@ -37,6 +38,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} + import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} From c7889ed6d59d0714068b6a271334846e534ab25d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 12 Oct 2019 13:04:31 +0300 Subject: [PATCH 04/16] Support writing and reading intervals --- .../execution/datasources/DataSource.scala | 4 ---- .../parquet/ParquetFileFormat.scala | 2 ++ .../parquet/ParquetRowConverter.scala | 24 ++++++++++++++++--- .../parquet/ParquetSchemaConverter.scala | 5 ++++ .../parquet/ParquetWriteSupport.scala | 17 ++++++++++++- .../parquet/ParquetQuerySuite.scala | 14 +++++++++++ .../parquet/ParquetSchemaSuite.scala | 11 +++++++++ 7 files changed, 69 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0f5f1591623af..f2bb59d6d654e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -535,10 +535,6 @@ case class DataSource( * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]]. 
*/ def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = { - if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } - providingInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 815b62dfbf898..31ba4b15298e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -371,6 +371,8 @@ class ParquetFileFormat case udt: UserDefinedType[_] => supportDataType(udt.sqlType) + case _: CalendarIntervalType => true + case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index b772b6b77d1ce..04fd8b22da267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -23,20 +23,18 @@ import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer - import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96} - import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} /** * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some @@ -325,6 +323,26 @@ private[parquet] class ParquetRowConverter( override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) + case CalendarIntervalType + if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY => + new ParquetPrimitiveConverter(updater) { + override def addBinary(value: Binary): Unit = { + assert( + value.length() == 12, + "Intervals are expected to be stored in 12-byte fixed len byte array, " + + s"but got a ${value.length()}-byte array.") + + val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) + val milliseconds = buf.getInt + var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS + val days = buf.getInt + val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY) + microseconds = Math.addExact(microseconds, daysInUs) + val months = buf.getInt + updater.set(new CalendarInterval(months, microseconds)) + } + } + case t => throw new 
RuntimeException( s"Unable to create Parquet converter for data type ${t.json} " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index af0ed56bab554..782304d8fb6a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -553,6 +553,11 @@ class SparkToParquetSchemaConverter( case udt: UserDefinedType[_] => convertField(field.copy(dataType = udt.sqlType)) + case i: CalendarIntervalType => + Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(12) + .as(INTERVAL) + .named(field.name) + case _ => throw new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index f6490614ab05b..4be486db3e99c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -73,6 +73,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // Reusable byte array used to write timestamps as Parquet INT96 values private val timestampBuffer = new Array[Byte](12) + // Reusable byte array used to write intervals as Parquet FIXED_LEN_BYTE_ARRAY values + private val intervalBuffer = new Array[Byte](12) + // Reusable byte array used to write decimal values private val decimalBuffer = new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION)) @@ -207,7 +210,19 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case t: UserDefinedType[_] => makeWriter(t.sqlType) - // TODO Adds IntervalType support + case t: CalendarIntervalType => + (row: SpecializedGetters, ordinal: Int) => + val interval = row.getInterval(ordinal) + val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY + val milliseconds: Int = (microseconds / DateTimeUtils.MICROS_PER_MILLIS).toInt + val days: Int = Math.toIntExact(interval.microseconds / DateTimeUtils.MICROS_PER_DAY) + val buf = ByteBuffer.wrap(intervalBuffer) + buf.order(ByteOrder.LITTLE_ENDIAN) + .putInt(milliseconds) + .putInt(days) + .putInt(interval.months) + recordConsumer.addBinary(Binary.fromReusedByteArray(intervalBuffer)) + case _ => sys.error(s"Unsupported data type $dataType.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 88b94281d88ee..5ba42adc2e058 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -903,6 +903,20 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS") testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } + + test("interval written and read as Parquet INTERVAL") { + withTempPath { file => + val df = 
spark.range(10) + .selectExpr("interval 100 years 1 month 10 second 1 millisecond as i") + df.write.parquet(file.getCanonicalPath) + ("true" :: "false" :: Nil).foreach { vectorized => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { + val df2 = spark.read.parquet(file.getCanonicalPath) + checkAnswer(df2, df.collect().toSeq) + } + } + } + } } class ParquetV1QuerySuite extends ParquetQuerySuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 1274995fd6779..efea4e8cf1680 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -1010,6 +1010,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true, outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS) + testSchema( + "Interval written and read as fixed_len_byte_array(12) with INTERVAL", + StructType(Seq(StructField("f1", CalendarIntervalType))), + """message root { + | optional fixed_len_byte_array(12) f1 (INTERVAL); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = false, + writeLegacyParquetFormat = true) + private def testSchemaClipping( testName: String, parquetSchema: String, From 4563a4e491fd00ab7a2a1a8aaaf3456a177695c8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 12 Oct 2019 13:08:08 +0300 Subject: [PATCH 05/16] Remove unused variable --- .../sql/execution/datasources/parquet/ParquetWriteSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 4be486db3e99c..8be736ef217f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -210,7 +210,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case t: UserDefinedType[_] => makeWriter(t.sqlType) - case t: CalendarIntervalType => + case CalendarIntervalType => (row: SpecializedGetters, ordinal: Int) => val interval = row.getInterval(ordinal) val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY From 83b4bf2e12c40f5ee1a2d67ce9a8f2b27ba35357 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 12 Oct 2019 13:48:41 +0300 Subject: [PATCH 06/16] Fix coding style --- .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 04fd8b22da267..8109d2aea5866 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -23,11 +23,13 @@ import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer + import org.apache.parquet.column.Dictionary import 
org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96} + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ From 81ef7bef8696a0b51d80a46d5d08d2dbc09e9bd5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 12 Oct 2019 14:35:03 +0300 Subject: [PATCH 07/16] Exclude parquet from checking --- .../apache/spark/sql/execution/datasources/DataSource.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f2bb59d6d654e..7f3ed48c43667 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -535,6 +535,12 @@ case class DataSource( * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]]. */ def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = { + if (providingClass != classOf[ParquetFileFormat]) { + if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } + } + providingInstance() match { case dataSource: CreatableRelationProvider => SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode) From de0872e213499ae8ed13dafa1512dac47ed252a3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 12 Oct 2019 17:29:25 +0300 Subject: [PATCH 08/16] Exclude parquet from checks of unsupported interval --- .../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index d08f4b9066d2b..0508bb6a0a795 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -330,13 +330,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { } } - test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") { + test("SPARK-24204 error handling for unsupported Interval data types - csv, json, orc") { withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath // TODO: test file source V2 after write path is fixed. 
Seq(true).foreach { useV1 => val useV1List = if (useV1) { - "csv,json,orc,parquet" + "csv,json,orc" } else { "" } @@ -349,7 +349,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) { // write path - Seq("csv", "json", "parquet", "orc").foreach { format => + Seq("csv", "json", "orc").foreach { format => val msg = intercept[AnalysisException] { sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir) }.getMessage @@ -357,7 +357,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession { } // read path - Seq("parquet", "csv").foreach { format => + Seq("csv").foreach { format => var msg = intercept[AnalysisException] { val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil) spark.range(1).write.format(format).mode("overwrite").save(tempDir) From 281f62bf0fb785211e02fa3adcd1e2b93d7e1430 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 14 Oct 2019 09:44:27 +0300 Subject: [PATCH 09/16] Combine AtomicType and CalendarIntervalType in supportDataType() --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 31ba4b15298e9..3e10bc73b7214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -360,7 +360,7 @@ class ParquetFileFormat } override def supportDataType(dataType: DataType): Boolean = dataType match { - case _: AtomicType => true + case _: AtomicType | _: CalendarIntervalType => true case st: StructType => st.forall { f => supportDataType(f.dataType) } @@ -371,8 +371,6 @@ class ParquetFileFormat case udt: UserDefinedType[_] => supportDataType(udt.sqlType) - case _: CalendarIntervalType => true - case _ => false } } From 232b1fb709972edf7b17958f7d4dd07369999520 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 14 Oct 2019 09:55:20 +0300 Subject: [PATCH 10/16] Replace timestampBuffer and intervalBuffer by reusableBuffer --- .../datasources/parquet/ParquetWriteSupport.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 8be736ef217f6..56ca078e7adbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -71,10 +71,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _ // Reusable byte array used to write timestamps as Parquet INT96 values - private val timestampBuffer = new Array[Byte](12) - - // Reusable byte array used to write intervals as Parquet FIXED_LEN_BYTE_ARRAY values - private val intervalBuffer = new Array[Byte](12) + // or intervals as Parquet FIXED_LEN_BYTE_ARRAY values + private val reusableBuffer = new Array[Byte](12) // Reusable byte array used to write decimal values private 
val decimalBuffer = @@ -176,9 +174,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case SQLConf.ParquetOutputTimestampType.INT96 => (row: SpecializedGetters, ordinal: Int) => val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) - val buf = ByteBuffer.wrap(timestampBuffer) + val buf = ByteBuffer.wrap(reusableBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) - recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) + recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer)) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => (row: SpecializedGetters, ordinal: Int) => @@ -216,12 +214,12 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY val milliseconds: Int = (microseconds / DateTimeUtils.MICROS_PER_MILLIS).toInt val days: Int = Math.toIntExact(interval.microseconds / DateTimeUtils.MICROS_PER_DAY) - val buf = ByteBuffer.wrap(intervalBuffer) + val buf = ByteBuffer.wrap(reusableBuffer) buf.order(ByteOrder.LITTLE_ENDIAN) .putInt(milliseconds) .putInt(days) .putInt(interval.months) - recordConsumer.addBinary(Binary.fromReusedByteArray(intervalBuffer)) + recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer)) case _ => sys.error(s"Unsupported data type $dataType.") } From 3f6ad617bf1df32fcd3f38bc3b3a629a34631879 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 14 Oct 2019 10:10:16 +0300 Subject: [PATCH 11/16] Move unsupported type check to checkUnsupportedTypes() --- .../execution/datasources/DataSource.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 7f3ed48c43667..5cf877f759719 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, Tex import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType} +import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructField, StructType} import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.Utils @@ -498,10 +498,8 @@ case class DataSource( outputColumnNames: Seq[String], physicalPlan: SparkPlan): BaseRelation = { val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) - if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } + checkUnsupportedTypes(outputColumns.map(_.dataType)) providingInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation( @@ -535,11 +533,7 @@ case class DataSource( * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]]. 
*/ def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = { - if (providingClass != classOf[ParquetFileFormat]) { - if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } - } + checkUnsupportedTypes(data.schema.map(_.dataType)) providingInstance() match { case dataSource: CreatableRelationProvider => @@ -571,6 +565,15 @@ case class DataSource( DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf, checkEmptyGlobPath, checkFilesExist) } + + + private def checkUnsupportedTypes(dataTypes: Seq[DataType]): Unit = { + if (providingClass != classOf[ParquetFileFormat]) { + if (dataTypes.exists(_.isInstanceOf[CalendarIntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } + } + } } object DataSource extends Logging { From c260622d7ab83292e2bd98ab13734adaadf51bfc Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 14 Oct 2019 10:36:42 +0300 Subject: [PATCH 12/16] Add more tests to ParquetQuerySuite --- .../datasources/parquet/ParquetQuerySuite.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 5ba42adc2e058..1f46df8e934ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -906,8 +906,19 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS test("interval written and read as Parquet INTERVAL") { withTempPath { file => - val df = spark.range(10) - .selectExpr("interval 100 years 1 month 10 second 1 millisecond as i") + val df = Seq( + "interval 0 seconds", + "interval 1 month 1 millisecond", + "interval -1 month -1 millisecond", + "interval 1 year 2 month 3 weeks 4 days 5 hours 6 minutes 7 second 8 millisecond", + "interval -1 year -2 month -3 weeks -4 days -5 hours -6 minutes -7 second -8 millisecond", + "interval 3650000 days", + "interval -3650000 days", + "interval 9999 years 12 months 1 millisecond", + "interval 9999 years 12 months 23 hours 59 minutes 59 seconds 999 milliseconds", + "interval -9999 years -12 months -23 hours -59 minutes -59 seconds -999 milliseconds", + "").toDF("intervalStr") + .selectExpr("CAST(intervalStr AS interval) AS i") df.write.parquet(file.getCanonicalPath) ("true" :: "false" :: Nil).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { From 032a2ead7fe65032697f452dec983dff4dc77a3e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 25 Oct 2019 19:41:34 +0300 Subject: [PATCH 13/16] CalendarInterval.fromString -> IntervalUtils.fromString --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 873887fa16ed0..b8a71c696f853 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -43,13 +43,13 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.unsafe.types.UTF8String // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport // with an empty configuration (it is after all not intended to be used in this way?) @@ -751,7 +751,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Decimal("1234.23456"), DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")), - CalendarInterval.fromString("interval 1 month 2 microsecond")) + IntervalUtils.fromString("interval 1 month 2 microsecond")) dataTypes.zip(constantValues).foreach { case (dt, v) => val schema = StructType(StructField("pcol", dt) :: Nil) From 712378c6001c352d76c28c6cdf6176b694ea5355 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 1 Nov 2019 16:21:46 +0300 Subject: [PATCH 14/16] Rebase on new field - days --- .../spark/sql/execution/vectorized/ColumnVectorUtils.java | 3 ++- .../datasources/parquet/ParquetRowConverter.scala | 7 ++----- .../datasources/parquet/ParquetWriteSupport.scala | 7 ++----- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index bce6aa28c42a1..c3beed079c69f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -89,7 +89,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field } else if (t instanceof CalendarIntervalType) { CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t); col.getChild(0).putInts(0, capacity, c.months); - col.getChild(1).putLongs(0, capacity, c.microseconds); + col.getChild(1).putInts(0, capacity, c.days); + col.getChild(2).putLongs(0, capacity, c.microseconds); } else if (t instanceof DateType) { col.putInts(0, capacity, row.getInt(fieldIdx)); } else if (t instanceof TimestampType) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index cff62bcf3c546..5b1fb0102f7b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -333,13 +333,10 @@ private[parquet] class ParquetRowConverter( s"but got a ${value.length()}-byte array.") val buf = 
value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val milliseconds = buf.getInt - var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS + val microseconds = buf.getInt * DateTimeUtils.MICROS_PER_MILLIS val days = buf.getInt - val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY) - microseconds = Math.addExact(microseconds, daysInUs) val months = buf.getInt - updater.set(new CalendarInterval(months, microseconds)) + updater.set(new CalendarInterval(months, days, microseconds)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 56ca078e7adbb..558476e3182ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -211,13 +211,10 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case CalendarIntervalType => (row: SpecializedGetters, ordinal: Int) => val interval = row.getInterval(ordinal) - val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY - val milliseconds: Int = (microseconds / DateTimeUtils.MICROS_PER_MILLIS).toInt - val days: Int = Math.toIntExact(interval.microseconds / DateTimeUtils.MICROS_PER_DAY) val buf = ByteBuffer.wrap(reusableBuffer) buf.order(ByteOrder.LITTLE_ENDIAN) - .putInt(milliseconds) - .putInt(days) + .putInt((interval.milliseconds()).toInt) + .putInt(interval.days) .putInt(interval.months) recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer)) From cf8023a3926823a34a2d0661ff52b1ea203333b1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 1 Nov 2019 16:31:13 +0300 Subject: [PATCH 15/16] Add one more test --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 5aa444953e299..0a2e21c4a81b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -917,6 +917,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS "interval 9999 years 12 months 1 millisecond", "interval 9999 years 12 months 23 hours 59 minutes 59 seconds 999 milliseconds", "interval -9999 years -12 months -23 hours -59 minutes -59 seconds -999 milliseconds", + "interval 1000 months 1000 days 10000000 microseconds", "").toDF("intervalStr") .selectExpr("CAST(intervalStr AS interval) AS i") df.write.parquet(file.getCanonicalPath) From 904111049f95d066dd53bc208d253454add85b17 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 10 Dec 2019 22:56:35 +0300 Subject: [PATCH 16/16] Rebase on the master --- .../execution/datasources/parquet/ParquetRowConverter.scala | 3 ++- .../execution/datasources/parquet/ParquetWriteSupport.scala | 3 ++- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 5b1fb0102f7b7..5811bb122e7b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -34,6 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -333,7 +334,7 @@ private[parquet] class ParquetRowConverter( s"but got a ${value.length()}-byte array.") val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val microseconds = buf.getInt * DateTimeUtils.MICROS_PER_MILLIS + val microseconds = buf.getInt * MICROS_PER_MILLIS val days = buf.getInt val months = buf.getInt updater.set(new CalendarInterval(months, days, microseconds)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 558476e3182ee..40e1a76ec6da6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -34,6 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -213,7 +214,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val interval = row.getInterval(ordinal) val buf = ByteBuffer.wrap(reusableBuffer) buf.order(ByteOrder.LITTLE_ENDIAN) - .putInt((interval.milliseconds()).toInt) + .putInt(Math.toIntExact(interval.microseconds / MICROS_PER_MILLIS)) .putInt(interval.days) .putInt(interval.months) recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index b8a71c696f853..36e406302c241 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -751,7 +751,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Decimal("1234.23456"), DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")), DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")), - IntervalUtils.fromString("interval 1 month 2 microsecond")) + IntervalUtils.safeStringToInterval( + UTF8String.fromString("interval 1 month 2 microsecond"))) dataTypes.zip(constantValues).foreach { case 
(dt, v) => val schema = StructType(StructField("pcol", dt) :: Nil)
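
For reference, the on-disk encoding that ParquetWriteSupport and ParquetRowConverter agree on in this series is a 12-byte FIXED_LEN_BYTE_ARRAY holding three little-endian 32-bit integers in the order milliseconds, days, months. The sketch below round-trips that layout outside the Spark build; the Interval case class and the MicrosPerMillis constant are simplified stand-ins (assumptions for the sake of a self-contained example) for org.apache.spark.unsafe.types.CalendarInterval and DateTimeConstants.MICROS_PER_MILLIS.

import java.nio.{ByteBuffer, ByteOrder}

// Simplified stand-in for org.apache.spark.unsafe.types.CalendarInterval:
// an interval is (months, days, microseconds).
final case class Interval(months: Int, days: Int, microseconds: Long)

object ParquetIntervalLayout {
  private val MicrosPerMillis = 1000L

  // Writes the 12-byte little-endian layout used by the patches:
  // milliseconds, then days, then months.
  def encode(i: Interval): Array[Byte] = {
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(Math.toIntExact(i.microseconds / MicrosPerMillis))
      .putInt(i.days)
      .putInt(i.months)
    buf.array()
  }

  // Reads the same layout back. As in ParquetWriteSupport, sub-millisecond
  // precision is truncated on write, so decode(encode(i)) keeps only whole
  // milliseconds of the microsecond field.
  def decode(bytes: Array[Byte]): Interval = {
    require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val micros = buf.getInt * MicrosPerMillis
    val days = buf.getInt
    val months = buf.getInt
    Interval(months, days, micros)
  }

  def main(args: Array[String]): Unit = {
    val original = Interval(months = 13, days = 25, microseconds = 7008000L)
    val roundTripped = decode(encode(original))
    println(roundTripped) // Interval(13,25,7008000)
  }
}

Because the writer divides microseconds by MicrosPerMillis with integer division, any interval with sub-millisecond precision is truncated on write and the reader reconstructs only whole milliseconds, which matches the behavior of the converter and write-support changes above.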