add Python examples in Streaming Programming Guide
davies committed Oct 15, 2014
1 parent 7b4f39f commit 7e4bb8a
Showing 3 changed files with 291 additions and 5 deletions.
208 changes: 206 additions & 2 deletions docs/streaming-programming-guide.md
@@ -212,6 +212,67 @@ The complete code can be found in the Spark Streaming example
[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
<br>

</div>
<div data-lang="python" markdown="1" >
First, we import StreamingContext, which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads and a batch interval of 1 second.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
{% endhighlight %}

Using this context, we can create a DStream that represents streaming data from a TCP
source, specified as a hostname (e.g. `localhost`) and port (e.g. `9999`).

{% highlight python %}
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}

This `lines` DStream represents the stream of data that will be received from the data
server. Each record in this DStream is a line of text. Next, we want to split the lines by
space into words.

{% highlight python %}
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
{% endhighlight %}

`flatMap` is a one-to-many DStream operation that creates a new DStream by
generating multiple new records from each record in the source DStream. In this case,
each line will be split into multiple words and the stream of words is represented as the
`words` DStream. Next, we want to count these words.

{% highlight python %}
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
{% endhighlight %}

The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
1)` pairs, which is then reduced to get the frequency of words in each batch of data.
Finally, `wordCounts.pprint()` will print a few of the counts generated every second.

Note that when these lines are executed, Spark Streaming only sets up the computation it
will perform when it is started, and no real processing has started yet. To start the processing
after all the transformations have been set up, we finally call

{% highlight python %}
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
{% endhighlight %}

The complete code can be found in the Spark Streaming example
[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/network_wordcount.py).
<br>

</div>
</div>

@@ -236,6 +297,11 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight bash %}
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
{% endhighlight %}
</div>
</div>


@@ -259,8 +325,11 @@ hello world
</td>
<td width="2%"></td>
<td>
<div class="codetabs">

<div data-lang="scala" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
@@ -271,6 +340,37 @@ Time: 1357008430000 ms
(world,1)
...
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight bash %}
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
{% endhighlight %}
</div>
</div>
</td>
</table>

@@ -398,6 +498,30 @@ JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">

A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object.

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
{% endhighlight %}

The `appName` parameter is a name for your application to show on the cluster UI.
`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
you will not want to hardcode `master` in the program,
but rather [launch the application with `spark-submit`](submitting-applications.html) and
receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
in-process (detects the number of cores in the local system).

The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
section for more details.
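
For instance, a minimal sketch (the application name and the 5-second batch interval here
are illustrative assumptions, not values from the guide) of creating a context for local
testing:

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Use all local cores and group the incoming stream into 5-second batches
sc = SparkContext("local[*]", "LocalTestApp")
ssc = StreamingContext(sc, 5)
{% endhighlight %}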
</div>
</div>

After a context is defined, you have to do the following steps.
@@ -483,6 +607,9 @@ methods for creating DStreams from files and Akka actors as input sources.
<div data-lang="java" markdown="1">
streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
</div>
<div data-lang="python" markdown="1">
streamingContext.textFileStream(dataDirectory)
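
For instance, a minimal sketch (the directory path and the word-count pipeline are
illustrative assumptions) of counting words in text files as they are created:

{% highlight python %}
# Monitor a directory for newly created text files
lines = streamingContext.textFileStream("hdfs://namenode:8020/user/logs")
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)
counts.pprint()
{% endhighlight %}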
</div>
</div>

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories are not supported). Note that
@@ -684,13 +811,30 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count
{% endhighlight %}

This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
1)` pairs in the [earlier example](#a-quick-example)).

{% highlight python %}
runningCounts = pairs.updateStateByKey(updateFunction)
{% endhighlight %}

</div>
</div>

The update function will be called for each word, with `newValues` having a sequence of 1's (from
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
Python code, take a look at the example
[stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py).
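
As a minimal end-to-end sketch (assuming a socket source on `localhost:9999`; the
checkpoint directory is required for stateful operations such as `updateStateByKey`):

{% highlight python %}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # stateful operations need a checkpoint directory

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

pairs = ssc.socketTextStream("localhost", 9999) \
           .flatMap(lambda line: line.split(" ")) \
           .map(lambda word: (word, 1))
runningCounts = pairs.updateStateByKey(updateFunction)
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}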

#### Transform Operation
{:.no_toc}
@@ -732,6 +876,15 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
});
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
{% endhighlight %}
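
The same pattern spelled out, under the assumption (not from the guide) that each entry in
`spamInfoRDD` is a `(word, isSpam)` pair:

{% highlight python %}
def dropSpam(rdd):
    # After the join each element is (word, (count, isSpam)); keep only non-spam words
    joined = rdd.join(spamInfoRDD)
    return joined.filter(lambda kv: not kv[1][1]).map(lambda kv: (kv[0], kv[1][0]))

cleanedDStream = wordCounts.transform(dropSpam)
{% endhighlight %}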
</div>
</div>

@@ -793,6 +946,14 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
{% endhighlight %}
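
Note that the incremental (inverse-reduce) form requires checkpointing to be enabled; a
minimal sketch, assuming a hypothetical checkpoint directory:

{% highlight python %}
ssc.checkpoint("checkpoint")  # required when an inverse reduce function is supplied
{% endhighlight %}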

</div>
</div>

@@ -860,6 +1021,7 @@ see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream).

***

@@ -1376,6 +1538,44 @@ You can also explicitly create a `JavaStreamingContext` from the checkpoint data
the computation by using `new JavaStreamingContext(checkpointDirectory)`.

</div>
<div data-lang="python" markdown="1">

This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.

{% highlight python %}
# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
{% endhighlight %}

If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
If the directory does not exist (i.e., running for the first time),
then the function `functionToCreateContext` will be called to create a new
context and set up the DStreams. See the Python example
[recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
This example appends the word counts of network data into a file.

You can also explicitly create a `StreamingContext` from the checkpoint data and start the
computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
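
A minimal sketch of that recovery-only path (assuming checkpoint data already exists in
`checkpointDirectory`):

{% highlight python %}
# Recreate the context purely from existing checkpoint data; no setup function is passed
context = StreamingContext.getOrCreate(checkpointDirectory, None)
context.start()
context.awaitTermination()
{% endhighlight %}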

</div>

</div>

**Note**: If Spark Streaming and/or the Spark Streaming program is recompiled,
@@ -1572,7 +1772,11 @@ package and renamed for better clarity.
[TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html),
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
- Python docs
* [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
* [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)

* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
80 changes: 80 additions & 0 deletions examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in text encoded with UTF8 received from the network every second.
Usage: recoverable_network_wordcount.py <hostname> <port> <checkpoint-directory> <output-file>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
data. <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint
data will be saved. <output-file> file to which the word counts will be appended.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/recoverable_network_wordcount.py \
localhost 9999 ~/checkpoint/ ~/out`
If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
the checkpoint data.
"""

import os
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print "Creating new context"
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in input stream of \n delimited text (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        counts = "Counts at time %s %s" % (time, rdd.collect())
        print counts
        print "Appending to " + os.path.abspath(outputPath)
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 5:
        print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
            "<checkpoint-directory> <output-file>"
        exit(-1)
    host, port, checkpoint, output = sys.argv[1:]
    ssc = StreamingContext.getOrCreate(checkpoint,
                                       lambda: createContext(host, int(port), output))
    ssc.start()
    ssc.awaitTermination()
8 changes: 5 additions & 3 deletions python/pyspark/streaming/dstream.py
@@ -441,9 +441,11 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio
if `invReduceFunc` is not None, the reduction is done incrementally
using the old window's reduced value:
1. reduce the new values that entered the window (e.g., adding new counts)
2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
This is more efficient than when `invReduceFunc` is None.
@param reduceFunc: associative reduce function
@param invReduceFunc: inverse reduce function of `reduceFunc`
