[SPARK-5068] [SQL] Fix bug when querying data whose path doesn't exist for HiveContext #4356

Closed
wants to merge 6 commits
Changes from 5 commits
160 changes: 90 additions & 70 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
@@ -68,6 +68,8 @@ class HadoopTableReader(
     math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
   }

+  @transient private lazy val fs = FileSystem.get(sc.hiveconf)
+
   // TODO: set aws s3 credentials.

   private val _broadcastedHiveConf =
@@ -102,24 +104,28 @@ class HadoopTableReader(
     val broadcastedHiveConf = _broadcastedHiveConf

     val tablePath = hiveTable.getPath
-    val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
-
-    // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-
-    val attrsWithIndex = attributes.zipWithIndex
-    val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
-
-    val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
-      val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, tableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
-    }
-
-    deserializedHadoopRDD
+    applyFilterIfNeeded(tablePath, filterOpt) match {
+      case Some(inputPathStr) =>
+        // logDebug("Table input: %s".format(tablePath))
+        val ifc = hiveTable.getInputFormatClass
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+
+        val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+
+        val attrsWithIndex = attributes.zipWithIndex
+        val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+
+        val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+          val hconf = broadcastedHiveConf.value.value
+          val deserializer = deserializerClass.newInstance()
+          deserializer.initialize(hconf, tableDesc.getProperties)
+          HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
+        }
+
+        deserializedHadoopRDD
+      case None => new EmptyRDD[Row](sc.sparkContext)
+    }
   }

   override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = {
@@ -142,56 +148,62 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition,
       Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+    val hivePartitionRDDs = partitionToDeserializer.flatMap { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
-      val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-      // Get partition field info
-      val partSpec = partDesc.getPartSpec
-      val partProps = partDesc.getProperties
-
-      val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
-      // Partitioning columns are delimited by "/"
-      val partCols = partColsDelimited.trim().split("/").toSeq
-      // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'.
-      val partValues = if (partSpec == null) {
-        Array.fill(partCols.size)(new String)
-      } else {
-        partCols.map(col => new String(partSpec.get(col))).toArray
-      }
-
-      // Create local references so that the outer object isn't serialized.
-      val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
-      val localDeserializer = partDeserializer
-      val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
-
-      // Splits all attributes into two groups, partition key attributes and those that are not.
-      // Attached indices indicate the position of each attribute in the output schema.
-      val (partitionKeyAttrs, nonPartitionKeyAttrs) =
-        attributes.zipWithIndex.partition { case (attr, _) =>
-          relation.partitionKeys.contains(attr)
-        }
-
-      def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = {
-        partitionKeyAttrs.foreach { case (attr, ordinal) =>
-          val partOrdinal = relation.partitionKeys.indexOf(attr)
-          row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
-        }
-      }
-
-      // Fill all partition keys to the given MutableRow object
-      fillPartitionKeys(partValues, mutableRow)
-
-      createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
-        val hconf = broadcastedHiveConf.value.value
-        val deserializer = localDeserializer.newInstance()
-        deserializer.initialize(hconf, partProps)
-
-        // fill the non partition key attributes
-        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
-      }
+      applyFilterIfNeeded(partPath, filterOpt) match {
+        case Some(inputPathStr) =>
+          val ifc = partDesc.getInputFileFormatClass
+            .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+          // Get partition field info
+          val partSpec = partDesc.getPartSpec
+          val partProps = partDesc.getProperties
+
+          val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
+          // Partitioning columns are delimited by "/"
+          val partCols = partColsDelimited.trim().split("/").toSeq
+          // 'partValues[i]' contains the value for the partitioning column at 'partCols[i]'.
+          val partValues = if (partSpec == null) {
+            Array.fill(partCols.size)(new String)
+          } else {
+            partCols.map(col => new String(partSpec.get(col))).toArray
+          }
+
+          // Create local references so that the outer object isn't serialized.
+          val tableDesc = relation.tableDesc
+          val broadcastedHiveConf = _broadcastedHiveConf
+          val localDeserializer = partDeserializer
+          val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+
+          // Splits all attributes into two groups, partition key attributes and those that are not.
+          // Attached indices indicate the position of each attribute in the output schema.
+          val (partitionKeyAttrs, nonPartitionKeyAttrs) =
+            attributes.zipWithIndex.partition { case (attr, _) =>
+              relation.partitionKeys.contains(attr)
+            }
+
+          def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = {
+            partitionKeyAttrs.foreach { case (attr, ordinal) =>
+              val partOrdinal = relation.partitionKeys.indexOf(attr)
+              row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
+            }
+          }
+
+          // Fill all partition keys to the given MutableRow object
+          fillPartitionKeys(partValues, mutableRow)
+
+          Some(createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
+            val hconf = broadcastedHiveConf.value.value
+            val deserializer = localDeserializer.newInstance()
+            deserializer.initialize(hconf, partProps)

+            // fill the non partition key attributes
+            HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+          })
+        case None => None
+      }
     }.toSeq

Expand All @@ -207,13 +219,21 @@ class HadoopTableReader(
* If `filterOpt` is defined, then it will be used to filter files from `path`. These files are
* returned in a single, comma-separated string.
*/
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
val fs = path.getFileSystem(sc.hiveconf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Option[String] = {
if (fs.exists(path)) {
Review comment (Contributor): I think we'd better get the FileSystem from the path, because with Hadoop NameNode federation we may hit problems such as a "Wrong FS" exception if we use FileSystem.get(sc.hiveconf) to get fs. (See the first sketch after this hunk.)

Review comment (Contributor): My concern is similar to what @marmbrus mentioned in #3981. It's pretty expensive to check each path serially for tables with lots of partitions, especially when the data reside on S3. Can we use listStatus or globStatus to retrieve all FileStatus objects under some path(s), and then do the filtering locally? (See the second sketch after this hunk.)

+      // if the file exists
+      filterOpt match {
+        case Some(filter) =>
+          val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+          if (filteredFiles.length > 0) {
+            Some(filteredFiles.mkString(","))
+          } else {
+            None
+          }
+        case None => Some(path.toString)
+      }
+    } else {
+      None
+    }
   }
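A minimal sketch of the per-path lookup suggested in the first review comment, assuming only a Hadoop Configuration is in scope; the helper name pathExists is illustrative and not part of this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve the FileSystem from the path's own URI (scheme and authority), so a path on a
// namespace other than the default fs.defaultFS does not fail with a "Wrong FS" error.
def pathExists(path: Path, conf: Configuration): Boolean = {
  val fs = path.getFileSystem(conf) // per-path FileSystem instead of FileSystem.get(conf)
  fs.exists(path)
}

Note that the removed version of applyFilterIfNeeded already resolved the FileSystem this way via path.getFileSystem(sc.hiveconf), so the suggestion amounts to keeping that per-path lookup rather than the new class-level fs.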

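A rough sketch of the bulk-listing idea from the second review comment, assuming a single level of partitioning under one table root; the names tableRoot and wantedPartitionPaths are illustrative and not part of this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// One globStatus round trip under the table root, then filter the requested partition
// paths locally, instead of issuing one exists() call per partition.
def existingPartitionPaths(
    tableRoot: Path,
    wantedPartitionPaths: Seq[Path],
    conf: Configuration): Seq[Path] = {
  val fs = tableRoot.getFileSystem(conf)
  // globStatus can return null when nothing matches, so guard against it.
  val listed: Array[FileStatus] =
    Option(fs.globStatus(new Path(tableRoot, "*"))).getOrElse(Array.empty[FileStatus])
  val present = listed.map(status => fs.makeQualified(status.getPath)).toSet
  wantedPartitionPaths.filter(p => present.contains(fs.makeQualified(p)))
}

For tables on S3 this trades N exists() round trips for a single listing, at the cost of assuming all partition directories sit under the same root filesystem.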
@@ -0,0 +1,4 @@
238 val_238
238 val_238
238 val_238
238 val_238
@@ -0,0 +1,3 @@
238 val_238
238 val_238
238 val_238
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.util.Utils
import org.apache.spark.sql.hive.execution.HiveComparisonTest

abstract class QueryPartitionSuite extends HiveComparisonTest with BeforeAndAfter {
protected val tmpDir = Utils.createTempDir()

override def beforeAll() {
sql(
s"""CREATE TABLE table_with_partition(key int,value string)
|PARTITIONED by (ds string) location '${tmpDir.toURI.toString}'
|""".stripMargin)
sql(
s"""INSERT OVERWRITE TABLE table_with_partition
| partition (ds='1') SELECT key,value FROM src LIMIT 1""".stripMargin)
sql(
s"""INSERT OVERWRITE TABLE table_with_partition
| partition (ds='2') SELECT key,value FROM src LIMIT 1""".stripMargin)
sql(
s"""INSERT OVERWRITE TABLE table_with_partition
| partition (ds='3') SELECT key,value FROM src LIMIT 1""".stripMargin)
sql(
s"""INSERT OVERWRITE TABLE table_with_partition
| partition (ds='4') SELECT key,value FROM src LIMIT 1""".stripMargin)
}

override def afterAll() {
sql("DROP TABLE table_with_partition")
}
}

class QueryPartitionSuite0 extends QueryPartitionSuite {
// test scanning when all partition paths exist
createQueryTest("SPARK-5068 scan partition with existed path",
"select key,value from table_with_partition", false)
}

class QueryPartitionSuite1 extends QueryPartitionSuite {
override def beforeAll(): Unit = {
super.beforeAll()
// delete one of the partition directories manually
val folders = tmpDir.listFiles.filter(_.isDirectory)
Utils.deleteRecursively(folders(0))
}

// test scanning after one partition path has been deleted
createQueryTest("SPARK-5068 scan partition with non-existed path",
"select key,value from table_with_partition", false)
}

class QueryPartitionSuite2 extends QueryPartitionSuite {
override def beforeAll(): Unit = {
super.beforeAll()
// delete the table's root path
Utils.deleteRecursively(tmpDir)
}

createQueryTest("SPARK-5068 scan table with non-existed path",
"select key,value from table_with_partition", false)
}