diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index a4153aaa926f8..269cdcaa220e1 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -199,7 +199,10 @@ private[spark] object SerDeUtil extends Logging { * representation is serialized */ def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = { - val (keyFailed, valueFailed) = checkPickle(rdd.first()) + val (keyFailed, valueFailed) = rdd.take(1) match { + case Array() => (false, false) + case Array(first) => checkPickle(first) + } rdd.mapPartitions { iter => val cleaned = iter.map { case (k, v) => @@ -226,10 +229,12 @@ private[spark] object SerDeUtil extends Logging { } val rdd = pythonToJava(pyRDD, batched).rdd - rdd.first match { - case obj if isPair(obj) => + rdd.take(1) match { + case Array(obj) if isPair(obj) => // we only accept (K, V) - case other => throw new SparkException( + case Array() => + // we also accept empty collections + case Array(other) => throw new SparkException( s"RDD element of type ${other.getClass.getName} cannot be used") } rdd.map { obj => diff --git a/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala b/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala new file mode 100644 index 0000000000000..f8c39326145e1 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import org.scalatest.FunSuite + +import org.apache.spark.SharedSparkContext + +class SerDeUtilSuite extends FunSuite with SharedSparkContext { + + test("Converting an empty pair RDD to python does not throw an exception (SPARK-5441)") { + val emptyRdd = sc.makeRDD(Seq[(Any, Any)]()) + SerDeUtil.pairRDDToPython(emptyRdd, 10) + } + + test("Converting an empty python RDD to pair RDD does not throw an exception (SPARK-5441)") { + val emptyRdd = sc.makeRDD(Seq[(Any, Any)]()) + val javaRdd = emptyRdd.toJavaRDD() + val pythonRdd = SerDeUtil.javaToPython(javaRdd) + SerDeUtil.pythonToPairRDD(pythonRdd, false) + } +} +