Skip to content

Commit

Permalink
Added acceptance of the empty case
Browse files Browse the repository at this point in the history
  • Loading branch information
mnazbro committed Jan 27, 2015
1 parent ff356e2 commit e3b2fb6
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ private[spark] object SerDeUtil extends Logging {
* representation is serialized
*/
def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
val (keyFailed, valueFailed) = rdd.take(1) match {
case Array() => (false, false)
case Array(first) => checkPickle(first)
}

rdd.mapPartitions { iter =>
val cleaned = iter.map { case (k, v) =>
Expand All @@ -226,10 +229,12 @@ private[spark] object SerDeUtil extends Logging {
}

val rdd = pythonToJava(pyRDD, batched).rdd
rdd.first match {
case obj if isPair(obj) =>
rdd.take(1) match {
case Array(obj) if isPair(obj) =>
// we only accept (K, V)
case other => throw new SparkException(
case Array() =>
// we also accept empty collections
case Array(other) => throw new SparkException(
s"RDD element of type ${other.getClass.getName} cannot be used")
}
rdd.map { obj =>
Expand Down

0 comments on commit e3b2fb6

Please sign in to comment.