-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation #2665
Conversation
SPARK-3660 : Initial RDD for updateStateByKey transformation
Can one of the admins verify this patch? |
1 similar comment
Can one of the admins verify this patch? |
@soumitrak This is a good addition. Here are some high level suggestions.
|
Jenkins, this is ok to test. |
QA tests have started for PR 2665 at commit
|
QA tests have finished for PR 2665 at commit
|
Test PASSed. |
@soumitrak Hey any thoughts? |
Sorry, I was off for couple of days.
----- Original Message ----- @soumitrak Hey any thoughts? — |
Test build #22190 has started for PR 2665 at commit
|
Test build #22190 timed out for PR 2665 at commit |
Test FAILed. |
TD, I have incorporated your feedback, and add a testcase. Let me know if there is anything else. |
Test build #22206 has started for PR 2665 at commit
|
Test build #22206 has finished for PR 2665 at commit
|
Test FAILed. |
Jenkins, test this please. |
Test build #23074 has started for PR 2665 at commit
|
Test build #23074 has finished for PR 2665 at commit
|
Test FAILed. |
TD, Let me know how I can help. |
Jenkins, test this please. |
* Set up required DStreams to test the DStream operation using the sequence | ||
* of input collections, and initial sequence. | ||
*/ | ||
def setupStreamsWithInitial[U: ClassTag, V: ClassTag]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure it is necessary to add this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See other comment.
Test build #23166 has started for PR 2665 at commit
|
new HashPartitioner (numInputPartitions), true, initialRDD) | ||
} | ||
|
||
testOperationWithInitial(initial, inputData, updateStateOperation, outputData, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of setting up and using testOperationWithInitial just for this unit test, lets try to do the alternative.
val updateStateOperation = (s: DStream[String]) => {
val initialRDD = s.context.sparkContext.makeRDD(initial)
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some(values.sum + state.getOrElse(0))
}
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
s.map(x => (x, 1)).updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (numInputPartitions), true, initialRDD)
}
And then use the usual testOperation
Test build #23219 has finished for PR 2665 at commit
|
Test FAILed. |
Test build #23223 has started for PR 2665 at commit
|
@@ -443,6 +443,23 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( | |||
scalaFunc | |||
} | |||
|
|||
private def convertUpdateStateFunctionWithIterator[S] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect indentation
private def convertUpdateStateFunctionWithIterator[S](
in: JFunction2[JList[V], Optional[S], Optional[S]] // 4 space indent
): (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)] = { // 2 space indent
This is starting to look good. Please address the comments (mainly Scala API addition), and we will good to go. This is a good addition to updateStateByKey. |
Test build #23223 has finished for PR 2665 at commit
|
Test PASSed. |
Test build #23225 has started for PR 2665 at commit
|
Test build #23225 has finished for PR 2665 at commit
|
Test FAILed. |
Test build #23231 has started for PR 2665 at commit
|
|
||
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( | ||
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { | ||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit (do this if you have to make more changes): incorrect indentation. should be indented by more than the line with new Function2...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any other change to do. Actually I copied the previous method.
Test build #23231 has finished for PR 2665 at commit
|
Test PASSed. |
Alright I am merging this. |
Thanks very much for the changes! |
TD, Thanks for getting this through. |
SPARK-3660 : Initial RDD for updateStateByKey transformation I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount. Please let me know if any changes are required. Author: Soumitra Kumar <kumar.soumitra@gmail.com> Closes apache#2665 from soumitrak/master and squashes the following commits: ee8980b [Soumitra Kumar] Fixed copy/paste issue. 304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test. 9781135 [Soumitra Kumar] Fixed test, and renamed variable. 3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream. 2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' d4fdd18 [Soumitra Kumar] Renamed variable and moved method. d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation 8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
SPARK-3660 : Initial RDD for updateStateByKey transformation I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount. Please let me know if any changes are required. Author: Soumitra Kumar <kumar.soumitra@gmail.com> Closes apache#2665 from soumitrak/master and squashes the following commits: ee8980b [Soumitra Kumar] Fixed copy/paste issue. 304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test. 9781135 [Soumitra Kumar] Fixed test, and renamed variable. 3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream. 2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' d4fdd18 [Soumitra Kumar] Renamed variable and moved method. d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation 8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
SPARK-3660 : Initial RDD for updateStateByKey transformation I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount. Please let me know if any changes are required. Author: Soumitra Kumar <kumar.soumitra@gmail.com> Closes apache#2665 from soumitrak/master and squashes the following commits: ee8980b [Soumitra Kumar] Fixed copy/paste issue. 304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test. 9781135 [Soumitra Kumar] Fixed test, and renamed variable. 3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream. 2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' d4fdd18 [Soumitra Kumar] Renamed variable and moved method. d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation 8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
SPARK-3660 : Initial RDD for updateStateByKey transformation
I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount.
Please let me know if any changes are required.