
Commit

fixed pep8 violation
giwa committed Sep 20, 2014
1 parent f198d14 commit b171ec3
Showing 6 changed files with 481 additions and 10 deletions.
10 changes: 5 additions & 5 deletions python/pyspark/streaming/context.py
@@ -33,8 +33,8 @@ class StreamingContext(object):
     """

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
-        gateway=None, sparkContext=None, duration=None):
+                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 gateway=None, sparkContext=None, duration=None):
         """
         Create a new StreamingContext. At least the master and app name and duration
         should be set, either through the named parameters here or through C{conf}.
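
Both indentation hunks in this file are the same fix: PEP8 (E128) expects continuation lines of a multi-line signature or call either to align with the opening parenthesis or to use a distinct hanging indent. A minimal sketch of the two accepted styles, with hypothetical names:

    # Visual indent: continuation lines align with the opening parenthesis.
    def connect(host=None, port=9999,
                timeout=None, retries=3):
        return (host, port, timeout, retries)

    # Hanging indent: nothing after the opening parenthesis, one extra level below.
    conn = connect(
        host="localhost",
        timeout=5,
    )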
@@ -63,8 +63,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         if sparkContext is None:
             # Create the Python Sparkcontext
             self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
-                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
-                serializer=serializer, conf=conf, gateway=gateway)
+                                    pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+                                    serializer=serializer, conf=conf, gateway=gateway)
         else:
             self._sc = sparkContext

@@ -107,7 +107,7 @@ def awaitTermination(self, timeout=None):
         else:
             self._jssc.awaitTermination(timeout)

-    #TODO: add storageLevel
+    # TODO: add storageLevel
     def socketTextStream(self, hostname, port):
         """
         Create an input from TCP source hostname:port. Data is received using
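
The comment change is PEP8 E265: a block comment should start with a hash followed by a single space. A tiny sketch, names hypothetical:

    #wrong: no space after the hash is flagged as E265
    # right: hash, one space, then the comment text
    retries = 3  # inline comments take the same space (E262), two spaces before the hash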
9 changes: 5 additions & 4 deletions python/pyspark/streaming/dstream.py
@@ -81,7 +81,8 @@ def filter(self, f):
         """
         Return a new DStream containing only the elements that satisfy predicate.
         """
-        def func(iterator): return ifilter(f, iterator)
+        def func(iterator):
+            return ifilter(f, iterator)
         return self.mapPartitions(func)

     def flatMap(self, f, preservesPartitioning=False):
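
Splitting the one-line def onto two lines addresses the PEP8 rule against compound statements (a def and its body on the same line, reported as E704 by the pep8 tool). A sketch with hypothetical names:

    def is_even(n): return n % 2 == 0  # legal Python, but the one-line def is flagged (E704)


    def is_even_ok(n):
        return n % 2 == 0  # body on its own indented line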
@@ -136,7 +137,7 @@ def reduceByKey(self, func, numPartitions=None):
         return self.combineByKey(lambda x: x, func, func, numPartitions)

     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
-                     numPartitions = None):
+                     numPartitions=None):
         """
         Count the number of elements for each key, and return the result to the
         master as a dictionary
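
Dropping the spaces in numPartitions = None is PEP8 E251: no spaces around '=' when it marks a keyword argument or a default value. Sketch, names hypothetical:

    def scale(x, factor = 2):  # flagged: spaces around the default's '=' (E251)
        return x * factor


    def scale_ok(x, factor=2):  # compliant, at definition and call sites alike
        return x * factor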
@@ -159,7 +160,7 @@ def combineLocally(iterator):
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
-                if not k in combiners:
+                if k not in combiners:
                     combiners[k] = v
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
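
Rewriting not k in combiners as k not in combiners is PEP8 E713: the membership test reads as a single operator, making plain that the negation applies to the test rather than to k. The two forms are semantically identical, as this small check illustrates:

    combiners = {"a": 1}
    k = "b"
    assert (not k in combiners) == (k not in combiners)  # same result; E713 prefers the second form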
@@ -194,7 +195,7 @@ def add_shuffle_key(split, iterator):
             keyed._bypass_serializer = True
             with _JavaStackTrace(self.ctx) as st:
                 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                             id(partitionFunc))
+                                                              id(partitionFunc))
                 jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
                                                                partitioner).asJavaDStream()
                 dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
3 changes: 3 additions & 0 deletions python/pyspark/streaming/duration.py
@@ -333,6 +333,7 @@ def _is_duration(self, instance):
         if not isinstance(instance, Duration):
             raise TypeError("This should be Duration")

+
 def Milliseconds(milliseconds):
     """
     Helper function that creates instance of [[pysparkstreaming.duration]] representing
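
All three hunks in duration.py add the same thing: a second blank line before a module-level def, per PEP8 E302 (expected 2 blank lines, got 1). Sketch with hypothetical names:

    class Converter(object):
        pass


    def convert(value):  # the two blank lines above satisfy E302
        return value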
@@ -346,6 +347,7 @@ def Milliseconds(milliseconds):
     """
     return Duration(milliseconds)

+
 def Seconds(seconds):
     """
     Helper function that creates instance of [[pysparkstreaming.duration]] representing
@@ -359,6 +361,7 @@ def Seconds(seconds):
     """
     return Duration(seconds * 1000)

+
 def Minutes(minutes):
     """
     Helper function that creates instance of [[pysparkstreaming.duration]] representing
(Diffs for the remaining 3 of the 6 changed files were not loaded on this page.)
