address comments
davies committed Sep 29, 2014
1 parent 847f9b9 commit b983f0f
Showing 10 changed files with 61 additions and 53 deletions.
6 changes: 1 addition & 5 deletions bin/pyspark
@@ -87,11 +87,7 @@ export PYSPARK_SUBMIT_ARGS
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
exec "$PYSPARK_PYTHON" $1
fi
exec "$PYSPARK_PYTHON" $1
exit
fi
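Aside (illustration, not part of the diff): with the PYSPARK_DOC_TEST branch removed from bin/pyspark, a module's doctests run only if the module invokes them itself when executed as a script, which is the pattern added to accumulators.py, serializers.py and streaming/util.py below. A minimal, hypothetical module showing that pattern:

    # sketch.py -- hypothetical example, not a file in this repository
    def add(a, b):
        """Add two numbers.

        >>> add(2, 3)
        5
        """
        return a + b

    if __name__ == "__main__":
        import doctest
        # Run the doctests embedded in this module's docstrings.
        doctest.testmod()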

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -293,7 +293,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override def compute(split: Partition, context: TaskContext) =
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/network_wordcount.py
@@ -14,7 +14,7 @@
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pyprint()
counts.pprint()

ssc.start()
ssc.awaitTermination()
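Aside (illustration, not part of the diff): pprint() prints the first elements of every batch of a DStream to the console; the examples above previously called it under the removed pyprint name. A minimal sketch of the call in isolation, assuming a local master and a socket source on port 9999 (both illustrative):

    # Hypothetical sketch; master, app name and port are illustrative only.
    from pyspark import SparkContext
    from pyspark.streaming.context import StreamingContext

    sc = SparkContext("local[2]", "PprintSketch")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    lines = ssc.socketTextStream("localhost", 9999)
    lines.pprint()  # print the first elements of each batch to stdout

    ssc.start()
    ssc.awaitTermination()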
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/wordcount.py
@@ -15,7 +15,7 @@
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pyprint()
counts.pprint()

ssc.start()
ssc.awaitTermination()
5 changes: 5 additions & 0 deletions python/pyspark/accumulators.py
@@ -256,3 +256,8 @@ def _start_update_server():
thread.daemon = True
thread.start()
return server


if __name__ == "__main__":
import doctest
doctest.testmod()
5 changes: 5 additions & 0 deletions python/pyspark/serializers.py
@@ -526,3 +526,8 @@ def write_int(value, stream):
def write_with_length(obj, stream):
write_int(len(obj), stream)
stream.write(obj)


if __name__ == "__main__":
import doctest
doctest.testmod()
2 changes: 1 addition & 1 deletion python/pyspark/streaming/dstream.py
@@ -410,7 +410,7 @@ def fullOuterJoin(self, other, numPartitions=None):
return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other)

def _jtime(self, timestamp):
""" convert datetime or unix_timestamp into Time
""" Convert datetime or unix_timestamp into Time
"""
if isinstance(timestamp, datetime):
timestamp = time.mktime(timestamp.timetuple())
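Aside (illustration, not part of the diff): _jtime first normalizes a datetime to a POSIX timestamp before wrapping it; a standalone sketch of that conversion step:

    import time
    from datetime import datetime

    ts = datetime(2014, 9, 29, 12, 0, 0)
    unix_ts = time.mktime(ts.timetuple())  # seconds since the epoch, as a float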
6 changes: 0 additions & 6 deletions python/pyspark/streaming/tests.py
@@ -29,7 +29,6 @@

from pyspark.context import SparkContext
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds


class PySparkStreamingTestCase(unittest.TestCase):
@@ -46,11 +45,6 @@ def setUp(self):
def tearDown(self):
self.ssc.stop()

@classmethod
def tearDownClass(cls):
# Make sure tp shutdown the callback server
SparkContext._gateway._shutdown_callback_server()

def _test_func(self, input, func, expected, sort=False):
"""
@param input: dataset for the test. This should be list of lists.
5 changes: 5 additions & 0 deletions python/pyspark/streaming/util.py
@@ -64,3 +64,8 @@ def rddToFileName(prefix, suffix, time):
return prefix + "-" + str(time)
else:
return prefix + "-" + str(time) + "." + suffix


if __name__ == "__main__":
import doctest
doctest.testmod()
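Aside (illustration, not part of the diff): based only on the two return branches visible in the hunk above, rddToFileName joins the prefix, the batch time and an optional suffix, for example:

    # Hypothetical values; output inferred from the branches shown above,
    # assuming the elided `if` tests whether suffix is missing.
    rddToFileName("output", None, 1412000000000)   # -> "output-1412000000000"
    rddToFileName("output", "txt", 1412000000000)  # -> "output-1412000000000.txt"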
79 changes: 41 additions & 38 deletions python/run-tests
@@ -48,6 +48,39 @@ function run_test() {
fi
}

function run_core_tests() {
run_test "pyspark/conf.py"
run_test "pyspark/context.py"
run_test "pyspark/broadcast.py"
run_test "pyspark/accumulators.py"
run_test "pyspark/serializers.py"
run_test "pyspark/shuffle.py"
run_test "pyspark/rdd.py"
run_test "pyspark/tests.py"
}

function run_sql_tests() {
run_test "pyspark/sql.py"
}

function run_mllib_tests() {
run_test "pyspark/mllib/util.py"
run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/random.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/stat.py"
run_test "pyspark/mllib/tree.py"
run_test "pyspark/mllib/tests.py"
}

function run_streaming_tests() {
run_test "pyspark/streaming/util.py"
run_test "pyspark/streaming/tests.py"
}

echo "Running PySpark tests. Output is in python/unit-tests.log."

export PYSPARK_PYTHON="python"
@@ -60,51 +93,21 @@ fi
echo "Testing with Python version:"
$PYSPARK_PYTHON --version

run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
run_test "pyspark/conf.py"
run_test "pyspark/sql.py"
# These tests are included in the module-level docs, and so must
# be handled on a higher level rather than within the python file.
export PYSPARK_DOC_TEST=1
run_test "pyspark/broadcast.py"
run_test "pyspark/accumulators.py"
run_test "pyspark/serializers.py"
unset PYSPARK_DOC_TEST
run_test "pyspark/shuffle.py"
run_test "pyspark/tests.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/random.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/stat.py"
run_test "pyspark/mllib/tests.py"
run_test "pyspark/mllib/tree.py"
run_test "pyspark/mllib/util.py"
run_test "pyspark/streaming/tests.py"
#run_core_tests
#run_sql_tests
#run_mllib_tests
run_streaming_tests

# Try to test with PyPy
if [ $(which pypy) ]; then
export PYSPARK_PYTHON="pypy"
echo "Testing with PyPy version:"
$PYSPARK_PYTHON --version

run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
run_test "pyspark/conf.py"
run_test "pyspark/sql.py"
# These tests are included in the module-level docs, and so must
# be handled on a higher level rather than within the python file.
export PYSPARK_DOC_TEST=1
run_test "pyspark/broadcast.py"
run_test "pyspark/accumulators.py"
run_test "pyspark/serializers.py"
unset PYSPARK_DOC_TEST
run_test "pyspark/shuffle.py"
run_test "pyspark/tests.py"
run_test "pyspark/streaming/tests.py"
run_core_tests
run_sql_tests
run_mllib_tests
run_streaming_tests
fi

if [[ $FAILED == 0 ]]; then
