diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala new file mode 100644 index 0000000000000..e3d09fb768d97 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.nio.ByteOrder + +import scala.collection.JavaConversions._ +import scala.util.Failure +import scala.util.Try + +import net.razorvine.pickle.{Unpickler, Pickler} + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.rdd.RDD + +/** Utilities for serialization / deserialization between Python and Java, using Pickle. */ +private[python] object SerDeUtil extends Logging { + // Unpickle array.array generated by Python 2.6 + class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor { + // /* Description of types */ + // static struct arraydescr descriptors[] = { + // {'c', sizeof(char), c_getitem, c_setitem}, + // {'b', sizeof(char), b_getitem, b_setitem}, + // {'B', sizeof(char), BB_getitem, BB_setitem}, + // #ifdef Py_USING_UNICODE + // {'u', sizeof(Py_UNICODE), u_getitem, u_setitem}, + // #endif + // {'h', sizeof(short), h_getitem, h_setitem}, + // {'H', sizeof(short), HH_getitem, HH_setitem}, + // {'i', sizeof(int), i_getitem, i_setitem}, + // {'I', sizeof(int), II_getitem, II_setitem}, + // {'l', sizeof(long), l_getitem, l_setitem}, + // {'L', sizeof(long), LL_getitem, LL_setitem}, + // {'f', sizeof(float), f_getitem, f_setitem}, + // {'d', sizeof(double), d_getitem, d_setitem}, + // {'\0', 0, 0, 0} /* Sentinel */ + // }; + // TODO: support Py_UNICODE with 2 bytes + // FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the + // machine code for float/double here to workaround it. + // we should fix this after Pyrolite fix them + val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { + Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, + 'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21 + ) + } else { + Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, + 'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20 + ) + } + override def construct(args: Array[Object]): Object = { + if (args.length == 1) { + construct(args ++ Array("")) + } else if (args.length == 2 && args(1).isInstanceOf[String]) { + val typecode = args(0).asInstanceOf[String].charAt(0) + val data: String = args(1).asInstanceOf[String] + construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1")) + } else { + super.construct(args) + } + } + } + + def initialize() = { + Unpickler.registerConstructor("array", "array", new ArrayConstructor()) + } +} + diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 56746cb7aab3d..8b9866bd94d3d 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -190,6 +190,7 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile + SparkContext._jvm.SerDeUtil.initialize() if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 37ccf1d590743..cead9e38f65b5 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -19,8 +19,17 @@ Fuller unit tests for Python MLlib. """ +import sys from numpy import array, array_equal -import unittest + +if sys.version_info[:2] <= (2, 6): + try: + import unittest2 as unittest + except ImportError: + sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') + sys.exit(1) +else: + import unittest from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \ _deserialize_double_vector, _dot, _squared_distance diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 8f5b48db46a02..6ba76ec9d5926 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -28,9 +28,18 @@ import sys import tempfile import time -import unittest import zipfile +if sys.version_info[:2] <= (2, 6): + try: + import unittest2 as unittest + except ImportError: + sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') + sys.exit(1) +else: + import unittest + + from pyspark.context import SparkContext from pyspark.files import SparkFiles from pyspark.serializers import read_int @@ -291,8 +300,9 @@ def createFileInZip(self, name, content): pattern = re.compile(r'^ *\|', re.MULTILINE) content = re.sub(pattern, '', content.strip()) path = os.path.join(self.programDir, name + ".zip") - with zipfile.ZipFile(path, 'w') as zip: - zip.writestr(name, content) + zip = zipfile.ZipFile(path, 'w') + zip.writestr(name, content) + zip.close() return path def test_single_script(self): diff --git a/python/run-tests b/python/run-tests index 3b4501178c89f..0e04f5597ae1c 100755 --- a/python/run-tests +++ b/python/run-tests @@ -49,6 +49,14 @@ function run_test() { echo "Running PySpark tests. Output is in python/unit-tests.log." +# Try to test with Python 2.6, since that's the minimum version that we support: +if [ $(which python2.6) ]; then + export PYSPARK_PYTHON="python2.6" +fi + +echo "Testing with Python version:" +$PYSPARK_PYTHON --version + run_test "pyspark/rdd.py" run_test "pyspark/context.py" run_test "pyspark/conf.py"