Skip to content

Commit

Permalink
Limit the length of the FileInputStream.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Sep 8, 2014
1 parent 1332156 commit 1dfd3d7
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

import com.google.common.io.ByteStreams
import io.netty.buffer.{ByteBufInputStream, ByteBuf}

import org.apache.spark.util.ByteBufferInputStream
Expand Down Expand Up @@ -72,7 +73,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
/**
 * Opens the backing file and returns a stream over this segment only.
 *
 * The stream is positioned `offset` bytes into `file` and capped at `length` bytes.
 * `InputStream.skip` is allowed to skip fewer bytes than requested, so the offset is applied
 * with `ByteStreams.skipFully`, which keeps skipping until `offset` bytes have been consumed
 * and throws `java.io.EOFException` if the file ends first.
 *
 * @return a stream over the segment; the caller is responsible for closing it
 */
override def inputStream(): InputStream = {
  val is = new FileInputStream(file)
  var positioned = false
  try {
    ByteStreams.skipFully(is, offset)
    positioned = true
  } finally {
    // Do not leak the file handle when the stream could not be positioned.
    if (!positioned) {
      is.close()
    }
  }
  ByteStreams.limit(is, length)
}
}

Expand All @@ -84,7 +85,7 @@ final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {

/** Number of bytes between the wrapped buffer's position and its limit, widened to a Long. */
override def size: Long = buf.remaining().toLong

/**
 * Returns a view of the wrapped buffer for the caller to read from.
 *
 * A duplicate shares the underlying bytes but has its own position, limit and mark, so a
 * caller that consumes the returned buffer does not move the cursor of `buf`.
 */
override def nioByteBuffer(): ByteBuffer = buf.duplicate()

/**
 * Returns a stream over the wrapped buffer's remaining bytes.
 *
 * The stream is given a duplicate of `buf` rather than `buf` itself, so whatever the stream
 * does to its buffer's position cannot change what `size` or a later call on this object
 * observes.
 */
override def inputStream() = new ByteBufferInputStream(buf.duplicate())
}
Expand Down
14 changes: 4 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -339,16 +339,10 @@ private[spark] class BlockManager(
* shuffle blocks. It is safe to do so without a lock on block info since disk store
* never deletes (recent) items.
*/
def getLocalShuffleFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
  // The id is cast, not checked: a non-shuffle block id fails here with a ClassCastException.
  // NOTE(review): a missing block is not translated into a BlockException in this method;
  // presumably getBlockData reports it by throwing -- confirm against ShuffleBlockManager.
  val buf = shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  // Read the block as a stream instead of materializing all of its bytes first.
  val is = wrapForCompression(blockId, buf.inputStream())
  // Always Some: the Option in the signature is kept for existing callers.
  Some(serializer.newInstance().deserializeStream(is).asIterator)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ final class ShuffleBlockFetcherIterator(
new BlockFetchingListener {
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
() => blockManager.dataDeserialize(BlockId(blockId), data.nioByteBuffer(), serializer)
() => serializer.newInstance().deserializeStream(
blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
))
shuffleMetrics.remoteBytesRead += data.size
shuffleMetrics.remoteBlocksFetched += 1
Expand Down

0 comments on commit 1dfd3d7

Please sign in to comment.