Prototype impl of estimations for Catalyst logical plans.
- Also add simple size-getters for ParquetRelation and
  MetastoreRelation.
- Add a rule to auto-convert equi-joins to BroadcastHashJoin when one
  table's estimated size falls below a threshold, based on the above
  getter (for MetastoreRelation).
concretevitamin committed Jul 29, 2014
1 parent 800ecff commit 56a8e6e
Showing 6 changed files with 95 additions and 16 deletions.
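
For orientation, a rough usage sketch of what the new rule is meant to enable, mirroring the HiveQuerySuite test added below. It assumes the Hive test fixtures and that src is small enough to fall under the auto-convert threshold; how that threshold is configured is not shown in this commit.

import org.apache.spark.sql.execution.BroadcastHashJoin
import org.apache.spark.sql.hive.test.TestHive._

// Join a presumably small table with itself. With no tables listed in
// spark.sql.join.broadcastTables, the planner should still pick
// BroadcastHashJoin once the size estimate falls below the threshold.
val rdd = hql("SELECT * FROM src a JOIN src b ON a.key = b.key")
val broadcastJoins = rdd.queryExecution.sparkPlan.collect {
  case j: BroadcastHashJoin => j
}
assert(broadcastJoins.nonEmpty)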
@@ -26,6 +26,18 @@ import org.apache.spark.sql.catalyst.trees
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
self: Product =>

protected class Estimates {
  lazy val childrenEstimations = children.map(_.estimates)
  lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum
  lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum
  lazy val size: Long = childrenEstimations.map(_.size).sum
}

/**
* Estimates of various statistics.
*/
lazy val estimates: Estimates = new Estimates

/**
* Returns the set of attributes that are referenced by this node
* during evaluation.
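The defaults above simply aggregate over children; leaf relations are expected to override estimates with concrete numbers, which is exactly what ParquetRelation and MetastoreRelation do later in this commit. A minimal sketch of that pattern for a hypothetical leaf node (the relation and its knownSizeInBytes parameter are illustrative only):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical relation that knows its on-disk size up front.
case class KnownSizeRelation(output: Seq[Attribute], knownSizeInBytes: Long)
  extends LeafNode {

  // Override only what we can measure; the other statistics keep the
  // children-summing defaults (which are empty sums for a leaf).
  @transient override lazy val estimates = new Estimates {
    override lazy val size: Long = knownSizeInBytes
  }
}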
@@ -71,8 +71,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
if broadcastTables.contains(b.tableName)
|| (right.estimates.size <= sqlContext.autoConvertJoinSize) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case ExtractEquiJoinKeys(
Inner,
@@ -81,7 +82,8 @@
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
right)
if broadcastTables.contains(b.tableName) =>
if broadcastTables.contains(b.tableName)
|| (left.estimates.size <= sqlContext.autoConvertJoinSize) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
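The new guards read sqlContext.autoConvertJoinSize, which is not defined in any of the hunks shown here. A minimal sketch of how such a setting might be surfaced on SQLContext (the property key and default below are assumptions, not part of this commit):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

class SizeAwareSQLContext(sc: SparkContext) extends SQLContext(sc) {
  // Hypothetical: tables whose estimated size (in bytes) is at or below this
  // threshold are eligible for automatic broadcast in equi-joins.
  def autoConvertJoinSize: Long =
    sc.getConf.getLong("spark.sql.auto.convert.join.size", 10000L)
}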
@@ -22,11 +22,14 @@ import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapreduce.Job

import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
import parquet.schema.MessageType

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
@@ -43,12 +46,22 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
*
* @param path The path to the Parquet file.
*/
// TODO: make me a BaseRelation? For HashJoin strategy.
private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {

self: Product =>

@transient override lazy val estimates = new Estimates {
// TODO: investigate getting encoded column statistics in the parquet file?
override lazy val size: Long = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
}
}

/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
ParquetTypesConverter
9 changes: 9 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._

@@ -28,6 +29,14 @@ class JoinSuite extends QueryTest {
// Ensures tables are loaded.
TestData

test("parquet") {
val data = parquetFile("../../points.parquet") // local file!
val sizes = data.logicalPlan.collect { case j: ParquetRelation =>
j.newInstance.estimates.size // also works without .newInstance
}.toSeq
assert(sizes.size === 1 && sizes(0) > 0)
}

test("equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive

import scala.util.parsing.combinator.RegexParsers

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.Deserializer

import org.apache.spark.annotation.DeveloperApi
@@ -64,9 +65,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

// Since HiveQL is case insensitive for table names we make them all lowercase.
MetastoreRelation(
databaseName,
tblName,
alias)(table.getTTable, partitions.map(part => part.getTPartition))
databaseName, tblName, alias)(
table.getTTable, partitions.map(part => part.getTPartition))(
hive.hiveconf, table.getPath)
}

def createTable(
@@ -251,7 +252,11 @@ object HiveMetastoreTypes extends RegexParsers {
private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: TTable, val partitions: Seq[TPartition])
(@transient hiveConf: HiveConf, @transient path: Path)
extends BaseRelation {

self: Product =>

// TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
// use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
// Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -264,6 +269,19 @@ private[hive] case class MetastoreRelation
new Partition(hiveQlTable, p)
}

// TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
@transient override lazy val estimates = new Estimates {
// Size getters adapted from
// https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
override lazy val size: Long =
maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)

private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
val res = try { Some(size.toLong) } catch { case _: Exception => None }
res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength }
}
}

val tableDesc = new TableDesc(
Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
hiveQlTable.getInputFormatClass,
@@ -275,14 +293,14 @@
hiveQlTable.getMetadata
)

implicit class SchemaAttribute(f: FieldSchema) {
def toAttribute = AttributeReference(
f.getName,
HiveMetastoreTypes.toDataType(f.getType),
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifiers = tableName +: alias.toSeq)
}

// Must be a stable value since new attributes are born here.
val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)
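The estimate above prefers the table's totalSize property and only falls back to a filesystem scan when that property is missing or unparsable. In Hive, totalSize is typically filled in when data is loaded with stats autogathering enabled, and it can be refreshed with ANALYZE TABLE. A small sketch against the test fixtures (whether this prototype's hql() parser passes ANALYZE TABLE through to Hive is an assumption):

import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive._

// Refresh table-level statistics so the totalSize property is populated and
// the MetastoreRelation estimate can skip the filesystem fallback.
hql("ANALYZE TABLE src COMPUTE STATISTICS")

val sizes = hql("SELECT * FROM src").queryExecution.analyzed.collect {
  case mr: MetastoreRelation => mr.estimates.size
}
assert(sizes.forall(_ > 0))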
@@ -17,11 +17,14 @@

package org.apache.spark.sql.hive.execution

import scala.util.Try

import org.apache.spark.sql.{SchemaRDD, Row}
import org.apache.spark.sql.execution.{BroadcastHashJoin, BuildRight}
import org.apache.spark.sql.hive.MetastoreRelation
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

case class TestData(a: Int, b: String)

@@ -48,6 +51,28 @@ class HiveQuerySuite extends HiveComparisonTest {
"Incorrect number of rows in created table")
}

// TODO: put me in a separate EstimateSuite?
test("BHJ by size") {
hql("""SET spark.sql.join.broadcastTables=""") // reset broadcast tables
// TODO: use two different tables?
// assume src has small size
val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""")
val physical = rdd.queryExecution.sparkPlan
val bhj = physical.collect { case j: BroadcastHashJoin => j }
println(s"${rdd.queryExecution}")
assert(bhj.size === 1)
}

// TODO: put me in a separate EstimateSuite?
test("estimates the size of a MetastoreRelation") {
val rdd = hql("""SELECT * FROM src""")
println(s"${rdd.queryExecution}")
val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.estimates.size
}.toSeq
assert(sizes.size === 1 && sizes(0) > 0)
}

createQueryTest("between",
"SELECT * FROM src WHERE key Between 1 and 2")
