[SPARK-4552][SQL] Avoid exception when reading empty parquet data through Hive

This is a very small fix that catches one specific exception and returns an empty table.  #3441 will address this in a more principled way.

Author: Michael Armbrust <michael@databricks.com>

Closes #3586 from marmbrus/fixEmptyParquet and squashes the following commits:

2781d9f [Michael Armbrust] Handle empty lists for newParquet
04dd376 [Michael Armbrust] Avoid exception when reading empty parquet data through Hive
marmbrus committed Dec 3, 2014
1 parent 90ec643 commit 513ef82
Showing 3 changed files with 62 additions and 45 deletions.
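
For context, the exception being caught comes from Hadoop's Path constructor: when every partition of a table is pruned away, the list of partition locations is empty and mkString(",") produces an empty string. A minimal REPL-style sketch (illustration only, not part of the patch):

    import org.apache.hadoop.fs.Path

    // Throws java.lang.IllegalArgumentException:
    //   "Can not create a Path from an empty string"
    val p = new Path("")

This is exactly the message the new catch block in HiveStrategies matches on.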
File 1 of 3:

@@ -191,7 +191,10 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     val selectedPartitions = partitions.filter(p => partitionFilters.forall(_(p)))
     val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
     val selectedFiles = selectedPartitions.flatMap(_.files).map(f => fs.makeQualified(f.getPath))
-    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles:_*)
+    // FileInputFormat cannot handle empty lists.
+    if (selectedFiles.nonEmpty) {
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
+    }
 
     // Push down filters when possible
     predicates
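
The guard above can be read in isolation. A short sketch (hypothetical job setup, not from the patch) of the same pattern with the new mapreduce API, which expects at least one input path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    val job = Job.getInstance(new Configuration())
    val selectedFiles: Seq[Path] = Seq.empty  // e.g. every partition was pruned
    // FileInputFormat cannot handle empty lists, so skip the call entirely
    // and leave the job's input paths unset.
    if (selectedFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, selectedFiles: _*)
    }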
File 2 of 3:

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.StringType
-import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
+import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan, PhysicalRDD}
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
@@ -104,53 +104,61 @@ private[hive] trait HiveStrategies {
           case a: AttributeReference => UnresolvedAttribute(a.name)
         })
 
-        if (relation.hiveQlTable.isPartitioned) {
-          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
-          // Translate the predicate so that it automatically casts the input values to the correct
-          // data types during evaluation
-          val castedPredicate = rawPredicate transform {
-            case a: AttributeReference =>
-              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-              val key = relation.partitionKeys(idx)
-              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
-          }
-
-          val inputData = new GenericMutableRow(relation.partitionKeys.size)
-          val pruningCondition =
-            if(codegenEnabled) {
-              GeneratePredicate(castedPredicate)
-            } else {
-              InterpretedPredicate(castedPredicate)
+        try {
+          if (relation.hiveQlTable.isPartitioned) {
+            val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+            // Translate the predicate so that it automatically casts the input values to the
+            // correct data types during evaluation.
+            val castedPredicate = rawPredicate transform {
+              case a: AttributeReference =>
+                val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+                val key = relation.partitionKeys(idx)
+                Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
             }
 
-          val partitions = relation.hiveQlPartitions.filter { part =>
-            val partitionValues = part.getValues
-            var i = 0
-            while (i < partitionValues.size()) {
-              inputData(i) = partitionValues(i)
-              i += 1
+            val inputData = new GenericMutableRow(relation.partitionKeys.size)
+            val pruningCondition =
+              if (codegenEnabled) {
+                GeneratePredicate(castedPredicate)
+              } else {
+                InterpretedPredicate(castedPredicate)
+              }
+
+            val partitions = relation.hiveQlPartitions.filter { part =>
+              val partitionValues = part.getValues
+              var i = 0
+              while (i < partitionValues.size()) {
+                inputData(i) = partitionValues(i)
+                i += 1
+              }
+              pruningCondition(inputData)
             }
-            pruningCondition(inputData)
-          }
 
-          hiveContext
-            .parquetFile(partitions.map(_.getLocation).mkString(","))
-            .addPartitioningAttributes(relation.partitionKeys)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)):: Nil
-        } else {
-          hiveContext
-            .parquetFile(relation.hiveQlTable.getDataLocation.toString)
-            .lowerCase
-            .where(unresolvedOtherPredicates)
-            .select(unresolvedProjection:_*)
-            .queryExecution
-            .executedPlan
-            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+            hiveContext
+              .parquetFile(partitions.map(_.getLocation).mkString(","))
+              .addPartitioningAttributes(relation.partitionKeys)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          } else {
+            hiveContext
+              .parquetFile(relation.hiveQlTable.getDataLocation.toString)
+              .lowerCase
+              .where(unresolvedOtherPredicates)
+              .select(unresolvedProjection: _*)
+              .queryExecution
+              .executedPlan
+              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+          }
+        } catch {
+          // parquetFile will throw an exception when there is no data.
+          // TODO: Remove this hack for Spark 1.3.
+          case iae: java.lang.IllegalArgumentException
+              if iae.getMessage.contains("Can not create a Path from an empty string") =>
+            PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
+        }
       case _ => Nil
     }
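
The catch clause above keys on the exception message rather than the type alone. A self-contained sketch (hypothetical helper, not part of the patch) of that pattern:

    // Run the planner; only when the exception identifies the empty-input
    // case, fall back to an empty result instead of failing the query.
    def withEmptyFallback[T](planIt: => Seq[T])(empty: => Seq[T]): Seq[T] =
      try planIt catch {
        case iae: IllegalArgumentException
            if iae.getMessage.contains("Can not create a Path from an empty string") =>
          empty
      }

Matching on a message string is brittle, which is why the code carries a TODO to remove the hack in Spark 1.3; #3441 pursues the more principled fix.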
File 3 of 3:

@@ -218,6 +218,12 @@ abstract class ParquetTest extends QueryTest with BeforeAndAfterAll {
       10)
   }

test(s"non-existant partition $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
0)
}

test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
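As a rough standalone analogue (hypothetical assertion, assuming the suite's partitioned test tables only contain partitions p = 1 through 10), the new test boils down to:

    // p = 1000 matches no partition, so pruning selects zero files; with the
    // fix this yields an empty scan and COUNT(*) returns 0 instead of throwing.
    val count = sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000").collect().head.getLong(0)
    assert(count == 0)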
