Streaming mllib [SPARK-2438][MLLIB] #1361

Closed · wants to merge 31 commits

Commits (31, all authored by freeman-lab):
0898add  Added dependency on streaming (Jul 10, 2014)
d99aa85  Helper methods for streaming MLlib apps (Jul 10, 2014)
604f4d7  Expanded private class to include mllib (Jul 10, 2014)
c4b1143  Streaming linear regression (Jul 10, 2014)
453974e  Fixed indentation (Jul 10, 2014)
fd31e03  Changed logging behavior (Jul 10, 2014)
fb4683a  Minor changes for scalastyle consistency (Jul 14, 2014)
86220bc  Streaming linear regression unit tests (Jul 14, 2014)
a2a63ad  Makes convergence test more robust (Jul 18, 2014)
6bfe1e6  Fixed imports (Jul 31, 2014)
50dd237  Removed experimental tag (Jul 31, 2014)
74188d6  Eliminate dependency on commons (Jul 31, 2014)
4b0a5d3  Cleaned up tests (Jul 31, 2014)
c7d38a3  Move check for empty data to GradientDescent (Aug 1, 2014)
14b801e  Name changes (Aug 1, 2014)
00aafdc  Add modifiers (Aug 1, 2014)
c3f8b5a  Modified logging (Aug 1, 2014)
b9b69f6  Added setter methods (Aug 1, 2014)
7d51378  Moved streaming loader to MLUtils (Aug 1, 2014)
2fe0720  Minor cleanup (Aug 1, 2014)
66eba5e  Fixed line lengths (Aug 1, 2014)
9541a41  Merge remote-tracking branch 'upstream/master' into streaming-mllib (Aug 1, 2014)
c3326e7  Improved documentation (Aug 1, 2014)
d28cf9a  Added usage notes (Aug 1, 2014)
74cf440  Removed static methods (Aug 1, 2014)
777b596  Restored treeAggregate (Aug 1, 2014)
8711c41  Used return to avoid indentation (Aug 1, 2014)
29f27ec  Formatting (Aug 1, 2014)
8b95b27  Restored broadcasting (Aug 1, 2014)
4086fee  Fixed current weight formatting (Aug 2, 2014)
775ea29  Throw error if user doesn't initialize weights (Aug 2, 2014)
Files changed:
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Train a linear regression model on one stream of data and make predictions
* on another stream, where the data streams arrive as text files
* into two different directories.
*
* The rows of the text files must be labeled data points in the form
* `(y,[x1,x2,x3,...,xn])`,
* where n is the number of features; n must be the same for training and test data.
*
* Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
*
* To run on your local machine using the two directories `trainingDir` and `testDir`,
* with updates every 5 seconds, and 2 features per data point, call:
* $ bin/run-example \
* org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2
*
* As you add text files to `trainingDir` the model will continuously update.
* Anytime you add text files to `testDir`, you'll see predictions from the current model.
*
Contributor:
The example needs more documentation on how to run this. Is my assumption correct that the data needs to be loaded through the text file stream by writing new files to a folder? If that is the case, then all the usage details need to be mentioned.

Contributor Author (freeman-lab):
That's correct, I'll make it much clearer how the example works.
*/
object StreamingLinearRegression {

  def main(args: Array[String]) {

    if (args.length != 4) {
      System.err.println(
        "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
      System.exit(1)
    }

    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

    val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
    val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))

    model.trainOn(trainingData)
    model.predictOn(testData).print()

    ssc.start()
    ssc.awaitTermination()
  }

}
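
For reference, a minimal illustration of the input format (values invented for illustration): with 2 features, each text file dropped into `trainingDir` or `testDir` would contain labeled points such as:

(1.0,[0.5,0.3])
(2.1,[1.2,0.9])
(0.4,[0.1,0.7])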
5 changes: 5 additions & 0 deletions mllib/pom.xml
@@ -40,6 +40,11 @@
     <artifactId>spark-core_${scala.binary.version}</artifactId>
     <version>${project.version}</version>
   </dependency>
+  <dependency>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+  </dependency>
   <dependency>
     <groupId>org.eclipse.jetty</groupId>
     <artifactId>jetty-server</artifactId>
@@ -162,6 +162,14 @@ object GradientDescent extends Logging {
     val numExamples = data.count()
     val miniBatchSize = numExamples * miniBatchFraction

+    // if no data, return initial weights to avoid NaNs
+    if (numExamples == 0) {
+      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
+      return (initialWeights, stochasticLossHistory.toArray)
+    }

     // Initialize weights as a column vector
     var weights = Vectors.dense(initialWeights.toArray)
     val n = weights.size
@@ -202,5 +210,6 @@ object GradientDescent extends Logging {
       stochasticLossHistory.takeRight(10).mkString(", ")))

     (weights, stochasticLossHistory.toArray)
   }
 }
@@ -49,7 +49,7 @@ class LinearRegressionModel (
  * its corresponding right hand side label y.
  * See also the documentation for the precise formulation.
  */
-class LinearRegressionWithSGD private (
+class LinearRegressionWithSGD private[mllib] (
     private var stepSize: Double,
     private var numIterations: Int,
     private var miniBatchFraction: Double)
@@ -68,7 +68,7 @@ class LinearRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 1.0)

-  override protected def createModel(weights: Vector, intercept: Double) = {
+  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
     new LinearRegressionModel(weights, intercept)
   }
 }
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.streaming.dstream.DStream

/**
* :: DeveloperApi ::
Contributor:
The ordering relationship between trainOn() and predictOn() needs to be documented somewhere. Should one be called before the other? Can one be called after the other? Can either be called multiple times? What are the implications? If one may only be called after the other, then we will have to think about restricting it in the API itself (throw an error if the ordering is wrong).

Contributor Author (freeman-lab):
I've been testing this and it seems fairly robust; agreed we should clarify it in the documentation. (A short sketch follows the list below.)

What I've tried:

  • If train and predict are called on the same stream (or on two streams with data arriving simultaneously), order matters. If trainOn is first, the prediction will always use the subsequently updated model. If predictOn is first, it will use the model from the previous update. In practice, over multiple updates, either behavior seems reasonable, but maybe there should be a helpful warning if the user calls predictOn before trainOn?
  • If they are called on different streams and the data arrive sequentially, order doesn't matter. For example, if data arrive in the predictOn stream before the trainOn stream, the prediction uses the initial weights (as it should), regardless of the order of the calls.
  • It's ok, and maybe useful, to call predictOn repeatedly on different streams. For example, training on one stream and predicting on it and another behaves correctly (modulo the ordering issues described above).
  • If you call trainOn repeatedly on different streams, it will do an update when data arrive in either stream, which seems fine. Could be used to update using multiple input sources.
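
A minimal sketch of the same-stream case (an editor's illustration, assuming a StreamingContext `ssc` is in scope; the directory name is hypothetical):

// same stream, trainOn registered before predictOn
val stream = MLUtils.loadStreamingLabeledPoints(ssc, "data/")

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(0.0, 0.0))

model.trainOn(stream)            // each batch updates the model first...
model.predictOn(stream).print()  // ...so predictions use the freshly updated weights
// if predictOn were registered first, predictions would instead use the
// weights from the previous batch's update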

Contributor:
Whoa, this is quite a lot to grasp. I am not sure how in the second case the order doesn't matter. Gotta think about it.

Contributor:
@freeman-lab It would be really nice to put your findings inside the implementation. That can help us understand the behavior.

 * StreamingLinearAlgorithm implements methods for continuously
 * training a generalized linear model on streaming data,
 * and using it for prediction on (possibly different) streaming data.
 *
 * This class takes as type parameters a GeneralizedLinearModel
 * and a GeneralizedLinearAlgorithm, making it easy to construct
 * streaming versions of any analysis based on a GLM.
 * Initial weights must be set before calling trainOn or predictOn.
 * Only the weights will be updated, not an intercept. If the model needs
 * an intercept, it should be manually appended to the input data.
 *
 * For example usage, see `StreamingLinearRegressionWithSGD`.
 *
 * NOTE(Freeman): In some use cases, the order in which trainOn and predictOn
 * are called in an application will affect the results. When called on
 * the same DStream, if trainOn is called before predictOn, then when new data
 * arrive the model will update and the prediction will be based on the new
 * model; whereas if predictOn is called first, the prediction will use the
 * model from the previous update.
 *
 * NOTE(Freeman): It is ok to call predictOn repeatedly on multiple streams; this
 * will generate predictions for each one, all using the current model.
 * It is also ok to call trainOn on different streams; this will update
 * the model using each of the different sources, in sequence.
 */
@DeveloperApi
abstract class StreamingLinearAlgorithm[
    M <: GeneralizedLinearModel,
    A <: GeneralizedLinearAlgorithm[M]] extends Logging {

  /** The model to be updated and used for prediction. */
  protected var model: M

  /** The algorithm to use for updating. */
  protected val algorithm: A

  /** Return the latest model. */
  def latestModel(): M = {
    model
  }

  /**
   * Update the model by training on batches of data from a DStream.
   * This operation registers a DStream for training the model,
   * and updates the model based on every subsequent
   * batch of data from the stream.
   *
   * @param data DStream containing labeled data
   */
  def trainOn(data: DStream[LabeledPoint]) {
    if (Option(model.weights).isEmpty) {
      logError("Initial weights must be set before starting training")
      throw new IllegalArgumentException
    }
    data.foreachRDD { (rdd, time) =>
      model = algorithm.run(rdd, model.weights)
      logInfo("Model updated at time %s".format(time.toString))
      val display = model.weights.size match {
        case x if x > 100 => model.weights.toArray.take(100).mkString("[", ",", "...")
        case _ => model.weights.toArray.mkString("[", ",", "]")
      }
      logInfo("Current model: weights, %s".format(display))
    }
  }

  /**
   * Use the model to make predictions on batches of data from a DStream.
   *
   * @param data DStream containing labeled data
   * @return DStream containing predictions
   */
  def predictOn(data: DStream[LabeledPoint]): DStream[Double] = {
    if (Option(model.weights).isEmpty) {
      logError("Initial weights must be set before starting prediction")
      throw new IllegalArgumentException
    }
    data.map(x => model.predict(x.features))
  }

}
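
A minimal sketch of the multiple-stream behavior described in the notes above (an editor's illustration, assuming a StreamingContext `ssc`; directory names are hypothetical):

val training = MLUtils.loadStreamingLabeledPoints(ssc, "train/")
val testA = MLUtils.loadStreamingLabeledPoints(ssc, "testA/")
val testB = MLUtils.loadStreamingLabeledPoints(ssc, "testB/")

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(0.0, 0.0))

model.trainOn(training)
// predictOn can be called repeatedly; each stream is scored with the current model
model.predictOn(testA).print()
model.predictOn(testB).print()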
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Vector, Vectors}

/**
 * Train or predict a linear regression model on streaming data. Training uses
 * Stochastic Gradient Descent to update the model based on each new batch of
 * incoming data from a DStream (see `LinearRegressionWithSGD` for the model equation).
 *
 * Each batch of data is assumed to be an RDD of LabeledPoints.
 * The number of data points per batch can vary, but the number
 * of features must be constant. An initial weight
 * vector must be provided.
 *
 * Use a builder pattern to construct a streaming linear regression
 * analysis in an application, like:
 *
 *   val model = new StreamingLinearRegressionWithSGD()
 *     .setStepSize(0.5)
 *     .setNumIterations(10)
 *     .setInitialWeights(Vectors.dense(...))
 *
 *   model.trainOn(DStream)
 */
@Experimental
class StreamingLinearRegressionWithSGD (
    private var stepSize: Double,
    private var numIterations: Int,
    private var miniBatchFraction: Double,
    private var initialWeights: Vector)
  extends StreamingLinearAlgorithm[
    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {

  /**
   * Construct a StreamingLinearRegression object with default parameters:
   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0}.
   * Initial weights must be set before using trainOn or predictOn
   * (see `StreamingLinearAlgorithm`).
   */
  def this() = this(0.1, 50, 1.0, null)

  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)

  var model = algorithm.createModel(initialWeights, 0.0)

  /** Set the step size for gradient descent. Default: 0.1. */
  def setStepSize(stepSize: Double): this.type = {
    this.algorithm.optimizer.setStepSize(stepSize)
    this
  }

  /** Set the number of iterations of gradient descent to run per update. Default: 50. */
  def setNumIterations(numIterations: Int): this.type = {
    this.algorithm.optimizer.setNumIterations(numIterations)
    this
  }

  /** Set the fraction of each batch to use for updates. Default: 1.0. */
  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    this.algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
    this
  }

  /** Set the initial weights. Required before calling trainOn or predictOn. */
  def setInitialWeights(initialWeights: Vector): this.type = {
    this.model = algorithm.createModel(initialWeights, 0.0)
    this
  }

}
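
A minimal sketch of the weight-initialization requirement (an editor's illustration, assuming a DStream[LabeledPoint] named `train` is in scope; the dimension 2 is arbitrary):

val lr = new StreamingLinearRegressionWithSGD()  // initialWeights is null here
// lr.trainOn(train)                             // would throw IllegalArgumentException
lr.setInitialWeights(Vectors.dense(0.0, 0.0))    // dimension must match the data
lr.trainOn(train)                                // ok: the model now has concrete weights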
15 changes: 15 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -30,6 +30,8 @@ import org.apache.spark.util.random.BernoulliSampler
 import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream

/**
* Helper methods to load, save and pre-process data used in ML Lib.
@@ -192,6 +194,19 @@ object MLUtils {
  def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
    loadLabeledPoints(sc, dir, sc.defaultMinPartitions)

  /**
   * Loads streaming labeled points from a stream of text files
   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
   * See `StreamingContext.textFileStream` for more details on how to
   * generate a stream from files.
   *

Contributor:
Can you add: see StreamingContext.textFileStream for more details about how to generate a stream from files.

   * @param ssc Streaming context
   * @param dir Directory path in any Hadoop-supported file system URI
   * @return Labeled points stored as a DStream[LabeledPoint]
   */
  def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] =
    ssc.textFileStream(dir).map(LabeledPointParser.parse)

  /**
   * Load labeled data from a file. The data format used here is
   * <L>, <f1> <f2> ...
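
A minimal end-to-end sketch of the loader on its own (an editor's illustration; the directory name is hypothetical). Note that `StreamingContext.textFileStream` only picks up files newly written or moved into the directory after the stream starts, so data must arrive as new files rather than appends:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.util.MLUtils

val conf = new SparkConf().setMaster("local").setAppName("StreamingPointsDemo")
val ssc = new StreamingContext(conf, Seconds(5))

// each new file in the directory becomes part of the next batch of points
val points = MLUtils.loadStreamingLabeledPoints(ssc, "data/points/")
points.print()

ssc.start()
ssc.awaitTermination()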