diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index c58555fc9d2c5..ac55fcdc2057b 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -53,7 +53,8 @@ # mllib that depend on top level pyspark packages, which transitively depend on python's random. # Since Python's import logic looks for modules in the current package first, we eliminate # mllib.random as a candidate for C{import random} by removing the first search path, the script's -# location, in order to force the loader to look in Python's top-level modules for C{random}. +# location, in order to force the loader to look in Python's top-level +# modules for C{random}. import sys s = sys.path.pop(0) import random diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 45d36e5d0e764..f85e2501f9b03 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -97,7 +97,8 @@ pickleSer = PickleSerializer() # Holds accumulators registered on the current machine, keyed by ID. This is then used to send -# the local accumulator updates back to the driver program at the end of a task. +# the local accumulator updates back to the driver program at the end of a +# task. _accumulatorRegistry = {} @@ -110,6 +111,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param): class Accumulator(object): + """ A shared variable that can be accumulated, i.e., has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=} @@ -139,14 +141,16 @@ def __reduce__(self): def value(self): """Get the accumulator's value; only usable in driver program""" if self._deserialized: - raise Exception("Accumulator.value cannot be accessed inside tasks") + raise Exception( + "Accumulator.value cannot be accessed inside tasks") return self._value @value.setter def value(self, value): """Sets the accumulator's value; only usable in driver program""" if self._deserialized: - raise Exception("Accumulator.value cannot be accessed inside tasks") + raise Exception( + "Accumulator.value cannot be accessed inside tasks") self._value = value def add(self, term): @@ -166,6 +170,7 @@ def __repr__(self): class AccumulatorParam(object): + """ Helper object that defines how to accumulate values of a given type. """ @@ -186,6 +191,7 @@ def addInPlace(self, value1, value2): class AddingAccumulatorParam(AccumulatorParam): + """ An AccumulatorParam that uses the + operators to add values. Designed for simple types such as integers, floats, and lists. Requires the zero value for the underlying type @@ -210,6 +216,7 @@ def addInPlace(self, value1, value2): class _UpdateRequestHandler(SocketServer.StreamRequestHandler): + """ This handler will keep polling updates from the same socket until the server is shutdown. @@ -218,7 +225,8 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry while not self.server.server_shutdown: - # Poll every 1 second for new data -- don't block in case of shutdown. + # Poll every 1 second for new data -- don't block in case of + # shutdown. r, _, _ = select.select([self.rfile], [], [], 1) if self.rfile in r: num_updates = read_int(self.rfile) @@ -228,7 +236,9 @@ def handle(self): # Write a byte in acknowledgement self.wfile.write(struct.pack("!b", 1)) + class AccumulatorServer(SocketServer.TCPServer): + """ A simple TCP server that intercepts shutdown() in order to interrupt our continuous polling on the handler. @@ -239,6 +249,7 @@ def shutdown(self): self.server_shutdown = True SocketServer.TCPServer.shutdown(self) + def _start_update_server(): """Start a TCP server to receive accumulator updates in a daemon thread, and returns it""" server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 43f40f8783bfd..f3e64989ed564 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -45,6 +45,7 @@ def _from_id(bid): class Broadcast(object): + """ A broadcast variable created with L{SparkContext.broadcast()}. diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index b4c82f519bd53..ebfc0ad003b3b 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -56,6 +56,7 @@ class SparkConf(object): + """ Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -124,7 +125,8 @@ def setSparkHome(self, value): def setExecutorEnv(self, key=None, value=None, pairs=None): """Set an environment variable to be passed to executors.""" if (key is not None and pairs is not None) or (key is None and pairs is None): - raise Exception("Either pass one key-value pair or a list of pairs") + raise Exception( + "Either pass one key-value pair or a list of pairs") elif key is not None: self._jconf.setExecutorEnv(key, value) elif pairs is not None: diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 2e80eb50f2207..3ecd9a58f5b74 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -47,6 +47,7 @@ class SparkContext(object): + """ Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create L{RDD}s and @@ -59,7 +60,8 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() - _python_includes = None # zip and egg files that need to be added to PYTHONPATH + # zip and egg files that need to be added to PYTHONPATH + _python_includes = None _default_batch_size_for_serialized_input = 10 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, @@ -99,13 +101,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = rdd._extract_concise_traceback() else: tempNamedTuple = namedtuple("Callsite", "function file linenum") - self._callsite = tempNamedTuple(function=None, file=None, linenum=None) + self._callsite = tempNamedTuple( + function=None, file=None, linenum=None) SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf) except: - # If an error occurs, clean up in order to allow future SparkContext creation: + # If an error occurs, clean up in order to allow future + # SparkContext creation: self.stop() raise @@ -138,7 +142,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, if not self._conf.contains("spark.master"): raise Exception("A master URL must be set in your configuration") if not self._conf.contains("spark.app.name"): - raise Exception("An application name must be set in your configuration") + raise Exception( + "An application name must be set in your configuration") # Read back our properties from the conf in case we loaded some of them from # the classpath or an external config file @@ -179,7 +184,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self.addPyFile(path) # Deploy code dependencies set by spark-submit; these will already have been added - # with SparkContext.addFile, so we just need to add them to the PYTHONPATH + # with SparkContext.addFile, so we just need to add them to the + # PYTHONPATH for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) @@ -189,9 +195,11 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, sys.path.append(dirname) # Create a temporary directory inside spark.local.dir: - local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) + local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir( + self._jsc.sc().conf()) self._temp_dir = \ - self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() + self._jvm.org.apache.spark.util.Utils.createTempDir( + local_dir).getAbsolutePath() def _initialize_context(self, jconf): """ @@ -213,7 +221,7 @@ def _ensure_initialized(cls, instance=None, gateway=None): if instance: if (SparkContext._active_spark_context and - SparkContext._active_spark_context != instance): + SparkContext._active_spark_context != instance): currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite @@ -284,7 +292,8 @@ def parallelize(self, c, numSlices=None): # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir) - # Make sure we distribute data evenly if it's smaller than self.batchSize + # Make sure we distribute data evenly if it's smaller than + # self.batchSize if "__len__" not in dir(c): c = list(c) # Make it a list so we can compute its length batchSize = min(len(c) // numSlices, self._batchSize) @@ -403,10 +412,12 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, Java object. (default sc._default_batch_size_for_serialized_input) """ minSplits = minSplits or min(self.defaultParallelism, 2) - batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + batchSize = max( + 1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if ( + batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, - keyConverter, valueConverter, minSplits, batchSize) + keyConverter, valueConverter, minSplits, batchSize) return RDD(jrdd, self, ser) def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -434,10 +445,13 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max( + 1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if ( + batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.newAPIHadoopFile( + self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf, batchSize) return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -462,10 +476,13 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max( + 1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if ( + batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.newAPIHadoopRDD( + self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf, batchSize) return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -493,10 +510,13 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter= Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max( + 1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if ( + batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.hadoopFile( + self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf, batchSize) return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -521,10 +541,12 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + batchSize = max( + 1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if ( + batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf, batchSize) + keyConverter, valueConverter, jconf, batchSize) return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): @@ -587,7 +609,8 @@ def accumulator(self, value, accum_param=None): elif isinstance(value, complex): accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM else: - raise Exception("No default accumulator param for type %s" % type(value)) + raise Exception( + "No default accumulator param for type %s" % type(value)) SparkContext._next_accum_id += 1 return Accumulator(SparkContext._next_accum_id - 1, value, accum_param) @@ -632,12 +655,14 @@ def addPyFile(self, path): HTTP, HTTPS or FTP URI. """ self.addFile(path) - (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + # dirname may be directory or HDFS/S3 prefix + (dirname, filename) = os.path.split(path) if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): self._python_includes.append(filename) # for tests in local mode - sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) + sys.path.append( + os.path.join(SparkFiles.getRootDirectory(), filename)) def setCheckpointDir(self, dirName): """ @@ -651,7 +676,8 @@ def _getJavaStorageLevel(self, storageLevel): Returns a Java StorageLevel based on a pyspark.StorageLevel. """ if not isinstance(storageLevel, StorageLevel): - raise Exception("storageLevel must be of type pyspark.StorageLevel") + raise Exception( + "storageLevel must be of type pyspark.StorageLevel") newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel return newStorageLevel(storageLevel.useDisk, @@ -754,13 +780,15 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): """ if partitions is None: partitions = range(rdd._jrdd.partitions().size()) - javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) + javaPartitions = ListConverter().convert( + partitions, self._gateway._gateway_client) # Implementation note: This is implemented as a mapPartitions followed # by runJob() in order to avoid having to pass a Python lambda into # SparkContext#runJob. mappedRDD = rdd.mapPartitions(partitionFunc) - it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) + it = self._jvm.PythonRDD.runJob( + self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) @@ -772,7 +800,8 @@ def _test(): globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() atexit.register(lambda: shutil.rmtree(globs['tempdir'])) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 9fde0dde0f4b4..73fb691ea3bf3 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -30,7 +30,8 @@ def compute_real_exit_code(exit_code): - # SystemExit's code can be integer or string, but os._exit only accepts integers + # SystemExit's code can be integer or string, but os._exit only accepts + # integers if isinstance(exit_code, numbers.Integral): return exit_code else: @@ -43,7 +44,8 @@ def worker(sock): """ # Redirect stdout to stderr os.dup2(2, 1) - sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1 + # The sys.stdout object is different from file descriptor 1 + sys.stdout = sys.stderr signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) @@ -62,7 +64,8 @@ def waitSocketClose(sock): # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because - # otherwise writes also cause a seek that makes us miss data on the read side. + # otherwise writes also cause a seek that makes us miss data on the read + # side. infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) exit_code = 0 @@ -106,7 +109,8 @@ def handle_sigchld(*args): try: pid, status = os.waitpid(0, os.WNOHANG) if status != 0: - msg = "worker %s crashed abruptly with exit status %s" % (pid, status) + msg = "worker %s crashed abruptly with exit status %s" % ( + pid, status) print >> sys.stderr, msg except EnvironmentError as err: if err.errno not in (ECHILD, EINTR): diff --git a/python/pyspark/files.py b/python/pyspark/files.py index 57ee14eeb7776..331de9a9b2212 100644 --- a/python/pyspark/files.py +++ b/python/pyspark/files.py @@ -19,6 +19,7 @@ class SparkFiles(object): + """ Resolves paths to files added through L{SparkContext.addFile()}. diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2c129679f47f3..2be2282f0c80b 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -39,12 +39,14 @@ def launch_gateway(): submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") submit_args = submit_args if submit_args is not None else "" submit_args = shlex.split(submit_args) - command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args + command = [ + os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + proc = Popen( + command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) else: # preexec_fn not supported on Windows proc = Popen(command, stdout=PIPE, stdin=PIPE) @@ -65,6 +67,7 @@ def preexec_func(): # Create a thread to echo output from the GatewayServer, which is required # for Java log output to show up: class EchoOutputThread(Thread): + def __init__(self, stream): Thread.__init__(self) self.daemon = True diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index c6ca6a75df746..fca23111d3c2b 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -35,7 +35,8 @@ # Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, -# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. +# such as _dot and _serialize_double_vector, start to support scipy.sparse +# matrices. _have_scipy = False _scipy_issparse = None @@ -72,9 +73,9 @@ # Python interpreter must agree on what endian the machine is. -DENSE_VECTOR_MAGIC = 1 +DENSE_VECTOR_MAGIC = 1 SPARSE_VECTOR_MAGIC = 2 -DENSE_MATRIX_MAGIC = 3 +DENSE_MATRIX_MAGIC = 3 LABELED_POINT_MAGIC = 4 @@ -164,7 +165,8 @@ def _serialize_sparse_vector(v): header[1] = nonzeros _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32) values_offset = 9 + 4 * nonzeros - _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64) + _copyto(v.values, buffer=ba, offset=values_offset, + shape=[nonzeros], dtype=float64) return ba @@ -188,9 +190,11 @@ def _deserialize_double(ba, offset=0): True """ if type(ba) != bytearray: - raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba)) + raise TypeError( + "_deserialize_double called on a %s; wanted bytearray" % type(ba)) if len(ba) - offset != 8: - raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb) + raise TypeError( + "_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb) return struct.unpack("d", ba[offset:])[0] @@ -246,7 +250,8 @@ def _deserialize_sparse_vector(ba, offset=0): raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32) - values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) + values = _deserialize_numpy_array( + [nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) return SparseVector(int(size), indices, values) @@ -325,7 +330,8 @@ def _deserialize_labeled_point(ba, offset=0): if type(ba) != bytearray: raise TypeError("Expecting a bytearray but got %s" % type(ba)) if ba[offset] != LABELED_POINT_MAGIC: - raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0])) + raise TypeError("Expecting magic number %d but got %d" % + (LABELED_POINT_MAGIC, ba[0])) label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0] features = _deserialize_double_vector(ba, offset + 9) return LabeledPoint(label, features) @@ -339,7 +345,8 @@ def _copyto(array, buffer, offset, shape, dtype): TODO: In the future this could use numpy.copyto on NumPy 1.7+, but we should benchmark that to see whether it provides a benefit. """ - temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C') + temp_array = ndarray( + shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C') temp_array[...] = array @@ -356,7 +363,8 @@ def _get_unmangled_double_vector_rdd(data): return _get_unmangled_rdd(data, _serialize_double_vector) -# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points +# Map a pickled Python RDD of LabeledPoint to a Java RDD of +# _serialized_labeled_points def _get_unmangled_labeled_point_rdd(data): return _get_unmangled_rdd(data, _serialize_labeled_point) @@ -383,7 +391,8 @@ def _linear_predictor_typecheck(x, coeffs): elif (type(x) == RDD): raise RuntimeError("Bulk predict not yet supported.") else: - raise TypeError("Argument of type " + type(x).__name__ + " unsupported") + raise TypeError( + "Argument of type " + type(x).__name__ + " unsupported") # If we weren't given initial weights, take a zero vector of the appropriate @@ -430,6 +439,7 @@ def _serialize_rating(r): class RatingDeserializer(Serializer): + def loads(self, stream): length = struct.unpack("!i", stream.read(4))[0] ba = stream.read(length) @@ -478,7 +488,8 @@ def _convert_vector(vec): assert vec.shape[1] == 1, "Expected column vector" csc = vec.tocsc() return SparseVector(vec.shape[0], csc.indices, csc.data) - raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix") + raise TypeError( + "Expected NumPy array, SparseVector, or scipy.sparse matrix") def _squared_distance(v1, v2): @@ -531,7 +542,8 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 2bbb9c3fca315..2e31665105c62 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -31,6 +31,7 @@ class LogisticRegressionModel(LinearModel): + """A linear binary classification model derived from logistic regression. >>> data = [ @@ -60,6 +61,7 @@ class LogisticRegressionModel(LinearModel): >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0 True """ + def predict(self, x): _linear_predictor_typecheck(x, self._coeff) margin = _dot(x, self._coeff) + self._intercept @@ -72,6 +74,7 @@ def predict(self, x): class LogisticRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None): """Train a logistic regression model on the given data.""" @@ -83,6 +86,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWei class SVMModel(LinearModel): + """A support vector machine. >>> data = [ @@ -106,6 +110,7 @@ class SVMModel(LinearModel): >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0 True """ + def predict(self, x): _linear_predictor_typecheck(x, self._coeff) margin = _dot(x, self._coeff) + self._intercept @@ -113,6 +118,7 @@ def predict(self, x): class SVMWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): @@ -124,6 +130,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, class NaiveBayesModel(object): + """ Model for Naive Bayes classifiers. @@ -164,6 +171,7 @@ def predict(self, x): class NaiveBayes(object): + @classmethod def train(cls, data, lambda_=1.0): """ @@ -182,7 +190,8 @@ def train(cls, data, lambda_=1.0): """ sc = data.context dataBytes = _get_unmangled_labeled_point_rdd(data) - ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) + ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes( + dataBytes._jrdd, lambda_) return NaiveBayesModel( _deserialize_double_vector(ans[0]), _deserialize_double_vector(ans[1]), @@ -193,7 +202,8 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index b380e8f6c8725..7e974d7186bf7 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -27,6 +27,7 @@ class KMeansModel(object): + """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) @@ -55,6 +56,7 @@ class KMeansModel(object): >>> type(model.clusterCenters) """ + def __init__(self, centers): self.centers = centers @@ -76,6 +78,7 @@ def predict(self, x): class KMeans(object): + @classmethod def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"): """Train a k-means clustering model.""" @@ -96,7 +99,8 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 54720c2324ca6..41b1f2976893b 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -27,6 +27,7 @@ class SparseVector(object): + """ A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy's {scipy.sparse} data types. @@ -59,7 +60,8 @@ def __init__(self, size, *args): self.indices = array([p[0] for p in pairs], dtype=int32) self.values = array([p[1] for p in pairs], dtype=float64) else: - assert len(args[0]) == len(args[1]), "index and value arrays not same length" + assert len(args[0]) == len( + args[1]), "index and value arrays not same length" self.indices = array(args[0], dtype=int32) self.values = array(args[1], dtype=float64) for i in xrange(len(self.indices) - 1): @@ -88,10 +90,12 @@ def dot(self, other): result += self.values[i] * other[self.indices[i]] return result elif other.ndim == 2: - results = [self.dot(other[:, i]) for i in xrange(other.shape[1])] + results = [self.dot(other[:, i]) + for i in xrange(other.shape[1])] return array(results) else: - raise Exception("Cannot call dot with %d-dimensional array" % other.ndim) + raise Exception( + "Cannot call dot with %d-dimensional array" % other.ndim) else: result = 0.0 i, j = 0, 0 @@ -167,7 +171,8 @@ def __str__(self): def __repr__(self): inds = self.indices vals = self.values - entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) + entries = ", ".join( + ["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) return "SparseVector({0}, {{{1}}})".format(self.size, entries) def __eq__(self, other): @@ -192,6 +197,7 @@ def __ne__(self, other): class Vectors(object): + """ Factory methods for working with vectors. Note that dense vectors are simply represented as NumPy array objects, so there is no need @@ -256,7 +262,8 @@ def _test(): if __name__ == "__main__": # remove current path from list of search paths to avoid importing mllib.random - # for C{import random}, which is done in an external dependency of pyspark during doctests. + # for C{import random}, which is done in an external dependency of pyspark + # during doctests. import sys sys.path.pop(0) _test() diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 36e710dbae7a8..1b51da913c4b3 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -24,7 +24,9 @@ from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector from pyspark.serializers import NoOpSerializer + class RandomRDDGenerators: + """ Generator methods for creating RDDs comprised of i.i.d samples from some distribution. @@ -52,8 +54,9 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): >>> parts == sc.defaultParallelism True """ - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD( + sc._jsc, size, numPartitions, seed) + uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -76,8 +79,9 @@ def normalRDD(sc, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - 1.0) < 0.1 True """ - jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) + jrdd = sc._jvm.PythonMLLibAPI().normalRDD( + sc._jsc, size, numPartitions, seed) + normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -97,8 +101,9 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - sqrt(mean)) < 0.5 True """ - jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD( + sc._jsc, mean, size, numPartitions, seed) + poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) @staticmethod @@ -118,7 +123,7 @@ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): """ jrdd = sc._jvm.PythonMLLibAPI() \ .uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - uniform = RDD(jrdd, sc, NoOpSerializer()) + uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod @@ -138,7 +143,7 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None): """ jrdd = sc._jvm.PythonMLLibAPI() \ .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed) - normal = RDD(jrdd, sc, NoOpSerializer()) + normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @staticmethod @@ -161,7 +166,7 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None): """ jrdd = sc._jvm.PythonMLLibAPI() \ .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed) - poisson = RDD(jrdd, sc, NoOpSerializer()) + poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes))) @@ -172,7 +177,8 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 6c385042ffa5f..c398b008fda54 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -26,6 +26,7 @@ class MatrixFactorizationModel(object): + """A matrix factorisation model trained by regularized alternating least-squares. @@ -58,6 +59,7 @@ def predictAll(self, usersProducts): class ALS(object): + @classmethod def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): sc = ratings.context @@ -79,7 +81,8 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 041b119269427..c23ec7e44ca6d 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -27,6 +27,7 @@ class LabeledPoint(object): + """ The features and labels of a data point. @@ -34,6 +35,7 @@ class LabeledPoint(object): @param features: Vector of features for this point (NumPy array, list, pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix) """ + def __init__(self, label, features): self.label = label if (type(features) == ndarray or type(features) == SparseVector @@ -42,14 +44,17 @@ def __init__(self, label, features): elif type(features) == list: self.features = array(features) else: - raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + raise TypeError( + "Expected NumPy array, list, SparseVector, or scipy.sparse matrix") def __str__(self): return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" class LinearModel(object): + """A linear model that has a vector of coefficients and an intercept.""" + def __init__(self, weights, intercept): self._coeff = weights self._intercept = intercept @@ -64,6 +69,7 @@ def intercept(self): class LinearRegressionModelBase(LinearModel): + """A linear regression model. >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1) @@ -72,6 +78,7 @@ class LinearRegressionModelBase(LinearModel): >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6 True """ + def predict(self, x): """Predict the value of the dependent variable given a vector x""" """containing values for the independent variables.""" @@ -80,6 +87,7 @@ def predict(self, x): class LinearRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit. >>> from pyspark.mllib.regression import LabeledPoint @@ -111,6 +119,7 @@ class LinearRegressionModel(LinearRegressionModelBase): class LinearRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None, regParam=1.0, regType=None, intercept=False): @@ -146,6 +155,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, class LassoModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an l_1 penalty term. @@ -178,6 +188,7 @@ class LassoModel(LinearRegressionModelBase): class LassoWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): @@ -189,6 +200,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, class RidgeRegressionModel(LinearRegressionModelBase): + """A linear regression model derived from a least-squares fit with an l_2 penalty term. @@ -221,6 +233,7 @@ class RidgeRegressionModel(LinearRegressionModelBase): class RidgeRegressionWithSGD(object): + @classmethod def train(cls, data, iterations=100, step=1.0, regParam=1.0, miniBatchFraction=1.0, initialWeights=None): @@ -235,7 +248,8 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index 0a08a562d1f1f..dbf2a03bb3a73 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -24,6 +24,7 @@ _serialize_double, _serialize_double_vector, \ _deserialize_double, _deserialize_double_matrix + class Statistics(object): @staticmethod @@ -79,13 +80,15 @@ def corr(x, y=None, method=None): try: Xser = _get_unmangled_double_vector_rdd(x) except TypeError: - raise TypeError("corr called on a single RDD not consisted of Vectors.") + raise TypeError( + "corr called on a single RDD not consisted of Vectors.") resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method) return _deserialize_double_matrix(resultMat) else: xSer = _get_unmangled_rdd(x, _serialize_double) ySer = _get_unmangled_rdd(y, _serialize_double) - result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method) + result = sc._jvm.PythonMLLibAPI().corr( + xSer._jrdd, ySer._jrdd, method) return result @@ -94,7 +97,8 @@ def _test(): from pyspark import SparkContext globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 37ccf1d590743..d5dd07299269b 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -39,6 +39,7 @@ class VectorTests(unittest.TestCase): + def test_serialize(self): sv = SparseVector(4, {1: 1, 3: 2}) dv = array([1., 2., 3., 4.]) @@ -46,9 +47,12 @@ def test_serialize(self): self.assertTrue(sv is _convert_vector(sv)) self.assertTrue(dv is _convert_vector(dv)) self.assertTrue(array_equal(dv, _convert_vector(lst))) - self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv))) - self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) - self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst)))) + self.assertEquals( + sv, _deserialize_double_vector(_serialize_double_vector(sv))) + self.assertTrue( + array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) + self.assertTrue( + array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst)))) def test_dot(self): sv = SparseVector(4, {1: 1, 3: 2}) @@ -61,9 +65,11 @@ def test_dot(self): self.assertEquals(10.0, _dot(sv, dv)) self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat))) self.assertEquals(30.0, _dot(dv, dv)) - self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) + self.assertTrue( + array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) self.assertEquals(30.0, _dot(lst, dv)) - self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) + self.assertTrue( + array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) def test_squared_distance(self): sv = SparseVector(4, {1: 1, 3: 2}) @@ -81,6 +87,7 @@ def test_squared_distance(self): class ListTests(PySparkTestCase): + """ Test MLlib algorithms on plain lists, to make sure they're passed through as NumPy arrays. @@ -94,7 +101,8 @@ def test_clustering(self): [1.1, 0], [1.2, 0], ] - clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") + clusters = KMeans.train( + self.sc.parallelize(data), 2, initializationMode="k-means||") self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) @@ -160,6 +168,7 @@ def test_regression(self): @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): + """ Test both vector operations and MLlib algorithms with SciPy sparse matrices, if SciPy is available. @@ -176,10 +185,14 @@ def test_serialize(self): self.assertEquals(sv, _convert_vector(lil.tocoo())) self.assertEquals(sv, _convert_vector(lil.tocsr())) self.assertEquals(sv, _convert_vector(lil.todok())) - self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) - self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) - self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) - self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok()))) + self.assertEquals( + sv, _deserialize_double_vector(_serialize_double_vector(lil))) + self.assertEquals( + sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) + self.assertEquals( + sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) + self.assertEquals( + sv, _deserialize_double_vector(_serialize_double_vector(lil.todok()))) def test_dot(self): from scipy.sparse import lil_matrix @@ -223,7 +236,8 @@ def test_clustering(self): self.scipy_matrix(3, {2: 1.0}), self.scipy_matrix(3, {2: 1.1}) ] - clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") + clusters = KMeans.train( + self.sc.parallelize(data), 2, initializationMode="k-means||") self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index d94900cefdb77..5dbab5102e5f8 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -25,6 +25,7 @@ class MLUtils: + """ Helper methods to load, save and pre-process data used in MLlib. """ @@ -128,7 +129,8 @@ def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None): parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l)) if numFeatures <= 0: parsed.cache() - numFeatures = parsed.map(lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 + numFeatures = parsed.map( + lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2]))) @staticmethod @@ -183,7 +185,8 @@ def loadLabeledPoints(sc, path, minPartitions=None): (0.0,[1.01,2.02,3.03]) """ minPartitions = minPartitions or min(sc.defaultParallelism, 2) - jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) + jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints( + sc._jsc, path, minPartitions) serialized = RDD(jSerialized, sc, NoOpSerializer()) return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes))) @@ -195,7 +198,8 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod( + globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 309f5a9b6038d..374535587cfde 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -233,7 +233,7 @@ def __init__(self, jrdd, ctx, jrdd_deserializer): def _toPickleSerialization(self): if (self._jrdd_deserializer == PickleSerializer() or - self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): + self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): return self else: return self._reserialize(BatchedSerializer(PickleSerializer(), 10)) @@ -1078,7 +1078,8 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl jconf = self.ctx._dictToJavaMap(conf) pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) - self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, + self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile( + pickledRDD._jrdd, batched, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): @@ -1124,7 +1125,8 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No jconf = self.ctx._dictToJavaMap(conf) pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) - self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, + self.ctx._jvm.PythonRDD.saveAsHadoopFile( + pickledRDD._jrdd, batched, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf, compressionCodecClass) @@ -1348,7 +1350,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): outputSerializer = self.ctx._unbatched_serializer limit = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) / 2) + "spark.python.worker.memory", "512m")) / 2) def add_shuffle_key(split, iterator): @@ -1430,12 +1432,12 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true') memory = _parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) + "spark.python.worker.memory", "512m")) agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): merger = ExternalMerger(agg, memory * 0.9, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeValues(iterator) return merger.iteritems() @@ -1444,7 +1446,7 @@ def combineLocally(iterator): def _mergeCombiners(iterator): merger = ExternalMerger(agg, memory, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeCombiners(iterator) return merger.iteritems() @@ -1588,7 +1590,7 @@ def sampleByKey(self, withReplacement, fractions, seed=None): """ for fraction in fractions.values(): assert fraction >= 0.0, "Negative fraction value: %s" % fraction - return self.mapPartitionsWithIndex( \ + return self.mapPartitionsWithIndex( RDDStratifiedSampler(withReplacement, fractions, seed).func, True) def subtractByKey(self, other, numPartitions=None): diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 2df000fdb08ca..35940191c70a3 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -20,6 +20,7 @@ class RDDSamplerBase(object): + def __init__(self, withReplacement, seed=None): try: import numpy @@ -30,7 +31,8 @@ def __init__(self, withReplacement, seed=None): "Falling back to default random generator for sampling.") self._use_numpy = False - self._seed = seed if seed is not None else random.randint(0, sys.maxint) + self._seed = seed if seed is not None else random.randint( + 0, sys.maxint) self._withReplacement = withReplacement self._random = None self._split = None @@ -85,7 +87,8 @@ def getPoissonSample(self, split, mean): def shuffle(self, vals): if self._random is None: - self.initRandomGenerator(0) # this should only ever called on the master so + # this should only ever called on the master so + self.initRandomGenerator(0) # the split does not matter if self._use_numpy: @@ -95,6 +98,7 @@ def shuffle(self, vals): class RDDSampler(RDDSamplerBase): + def __init__(self, withReplacement, fraction, seed=None): RDDSamplerBase.__init__(self, withReplacement, seed) self._fraction = fraction @@ -113,7 +117,9 @@ def func(self, split, iterator): if self.getUniformSample(split) <= self._fraction: yield obj + class RDDStratifiedSampler(RDDSamplerBase): + def __init__(self, withReplacement, fractions, seed=None): RDDSamplerBase.__init__(self, withReplacement, seed) self._fractions = fractions diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py index df34740fc8176..ef04c82866e6c 100644 --- a/python/pyspark/resultiterable.py +++ b/python/pyspark/resultiterable.py @@ -21,9 +21,11 @@ class ResultIterable(collections.Iterable): + """ A special result iterable. This is used because the standard iterator can not be pickled """ + def __init__(self, data): self.data = data self.index = 0 diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 03b31ae9624c2..f6b8e2f7d5b82 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -108,6 +108,7 @@ def __ne__(self, other): class FramedSerializer(Serializer): + """ Serializer that writes objects as a stream of (length, data) pairs, where C{length} is a 32-bit integer and data is C{length} bytes. @@ -159,6 +160,7 @@ def loads(self, obj): class BatchedSerializer(Serializer): + """ Serializes a stream of objects in batches by calling its wrapped Serializer with streams of objects. @@ -204,6 +206,7 @@ def __str__(self): class CartesianDeserializer(FramedSerializer): + """ Deserializes the JavaRDD cartesian() of two PythonRDDs. """ @@ -237,6 +240,7 @@ def __str__(self): class PairDeserializer(CartesianDeserializer): + """ Deserializes the JavaRDD zip() of two PythonRDDs. """ @@ -268,6 +272,7 @@ def dumps(self, obj): class PickleSerializer(FramedSerializer): + """ Serializes objects using Python's cPickle serializer: @@ -290,6 +295,7 @@ def dumps(self, obj): class MarshalSerializer(FramedSerializer): + """ Serializes objects using Python's Marshal serializer: @@ -303,9 +309,11 @@ class MarshalSerializer(FramedSerializer): class AutoSerializer(FramedSerializer): + """ Choose marshal or cPickle as serialization protocol autumatically """ + def __init__(self): FramedSerializer.__init__(self) self._type = None @@ -330,6 +338,7 @@ def loads(self, obj): class UTF8Deserializer(Serializer): + """ Deserializes streams written by String.getBytes. """ diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index e1e7cd954189f..be22020fb827b 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -39,7 +39,8 @@ if os.environ.get("ADD_FILES") is not None else None) if os.environ.get("SPARK_EXECUTOR_URI"): - SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) + SparkContext.setSystemProperty( + "spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) sc = SparkContext(appName="PySparkShell", pyFiles=add_files) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index e3923d1c36c57..2c68cd4921deb 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -45,7 +45,7 @@ def get_used_memory(): return int(line.split()[1]) >> 10 else: warnings.warn("Please install psutil to have better " - "support with spilling") + "support with spilling") if platform.system() == "Darwin": import resource rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss @@ -141,7 +141,7 @@ class ExternalMerger(Merger): This class works as follows: - - It repeatedly combine the items and save them in one dict in + - It repeatedly combine the items and save them in one dict in memory. - When the used memory goes above memory limit, it will split @@ -190,12 +190,12 @@ class ExternalMerger(Merger): MAX_TOTAL_PARTITIONS = 4096 def __init__(self, aggregator, memory_limit=512, serializer=None, - localdirs=None, scale=1, partitions=59, batch=1000): + localdirs=None, scale=1, partitions=59, batch=1000): Merger.__init__(self, aggregator) self.memory_limit = memory_limit # default serializer is only used for tests self.serializer = serializer or \ - BatchedSerializer(PickleSerializer(), 1024) + BatchedSerializer(PickleSerializer(), 1024) self.localdirs = localdirs or self._get_dirs() # number of partitions when spill data into disks self.partitions = partitions @@ -341,7 +341,7 @@ def _spill(self): self.pdata[i].clear() self.spills += 1 - gc.collect() # release the memory as much as possible + gc.collect() # release the memory as much as possible def iteritems(self): """ Return all merged items as iterator """ @@ -370,8 +370,8 @@ def _external_items(self): if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS and j < self.spills - 1 and get_used_memory() > hard_limit): - self.data.clear() # will read from disk again - gc.collect() # release the memory as much as possible + self.data.clear() # will read from disk again + gc.collect() # release the memory as much as possible for v in self._recursive_merged_items(i): yield v return @@ -409,9 +409,9 @@ def _recursive_merged_items(self, start): for i in range(start, self.partitions): subdirs = [os.path.join(d, "parts", str(i)) - for d in self.localdirs] + for d in self.localdirs] m = ExternalMerger(self.agg, self.memory_limit, self.serializer, - subdirs, self.scale * self.partitions) + subdirs, self.scale * self.partitions) m.pdata = [{} for _ in range(self.partitions)] limit = self._next_limit() @@ -419,7 +419,7 @@ def _recursive_merged_items(self, start): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) m._partitioned_mergeCombiners( - self.serializer.load_stream(open(p))) + self.serializer.load_stream(open(p))) if get_used_memory() > limit: m._spill() diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index f840475ffaf70..4a4b5c8a476fb 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -41,6 +41,7 @@ class DataType(object): + """Spark SQL DataType""" def __repr__(self): @@ -58,6 +59,7 @@ def __ne__(self, other): class PrimitiveTypeSingleton(type): + """Metaclass for PrimitiveType""" _instances = {} @@ -69,6 +71,7 @@ def __call__(cls): class PrimitiveType(DataType): + """Spark SQL PrimitiveType""" __metaclass__ = PrimitiveTypeSingleton @@ -79,6 +82,7 @@ def __eq__(self, other): class StringType(PrimitiveType): + """Spark SQL StringType The data type representing string values. @@ -86,6 +90,7 @@ class StringType(PrimitiveType): class BinaryType(PrimitiveType): + """Spark SQL BinaryType The data type representing bytearray values. @@ -93,6 +98,7 @@ class BinaryType(PrimitiveType): class BooleanType(PrimitiveType): + """Spark SQL BooleanType The data type representing bool values. @@ -100,6 +106,7 @@ class BooleanType(PrimitiveType): class TimestampType(PrimitiveType): + """Spark SQL TimestampType The data type representing datetime.datetime values. @@ -107,6 +114,7 @@ class TimestampType(PrimitiveType): class DecimalType(PrimitiveType): + """Spark SQL DecimalType The data type representing decimal.Decimal values. @@ -114,6 +122,7 @@ class DecimalType(PrimitiveType): class DoubleType(PrimitiveType): + """Spark SQL DoubleType The data type representing float values. @@ -121,6 +130,7 @@ class DoubleType(PrimitiveType): class FloatType(PrimitiveType): + """Spark SQL FloatType The data type representing single precision floating-point values. @@ -128,6 +138,7 @@ class FloatType(PrimitiveType): class ByteType(PrimitiveType): + """Spark SQL ByteType The data type representing int values with 1 singed byte. @@ -135,6 +146,7 @@ class ByteType(PrimitiveType): class IntegerType(PrimitiveType): + """Spark SQL IntegerType The data type representing int values. @@ -142,6 +154,7 @@ class IntegerType(PrimitiveType): class LongType(PrimitiveType): + """Spark SQL LongType The data type representing long values. If the any value is @@ -151,6 +164,7 @@ class LongType(PrimitiveType): class ShortType(PrimitiveType): + """Spark SQL ShortType The data type representing int values with 2 signed bytes. @@ -158,6 +172,7 @@ class ShortType(PrimitiveType): class ArrayType(DataType): + """Spark SQL ArrayType The data type representing list values. An ArrayType object @@ -183,10 +198,11 @@ def __init__(self, elementType, containsNull=False): def __str__(self): return "ArrayType(%s,%s)" % (self.elementType, - str(self.containsNull).lower()) + str(self.containsNull).lower()) class MapType(DataType): + """Spark SQL MapType The data type representing dict values. A MapType object comprises @@ -222,10 +238,11 @@ def __init__(self, keyType, valueType, valueContainsNull=True): def __repr__(self): return "MapType(%s,%s,%s)" % (self.keyType, self.valueType, - str(self.valueContainsNull).lower()) + str(self.valueContainsNull).lower()) class StructField(DataType): + """Spark SQL StructField Represents a field in a StructType. @@ -259,10 +276,11 @@ def __init__(self, name, dataType, nullable): def __repr__(self): return "StructField(%s,%s,%s)" % (self.name, self.dataType, - str(self.nullable).lower()) + str(self.nullable).lower()) class StructType(DataType): + """Spark SQL StructType The data type representing rows. @@ -287,7 +305,7 @@ def __init__(self, fields): def __repr__(self): return ("StructType(List(%s))" % - ",".join(str(field) for field in self.fields)) + ",".join(str(field) for field in self.fields)) def _parse_datatype_list(datatype_list_string): @@ -315,7 +333,7 @@ def _parse_datatype_list(datatype_list_string): _all_primitive_types = dict((k, v) for k, v in globals().iteritems() - if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType) + if type(v) is PrimitiveTypeSingleton and v.__base__ == PrimitiveType) def _parse_datatype_string(datatype_string): @@ -455,16 +473,16 @@ def _infer_schema(row): items = sorted(row.items()) elif isinstance(row, tuple): - if hasattr(row, "_fields"): # namedtuple + if hasattr(row, "_fields"): # namedtuple items = zip(row._fields, tuple(row)) - elif hasattr(row, "__FIELDS__"): # Row + elif hasattr(row, "__FIELDS__"): # Row items = zip(row.__FIELDS__, tuple(row)) elif all(isinstance(x, tuple) and len(x) == 2 for x in row): items = row else: raise ValueError("Can't infer schema from tuple") - elif hasattr(row, "__dict__"): # object + elif hasattr(row, "__dict__"): # object items = sorted(row.__dict__.items()) else: @@ -495,7 +513,7 @@ def _create_converter(obj, dataType): conv = lambda o: tuple(o.get(n) for n in names) elif isinstance(obj, tuple): - if hasattr(obj, "_fields"): # namedtuple + if hasattr(obj, "_fields"): # namedtuple conv = tuple elif hasattr(obj, "__FIELDS__"): conv = tuple @@ -504,7 +522,7 @@ def _create_converter(obj, dataType): else: raise ValueError("unexpected tuple") - elif hasattr(obj, "__dict__"): # object + elif hasattr(obj, "__dict__"): # object conv = lambda o: [o.__dict__.get(n, None) for n in names] nested = any(_has_struct(f.dataType) for f in dataType.fields) @@ -656,7 +674,7 @@ def _infer_schema_type(obj, dataType): assert len(fs) == len(obj), \ "Obj(%s) have different length with fields(%s)" % (obj, fs) fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True) - for o, f in zip(obj, fs)] + for o, f in zip(obj, fs)] return StructType(fields) else: @@ -679,6 +697,7 @@ def _infer_schema_type(obj, dataType): StructType: (tuple, list), } + def _verify_type(obj, dataType): """ Verify the type of obj against dataType, raise an exception if @@ -724,7 +743,7 @@ def _verify_type(obj, dataType): elif isinstance(dataType, StructType): if len(obj) != len(dataType.fields): raise ValueError("Length of object (%d) does not match with" - "length of fields (%d)" % (len(obj), len(dataType.fields))) + "length of fields (%d)" % (len(obj), len(dataType.fields))) for v, f in zip(obj, dataType.fields): _verify_type(v, f.dataType) @@ -857,6 +876,7 @@ def __reduce__(self): raise Exception("unexpected data type: %s" % dataType) class Row(tuple): + """ Row in SchemaRDD """ __DATATYPE__ = dataType __FIELDS__ = tuple(f.name for f in dataType.fields) @@ -868,7 +888,7 @@ class Row(tuple): def __repr__(self): # call collect __repr__ for nested objects return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n)) - for n in self.__FIELDS__)) + for n in self.__FIELDS__)) def __reduce__(self): return (_restore_object, (self.__DATATYPE__, tuple(self))) @@ -877,6 +897,7 @@ def __reduce__(self): class SQLContext: + """Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as @@ -975,7 +996,7 @@ def inferSchema(self, rdd): first = rdd.first() if not first: raise ValueError("The first row in RDD is empty, " - "can not infer schema") + "can not infer schema") if type(first) is dict: warnings.warn("Using RDD of dict to inferSchema is deprecated") @@ -1232,6 +1253,7 @@ def uncacheTable(self, tableName): class HiveContext(SQLContext): + """A variant of Spark SQL that integrates with data stored in Hive. Configuration for Hive is read from hive-site.xml on the classpath. @@ -1268,6 +1290,7 @@ def hql(self, hqlQuery): class LocalHiveContext(HiveContext): + """Starts up an instance of hive where metadata is stored locally. An in-process metadata data is created with data stored in ./metadata. @@ -1298,7 +1321,7 @@ class LocalHiveContext(HiveContext): def __init__(self, sparkContext, sqlContext=None): HiveContext.__init__(self, sparkContext, sqlContext) warnings.warn("LocalHiveContext is deprecated. " - "Use HiveContext instead.", DeprecationWarning) + "Use HiveContext instead.", DeprecationWarning) def _get_hive_ctx(self): return self._jvm.LocalHiveContext(self._jsc.sc()) @@ -1317,6 +1340,7 @@ def _create_row(fields, values): class Row(tuple): + """ A row in L{SchemaRDD}. The fields in it can be accessed like attributes. @@ -1358,7 +1382,6 @@ def __new__(self, *args, **kwargs): else: raise ValueError("No args or kwargs") - # let obect acs like class def __call__(self, *args): """create new Row object""" @@ -1384,12 +1407,13 @@ def __reduce__(self): def __repr__(self): if hasattr(self, "__FIELDS__"): return "Row(%s)" % ", ".join("%s=%r" % (k, v) - for k, v in zip(self.__FIELDS__, self)) + for k, v in zip(self.__FIELDS__, self)) else: return "" % ", ".join(self) class SchemaRDD(RDD): + """An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can @@ -1596,7 +1620,7 @@ def subtract(self, other, numPartitions=None): rdd = self._jschema_rdd.subtract(other._jschema_rdd) else: rdd = self._jschema_rdd.subtract(other._jschema_rdd, - numPartitions) + numPartitions) return SchemaRDD(rdd, self.sql_ctx) else: raise ValueError("Can only subtract another SchemaRDD") @@ -1623,9 +1647,9 @@ def _test(): jsonStrings = [ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},' - '"field6":[{"field7": "row2"}]}', + '"field6":[{"field7": "row2"}]}', '{"field1" : null, "field2": "row3", ' - '"field3":{"field4":33, "field5": []}}' + '"field3":{"field4":33, "field5": []}}' ] globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 1e597d64e03fe..8b59d62bb40c3 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -51,13 +51,15 @@ def merge(self, value): return self - # Merge another StatCounter into this one, adding up the internal statistics. + # Merge another StatCounter into this one, adding up the internal + # statistics. def mergeStats(self, other): if not isinstance(other, StatCounter): raise Exception("Can only merge Statcounters!") if other is self: # reference equality holds - self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order + # Avoid overwriting fields in a weird order + self.merge(copy.deepcopy(other)) else: if self.n == 0: self.mu = other.mu @@ -73,12 +75,14 @@ def mergeStats(self, other): elif self.n * 10 < other.n: self.mu = other.mu - (delta * self.n) / (self.n + other.n) else: - self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n) + self.mu = ( + self.mu * self.n + other.mu * other.n) / (self.n + other.n) self.maxValue = maximum(self.maxValue, other.maxValue) self.minValue = minimum(self.minValue, other.minValue) - self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) + self.m2 += other.m2 + \ + (delta * delta * self.n * other.n) / (self.n + other.n) self.n += other.n return self diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py index 5d77a131f2856..2aa0fb9d2c1ed 100644 --- a/python/pyspark/storagelevel.py +++ b/python/pyspark/storagelevel.py @@ -19,6 +19,7 @@ class StorageLevel: + """ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 16fb5a9256220..e5e455ebbe785 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -62,53 +62,53 @@ def setUp(self): self.N = 1 << 16 self.l = [i for i in xrange(self.N)] self.data = zip(self.l, self.l) - self.agg = Aggregator(lambda x: [x], - lambda x, y: x.append(y) or x, - lambda x, y: x.extend(y) or x) + self.agg = Aggregator(lambda x: [x], + lambda x, y: x.append(y) or x, + lambda x, y: x.extend(y) or x) def test_in_memory(self): m = InMemoryMerger(self.agg) m.mergeValues(self.data) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = InMemoryMerger(self.agg) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) def test_small_dataset(self): m = ExternalMerger(self.agg, 1000) m.mergeValues(self.data) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = ExternalMerger(self.agg, 1000) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) def test_medium_dataset(self): m = ExternalMerger(self.agg, 10) m.mergeValues(self.data) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N))) + sum(xrange(self.N))) m = ExternalMerger(self.agg, 10) m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), - sum(xrange(self.N)) * 3) + sum(xrange(self.N)) * 3) def test_huge_dataset(self): m = ExternalMerger(self.agg, 10) m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)), - self.N * 10) + self.N * 10) m._cleanup() @@ -177,6 +177,7 @@ def test_add_py_file(self): log4j = self.sc._jvm.org.apache.log4j old_level = log4j.LogManager.getRootLogger().getLevel() log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) + def func(x): from userlibrary import UserClass return UserClass().hello() @@ -215,10 +216,12 @@ def test_add_egg_file_locally(self): def func(): from userlib import UserClass self.assertRaises(ImportError, func) - path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") + path = os.path.join( + SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") self.sc.addPyFile(path) from userlib import UserClass - self.assertEqual("Hello World from inside a package!", UserClass().hello()) + self.assertEqual( + "Hello World from inside a package!", UserClass().hello()) class TestRDDFunctions(PySparkTestCase): @@ -226,7 +229,8 @@ class TestRDDFunctions(PySparkTestCase): def test_failed_sparkcontext_creation(self): # Regression test for SPARK-1550 self.sc.stop() - self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name")) + self.assertRaises( + Exception, lambda: SparkContext("an-invalid-master-name")) self.sc = SparkContext("local") def test_save_as_textfile_with_unicode(self): @@ -315,7 +319,8 @@ def setUp(self): PySparkTestCase.setUp(self) self.tempdir = tempfile.NamedTemporaryFile(delete=False) os.unlink(self.tempdir.name) - self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) + self.sc._jvm.WriteInputFormatTestDataGenerator.generateData( + self.tempdir.name, self.sc._jsc) def tearDown(self): PySparkTestCase.tearDown(self) @@ -326,18 +331,20 @@ def test_sequencefiles(self): ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), + (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", "org.apache.hadoop.io.DoubleWritable", "org.apache.hadoop.io.Text").collect()) - ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), + (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] self.assertEqual(doubles, ed) bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/", - "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.BytesWritable").collect()) + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BytesWritable").collect()) ebs = [(1, bytearray('aa', 'utf-8')), (1, bytearray('aa', 'utf-8')), (2, bytearray('aa', 'utf-8')), @@ -360,7 +367,8 @@ def test_sequencefiles(self): bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.BooleanWritable").collect()) - eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + eb = [(1, False), (1, True), (2, False), + (2, False), (2, True), (3, True)] self.assertEqual(bools, eb) nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", @@ -409,9 +417,9 @@ def test_sequencefiles(self): self.assertEqual(clazz[0], ec) unbatched_clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", - "org.apache.hadoop.io.Text", - "org.apache.spark.api.python.TestWritable", - batchSize=1).collect()) + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable", + batchSize=1).collect()) self.assertEqual(unbatched_clazz[0], ec) def test_oldhadoop(self): @@ -420,11 +428,12 @@ def test_oldhadoop(self): "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), + (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - oldconf = {"mapred.input.dir" : hellopath} + oldconf = {"mapred.input.dir": hellopath} hello = self.sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", @@ -439,11 +448,12 @@ def test_newhadoop(self): "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), + (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") - newconf = {"mapred.input.dir" : hellopath} + newconf = {"mapred.input.dir": hellopath} hello = self.sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", @@ -498,6 +508,7 @@ def test_converters(self): (u'\x03', [2.0])] self.assertEqual(maps, em) + class TestOutputFormat(PySparkTestCase): def setUp(self): @@ -511,17 +522,21 @@ def tearDown(self): def test_sequencefiles(self): basepath = self.tempdir.name - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), + (2, u'bb'), (2, u'bb'), (3, u'cc')] self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/") ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect()) self.assertEqual(ints, ei) - ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), + (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/") - doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect()) + doubles = sorted( + self.sc.sequenceFile(basepath + "/sfdouble/").collect()) self.assertEqual(doubles, ed) - ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))] + ebs = [(1, bytearray(b'\x00\x07spam\x08')), + (2, bytearray(b'\x00\x07spam\x08'))] self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/") bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect()) self.assertEqual(bytes, ebs) @@ -533,7 +548,8 @@ def test_sequencefiles(self): text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect()) self.assertEqual(text, et) - eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + eb = [(1, False), (1, True), (2, False), + (2, False), (2, True), (3, True)] self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/") bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect()) self.assertEqual(bools, eb) @@ -555,8 +571,8 @@ def test_sequencefiles(self): def test_oldhadoop(self): basepath = self.tempdir.name dict_data = [(1, {}), - (1, {"row1" : 1.0}), - (2, {"row2" : 2.0})] + (1, {"row1": 1.0}), + (2, {"row2": 2.0})] self.sc.parallelize(dict_data).saveAsHadoopFile( basepath + "/oldhadoop/", "org.apache.hadoop.mapred.SequenceFileOutputFormat", @@ -570,12 +586,12 @@ def test_oldhadoop(self): self.assertEqual(result, dict_data) conf = { - "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable", - "mapred.output.dir" : basepath + "/olddataset/"} + "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.MapWritable", + "mapred.output.dir": basepath + "/olddataset/"} self.sc.parallelize(dict_data).saveAsHadoopDataset(conf) - input_conf = {"mapred.input.dir" : basepath + "/olddataset/"} + input_conf = {"mapred.input.dir": basepath + "/olddataset/"} old_dataset = sorted(self.sc.hadoopRDD( "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -603,14 +619,15 @@ def test_newhadoop(self): valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect()) self.assertEqual(result, array_data) - conf = {"mapreduce.outputformat.class" : - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable", - "mapred.output.dir" : basepath + "/newdataset/"} - self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf, + conf = {"mapreduce.outputformat.class": + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable", + "mapred.output.dir": basepath + "/newdataset/"} + self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset( + conf, valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter") - input_conf = {"mapred.input.dir" : basepath + "/newdataset/"} + input_conf = {"mapred.input.dir": basepath + "/newdataset/"} new_dataset = sorted(self.sc.newAPIHadoopRDD( "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -621,7 +638,7 @@ def test_newhadoop(self): def test_newolderror(self): basepath = self.tempdir.name - rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( basepath + "/newolderror/saveAsHadoopFile/", "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")) @@ -631,7 +648,7 @@ def test_newolderror(self): def test_bad_inputs(self): basepath = self.tempdir.name - rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) + rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) self.assertRaises(Exception, lambda: rdd.saveAsHadoopFile( basepath + "/badinputs/saveAsHadoopFile/", "org.apache.hadoop.mapred.NotValidOutputFormat")) @@ -650,7 +667,8 @@ def test_converters(self): "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", keyConverter="org.apache.spark.api.python.TestOutputKeyConverter", valueConverter="org.apache.spark.api.python.TestOutputValueConverter") - converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect()) + converted = sorted( + self.sc.sequenceFile(basepath + "/converters/").collect()) expected = [(u'1', 3.0), (u'2', 1.0), (u'3', 2.0)] @@ -663,62 +681,72 @@ def test_reserialization(self): data = zip(x, y) rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y)) rdd.saveAsSequenceFile(basepath + "/reserialize/sequence") - result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) + result1 = sorted( + self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) self.assertEqual(result1, data) rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop", "org.apache.hadoop.mapred.SequenceFileOutputFormat") - result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) + result2 = sorted( + self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) self.assertEqual(result2, data) - rdd.saveAsNewAPIHadoopFile(basepath + "/reserialize/newhadoop", - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") - result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) + rdd.saveAsNewAPIHadoopFile( + basepath + "/reserialize/newhadoop", + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") + result3 = sorted( + self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) self.assertEqual(result3, data) conf4 = { - "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.dir" : basepath + "/reserialize/dataset"} + "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.dir": basepath + "/reserialize/dataset"} rdd.saveAsHadoopDataset(conf4) - result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) + result4 = sorted( + self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) self.assertEqual(result4, data) - conf5 = {"mapreduce.outputformat.class" : - "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", - "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.value.class" : "org.apache.hadoop.io.IntWritable", - "mapred.output.dir" : basepath + "/reserialize/newdataset"} + conf5 = {"mapreduce.outputformat.class": + "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", + "mapred.output.key.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", + "mapred.output.dir": basepath + "/reserialize/newdataset"} rdd.saveAsNewAPIHadoopDataset(conf5) - result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) + result5 = sorted( + self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) self.assertEqual(result5, data) def test_unbatched_save_and_read(self): basepath = self.tempdir.name - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), + (2, u'bb'), (2, u'bb'), (3, u'cc')] self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile( basepath + "/unbatched/") unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/", - batchSize=1).collect()) + batchSize=1).collect()) self.assertEqual(unbatched_sequence, ei) - unbatched_hadoopFile = sorted(self.sc.hadoopFile(basepath + "/unbatched/", - "org.apache.hadoop.mapred.SequenceFileInputFormat", - "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.Text", - batchSize=1).collect()) + unbatched_hadoopFile = sorted( + self.sc.hadoopFile(basepath + "/unbatched/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + batchSize=1).collect()) self.assertEqual(unbatched_hadoopFile, ei) - unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(basepath + "/unbatched/", - "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", - "org.apache.hadoop.io.IntWritable", - "org.apache.hadoop.io.Text", - batchSize=1).collect()) + unbatched_newAPIHadoopFile = sorted( + self.sc.newAPIHadoopFile( + basepath + "/unbatched/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text", + batchSize=1).collect()) self.assertEqual(unbatched_newAPIHadoopFile, ei) - oldconf = {"mapred.input.dir" : basepath + "/unbatched/"} + oldconf = {"mapred.input.dir": basepath + "/unbatched/"} unbatched_hadoopRDD = sorted(self.sc.hadoopRDD( "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -727,7 +755,7 @@ def test_unbatched_save_and_read(self): batchSize=1).collect()) self.assertEqual(unbatched_hadoopRDD, ei) - newconf = {"mapred.input.dir" : basepath + "/unbatched/"} + newconf = {"mapred.input.dir": basepath + "/unbatched/"} unbatched_newAPIHadoopRDD = sorted(self.sc.newAPIHadoopRDD( "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", @@ -744,7 +772,9 @@ def test_malformed_RDD(self): self.assertRaises(Exception, lambda: rdd.saveAsSequenceFile( basepath + "/malformed/sequence")) + class TestDaemon(unittest.TestCase): + def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM sock = socket(AF_INET, SOCK_STREAM) @@ -791,9 +821,11 @@ def test_termination_sigterm(self): class TestSparkSubmit(unittest.TestCase): + def setUp(self): self.programDir = tempfile.mkdtemp() - self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit") + self.sparkSubmit = os.path.join( + os.environ.get("SPARK_HOME"), "bin", "spark-submit") def tearDown(self): shutil.rmtree(self.programDir) @@ -830,7 +862,8 @@ def test_single_script(self): |sc = SparkContext() |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() """) - proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) + proc = subprocess.Popen( + [self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 4, 6]", out) @@ -846,7 +879,8 @@ def test_script_with_local_functions(self): |sc = SparkContext() |print sc.parallelize([1, 2, 3]).map(foo).collect() """) - proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) + proc = subprocess.Popen( + [self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[3, 6, 9]", out) @@ -884,7 +918,8 @@ def test_module_dependency_on_cluster(self): | return x + 1 """) proc = subprocess.Popen( - [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script], + [self.sparkSubmit, "--py-files", zip, "--master", + "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) @@ -911,6 +946,7 @@ def test_single_script_on_cluster(self): @unittest.skipIf(not _have_scipy, "SciPy not installed") class SciPyTests(PySparkTestCase): + """General PySpark tests that depend on scipy """ def test_serialize(self): @@ -923,15 +959,17 @@ def test_serialize(self): @unittest.skipIf(not _have_numpy, "NumPy not installed") class NumPyTests(PySparkTestCase): + """General PySpark tests that depend on numpy """ def test_statcounter_array(self): - x = self.sc.parallelize([np.array([1.0,1.0]), np.array([2.0,2.0]), np.array([3.0,3.0])]) + x = self.sc.parallelize( + [np.array([1.0, 1.0]), np.array([2.0, 2.0]), np.array([3.0, 3.0])]) s = x.stats() - self.assertSequenceEqual([2.0,2.0], s.mean().tolist()) - self.assertSequenceEqual([1.0,1.0], s.min().tolist()) - self.assertSequenceEqual([3.0,3.0], s.max().tolist()) - self.assertSequenceEqual([1.0,1.0], s.sampleStdev().tolist()) + self.assertSequenceEqual([2.0, 2.0], s.mean().tolist()) + self.assertSequenceEqual([1.0, 1.0], s.min().tolist()) + self.assertSequenceEqual([3.0, 3.0], s.max().tolist()) + self.assertSequenceEqual([1.0, 1.0], s.sampleStdev().tolist()) if __name__ == "__main__": diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 2770f63059853..14a63256b551f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -56,8 +56,10 @@ def main(infile, outfile): SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH - sys.path.append(spark_files_dir) # *.py files that were added will be copied here + # fetch names of includes (*.zip and *.egg files) and construct + # PYTHONPATH + # *.py files that were added will be copied here + sys.path.append(spark_files_dir) num_python_includes = read_int(infile) for _ in range(num_python_includes): filename = utf8_deserializer.loads(infile) diff --git a/python/test_support/userlibrary.py b/python/test_support/userlibrary.py index 8e4a6292bc17c..73fd26e71f10d 100755 --- a/python/test_support/userlibrary.py +++ b/python/test_support/userlibrary.py @@ -19,6 +19,8 @@ Used to test shipping of code depenencies with SparkContext.addPyFile(). """ + class UserClass(object): + def hello(self): return "Hello World!"