Skip to content

Commit

Permalink
[SPARK-4477] [PySpark] remove numpy from RDDSampler
Browse files Browse the repository at this point in the history
In RDDSampler, it try use numpy to gain better performance for possion(), but the number of call of random() is only (1+faction) * N in the pure python implementation of possion(), so there is no much performance gain from numpy.

numpy is not a dependent of pyspark, so it maybe introduce some problem, such as there is no numpy installed in slaves, but only installed master, as reported in SPARK-927.

It also complicate the code a lot, so we may should remove numpy from RDDSampler.

I also did some benchmark to verify that:
```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count()  # cache it
>>> rdd.sample(True, 0.9).count()    # measure this line
```
the results:

|withReplacement      |  random  | numpy.random |
 ------- | ------------ |  -------
|True | 1.5 s|  1.4 s|
|False|  0.6 s | 0.8 s|

closes apache#2313

Note: this patch including some commits that not mirrored to github, it will be OK after it catches up.

Author: Davies Liu <davies@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes apache#3351 from davies/numpy and squashes the following commits:

5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()

(cherry picked from commit d39f2e9)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
  • Loading branch information
Davies Liu authored and mengxr committed Nov 21, 2014
1 parent 69e2804 commit 5153aa0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 69 deletions.
10 changes: 6 additions & 4 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,11 @@ def distinct(self, numPartitions=None):

def sample(self, withReplacement, fraction, seed=None):
"""
Return a sampled subset of this RDD (relies on numpy and falls back
on default random generator if numpy is unavailable).
Return a sampled subset of this RDD.
>>> rdd = sc.parallelize(range(100), 4)
>>> rdd.sample(False, 0.1, 81).count()
10
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
Expand Down Expand Up @@ -343,8 +346,7 @@ def randomSplit(self, weights, seed=None):
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD (currently requires
numpy).
Return a fixed-size sampled subset of this RDD.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
Expand Down
99 changes: 34 additions & 65 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,48 @@

import sys
import random
import math


class RDDSamplerBase(object):

def __init__(self, withReplacement, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, (
"NumPy does not appear to be installed. "
"Falling back to default random generator for sampling.")
self._use_numpy = False

self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._random = None
self._split = None
self._rand_initialized = False

def initRandomGenerator(self, split):
if self._use_numpy:
import numpy
self._random = numpy.random.RandomState(self._seed ^ split)
else:
self._random = random.Random(self._seed ^ split)
self._random = random.Random(self._seed ^ split)

# mixing because the initial seeds are close to each other
for _ in xrange(10):
self._random.randint(0, 1)

self._split = split
self._rand_initialized = True

def getUniformSample(self, split):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.random_sample()
def getUniformSample(self):
return self._random.random()

def getPoissonSample(self, mean):
# Using Knuth's algorithm described in
# http://en.wikipedia.org/wiki/Poisson_distribution
if mean < 20.0:
# one exp and k+1 random calls
l = math.exp(-mean)
p = self._random.random()
k = 0
while p > l:
k += 1
p *= self._random.random()
else:
return self._random.uniform(0.0, 1.0)

def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.poisson(mean)
else:
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
# drawing a sequence of numbers delta_j ~ Exp(mean)
num_arrivals = 1
cur_time = 0.0

cur_time += self._random.expovariate(mean)
# switch to the log domain, k+1 expovariate (random + log) calls
p = self._random.expovariate(mean)
k = 0
while p < 1.0:
k += 1
p += self._random.expovariate(mean)
return k

if cur_time > 1.0:
return 0

while(cur_time <= 1.0):
cur_time += self._random.expovariate(mean)
num_arrivals += 1

return (num_arrivals - 1)

def shuffle(self, vals):
if self._random is None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter

if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)
def func(self, split, iterator):
raise NotImplementedError


class RDDSampler(RDDSamplerBase):
Expand All @@ -101,31 +68,32 @@ def __init__(self, withReplacement, fraction, seed=None):
self._fraction = fraction

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fraction)
count = self.getPoissonSample(self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
if self.getUniformSample() < self._fraction:
yield obj


class RDDRangeSampler(RDDSamplerBase):

def __init__(self, lowerBound, upperBound, seed=None):
RDDSamplerBase.__init__(self, False, seed)
self._use_numpy = False # no performance gain from numpy
self._lowerBound = lowerBound
self._upperBound = upperBound

def func(self, split, iterator):
self.initRandomGenerator(split)
for obj in iterator:
if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
if self._lowerBound <= self.getUniformSample() < self._upperBound:
yield obj


Expand All @@ -136,15 +104,16 @@ def __init__(self, withReplacement, fractions, seed=None):
self._fractions = fractions

def func(self, split, iterator):
self.initRandomGenerator(split)
if self._withReplacement:
for key, val in iterator:
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fractions[key])
count = self.getPoissonSample(self._fractions[key])
for _ in range(0, count):
yield key, val
else:
for key, val in iterator:
if self.getUniformSample(split) <= self._fractions[key]:
if self.getUniformSample() < self._fractions[key]:
yield key, val

0 comments on commit 5153aa0

Please sign in to comment.