From ca6e0384f30b8b6d07840a2c555cfc6069126679 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 27 Feb 2015 03:24:37 +0800
Subject: [PATCH] Fixes SPARK-5775

Parquet scans over partitioned tables cast every produced row to
SpecificMutableRow before filling in the partition columns. When the
requested schema contains complex types (e.g. structs or arrays), the
underlying reader can return a different Row implementation and the cast
fails. ParquetTableScan and ParquetRelation2 now pattern match on the row
type and copy non-mutable rows into a reusable GenericMutableRow before
setting the partition column values. parquetSuites gains partitioned test
tables with struct and array columns covering both code paths.

---
 .../sql/parquet/ParquetTableOperations.scala  |  37 +++--
 .../apache/spark/sql/parquet/newParquet.scala |  38 +++--
 .../spark/sql/parquet/parquetSuites.scala     | 134 +++++++++++++++++-
 3 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4dc13b036cd4e..4e9f800ec579a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -143,19 +143,38 @@ private[sql] case class ParquetTableScan(
         relation.partitioningAttributes
           .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
+        val mutableRow = new GenericMutableRow(output.size)
+
         new Iterator[Row] {
           def hasNext = iter.hasNext
           def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+            iter.next() match {
+              case (_, row: SpecificMutableRow) =>
+                // Parquet will leave partitioning columns empty, so we fill them in here.
+                var i = 0
+                while (i < requestedPartitionOrdinals.size) {
+                  row(requestedPartitionOrdinals(i)._2) =
+                    partitionRowValues(requestedPartitionOrdinals(i)._1)
+                  i += 1
+                }
+                row
+
+              case (_, row: Row) =>
+                var i = 0
+                while (i < row.size) {
+                  mutableRow(i) = row(i)
+                  i += 1
+                }
+
+                i = 0
+                while (i < requestedPartitionOrdinals.size) {
+                  mutableRow(requestedPartitionOrdinals(i)._2) =
+                    partitionRowValues(requestedPartitionOrdinals(i)._1)
+                  i += 1
+                }
+
+                mutableRow
             }
-            row
           }
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 16b771344bfcd..1a90ae4637cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -482,17 +482,35 @@ private[sql] case class ParquetRelation2(
         }.get
 
         val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
+        val mutableRow = new GenericMutableRow(requestedSchema.size)
+
+        iterator.map {
+          case (_, row: SpecificMutableRow) =>
+            var i = 0
+            while (i < requiredPartOrdinal.size) {
+              // TODO Avoids boxing cost here!
+              val partOrdinal = requiredPartOrdinal(i)
+              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+              i += 1
+            }
+            row
+
+          case (_, row: Row) =>
+            var i = 0
+            while (i < row.size) {
+              // TODO Avoids boxing cost here!
+              mutableRow(i) = row(i)
+              i += 1
+            }
 
-        iterator.map { pair =>
-          val row = pair._2.asInstanceOf[SpecificMutableRow]
-          var i = 0
-          while (i < requiredPartOrdinal.size) {
-            // TODO Avoids boxing cost here!
-            val partOrdinal = requiredPartOrdinal(i)
-            row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-            i += 1
-          }
-          row
+            i = 0
+            while (i < requiredPartOrdinal.size) {
+              // TODO Avoids boxing cost here!
+              val partOrdinal = requiredPartOrdinal(i)
+              mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+              i += 1
+            }
+            mutableRow
         }
       }
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 80fd5cda20e20..c90d0f52f9aff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -35,6 +35,20 @@ case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
 
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -85,6 +99,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       location '${new File(normalTableDir, "normal").getCanonicalPath}'
     """)
 
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     (1 to 10).foreach { p =>
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
@@ -93,7 +139,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     jsonRDD(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     jsonRDD(rdd2).registerTempTable("jt_array")
@@ -104,6 +158,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
@@ -408,6 +464,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 }
@@ -446,7 +518,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
-
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -482,9 +555,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -574,6 +683,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
test("non-part select(*)") { checkAnswer( sql("SELECT COUNT(*) FROM normal_parquet"),