Streaming mllib [SPARK-2438][MLLIB] #1361

Closed
wants to merge 31 commits into from

Conversation

freeman-lab
Contributor

This PR implements a streaming linear regression analysis, in which a linear regression model is trained online as new data arrive. The design is based on discussions with @tdas and @mengxr, in which we determined how to add this functionality in a general way, with minimal changes to existing libraries.

Summary of additions:

StreamingLinearAlgorithm

  • An abstract class for fitting generalized linear models online to streaming data, including training on (and updating) a model, and making predictions.

StreamingLinearRegressionWithSGD

  • Class and companion object for running streaming linear regression

StreamingLinearRegressionTestSuite

  • Unit tests

StreamingLinearRegression

  • Example use case: fitting a model online to data from one stream, and making predictions on other data

Notes

  • If this looks good, I can use the StreamingLinearAlgorithm class to easily implement other analyses that follow the same logic (Ridge, Lasso, Logistic, SVM).
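The train-and-update loop described above can be sketched without Spark. This is a minimal illustration, not the code in this PR: the object name `OnlineLinearRegressionSketch`, its `update` and `predict` helpers, the step size and the iteration count are all hypothetical, and plain Scala collections stand in for the batches of a stream. The point is only that each new batch refines the weights left by the previous one.

```scala
// Hypothetical sketch of online linear regression; not the PR's implementation.
object OnlineLinearRegressionSketch {
  type Example = (Double, Array[Double]) // (label, features)

  // Linear prediction: dot product of weights and features.
  def predict(weights: Array[Double], features: Array[Double]): Double =
    weights.zip(features).map { case (w, x) => w * x }.sum

  // Run a few gradient descent steps on squared error over one batch,
  // starting from the current weights. An empty batch leaves the model unchanged.
  def update(
      weights: Array[Double],
      batch: Seq[Example],
      stepSize: Double,
      numIterations: Int): Array[Double] = {
    if (batch.isEmpty) {
      weights
    } else {
      var w = weights
      for (_ <- 1 to numIterations) {
        val grad = new Array[Double](w.length)
        for ((label, features) <- batch) {
          val err = predict(w, features) - label
          for (j <- w.indices) grad(j) += err * features(j)
        }
        // Step against the average gradient of the batch.
        w = w.indices.map(j => w(j) - stepSize * grad(j) / batch.size).toArray
      }
      w
    }
  }

  def main(args: Array[String]): Unit = {
    val trueWeights = Array(2.0, -1.0)
    val rng = new scala.util.Random(42)
    // Each inner Seq plays the role of one batch interval of a stream.
    val batches = Seq.fill(20) {
      Seq.fill(50) {
        val x = Array(rng.nextGaussian(), rng.nextGaussian())
        (predict(trueWeights, x), x)
      }
    }
    // The model is carried from batch to batch, as in online training.
    val finalWeights = batches.foldLeft(Array(0.0, 0.0)) { (w, batch) =>
      update(w, batch, stepSize = 0.1, numIterations = 10)
    }
    println(finalWeights.mkString(", "))
  }
}
```

Starting each update from the previous weights is what makes the model "online"; the explicit initial weights passed to `foldLeft` mirror the requirement, listed in the commit log, that the user initialize the weights.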

- Abstract class to support a variety of streaming regression analyses
- Example concrete class for streaming linear regression
- Example usage: continually train on one data stream and test on another
@AmplabJenkins

Can one of the admins verify this patch?

@mengxr
Contributor

mengxr commented Jul 10, 2014

@freeman-lab This is great! Could you create a JIRA and add [SPARK-####][MLLIB] to the title of this PR? Thanks!

@mengxr
Contributor

mengxr commented Jul 10, 2014

Jenkins, add to whitelist.

@mengxr
Contributor

mengxr commented Jul 10, 2014

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@tdas
Contributor

tdas commented Jul 10, 2014

Awesome, time to have some fun :D
Roping in @pwendell

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16516/

@freeman-lab changed the title from "Streaming mllib" to "Streaming mllib [SPARK-2438][MLLIB]" on Jul 10, 2014
@freeman-lab
Contributor Author

@mengxr great! Just created a JIRA (https://issues.apache.org/jira/browse/SPARK-2438) and added to the title.

@mengxr
Contributor

mengxr commented Jul 12, 2014

@freeman-lab Could you add some unit tests? There should be some examples under streaming and mllib.

- Test parameter estimate accuracy after several updates
- Test parameter accuracy improvement after each batch
@freeman-lab
Contributor Author

@mengxr I added two tests that check that the parameter estimates are accurate and that they improve over time. The tests write temporary files and read them back through file streams, which is clunky, but @tdas will help add a dependency on the streaming test suite so we can use its utilities instead.

@mengxr
Contributor

mengxr commented Jul 17, 2014

Jenkins, add to whitelist.

@mengxr
Contributor

mengxr commented Jul 17, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 17, 2014

QA tests have started for PR 1361. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16774/consoleFull

@SparkQA

SparkQA commented Jul 17, 2014

QA results for PR 1361:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16774/consoleFull

@freeman-lab
Contributor Author

Looks like the basic test for correct final params passes, but the stricter test for improvement on every update does not. Both pass locally. My guess is that Jenkins runs a bit slower, so the updates don't complete fast enough (I can reproduce the failure locally by making the test data rate too high). I'll experiment with this; simply slowing down the data rate might be enough.

- Slower simulated data rates and updates
- Softens requirement for strict error reduction, but still ensures error stability, and error reduction on at least a subset of updates
@freeman-lab
Contributor Author

@mengxr mind retesting? I tried to make the convergence test more robust in a couple of ways. If we still have issues we might need to rethink that test further. Thanks!
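The softened requirement from the commit above (error stability, plus error reduction on at least a subset of updates) could be expressed roughly as follows. This is a sketch under assumptions, not the test in this PR: `ConvergenceCheckSketch`, `fractionImproved`, `isStable`, the sample error values and the thresholds are all hypothetical.

```scala
// Hypothetical sketch of a tolerant convergence check; not the PR's test code.
object ConvergenceCheckSketch {
  // errors(i) is the parameter error measured after the i-th update.
  // Returns the fraction of consecutive updates on which the error went down.
  def fractionImproved(errors: Seq[Double]): Double = {
    val deltas = errors.sliding(2).collect { case Seq(prev, next) => next - prev }.toVector
    if (deltas.isEmpty) 0.0 else deltas.count(_ < 0).toDouble / deltas.size
  }

  // Stability: the error never grows by more than `tolerance` between updates.
  def isStable(errors: Seq[Double], tolerance: Double): Boolean =
    errors.sliding(2).forall {
      case Seq(prev, next) => next - prev <= tolerance
      case _ => true // fewer than two measurements: nothing to compare
    }

  def main(args: Array[String]): Unit = {
    // Made-up error trace: mostly decreasing, with one small regression.
    val errors = Seq(1.0, 0.6, 0.65, 0.3, 0.1)
    assert(isStable(errors, tolerance = 0.1))
    assert(fractionImproved(errors) >= 0.5)
    println("convergence check passed")
  }
}
```

A check of this shape tolerates an occasional slow or noisy update, which a strict "error must drop on every update" assertion does not.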

@SparkQA

SparkQA commented Jul 18, 2014

QA tests have started for PR 1361. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16799/consoleFull

@SparkQA

SparkQA commented Jul 18, 2014

QA results for PR 1361:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16799/consoleFull

- Also deleted companion object
- Renamed file for consistency
- Explained usage in documentation
@freeman-lab
Contributor Author

@mengxr done! I removed the static methods (and made the class public), and added those usage notes to StreamingLinearAlgorithm.

// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
.aggregate((BDV.zeros[Double](weights.size), 0.0))(
Contributor

aggregate -> .treeAggregate. We use a tree pattern to avoid sending too much data to the driver. Does it hurt streaming update performance?

Contributor Author

It's totally fine. I might have lost it in the merge; I've put it back.

Contributor Author

Same for broadcasting, sorry, fixing...
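The tree pattern mentioned in this review thread can be illustrated with plain Scala collections. This is only a sketch of the idea, not Spark's `treeAggregate` and not code from this PR: `TreeAggregateSketch`, `treeCombine` and `addPartials` are hypothetical names, and each element of `partials` stands in for the (gradient sum, loss sum) pair computed on one partition. Partial results are merged pairwise, level by level, so the last step combines only a couple of values instead of one per partition.

```scala
// Hypothetical sketch of tree-style combination; not Spark's implementation.
object TreeAggregateSketch {
  type Partial = (Vector[Double], Double) // (gradient sum, loss sum)

  // Element-wise sum of two partial results.
  def addPartials(a: Partial, b: Partial): Partial =
    (a._1.zip(b._1).map { case (x, y) => x + y }, a._2 + b._2)

  // Merge partial results pairwise, level by level, until one remains.
  @annotation.tailrec
  def treeCombine[A](partials: Seq[A])(combine: (A, A) => A): Option[A] =
    partials match {
      case Seq() => None
      case Seq(single) => Some(single)
      case _ =>
        val nextLevel = partials.grouped(2).map(_.reduce(combine)).toVector
        treeCombine(nextLevel)(combine)
    }

  def main(args: Array[String]): Unit = {
    val partials: Seq[Partial] = Seq(
      (Vector(1.0, 2.0), 0.5),
      (Vector(3.0, 4.0), 1.5),
      (Vector(5.0, 6.0), 2.0)
    )
    println(treeCombine(partials)(addPartials))
  }
}
```

In a cluster the intermediate levels would run on executors, which is what keeps the amount of data sent to the driver small; a flat aggregate would instead send every partition's result straight to the driver.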

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1361. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17716/consoleFull

@SparkQA

SparkQA commented Aug 1, 2014

QA tests have started for PR 1361. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17718/consoleFull

@@ -174,17 +182,18 @@ object GradientDescent extends Logging {
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
Contributor

Broadcasting the weights is actually important for performance. Did you experience any problem with it? It may be an orthogonal issue. Maybe we should keep this code block unchanged.

Contributor Author

This was my mistake, should be fixed now.

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1361:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StreamingLinearRegressionWithSGD (

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17716/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1361:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StreamingLinearRegressionWithSGD (

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17718/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA tests have started for PR 1361. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17732/consoleFull

@SparkQA

SparkQA commented Aug 2, 2014

QA results for PR 1361:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class StreamingLinearRegressionWithSGD (

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17732/consoleFull

@mengxr
Contributor

mengxr commented Aug 2, 2014

LGTM. Merged into master. Thanks a lot for putting Streaming and MLlib together!

@tdas
Contributor

tdas commented Aug 2, 2014

Yay!!!


@asfgit asfgit closed this in f6a1899 Aug 2, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Author: Jeremy Freeman <the.freeman.lab@gmail.com>
Author: freeman <the.freeman.lab@gmail.com>

Closes apache#1361 from freeman-lab/streaming-mllib and squashes the following commits:

775ea29 [Jeremy Freeman] Throw error if user doesn't initialize weights
4086fee [Jeremy Freeman] Fixed current weight formatting
8b95b27 [Jeremy Freeman] Restored broadcasting
29f27ec [Jeremy Freeman] Formatting
8711c41 [Jeremy Freeman] Used return to avoid indentation
777b596 [Jeremy Freeman] Restored treeAggregate
74cf440 [Jeremy Freeman] Removed static methods
d28cf9a [Jeremy Freeman] Added usage notes
c3326e7 [Jeremy Freeman] Improved documentation
9541a41 [Jeremy Freeman] Merge remote-tracking branch 'upstream/master' into streaming-mllib
66eba5e [Jeremy Freeman] Fixed line lengths
2fe0720 [Jeremy Freeman] Minor cleanup
7d51378 [Jeremy Freeman] Moved streaming loader to MLUtils
b9b69f6 [Jeremy Freeman] Added setter methods
c3f8b5a [Jeremy Freeman] Modified logging
00aafdc [Jeremy Freeman] Add modifiers
14b801e [Jeremy Freeman] Name changes
c7d38a3 [Jeremy Freeman] Move check for empty data to GradientDescent
4b0a5d3 [Jeremy Freeman] Cleaned up tests
74188d6 [Jeremy Freeman] Eliminate dependency on commons
50dd237 [Jeremy Freeman] Removed experimental tag
6bfe1e6 [Jeremy Freeman] Fixed imports
a2a63ad [freeman] Makes convergence test more robust
86220bc [freeman] Streaming linear regression unit tests
fb4683a [freeman] Minor changes for scalastyle consistency
fd31e03 [freeman] Changed logging behavior
453974e [freeman] Fixed indentation
c4b1143 [freeman] Streaming linear regression
604f4d7 [freeman] Expanded private class to include mllib
d99aa85 [freeman] Helper methods for streaming MLlib apps
0898add [freeman] Added dependency on streaming