Commit 7ee6737

Merge branch 'master' of github.com:apache/spark into rest

Andrew Or committed Jan 30, 2015
2 parents e2f7f5f + 0a95085

Showing 42 changed files with 831 additions and 166 deletions.
@@ -10,4 +10,3 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
core/src/main/scala/org/apache/spark/SparkContext.scala (2 changes: 1 addition & 1 deletion)
@@ -1986,7 +1986,7 @@ object SparkContext extends Logging {
case "yarn-client" =>
val scheduler = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (13 changes: 8 additions & 5 deletions)
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Whether the worker has been released into the idle pool
@volatile var released = false

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
if (!reuse_worker || !released) {
try {
worker.close()
} catch {
@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
complete_cleanly = true
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
}
}
null
}
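The change above replaces the old `complete_cleanly` flag with a volatile `released` flag: the worker socket is returned to the idle pool as soon as END_OF_STREAM is read, and the completion listener closes it only if it was never released, so the socket is either pooled or closed exactly once. A minimal Scala sketch of that release-or-close pattern, with hypothetical names (WorkerLease, releaseToPool) rather than Spark's actual classes:

import java.net.Socket
import java.util.concurrent.ConcurrentLinkedQueue

class WorkerLease(socket: Socket, pool: ConcurrentLinkedQueue[Socket]) {
  // Whether the socket has been handed back to the idle pool (hypothetical sketch)
  @volatile private var released = false

  // Reader side: called once END_OF_STREAM is seen
  def releaseToPool(): Unit = {
    pool.offer(socket)
    released = true
  }

  // Completion-listener side: close only if the socket was never pooled
  def cleanup(reuseWorker: Boolean): Unit = {
    if (!reuseWorker || !released) {
      try socket.close() catch { case _: Exception => () } // best-effort close
    }
  }
}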
@@ -149,7 +149,7 @@ class SparkContextSchedulerCreationSuite
}

test("yarn-client") {
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler")
}

def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
examples/src/main/python/mllib/gradient_boosted_trees.py (76 changes: 76 additions & 0 deletions)
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Gradient Boosted Trees classification and regression using MLlib.
"""

import sys

from pyspark.context import SparkContext
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.mllib.util import MLUtils


def testClassification(trainingData, testData):
# Train a GradientBoostedTrees model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={},
numIterations=30, maxDepth=4)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification ensemble model:')
print(model.toDebugString())


def testRegression(trainingData, testData):
# Train a GradientBoostedTrees model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo={},
numIterations=30, maxDepth=4)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
/ float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression ensemble model:')
print(model.toDebugString())


if __name__ == "__main__":
if len(sys.argv) > 1:
print >> sys.stderr, "Usage: gradient_boosted_trees"
exit(1)
sc = SparkContext(appName="PythonGradientBoostedTrees")

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

print('\nRunning example of classification using GradientBoostedTrees\n')
testClassification(trainingData, testData)

print('\nRunning example of regression using GradientBoostedTrees\n')
testRegression(trainingData, testData)

sc.stop()
@@ -41,10 +41,11 @@ import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.mllib.tree.{RandomForest, DecisionTree}
import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree}
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy}
import org.apache.spark.mllib.tree.impurity._
import org.apache.spark.mllib.tree.model.{RandomForestModel, DecisionTreeModel}
import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -532,6 +533,35 @@ class PythonMLLibAPI extends Serializable {
}
}

/**
* Java stub for Python mllib GradientBoostedTrees.train().
* This stub returns a handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on exit;
* see the Py4J documentation.
*/
def trainGradientBoostedTreesModel(
data: JavaRDD[LabeledPoint],
algoStr: String,
categoricalFeaturesInfo: JMap[Int, Int],
lossStr: String,
numIterations: Int,
learningRate: Double,
maxDepth: Int): GradientBoostedTreesModel = {
val boostingStrategy = BoostingStrategy.defaultParams(algoStr)
boostingStrategy.setLoss(Losses.fromString(lossStr))
boostingStrategy.setNumIterations(numIterations)
boostingStrategy.setLearningRate(learningRate)
boostingStrategy.treeStrategy.setMaxDepth(maxDepth)
boostingStrategy.treeStrategy.categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap

val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK)
try {
GradientBoostedTrees.train(cached, boostingStrategy)
} finally {
cached.unpersist(blocking = false)
}
}
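For comparison, the configuration this stub assembles can be built directly on the Scala side with the same calls; a minimal sketch (the parameter values are illustrative, and "logLoss" is assumed to be a name Losses.fromString accepts):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.rdd.RDD

def trainGbtSketch(data: RDD[LabeledPoint]) = {
  val boostingStrategy = BoostingStrategy.defaultParams("classification")
  boostingStrategy.setLoss(Losses.fromString("logLoss")) // assumed loss name
  boostingStrategy.setNumIterations(30)
  boostingStrategy.setLearningRate(0.1)
  boostingStrategy.treeStrategy.setMaxDepth(4)
  GradientBoostedTrees.train(data, boostingStrategy) // returns a GradientBoostedTreesModel
}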

/**
* Java stub for mllib Statistics.colStats(X: RDD[Vector]).
* TODO figure out return type.
@@ -290,6 +290,13 @@ class Word2Vec extends Serializable with Logging {

val newSentences = sentences.repartition(numPartitions).cache()
val initRandom = new XORShiftRandom(seed)

if (vocabSize.toLong * vectorSize * 8 >= Int.MaxValue) {
throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec " +
"to avoid an OOM: vocabSize * vectorSize (currently " + vocabSize + " * " + vectorSize +
") must be less than Int.MaxValue / 8.")
}

val syn0Global =
Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize)
val syn1Global = new Array[Float](vocabSize * vectorSize)
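The new guard fails fast before the two flat float arrays above are allocated, since their common length vocabSize * vectorSize is an Int index. A quick illustration of where the threshold bites (illustrative numbers, not Spark code):

val vocabSize = 4000000   // e.g. a 4M-word vocabulary
val vectorSize = 100
val cost = vocabSize.toLong * vectorSize * 8 // 3,200,000,000
assert(cost >= Int.MaxValue) // 3.2e9 > 2,147,483,647, so the check would throw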
@@ -78,7 +78,7 @@ sealed trait Vector extends Serializable {
result = 31 * result + (bits ^ (bits >>> 32)).toInt
}
}
return result
result
}

/**
@@ -232,6 +232,15 @@ class BlockMatrix(
new DenseMatrix(m, n, values)
}

/** Transposes this `BlockMatrix`, returning a new `BlockMatrix` instance that shares
* the same underlying data. This is a lazy operation. */
def transpose: BlockMatrix = {
val transposedBlocks = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
((blockColIndex, blockRowIndex), mat.transpose)
}
new BlockMatrix(transposedBlocks, colsPerBlock, rowsPerBlock, nCols, nRows)
}
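A short usage sketch (hypothetical val a: BlockMatrix): since transpose only remaps the (row, col) block keys, no matrix data is copied or shuffled until an action forces the underlying RDD, and transposing twice recovers the original shape:

val at = a.transpose         // lazy: just remaps block indices
val roundTrip = at.transpose
assert(at.numRows() == a.numCols() && at.numCols() == a.numRows())
assert(roundTrip.numRows() == a.numRows() && roundTrip.numCols() == a.numCols())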

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
val localMat = toLocalMatrix()
@@ -29,8 +29,8 @@ object Algo extends Enumeration {
val Classification, Regression = Value

private[mllib] def fromString(name: String): Algo = name match {
case "classification" => Classification
case "regression" => Regression
case "classification" | "Classification" => Classification
case "regression" | "Regression" => Regression
case _ => throw new IllegalArgumentException(s"Did not recognize Algo name: $name")
}
}
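The practical effect, also covered by the SPARK-5496 regression test further down, is that the capitalized names produced by Algo.toString now round-trip through the parser; a minimal sketch:

import org.apache.spark.mllib.tree.configuration.BoostingStrategy

// Previously threw IllegalArgumentException: "Did not recognize Algo name: Classification"
val strategy = BoostingStrategy.defaultParams("Classification")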
@@ -146,4 +146,33 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.toLocalMatrix() === dense)
assert(gridBasedMat.toBreeze() === expected)
}

test("transpose") {
val expected = BDM(
(1.0, 0.0, 3.0, 0.0, 0.0),
(0.0, 2.0, 1.0, 1.0, 0.0),
(0.0, 1.0, 1.0, 2.0, 1.0),
(0.0, 0.0, 0.0, 1.0, 5.0))

val AT = gridBasedMat.transpose
assert(AT.numRows() === gridBasedMat.numCols())
assert(AT.numCols() === gridBasedMat.numRows())
assert(AT.toBreeze() === expected)

// the partitioner must be updated as well
val originalPartitioner = gridBasedMat.partitioner
val ATpartitioner = AT.partitioner
assert(originalPartitioner.colsPerPart === ATpartitioner.rowsPerPart)
assert(originalPartitioner.rowsPerPart === ATpartitioner.colsPerPart)
assert(originalPartitioner.cols === ATpartitioner.rows)
assert(originalPartitioner.rows === ATpartitioner.cols)

// make sure it works when matrices are cached as well
gridBasedMat.cache()
val AT2 = gridBasedMat.transpose
AT2.cache()
assert(AT2.toBreeze() === AT.toBreeze())
val A = AT2.transpose
assert(A.toBreeze() === gridBasedMat.toBreeze())
}
}
@@ -128,6 +128,11 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
}
}

test("SPARK-5496: BoostingStrategy.defaultParams should recognize Classification") {
for (algo <- Seq("classification", "Classification", "regression", "Regression")) {
BoostingStrategy.defaultParams(algo)
}
}
}

object GradientBoostedTreesSuite {
python/pyspark/mllib/tests.py (41 changes: 34 additions & 7 deletions)
@@ -169,7 +169,7 @@ def test_kmeans_deterministic(self):

def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
data = [
LabeledPoint(0.0, [1, 0, 0]),
LabeledPoint(1.0, [0, 1, 1]),
@@ -198,18 +198,31 @@ def test_classification(self):
self.assertTrue(nb_model.predict(features[3]) > 0)

categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = \
DecisionTree.trainClassifier(rdd, numClasses=2,
categoricalFeaturesInfo=categoricalFeaturesInfo)
dt_model = DecisionTree.trainClassifier(
rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)

rf_model = RandomForest.trainClassifier(
rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)

gbt_model = GradientBoostedTrees.trainClassifier(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)

def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
data = [
LabeledPoint(-1.0, [0, -1]),
LabeledPoint(1.0, [0, 1]),
@@ -238,13 +251,27 @@ def test_regression(self):
self.assertTrue(rr_model.predict(features[3]) > 0)

categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = \
DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
dt_model = DecisionTree.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)

rf_model = RandomForest.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)

gbt_model = GradientBoostedTrees.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)


class StatTests(PySparkTestCase):
# SPARK-4023