Commit

merged master
brkyvz committed Jan 30, 2015
2 parents b55ac5c + dd4d84c commit 598c583
Showing 47 changed files with 532 additions and 223 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -34,6 +34,10 @@
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Whether the worker has been released into the idle pool
@volatile var released = false

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
if (!reuse_worker || !released) {
try {
worker.close()
} catch {
@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
complete_cleanly = true
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
}
}
null
}
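The change above moves the release of a reusable Python worker out of the task-completion listener and into the reader: the worker is handed back to the idle pool only once END_OF_STREAM has been read, and the listener now closes the socket only if the worker was never released. A minimal sketch of that release-or-close decision, in plain Scala with a hypothetical idle pool standing in for SparkEnv's worker bookkeeping (the names here are illustrative, not Spark's API):

import java.net.Socket
import scala.collection.mutable

object WorkerLifecycleSketch {
  // Hypothetical stand-in for SparkEnv's idle-worker pool.
  private val idle = mutable.Queue.empty[Socket]
  def release(worker: Socket): Unit = synchronized { idle.enqueue(worker) }

  // A worker goes back to the pool only when the protocol ended cleanly with
  // END_OF_STREAM; otherwise it is closed so a half-drained stream is never
  // handed to the next task.
  def finishTask(worker: Socket, reuseWorker: Boolean, sawEndOfStream: Boolean): Unit = {
    val released = reuseWorker && sawEndOfStream
    if (released) release(worker)
    if (!reuseWorker || !released) {
      try worker.close()
      catch { case _: Exception => () } // the socket may already be closed
    }
  }
}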
@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.api.scala.dsl._
import org.apache.spark.sql.Dsl._

// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
4 changes: 4 additions & 0 deletions graphx/pom.xml
@@ -40,6 +40,10 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
2 changes: 1 addition & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.api.scala.dsl._
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types._

/**
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{BLAS, Vector, VectorUDT}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql._
import org.apache.spark.sql.api.scala.dsl._
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.storage.StorageLevel

@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql._
import org.apache.spark.sql.api.scala.dsl._
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{StructField, StructType}

/**
@@ -30,7 +30,7 @@ import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.api.scala.dsl._
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
@@ -78,7 +78,7 @@ sealed trait Vector extends Serializable {
result = 31 * result + (bits ^ (bits >>> 32)).toInt
}
}
return result
result
}

/**
@@ -264,6 +264,15 @@ class BlockMatrix(
new DenseMatrix(m, n, values)
}

/** Transpose this `BlockMatrix`. Returns a new `BlockMatrix` instance sharing the
* same underlying data. This is a lazy operation. */
def transpose: BlockMatrix = {
val transposedBlocks = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
((blockColIndex, blockRowIndex), mat.transpose)
}
new BlockMatrix(transposedBlocks, colsPerBlock, rowsPerBlock, nCols, nRows)
}

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
val localMat = toLocalMatrix()
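A hedged usage sketch for the new transpose method: build a small BlockMatrix from ((blockRowIndex, blockColIndex), Matrix) pairs and transpose it. This assumes a local SparkContext and that BlockMatrix lives at org.apache.spark.mllib.linalg.distributed as in Spark 1.x; the block values are arbitrary.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

object TransposeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("transpose").setMaster("local[2]"))

    // A 4x4 matrix stored as four 2x2 blocks, keyed by (blockRowIndex, blockColIndex).
    // Matrices.dense takes values in column-major order.
    val blocks = sc.parallelize(Seq(
      ((0, 0), Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((0, 1), Matrices.dense(2, 2, Array(3.0, 1.0, 0.0, 1.0))),
      ((1, 0), Matrices.dense(2, 2, Array(0.0, 0.0, 1.0, 0.0))),
      ((1, 1), Matrices.dense(2, 2, Array(1.0, 2.0, 0.0, 1.0)))))

    val mat = new BlockMatrix(blocks, 2, 2)   // rowsPerBlock = colsPerBlock = 2
    val matT = mat.transpose                  // lazy: keys are swapped, each block transposed

    assert(matT.numRows() == mat.numCols() && matT.numCols() == mat.numRows())
    println(matT.toLocalMatrix())             // forces the transposed blocks to be computed

    sc.stop()
  }
}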
@@ -169,11 +169,11 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
}
// Wrong BlockMatrix dimensions
val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
intercept[SparkException] {
intercept[AssertionError] {
wrongRowSize.validate()
}
val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
intercept[SparkException] {
intercept[AssertionError] {
wrongColSize.validate()
}
// Duplicate indices
@@ -188,4 +188,33 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
dupMatrix.validate()
}
}

test("transpose") {
val expected = BDM(
(1.0, 0.0, 3.0, 0.0, 0.0),
(0.0, 2.0, 1.0, 1.0, 0.0),
(0.0, 1.0, 1.0, 2.0, 1.0),
(0.0, 0.0, 0.0, 1.0, 5.0))

val AT = gridBasedMat.transpose
assert(AT.numRows() === gridBasedMat.numCols())
assert(AT.numCols() === gridBasedMat.numRows())
assert(AT.toBreeze() === expected)

// the partitioner must be updated as well
val originalPartitioner = gridBasedMat.partitioner
val ATpartitioner = AT.partitioner
assert(originalPartitioner.colsPerPart === ATpartitioner.rowsPerPart)
assert(originalPartitioner.rowsPerPart === ATpartitioner.colsPerPart)
assert(originalPartitioner.cols === ATpartitioner.rows)
assert(originalPartitioner.rows === ATpartitioner.cols)

// make sure it works when matrices are cached as well
gridBasedMat.cache()
val AT2 = gridBasedMat.transpose
AT2.cache()
assert(AT2.toBreeze() === AT.toBreeze())
val A = AT2.transpose
assert(A.toBreeze() === gridBasedMat.toBreeze())
}
}
10 changes: 5 additions & 5 deletions python/pyspark/sql.py
@@ -2136,9 +2136,9 @@ def __getitem__(self, item):

def __getattr__(self, name):
""" Return the column by given name """
if isinstance(name, basestring):
return Column(self._jdf.apply(name))
raise AttributeError
if name.startswith("__"):
raise AttributeError(name)
return Column(self._jdf.apply(name))

def alias(self, name):
""" Alias the current DataFrame """
@@ -2342,7 +2342,7 @@ def sum(self):

def _create_column_from_literal(literal):
sc = SparkContext._active_spark_context
return sc._jvm.org.apache.spark.sql.api.java.dsl.lit(literal)
return sc._jvm.org.apache.spark.sql.Dsl.lit(literal)


def _create_column_from_name(name):
@@ -2515,7 +2515,7 @@ def _(col):
jcol = col._jc
else:
jcol = _create_column_from_name(col)
jc = getattr(sc._jvm.org.apache.spark.sql.api.java.dsl, name)(jcol)
jc = getattr(sc._jvm.org.apache.spark.sql.Dsl, name)(jcol)
return Column(jc)
return staticmethod(_)

10 changes: 10 additions & 0 deletions python/pyspark/tests.py
@@ -23,6 +23,7 @@
from fileinput import input
from glob import glob
import os
import pydoc
import re
import shutil
import subprocess
@@ -1032,6 +1033,15 @@ def test_aggregator(self):
from pyspark.sql import Aggregator as Agg
# self.assertEqual((0, '100'), tuple(g.agg(Agg.first(df.key), Agg.last(df.value)).first()))

def test_help_command(self):
# Regression test for SPARK-5464
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
df = self.sqlCtx.jsonRDD(rdd)
# render_doc() reproduces the help() exception without printing output
pydoc.render_doc(df)
pydoc.render_doc(df.foo)
pydoc.render_doc(df.take(1))


class InputFormatTests(ReusedPySparkTestCase):

@@ -348,7 +348,7 @@ class SqlParser extends AbstractSparkSQLParser {
)

protected lazy val baseExpression: Parser[Expression] =
( "*" ^^^ Star(None)
( "*" ^^^ UnresolvedStar(None)
| primary
)

@@ -250,6 +250,12 @@ class Analyzer(catalog: Catalog,
Project(
projectList.flatMap {
case s: Star => s.expand(child.output, resolver)
case Alias(f @ UnresolvedFunction(_, args), name) if containsStar(args) =>
val expandedArgs = args.flatMap {
case s: Star => s.expand(child.output, resolver)
case o => o :: Nil
}
Alias(child = f.copy(children = expandedArgs), name)() :: Nil
case o => o :: Nil
},
child)
@@ -273,10 +279,9 @@
case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
q transformExpressions {
case u @ UnresolvedAttribute(name)
if resolver(name, VirtualColumn.groupingIdName) &&
q.isInstanceOf[GroupingAnalytics] =>
// Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics
case u @ UnresolvedAttribute(name) if resolver(name, VirtualColumn.groupingIdName) &&
q.isInstanceOf[GroupingAnalytics] =>
// Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics
q.asInstanceOf[GroupingAnalytics].gid
case u @ UnresolvedAttribute(name) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
@@ -299,7 +304,7 @@
* Returns true if `exprs` contains a [[Star]].
*/
protected def containsStar(exprs: Seq[Expression]): Boolean =
exprs.collect { case _: Star => true}.nonEmpty
exprs.exists(_.collect { case _: Star => true }.nonEmpty)
}
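The containsStar change just above is what lets a star nested inside a function call (e.g. count(*), handled by the new Alias/UnresolvedFunction case earlier in this hunk) be detected: Seq.collect only inspects the top-level elements of the projection list, whereas calling the tree-walking collect on each expression also visits its children. A toy illustration with a hand-rolled expression tree instead of Catalyst's (all names are made up for the sketch):

object ContainsStarSketch {
  sealed trait Expr {
    def children: Seq[Expr]
    // Tree-walking collect in the spirit of Catalyst's TreeNode.collect:
    // visits this node and every descendant.
    def collectTree[B](pf: PartialFunction[Expr, B]): Seq[B] =
      pf.lift(this).toSeq ++ children.flatMap(_.collectTree(pf))
  }
  case object Star extends Expr { def children = Nil }
  case class Attr(name: String) extends Expr { def children = Nil }
  case class Func(name: String, args: Seq[Expr]) extends Expr { def children = args }

  // Old rule: Seq.collect looks only at the top-level expressions.
  def containsStarOld(exprs: Seq[Expr]): Boolean =
    exprs.collect { case Star => true }.nonEmpty

  // New rule: walk into each expression, so a Star inside count(*) is found.
  def containsStarNew(exprs: Seq[Expr]): Boolean =
    exprs.exists(_.collectTree { case Star => true }.nonEmpty)

  def main(args: Array[String]): Unit = {
    val projectList: Seq[Expr] = Seq(Func("count", Seq(Star)))
    println(containsStarOld(projectList)) // false: the Star is nested, so it is missed
    println(containsStarNew(projectList)) // true
  }
}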

/**
@@ -50,7 +50,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false

override def newInstance = this
override def newInstance() = this
override def withNullability(newNullability: Boolean) = this
override def withQualifiers(newQualifiers: Seq[String]) = this
override def withName(newName: String) = UnresolvedAttribute(name)
@@ -77,15 +77,10 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E

/**
* Represents all of the input attributes to a given relational operator, for example in
* "SELECT * FROM ...".
*
* @param table an optional table that should be the target of the expansion. If omitted all
* tables' columns are produced.
* "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis.
*/
case class Star(
table: Option[String],
mapFunction: Attribute => Expression = identity[Attribute])
extends Attribute with trees.LeafNode[Expression] {
trait Star extends Attribute with trees.LeafNode[Expression] {
self: Product =>

override def name = throw new UnresolvedException(this, "name")
override def exprId = throw new UnresolvedException(this, "exprId")
@@ -94,29 +89,53 @@ case class Star(
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false

override def newInstance = this
override def newInstance() = this
override def withNullability(newNullability: Boolean) = this
override def withQualifiers(newQualifiers: Seq[String]) = this
override def withName(newName: String) = this

def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = {
// Star gets expanded at runtime so we never evaluate a Star.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression]
}


/**
* Represents all of the input attributes to a given relational operator, for example in
* "SELECT * FROM ...".
*
* @param table an optional table that should be the target of the expansion. If omitted all
* tables' columns are produced.
*/
case class UnresolvedStar(table: Option[String]) extends Star {

override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = {
val expandedAttributes: Seq[Attribute] = table match {
// If there is no table specified, use all input attributes.
case None => input
// If there is a table, pick out attributes that are part of this table.
case Some(t) => input.filter(_.qualifiers.filter(resolver(_, t)).nonEmpty)
}
val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map {
expandedAttributes.zip(input).map {
case (n: NamedExpression, _) => n
case (e, originalAttribute) =>
Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers)
}
mappedAttributes
}

// Star gets expanded at runtime so we never evaluate a Star.
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString = table.map(_ + ".").getOrElse("") + "*"
}


/**
* Represents all the resolved input attributes to a given relational operator. This is used
* in the data frame DSL.
*
* @param expressions Expressions to expand.
*/
case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star {
override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = expressions
override def toString = expressions.mkString("ResolvedStar(", ", ", ")")
}
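The split above leaves only expand abstract: UnresolvedStar filters the operator's input by the optional table qualifier, while ResolvedStar (used by the DataFrame DSL) already carries the expanded columns. A stripped-down sketch of that rule with toy Attribute and Resolver types instead of Catalyst's (the names are illustrative only):

object StarExpansionSketch {
  // Toy stand-ins for Catalyst's Attribute and Resolver.
  case class Attr(name: String, qualifiers: Seq[String])
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // UnresolvedStar: with no table, every input attribute is produced; with a
  // table, only the attributes carrying a matching qualifier survive.
  def expandUnresolved(table: Option[String], input: Seq[Attr], resolver: Resolver): Seq[Attr] =
    table match {
      case None    => input
      case Some(t) => input.filter(_.qualifiers.exists(resolver(_, t)))
    }

  // ResolvedStar: the DSL already knows the columns, so expansion just returns them.
  def expandResolved(expressions: Seq[Attr]): Seq[Attr] = expressions

  def main(args: Array[String]): Unit = {
    val input = Seq(Attr("key", Seq("t1")), Attr("value", Seq("t1")), Attr("id", Seq("t2")))
    println(expandUnresolved(None, input, caseInsensitive).map(_.name))       // key, value, id
    println(expandUnresolved(Some("T2"), input, caseInsensitive).map(_.name)) // id
  }
}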
@@ -141,10 +141,11 @@ object PartialAggregation {
// We need to pass all grouping expressions though so the grouping can happen a second
// time. However some of them might be unnamed so we alias them allowing them to be
// referenced in the second aggregation.
val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {
case n: NamedExpression => (n, n)
case other => (other, Alias(other, "PartialGroup")())
}.toMap
val namedGroupingExpressions: Map[Expression, NamedExpression] =
groupingExpressions.filter(!_.isInstanceOf[Literal]).map {
case n: NamedExpression => (n, n)
case other => (other, Alias(other, "PartialGroup")())
}.toMap

// Replace aggregations with a new expression that computes the result from the already
// computed partial evaluations and grouping values.
@@ -51,7 +51,9 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
test("union project *") {
val plan = (1 to 100)
.map(_ => testRelation)
.fold[LogicalPlan](testRelation)((a,b) => a.select(Star(None)).select('a).unionAll(b.select(Star(None))))
.fold[LogicalPlan](testRelation) { (a, b) =>
a.select(UnresolvedStar(None)).select('a).unionAll(b.select(UnresolvedStar(None)))
}

assert(caseInsensitiveAnalyze(plan).resolved)
}