Skip to content

Commit

Permalink
Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent 88a5cef commit c60233a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,5 @@ def dispatch(seq):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
return (vbuf, wbuf)
return (iter(vbuf), iter(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
4 changes: 2 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ def mergeCombiners(a, b):
return a + b

return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numPartitions)
numPartitions).mapValues(lambda x: iter(x))

# TODO: add tests
def flatMapValues(self, f):
Expand Down Expand Up @@ -1180,7 +1180,7 @@ def cogroup(self, other, numPartitions=None):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())
>>> sorted(list(x.cogroup(y).collect()))
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup(self, other, numPartitions)
Expand Down

0 comments on commit c60233a

Please sign in to comment.