Skip to content

Commit

Permalink
support unpickle array.array for Python 2.6
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Sep 11, 2014
1 parent ed1980f commit 60e4e2f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.api.python

import java.nio.ByteOrder

import scala.collection.JavaConversions._
import scala.util.Failure
import scala.util.Try
Expand All @@ -28,6 +30,56 @@ import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
// static struct arraydescr descriptors[] = {
// {'c', sizeof(char), c_getitem, c_setitem},
// {'b', sizeof(char), b_getitem, b_setitem},
// {'B', sizeof(char), BB_getitem, BB_setitem},
// #ifdef Py_USING_UNICODE
// {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
// #endif
// {'h', sizeof(short), h_getitem, h_setitem},
// {'H', sizeof(short), HH_getitem, HH_setitem},
// {'i', sizeof(int), i_getitem, i_setitem},
// {'I', sizeof(int), II_getitem, II_setitem},
// {'l', sizeof(long), l_getitem, l_setitem},
// {'L', sizeof(long), LL_getitem, LL_setitem},
// {'f', sizeof(float), f_getitem, f_setitem},
// {'d', sizeof(double), d_getitem, d_setitem},
// {'\0', 0, 0, 0} /* Sentinel */
// };
// TODO: support Py_UNICODE with 2 bytes
// FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the
// machine code for float/double here to work arround it.
// we should fix this after Pyrolite fix them
val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
)
} else {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
)
}
override def construct(args: Array[Object]): Object = {
if (args.length == 1) {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
println(typecode, machineCodes(typecode), data.length, data.toList)
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
} else {
super.construct(args)
}
}
}

def initialize() = {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}

private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()

if instance:
if (SparkContext._active_spark_context and
Expand Down
1 change: 0 additions & 1 deletion python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,6 @@ def test_oldhadoop(self):
conf=input_conf).collect())
self.assertEqual(old_dataset, dict_data)

@unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
def test_newhadoop(self):
basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
Expand Down

0 comments on commit 60e4e2f

Please sign in to comment.