From 538f506851d7e2eba6a20d0ad4a5909486bf8516 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 28 Feb 2015 14:35:12 +0800
Subject: [PATCH] Addresses comments

---
 .../apache/spark/sql/parquet/ParquetTableOperations.scala | 8 ++++++--
 .../scala/org/apache/spark/sql/parquet/newParquet.scala   | 2 +-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index de62eaa5981b6..b70759aa02aae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -126,9 +126,13 @@ private[sql] case class ParquetTableScan(
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
-      // This check if based on CatalystConverter.createRootConverter.
+      // This check is based on CatalystConverter.createRootConverter.
       val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
 
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -165,7 +169,7 @@ private[sql] case class ParquetTableScan(
           }
         } else {
           // Create a mutable row since we need to fill in values from partition columns.
-          val mutableRow = new GenericMutableRow(output.size)
+          val mutableRow = new GenericMutableRow(outputSize)
           new Iterator[Row] {
             def hasNext = iter.hasNext
             def next() = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 33505f93adccc..2f599b7182d72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -476,7 +476,7 @@ private[sql] case class ParquetRelation2(
       // When the data does not include the key and the key is requested then we must fill it in
       // based on information from the input split.
       if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
-        // This check if based on CatalystConverter.createRootConverter.
+        // This check is based on CatalystConverter.createRootConverter.
         val primitiveRow =
           requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
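
Note on the closure-capture fix in the first hunk: in Scala, referring to `output.size` inside the `mapPartitionsWithInputSplit` closure compiles to `this.output.size`, so Spark would have to serialize the entire enclosing `ParquetTableScan` instance to ship the task; copying the value into a local `val outputSize` first means the closure captures only an `Int`. A minimal sketch of the same pattern follows; `TableScanExample`, `scanBroken`, `scanFixed`, and the demo job are hypothetical names for illustration, not code from this patch.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for ParquetTableScan. It is deliberately not
// Serializable, so any task closure that captures `this` fails at runtime.
class TableScanExample(sc: SparkContext, output: Seq[String]) {

  def scanBroken(): Long = {
    // `output.size` desugars to `this.output.size`, so the map closure
    // captures the whole TableScanExample instance and Spark throws
    // org.apache.spark.SparkException: Task not serializable.
    sc.parallelize(1 to 100).map(_ * output.size).count()
  }

  def scanFixed(): Long = {
    // Copy the needed value into a local variable first, as the patch does
    // with `val outputSize = output.size`; the closure now captures only an Int.
    val outputSize = output.size
    sc.parallelize(1 to 100).map(_ * outputSize).count()
  }
}

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    val scan = new TableScanExample(sc, Seq("a", "b", "c"))
    println(scan.scanFixed()) // prints 100
    sc.stop()
  }
}

The same reasoning explains the `GenericMutableRow(outputSize)` change in the second hunk: that allocation happens inside the closure, so it must read the local variable rather than reach back through `this` for `output.size`.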