From 5153aa041fd4ca8b2a4df4d635598090280655c6 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 20 Nov 2014 16:40:25 -0800 Subject: [PATCH] [SPARK-4477] [PySpark] remove numpy from RDDSampler In RDDSampler, it try use numpy to gain better performance for possion(), but the number of call of random() is only (1+faction) * N in the pure python implementation of possion(), so there is no much performance gain from numpy. numpy is not a dependent of pyspark, so it maybe introduce some problem, such as there is no numpy installed in slaves, but only installed master, as reported in SPARK-927. It also complicate the code a lot, so we may should remove numpy from RDDSampler. I also did some benchmark to verify that: ``` >>> from pyspark.mllib.random import RandomRDDs >>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache() >>> rdd.count() # cache it >>> rdd.sample(True, 0.9).count() # measure this line ``` the results: |withReplacement | random | numpy.random | ------- | ------------ | ------- |True | 1.5 s| 1.4 s| |False| 0.6 s | 0.8 s| closes #2313 Note: this patch including some commits that not mirrored to github, it will be OK after it catches up. Author: Davies Liu Author: Xiangrui Meng Closes #3351 from davies/numpy and squashes the following commits: 5c438d7 [Davies Liu] fix comment c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477 98eb31b [Xiangrui Meng] make poisson sampling slightly faster ee17d78 [Davies Liu] remove = for float 13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy f583023 [Davies Liu] fix tests 51649f5 [Davies Liu] remove numpy in RDDSampler 78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain f5fdf63 [Davies Liu] fix bug with int in weights 4dfa2cd [Davies Liu] refactor f866bcf [Davies Liu] remove unneeded change c7a2007 [Davies Liu] switch to python implementation 95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit 0d9b256 [Davies Liu] refactor 1715ee3 [Davies Liu] address comments 41fce54 [Davies Liu] randomSplit() (cherry picked from commit d39f2e9c683a4ab78b29eb3c5668325bf8568e8c) Signed-off-by: Xiangrui Meng --- python/pyspark/rdd.py | 10 ++-- python/pyspark/rddsampler.py | 99 +++++++++++++----------------------- 2 files changed, 40 insertions(+), 69 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 50535d2711708..57754776faaa2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -310,8 +310,11 @@ def distinct(self, numPartitions=None): def sample(self, withReplacement, fraction, seed=None): """ - Return a sampled subset of this RDD (relies on numpy and falls back - on default random generator if numpy is unavailable). + Return a sampled subset of this RDD. + + >>> rdd = sc.parallelize(range(100), 4) + >>> rdd.sample(False, 0.1, 81).count() + 10 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) @@ -343,8 +346,7 @@ def randomSplit(self, weights, seed=None): # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ - Return a fixed-size sampled subset of this RDD (currently requires - numpy). + Return a fixed-size sampled subset of this RDD. >>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 558dcfd12d46f..459e1427803cb 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -17,81 +17,48 @@ import sys import random +import math class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. " - "Falling back to default random generator for sampling.") - self._use_numpy = False - - self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1) + self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None - self._split = None - self._rand_initialized = False def initRandomGenerator(self, split): - if self._use_numpy: - import numpy - self._random = numpy.random.RandomState(self._seed ^ split) - else: - self._random = random.Random(self._seed ^ split) + self._random = random.Random(self._seed ^ split) # mixing because the initial seeds are close to each other for _ in xrange(10): self._random.randint(0, 1) - self._split = split - self._rand_initialized = True - - def getUniformSample(self, split): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.random_sample() + def getUniformSample(self): + return self._random.random() + + def getPoissonSample(self, mean): + # Using Knuth's algorithm described in + # http://en.wikipedia.org/wiki/Poisson_distribution + if mean < 20.0: + # one exp and k+1 random calls + l = math.exp(-mean) + p = self._random.random() + k = 0 + while p > l: + k += 1 + p *= self._random.random() else: - return self._random.uniform(0.0, 1.0) - - def getPoissonSample(self, split, mean): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.poisson(mean) - else: - # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by - # drawing a sequence of numbers delta_j ~ Exp(mean) - num_arrivals = 1 - cur_time = 0.0 - - cur_time += self._random.expovariate(mean) + # switch to the log domain, k+1 expovariate (random + log) calls + p = self._random.expovariate(mean) + k = 0 + while p < 1.0: + k += 1 + p += self._random.expovariate(mean) + return k - if cur_time > 1.0: - return 0 - - while(cur_time <= 1.0): - cur_time += self._random.expovariate(mean) - num_arrivals += 1 - - return (num_arrivals - 1) - - def shuffle(self, vals): - if self._random is None: - self.initRandomGenerator(0) # this should only ever called on the master so - # the split does not matter - - if self._use_numpy: - self._random.shuffle(vals) - else: - self._random.shuffle(vals, self._random.random) + def func(self, split, iterator): + raise NotImplementedError class RDDSampler(RDDSamplerBase): @@ -101,17 +68,18 @@ def __init__(self, withReplacement, fraction, seed=None): self._fraction = fraction def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for obj in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fraction) + count = self.getPoissonSample(self._fraction) for _ in range(0, count): yield obj else: for obj in iterator: - if self.getUniformSample(split) <= self._fraction: + if self.getUniformSample() < self._fraction: yield obj @@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase): def __init__(self, lowerBound, upperBound, seed=None): RDDSamplerBase.__init__(self, False, seed) - self._use_numpy = False # no performance gain from numpy self._lowerBound = lowerBound self._upperBound = upperBound def func(self, split, iterator): + self.initRandomGenerator(split) for obj in iterator: - if self._lowerBound <= self.getUniformSample(split) < self._upperBound: + if self._lowerBound <= self.getUniformSample() < self._upperBound: yield obj @@ -136,15 +104,16 @@ def __init__(self, withReplacement, fractions, seed=None): self._fractions = fractions def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for key, val in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fractions[key]) + count = self.getPoissonSample(self._fractions[key]) for _ in range(0, count): yield key, val else: for key, val in iterator: - if self.getUniformSample(split) <= self._fractions[key]: + if self.getUniformSample() < self._fractions[key]: yield key, val