
Commit

add example and fix bugs
Davies Liu committed Jan 8, 2015
1 parent 98c8d17 commit f6ce899
Showing 5 changed files with 100 additions and 78 deletions.
55 changes: 34 additions & 21 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -313,6 +313,7 @@ private object SpecialLengths {
  val PYTHON_EXCEPTION_THROWN = -2
  val TIMING_DATA = -3
  val END_OF_STREAM = -4
+  val NULL = -5
}

private[spark] object PythonRDD extends Logging {
@@ -374,49 +375,61 @@ private[spark] object PythonRDD extends Logging {
    // The right way to implement this would be to use TypeTags to get the full
    // type of T. Since I don't want to introduce breaking changes throughout the
    // entire Spark API, I have to use this hacky approach:
+    def write(bytes: Array[Byte]) {
+      if (bytes == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        dataOut.writeInt(bytes.length)
+        dataOut.write(bytes)
+      }
+    }
+    def writeS(str: String) {
+      if (str == null) {
+        dataOut.writeInt(SpecialLengths.NULL)
+      } else {
+        writeUTF(str, dataOut)
+      }
+    }
    if (iter.hasNext) {
      val first = iter.next()
      val newIter = Seq(first).iterator ++ iter
      first match {
        case arr: Array[Byte] =>
-          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
-          }
+          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach(write)
        case string: String =>
-          newIter.asInstanceOf[Iterator[String]].foreach { str =>
-            writeUTF(str, dataOut)
-          }
+          newIter.asInstanceOf[Iterator[String]].foreach(writeS)
        case stream: PortableDataStream =>
          newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
-            val bytes = stream.toArray()
-            dataOut.writeInt(bytes.length)
-            dataOut.write(bytes)
+            write(stream.toArray())
          }
        case (key: String, stream: PortableDataStream) =>
          newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
            case (key, stream) =>
-              writeUTF(key, dataOut)
-              val bytes = stream.toArray()
-              dataOut.writeInt(bytes.length)
-              dataOut.write(bytes)
+              writeS(key)
+              write(stream.toArray())
          }
        case (key: String, value: String) =>
          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
            case (key, value) =>
-              writeUTF(key, dataOut)
-              writeUTF(value, dataOut)
+              writeS(key)
+              writeS(value)
          }
        case (key: Array[Byte], value: Array[Byte]) =>
          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
            case (key, value) =>
-              dataOut.writeInt(key.length)
-              dataOut.write(key)
-              dataOut.writeInt(value.length)
-              dataOut.write(value)
+              write(key)
+              write(value)
          }
+        // key is null
+        case (null, v: Array[Byte]) =>
+          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
+            case (key, value) =>
+              write(key)
+              write(value)
+          }

        case other =>
-          throw new SparkException("Unexpected element type " + first.getClass)
+          throw new SparkException("Unexpected element type " + other.getClass)
      }
    }
  }
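The helpers above settle on a simple framing: each record is a 4-byte big-endian length followed by that many payload bytes, with negative lengths reserved for control codes such as SpecialLengths.NULL (-5) for a missing key or value. A minimal Python sketch of the read side, assuming only that convention (the helper names here are illustrative, not PySpark internals):

import struct
from io import BytesIO

NULL = -5  # assumed to mirror SpecialLengths.NULL above

def read_framed(stream):
    """Read one length-prefixed record; return None when the NULL sentinel appears."""
    length = struct.unpack("!i", stream.read(4))[0]  # big-endian int, as DataOutputStream.writeInt emits
    if length == NULL:
        return None
    return stream.read(length)

# A null key followed by a 5-byte value, roughly what write(key); write(value) above produces:
buf = BytesIO(struct.pack("!i", NULL) + struct.pack("!i", 5) + b"hello")
assert read_framed(buf) is None
assert read_framed(buf) == b"hello"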
55 changes: 55 additions & 0 deletions examples/src/main/python/streaming/kafka_wordcount.py
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Counts words in UTF8 encoded, '\n' delimited text received from Kafka every second.
Usage: kafka_wordcount.py <zk> <topic>
To run this on your local machine, you need to set up Kafka and create a producer first:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
and then run the example
`$ bin/spark-submit --driver-class-path lib_managed/jars/kafka_*.jar:\
external/kafka/target/scala-*/spark-streaming-kafka_*.jar examples/src/main/python/\
streaming/kafka_wordcount.py localhost:2181 test`
"""

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    lines = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    counts = lines.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
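For context, KafkaUtils.createStream yields a DStream of (key, message) pairs, which is why the example selects x[1] before splitting into words; console-producer messages usually carry no key, so the key side is None, the case the NULL handling in this commit is there for. A tiny standalone sketch of one micro-batch, with plain lists standing in for the DStream (illustrative only):

records = [(None, u"hello world"), (None, u"hello spark")]     # (key, message) pairs
messages = [kv[1] for kv in records]                           # lines.map(lambda x: x[1])
words = [w for m in messages for w in m.split(" ")]            # .flatMap(lambda line: line.split(" "))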
7 changes: 6 additions & 1 deletion python/pyspark/serializers.py
@@ -70,6 +70,7 @@ class SpecialLengths(object):
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
+    NULL = -5


class Serializer(object):
@@ -145,8 +146,10 @@ def _read_with_length(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
+        if length == SpecialLengths.NULL:
+            return None
        obj = stream.read(length)
-        if obj == "":
+        if len(obj) < length:
            raise EOFError
        return self.loads(obj)

@@ -480,6 +483,8 @@ def loads(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
+        if length == SpecialLengths.NULL:
+            return None
        s = stream.read(length)
        return s.decode("utf-8") if self.use_unicode else s

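Besides the NULL sentinel, the second hunk tightens EOF detection: the old check obj == "" only caught a fully exhausted stream, while len(obj) < length also catches a stream cut off in the middle of a record. A small illustration under that assumption, using a plain in-memory stream rather than Spark's worker socket:

import struct
from io import BytesIO

def read_payload(stream, length):
    obj = stream.read(length)
    if len(obj) < length:   # truncated or exhausted stream
        raise EOFError
    return obj

truncated = BytesIO(struct.pack("!i", 10) + b"abc")   # header promises 10 bytes, only 3 arrive
length = struct.unpack("!i", truncated.read(4))[0]
try:
    read_payload(truncated, length)
except EOFError:
    pass   # the new check raises here; the old obj == "" test would have passed 3 stray bytes to the deserializer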
8 changes: 5 additions & 3 deletions python/pyspark/streaming/kafka.py
@@ -22,11 +22,12 @@
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

-__all__ = ['KafkaUtils']
+__all__ = ['KafkaUtils', 'utf8_decoder']


def utf8_decoder(s):
-    return s.decode('utf-8')
+    """ Decode the bytes as UTF-8, returning None for None """
+    return s and s.decode('utf-8')


class KafkaUtils(object):
@@ -70,7 +71,8 @@ def getClassByName(name):
            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
                                                       jparam, jtopics, jlevel)
        except Py4JError, e:
-            if 'call a package' in e.message:
+            # TODO: use --jar once it also works on the driver
+            if not e.message or 'call a package' in e.message:
                print "No kafka package, please build it and add it into classpath:"
                print " $ sbt/sbt streaming-kafka/package"
                print " $ bin/submit --driver-class-path lib_managed/jars/kafka_2.10-0.8.0.jar:" \
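The s and s.decode('utf-8') form lets the default decoder pass a None key straight through instead of raising AttributeError. A standalone check under that reading; the function is restated here only so the snippet runs on its own:

def utf8_decoder(s):
    """ Decode bytes as UTF-8, passing None through (restates the function above) """
    return s and s.decode('utf-8')

assert utf8_decoder(None) is None
assert utf8_decoder(b"spark") == u"spark"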
53 changes: 0 additions & 53 deletions python/pyspark/streaming/mqtt.py

This file was deleted.
