Addressed styling issues mentioned by @marmbrus
liancheng committed Mar 23, 2014
1 parent 9265366 commit fae7b02
Showing 20 changed files with 93 additions and 70 deletions.
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
package catalyst

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types._

/**
* Provides experimental support for generating catalyst schemas for scala objects.
*/
object ScalaReflection {
import scala.reflect.runtime.universe._

/** Returns a Sequence of attributes for the given case class type. */
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case s: StructType =>
s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
}

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T])

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): DataType = tpe match {
case t if t <:< typeOf[Product] =>
val params = t.member("<init>": TermName).asMethod.paramss
StructType(
params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
case t if t <:< typeOf[Seq[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
ArrayType(schemaFor(elementType))
case t if t <:< typeOf[String] => StringType
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
case t if t <:< definitions.DoubleTpe => DoubleType
case t if t <:< definitions.ShortTpe => ShortType
case t if t <:< definitions.ByteTpe => ByteType
}

implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {

/**
* Implicitly added to Sequences of case class objects. Returns a catalyst logical relation
* for the data in the sequence.
*/
def asRelation: LocalRelation = {
val output = attributesFor[A]
LocalRelation(output, data)
}
}
}
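
Not part of this commit: a minimal, hypothetical sketch of how the ScalaReflection helpers in the new file above might be used. The Person case class, the ScalaReflectionExample object, and the commented output are illustrative assumptions.

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.ScalaReflection._  // brings the implicit CaseClassRelation into scope

// A top-level case class so runtime reflection can inspect its constructor parameters.
case class Person(name: String, age: Int)

object ScalaReflectionExample extends App {
  // Roughly: StructType(List(StructField(name,StringType,true), StructField(age,IntegerType,true)))
  println(ScalaReflection.schemaFor[Person])

  // A Seq of AttributeReferences derived from the case class fields, all nullable.
  println(ScalaReflection.attributesFor[Person])

  // The implicit CaseClassRelation turns an in-memory Seq of case class objects into a LocalRelation.
  println(Seq(Person("alice", 30), Person("bob", 25)).asRelation)
}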
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.types._
* for a SQL-like language should check out the HiveQL support in the sql/hive sub-project.
*/
class SqlParser extends StandardTokenParsers {

def apply(input: String): LogicalPlan = {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
@@ -21,8 +21,7 @@ package analysis

import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package analysis

import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression, Expression, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.trees.TreeNode

@@ -26,51 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types._

/**
* Provides experimental support for generating catalyst schemas for scala objects.
*/
object ScalaReflection {
import scala.reflect.runtime.universe._

/** Returns a Sequence of attributes for the given case class type. */
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case s: StructType =>
s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
}

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T])

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): DataType = tpe match {
case t if t <:< typeOf[Product] =>
val params = t.member("<init>": TermName).asMethod.paramss
StructType(
params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
case t if t <:< typeOf[Seq[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
ArrayType(schemaFor(elementType))
case t if t <:< typeOf[String] => StringType
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
case t if t <:< definitions.DoubleTpe => DoubleType
case t if t <:< definitions.ShortTpe => ShortType
case t if t <:< definitions.ByteTpe => ByteType
}

implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {

/**
* Implicitly added to Sequences of case class objects. Returns a catalyst logical relation
* for the data in the sequence.
*/
def asRelation: LocalRelation = {
val output = attributesFor[A]
LocalRelation(output, data)
}
}
}

/**
* A collection of implicit conversions that create a DSL for constructing catalyst data structures.
*
@@ -25,8 +25,9 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
*/
package object errors {

class TreeNodeException[TreeType <: TreeNode[_]]
(tree: TreeType, msg: String, cause: Throwable) extends Exception(msg, cause) {
class TreeNodeException[TreeType <: TreeNode[_]](
tree: TreeType, msg: String, cause: Throwable)
extends Exception(msg, cause) {

// Yes, this is the same as a default parameter, but... those don't seem to work with SBT
// external project dependencies for some reason.
@@ -20,7 +20,7 @@ package catalyst
package expressions

import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.{IntegralType, FractionalType, NumericType, DataType}
import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType}
import org.apache.spark.sql.catalyst.errors.TreeNodeException

abstract class Expression extends TreeNode[Expression] {
@@ -209,6 +209,6 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
}
i += 1
}
0
return 0
}
}
@@ -22,7 +22,6 @@ package expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.types._


case class UnaryMinus(child: Expression) extends UnaryExpression {
type EvaluatedType = Any

@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package expressions

import org.apache.spark.sql.catalyst.types.{StringType, BooleanType}
import org.apache.spark.sql.catalyst.types.{BooleanType, StringType}
import org.apache.spark.sql.catalyst.analysis.UnresolvedException

trait Predicate extends Expression {
@@ -100,7 +100,7 @@ object BooleanSimplification extends Rule[LogicalPlan] {
*/
object CombineFilters extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case ff@Filter(fc, nf@Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
}
}
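
Not part of the diff: a small, hypothetical sketch of what the CombineFilters rule above does when applied by hand to a plan with two stacked filters. The attribute names and the CombineFiltersExample object are assumptions; only classes that appear elsewhere in this commit are used.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.catalyst.optimizer.CombineFilters
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.catalyst.types.IntegerType

object CombineFiltersExample extends App {
  val a = AttributeReference("a", IntegerType, nullable = true)()
  val b = AttributeReference("b", IntegerType, nullable = true)()
  val relation = LocalRelation(Seq(a, b), Nil)

  // Two adjacent Filter nodes over the same child...
  val nested = Filter(LessThan(b, Literal(2)), Filter(GreaterThan(a, Literal(1)), relation))

  // ...are collapsed into a single Filter whose condition is the conjunction
  // of both predicates: Filter((a > 1) && (b < 2), relation).
  println(CombineFilters(nested))
}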

@@ -113,8 +113,8 @@ object CombineFilters extends Rule[LogicalPlan] {
*/
object PushPredicateThroughProject extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter@Filter(condition, project@Project(fields, grandChild)) =>
val sourceAliases = fields.collect { case a@Alias(c, _) =>
case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
val sourceAliases = fields.collect { case a @ Alias(c, _) =>
(a.toAttribute: Attribute) -> c
}.toMap
project.copy(child = filter.copy(
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package plans

import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.trees.TreeNode

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
@@ -20,7 +20,7 @@ package catalyst
package plans
package physical

import org.apache.spark.sql.catalyst.expressions.{SortOrder, Expression}
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.types.IntegerType

/**
@@ -21,9 +21,11 @@ package analysis

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical._

/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._

class AnalysisSuite extends FunSuite {
val analyze = SimpleAnalyzer

@@ -22,7 +22,6 @@ package analysis
import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.types._
import scala.Some

class HiveTypeCoercionSuite extends FunSuite {

@@ -19,13 +19,15 @@ package org.apache.spark.sql
package catalyst
package optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types.IntegerType

// For implicit conversions
import org.apache.spark.sql.catalyst.dsl.expressions._

class ConstantFoldingSuite extends OptimizerTest {

object Optimize extends RuleExecutor[LogicalPlan] {
@@ -5,8 +5,9 @@ package optimizer
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

import dsl.plans._
import dsl.expressions._
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._

class FilterPushdownSuite extends OptimizerTest {

@@ -8,8 +8,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._

/* Implicit conversions for creating query plans */

/**
* Provides helper methods for comparing plans produced by optimization rules with the expected
* result
@@ -21,8 +21,8 @@ package trees

import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.rules.{RuleExecutor, Rule}
import org.apache.spark.sql.catalyst.expressions.{Literal, IntegerLiteral, Expression}
import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

class RuleExecutorSuite extends FunSuite {
object DecrementLiterals extends Rule[Expression] {
@@ -24,7 +24,6 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions._

class TreeNodeSuite extends FunSuite {

test("top node changed") {
val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
assert(after === Literal(2))