From 7c8f73e8cc497f023beae93b0bebc536d50a51cb Mon Sep 17 00:00:00 2001
From: Matt Massie
Date: Tue, 9 Jun 2015 10:15:51 -0700
Subject: [PATCH] Close Block InputStream immediately after all records are read

---
 .../apache/spark/shuffle/hash/HashShuffleReader.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index ea8aa346cd9f3..d92a2a98a3588 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle.hash
 
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
+import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.{InterruptibleIterator, SparkEnv, TaskContext}
 
@@ -38,7 +39,7 @@ private[spark] class HashShuffleReader[K, C](
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
     val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
-        handle.shuffleId, startPartition, context)
+      handle.shuffleId, startPartition, context)
 
     // Wrap the streams for compression based on configuration
     val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
@@ -50,7 +51,11 @@ private[spark] class HashShuffleReader[K, C](
 
     // Create a key/value iterator for each stream
     val recordIterator = wrappedStreams.flatMap { wrappedStream =>
-      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      val kvIter = serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      CompletionIterator[(Any, Any), Iterator[(Any, Any)]](kvIter, {
+        // Close the stream once all the records have been read from it
+        wrappedStream.close()
+      })
     }
 
     // Update read metrics for each record materialized
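
For readers unfamiliar with the completion-callback pattern this patch relies on, the sketch below shows the idea in isolation: wrap a record iterator so that a cleanup action (here, closing the underlying InputStream) runs once the iterator is exhausted. This is a minimal, self-contained stand-in written for illustration; ClosingIterator and the sample byte stream are assumptions, not Spark's actual org.apache.spark.util.CompletionIterator implementation.

import java.io.{ByteArrayInputStream, InputStream}

// Illustrative stand-in (assumption): a simplified completion iterator that
// delegates to `sub` and runs `completion` exactly once, when `sub` is exhausted.
class ClosingIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // e.g. close the stream backing `sub`
    }
    more
  }

  override def next(): A = sub.next()
}

object ClosingIteratorExample {
  def main(args: Array[String]): Unit = {
    val in: InputStream = new ByteArrayInputStream(Array[Byte](1, 2, 3))
    // Hypothetical record iterator that lazily pulls one byte per "record" from the stream.
    val records = Iterator.continually(in.read()).takeWhile(_ != -1)
    val it = new ClosingIterator[Int](records, () => {
      println("all records read, closing stream")
      in.close()
    })
    it.foreach(println) // prints 1, 2, 3, then runs the completion callback
  }
}

In the patch itself, Spark's CompletionIterator plays this role and wrappedStream.close() is the completion action, so each fetched block's stream is released as soon as its last record is deserialized rather than held open until the end of the task.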