Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator #7471

Closed

Conversation

dragos
Copy link
Contributor

@dragos dragos commented Jul 17, 2015

First step for SPARK-7398.

@tdas @huitseeker

huitseeker and others added 3 commits July 15, 2015 23:17
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator
* @param cond A boolean that should become `true`
* @param timemout How many millis to wait before giving up
*/
def waitUntil(cond: => Boolean, timeout: Int): Unit = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is general enough that probably deserves a better place. I couldn't find anything similar in spark-core Utils or streaming utilities (the closest one was in BatchCounter, but won't do the same thing, and relies on wait/notify). I'm open to suggestions where to move it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, thanks!

@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37634 has finished for PR 7471 at commit 1e7b210.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos dragos force-pushed the topic/streaming-bp/dynamic-rate branch from 1e7b210 to cd1397d Compare July 17, 2015 14:50
@SparkQA
Copy link

SparkQA commented Jul 17, 2015

Test build #37638 has finished for PR 7471 at commit cd1397d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
}

/** Get the attached executor. */
private def executor = {
private[streaming] def executor = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? If you want to access private members, its best to use the PrivateTester See
https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala#L866

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about PrivateTester, I'll look into it. Other Spark modules (such as core) are using these access modifiers, so I thought it's common practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PrivateTester won't work here. It assumes the private method is declared in the type of the receiver. It only works if the runtime type of the receiver is the type that declares the private method. Not the case here, where the method is declared in Receiver, but the invocation is on a concrete subclass (DummyReceiver). Relevant code here.

I could probably code something along those lines, but I think it's better to use qualified private. Are there any downsides to relying on language support for visibility? I doubt any user-code would declare things in o.a.s.streaming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, I see. The reason I wanted to use PrivateTest is because marking private[streaming] make it invisible through Scala compilation, but its visibly publicly in the bytecode, and therefore through Java. And since Receivers can be written in Java, I dont want to expose that. PrivateTester wont work, but the underlying mechanism will working using reflection.

scala> abstract class A { private val i = 1 }
defined class A

scala> class B extends A
defined class B

scala> B.getClass().getSuperclass()
<console>:8: error: not found: value B
              B.getClass().getSuperclass()
              ^

scala> classOf[B].getSuperclass()
res1: Class[_ >: B] = class A

scala> classOf[B].getSuperclass().getDeclaredFields()
res2: Array[java.lang.reflect.Field] = Array(private final int A.i)

scala> val f = classOf[B].getSuperclass().getDeclaredFields().head
f: java.lang.reflect.Field = private final int A.i

scala> f.setAccessible(true)

scala> f.get(new B)
res4: Object = 1

You could use this do the testing. Just make the DummyReceiver have a method that gets the underlying supervisor. Then an object DummyReceiver (no need for TestDummyReceiver class and object) for access to the supervisor.

@dragos
Copy link
Contributor Author

dragos commented Jul 20, 2015

See my last commit message about what changes I made for testing.

- made rate limit a Long and default to Long.MaxValue (consequence of the above)
- removed custom `waitUntil` and replaced it by `eventually`
@dragos dragos force-pushed the topic/streaming-bp/dynamic-rate branch from 7112cfd to 13ada97 Compare July 20, 2015 14:45
As I mentioned before, I don’t think this is a great idea:

- such tests are flaky (original test in ReceiverSuite was ignored for
that reason)
- Guava’s code has its own test suite, so we can assume it implements
`setRate` correctly

I noticed one flaky failure in about 10 runs on my machine (receiver got
1 message less than the lower bound, which is within 5% of the nominal rate).
@dragos dragos force-pushed the topic/streaming-bp/dynamic-rate branch from 13ada97 to 0c51959 Compare July 20, 2015 14:45
@SparkQA
Copy link

SparkQA commented Jul 20, 2015

Test build #37830 has finished for PR 7471 at commit 61d3ecf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 20, 2015

Test build #37836 has finished for PR 7471 at commit 0c51959.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 20, 2015

Test build #37835 has finished for PR 7471 at commit 7112cfd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -17,6 +17,8 @@

package org.apache.spark.streaming.receiver

import java.util.concurrent.atomic.AtomicInteger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed any more. Make sure to remove the empty line below too.

@tdas
Copy link
Contributor

tdas commented Jul 20, 2015

Major things to change for the next iteration.

  • Revert block generate tests, and changes to ReceiverTracker (see inline threads for explanation)
  • Revert Receiver class and modify DummyReceiver in ReceiverTrackerSuite to access the executor field using Java reflection (again, see other inline thread for discussion)

See other inline threads for smaller nits.

@@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}

/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add { and } around the for-loop body

use the listener bus to know when receivers have registered (`onStart`
is called before receivers have registered, leading to flaky behavior).
@SparkQA
Copy link

SparkQA commented Jul 21, 2015

Test build #37950 has finished for PR 7471 at commit 162d9e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -72,8 +78,62 @@ class ReceiverTrackerSuite extends TestSuiteBase {
assert(locations(0).length === 1)
assert(locations(3).length === 1)
}

test("Receiver tracker - propagates rate limit") {
object streamingListener extends StreamingListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be capitalized ... say.. TestStreamingListener, or even better ReceiverWaiter.

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #38081 has finished for PR 7471 at commit 8941cf9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Copy link
Contributor Author

dragos commented Jul 22, 2015

Spurious failure:

Building Spark

[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive-thriftserver -Phive package assembly/assembly streaming-kafka-assembly/assembly streaming-flume-assembly/assembly
Using /usr/java/latest as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
Error: Invalid or corrupt jarfile build/sbt-launch-0.13.7.jar

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #1157 has finished for PR 7471 at commit 8941cf9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Copy link
Contributor Author

dragos commented Jul 22, 2015

Same error 😢

@tdas
Copy link
Contributor

tdas commented Jul 22, 2015

Jenkins, test this please

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #1175 has finished for PR 7471 at commit 8941cf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #1177 has finished for PR 7471 at commit 8941cf9.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #1176 has finished for PR 7471 at commit 8941cf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #1179 has finished for PR 7471 at commit 8941cf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Jul 22, 2015

LGTM. Merging this

@asfgit asfgit closed this in 798dff7 Jul 22, 2015
asfgit pushed a commit that referenced this pull request Jul 29, 2015
…ements the RateController

Based on #7471.

- [x] add a test that exercises the publish path from driver to receiver
- [ ] remove Serializable from `RateController` and `RateEstimator`

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits:

f168c94 [Iulian Dragos] Latest review round.
5125e60 [Iulian Dragos] Fix style.
a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
475e346 [Iulian Dragos] Latest round of reviews.
e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext
715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest.
e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates.
b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.
238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
34a389d [Iulian Dragos] Various style changes and a first test for the rate controller.
d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants