[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables.

This PR adds an experimental flag `spark.sql.hive.convertMetastoreParquet` that, when true, causes the planner to detect tables that use Hive's Parquet SerDe and plan them using Spark SQL's native `ParquetTableScan` instead.
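
For illustration only, a minimal sketch (not part of this patch) of how the flag might be used from a `HiveContext`; the `logs` table, its `part` partition column, and the app name are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch (hypothetical table and column names).
val sc = new SparkContext(new SparkConf().setAppName("parquet-conversion-example"))
val hiveContext = new HiveContext(sc)

// Opt in to the experimental conversion; it defaults to false.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// For a metastore table whose SerDe name contains "Parquet", the planner now
// emits a native ParquetTableScan instead of a HiveTableScan.
hiveContext.sql("SELECT key, value FROM logs WHERE part = '2014-08-18'").collect()
```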

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <huai@cse.ohio-state.edu>

Closes apache#1819 from marmbrus/parquetMetastore and squashes the following commits:

1620079 [Michael Armbrust] Revert "remove hive parquet bundle"
cc30430 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4f3d54f [Michael Armbrust] fix style
41ebc5f [Michael Armbrust] remove hive parquet bundle
a43e0da [Michael Armbrust] Merge remote-tracking branch 'origin/master' into parquetMetastore
4c4dc19 [Michael Armbrust] Fix bug with tree splicing.
ebb267e [Michael Armbrust] include parquet hive to tests pass (Remove this later).
c0d9b72 [Michael Armbrust] Avoid creating a HadoopRDD per partition.  Add dirty hacks to retrieve partition values from the InputSplit.
8cdc93c [Michael Armbrust] Merge pull request #8 from yhuai/parquetMetastore
a0baec7 [Yin Huai] Partitioning columns can be resolved.
1161338 [Michael Armbrust] Add a test to make sure conversion is actually happening
212d5cd [Michael Armbrust] Initial support for using ParquetTableScan to read HiveMetaStore tables.
marmbrus committed Aug 18, 2014
1 parent 9eb74c7 commit 3abd0c1
Showing 8 changed files with 427 additions and 23 deletions.
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -228,7 +228,6 @@ object SQL {
object Hive {

lazy val settings = Seq(

javaOptions += "-XX:MaxPermSize=1g",
// Multiple queries rely on the TestHive singleton. See comments there for more details.
parallelExecution in Test := false,
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children = child :: Nil
def execute() = child.execute()
}
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration],
@transient sqlContext: SQLContext)
@transient sqlContext: SQLContext,
partitioningAttributes: Seq[Attribute] = Nil)
extends LeafNode with MultiInstanceRelation {

self: Product =>
@@ -61,12 +62,13 @@ private[sql] case class ParquetRelation(

/** Attributes */
override val output =
partitioningAttributes ++
ParquetTypesConverter.readSchemaFromFile(
new Path(path),
new Path(path.split(",").head),
conf,
sqlContext.isParquetBinaryAsString)

override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -42,6 +43,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
@@ -60,28 +62,38 @@ case class ParquetTableScan(
// The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
// by exprId. Note: output cannot be transient, see
// https://issues.apache.org/jira/browse/SPARK-1367
val output = attributes.map { a =>
relation.output
.find(o => o.exprId == a.exprId)
.getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
}
val normalOutput =
attributes
.filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
.flatMap(a => relation.output.find(o => o.exprId == a.exprId))

val partOutput =
attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))

def output = partOutput ++ normalOutput

assert(normalOutput.size + partOutput.size == attributes.size,
s"$normalOutput + $partOutput != $attributes, ${relation.output}")

override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

val conf: Configuration = ContextUtil.getConfiguration(job)
val qualifiedPath = {
val path = new Path(relation.path)
path.getFileSystem(conf).makeQualified(path)

relation.path.split(",").foreach { curPath =>
val qualifiedPath = {
val path = new Path(curPath)
path.getFileSystem(conf).makeQualified(path)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)

// Store both requested and original schema in `Configuration`
conf.set(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(output))
ParquetTypesConverter.convertToString(normalOutput))
conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(relation.output))
@@ -102,13 +114,41 @@ case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))

sc.newAPIHadoopRDD(
conf,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row])
.map(_._2)
.filter(_ != null) // Parquet's record filters may produce null values
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
sc,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row],
conf)

if (partOutput.nonEmpty) {
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
.getPath
.toString
.split("/")
.flatMap {
case partValue(key, value) => Some(key -> value)
case _ => None
}.toMap

val partitionRowValues =
partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

new Iterator[Row] {
private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)

def hasNext = iter.hasNext

def next() = joinedRow.withRight(iter.next()._2)
}
}
} else {
baseRDD.map(_._2)
}.filter(_ != null) // Parquet's record filters may produce null values
}

/**
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
119 changes: 117 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,21 +17,136 @@

package org.apache.spark.sql.hive

import org.apache.spark.sql.SQLContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}

import scala.collection.JavaConversions._

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SQLContext#SparkPlanner =>

val hiveContext: HiveContext

/**
* :: Experimental ::
* Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
* table scan operator.
*
* TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
* but since this is after the code freeze for 1.1, all logic is here to minimize disruption.
*
* Other issues:
* - Much of this logic assumes case insensitive resolution.
*/
@Experimental
object ParquetConversion extends Strategy {
implicit class LogicalPlanHacks(s: SchemaRDD) {
def lowerCase =
new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))

def addPartitioningAttributes(attrs: Seq[Attribute]) =
new SchemaRDD(
s.sqlContext,
s.logicalPlan transform {
case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
})
}

implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) =
OutputFaker(
originalPlan.output.map(a =>
newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
.getOrElse(
sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
originalPlan)
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
hiveContext.convertMetastoreParquet =>

// Filter out all predicates that only deal with partition keys
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

// We are going to throw the predicates and projection back at the whole optimization
// sequence so let's unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
val unresolvedOtherPredicates = otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
}).reduceOption(And).getOrElse(Literal(true))

val unresolvedProjection = projectList.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
})

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
// data types during evaluation
val castedPredicate = rawPredicate transform {
case a: AttributeReference =>
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)
Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
}

val inputData = new GenericMutableRow(relation.partitionKeys.size)
val pruningCondition =
if (codegenEnabled) {
GeneratePredicate(castedPredicate)
} else {
InterpretedPredicate(castedPredicate)
}

val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
inputData(i) = partitionValues(i)
i += 1
}
pruningCondition(inputData)
}

hiveContext
.parquetFile(partitions.map(_.getLocation).mkString(","))
.addPartitioningAttributes(relation.partitionKeys)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.getPath)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
case _ => Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.parquet

import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.io.Writable

/**
* A placeholder that allows SparkSQL users to create metastore tables that are stored as
* parquet files. It is only intended to pass the checks that the serde is valid and exists
* when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
* when "spark.sql.hive.convertMetastoreParquet" is set to true.
*/
@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
"placeholder in the Hive MetaStore")
class FakeParquetSerDe extends SerDe {
override def getObjectInspector: ObjectInspector = new ObjectInspector {
override def getCategory: Category = Category.PRIMITIVE

override def getTypeName: String = "string"
}

override def deserialize(p1: Writable): AnyRef = throwError

override def initialize(p1: Configuration, p2: Properties): Unit = {}

override def getSerializedClass: Class[_ <: Writable] = throwError

override def getSerDeStats: SerDeStats = throwError

override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError

private def throwError =
sys.error(
"spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
}
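
As a usage note (not part of this diff), here is a hedged sketch of how `FakeParquetSerDe` and the new flag fit together, reusing the `hiveContext` from the sketch in the description above. The table name, columns, location, and the `parquet.hive.Deprecated*` input/output format class names are assumptions rather than values taken from this commit:

```scala
// Hypothetical table; the input/output format class names below are assumptions
// about the bundled parquet-hive classes and may differ in a real deployment.
hiveContext.sql("""
  CREATE EXTERNAL TABLE raw_events (id INT, payload STRING)
  ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
  STORED AS
    INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
  LOCATION '/data/raw_events'""")

// FakeParquetSerDe only satisfies the CREATE TABLE checks; its deserialize throws.
// Reads must therefore go through the native scan, so the flag has to be enabled.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
hiveContext.sql("SELECT COUNT(*) FROM raw_events").collect()
```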