remove Time and Duration
davies committed Sep 27, 2014
1 parent 3f0fb4b · commit c499ba0
Showing 5 changed files with 14 additions and 559 deletions.
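Note: this commit removes the Python-side Time and Duration/Seconds wrapper classes from pyspark.streaming. All durations are now plain numbers of seconds, converted once into the JVM's millisecond-based Duration by a new private helper, _jduration, at the Py4J boundary. A minimal before/after sketch of the resulting API (assuming sc is an existing SparkContext and stream is a hypothetical DStream):

    # before: durations wrapped in Python helper classes
    # ssc = StreamingContext(sc, Seconds(1))
    # after: plain int/float seconds everywhere
    ssc = StreamingContext(sc, 1)       # 1 s batches -> Duration(1000) in the JVM
    windowed = stream.window(30, 10)    # 30 s window, sliding every 10 s
    ssc.awaitTermination(5.5)           # waits up to 5.5 s -> 5500 ms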
python/pyspark/streaming/context.py (8 additions, 12 deletions)
@@ -16,11 +16,10 @@
 #

 from pyspark import RDD
-from pyspark.serializers import UTF8Deserializer, BatchedSerializer
+from pyspark.serializers import UTF8Deserializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
-from pyspark.streaming.duration import Seconds

 from py4j.java_collections import ListConverter

@@ -76,9 +75,6 @@ def __init__(self, sparkContext, duration):
         @param duration: A L{Duration} object or seconds for SparkStreaming.
         """
-        if isinstance(duration, (int, long, float)):
-            duration = Seconds(duration)
-
         self._sc = sparkContext
         self._jvm = self._sc._jvm
         self._start_callback_server()
@@ -93,7 +89,10 @@ def _start_callback_server(self):
         gw._python_proxy_port = gw._callback_server.port  # update port with real port

     def _initialize_context(self, sc, duration):
-        return self._jvm.JavaStreamingContext(sc._jsc, duration._jduration)
+        return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))
+
+    def _jduration(self, seconds):
+        return self._jvm.Duration(int(seconds * 1000))

     @property
     def sparkContext(self):
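Note: the new _jduration helper scales seconds to milliseconds and truncates with int(), so anything finer than one millisecond is dropped. Checking the arithmetic:

    int(1 * 1000)       # 1000 ms
    int(1.5 * 1000)     # 1500 ms
    int(0.0005 * 1000)  # 0 ms -- sub-millisecond values truncate to zero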
@@ -111,12 +110,12 @@ def start(self):
     def awaitTermination(self, timeout=None):
         """
         Wait for the execution to stop.
-        @param timeout: time to wait in milliseconds
+        @param timeout: time to wait in seconds
         """
         if timeout is None:
             self._jssc.awaitTermination()
         else:
-            self._jssc.awaitTermination(timeout)
+            self._jssc.awaitTermination(int(timeout * 1000))

     def stop(self, stopSparkContext=True, stopGraceFully=False):
         """
@@ -139,10 +138,7 @@ def remember(self, duration):
         @param duration Minimum duration (in seconds) that each DStream
                         should remember its RDDs
         """
-        if isinstance(duration, (int, long, float)):
-            duration = Seconds(duration)
-
-        self._jssc.remember(duration._jduration)
+        self._jssc.remember(self._jduration(duration))

     def checkpoint(self, directory):
         """
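Note: remember() follows the same seconds-in, milliseconds-to-the-JVM convention, e.g.:

    ssc.remember(60)    # each DStream keeps at least the last 60 s of RDDs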
python/pyspark/streaming/dstream.py (4 additions, 9 deletions)
@@ -22,7 +22,6 @@
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.util import rddToFileName, RDDFunction, RDDFunction2
 from pyspark.rdd import portable_hash
-from pyspark.streaming.duration import Duration, Seconds
 from pyspark.resultiterable import ResultIterable

 __all__ = ["DStream"]
@@ -334,10 +333,10 @@ def slice(self, begin, end):
         return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds]

     def window(self, windowDuration, slideDuration=None):
-        d = Seconds(windowDuration)
+        d = self._ssc._jduration(windowDuration)
         if slideDuration is None:
             return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
-        s = Seconds(slideDuration)
+        s = self._ssc._jduration(slideDuration)
         return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)

     def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
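Note: window() now takes both durations directly in seconds, with slideDuration still optional. A usage sketch (stream is a hypothetical DStream):

    windowed = stream.window(30)        # 30 s window, default slide interval
    windowed = stream.window(30, 10)    # 30 s window, recomputed every 10 s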
@@ -375,16 +374,12 @@ def invReduceFunc(a, b, t):
             joined = a.leftOuterJoin(b, numPartitions)
             return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)

-        if not isinstance(windowDuration, Duration):
-            windowDuration = Seconds(windowDuration)
-        if not isinstance(slideDuration, Duration):
-            slideDuration = Seconds(slideDuration)
         jreduceFunc = RDDFunction2(self.ctx, reduceFunc, reduced._jrdd_deserializer)
         jinvReduceFunc = RDDFunction2(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
         dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
                                                              jreduceFunc, jinvReduceFunc,
-                                                             windowDuration._jduration,
-                                                             slideDuration._jduration)
+                                                             self._ssc._jduration(windowDuration),
+                                                             self._ssc._jduration(slideDuration))
         return DStream(dstream.asJavaDStream(), self._ssc, self.ctx.serializer)

     def updateStateByKey(self, updateFunc, numPartitions=None):
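Note: the windowed-reduce path above now converts both durations the same way. A hedged sketch of reduceByWindow, whose signature appears earlier in this file (stream is a hypothetical numeric DStream; the inverse function lets Spark subtract data leaving the window instead of recomputing it from scratch):

    sums = stream.reduceByWindow(lambda a, b: a + b,    # fold in new data
                                 lambda a, b: a - b,    # remove departing data
                                 60, 10)                # 60 s window, 10 s slide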
(Diffs for the remaining 3 changed files are not shown here.)
