diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index efc9009c088a8..6668797f5f8be 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -17,6 +17,8 @@ package org.apache.spark.api.python +import java.nio.ByteOrder + import scala.collection.JavaConversions._ import scala.util.Failure import scala.util.Try @@ -28,6 +30,55 @@ import org.apache.spark.rdd.RDD /** Utilities for serialization / deserialization between Python and Java, using Pickle. */ private[python] object SerDeUtil extends Logging { + // Unpickle array.array generated by Python 2.6 + class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor { + // /* Description of types */ + // static struct arraydescr descriptors[] = { + // {'c', sizeof(char), c_getitem, c_setitem}, + // {'b', sizeof(char), b_getitem, b_setitem}, + // {'B', sizeof(char), BB_getitem, BB_setitem}, + // #ifdef Py_USING_UNICODE + // {'u', sizeof(Py_UNICODE), u_getitem, u_setitem}, + // #endif + // {'h', sizeof(short), h_getitem, h_setitem}, + // {'H', sizeof(short), HH_getitem, HH_setitem}, + // {'i', sizeof(int), i_getitem, i_setitem}, + // {'I', sizeof(int), II_getitem, II_setitem}, + // {'l', sizeof(long), l_getitem, l_setitem}, + // {'L', sizeof(long), LL_getitem, LL_setitem}, + // {'f', sizeof(float), f_getitem, f_setitem}, + // {'d', sizeof(double), d_getitem, d_setitem}, + // {'\0', 0, 0, 0} /* Sentinel */ + // }; + // TODO: support Py_UNICODE with 2 bytes + // FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the + // machine code for float/double here to workaround it. + // we should fix this after Pyrolite fix them + val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { + Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, + 'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21 + ) + } else { + Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, + 'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20 + ) + } + override def construct(args: Array[Object]): Object = { + if (args.length == 1) { + construct(args ++ Array("")) + } else if (args.length == 2 && args(1).isInstanceOf[String]) { + val typecode = args(0).asInstanceOf[String].charAt(0) + val data: String = args(1).asInstanceOf[String] + construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1")) + } else { + super.construct(args) + } + } + } + + def initialize() = { + Unpickler.registerConstructor("array", "array", new ArrayConstructor()) + } private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = { val pickle = new Pickler diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 3ab98e262df31..ea28e8cd8c89f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile + SparkContext._jvm.SerDeUtil.initialize() if instance: if (SparkContext._active_spark_context and diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f3309a20fcffb..f255b44359fec 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -956,8 +956,6 @@ def test_newhadoop(self): conf=input_conf).collect()) self.assertEqual(new_dataset, data) - @unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy", - "Skipped on 2.6 and PyPy until SPARK-2951 is fixed") def test_newhadoop_with_array(self): basepath = self.tempdir.name # use custom ArrayWritable types and converters to handle arrays