Skip to content

Commit

Permalink
Merge pull request apache#482 from tdas/streaming-example-fix
Browse files Browse the repository at this point in the history
Added StreamingContext.awaitTermination to streaming examples

StreamingContext.start() currently starts a non-daemon thread which prevents termination of a Spark Streaming program even if main function has exited. Since the expected behavior of a streaming program is to run until explicitly killed, this was sort of fine when spark streaming applications are launched from the command line. However, when launched in Yarn-standalone mode, this did not work as the driver effectively got terminated when the main function exits. So SparkStreaming examples did not work on Yarn.

This addition to the examples ensures that the examples work on Yarn and also ensures that everyone learns that StreamingContext.awaitTermination() being necessary for SparkStreaming programs to wait.

The true bug-fix of making sure all threads by Spark Streaming are daemon threads is left for post-0.9.
  • Loading branch information
pwendell committed Jan 21, 2014
2 parents 7373ffb + 2e95174 commit 0367981
Show file tree
Hide file tree
Showing 17 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ public String call(Long in) {
}).print();

ssc.start();
ssc.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ public Integer call(Integer i1, Integer i2) {

wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,6 @@ public Integer call(Integer i1, Integer i2) {

wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,6 @@ public Integer call(Integer i1, Integer i2) {

reducedStream.print();
ssc.start();
ssc.awaitTermination();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,6 @@ object ActorWordCount {
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ object FlumeEventCount {
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object HdfsWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object KafkaWordCount {
wordCounts.print()

ssc.start()
ssc.awaitTermination()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ object MQTTWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ object NetworkWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,6 @@ object RawNetworkGrep {
union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,6 @@ object RecoverableNetworkWordCount {
createContext(master, ip, port, outputPath)
})
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ object StatefulNetworkWordCount {
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ object TwitterAlgebirdCMS {
})

ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,6 @@ object TwitterAlgebirdHLL {
})

ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ object TwitterPopularTags {
})

ssc.start()
ssc.awaitTermination()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@ object ZeroMQWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

0 comments on commit 0367981

Please sign in to comment.