From d8f52dc7dc34bc6c1c368d790b7bdfe30c4eb529 Mon Sep 17 00:00:00 2001 From: maji2014 Date: Tue, 2 Dec 2014 01:54:33 -0800 Subject: [PATCH] code optimization for judgement --- .../scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 2 +- .../scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 2 +- .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 5baf45db45c17..ffbb2717053d2 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -45,7 +45,7 @@ private[spark] class HashShuffleReader[K, C]( } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { + } else if (dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { // Convert the Product2s to pairs since this is what downstream RDDs currently expect diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 183a30373b28c..8f05d3b048b17 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -56,7 +56,7 @@ private[spark] class HashShuffleWriter[K, V]( } else { records } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { + } else if (dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { records diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index d75f9d7311fad..cf0a0d5e83be1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -50,7 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { - if (!dep.aggregator.isDefined) { + if (dep.aggregator.isEmpty) { throw new IllegalStateException("Aggregator is empty for map-side combine") } sorter = new ExternalSorter[K, V, C](