Merge pull request apache#39 from marmbrus/lateralView
Add support for lateral views, TGFs and Hive UDTFs
marmbrus committed Feb 11, 2014
2 parents 0e6c1d7 + 7785ee6 commit 964368f
Showing 20 changed files with 570 additions and 94 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -17,7 +17,7 @@ resolvers += "SparkStaging" at "https://repository.apache.org/content/repositori

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

libraryDependencies += "catalyst" % "hive-golden" % "7" % "test" from "http://repository-databricks.forge.cloudbees.com/snapshot/catalystGolden7.jar"
libraryDependencies += "catalyst" % "hive-golden" % "8" % "test" from "http://cs.berkeley.edu/~marmbrus/tmp/catalystGolden8.jar"

// Hive 0.10.0 relies on a weird version of jdo that is not published anywhere... Remove when we upgrade to 0.11.0
libraryDependencies += "javax.jdo" % "jdo2-api" % "2.3-ec" from "http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar"
15 changes: 14 additions & 1 deletion src/main/scala/catalyst/analysis/Analyzer.scala
@@ -32,6 +32,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
@@ -114,6 +115,18 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
}
}

/**
* When a SELECT clause has only a single expression, and that expression is a
* [[catalyst.expressions.Generator Generator]], we convert the
* [[catalyst.plans.logical.Project Project]] to a [[catalyst.plans.logical.Generate Generate]].
*/
object ImplicitGenerate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Project(Seq(Alias(g: Generator, _)), child) =>
Generate(g, join = false, outer = false, None, child)
}
}

/**
* Expands any references to [[Star]] (*) in project operators.
*/
@@ -129,7 +142,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
case o => o :: Nil
},
child)
case t: Transform if containsStar(t.input) =>
case t: ScriptTransformation if containsStar(t.input) =>
t.copy(
input = t.input.flatMap {
case s: Star => s.expand(t.child.output)
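To see what ImplicitGenerate does, here is a toy model of the rewrite built from simplified stand-ins for the catalyst classes (a sketch only; these are not the real catalyst types, and Explode is an assumed example generator):

    sealed trait Expr
    case class Alias(child: Expr, name: String) extends Expr
    trait Generator extends Expr
    case object Explode extends Generator

    sealed trait Plan
    case class Rel(name: String) extends Plan
    case class Project(exprs: Seq[Expr], child: Plan) extends Plan
    case class Generate(g: Generator, join: Boolean, outer: Boolean, child: Plan) extends Plan

    // A Project whose only expression is a Generator becomes a Generate.
    def implicitGenerate(plan: Plan): Plan = plan match {
      case Project(Seq(Alias(g: Generator, _)), child) =>
        Generate(g, join = false, outer = false, child)
      case other => other
    }

    // SELECT explode(...) FROM src
    implicitGenerate(Project(Seq(Alias(Explode, "col")), Rel("src")))
    // => Generate(Explode, join = false, outer = false, Rel("src"))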
7 changes: 7 additions & 0 deletions src/main/scala/catalyst/dsl/package.scala
@@ -130,6 +130,13 @@ package object dsl {
seed: Int = (math.random * 1000).toInt) =
Sample(fraction, withReplacement, seed, plan)

def generate(
generator: Generator,
join: Boolean = false,
outer: Boolean = false,
alias: Option[String] = None) =
Generate(generator, join, outer, alias, plan)

def analyze = analysis.SimpleAnalyzer(plan)
}
}
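Hypothetical usage of the new DSL method (assumes the implicit conversions in this package, with a LogicalPlan `plan` and a Generator `g` already in scope; the alias name is illustrative):

    // Lateral-view-style expansion of `plan` with generator `g`.
    val expanded = plan.generate(g, join = true, outer = false, alias = Some("exploded"))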
124 changes: 103 additions & 21 deletions src/main/scala/catalyst/execution/FunctionRegistry.scala
@@ -2,14 +2,14 @@ package catalyst
package execution

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFEvaluator, AbstractGenericUDAFResolver}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic._
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.{io => hadoopIo}

@@ -38,6 +38,9 @@ object HiveFunctionRegistry extends analysis.FunctionRegistry with HiveFunctionF
} else if (
classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUdaf(name, children)

} else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUdtf(name, Nil, children)
} else {
sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
}
@@ -126,11 +129,13 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
type UDFType = UDF

@transient
lazy val method = function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))
protected lazy val method =
function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))

@transient
lazy val dataType = javaClassToDataType(method.getReturnType)

lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
protected lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
val primitiveClasses = Seq(
Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE,
classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long],
@@ -152,7 +157,7 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
} else {
constructor.newInstance(a match {
case i: Int => i: java.lang.Integer
case bd: BigDecimal => new HiveDecimal(bd.underlying)
case bd: BigDecimal => new HiveDecimal(bd.underlying())
case other: AnyRef => other
}).asInstanceOf[AnyRef]
}
@@ -178,15 +183,26 @@ case class HiveGenericUdf(
type UDFType = GenericUDF

@transient
lazy val argumentInspectors: Seq[ObjectInspector] = children.map(_.dataType).map(toInspector)
protected lazy val argumentInspectors = children.map(_.dataType).map(toInspector)

@transient
lazy val returnInspector = function.initialize(argumentInspectors.toArray)
protected lazy val returnInspector = function.initialize(argumentInspectors.toArray)

val dataType: DataType = inspectorToDataType(returnInspector)

def evaluate(evaluatedChildren: Seq[Any]): Any = {
returnInspector // Make sure initialized.
val args = evaluatedChildren.map(wrap).map { v =>
new DeferredJavaObject(v): DeferredObject
}.toArray
unwrap(function.evaluate(args))
}
}

trait HiveInspectors {

/** Converts native catalyst types to the types expected by Hive */
def wrap(a: Any): Any = a match {
def wrap(a: Any): AnyRef = a match {
case s: String => new hadoopIo.Text(s)
case i: Int => i: java.lang.Integer
case b: Boolean => b: java.lang.Boolean
@@ -200,16 +216,6 @@ case class HiveGenericUdf(
case null => null
}

def evaluate(evaluatedChildren: Seq[Any]): Any = {
returnInspector // Make sure initialized.
val args = evaluatedChildren.map(wrap).map { v =>
new DeferredJavaObject(v): DeferredObject
}.toArray
unwrap(function.evaluate(args))
}
}

trait HiveInspectors {
def toInspector(dataType: DataType): ObjectInspector = dataType match {
case ArrayType(tpe) => ObjectInspectorFactory.getStandardListObjectInspector(toInspector(tpe))
case MapType(keyType, valueType) =>
@@ -262,14 +268,14 @@ case class HiveGenericUdaf(

type UDFType = AbstractGenericUDAFResolver

lazy val resolver = createFunction[AbstractGenericUDAFResolver](name)
protected lazy val resolver: AbstractGenericUDAFResolver = createFunction(name)

lazy val objectInspector: ObjectInspector = {
protected lazy val objectInspector = {
resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
}

lazy val inspectors: Seq[ObjectInspector] = children.map(_.dataType).map(toInspector)
protected lazy val inspectors = children.map(_.dataType).map(toInspector)

def dataType: DataType = inspectorToDataType(objectInspector)

@@ -279,3 +285,79 @@

override def toString = s"$nodeName#$name(${children.mkString(",")})"
}

/**
* Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
* [[catalyst.expressions.Generator Generator]]. Note that Generator semantics do not allow
* Generators to maintain state between input rows. Thus, UDTFs that rely on partition-dependent
* operations, such as calls to `close()` before producing output, will not behave the same as
* they do in Hive. In practice, however, this should not affect compatibility for most sane
* UDTFs (e.g. explode or GenericUDTFParseUrlTuple).
*
* Operators that must maintain state between input rows should instead be implemented as
* user-defined aggregations, which have clean semantics even under partitioned execution.
*/
case class HiveGenericUdtf(
name: String,
aliasNames: Seq[String],
children: Seq[Expression])
extends Generator with HiveInspectors with HiveFunctionFactory {

override def references = children.flatMap(_.references).toSet

@transient
protected lazy val function: GenericUDTF = createFunction(name)

protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)

protected lazy val outputInspectors = {
val structInspector = function.initialize(inputInspectors.toArray)
structInspector.getAllStructFieldRefs.map(_.getFieldObjectInspector)
}

protected lazy val outputDataTypes = outputInspectors.map(inspectorToDataType)

override protected def makeOutput() = {
// Use column names when given, otherwise c_0, c_1, ... c_{n-1}.
if (aliasNames.size == outputDataTypes.size) {
aliasNames.zip(outputDataTypes).map {
case (attrName, attrDataType) =>
AttributeReference(attrName, attrDataType, nullable = true)()
}
} else {
outputDataTypes.zipWithIndex.map {
case (attrDataType, i) =>
AttributeReference(s"c_$i", attrDataType, nullable = true)()
}
}
}

def apply(input: Row): TraversableOnce[Row] = {
outputInspectors // Make sure initialized.
val collector = new UDTFCollector
function.setCollector(collector)

val udtInput = children.map(Evaluate(_, Vector(input))).map(wrap).toArray
function.process(udtInput)
collector.collectRows()
}

protected class UDTFCollector extends Collector {
var collected = new ArrayBuffer[Row]

override def collect(input: java.lang.Object) {
// We need to clone the input here because implementations of
// GenericUDTF reuse the same object. Luckily they are always an array, so
// it is easy to clone.
collected += new GenericRow(input.asInstanceOf[Array[_]].map(unwrap))
}

def collectRows() = {
val toCollect = collected
collected = new ArrayBuffer[Row]
toCollect
}
}

override def toString() = s"$nodeName#$name(${children.mkString(",")})"
}
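For instance, the registry dispatch above turns a UDTF call such as explode into one of these generators. A rough sketch (evaluating it requires a configured Hive session; `children` stands for the resolved argument expressions):

    // As constructed in HiveFunctionRegistry: no explicit column aliases.
    val explodeGen = HiveGenericUdtf("explode", aliasNames = Nil, children = children)
    // With no alias names, makeOutput falls back to generated names, so a
    // single-column UDTF like explode yields an attribute named "c_0".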
44 changes: 44 additions & 0 deletions src/main/scala/catalyst/execution/Generate.scala
@@ -0,0 +1,44 @@
package catalyst
package execution

import catalyst.expressions._
import catalyst.types._

/**
* Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to `flatMap` in functional
* programming, with one important additional feature: the input rows may optionally be joined
* with their output.
* @param join when true, each output row is implicitly joined with the input tuple that produced
* it.
* @param outer when true, each input row will be output at least once, even if the output of the
* given `generator` is empty. `outer` has no effect when `join` is false.
*/
case class Generate(
generator: Generator,
join: Boolean,
outer: Boolean,
child: SharkPlan)
extends UnaryNode {

def output =
if (join) child.output ++ generator.output else generator.output

def execute() = {
if (join) {
val outerNulls = Seq.fill(generator.output.size)(null)
child.execute().mapPartitions { iter =>
iter.flatMap { row =>
val outputRows = generator(row)
if (outer && outputRows.isEmpty) {
new GenericRow(row ++ outerNulls) :: Nil
} else {
outputRows.map(or => new GenericRow(row ++ or))
}
}
}
} else {
child.execute().mapPartitions(iter => iter.flatMap(generator))
}
}
}
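The join/outer semantics above can be checked against a self-contained model that uses plain Scala collections in place of SharkPlan and Row (the names below are illustrative, not the operator's actual API):

    type Row = Seq[Any]

    def generateModel(gen: Row => Seq[Row], join: Boolean, outer: Boolean,
                      genWidth: Int)(input: Seq[Row]): Seq[Row] =
      if (join) input.flatMap { row =>
        val out = gen(row)
        if (outer && out.isEmpty) Seq(row ++ Seq.fill(genWidth)(null)) // keep the row, pad with nulls
        else out.map(row ++ _)                                         // join input with each output
      }
      else input.flatMap(gen)

    // An explode-like generator over the second column:
    val explode: Row => Seq[Row] = row => row(1).asInstanceOf[Seq[Int]].map(x => Seq(x))
    val rows: Seq[Row] = Seq(Seq("a", Seq(1, 2)), Seq("b", Seq.empty[Int]))

    generateModel(explode, join = true, outer = true, genWidth = 1)(rows)
    // => List(List("a", 1), List("a", 2), List("b", null))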
29 changes: 25 additions & 4 deletions src/main/scala/catalyst/execution/MetastoreCatalog.scala
@@ -1,6 +1,8 @@
package catalyst
package execution

import scala.util.parsing.combinator.RegexParsers

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition, Table}
@@ -14,10 +16,9 @@ import analysis.Catalog
import expressions._
import plans.logical._
import rules._
import types._
import catalyst.types._

import collection.JavaConversions._
import scala.util.parsing.combinator.RegexParsers
import scala.collection.JavaConversions._

class HiveMetastoreCatalog(hiveConf: HiveConf) extends Catalog {
val client = new HiveMetaStoreClient(hiveConf)
@@ -46,12 +47,15 @@ class HiveMetastoreCatalog(hiveConf: HiveConf) extends Catalog {
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
import HiveMetastoreTypes._

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case InsertIntoCreatedTable(db, tableName, child) =>
val databaseName = db.getOrElse(SessionState.get.getCurrentDatabase())

val table = new Table()
val schema = child.output.map(attr => new FieldSchema(attr.name, "string", ""))
val schema =
child.output.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))

table.setDbName(databaseName)
table.setTableName(tableName)
@@ -115,6 +119,23 @@ object HiveMetastoreTypes extends RegexParsers {
case Success(result, _) => result
case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
}

def toMetastoreType(dt: DataType): String = dt match {
case ArrayType(elementType) => s"array<${toMetastoreType(elementType)}>"
case StructType(fields) =>
s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>"
case MapType(keyType, valueType) =>
s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>"
case StringType => "string"
case FloatType => "float"
case IntegerType => "int"
case ShortType =>"tinyint"
case DoubleType => "double"
case LongType => "bigint"
case BinaryType => "binary"
case BooleanType => "boolean"
case DecimalType => "decimal"
}
}

case class MetastoreRelation(databaseName: String, tableName: String, alias: Option[String])
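As a quick check of the new reverse direction of the schema conversion (assumes the catalyst.types imports used in this file):

    HiveMetastoreTypes.toMetastoreType(MapType(StringType, ArrayType(IntegerType)))
    // => "map<string,array<int>>"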
10 changes: 7 additions & 3 deletions src/main/scala/catalyst/execution/PlanningStrategies.scala
@@ -203,7 +203,7 @@ trait PlanningStrategies {
// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sort tuples within a partition. Its requiredDistribution will be
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
@@ -220,8 +220,12 @@
execution.StopAfter(Evaluate(limit, Nil).asInstanceOf[Int], planLater(child))(sc) :: Nil
case Unions(unionChildren) =>
execution.Union(unionChildren.map(planLater))(sc) :: Nil
case logical.Transform(input, script, output, child) =>
execution.Transform(input, script, output, planLater(child))(sc) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
case logical.ScriptTransformation(input, script, output, child) =>
execution.ScriptTransformation(input, script, output, planLater(child))(sc) :: Nil
case logical.NoRelation =>
execution.LocalRelation(Nil, Seq(IndexedSeq()))(sc) :: Nil
case _ => Nil
}
}