[PySpark] Fix tests with Python 2.6 in 1.0 branch
[SPARK-2951] [PySpark] support unpickle array.array for Python 2.6

Pyrolite cannot unpickle an array.array that was pickled by Python 2.6; this patch fixes it by extending Pyrolite.
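
    The difference can be seen in array.array's __reduce__ output (illustrative sketch only, not
    part of the patch; run under Python 2.x):

        import array

        arr = array.array('d', [1.0, 2.0])
        print(arr.__reduce__()[:2])
        # Python 2.6:  (<type 'array.array'>, ('d', '...raw item bytes...'))  -- typecode + byte string
        # Python 2.7+: (<type 'array.array'>, ('d', [1.0, 2.0]))              -- typecode + list of values
        #
        # Pyrolite's stock ArrayConstructor only understands the (typecode, list) form,
        # so the extended constructor in SerDeUtil.scala handles the 2.6-style (typecode, bytes) form.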

There is also a bug in Pyrolite when unpickling arrays of float/double; this patch works around it by reversing the endianness for float/double. The workaround should be removed once Pyrolite publishes a release that fixes the issue.
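
    Why byte order matters for the float/double workaround (illustrative sketch only, not part of
    the patch): the same eight raw bytes decode to completely different doubles depending on the
    assumed endianness, so the machine-format code passed to Pyrolite has to match the byte order
    of the data.

        import struct

        raw = struct.pack('<d', 1.0)          # 1.0 encoded as a little-endian IEEE-754 double
        print(struct.unpack('<d', raw)[0])    # 1.0 -- decoded with the correct byte order
        print(struct.unpack('>d', raw)[0])    # tiny denormal garbage -- decoded with the wrong byte order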

[PySpark] [SPARK-2954] [SPARK-2948] [SPARK-2910] [SPARK-2101] Python 2.6 Fixes

    - Modify python/run-tests to test with Python 2.6
    - Use unittest2 when running on Python 2.6.
    - Fix issue with namedtuple.
    - Skip TestOutputFormat.test_newhadoop on Python 2.6 until SPARK-2951 is fixed.
    - Fix MLlib _deserialize_double on Python 2.6.

[SPARK-3867][PySpark] ./python/run-tests failed when run with Python 2.6 and unittest2 is not installed
    ./python/run-tests searches for a Python 2.6 executable on the PATH and uses it if available.
    When running under Python 2.6, the tests import the unittest2 module, which is not part of the
    Python 2.6 standard library, so they fail with an ImportError.
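
    The pattern applied in the diffs below (fall back to unittest2 only on Python 2.6 or earlier,
    and fail with a clear message when it is missing) looks roughly like this standalone sketch:

        import sys

        if sys.version_info[:2] <= (2, 6):
            try:
                import unittest2 as unittest
            except ImportError:
                sys.exit('Please install unittest2 to test with Python 2.6 or earlier')
        else:
            import unittest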

Author: cocoatomo <cocoatomo77@gmail.com>
Author: Josh Rosen <joshrosen@apache.org>
Author: Davies Liu <davies.liu@gmail.com>
Author: Davies Liu <davies@databricks.com>

Closes #3668 from davies/port_2365 and squashes the following commits:

b32583d [Davies Liu] rollback _common.py
bda1c72 [cocoatomo] [SPARK-3867][PySpark] ./python/run-tests failed when it run with Python 2.6 and unittest2 is not installed
14ad3d9 [Josh Rosen] [PySpark] [SPARK-2954] [SPARK-2948] [SPARK-2910] [SPARK-2101] Python 2.6 Fixes
7c55cff [Davies Liu] [SPARK-2951] [PySpark] support unpickle array.array for Python 2.6
cocoatomo authored and Andrew Or committed Dec 13, 2014
1 parent 3425ba8 commit 9c21316
Showing 5 changed files with 115 additions and 4 deletions.
83 changes: 83 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.python

import java.nio.ByteOrder

import scala.collection.JavaConversions._
import scala.util.Failure
import scala.util.Try

import net.razorvine.pickle.{Unpickler, Pickler}

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
  // Unpickle array.array generated by Python 2.6
  class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
    //  /* Description of types */
    //  static struct arraydescr descriptors[] = {
    //    {'c', sizeof(char), c_getitem, c_setitem},
    //    {'b', sizeof(char), b_getitem, b_setitem},
    //    {'B', sizeof(char), BB_getitem, BB_setitem},
    //    #ifdef Py_USING_UNICODE
    //    {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
    //    #endif
    //    {'h', sizeof(short), h_getitem, h_setitem},
    //    {'H', sizeof(short), HH_getitem, HH_setitem},
    //    {'i', sizeof(int), i_getitem, i_setitem},
    //    {'I', sizeof(int), II_getitem, II_setitem},
    //    {'l', sizeof(long), l_getitem, l_setitem},
    //    {'L', sizeof(long), LL_getitem, LL_setitem},
    //    {'f', sizeof(float), f_getitem, f_setitem},
    //    {'d', sizeof(double), d_getitem, d_setitem},
    //    {'\0', 0, 0, 0} /* Sentinel */
    //  };
    // TODO: support Py_UNICODE with 2 bytes
    // FIXME: Pyrolite unpickles arrays of float/double incorrectly, so we reverse the
    // machine-format code for float/double here to work around it. This workaround
    // should be removed once Pyrolite fixes the issue.
    val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
        'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
      )
    } else {
      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
        'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
      )
    }
    override def construct(args: Array[Object]): Object = {
      if (args.length == 1) {
        construct(args ++ Array(""))
      } else if (args.length == 2 && args(1).isInstanceOf[String]) {
        val typecode = args(0).asInstanceOf[String].charAt(0)
        val data: String = args(1).asInstanceOf[String]
        construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
      } else {
        super.construct(args)
      }
    }
  }

  def initialize() = {
    Unpickler.registerConstructor("array", "array", new ArrayConstructor())
  }
}

1 change: 1 addition & 0 deletions python/pyspark/context.py
@@ -190,6 +190,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
                SparkContext._gateway = gateway or launch_gateway()
                SparkContext._jvm = SparkContext._gateway.jvm
                SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
                SparkContext._jvm.SerDeUtil.initialize()

            if instance:
                if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
11 changes: 10 additions & 1 deletion python/pyspark/mllib/tests.py
@@ -19,8 +19,17 @@
Fuller unit tests for Python MLlib.
"""

import sys
from numpy import array, array_equal
import unittest

if sys.version_info[:2] <= (2, 6):
    try:
        import unittest2 as unittest
    except ImportError:
        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
        sys.exit(1)
else:
    import unittest

from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
_deserialize_double_vector, _dot, _squared_distance
16 changes: 13 additions & 3 deletions python/pyspark/tests.py
@@ -28,9 +28,18 @@
import sys
import tempfile
import time
import unittest
import zipfile

if sys.version_info[:2] <= (2, 6):
    try:
        import unittest2 as unittest
    except ImportError:
        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
        sys.exit(1)
else:
    import unittest


from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.serializers import read_int
@@ -291,8 +300,9 @@ def createFileInZip(self, name, content):
        pattern = re.compile(r'^ *\|', re.MULTILINE)
        content = re.sub(pattern, '', content.strip())
        path = os.path.join(self.programDir, name + ".zip")
        with zipfile.ZipFile(path, 'w') as zip:
            zip.writestr(name, content)
        zip = zipfile.ZipFile(path, 'w')
        zip.writestr(name, content)
        zip.close()
        return path

def test_single_script(self):
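
A note on the createFileInZip change above: zipfile.ZipFile only became usable as a context manager
in Python 2.7, which is presumably why the with-statement form is replaced by an explicit
open/write/close sequence. A quick way to check (illustrative sketch, not part of the patch):

    import zipfile

    # True on Python 2.7+, False on Python 2.6, where ZipFile has no __exit__ method
    # and the with-statement would raise an AttributeError.
    print(hasattr(zipfile.ZipFile, '__exit__'))
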
8 changes: 8 additions & 0 deletions python/run-tests
@@ -49,6 +49,14 @@ function run_test() {

echo "Running PySpark tests. Output is in python/unit-tests.log."

# Try to test with Python 2.6, since that's the minimum version that we support:
if [ $(which python2.6) ]; then
    export PYSPARK_PYTHON="python2.6"
fi

echo "Testing with Python version:"
$PYSPARK_PYTHON --version

run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
run_test "pyspark/conf.py"
