diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java index 4c8802af7ae67..acc49d968c186 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java @@ -28,7 +28,7 @@ /** * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}. */ -public final class NettyManagedBuffer extends ManagedBuffer { +public class NettyManagedBuffer extends ManagedBuffer { private final ByteBuf buf; public NettyManagedBuffer(ByteBuf buf) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 2634d88367669..e5e6a9e4a816c 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel} import org.apache.spark.util.{ByteBufferInputStream, Utils} -import org.apache.spark.util.io.ByteArrayChunkOutputStream +import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer} /** * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]]. @@ -107,7 +107,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec) blocks.zipWithIndex.foreach { case (block, i) => val pieceId = BroadcastBlockId(id, "piece" + i) - if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) { + val bytes = new ChunkedByteBuffer(block.duplicate()) + if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") } } @@ -115,10 +116,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } /** Fetch torrent blocks from the driver and/or other executors. */ - private def readBlocks(): Array[ByteBuffer] = { + private def readBlocks(): Array[ChunkedByteBuffer] = { // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported // to the driver, so other executors can pull these chunks from this executor as well. 
- val blocks = new Array[ByteBuffer](numBlocks) + val blocks = new Array[ChunkedByteBuffer](numBlocks) val bm = SparkEnv.get.blockManager for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { @@ -182,7 +183,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) case None => logInfo("Started reading broadcast variable " + id) val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() + val blocks = readBlocks().flatMap(_.getChunks()) logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) val obj = TorrentBroadcast.unBlockifyObject[T]( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6327d55fe75c2..3201463b8cc97 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -36,6 +36,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer /** * Spark executor, backed by a threadpool to run tasks. @@ -297,7 +298,9 @@ private[spark] class Executor( } else if (resultSize > maxDirectResultSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( - blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) + blockId, + new ChunkedByteBuffer(serializedDirectResult.duplicate()), + StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 7eb6d53c10950..873f1b56bd18b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -83,7 +83,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul return } val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( - serializedTaskResult.get) + serializedTaskResult.get.toByteBuffer) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 3bbdf48104c91..aa2561d8c3898 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.storage import java.io._ -import java.nio.{ByteBuffer, MappedByteBuffer} +import java.nio.ByteBuffer import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, ExecutionContext, Future} @@ -26,15 +26,13 @@ import scala.concurrent.duration._ import scala.util.Random import scala.util.control.NonFatal -import sun.nio.ch.DirectBuffer - import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryManager import org.apache.spark.network._ -import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} +import 
org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo @@ -43,6 +41,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ import org.apache.spark.util._ +import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer} /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( @@ -296,7 +295,7 @@ private[spark] class BlockManager( * Put the block locally, using the given storage level. */ override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = { - putBytes(blockId, data.nioByteBuffer(), level) + putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level) } /** @@ -444,7 +443,7 @@ private[spark] class BlockManager( /** * Get block from the local block manager as serialized bytes. */ - def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = { + def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work @@ -453,7 +452,8 @@ private[spark] class BlockManager( // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. Option( - shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) + new ChunkedByteBuffer( + shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } @@ -465,7 +465,7 @@ private[spark] class BlockManager( * Must be called while holding a read lock on the block. * Releases the read lock upon exception; keeps the read lock upon successful return. */ - private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = { + private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = { val level = info.level logDebug(s"Level for block $blockId is $level") // In order, try to read the serialized bytes from memory, then from disk, then fall back to @@ -504,7 +504,7 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()) + new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size) } } @@ -521,7 +521,7 @@ private[spark] class BlockManager( /** * Get block from remote block managers as serialized bytes. 
*/ - def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { + def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 @@ -567,7 +567,7 @@ private[spark] class BlockManager( } if (data != null) { - return Some(data) + return Some(new ChunkedByteBuffer(data)) } logDebug(s"The value of block $blockId is null") } @@ -705,7 +705,7 @@ private[spark] class BlockManager( */ def putBytes( blockId: BlockId, - bytes: ByteBuffer, + bytes: ChunkedByteBuffer, level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(bytes != null, "Bytes is null") @@ -725,7 +725,7 @@ private[spark] class BlockManager( */ private def doPutBytes( blockId: BlockId, - bytes: ByteBuffer, + bytes: ChunkedByteBuffer, level: StorageLevel, tellMaster: Boolean = true, keepReadLock: Boolean = false): Boolean = { @@ -734,25 +734,22 @@ private[spark] class BlockManager( // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. val replicationFuture = if (level.replication > 1) { - // Duplicate doesn't copy the bytes, but just creates a wrapper - val bufferView = bytes.duplicate() Future { // This is a blocking action and should run in futureExecutionContext which is a cached // thread pool - replicate(blockId, bufferView, level) + replicate(blockId, bytes, level) }(futureExecutionContext) } else { null } - bytes.rewind() - val size = bytes.limit() + val size = bytes.size if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { - val values = dataDeserialize(blockId, bytes.duplicate()) + val values = dataDeserialize(blockId, bytes) memoryStore.putIterator(blockId, values, level) match { case Right(_) => true case Left(iter) => @@ -922,7 +919,7 @@ private[spark] class BlockManager( try { replicate(blockId, bytesToReplicate, level) } finally { - BlockManager.dispose(bytesToReplicate) + bytesToReplicate.dispose() } logDebug("Put block %s remotely took %s" .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) @@ -944,29 +941,27 @@ private[spark] class BlockManager( blockInfo: BlockInfo, blockId: BlockId, level: StorageLevel, - diskBytes: ByteBuffer): ByteBuffer = { + diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = { require(!level.deserialized) if (level.useMemory) { // Synchronize on blockInfo to guard against a race condition where two readers both try to // put values read from disk into the MemoryStore. blockInfo.synchronized { if (memoryStore.contains(blockId)) { - BlockManager.dispose(diskBytes) + diskBytes.dispose() memoryStore.getBytes(blockId).get } else { - val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => { + val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => { // https://issues.apache.org/jira/browse/SPARK-6076 // If the file size is bigger than the free memory, OOM will happen. So if we // cannot put it into MemoryStore, copyForMemory should not be created. That's why - // this action is put into a `() => ByteBuffer` and created lazily. - val copyForMemory = ByteBuffer.allocate(diskBytes.limit) - copyForMemory.put(diskBytes) + // this action is put into a `() => ChunkedByteBuffer` and created lazily. 
+ diskBytes.copy() }) if (putSucceeded) { - BlockManager.dispose(diskBytes) + diskBytes.dispose() memoryStore.getBytes(blockId).get } else { - diskBytes.rewind() diskBytes } } @@ -1032,7 +1027,7 @@ private[spark] class BlockManager( * Replicate block to another node. Not that this is a blocking call that returns after * the block has been replicated. */ - private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { + private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 val peersForReplication = new ArrayBuffer[BlockManagerId] @@ -1085,11 +1080,15 @@ private[spark] class BlockManager( case Some(peer) => try { val onePeerStartTime = System.currentTimeMillis - data.rewind() - logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") + logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") blockTransferService.uploadBlockSync( - peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel) - logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms" + peer.host, + peer.port, + peer.executorId, + blockId, + new NettyManagedBuffer(data.toNetty), + tLevel) + logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms" .format(System.currentTimeMillis - onePeerStartTime)) peersReplicatedTo += peer peersForReplication -= peer @@ -1112,7 +1111,7 @@ private[spark] class BlockManager( } } val timeTakeMs = (System.currentTimeMillis - startTime) - logDebug(s"Replicating $blockId of ${data.limit()} bytes to " + + logDebug(s"Replicating $blockId of ${data.size} bytes to " + s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + @@ -1154,7 +1153,7 @@ private[spark] class BlockManager( */ def dropFromMemory( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): StorageLevel = { + data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false @@ -1281,11 +1280,11 @@ private[spark] class BlockManager( ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } - /** Serializes into a byte buffer. */ - def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = { - val byteStream = new ByteBufferOutputStream(4096) - dataSerializeStream(blockId, byteStream, values) - byteStream.toByteBuffer + /** Serializes into a chunked byte buffer. */ + def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = { + val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4) + dataSerializeStream(blockId, byteArrayChunkOutputStream, values) + new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)) } /** @@ -1297,6 +1296,14 @@ private[spark] class BlockManager( dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true)) } + /** + * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of + * the iterator is reached. 
+ */ + def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = { + dataDeserializeStream(blockId, bytes.toInputStream(dispose = true)) + } + /** * Deserializes a InputStream into an iterator of values and disposes of it when the end of * the iterator is reached. @@ -1325,24 +1332,9 @@ private[spark] class BlockManager( } -private[spark] object BlockManager extends Logging { +private[spark] object BlockManager { private val ID_GENERATOR = new IdGenerator - /** - * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that - * might cause errors if one attempts to read from the unmapped buffer, but it's better than - * waiting for the GC to find it because that could lead to huge numbers of open files. There's - * unfortunately no standard API to do this. - */ - def dispose(buffer: ByteBuffer): Unit = { - if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { - logTrace(s"Unmapping $buffer") - if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { - buffer.asInstanceOf[DirectBuffer].cleaner().clean() - } - } - } - def blockIdsToHosts( blockIds: Array[BlockId], env: SparkEnv, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala index 5886b9c00b557..12594e6a2bc0c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala @@ -17,12 +17,11 @@ package org.apache.spark.storage -import java.nio.ByteBuffer - -import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} +import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} +import org.apache.spark.util.io.ChunkedByteBuffer /** - * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]] + * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]] * so that the corresponding block's read lock can be released once this buffer's references * are released. * @@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} private[storage] class BlockManagerManagedBuffer( blockManager: BlockManager, blockId: BlockId, - buf: ByteBuffer) extends NioManagedBuffer(buf) { + chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) { override def retain(): ManagedBuffer = { super.retain() diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 5c28357ded6d6..ca23e2391ed02 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -26,6 +26,7 @@ import com.google.common.io.Closeables import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import org.apache.spark.util.io.ChunkedByteBuffer /** * Stores BlockManager blocks on disk. @@ -71,23 +72,18 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e finishTime - startTime)) } - def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = { - // So that we do not modify the input offsets ! 
- // duplicate does not copy buffer, so inexpensive - val bytes = _bytes.duplicate() + def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { put(blockId) { fileOutputStream => val channel = fileOutputStream.getChannel Utils.tryWithSafeFinally { - while (bytes.remaining > 0) { - channel.write(bytes) - } + bytes.writeFully(channel) } { channel.close() } } } - def getBytes(blockId: BlockId): ByteBuffer = { + def getBytes(blockId: BlockId): ChunkedByteBuffer = { val file = diskManager.getFile(blockId.name) val channel = new RandomAccessFile(file, "r").getChannel Utils.tryWithSafeFinally { @@ -102,9 +98,9 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } } buf.flip() - buf + new ChunkedByteBuffer(buf) } else { - channel.map(MapMode.READ_ONLY, 0, file.length) + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) } } { channel.close() diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 43cd15921cc97..199a5fc270a41 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -17,10 +17,15 @@ package org.apache.spark.storage +import java.nio.{ByteBuffer, MappedByteBuffer} + import scala.collection.Map import scala.collection.mutable +import sun.nio.ch.DirectBuffer + import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging /** * :: DeveloperApi :: @@ -222,7 +227,22 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { } /** Helper methods for storage-related objects. */ -private[spark] object StorageUtils { +private[spark] object StorageUtils extends Logging { + + /** + * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that + * might cause errors if one attempts to read from the unmapped buffer, but it's better than + * waiting for the GC to find it because that could lead to huge numbers of open files. There's + * unfortunately no standard API to do this. + */ + def dispose(buffer: ByteBuffer): Unit = { + if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { + logTrace(s"Unmapping $buffer") + if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { + buffer.asInstanceOf[DirectBuffer].cleaner().clean() + } + } + } /** * Update the given list of RDDInfo with the given list of storage statuses. 
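A pattern that repeats across the call sites above (TorrentBroadcast.writeBlocks, Executor's task-result path, BlockManager.putBlockData, and again in the streaming handlers further down) is wrapping a single ByteBuffer in a ChunkedByteBuffer via duplicate() before calling putBytes. The sketch below is a minimal illustration of that migration, not code from the patch: the package, object, and method names are made up, and it assumes the buffer's position is 0, as it is at the call sites in this diff. Because BlockManager and ChunkedByteBuffer are both private[spark], such a helper would have to live under an org.apache.spark.* package.

```scala
package org.apache.spark.example // hypothetical package, needed only for private[spark] access

import java.nio.ByteBuffer

import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer

object PutBytesMigration {
  /**
   * Old API: bm.putBytes(blockId, payload, level) took a raw ByteBuffer.
   * New API: the caller wraps the buffer first. duplicate() shares the backing
   * bytes but gives the wrapper its own position/limit, so the BlockManager
   * consuming the chunk cannot disturb the caller's view of `payload`, and the
   * duplicate is what the ChunkedByteBuffer takes ownership of.
   * Assumes payload.position() == 0, which ChunkedByteBuffer requires.
   */
  def storeSerialized(bm: BlockManager, blockId: BlockId, payload: ByteBuffer): Boolean = {
    val wrapped = new ChunkedByteBuffer(payload.duplicate())
    bm.putBytes(blockId, wrapped, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
  }
}
```

The StorageUtils hunk just above moves the unmap helper out of the BlockManager companion object so that ChunkedByteBuffer.dispose() and ByteBufferInputStream can clean up memory-mapped buffers through StorageUtils.dispose without depending on BlockManager.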
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index a7c1854a41ff7..94171324f84b5 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -17,7 +17,6 @@ package org.apache.spark.storage.memory -import java.nio.ByteBuffer import java.util.LinkedHashMap import scala.collection.mutable @@ -29,8 +28,13 @@ import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector +import org.apache.spark.util.io.ChunkedByteBuffer -private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) +private sealed trait MemoryEntry { + val size: Long +} +private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry +private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -91,14 +95,13 @@ private[spark] class MemoryStore( * * @return true if the put() succeeded, false otherwise. */ - def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = { + def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it - // Work on a duplicate - since the original input might be used elsewhere. 
- val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - assert(bytes.limit == size) - val entry = new MemoryEntry(bytes, size, deserialized = false) + val bytes = _bytes() + assert(bytes.size == size) + val entry = new SerializedMemoryEntry(bytes, size) entries.synchronized { entries.put(blockId, entry) } @@ -184,10 +187,10 @@ private[spark] class MemoryStore( val arrayValues = vector.toArray vector = null val entry = if (level.deserialized) { - new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true) + new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - new MemoryEntry(bytes, bytes.limit, deserialized = false) + new SerializedMemoryEntry(bytes, bytes.size) } val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { @@ -241,27 +244,23 @@ private[spark] class MemoryStore( } } - def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val entry = entries.synchronized { - entries.get(blockId) - } - if (entry == null) { - None - } else { - require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form") - Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data + def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { + val entry = entries.synchronized { entries.get(blockId) } + entry match { + case null => None + case e: DeserializedMemoryEntry => + throw new IllegalArgumentException("should only call getBytes on serialized blocks") + case SerializedMemoryEntry(bytes, _) => Some(bytes) } } def getValues(blockId: BlockId): Option[Iterator[Any]] = { - val entry = entries.synchronized { - entries.get(blockId) - } - if (entry == null) { - None - } else { - require(entry.deserialized, "should only call getValues on deserialized blocks") - Some(entry.value.asInstanceOf[Array[Any]].iterator) + val entry = entries.synchronized { entries.get(blockId) } + entry match { + case null => None + case e: SerializedMemoryEntry => + throw new IllegalArgumentException("should only call getValues on deserialized blocks") + case DeserializedMemoryEntry(values, _) => Some(values.iterator) } } @@ -342,10 +341,9 @@ private[spark] class MemoryStore( // blocks and removing entries. However the check is still here for // future safety. 
if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + val data = entry match { + case DeserializedMemoryEntry(values, _) => Left(values) + case SerializedMemoryEntry(buffer, _) => Right(buffer) } val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data) if (newEffectiveStorageLevel.isValid) { diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala index 54de4d4ee8ca7..dce2ac63a664c 100644 --- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala @@ -20,10 +20,10 @@ package org.apache.spark.util import java.io.InputStream import java.nio.ByteBuffer -import org.apache.spark.storage.BlockManager +import org.apache.spark.storage.StorageUtils /** - * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose() + * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose() * at the end of the stream (e.g. to close a memory-mapped file). */ private[spark] @@ -68,12 +68,12 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f } /** - * Clean up the buffer, and potentially dispose of it using BlockManager.dispose(). + * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose(). */ private def cleanUp() { if (buffer != null) { if (dispose) { - BlockManager.dispose(buffer) + StorageUtils.dispose(buffer) } buffer = null } diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala new file mode 100644 index 0000000000000..c643c4b63c601 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.io + +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import com.google.common.primitives.UnsignedBytes +import io.netty.buffer.{ByteBuf, Unpooled} + +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.storage.StorageUtils + +/** + * Read-only byte buffer which is physically stored as multiple chunks rather than a single + * contiguous array. + * + * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have + * position == 0. 
Ownership of these buffers is transferred to the ChunkedByteBuffer, + * so if these buffers may also be used elsewhere then the caller is responsible for + * copying them as needed. + */ +private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { + require(chunks != null, "chunks must not be null") + require(chunks.forall(_.limit() > 0), "chunks must be non-empty") + require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + + /** + * This size of this buffer, in bytes. + */ + val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum + + def this(byteBuffer: ByteBuffer) = { + this(Array(byteBuffer)) + } + + /** + * Write this buffer to a channel. + */ + def writeFully(channel: WritableByteChannel): Unit = { + for (bytes <- getChunks()) { + while (bytes.remaining > 0) { + channel.write(bytes) + } + } + } + + /** + * Wrap this buffer to view it as a Netty ByteBuf. + */ + def toNetty: ByteBuf = { + Unpooled.wrappedBuffer(getChunks(): _*) + } + + /** + * Copy this buffer into a new byte array. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size. + */ + def toArray: Array[Byte] = { + if (size >= Integer.MAX_VALUE) { + throw new UnsupportedOperationException( + s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size") + } + val byteChannel = new ByteArrayWritableChannel(size.toInt) + writeFully(byteChannel) + byteChannel.close() + byteChannel.getData + } + + /** + * Copy this buffer into a new ByteBuffer. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size. + */ + def toByteBuffer: ByteBuffer = { + if (chunks.length == 1) { + chunks.head.duplicate() + } else { + ByteBuffer.wrap(toArray) + } + } + + /** + * Creates an input stream to read data from this ChunkedByteBuffer. + * + * @param dispose if true, [[dispose()]] will be called at the end of the stream + * in order to close any memory-mapped files which back this buffer. + */ + def toInputStream(dispose: Boolean = false): InputStream = { + new ChunkedByteBufferInputStream(this, dispose) + } + + /** + * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. + */ + def getChunks(): Array[ByteBuffer] = { + chunks.map(_.duplicate()) + } + + /** + * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers. + * The new buffer will share no resources with the original buffer. + */ + def copy(): ChunkedByteBuffer = { + val copiedChunks = getChunks().map { chunk => + // TODO: accept an allocator in this copy method to integrate with mem. accounting systems + val newChunk = ByteBuffer.allocate(chunk.limit()) + newChunk.put(chunk) + newChunk.flip() + newChunk + } + new ChunkedByteBuffer(copiedChunks) + } + + /** + * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that + * might cause errors if one attempts to read from the unmapped buffer, but it's better than + * waiting for the GC to find it because that could lead to huge numbers of open files. There's + * unfortunately no standard API to do this. + */ + def dispose(): Unit = { + chunks.foreach(StorageUtils.dispose) + } +} + +/** + * Reads data from a ChunkedByteBuffer. + * + * @param dispose if true, [[ChunkedByteBuffer.dispose()]] will be called at the end of the stream + * in order to close any memory-mapped files which back the buffer. 
+ */ +private class ChunkedByteBufferInputStream( + var chunkedByteBuffer: ChunkedByteBuffer, + dispose: Boolean) + extends InputStream { + + private[this] var chunks = chunkedByteBuffer.getChunks().iterator + private[this] var currentChunk: ByteBuffer = { + if (chunks.hasNext) { + chunks.next() + } else { + null + } + } + + override def read(): Int = { + if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { + currentChunk = chunks.next() + } + if (currentChunk != null && currentChunk.hasRemaining) { + UnsignedBytes.toInt(currentChunk.get()) + } else { + close() + -1 + } + } + + override def read(dest: Array[Byte], offset: Int, length: Int): Int = { + if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { + currentChunk = chunks.next() + } + if (currentChunk != null && currentChunk.hasRemaining) { + val amountToGet = math.min(currentChunk.remaining(), length) + currentChunk.get(dest, offset, amountToGet) + amountToGet + } else { + close() + -1 + } + } + + override def skip(bytes: Long): Long = { + if (currentChunk != null) { + val amountToSkip = math.min(bytes, currentChunk.remaining).toInt + currentChunk.position(currentChunk.position + amountToSkip) + if (currentChunk.remaining() == 0) { + if (chunks.hasNext) { + currentChunk = chunks.next() + } else { + close() + } + } + amountToSkip + } else { + 0L + } + } + + override def close(): Unit = { + if (chunkedByteBuffer != null && dispose) { + chunkedByteBuffer.dispose() + } + chunkedByteBuffer = null + chunks = null + currentChunk = null + } +} diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala new file mode 100644 index 0000000000000..aab70e7431e07 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.io + +import java.nio.ByteBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferSuite extends SparkFunSuite { + + test("no chunks") { + val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer]) + assert(emptyChunkedByteBuffer.size === 0) + assert(emptyChunkedByteBuffer.getChunks().isEmpty) + assert(emptyChunkedByteBuffer.toArray === Array.empty) + assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0) + assert(emptyChunkedByteBuffer.toNetty.capacity() === 0) + emptyChunkedByteBuffer.toInputStream(dispose = false).close() + emptyChunkedByteBuffer.toInputStream(dispose = true).close() + } + + test("chunks must be non-empty") { + intercept[IllegalArgumentException] { + new ChunkedByteBuffer(Array(ByteBuffer.allocate(0))) + } + } + + test("getChunks() duplicates chunks") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.getChunks().head.position(4) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("copy() does not affect original buffer's position") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.copy() + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("writeFully() does not affect original buffer's position") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("toArray()") { + val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte)) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes)) + assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array()) + } + + test("toArray() throws UnsupportedOperationException if size exceeds 2GB") { + val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4) + fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity()) + val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer)) + assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L)) + intercept[UnsupportedOperationException] { + chunkedByteBuffer.toArray + } + } + + test("toInputStream()") { + val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte)) + val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte)) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2)) + assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit()) + + val inputStream = chunkedByteBuffer.toInputStream(dispose = false) + val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt) + ByteStreams.readFully(inputStream, bytesFromStream) + assert(bytesFromStream === bytes1.array() ++ bytes2.array()) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2e0c0596a75bb..edf5cd35e40ee 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import 
org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { @@ -192,8 +193,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer]) assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store") assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") @@ -434,8 +435,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE t2.join() t3.join() - store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer]) store.waitForAsyncReregister() } } @@ -1253,9 +1254,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(12000) val memoryStore = store.memoryStore val blockId = BlockId("rdd_3_10") - var bytes: ByteBuffer = null + var bytes: ChunkedByteBuffer = null memoryStore.putBytes(blockId, 10000, () => { - bytes = ByteBuffer.allocate(10000) + bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) bytes }) assert(memoryStore.getSize(blockId) === 10000) @@ -1364,7 +1365,7 @@ private object BlockManagerSuite { def dropFromMemoryIfExists( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): Unit = { + data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = { store.blockInfoManager.lockForWriting(blockId).foreach { info => val newEffectiveStorageLevel = store.dropFromMemory(blockId, data) if (newEffectiveStorageLevel.isValid) { @@ -1394,7 +1395,9 @@ private object BlockManagerSuite { val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues) val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get) val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle) - val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes) + val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = { + wrapGet(store.getLocalBytes) + } } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 97e74fe706002..9ed5016510d56 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.io.ChunkedByteBuffer class DiskStoreSuite extends 
SparkFunSuite { @@ -29,7 +30,7 @@ class DiskStoreSuite extends SparkFunSuite { // Create a non-trivial (not all zeros) byte array val bytes = Array.tabulate[Byte](1000)(_.toByte) - val byteBuffer = ByteBuffer.wrap(bytes) + val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes)) val blockId = BlockId("rdd_1_2") val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true) @@ -44,9 +45,10 @@ class DiskStoreSuite extends SparkFunSuite { val notMapped = diskStoreNotMapped.getBytes(blockId) // Not possible to do isInstanceOf due to visibility of HeapByteBuffer - assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"), + assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")), "Expected HeapByteBuffer for un-mapped read") - assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read") + assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]), + "Expected MappedByteBuffer for mapped read") def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = { val array = new Array[Byte](in.remaining()) @@ -54,9 +56,7 @@ class DiskStoreSuite extends SparkFunSuite { array } - val mappedAsArray = arrayFromByteBuffer(mapped) - val notMappedAsArray = arrayFromByteBuffer(notMapped) - assert(Arrays.equals(mappedAsArray, bytes)) - assert(Arrays.equals(notMappedAsArray, bytes)) + assert(Arrays.equals(mapped.toArray, bytes)) + assert(Arrays.equals(notMapped.toArray, bytes)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index f811784b25c82..8625882b04421 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -28,11 +28,13 @@ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util._ import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.io.ChunkedByteBuffer /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. * It contains information about the id of the blocks having this partition's data and * the corresponding record handle in the write ahead log that backs the partition. + * * @param index index of the partition * @param blockId id of the block having the partition data * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark @@ -59,7 +61,6 @@ class WriteAheadLogBackedBlockRDDPartition( * correctness, and it can be used in situations where it is known that the block * does not exist in the Spark executors (e.g. after a failed driver is restarted). 
* - * * @param sc SparkContext * @param _blockIds Ids of the blocks that contains this RDD's data * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data @@ -156,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logInfo(s"Read partition data of $this from write ahead log, record handle " + partition.walRecordHandle) if (storeInBlockManager) { - blockManager.putBytes(blockId, dataRead, storageLevel) + blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 4880884b0509d..6d4f4b99c175f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -30,6 +30,7 @@ import org.apache.spark.storage._ import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._ import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} +import org.apache.spark.util.io.ChunkedByteBuffer /** Trait that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { @@ -84,7 +85,8 @@ private[streaming] class BlockManagerBasedBlockHandler( numRecords = countIterator.count putResult case ByteBufferBlock(byteBuffer) => - blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) + blockManager.putBytes( + blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") @@ -178,15 +180,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler( numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => - byteBuffer + new ChunkedByteBuffer(byteBuffer.duplicate()) case _ => throw new Exception(s"Could not push $blockId to block manager, unexpected block type") } // Store the block in block manager val storeInBlockManagerFuture = Future { - val putSucceeded = - blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) + val putSucceeded = blockManager.putBytes( + blockId, + serializedBlock, + effectiveStorageLevel, + tellMaster = true) if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") @@ -195,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in write ahead log val storeInWriteAheadLogFuture = Future { - writeAheadLog.write(serializedBlock, clock.getTimeMillis()) + writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis()) } // Combine the futures, wait for both to complete, and return the write ahead log record handle diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 2d509af85ae33..76f67ed601ea5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ 
-339,7 +339,7 @@ class ReceivedBlockHandlerSuite storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) }) storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) }) - storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) }) + storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) }) } /** Test error handling when blocks that cannot be stored */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 79ac833c1846b..c4bf42d0f272d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -223,7 +223,7 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(blockManager.dataSerialize(id, data.iterator)) + writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer) } writer.close() segments
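To summarize the new API surface, here is a compact usage sketch exercising the ChunkedByteBuffer methods introduced above (construction from chunks, size, getChunks, toInputStream, toByteBuffer, copy, dispose). It is illustrative only and not part of the patch: the object name and sample data are invented, and the file would have to sit under an org.apache.spark.* package because the class is private[spark].

```scala
package org.apache.spark.util.io // hypothetical location, chosen for private[spark] access

import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

object ChunkedByteBufferExample {
  def main(args: Array[String]): Unit = {
    // Two small heap chunks standing in for the 4 MB chunks that
    // BlockManager.dataSerialize now produces via ByteArrayChunkOutputStream.
    val chunk1 = ByteBuffer.wrap(Array.tabulate[Byte](8)(_.toByte))
    val chunk2 = ByteBuffer.wrap(Array.tabulate[Byte](4)(i => (i + 8).toByte))
    val buffer = new ChunkedByteBuffer(Array(chunk1, chunk2))
    assert(buffer.size == 12L) // total size across all chunks

    // Streaming read over the chunks; dispose = false since nothing here is memory-mapped.
    val in = buffer.toInputStream(dispose = false)
    val bytes = new Array[Byte](buffer.size.toInt)
    ByteStreams.readFully(in, bytes)
    assert(bytes.sameElements(buffer.toArray))

    // Collapse to a single ByteBuffer for APIs that still expect one, as the
    // WriteAheadLog call sites above do with serializedBlock.toByteBuffer.
    val single: ByteBuffer = buffer.toByteBuffer
    assert(single.remaining() == 12)

    // copy() allocates fresh chunks; getChunks() hands out duplicates, so the
    // original buffer's chunk positions are still 0 after all of the above.
    val deepCopy = buffer.copy()
    assert(deepCopy.toArray.sameElements(buffer.toArray))
    assert(buffer.getChunks().head.position() == 0)

    // dispose() unmaps MappedByteBuffer chunks (e.g. those returned by
    // DiskStore.getBytes for large files); for heap buffers it is a no-op.
    buffer.dispose()
  }
}
```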