
[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables. #1819

Closed · wants to merge 12 commits
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* :: DeveloperApi ::
 * A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children = child :: Nil
def execute() = child.execute()
}
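
A hedged sketch of how this node is meant to be used (illustrative only, not part of the diff; spliceReplanned is a made-up helper): a structurally equivalent query is re-planned from scratch, and OutputFaker grafts the result back under the output attributes the surrounding, already-resolved tree expects.

// Illustrative sketch: the re-planned subtree carries fresh expression IDs, so OutputFaker
// makes the parent tree see the output attributes it was originally resolved against.
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{OutputFaker, SparkPlan}

def spliceReplanned(replanned: SparkPlan, originalOutput: Seq[Attribute]): SparkPlan =
  OutputFaker(originalOutput, replanned)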
@@ -78,6 +78,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
Contributor:

I am going to test this PR soon. In the meantime, would it make sense to put this only in SQLConf (and add the key string as a field in the singleton object), making that class the central place that stores SQL configs?

Contributor Author:

I have mixed feelings about that. The problem is that this only applies to HiveContexts, so it doesn't really make much sense in a SQLContext.

Contributor:

Sounds like a job for HiveConf extends SQLConf! After all, there's nothing better than confusing users trying to use org.apache.hadoop.hive.conf.HiveConf!

Contributor:

When in doubt, make up longer names: SQLConfigOpts, HiveConfigOpts. But this is only possibly relevant in the future and should not block this PR.
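
Wherever the key ends up living, a minimal sketch of enabling the feature from user code (the SparkContext setup and the table name are assumptions for illustration, not part of this PR):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[2]", "parquet-conversion-demo")  // assumed local setup
val hiveContext = new HiveContext(sc)

// Off by default; flipping it routes Parquet-SerDe metastore tables through the
// native ParquetTableScan instead of the Hive SerDe.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// `some_parquet_table` is hypothetical; any metastore table using the Parquet SerDe qualifies.
hiveContext.hql("SELECT COUNT(*) FROM some_parquet_table").collect()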


override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

@@ -328,6 +336,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
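The ordering comment above matters because the planner applies its strategies in sequence and the first physical plan produced is the one used; below is a simplified, self-contained model of that dispatch (this is not the actual Catalyst QueryPlanner, which lazily concatenates candidates from all strategies).

// Simplified model: the first strategy that yields a candidate plans the node, which is
// why ParquetConversion has to sit ahead of HiveTableScans in the list above.
object StrategyOrderingSketch {
  trait Logical
  trait Physical
  type Strategy = Logical => Seq[Physical]

  def plan(strategies: Seq[Strategy], node: Logical): Physical =
    strategies.view.flatMap(s => s(node)).headOption
      .getOrElse(sys.error("no strategy produced a plan"))
}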
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala (115 additions, 2 deletions)
@@ -17,21 +17,134 @@

package org.apache.spark.sql.hive

import org.apache.spark.sql.SQLContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.ParquetTableScan

import scala.collection.JavaConversions._

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SQLContext#SparkPlanner =>

val hiveContext: HiveContext

/**
* :: Experimental ::
* Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
* table scan operator.
*
 * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring,
 * but since this is after the code freeze for 1.1, all logic is here to minimize disruption.
*/
@Experimental
object ParquetConversion extends Strategy {
implicit class LogicalPlanHacks(s: SchemaRDD) {
def lowerCase =
new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
}

implicit class PhysicalPlanHacks(s: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
hiveContext.convertMetastoreParquet =>

// Filter out all predicates that only deal with partition keys
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

// We are going to throw the predicates and projection back at the whole optimization
 * sequence, so let's unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
val unresolvedOtherPredicates = otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
}).reduceOption(And).getOrElse(Literal(true))

val unresolvedProjection = projectList.map(_ transform {
// Handle non-partitioning columns
case a: AttributeReference if !partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
Contributor:

My bad... My IDE was misconfigured on the right margin...

})

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
// data types during evaluation
val castedPredicate = rawPredicate transform {
case a: AttributeReference =>
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)
Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
}

val inputData = new GenericMutableRow(relation.partitionKeys.size)
val pruningCondition =
if (codegenEnabled) {
GeneratePredicate(castedPredicate)
} else {
InterpretedPredicate(castedPredicate)
}

val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
inputData(i) = partitionValues(i)
i += 1
}
pruningCondition(inputData)
}

org.apache.spark.sql.execution.Union(
partitions.par.map { p =>
val partValues = p.getValues()
val internalProjection = unresolvedProjection.map(_ transform {
// Handle partitioning columns
case a: AttributeReference if partitionKeyIds.contains(a.exprId) => {
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)

Alias(Cast(Literal(partValues.get(idx), StringType), key.dataType), a.name)()
}
})

hiveContext
Contributor:

Will that cause performance issues if there are lots of partitions?

Contributor Author:

It did, due to the hadoopConf getting broadcast over and over again. Hence: c0d9b72

.parquetFile(p.getLocation)
.lowerCase
.where(unresolvedOtherPredicates)
.select(internalProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute))
}.seq) :: Nil
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.getPath)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
case _ => Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
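The partitioned branch above splits the predicates into those that touch only partition keys, casts the metastore's string partition values to the declared key types, and keeps only the partitions that satisfy the pruning predicate. A self-contained sketch of that idea using plain Scala stand-ins (the real code builds Cast(BoundReference(idx, StringType, nullable = true), key.dataType) expressions and evaluates them against a GenericMutableRow):

// Illustrative only: metastore partition values arrive as strings, so cast them to each
// partition key's declared type before evaluating the pruning predicate.
object PartitionPruningSketch {
  case class PartitionKeySketch(name: String, dataType: String)  // e.g. PartitionKeySketch("p", "int")

  def prunePartitions(
      partitions: Seq[Map[String, String]],   // raw string partition values, keyed by column name
      keys: Seq[PartitionKeySketch],
      predicate: Seq[Any] => Boolean): Seq[Map[String, String]] =
    partitions.filter { part =>
      val casted = keys.map { key =>
        key.dataType match {
          case "int"    => part(key.name).toInt
          case "string" => part(key.name)
          case other    => sys.error(s"type not covered in this sketch: $other")
        }
      }
      predicate(casted)
    }

  // e.g. keep partitions where p IN (1, 2, 3):
  // prunePartitions(allParts, Seq(PartitionKeySketch("p", "int")), row => Set(1, 2, 3)(row(0).asInstanceOf[Int]))
}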
@@ -0,0 +1,154 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import java.io.File

import org.apache.spark.sql.hive.execution.HiveTableScan
import org.scalatest.BeforeAndAfterAll

import scala.reflect.ClassTag

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

case class ParquetData(intField: Int, stringField: String)

/**
* Tests for our SerDe -> Native parquet scan conversion.
*/
class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {

override def beforeAll(): Unit = {
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

override def afterAll(): Unit = {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}

val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
partitionedTableDir.delete()
partitionedTableDir.mkdir()

(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDir, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-$p"))
.saveAsParquetFile(partDir.getCanonicalPath)
}

sql(s"""
Contributor:

If we execute setup queries in the constructor, will we introduce any issues with mvn tests? It looks similar to what we originally did for HiveTableScanSuite, where we had to use createQueryTest to run setup and execution atomically.

Contributor Author:

I think we are okay as long as we don't use createQueryTest anywhere, since it runs reset(). I can try to move the DDL into each test to be safe though.

create external table partitioned_parquet
(
intField INT,
stringField STRING
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDir.getCanonicalPath}'
""")

sql(s"""
create external table normal_parquet
(
intField INT,
stringField STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
""")

(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}

test("project the partitioning column") {
checkAnswer(
sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
(1, 10) ::
(2, 10) ::
(3, 10) ::
(4, 10) ::
(5, 10) ::
(6, 10) ::
(7, 10) ::
(8, 10) ::
(9, 10) ::
(10, 10) :: Nil
)
}

test("simple count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet"),
100)
}

test("pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
10)
}

test("multi-partition pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
30)
}

test("non-partition predicates") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
30)
}

test("sum") {
checkAnswer(
sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
1 + 2 + 3
)
}

test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),
10
)
}

test("conversion is working") {
assert(
sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
case _: HiveTableScan => true
}.isEmpty)
assert(
sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
case _: ParquetTableScan => true
}.nonEmpty)
}
}
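
The last test above is the key assertion: with the flag enabled, the executed plan contains a ParquetTableScan and no HiveTableScan. A hedged sketch of the same check from an interactive session, assuming the partitioned_parquet table and TestHive setup created by this suite:

import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.parquet.ParquetTableScan

setConf("spark.sql.hive.convertMetastoreParquet", "true")

val plan = sql("SELECT * FROM partitioned_parquet WHERE p = 1").queryExecution.executedPlan

// With conversion on, the physical plan should use the native Parquet scan and skip the Hive SerDe.
assert(plan.collect { case p: ParquetTableScan => p }.nonEmpty)
assert(plan.collect { case h: HiveTableScan => h }.isEmpty)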