Close Block InputStream immediately after all records are read
massie committed Jun 9, 2015
Parent: 208b7a5 · Commit: 7c8f73e
Showing 1 changed file with 7 additions and 2 deletions.
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle.hash
 
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
+import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.{InterruptibleIterator, SparkEnv, TaskContext}
 
@@ -38,7 +39,7 @@ private[spark] class HashShuffleReader[K, C](
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
     val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
-      handle.shuffleId, startPartition, context)
+      handle.shuffleId, startPartition, context)
 
     // Wrap the streams for compression based on configuration
     val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
@@ -50,7 +51,11 @@
 
     // Create a key/value iterator for each stream
     val recordIterator = wrappedStreams.flatMap { wrappedStream =>
-      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      val kvIter = serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+      CompletionIterator[(Any, Any), Iterator[(Any, Any)]](kvIter, {
+        // Close the stream once all the records have been read from it
+        wrappedStream.close()
+      })
     }
 
     // Update read metrics for each record materialized
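
For readers unfamiliar with the pattern: the change wraps each block's deserialization iterator in a CompletionIterator so that the underlying InputStream is closed as soon as the last record has been read, rather than waiting for task cleanup or garbage collection. Below is a minimal, self-contained sketch of that completion-iterator idea, not Spark's actual org.apache.spark.util.CompletionIterator; the ClosingIterator class and the example data are illustrative assumptions only.

// Sketch: wrap an iterator and run a cleanup action exactly once,
// as soon as the wrapped iterator reports it has no more elements.
import java.io.{ByteArrayInputStream, InputStream}

class ClosingIterator[A](underlying: Iterator[A], onComplete: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true          // guard so the cleanup runs only once
      onComplete()
    }
    more
  }

  override def next(): A = underlying.next()
}

object ClosingIteratorExample {
  def main(args: Array[String]): Unit = {
    // Stands in for a fetched shuffle block's InputStream (hypothetical example data).
    val stream: InputStream = new ByteArrayInputStream("a b c".getBytes("UTF-8"))
    val records = Iterator("a", "b", "c")
    val iter = new ClosingIterator[String](records, () => {
      println("all records read, closing stream")
      stream.close()
    })
    iter.foreach(println)       // cleanup fires when hasNext first returns false
  }
}

In the commit above, the cleanup action is simply wrappedStream.close(), so each fetched block's stream is released as soon as its records have been consumed instead of staying open for the rest of the task.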
