Skip to content

Commit

Permalink
[SPARK-2951] [PySpark] support unpickle array.array for Python 2.6
Browse files Browse the repository at this point in the history
Pyrolite can not unpickle array.array which pickled by Python 2.6, this patch fix it by extend Pyrolite.

There is a bug in Pyrolite when unpickle array of float/double, this patch workaround it by reverse the endianness for float/double. This workaround should be removed after Pyrolite have a new release to fix this issue.

I had send an PR to Pyrolite to fix it:  irmen/Pyrolite#11

Author: Davies Liu <davies.liu@gmail.com>

Closes #2365 from davies/pickle and squashes the following commits:

f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
  • Loading branch information
davies authored and JoshRosen committed Sep 16, 2014
1 parent fdb302f commit da33acb
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.api.python

import java.nio.ByteOrder

import scala.collection.JavaConversions._
import scala.util.Failure
import scala.util.Try
Expand All @@ -28,6 +30,55 @@ import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
// static struct arraydescr descriptors[] = {
// {'c', sizeof(char), c_getitem, c_setitem},
// {'b', sizeof(char), b_getitem, b_setitem},
// {'B', sizeof(char), BB_getitem, BB_setitem},
// #ifdef Py_USING_UNICODE
// {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
// #endif
// {'h', sizeof(short), h_getitem, h_setitem},
// {'H', sizeof(short), HH_getitem, HH_setitem},
// {'i', sizeof(int), i_getitem, i_setitem},
// {'I', sizeof(int), II_getitem, II_setitem},
// {'l', sizeof(long), l_getitem, l_setitem},
// {'L', sizeof(long), LL_getitem, LL_setitem},
// {'f', sizeof(float), f_getitem, f_setitem},
// {'d', sizeof(double), d_getitem, d_setitem},
// {'\0', 0, 0, 0} /* Sentinel */
// };
// TODO: support Py_UNICODE with 2 bytes
// FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the
// machine code for float/double here to workaround it.
// we should fix this after Pyrolite fix them
val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
)
} else {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
)
}
override def construct(args: Array[Object]): Object = {
if (args.length == 1) {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
} else {
super.construct(args)
}
}
}

def initialize() = {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}

private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()

if instance:
if (SparkContext._active_spark_context and
Expand Down
2 changes: 0 additions & 2 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,6 @@ def test_newhadoop(self):
conf=input_conf).collect())
self.assertEqual(new_dataset, data)

@unittest.skipIf(sys.version_info[:2] <= (2, 6) or python_implementation() == "PyPy",
"Skipped on 2.6 and PyPy until SPARK-2951 is fixed")
def test_newhadoop_with_array(self):
basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
Expand Down

0 comments on commit da33acb

Please sign in to comment.