From 2e95174c45f7ccfb5b9a44ca61897140ceae257f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Jan 2014 20:25:04 -0800 Subject: [PATCH] Added StreamingContext.awaitTermination to streaming examples. --- .../org/apache/spark/streaming/examples/JavaFlumeEventCount.java | 1 + .../org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 1 + .../apache/spark/streaming/examples/JavaNetworkWordCount.java | 1 + .../org/apache/spark/streaming/examples/JavaQueueStream.java | 1 + .../org/apache/spark/streaming/examples/ActorWordCount.scala | 1 + .../org/apache/spark/streaming/examples/FlumeEventCount.scala | 1 + .../org/apache/spark/streaming/examples/HdfsWordCount.scala | 1 + .../org/apache/spark/streaming/examples/KafkaWordCount.scala | 1 + .../org/apache/spark/streaming/examples/MQTTWordCount.scala | 1 + .../org/apache/spark/streaming/examples/NetworkWordCount.scala | 1 + .../org/apache/spark/streaming/examples/RawNetworkGrep.scala | 1 + .../spark/streaming/examples/RecoverableNetworkWordCount.scala | 1 + .../spark/streaming/examples/StatefulNetworkWordCount.scala | 1 + .../org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 1 + .../org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 1 + .../org/apache/spark/streaming/examples/TwitterPopularTags.scala | 1 + .../org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 1 + 17 files changed, 17 insertions(+) diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 7b5a243e26414..f061001dd264d 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -70,5 +70,6 @@ public String call(Long in) { }).print(); ssc.start(); + ssc.awaitTermination(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 04f62ee204145..2ffd351b4e498 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -104,5 +104,6 @@ public Integer call(Integer i1, Integer i2) { wordCounts.print(); jssc.start(); + jssc.awaitTermination(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index 349d826ab5df7..7777c9832abd3 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -84,5 +84,6 @@ public Integer call(Integer i1, Integer i2) { wordCounts.print(); ssc.start(); + ssc.awaitTermination(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index e2d55f1a4e180..26c44620abec1 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -80,5 +80,6 @@ public Integer call(Integer i1, Integer i2) { reducedStream.print(); ssc.start(); + ssc.awaitTermination(); } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 5a4aa7f3a2524..a5888811cc5ea 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -171,5 +171,6 @@ object ActorWordCount { lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index a59be7899dd37..11c3aaad3c8a8 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -60,5 +60,6 @@ object FlumeEventCount { stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala index 704b315ef8b22..954bcc9b6ef5d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala @@ -50,6 +50,7 @@ object HdfsWordCount { val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 4a3d81c09a122..d9cb7326bb97d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -61,6 +61,7 @@ object KafkaWordCount { wordCounts.print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 78b49fdcf1eb3..eb61caf8c85b9 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -101,5 +101,6 @@ object MQTTWordCount { val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index 02264757123db..5656d487a57cc 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -54,5 +54,6 @@ object NetworkWordCount { val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala index 99b79c3949a4e..cdd7547d0d3b4 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -61,5 +61,6 @@ object RawNetworkGrep { union.filter(_.contains("the")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 8c5d0bd56845b..aa82bf3c6bd8e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -114,5 +114,6 @@ object RecoverableNetworkWordCount { createContext(master, ip, port, outputPath) }) ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala index 1183eba84686b..88f1cef89b318 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -65,5 +65,6 @@ object StatefulNetworkWordCount { val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) stateDstream.print() ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 483c4d311810f..bbd44948b6fa5 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -110,5 +110,6 @@ object TwitterAlgebirdCMS { }) ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 94c2bf29ac433..a0094d460feec 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -87,5 +87,6 @@ object TwitterAlgebirdHLL { }) ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 8a70d4a978cd4..896d010c68f18 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -69,5 +69,6 @@ object TwitterPopularTags { }) ssc.start() + ssc.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 12d2a1084f900..85b4ce5e81950 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -91,5 +91,6 @@ object ZeroMQWordCount { val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() + ssc.awaitTermination() } }