Merge pull request alteryx#46 from mateiz/py-sort-update
Fix PySpark docs and an overly long line of code after alteryx#38

Just noticed these after merging that commit (https://github.com/apache/incubator-spark/pull/38).
mateiz committed Oct 9, 2013
2 parents 7b3ae04 + 478b2b7 commit 7827efc
Showing 2 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/python-programming-guide.md
@@ -16,7 +16,7 @@ This guide will show how to use the Spark features described there in Python.
There are a few key differences between the Python and Scala APIs:

* Python is dynamically typed, so RDDs can hold objects of multiple types.
-* PySpark does not yet support a few API calls, such as `lookup`, `sort`, and non-text input files, though these will be added in future releases.
+* PySpark does not yet support a few API calls, such as `lookup` and non-text input files, though these will be added in future releases.

In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types.
Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax:
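For instance, in the spirit of the guide's surrounding examples (a minimal doctest-style sketch; `sc` is assumed to be the SparkContext provided by the PySpark shell, and the mixed-type line illustrates the dynamic-typing point above):

    >>> nums = sc.parallelize([1, 2, 3, 4])
    >>> nums.map(lambda x: x * x).filter(lambda x: x % 2 == 0).collect()
    [4, 16]
    >>> sc.parallelize([1, "two", (3, 4)]).count()  # elements of mixed types
    3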
16 changes: 8 additions & 8 deletions python/pyspark/rdd.py
@@ -117,8 +117,6 @@ def getCheckpointFile(self):
else:
return None

-# TODO persist(self, storageLevel)

def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
@@ -227,7 +225,7 @@ def takeSample(self, withReplacement, num, seed):
total = num

samples = self.sample(withReplacement, fraction, seed).collect()

# If the first sample didn't turn out large enough, keep trying to take samples;
# this shouldn't happen often because we use a big multiplier for their initial size.
# See: scala/spark/RDD.scala
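Annotation: the comment above summarizes the retry logic; a simplified, illustrative plain-Python sketch of that idea (hypothetical names, not the code in this file):

    # Keep drawing until the sample is large enough, then trim to `num`.
    samples = rdd.sample(withReplacement, fraction, seed).collect()
    while len(samples) < num:
        seed += 1  # vary the seed so each retry draws a different sample
        samples = rdd.sample(withReplacement, fraction, seed).collect()
    result = samples[0:num]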
@@ -288,7 +286,7 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
samples = sorted(samples, reverse=(not ascending), key=keyfunc)

# we have numPartitions many parts but one of them has
@@ -309,7 +307,9 @@ def rangePartitionFunc(k):
def mapFunc(iterator):
yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

-return self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc).mapPartitions(mapFunc,preservesPartitioning=True).flatMap(lambda x: x, preservesPartitioning=True)
+return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
+        .mapPartitions(mapFunc,preservesPartitioning=True)
+        .flatMap(lambda x: x, preservesPartitioning=True))
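Annotation for reviewers of the reflowed chain: a doctest-style usage sketch of `sortByKey` (assuming the shell's `sc`):

    >>> pairs = sc.parallelize([("b", 2), ("c", 3), ("a", 1)])
    >>> pairs.sortByKey(ascending=True).collect()
    [('a', 1), ('b', 2), ('c', 3)]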

def glom(self):
"""
@@ -471,7 +471,7 @@ def count(self):
3
"""
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
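Annotation: the one-liner above counts each partition with a generator and then sums the per-partition counts; a plain-Python analogue of the same idea (`parts` stands in for an RDD's partitions):

    >>> parts = [[1, 2], [3, 4, 5]]
    >>> sum(sum(1 for _ in p) for p in parts)
    5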

def stats(self):
"""
Return a L{StatCounter} object that captures the mean, variance
@@ -508,7 +508,7 @@ def stdev(self):
0.816...
"""
return self.stats().stdev()
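Annotation: a usage sketch for the `stats`/`stdev` pair (assuming the shell's `sc`; the `mean()` accessor is taken from pyspark's StatCounter and worth double-checking):

    >>> st = sc.parallelize([1.0, 2.0, 3.0]).stats()
    >>> st.mean()
    2.0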

def sampleStdev(self):
"""
Compute the sample standard deviation of this RDD's elements (which corrects for bias in
@@ -878,7 +878,7 @@ def subtractByKey(self, other, numPartitions=None):
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
"""
filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
map_func = lambda (key, vals): [(key, val) for val in vals[0]]
return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
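Annotation: after `cogroup`, each key maps to a pair of value lists, `(values from self, values from other)`; the filter keeps keys that have values on the left and none on the right, and the flatMap re-emits the left values. A plain-Python analogue with illustrative data:

    >>> grouped = {'a': ([1], [3]), 'b': ([4, 5], [])}
    >>> sorted((k, v) for k, (left, right) in grouped.items() if left and not right for v in left)
    [('b', 4), ('b', 5)]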
