From 4b69fb15dd3fde3390aa4c35fdcb9171c18c29d1 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 16:27:28 -0700 Subject: [PATCH] fixed input of socketTextDStream --- .../python/streaming/nerwork_wordcount.py | 2 +- python/pyspark/java_gateway.py | 1 + python/pyspark/streaming/context.py | 25 +++---------------- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/examples/src/main/python/streaming/nerwork_wordcount.py b/examples/src/main/python/streaming/nerwork_wordcount.py index 2e5048ccad213..67dc28f7bf7f0 100644 --- a/examples/src/main/python/streaming/nerwork_wordcount.py +++ b/examples/src/main/python/streaming/nerwork_wordcount.py @@ -10,7 +10,7 @@ exit(-1) ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1)) - lines = ssc.socketTextStream(sys.argv[1], sys.argv[2]) + lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) fm_lines = lines.flatMap(lambda x: x.split(" ")) filtered_lines = fm_lines.filter(lambda line: "Spark" in line) mapped_lines = fm_lines.map(lambda x: (x, 1)) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 4547e54bd2d5d..9fd59be1456ef 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -87,6 +87,7 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.streaming.*") java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") + java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index d3ff16fca764f..5dcc9ba35a653 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -19,7 +19,7 @@ from pyspark.files import 
SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer -from pyspark.storagelevel import StorageLevel +from pyspark.storagelevel import * from pyspark.rdd import RDD from pyspark.context import SparkContext @@ -83,26 +83,9 @@ def awaitTermination(self, timeout=None): else: self._jssc.awaitTermination() - def checkpoint(self, directory): - raise NotImplementedError - - def fileStream(self, directory, filter=None, newFilesOnly=None): - raise NotImplementedError - - def networkStream(self, receiver): - raise NotImplementedError - - def queueStream(self, queue, oneAtATime=True, defaultRDD=None): - raise NotImplementedError - - def rawSocketStream(self, hostname, port, storagelevel): - raise NotImplementedError - - def remember(self, duration): - raise NotImplementedError - - def socketStream(hostname, port, converter,storageLevel): - raise NotImplementedError + # start from simple one. storageLevel is not passed for now. + def socketTextStream(self, hostname, port): + return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer()) def start(self): self._jssc.start()