From 9e73981280b6dfdfdd6132e2a213b6c4a95c026c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 30 Sep 2015 16:57:06 -0700 Subject: [PATCH 01/13] Introduce MemoryManager + StaticMemoryManager This commit does not represent any change in behavior. The MemoryManager's introduced are not actually used anywhere yet. This will come in the next commit. --- .../org/apache/spark/MemoryManager.scala | 51 ++++++++++++ .../scala/org/apache/spark/SparkEnv.scala | 4 + .../apache/spark/StaticMemoryManager.scala | 77 +++++++++++++++++++ .../spark/shuffle/ShuffleMemoryManager.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 2 +- 5 files changed, 134 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/MemoryManager.scala create mode 100644 core/src/main/scala/org/apache/spark/StaticMemoryManager.scala diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/MemoryManager.scala new file mode 100644 index 0000000000000..38e9a7e5bdd87 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/MemoryManager.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * An abstract memory manager that enforces how memory is shared between execution and storage. + * + * In this context, execution memory refers to that used for computation in shuffles, joins, + * sorts and aggregations, while storage memory refers to that used for caching and propagating + * internal data across the cluster. + */ +private[spark] abstract class MemoryManager { + + /** + * Acquire N bytes of memory for execution. + * @return whether all N bytes are successfully granted. + */ + def acquireExecutionMemory(numBytes: Long): Boolean + + /** + * Acquire N bytes of memory for storage. + * @return whether all N bytes are successfully granted. + */ + def acquireStorageMemory(numBytes: Long): Boolean + + /** + * Release N bytes of execution memory. + */ + def releaseExecutionMemory(numBytes: Long): Unit + + /** + * Release N bytes of storage memory. + */ + def releaseStorageMemory(numBytes: Long): Unit + +} diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index c6fef7f91f00c..dc90eba36ba54 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -69,6 +69,8 @@ class SparkEnv ( val httpFileServer: HttpFileServer, val sparkFilesDir: String, val metricsSystem: MetricsSystem, + // TODO: unify these *MemoryManager classes + val memoryManager: MemoryManager, val shuffleMemoryManager: ShuffleMemoryManager, val executorMemoryManager: ExecutorMemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, @@ -323,6 +325,7 @@ object SparkEnv extends Logging { val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) + val memoryManager = new StaticMemoryManager(conf) val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores) val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) @@ -407,6 +410,7 @@ object SparkEnv extends Logging { httpFileServer, sparkFilesDir, metricsSystem, + memoryManager, shuffleMemoryManager, executorMemoryManager, outputCommitCoordinator, diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala new file mode 100644 index 0000000000000..6dfc87d8a1da0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.shuffle.ShuffleMemoryManager +import org.apache.spark.storage.BlockManager + + +/** + * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. + * + * The sizes of the execution and storage regions are determined through + * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two + * regions are cleanly separated such that neither usage can borrow memory from the other. + */ +private[spark] class StaticMemoryManager(conf: SparkConf) extends MemoryManager { + private val maxExecutionMemory = ShuffleMemoryManager.getMaxMemory(conf) + private val maxStorageMemory = BlockManager.getMaxMemory(conf) + @volatile private var executionMemoryUsed: Long = 0 + @volatile private var storageMemoryUsed: Long = 0 + + /** + * Acquire N bytes of memory for execution. + * @return whether all N bytes are successfully granted. + */ + override def acquireExecutionMemory(numBytes: Long): Boolean = { + if (executionMemoryUsed + numBytes <= maxExecutionMemory) { + executionMemoryUsed += numBytes + true + } else { + false + } + } + + /** + * Acquire N bytes of memory for storage. + * @return whether all N bytes are successfully granted. + */ + override def acquireStorageMemory(numBytes: Long): Boolean = { + if (storageMemoryUsed + numBytes <= maxStorageMemory) { + storageMemoryUsed += numBytes + true + } else { + false + } + } + + /** + * Release N bytes of execution memory. + */ + override def releaseExecutionMemory(numBytes: Long): Unit = { + executionMemoryUsed -= numBytes + } + + /** + * Release N bytes of storage memory. + */ + override def releaseStorageMemory(numBytes: Long): Unit = { + storageMemoryUsed -= numBytes + } + +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index a0d8abc2eecb3..060eeb9fef438 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -158,7 +158,7 @@ private[spark] object ShuffleMemoryManager { * of the memory pool and a safety factor since collections can sometimes grow bigger than * the size we target before we estimate their sizes again. */ - private def getMaxMemory(conf: SparkConf): Long = { + private[spark] def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47bd2ef8b2941..550cd43e3b782 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1268,7 +1268,7 @@ private[spark] object BlockManager extends Logging { private val ID_GENERATOR = new IdGenerator /** Return the total amount of storage memory available. */ - private def getMaxMemory(conf: SparkConf): Long = { + private[spark] def getMaxMemory(conf: SparkConf): Long = { val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong From b5df241669f6679742ec782d41b96174967f3911 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 30 Sep 2015 18:32:48 -0700 Subject: [PATCH 02/13] Derive max memory from MemoryManager This commit refactors BlockManager and ShuffleMemoryManager to take in a MemoryManager in their constructors, such that the max memory is derived from the MemoryManager instead of from a Long that is passed around everywhere. Note that no memory acquisition or release goes through the MemoryManager yet. This will come in future commits. --- .../org/apache/spark/MemoryManager.scala | 10 ++++ .../scala/org/apache/spark/SparkEnv.scala | 6 +-- .../apache/spark/StaticMemoryManager.scala | 46 +++++++++++++++---- .../spark/shuffle/ShuffleMemoryManager.scala | 37 ++++++++------- .../apache/spark/storage/BlockManager.scala | 31 ++----------- .../apache/spark/storage/MemoryStore.scala | 5 +- .../BlockManagerReplicationSuite.scala | 29 +++++++----- .../spark/storage/BlockManagerSuite.scala | 12 +++-- .../execution/TestShuffleMemoryManager.scala | 13 +++++- .../streaming/ReceivedBlockHandlerSuite.scala | 7 ++- 10 files changed, 118 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/MemoryManager.scala index 38e9a7e5bdd87..dce57d57840ef 100644 --- a/core/src/main/scala/org/apache/spark/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/MemoryManager.scala @@ -26,6 +26,16 @@ package org.apache.spark */ private[spark] abstract class MemoryManager { + /** + * Total available memory for execution, in bytes. + */ + def maxExecutionMemory: Long + + /** + * Total available memory for storage, in bytes. + */ + def maxStorageMemory: Long + /** * Acquire N bytes of memory for execution. * @return whether all N bytes are successfully granted. diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index dc90eba36ba54..f0e30f491587e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -326,7 +326,7 @@ object SparkEnv extends Logging { val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) val memoryManager = new StaticMemoryManager(conf) - val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores) + val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores) val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) @@ -337,8 +337,8 @@ object SparkEnv extends Logging { // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, - serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, - numUsableCores) + serializer, conf, memoryManager, mapOutputTracker, shuffleManager, + blockTransferService, securityManager, numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala index 6dfc87d8a1da0..e60638ad3dac0 100644 --- a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala @@ -17,9 +17,6 @@ package org.apache.spark -import org.apache.spark.shuffle.ShuffleMemoryManager -import org.apache.spark.storage.BlockManager - /** * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. @@ -28,18 +25,28 @@ import org.apache.spark.storage.BlockManager * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two * regions are cleanly separated such that neither usage can borrow memory from the other. */ -private[spark] class StaticMemoryManager(conf: SparkConf) extends MemoryManager { - private val maxExecutionMemory = ShuffleMemoryManager.getMaxMemory(conf) - private val maxStorageMemory = BlockManager.getMaxMemory(conf) +private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) extends MemoryManager { + private val _maxExecutionMemory: Long = StaticMemoryManager.getMaxExecutionMemory(conf) + private val _maxStorageMemory: Long = StaticMemoryManager.getMaxStorageMemory(conf) @volatile private var executionMemoryUsed: Long = 0 @volatile private var storageMemoryUsed: Long = 0 + /** + * Total available memory for execution, in bytes. + */ + override def maxExecutionMemory: Long = _maxExecutionMemory + + /** + * Total available memory for storage, in bytes. + */ + override def maxStorageMemory: Long = _maxStorageMemory + /** * Acquire N bytes of memory for execution. * @return whether all N bytes are successfully granted. */ override def acquireExecutionMemory(numBytes: Long): Boolean = { - if (executionMemoryUsed + numBytes <= maxExecutionMemory) { + if (executionMemoryUsed + numBytes <= _maxExecutionMemory) { executionMemoryUsed += numBytes true } else { @@ -52,7 +59,7 @@ private[spark] class StaticMemoryManager(conf: SparkConf) extends MemoryManager * @return whether all N bytes are successfully granted. */ override def acquireStorageMemory(numBytes: Long): Boolean = { - if (storageMemoryUsed + numBytes <= maxStorageMemory) { + if (storageMemoryUsed + numBytes <= _maxStorageMemory) { storageMemoryUsed += numBytes true } else { @@ -75,3 +82,26 @@ private[spark] class StaticMemoryManager(conf: SparkConf) extends MemoryManager } } + +private object StaticMemoryManager { + + /** + * Return the total amount of memory available for the storage region, in bytes. + */ + private def getMaxStorageMemory(conf: SparkConf): Long = { + val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) + val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + + + /** + * Return the total amount of memory available for the execution region, in bytes. + */ + private def getMaxExecutionMemory(conf: SparkConf): Long = { + val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) + val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) + (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + } + +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 060eeb9fef438..0df92fba682a6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.spark.unsafe.array.ByteArrayMethods -import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext} +import org.apache.spark._ /** * Allocates a pool of memory to tasks for use in shuffle operations. Each disk-spilling @@ -40,16 +40,17 @@ import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext} * * Use `ShuffleMemoryManager.create()` factory method to create a new instance. * - * @param maxMemory total amount of memory available for execution, in bytes. + * @param memoryManager the interface through which this manager acquires execution memory * @param pageSizeBytes number of bytes for each page, by default. */ private[spark] class ShuffleMemoryManager protected ( - val maxMemory: Long, + memoryManager: MemoryManager, val pageSizeBytes: Long) extends Logging { private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes + private val maxMemory = memoryManager.maxExecutionMemory private def currentTaskAttemptId(): Long = { // In case this is called on the driver, return an invalid task attempt id. @@ -138,30 +139,28 @@ class ShuffleMemoryManager protected ( private[spark] object ShuffleMemoryManager { - def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = { - val maxMemory = ShuffleMemoryManager.getMaxMemory(conf) + def create( + conf: SparkConf, + memoryManager: MemoryManager, + numCores: Int): ShuffleMemoryManager = { + val maxMemory = memoryManager.maxExecutionMemory val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores) - new ShuffleMemoryManager(maxMemory, pageSize) + new ShuffleMemoryManager(memoryManager, pageSize) } + /** + * Create a dummy [[ShuffleMemoryManager]] with the specified capacity and page size. + */ def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = { - new ShuffleMemoryManager(maxMemory, pageSizeBytes) + val memoryManager = new StaticMemoryManager { + override def maxExecutionMemory: Long = maxMemory + } + new ShuffleMemoryManager(memoryManager, pageSizeBytes) } @VisibleForTesting def createForTesting(maxMemory: Long): ShuffleMemoryManager = { - new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024) - } - - /** - * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction - * of the memory pool and a safety factor since collections can sometimes grow bigger than - * the size we target before we estimate their sizes again. - */ - private[spark] def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) - val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) - (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + create(maxMemory, 4 * 1024 * 1024) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 550cd43e3b782..2fed714bc0e9b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -64,8 +64,8 @@ private[spark] class BlockManager( rpcEnv: RpcEnv, val master: BlockManagerMaster, defaultSerializer: Serializer, - maxMemory: Long, val conf: SparkConf, + memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, blockTransferService: BlockTransferService, @@ -82,13 +82,15 @@ private[spark] class BlockManager( // Actual storage of where blocks are kept private var externalBlockStoreInitialized = false - private[spark] val memoryStore = new MemoryStore(this, maxMemory) + private[spark] val memoryStore = new MemoryStore(this, memoryManager) private[spark] val diskStore = new DiskStore(this, diskBlockManager) private[spark] lazy val externalBlockStore: ExternalBlockStore = { externalBlockStoreInitialized = true new ExternalBlockStore(this, executorId) } + private val maxMemory = memoryManager.maxStorageMemory + private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) @@ -157,24 +159,6 @@ private[spark] class BlockManager( * loaded yet. */ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf) - /** - * Construct a BlockManager with a memory limit set based on system properties. - */ - def this( - execId: String, - rpcEnv: RpcEnv, - master: BlockManagerMaster, - serializer: Serializer, - conf: SparkConf, - mapOutputTracker: MapOutputTracker, - shuffleManager: ShuffleManager, - blockTransferService: BlockTransferService, - securityManager: SecurityManager, - numUsableCores: Int) = { - this(execId, rpcEnv, master, serializer, BlockManager.getMaxMemory(conf), - conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) - } - /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as * the appId may not be known at BlockManager instantiation time (in particular for the driver, @@ -1267,13 +1251,6 @@ private[spark] class BlockManager( private[spark] object BlockManager extends Logging { private val ID_GENERATOR = new IdGenerator - /** Return the total amount of storage memory available. */ - private[spark] def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) - val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) - (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong - } - /** * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that * might cause errors if one attempts to read from the unmapped buffer, but it's better than diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 6f27f00307f8c..e51c84762db72 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -23,7 +23,7 @@ import java.util.LinkedHashMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.TaskContext +import org.apache.spark.{MemoryManager, TaskContext} import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector @@ -33,11 +33,12 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) * Stores blocks in memory, either as Arrays of deserialized Java objects or as * serialized ByteBuffers. */ -private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) +private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager) extends BlockStore(blockManager) { private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + private val maxMemory = memoryManager.maxStorageMemory @volatile private var currentMemory = 0L diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index eb5af70d57aec..d12b534ccf83a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -39,29 +39,34 @@ import org.apache.spark.storage.StorageLevel._ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter { private val conf = new SparkConf(false).set("spark.app.id", "test") - var rpcEnv: RpcEnv = null - var master: BlockManagerMaster = null - val securityMgr = new SecurityManager(conf) - val mapOutputTracker = new MapOutputTrackerMaster(conf) - val shuffleManager = new HashShuffleManager(conf) + private var rpcEnv: RpcEnv = null + private var master: BlockManagerMaster = null + private val securityMgr = new SecurityManager(conf) + private val mapOutputTracker = new MapOutputTrackerMaster(conf) + private val shuffleManager = new HashShuffleManager(conf) + private val memoryManager = new StaticMemoryManager(conf) // List of block manager created during an unit test, so that all of the them can be stopped // after the unit test. - val allStores = new ArrayBuffer[BlockManager] + private val allStores = new ArrayBuffer[BlockManager] // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test conf.set("spark.kryoserializer.buffer", "1m") - val serializer = new KryoSerializer(conf) + private val serializer = new KryoSerializer(conf) // Implicitly convert strings to BlockIds for test clarity. - implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) + implicit private def StringToBlockId(value: String): BlockId = new TestBlockId(value) + + private def makeMemoryManager(maxMem: Long) = new StaticMemoryManager(conf) { + override def maxStorageMemory: Long = maxMem + } private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val store = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + val store = new BlockManager(name, rpcEnv, master, serializer, conf, + makeMemoryManager(maxMem), mapOutputTracker, shuffleManager, transfer, securityMgr, 0) store.initialize("app-id") allStores += store store @@ -258,8 +263,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) - val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, - 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) + val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf, + makeMemoryManager(10000), mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) failableStore.initialize("app-id") allStores += failableStore // so that this gets stopped after test assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 34bb4952e7246..ec45e894312d0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -63,12 +63,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) + private def makeMemoryManager(maxMem: Long) = new StaticMemoryManager(conf) { + override def maxStorageMemory: Long = maxMem + } + private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val manager = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + val manager = new BlockManager(name, rpcEnv, master, serializer, conf, + makeMemoryManager(maxMem), mapOutputTracker, shuffleManager, transfer, securityMgr, 0) manager.initialize("app-id") manager } @@ -821,8 +825,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Use Java serializer so we can create an unserializable error. val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, - new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr, - 0) + new JavaSerializer(conf), conf, makeMemoryManager(1200), mapOutputTracker, + shuffleManager, transfer, securityMgr, 0) // The put should fail since a1 is not serializable. class UnserializableClass diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 48c3938ff87ba..7778ab6b8df77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -17,12 +17,14 @@ package org.apache.spark.sql.execution +import org.apache.spark.MemoryManager import org.apache.spark.shuffle.ShuffleMemoryManager /** * A [[ShuffleMemoryManager]] that can be controlled to run out of memory. */ -class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) { +class TestShuffleMemoryManager + extends ShuffleMemoryManager(new GrantEverythingMemoryManager, 4 * 1024 * 1024) { private var oom = false override def tryToAcquire(numBytes: Long): Long = { @@ -49,3 +51,12 @@ class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1 oom = true } } + +private class GrantEverythingMemoryManager extends MemoryManager { + override def maxExecutionMemory: Long = Long.MaxValue + override def maxStorageMemory: Long = Long.MaxValue + override def acquireExecutionMemory(numBytes: Long): Boolean = true + override def acquireStorageMemory(numBytes: Long): Boolean = true + override def releaseExecutionMemory(numBytes: Long): Unit = { } + override def releaseStorageMemory(numBytes: Long): Unit = { } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 13cfe29d7b304..d97c477450b51 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -253,9 +253,12 @@ class ReceivedBlockHandlerSuite maxMem: Long, conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + val memoryManager = new StaticMemoryManager(conf) { + override val maxStorageMemory: Long = maxMem + } val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) manager.initialize("app-id") blockManagerBuffer += manager manager From 700e5d12c2f644a48f067f0e344f46aeabbc40c7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 1 Oct 2015 13:29:17 -0700 Subject: [PATCH 03/13] Route ShuffleMemoryManager through new interface All execution memory is now acquired through the new MemoryManager interface as of this commit. The next step is to do it for storage memory. --- .../org/apache/spark/MemoryManager.scala | 8 +-- .../apache/spark/StaticMemoryManager.scala | 63 +++++++++++++------ .../spark/shuffle/ShuffleMemoryManager.scala | 33 +++++++--- .../execution/TestShuffleMemoryManager.scala | 4 +- 4 files changed, 73 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/MemoryManager.scala index dce57d57840ef..4c922e26f0661 100644 --- a/core/src/main/scala/org/apache/spark/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/MemoryManager.scala @@ -38,15 +38,15 @@ private[spark] abstract class MemoryManager { /** * Acquire N bytes of memory for execution. - * @return whether all N bytes are successfully granted. + * @return whether the number bytes successfully granted (<= N). */ - def acquireExecutionMemory(numBytes: Long): Boolean + def acquireExecutionMemory(numBytes: Long): Long /** * Acquire N bytes of memory for storage. - * @return whether all N bytes are successfully granted. + * @return whether the number bytes successfully granted (<= N). */ - def acquireStorageMemory(numBytes: Long): Boolean + def acquireStorageMemory(numBytes: Long): Long /** * Release N bytes of execution memory. diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala index e60638ad3dac0..7b735458936eb 100644 --- a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala @@ -25,11 +25,19 @@ package org.apache.spark * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two * regions are cleanly separated such that neither usage can borrow memory from the other. */ -private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) extends MemoryManager { +private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) + extends MemoryManager with Logging { + private val _maxExecutionMemory: Long = StaticMemoryManager.getMaxExecutionMemory(conf) private val _maxStorageMemory: Long = StaticMemoryManager.getMaxStorageMemory(conf) - @volatile private var executionMemoryUsed: Long = 0 - @volatile private var storageMemoryUsed: Long = 0 + private val executionMemoryLock = new Object + private val storageMemoryLock = new Object + + // All accesses must be synchronized on `executionMemoryLock` + private var executionMemoryUsed: Long = 0 + + // All accesses must be synchronized on `storageMemoryLock` + private var storageMemoryUsed: Long = 0 /** * Total available memory for execution, in bytes. @@ -43,27 +51,27 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) extend /** * Acquire N bytes of memory for execution. - * @return whether all N bytes are successfully granted. + * @return whether the number bytes successfully granted (<= N). */ - override def acquireExecutionMemory(numBytes: Long): Boolean = { - if (executionMemoryUsed + numBytes <= _maxExecutionMemory) { - executionMemoryUsed += numBytes - true - } else { - false + override def acquireExecutionMemory(numBytes: Long): Long = { + executionMemoryLock.synchronized { + assert(_maxExecutionMemory >= executionMemoryUsed) + val bytesToGrant = math.min(numBytes, _maxExecutionMemory - executionMemoryUsed) + executionMemoryUsed += bytesToGrant + bytesToGrant } } /** * Acquire N bytes of memory for storage. - * @return whether all N bytes are successfully granted. + * @return whether the number bytes successfully granted (<= N). */ - override def acquireStorageMemory(numBytes: Long): Boolean = { - if (storageMemoryUsed + numBytes <= _maxStorageMemory) { - storageMemoryUsed += numBytes - true - } else { - false + override def acquireStorageMemory(numBytes: Long): Long = { + storageMemoryLock.synchronized { + assert(_maxStorageMemory >= storageMemoryUsed) + val bytesToGrant = math.min(numBytes, _maxStorageMemory - storageMemoryUsed) + storageMemoryUsed += bytesToGrant + bytesToGrant } } @@ -71,18 +79,35 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) extend * Release N bytes of execution memory. */ override def releaseExecutionMemory(numBytes: Long): Unit = { - executionMemoryUsed -= numBytes + executionMemoryLock.synchronized { + if (numBytes > executionMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of execution " + + s"memory when we only have $executionMemoryUsed bytes") + executionMemoryUsed = 0 + } else { + executionMemoryUsed -= numBytes + } + } } /** * Release N bytes of storage memory. */ override def releaseStorageMemory(numBytes: Long): Unit = { - storageMemoryUsed -= numBytes + storageMemoryLock.synchronized { + if (numBytes > storageMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of storage " + + s"memory when we only have $storageMemoryUsed bytes") + storageMemoryUsed = 0 + } else { + storageMemoryUsed -= numBytes + } + } } } + private object StaticMemoryManager { /** diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 0df92fba682a6..6a6985739718b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -63,6 +63,8 @@ class ShuffleMemoryManager protected ( * in some situations, to make sure each task has a chance to ramp up to at least 1 / 2N of the * total memory pool (where N is the # of active tasks) before it is forced to spill. This can * happen if the number of tasks increases but an older task had a lot of memory already. + * + * @return `numBytes` if all bytes are acquired, else 0. */ def tryToAcquire(numBytes: Long): Long = synchronized { val taskAttemptId = currentTaskAttemptId() @@ -72,7 +74,7 @@ class ShuffleMemoryManager protected ( // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire if (!taskMemory.contains(taskAttemptId)) { taskMemory(taskAttemptId) = 0L - notifyAll() // Will later cause waiting tasks to wake up and check numThreads again + notifyAll() // Will later cause waiting tasks to wake up and check numTasks again } // Keep looping until we're either sure that we don't want to grant this request (because this @@ -86,46 +88,57 @@ class ShuffleMemoryManager protected ( // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks; // don't let it be negative val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveTasks) - curMem)) + // Only give it as much memory as is free, which might be none if it reached 1 / numTasks + val toGrant = math.min(maxToGrant, freeMemory) if (curMem < maxMemory / (2 * numActiveTasks)) { // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; // if we can't give it this much now, wait for other tasks to free up memory // (this happens if older tasks allocated lots of memory before N grew) if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * numActiveTasks) - curMem)) { - val toGrant = math.min(maxToGrant, freeMemory) - taskMemory(taskAttemptId) += toGrant - return toGrant + return acquire(toGrant) } else { logInfo( s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free") wait() } } else { - // Only give it as much memory as is free, which might be none if it reached 1 / numThreads - val toGrant = math.min(maxToGrant, freeMemory) - taskMemory(taskAttemptId) += toGrant - return toGrant + return acquire(toGrant) } } 0L // Never reached } + /** + * Acquire numBytes bytes for the current task from the memory manager. + * @return number of bytes actually acquired. + */ + private def acquire(numBytes: Long): Long = synchronized { + val taskAttemptId = currentTaskAttemptId() + val acquired = memoryManager.acquireExecutionMemory(numBytes) + taskMemory(taskAttemptId) += acquired + acquired + } + /** Release numBytes bytes for the current task. */ def release(numBytes: Long): Unit = synchronized { val taskAttemptId = currentTaskAttemptId() val curMem = taskMemory.getOrElse(taskAttemptId, 0L) if (curMem < numBytes) { throw new SparkException( - s"Internal error: release called on ${numBytes} bytes but task only has ${curMem}") + s"Internal error: release called on $numBytes bytes but task only has $curMem") } taskMemory(taskAttemptId) -= numBytes + memoryManager.releaseExecutionMemory(numBytes) notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed } /** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */ def releaseMemoryForThisTask(): Unit = synchronized { val taskAttemptId = currentTaskAttemptId() - taskMemory.remove(taskAttemptId) + taskMemory.remove(taskAttemptId).foreach { numBytes => + memoryManager.releaseExecutionMemory(numBytes) + } notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 7778ab6b8df77..190ec28081320 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -55,8 +55,8 @@ class TestShuffleMemoryManager private class GrantEverythingMemoryManager extends MemoryManager { override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue - override def acquireExecutionMemory(numBytes: Long): Boolean = true - override def acquireStorageMemory(numBytes: Long): Boolean = true + override def acquireExecutionMemory(numBytes: Long): Long = numBytes + override def acquireStorageMemory(numBytes: Long): Long = numBytes override def releaseExecutionMemory(numBytes: Long): Unit = { } override def releaseStorageMemory(numBytes: Long): Unit = { } } From 933de614df6e84d7776aa1d11ae1d82042f3f6dd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 5 Oct 2015 15:29:38 -0700 Subject: [PATCH 04/13] Route MemoryStore through new interface All storage memory is now acquired through the new MemoryManager interface. This requires non-trivial refactoring due to the fact that the unrolling and eviction logic are closely intertwined. --- .../org/apache/spark/MemoryManager.scala | 60 +++-- .../apache/spark/StaticMemoryManager.scala | 163 ++++++++++---- .../spark/shuffle/ShuffleMemoryManager.scala | 5 +- .../apache/spark/storage/MemoryStore.scala | 207 +++++++++--------- .../BlockManagerReplicationSuite.scala | 13 +- .../spark/storage/BlockManagerSuite.scala | 36 +-- .../execution/TestShuffleMemoryManager.scala | 20 +- .../streaming/ReceivedBlockHandlerSuite.scala | 15 +- 8 files changed, 329 insertions(+), 190 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/MemoryManager.scala index 4c922e26f0661..a70a58a70202b 100644 --- a/core/src/main/scala/org/apache/spark/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/MemoryManager.scala @@ -17,6 +17,11 @@ package org.apache.spark +import scala.collection.mutable + +import org.apache.spark.storage.{BlockId, BlockStatus} + + /** * An abstract memory manager that enforces how memory is shared between execution and storage. * @@ -27,26 +32,30 @@ package org.apache.spark private[spark] abstract class MemoryManager { /** - * Total available memory for execution, in bytes. - */ - def maxExecutionMemory: Long - - /** - * Total available memory for storage, in bytes. + * Acquire N bytes of memory for execution. + * @return number of bytes successfully granted (<= N). */ - def maxStorageMemory: Long + def acquireExecutionMemory(numBytes: Long): Long /** - * Acquire N bytes of memory for execution. - * @return whether the number bytes successfully granted (<= N). + * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return number of bytes successfully granted (0 or N). */ - def acquireExecutionMemory(numBytes: Long): Long + def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long /** - * Acquire N bytes of memory for storage. - * @return whether the number bytes successfully granted (<= N). + * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return number of bytes successfully granted (<= N). */ - def acquireStorageMemory(numBytes: Long): Long + def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long /** * Release N bytes of execution memory. @@ -58,4 +67,29 @@ private[spark] abstract class MemoryManager { */ def releaseStorageMemory(numBytes: Long): Unit + /** + * Release N bytes of unroll memory. + */ + def releaseUnrollMemory(numBytes: Long): Unit + + /** + * Total available memory for execution, in bytes. + */ + def maxExecutionMemory: Long + + /** + * Total available memory for storage, in bytes. + */ + def maxStorageMemory: Long + + /** + * Execution memory currently in use, in bytes. + */ + def executionMemoryUsed: Long + + /** + * Storage memory currently in use, in bytes. + */ + def storageMemoryUsed: Long + } diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala index 7b735458936eb..c4c1d3e73e6dd 100644 --- a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala @@ -17,6 +17,10 @@ package org.apache.spark +import scala.collection.mutable + +import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} + /** * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. @@ -25,52 +29,112 @@ package org.apache.spark * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two * regions are cleanly separated such that neither usage can borrow memory from the other. */ -private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) +private[spark] class StaticMemoryManager( + conf: SparkConf, + override val maxExecutionMemory: Long, + override val maxStorageMemory: Long) extends MemoryManager with Logging { - private val _maxExecutionMemory: Long = StaticMemoryManager.getMaxExecutionMemory(conf) - private val _maxStorageMemory: Long = StaticMemoryManager.getMaxStorageMemory(conf) - private val executionMemoryLock = new Object - private val storageMemoryLock = new Object + // Max number of bytes worth of blocks to evict when unrolling + private val maxMemoryToEvictForUnroll: Long = { + (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong + } + + // Amount of execution memory in use. Accesses must be synchronized on `executionLock`. + private var _executionMemoryUsed: Long = 0 + private val executionLock = new Object + + // Amount of storage memory in use. Accesses must be synchronized on `storageLock`. + private var _storageMemoryUsed: Long = 0 + private val storageLock = new Object + + // The memory store used to evict cached blocks + private var _memoryStore: MemoryStore = _ + private def memoryStore: MemoryStore = { + if (_memoryStore == null) { + _memoryStore = SparkEnv.get.blockManager.memoryStore + } + _memoryStore + } - // All accesses must be synchronized on `executionMemoryLock` - private var executionMemoryUsed: Long = 0 + // For testing only + def setMemoryStore(store: MemoryStore): Unit = { + _memoryStore = store + } - // All accesses must be synchronized on `storageMemoryLock` - private var storageMemoryUsed: Long = 0 + def this(conf: SparkConf) { + this( + conf, + StaticMemoryManager.getMaxExecutionMemory(conf), + StaticMemoryManager.getMaxStorageMemory(conf)) + } /** - * Total available memory for execution, in bytes. + * Acquire N bytes of memory for execution. + * @return number of bytes successfully granted (<= N). */ - override def maxExecutionMemory: Long = _maxExecutionMemory + override def acquireExecutionMemory(numBytes: Long): Long = { + executionLock.synchronized { + assert(_executionMemoryUsed <= maxExecutionMemory) + val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed) + _executionMemoryUsed += bytesToGrant + bytesToGrant + } + } /** - * Total available memory for storage, in bytes. + * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return number of bytes successfully granted (0 or N). */ - override def maxStorageMemory: Long = _maxStorageMemory + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks) + } /** - * Acquire N bytes of memory for execution. - * @return whether the number bytes successfully granted (<= N). + * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. + * + * This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage + * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any, + * are added to `evictedBlocks`. + * + * @return number of bytes successfully granted (0 or N). */ - override def acquireExecutionMemory(numBytes: Long): Long = { - executionMemoryLock.synchronized { - assert(_maxExecutionMemory >= executionMemoryUsed) - val bytesToGrant = math.min(numBytes, _maxExecutionMemory - executionMemoryUsed) - executionMemoryUsed += bytesToGrant - bytesToGrant + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + storageLock.synchronized { + val currentUnrollMemory = memoryStore.currentUnrollMemory + val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) + val numBytesToFree = math.min(numBytes, maxNumBytesToFree) + acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } } /** - * Acquire N bytes of memory for storage. - * @return whether the number bytes successfully granted (<= N). + * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary. + * + * @param blockId the ID of the block we are acquiring storage memory for + * @param numBytesToAcquire the size of this block + * @param numBytesToFree the size of space to be freed through evicting blocks + * @param evictedBlocks a holder for blocks evicted in the process + * @return number of bytes successfully granted (0 or N). */ - override def acquireStorageMemory(numBytes: Long): Long = { - storageMemoryLock.synchronized { - assert(_maxStorageMemory >= storageMemoryUsed) - val bytesToGrant = math.min(numBytes, _maxStorageMemory - storageMemoryUsed) - storageMemoryUsed += bytesToGrant + private def acquireStorageMemory( + blockId: BlockId, + numBytesToAcquire: Long, + numBytesToFree: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + storageLock.synchronized { + memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) + assert(_storageMemoryUsed <= maxStorageMemory) + val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory + val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0 + _storageMemoryUsed += bytesToGrant bytesToGrant } } @@ -79,13 +143,13 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) * Release N bytes of execution memory. */ override def releaseExecutionMemory(numBytes: Long): Unit = { - executionMemoryLock.synchronized { - if (numBytes > executionMemoryUsed) { + executionLock.synchronized { + if (numBytes > _executionMemoryUsed) { logWarning(s"Attempted to release $numBytes bytes of execution " + - s"memory when we only have $executionMemoryUsed bytes") - executionMemoryUsed = 0 + s"memory when we only have ${_executionMemoryUsed} bytes") + _executionMemoryUsed = 0 } else { - executionMemoryUsed -= numBytes + _executionMemoryUsed -= numBytes } } } @@ -94,21 +158,42 @@ private[spark] class StaticMemoryManager(conf: SparkConf = new SparkConf) * Release N bytes of storage memory. */ override def releaseStorageMemory(numBytes: Long): Unit = { - storageMemoryLock.synchronized { - if (numBytes > storageMemoryUsed) { + storageLock.synchronized { + if (numBytes > _storageMemoryUsed) { logWarning(s"Attempted to release $numBytes bytes of storage " + - s"memory when we only have $storageMemoryUsed bytes") - storageMemoryUsed = 0 + s"memory when we only have ${_storageMemoryUsed} bytes") + _storageMemoryUsed = 0 } else { - storageMemoryUsed -= numBytes + _storageMemoryUsed -= numBytes } } } + /** + * Release N bytes of unroll memory. + */ + override def releaseUnrollMemory(numBytes: Long): Unit = { + releaseStorageMemory(numBytes) + } + + /** + * Amount of execution memory currently in use, in bytes. + */ + override def executionMemoryUsed: Long = executionLock.synchronized { + _executionMemoryUsed + } + + /** + * Amount of storage memory currently in use, in bytes. + */ + override def storageMemoryUsed: Long = storageLock.synchronized { + _storageMemoryUsed + } + } -private object StaticMemoryManager { +private[spark] object StaticMemoryManager { /** * Return the total amount of memory available for the storage region, in bytes. diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 6a6985739718b..320fddd18b558 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -165,9 +165,8 @@ private[spark] object ShuffleMemoryManager { * Create a dummy [[ShuffleMemoryManager]] with the specified capacity and page size. */ def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = { - val memoryManager = new StaticMemoryManager { - override def maxExecutionMemory: Long = maxMemory - } + val conf = new SparkConf + val memoryManager = new StaticMemoryManager(conf, maxMemory, Long.MaxValue) new ShuffleMemoryManager(memoryManager, pageSizeBytes) } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index e51c84762db72..89bbe59f83be8 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -40,8 +40,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) private val maxMemory = memoryManager.maxStorageMemory - @volatile private var currentMemory = 0L - // Ensure only one thread is putting, and if necessary, dropping blocks at any given time private val accountingLock = new Object @@ -57,15 +55,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // memory (SPARK-4777). private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() - /** - * The amount of space ensured for unrolling values in memory, shared across all cores. - * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. - */ - private val maxUnrollMemory: Long = { - val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) - (maxMemory * unrollFraction).toLong - } - // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) @@ -78,8 +67,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) - /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */ - def freeMemory: Long = maxMemory - currentMemory + /** Total storage memory used including unroll memory, in bytes. */ + private def memoryUsed: Long = memoryManager.storageMemoryUsed + + /** + * Amount of storage memory, in bytes, used for caching blocks. + * This does not include memory used for unrolling. + */ + private def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -213,8 +208,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo entries.synchronized { val entry = entries.remove(blockId) if (entry != null) { - currentMemory -= entry.size - logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)") + memoryManager.releaseStorageMemory(entry.size) + logDebug(s"Block $blockId of size ${entry.size} dropped " + + s"from memory (free ${maxMemory - blocksMemoryUsed})") true } else { false @@ -225,7 +221,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo override def clear() { entries.synchronized { entries.clear() - currentMemory = 0 } logInfo("MemoryStore cleared") } @@ -266,7 +261,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo var vector = new SizeTrackingVector[Any] // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(initialMemoryThreshold) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -282,20 +277,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - // Hold the accounting lock, in case another thread concurrently puts a block that - // takes up the unrolling space we just ensured here - accountingLock.synchronized { - if (!reserveUnrollMemoryForThisTask(amountToRequest)) { - // If the first request is not granted, try again after ensuring free space - // If there is still not enough space, give up and drop the partition - val spaceToEnsure = maxUnrollMemory - currentUnrollMemory - if (spaceToEnsure > 0) { - val result = ensureFreeSpace(blockId, spaceToEnsure) - droppedBlocks ++= result.droppedBlocks - } - keepUnrolling = reserveUnrollMemoryForThisTask(amountToRequest) - } - } + keepUnrolling = reserveUnrollMemoryForThisTask( + blockId, amountToRequest, droppedBlocks) // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest } @@ -321,7 +304,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo accountingLock.synchronized { val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved releaseUnrollMemoryForThisTask(amountToRelease) - reservePendingUnrollMemoryForThisTask(amountToRelease) + reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks) } } } @@ -369,24 +352,24 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * for freeing up more space for another block that needs to be put. Only then the actually * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - var putSuccess = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] accountingLock.synchronized { - val freeSpaceResult = ensureFreeSpace(blockId, size) - val enoughFreeSpace = freeSpaceResult.success - droppedBlocks ++= freeSpaceResult.droppedBlocks - - if (enoughFreeSpace) { + // Note: if we have previously unrolled this block successfully, then pending unroll + // memory should be non-zero. This is the amount that we already reserved during the + // unrolling process. In this case, we can just reuse this space to cache our block. + releasePendingUnrollMemoryForThisTask() + val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) + val enoughMemory = numBytesAcquired == size + if (enoughMemory) { + // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(value(), size, deserialized) entries.synchronized { entries.put(blockId, entry) - currentMemory += size } val valuesOrBytes = if (deserialized) "values" else "bytes" logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - putSuccess = true + blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. @@ -397,11 +380,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + memoryManager.releaseStorageMemory(numBytesAcquired) } - // Release the unroll memory used because we no longer need the underlying Array - releasePendingUnrollMemoryForThisTask() + ResultWithDroppedBlocks(enoughMemory, droppedBlocks) } - ResultWithDroppedBlocks(putSuccess, droppedBlocks) } /** @@ -410,40 +392,42 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping - * blocks. Otherwise, the freed space may fill up before the caller puts in their new value. - * - * Return whether there is enough free space, along with the blocks dropped in the process. + * @param blockId the ID of the block we are freeing space for + * @param space the size of this block + * @param evictedBlocks a holder for blocks evicted in the process + * @return whether there is enough free space */ - private def ensureFreeSpace( - blockIdToAdd: BlockId, - space: Long): ResultWithDroppedBlocks = { - logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") - - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + private[spark] def ensureFreeSpace( + blockId: BlockId, + space: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + accountingLock.synchronized { + val freeMemory = maxMemory - memoryUsed + val rddToAdd = getRddId(blockId) + val selectedBlocks = new ArrayBuffer[BlockId] + var selectedMemory = 0L - if (space > maxMemory) { - logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") - return ResultWithDroppedBlocks(success = false, droppedBlocks) - } + logInfo(s"Ensuring $space bytes of free space for block $blockId " + + s"(free: $freeMemory, max: $maxMemory)") - // Take into account the amount of memory currently occupied by unrolling blocks - // and minus the pending unroll memory for that block on current thread. - val taskAttemptId = currentTaskAttemptId() - val actualFreeMemory = freeMemory - currentUnrollMemory + - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + // Fail fast if the block simply won't fit + if (space > maxMemory) { + logInfo(s"Will not store $blockId as the required space " + + s"($space bytes) than our memory limit ($maxMemory bytes)") + return false + } - if (actualFreeMemory < space) { - val rddToAdd = getRddId(blockIdToAdd) - val selectedBlocks = new ArrayBuffer[BlockId] - var selectedMemory = 0L + // No need to evict anything if there is already enough free space + if (freeMemory >= space) { + return true + } // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { val iterator = entries.entrySet().iterator() - while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { + while (freeMemory + selectedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { @@ -453,7 +437,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - if (actualFreeMemory + selectedMemory >= space) { + if (freeMemory + selectedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } @@ -467,17 +451,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + droppedBlockStatus.foreach { status => evictedBlocks += ((blockId, status)) } } } - return ResultWithDroppedBlocks(success = true, droppedBlocks) + true } else { - logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + + logInfo(s"Will not store $blockId as it would require dropping another block " + "from the same RDD") - return ResultWithDroppedBlocks(success = false, droppedBlocks) + false } } - ResultWithDroppedBlocks(success = true, droppedBlocks) } override def contains(blockId: BlockId): Boolean = { @@ -490,17 +473,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } /** - * Reserve additional memory for unrolling blocks used by this task. - * Return whether the request is granted. + * Reserve memory for unrolling the given block for this task. + * @return whether the request is granted. */ - def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { + def reserveUnrollMemoryForThisTask( + blockId: BlockId, + memory: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { accountingLock.synchronized { - val granted = freeMemory > currentUnrollMemory + memory - if (granted) { + val acquired = memoryManager.acquireUnrollMemory(blockId, memory, evictedBlocks) + val success = acquired == memory + if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory } - granted + success } } @@ -508,16 +495,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * Release memory used by this task for unrolling blocks. * If the amount is not specified, remove the current task's allocation altogether. */ - def releaseUnrollMemoryForThisTask(memory: Long = -1L): Unit = { + def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - if (memory < 0) { - unrollMemoryMap.remove(taskAttemptId) - } else { - unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, memory) - memory - // If this task claims no more unroll memory, release it completely - if (unrollMemoryMap(taskAttemptId) <= 0) { - unrollMemoryMap.remove(taskAttemptId) + if (unrollMemoryMap.contains(taskAttemptId)) { + val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) + if (memoryToRelease > 0) { + unrollMemoryMap(taskAttemptId) -= memoryToRelease + if (unrollMemoryMap(taskAttemptId) == 0) { + unrollMemoryMap.remove(taskAttemptId) + } + memoryManager.releaseUnrollMemory(memoryToRelease) } } } @@ -526,22 +514,40 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo /** * Reserve the unroll memory of current unroll successful block used by this task * until actually put the block into memory entry. + * @return whether the request is granted. */ - def reservePendingUnrollMemoryForThisTask(memory: Long): Unit = { + private def reservePendingUnrollMemoryForThisTask( + blockId: BlockId, + memory: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory + val acquired = memoryManager.acquireUnrollMemory(blockId, memory, evictedBlocks) + val success = acquired == memory + if (success) { + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory + } + success } } /** * Release pending unroll memory of current unroll successful block used by this task */ - def releasePendingUnrollMemoryForThisTask(): Unit = { + def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - pendingUnrollMemoryMap.remove(taskAttemptId) + if (pendingUnrollMemoryMap.contains(taskAttemptId)) { + val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) + if (memoryToRelease > 0) { + pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease + if (pendingUnrollMemoryMap(taskAttemptId) == 0) { + pendingUnrollMemoryMap.remove(taskAttemptId) + } + memoryManager.releaseUnrollMemory(memoryToRelease) + } + } } } @@ -562,19 +568,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo /** * Return the number of tasks currently unrolling blocks. */ - def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size } + private def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size } /** * Log information about current memory usage. */ - def logMemoryUsage(): Unit = { - val blocksMemory = currentMemory - val unrollMemory = currentUnrollMemory - val totalMemory = blocksMemory + unrollMemory + private def logMemoryUsage(): Unit = { logInfo( - s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " + - s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " + - s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(totalMemory)}. " + + s"Memory use = ${Utils.bytesToString(blocksMemoryUsed)} (blocks) + " + + s"${Utils.bytesToString(currentUnrollMemory)} (scratch space shared across " + + s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(memoryUsed)}. " + s"Storage limit = ${Utils.bytesToString(maxMemory)}." ) } @@ -585,7 +588,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * @param blockId ID of the block we are trying to unroll. * @param finalVectorSize Final size of the vector before unrolling failed. */ - def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = { + private def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = { logWarning( s"Not enough space to cache $blockId in memory! " + s"(computed ${Utils.bytesToString(finalVectorSize)} so far)" diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index d12b534ccf83a..7d260ca198b95 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -44,7 +44,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo private val securityMgr = new SecurityManager(conf) private val mapOutputTracker = new MapOutputTrackerMaster(conf) private val shuffleManager = new HashShuffleManager(conf) - private val memoryManager = new StaticMemoryManager(conf) // List of block manager created during an unit test, so that all of the them can be stopped // after the unit test. @@ -57,16 +56,14 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo // Implicitly convert strings to BlockIds for test clarity. implicit private def StringToBlockId(value: String): BlockId = new TestBlockId(value) - private def makeMemoryManager(maxMem: Long) = new StaticMemoryManager(conf) { - override def maxStorageMemory: Long = maxMem - } - private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) + val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem) val store = new BlockManager(name, rpcEnv, master, serializer, conf, - makeMemoryManager(maxMem), mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memManager.setMemoryStore(store.memoryStore) store.initialize("app-id") allStores += store store @@ -263,8 +260,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) + val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000) val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf, - makeMemoryManager(10000), mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) + memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) + memManager.setMemoryStore(failableStore.memoryStore) failableStore.initialize("app-id") allStores += failableStore // so that this gets stopped after test assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index ec45e894312d0..c95841c3b64fd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -63,18 +63,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) - private def makeMemoryManager(maxMem: Long) = new StaticMemoryManager(conf) { - override def maxStorageMemory: Long = maxMem - } - private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val manager = new BlockManager(name, rpcEnv, master, serializer, conf, - makeMemoryManager(maxMem), mapOutputTracker, shuffleManager, transfer, securityMgr, 0) - manager.initialize("app-id") - manager + val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem) + val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, + memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memManager.setMemoryStore(blockManager.memoryStore) + blockManager.initialize("app-id") + blockManager } override def beforeEach(): Unit = { @@ -824,9 +822,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("block store put failure") { // Use Java serializer so we can create an unserializable error. val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) + val memoryManager = new StaticMemoryManager(conf, Long.MaxValue, 1200) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, - new JavaSerializer(conf), conf, makeMemoryManager(1200), mapOutputTracker, + new JavaSerializer(conf), conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memoryManager.setMemoryStore(store.memoryStore) // The put should fail since a1 is not serializable. class UnserializableClass @@ -1044,17 +1044,23 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("reserve/release unroll memory") { store = makeBlockManager(12000) val memoryStore = store.memoryStore + val dummyBlock = TestBlockId("") + val dummyEvictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] assert(memoryStore.currentUnrollMemory === 0) assert(memoryStore.currentUnrollMemoryForThisTask === 0) + def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { + memoryStore.reserveUnrollMemoryForThisTask(dummyBlock, memory, dummyEvictedBlocks) + } + // Reserve - memoryStore.reserveUnrollMemoryForThisTask(100) + assert(reserveUnrollMemoryForThisTask(100)) assert(memoryStore.currentUnrollMemoryForThisTask === 100) - memoryStore.reserveUnrollMemoryForThisTask(200) + assert(reserveUnrollMemoryForThisTask(200)) assert(memoryStore.currentUnrollMemoryForThisTask === 300) - memoryStore.reserveUnrollMemoryForThisTask(500) + assert(reserveUnrollMemoryForThisTask(500)) assert(memoryStore.currentUnrollMemoryForThisTask === 800) - memoryStore.reserveUnrollMemoryForThisTask(1000000) + assert(!reserveUnrollMemoryForThisTask(1000000)) assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted // Release memoryStore.releaseUnrollMemoryForThisTask(100) @@ -1062,9 +1068,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE memoryStore.releaseUnrollMemoryForThisTask(100) assert(memoryStore.currentUnrollMemoryForThisTask === 600) // Reserve again - memoryStore.reserveUnrollMemoryForThisTask(4400) + assert(reserveUnrollMemoryForThisTask(4400)) assert(memoryStore.currentUnrollMemoryForThisTask === 5000) - memoryStore.reserveUnrollMemoryForThisTask(20000) + assert(!reserveUnrollMemoryForThisTask(20000)) assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted // Release again memoryStore.releaseUnrollMemoryForThisTask(1000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 190ec28081320..cb8f8a24a8785 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql.execution +import scala.collection.mutable + import org.apache.spark.MemoryManager import org.apache.spark.shuffle.ShuffleMemoryManager +import org.apache.spark.storage.{BlockId, BlockStatus} + /** * A [[ShuffleMemoryManager]] that can be controlled to run out of memory. @@ -53,10 +57,20 @@ class TestShuffleMemoryManager } private class GrantEverythingMemoryManager extends MemoryManager { - override def maxExecutionMemory: Long = Long.MaxValue - override def maxStorageMemory: Long = Long.MaxValue override def acquireExecutionMemory(numBytes: Long): Long = numBytes - override def acquireStorageMemory(numBytes: Long): Long = numBytes + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes override def releaseExecutionMemory(numBytes: Long): Unit = { } override def releaseStorageMemory(numBytes: Long): Unit = { } + override def releaseUnrollMemory(numBytes: Long): Unit = { } + override def maxExecutionMemory: Long = Long.MaxValue + override def maxStorageMemory: Long = Long.MaxValue + override def executionMemoryUsed: Long = Long.MaxValue + override def storageMemoryUsed: Long = Long.MaxValue } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index d97c477450b51..35726d33ee465 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -253,15 +253,14 @@ class ReceivedBlockHandlerSuite maxMem: Long, conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { - val memoryManager = new StaticMemoryManager(conf) { - override val maxStorageMemory: Long = maxMem - } + val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, - memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) - manager.initialize("app-id") - blockManagerBuffer += manager - manager + val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memManager.setMemoryStore(blockManager.memoryStore) + blockManager.initialize("app-id") + blockManagerBuffer += blockManager + blockManager } /** From e73e463ccaf8220387903b59b9a6ab71662d2ee7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 5 Oct 2015 16:57:41 -0700 Subject: [PATCH 05/13] Limit scope of synchronized blocks to avoid deadlocks --- .../org/apache/spark/MemoryManager.scala | 5 +++ .../apache/spark/StaticMemoryManager.scala | 34 +++++++++++++------ .../apache/spark/storage/MemoryStore.scala | 19 +++++------ .../execution/TestShuffleMemoryManager.scala | 1 + 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/MemoryManager.scala index a70a58a70202b..4e581ee1280e3 100644 --- a/core/src/main/scala/org/apache/spark/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/MemoryManager.scala @@ -67,6 +67,11 @@ private[spark] abstract class MemoryManager { */ def releaseStorageMemory(numBytes: Long): Unit + /** + * Release all storage memory acquired. + */ + def releaseStorageMemory(): Unit + /** * Release N bytes of unroll memory. */ diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala index c4c1d3e73e6dd..75285b32c5cbb 100644 --- a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala @@ -107,12 +107,10 @@ private[spark] class StaticMemoryManager( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { - storageLock.synchronized { - val currentUnrollMemory = memoryStore.currentUnrollMemory - val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) - val numBytesToFree = math.min(numBytes, maxNumBytesToFree) - acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks) - } + val currentUnrollMemory = memoryStore.currentUnrollMemory + val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) + val numBytesToFree = math.min(numBytes, maxNumBytesToFree) + acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } /** @@ -129,8 +127,9 @@ private[spark] class StaticMemoryManager( numBytesToAcquire: Long, numBytesToFree: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + // Note: Keep this outside synchronized block to avoid potential deadlocks! + memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) storageLock.synchronized { - memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) assert(_storageMemoryUsed <= maxStorageMemory) val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0 @@ -169,6 +168,15 @@ private[spark] class StaticMemoryManager( } } + /** + * Release all storage memory acquired. + */ + override def releaseStorageMemory(): Unit = { + storageLock.synchronized { + _storageMemoryUsed = 0 + } + } + /** * Release N bytes of unroll memory. */ @@ -179,15 +187,19 @@ private[spark] class StaticMemoryManager( /** * Amount of execution memory currently in use, in bytes. */ - override def executionMemoryUsed: Long = executionLock.synchronized { - _executionMemoryUsed + override def executionMemoryUsed: Long = { + executionLock.synchronized { + _executionMemoryUsed + } } /** * Amount of storage memory currently in use, in bytes. */ - override def storageMemoryUsed: Long = storageLock.synchronized { - _storageMemoryUsed + override def storageMemoryUsed: Long = { + storageLock.synchronized { + _storageMemoryUsed + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 89bbe59f83be8..c07356806f757 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -205,16 +205,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } override def remove(blockId: BlockId): Boolean = { - entries.synchronized { - val entry = entries.remove(blockId) - if (entry != null) { - memoryManager.releaseStorageMemory(entry.size) - logDebug(s"Block $blockId of size ${entry.size} dropped " + - s"from memory (free ${maxMemory - blocksMemoryUsed})") - true - } else { - false - } + val entry = entries.synchronized { entries.remove(blockId) } + if (entry != null) { + memoryManager.releaseStorageMemory(entry.size) + logDebug(s"Block $blockId of size ${entry.size} dropped " + + s"from memory (free ${maxMemory - blocksMemoryUsed})") + true + } else { + false } } @@ -222,6 +220,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo entries.synchronized { entries.clear() } + memoryManager.releaseStorageMemory() logInfo("MemoryStore cleared") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index cb8f8a24a8785..6f65ea549c220 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -68,6 +68,7 @@ private class GrantEverythingMemoryManager extends MemoryManager { evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes override def releaseExecutionMemory(numBytes: Long): Unit = { } override def releaseStorageMemory(numBytes: Long): Unit = { } + override def releaseStorageMemory(): Unit = { } override def releaseUnrollMemory(numBytes: Long): Unit = { } override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue From 52e201480e07974e9d13936d779b7c57e7c349a1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 5 Oct 2015 18:01:36 -0700 Subject: [PATCH 06/13] Remove ResultWithDroppedBlocks It was an awkward case class that didn't really do much. Instead we'll pass a mutable list into the methods to get the dropped blocks. --- .../spark/shuffle/ShuffleMemoryManager.scala | 6 +- .../apache/spark/storage/MemoryStore.scala | 58 ++++++++++--------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 320fddd18b558..201dbe6545727 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -63,8 +63,6 @@ class ShuffleMemoryManager protected ( * in some situations, to make sure each task has a chance to ramp up to at least 1 / 2N of the * total memory pool (where N is the # of active tasks) before it is forced to spill. This can * happen if the number of tasks increases but an older task had a lot of memory already. - * - * @return `numBytes` if all bytes are acquired, else 0. */ def tryToAcquire(numBytes: Long): Long = synchronized { val taskAttemptId = currentTaskAttemptId() @@ -110,8 +108,8 @@ class ShuffleMemoryManager protected ( } /** - * Acquire numBytes bytes for the current task from the memory manager. - * @return number of bytes actually acquired. + * Acquire N bytes of execution memory from the memory manager for the current task. + * @return number of bytes actually acquired (<= N). */ private def acquire(numBytes: Long): Long = synchronized { val taskAttemptId = currentTaskAttemptId() diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index c07356806f757..4ad12ab7897e8 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -90,8 +90,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val values = blockManager.dataDeserialize(blockId, bytes) putIterator(blockId, values, level, returnValues = true) } else { - val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } } @@ -104,15 +105,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = { // Work on a duplicate - since the original input might be used elsewhere. lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false) + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks) val data = - if (putAttempt.success) { + if (putSuccess) { assert(bytes.limit == size) Right(bytes.duplicate()) } else { null } - PutResult(size, data, putAttempt.droppedBlocks) + PutResult(size, data, droppedBlocks) } override def putArray( @@ -120,14 +122,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true) - PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks) + tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks) + PutResult(sizeEstimate, Left(values.iterator), droppedBlocks) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) + tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) + PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) } } @@ -320,8 +323,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, value: Any, size: Long, - deserialized: Boolean): ResultWithDroppedBlocks = { - tryToPut(blockId, () => value, size, deserialized) + deserialized: Boolean, + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + tryToPut(blockId, () => value, size, deserialized, droppedBlocks) } /** @@ -337,13 +341,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * blocks to free memory for one block, another thread may use up the freed space for * another block. * - * Return whether put was successful, along with the blocks dropped in the process. + * All blocks evicted in the process, if any, will be added to `droppedBlocks`. + * + * @return whether put was successful. */ private def tryToPut( blockId: BlockId, value: () => Any, size: Long, - deserialized: Boolean): ResultWithDroppedBlocks = { + deserialized: Boolean, + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has @@ -351,12 +358,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * for freeing up more space for another block that needs to be put. Only then the actually * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - accountingLock.synchronized { // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. + // This must be synchronized so the release and re-acquire can happen atomically. releasePendingUnrollMemoryForThisTask() val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) val enoughMemory = numBytesAcquired == size @@ -381,7 +387,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } memoryManager.releaseStorageMemory(numBytesAcquired) } - ResultWithDroppedBlocks(enoughMemory, droppedBlocks) + enoughMemory } } @@ -393,13 +399,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * * @param blockId the ID of the block we are freeing space for * @param space the size of this block - * @param evictedBlocks a holder for blocks evicted in the process - * @return whether there is enough free space + * @param droppedBlocks a holder for blocks evicted in the process + * @return whether there is enough free space. */ private[spark] def ensureFreeSpace( blockId: BlockId, space: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { accountingLock.synchronized { val freeMemory = maxMemory - memoryUsed val rddToAdd = getRddId(blockId) @@ -450,7 +456,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => evictedBlocks += ((blockId, status)) } + droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } } true @@ -478,9 +484,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo def reserveUnrollMemoryForThisTask( blockId: BlockId, memory: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, evictedBlocks) + val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) val success = acquired == memory if (success) { val taskAttemptId = currentTaskAttemptId() @@ -518,10 +524,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo private def reservePendingUnrollMemoryForThisTask( blockId: BlockId, memory: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, evictedBlocks) + val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) val success = acquired == memory if (success) { pendingUnrollMemoryMap(taskAttemptId) = @@ -595,7 +601,3 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo logMemoryUsage() } } - -private[spark] case class ResultWithDroppedBlocks( - success: Boolean, - droppedBlocks: Seq[(BlockId, BlockStatus)]) From 88faede150c83b197d17f0571fa2af2503a76179 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 6 Oct 2015 12:17:28 -0700 Subject: [PATCH 07/13] Add unit test for StaticMemoryManager --- .../spark/StaticMemoryManagerSuite.scala | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala diff --git a/core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala new file mode 100644 index 0000000000000..9bdd5f10c710c --- /dev/null +++ b/core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.mutable.ArrayBuffer + +import org.mockito.Mockito.{mock, reset, verify, when} +import org.mockito.Matchers.{any, eq => meq} + +import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} + + +class StaticMemoryManagerSuite extends SparkFunSuite { + private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") + + test("basic execution memory") { + val maxExecutionMem = 1000L + val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) + assert(mm.executionMemoryUsed === 0L) + assert(mm.acquireExecutionMemory(10L) === 10L) + assert(mm.executionMemoryUsed === 10L) + assert(mm.acquireExecutionMemory(100L) === 100L) + // Acquire up to the max + assert(mm.acquireExecutionMemory(1000L) === 890L) + assert(mm.executionMemoryUsed === 1000L) + assert(mm.acquireExecutionMemory(1L) === 0L) + assert(mm.executionMemoryUsed === 1000L) + mm.releaseExecutionMemory(800L) + assert(mm.executionMemoryUsed === 200L) + // Acquire after release + assert(mm.acquireExecutionMemory(1L) === 1L) + assert(mm.executionMemoryUsed === 201L) + // Release beyond what was acquired + mm.releaseExecutionMemory(maxExecutionMem) + assert(mm.executionMemoryUsed === 0L) + } + + test("basic storage memory") { + val maxStorageMem = 1000L + val dummyBlock = TestBlockId("you can see the world you brought to live") + val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) + assert(mm.storageMemoryUsed === 0L) + assert(mm.acquireStorageMemory(dummyBlock, 10L, dummyBlocks) === 10L) + // `ensureFreeSpace` should be called with the number of bytes requested + assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L) + assert(mm.storageMemoryUsed === 10L) + assert(dummyBlocks.isEmpty) + assert(mm.acquireStorageMemory(dummyBlock, 100L, dummyBlocks) === 100L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) + // Acquire up to the max, not granted + assert(mm.acquireStorageMemory(dummyBlock, 1000L, dummyBlocks) === 0L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L) + assert(mm.storageMemoryUsed === 110L) + assert(mm.acquireStorageMemory(dummyBlock, 890L, dummyBlocks) === 890L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L) + assert(mm.storageMemoryUsed === 1000L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 0L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assert(mm.storageMemoryUsed === 1000L) + mm.releaseStorageMemory(800L) + assert(mm.storageMemoryUsed === 200L) + // Acquire after release + assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 1L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assert(mm.storageMemoryUsed === 201L) + mm.releaseStorageMemory() + assert(mm.storageMemoryUsed === 0L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 1L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assert(mm.storageMemoryUsed === 1L) + // Release beyond what was acquired + mm.releaseStorageMemory(100L) + assert(mm.storageMemoryUsed === 0L) + } + + test("execution and storage isolation") { + val maxExecutionMem = 200L + val maxStorageMem = 1000L + val dummyBlock = TestBlockId("ain't nobody love like you do") + val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) + // Only execution memory should increase + assert(mm.acquireExecutionMemory(100L) === 100L) + assert(mm.storageMemoryUsed === 0L) + assert(mm.executionMemoryUsed === 100L) + assert(mm.acquireExecutionMemory(1000L) === 100L) + assert(mm.storageMemoryUsed === 0L) + assert(mm.executionMemoryUsed === 200L) + // Only storage memory should increase + assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks) === 50L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L) + assert(mm.storageMemoryUsed === 50L) + assert(mm.executionMemoryUsed === 200L) + // Only execution memory should be released + mm.releaseExecutionMemory(133L) + assert(mm.storageMemoryUsed === 50L) + assert(mm.executionMemoryUsed === 67L) + // Only storage memory should be released + mm.releaseStorageMemory() + assert(mm.storageMemoryUsed === 0L) + assert(mm.executionMemoryUsed === 67L) + } + + test("unroll memory") { + val maxStorageMem = 1000L + val dummyBlock = TestBlockId("lonely water") + val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) + assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks) === 100L) + assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) + assert(mm.storageMemoryUsed === 100L) + mm.releaseUnrollMemory(40L) + assert(mm.storageMemoryUsed === 60L) + when(ms.currentUnrollMemory).thenReturn(60L) + assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks) === 500L) + // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. + // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes. + assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L) + assert(mm.storageMemoryUsed === 560L) + when(ms.currentUnrollMemory).thenReturn(560L) + assert(mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks) === 0L) + assert(mm.storageMemoryUsed === 560L) + // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed + assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L) + // Release beyond what was acquired + mm.releaseUnrollMemory(maxStorageMem) + assert(mm.storageMemoryUsed === 0L) + } + + /** + * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies. + */ + private def makeThings( + maxExecutionMem: Long, + maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { + val mm = new StaticMemoryManager(conf, maxExecutionMem, maxStorageMem) + val ms = mock(classOf[MemoryStore]) + mm.setMemoryStore(ms) + (mm, ms) + } + + /** + * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters. + */ + private def assertEnsureFreeSpaceCalled( + ms: MemoryStore, + blockId: BlockId, + numBytes: Long): Unit = { + verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any()) + reset(ms) + } + +} From 5c74063aaa21a54781accf56dc37086f7ffd1c62 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 6 Oct 2015 12:19:07 -0700 Subject: [PATCH 08/13] Minor style --- .../org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 7d260ca198b95..cb7fb9c793c1f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -54,7 +54,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo private val serializer = new KryoSerializer(conf) // Implicitly convert strings to BlockIds for test clarity. - implicit private def StringToBlockId(value: String): BlockId = new TestBlockId(value) + private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) private def makeBlockManager( maxMem: Long, From 0bf0d941c412561ebd27a806f4cc54178626b760 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Oct 2015 14:44:28 -0700 Subject: [PATCH 09/13] Add comment to pending unroll. --- .../apache/spark/storage/MemoryStore.scala | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 4ad12ab7897e8..1431e63c4680b 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -304,9 +304,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // later when the task finishes. if (keepUnrolling) { accountingLock.synchronized { + // Here, we are logically transferring memory from unroll memory to pending unroll memory. + // We release and re-acquire the memory from the MemoryManager. As of today, this is not + // race-prone because all calls to [acquire|release]UnrollMemoryForThisTask() occur in + // MemoryStore and are guarded by `accountingLock`, MemoryStore is the only component + // which allocates storage memory, and unroll memory is currently counted towards + // storage memory. If we ever change things so that unroll memory is counted towards + // execution memory, then we will need to revisit this argument as it may no longer hold. + // TODO: revisit this as part of SPARK-10983. val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved releaseUnrollMemoryForThisTask(amountToRelease) - reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks) + val acquired = memoryManager.acquireUnrollMemory(blockId, amountToRelease, droppedBlocks) + assert(acquired == amountToRelease) + val taskAttemptId = currentTaskAttemptId() + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToRelease } } } @@ -516,27 +528,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - /** - * Reserve the unroll memory of current unroll successful block used by this task - * until actually put the block into memory entry. - * @return whether the request is granted. - */ - private def reservePendingUnrollMemoryForThisTask( - blockId: BlockId, - memory: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) - val success = acquired == memory - if (success) { - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory - } - success - } - } - /** * Release pending unroll memory of current unroll successful block used by this task */ From b1e8fd16d9f08ff2ff693c44b09143c810c05544 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Oct 2015 15:00:32 -0700 Subject: [PATCH 10/13] Pending unroll transfer. --- .../apache/spark/storage/MemoryStore.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 1431e63c4680b..0013ae6b39918 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -303,22 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Otherwise, if we return an iterator, we release the memory reserved here // later when the task finishes. if (keepUnrolling) { + val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - // Here, we are logically transferring memory from unroll memory to pending unroll memory. - // We release and re-acquire the memory from the MemoryManager. As of today, this is not - // race-prone because all calls to [acquire|release]UnrollMemoryForThisTask() occur in - // MemoryStore and are guarded by `accountingLock`, MemoryStore is the only component - // which allocates storage memory, and unroll memory is currently counted towards - // storage memory. If we ever change things so that unroll memory is counted towards - // execution memory, then we will need to revisit this argument as it may no longer hold. - // TODO: revisit this as part of SPARK-10983. - val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved - releaseUnrollMemoryForThisTask(amountToRelease) - val acquired = memoryManager.acquireUnrollMemory(blockId, amountToRelease, droppedBlocks) - assert(acquired == amountToRelease) - val taskAttemptId = currentTaskAttemptId() + // Here, we transfer memory from unroll to pending unroll because we expect to cache this + // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in + // order to avoid race conditions where another component steals the memory that we're + // trying to transfer. + val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved + unrollMemoryMap(taskAttemptId) -= amountToTransferToPending pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToRelease + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending } } } @@ -374,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. - // This must be synchronized so the release and re-acquire can happen atomically. + // + // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the + // synchronization on `accountingLock` guarantees that the release of unroll memory and + // acquisition of storage memory happens atomically. However, if storage memory is acquired + // outside of MemoryStore or if unroll memory is counted as execution memory, then we will + // have to revisit this assumption. See SPARK-10983 for more context. releasePendingUnrollMemoryForThisTask() val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) val enoughMemory = numBytesAcquired == size From 1ef313c640f969a8cd51a16489d38c4698e29d9c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 Oct 2015 17:36:51 -0700 Subject: [PATCH 11/13] Move MemoryManager to new package --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 + .../scala/org/apache/spark/{ => memory}/MemoryManager.scala | 2 +- .../org/apache/spark/{ => memory}/StaticMemoryManager.scala | 3 ++- .../scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala | 3 ++- .../src/main/scala/org/apache/spark/storage/BlockManager.scala | 1 + core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 3 ++- .../apache/spark/{ => memory}/StaticMemoryManagerSuite.scala | 3 ++- .../apache/spark/storage/BlockManagerReplicationSuite.scala | 1 + .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 1 + .../apache/spark/sql/execution/TestShuffleMemoryManager.scala | 2 +- .../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 1 + 11 files changed, 15 insertions(+), 6 deletions(-) rename core/src/main/scala/org/apache/spark/{ => memory}/MemoryManager.scala (98%) rename core/src/main/scala/org/apache/spark/{ => memory}/StaticMemoryManager.scala (98%) rename core/src/test/scala/org/apache/spark/{ => memory}/StaticMemoryManagerSuite.scala (98%) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index bbb6e272009cf..326e13fe88fcb 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -30,6 +30,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.memory.{MemoryManager, StaticMemoryManager} import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv} diff --git a/core/src/main/scala/org/apache/spark/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala similarity index 98% rename from core/src/main/scala/org/apache/spark/MemoryManager.scala rename to core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 4e581ee1280e3..ebc62639887e8 100644 --- a/core/src/main/scala/org/apache/spark/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.memory import scala.collection.mutable diff --git a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala similarity index 98% rename from core/src/main/scala/org/apache/spark/StaticMemoryManager.scala rename to core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 75285b32c5cbb..bd5b38b1b7dc6 100644 --- a/core/src/main/scala/org/apache/spark/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.memory import scala.collection.mutable +import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index a46a5a61a6a13..472f2a741c83d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -21,8 +21,9 @@ import scala.collection.mutable import com.google.common.annotations.VisibleForTesting -import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark._ +import org.apache.spark.memory.{StaticMemoryManager, MemoryManager} +import org.apache.spark.unsafe.array.ByteArrayMethods /** * Allocates a pool of memory to tasks for use in shuffle operations. Each disk-spilling diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2fed714bc0e9b..f82a58b3c0b0e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -31,6 +31,7 @@ import sun.nio.ch.DirectBuffer import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.io.CompressionCodec +import org.apache.spark.memory.MemoryManager import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0013ae6b39918..bd4af7d88bd64 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -23,7 +23,8 @@ import java.util.LinkedHashMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{MemoryManager, TaskContext} +import org.apache.spark.TaskContext +import org.apache.spark.memory.MemoryManager import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector diff --git a/core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala similarity index 98% rename from core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala rename to core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 9bdd5f10c710c..0dd1fefd264e3 100644 --- a/core/src/test/scala/org/apache/spark/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.memory import scala.collection.mutable.ArrayBuffer @@ -23,6 +23,7 @@ import org.mockito.Mockito.{mock, reset, verify, when} import org.mockito.Matchers.{any, eq => meq} import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} +import org.apache.spark.{SparkConf, SparkFunSuite} class StaticMemoryManagerSuite extends SparkFunSuite { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index cb7fb9c793c1f..cc44c676b27ac 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark._ +import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.BlockTransferService import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.KryoSerializer diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c95841c3b64fd..05b9067abb475 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark._ import org.apache.spark.executor.DataReadMethod +import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 6f65ea549c220..942d1a2c53837 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import scala.collection.mutable -import org.apache.spark.MemoryManager +import org.apache.spark.memory.MemoryManager import org.apache.spark.shuffle.ShuffleMemoryManager import org.apache.spark.storage.{BlockId, BlockStatus} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 35726d33ee465..b2b6848719639 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ +import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus From 9de2e20baed737565a1db68d5950049104ea8bae Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 Oct 2015 17:42:34 -0700 Subject: [PATCH 12/13] Change acquire[Unroll/Storage]Memory to return boolean Previously these would always return either 0 or N. Instead we can express this simply as whether the acquire was successful so downstream usages don't have to worry about releasing partially acquired memory. --- .../apache/spark/memory/MemoryManager.scala | 8 +++---- .../spark/memory/StaticMemoryManager.scala | 19 ++++++++-------- .../apache/spark/storage/MemoryStore.scala | 7 ++---- .../memory/StaticMemoryManagerSuite.scala | 22 +++++++++---------- .../execution/TestShuffleMemoryManager.scala | 4 ++-- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index ebc62639887e8..579a560f94ee5 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -40,22 +40,22 @@ private[spark] abstract class MemoryManager { /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return number of bytes successfully granted (0 or N). + * @return whether all N bytes were successfully granted. */ def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return number of bytes successfully granted (<= N). + * @return whether all N bytes were successfully granted. */ def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean /** * Release N bytes of execution memory. diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index bd5b38b1b7dc6..80b045e4a961e 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -86,12 +86,12 @@ private[spark] class StaticMemoryManager( /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return number of bytes successfully granted (0 or N). + * @return whether all N bytes were successfully granted. */ override def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks) } @@ -102,12 +102,12 @@ private[spark] class StaticMemoryManager( * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any, * are added to `evictedBlocks`. * - * @return number of bytes successfully granted (0 or N). + * @return whether all N bytes were successfully granted. */ override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { val currentUnrollMemory = memoryStore.currentUnrollMemory val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) val numBytesToFree = math.min(numBytes, maxNumBytesToFree) @@ -121,21 +121,22 @@ private[spark] class StaticMemoryManager( * @param numBytesToAcquire the size of this block * @param numBytesToFree the size of space to be freed through evicting blocks * @param evictedBlocks a holder for blocks evicted in the process - * @return number of bytes successfully granted (0 or N). + * @return whether all N bytes were successfully granted. */ private def acquireStorageMemory( blockId: BlockId, numBytesToAcquire: Long, numBytesToFree: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = { + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { // Note: Keep this outside synchronized block to avoid potential deadlocks! memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) storageLock.synchronized { assert(_storageMemoryUsed <= maxStorageMemory) val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory - val bytesToGrant = if (enoughMemory) numBytesToAcquire else 0 - _storageMemoryUsed += bytesToGrant - bytesToGrant + if (enoughMemory) { + _storageMemoryUsed += numBytesToAcquire + } + enoughMemory } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index bd4af7d88bd64..7723e0e77ff2f 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -376,8 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // outside of MemoryStore or if unroll memory is counted as execution memory, then we will // have to revisit this assumption. See SPARK-10983 for more context. releasePendingUnrollMemoryForThisTask() - val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) - val enoughMemory = numBytesAcquired == size + val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) if (enoughMemory) { // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(value(), size, deserialized) @@ -397,7 +396,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } - memoryManager.releaseStorageMemory(numBytesAcquired) } enoughMemory } @@ -498,8 +496,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo memory: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) - val success = acquired == memory + val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 0dd1fefd264e3..5de70e62b615e 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -57,32 +57,32 @@ class StaticMemoryManagerSuite extends SparkFunSuite { val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, dummyBlocks) === 10L) + assert(mm.acquireStorageMemory(dummyBlock, 10L, dummyBlocks)) // `ensureFreeSpace` should be called with the number of bytes requested assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L) assert(mm.storageMemoryUsed === 10L) assert(dummyBlocks.isEmpty) - assert(mm.acquireStorageMemory(dummyBlock, 100L, dummyBlocks) === 100L) + assert(mm.acquireStorageMemory(dummyBlock, 100L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) // Acquire up to the max, not granted - assert(mm.acquireStorageMemory(dummyBlock, 1000L, dummyBlocks) === 0L) + assert(!mm.acquireStorageMemory(dummyBlock, 1000L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L) assert(mm.storageMemoryUsed === 110L) - assert(mm.acquireStorageMemory(dummyBlock, 890L, dummyBlocks) === 890L) + assert(mm.acquireStorageMemory(dummyBlock, 890L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L) assert(mm.storageMemoryUsed === 1000L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 0L) + assert(!mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 1L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 201L) mm.releaseStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks) === 1L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -104,7 +104,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite { assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks) === 50L) + assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) @@ -123,19 +123,19 @@ class StaticMemoryManagerSuite extends SparkFunSuite { val dummyBlock = TestBlockId("lonely water") val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks) === 100L) + assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks) === 500L) + assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks)) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes. assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L) assert(mm.storageMemoryUsed === 560L) when(ms.currentUnrollMemory).thenReturn(560L) - assert(mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks) === 0L) + assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks)) assert(mm.storageMemoryUsed === 560L) // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index 942d1a2c53837..ff65d7bdf8b92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -61,11 +61,11 @@ private class GrantEverythingMemoryManager extends MemoryManager { override def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true override def releaseExecutionMemory(numBytes: Long): Unit = { } override def releaseStorageMemory(numBytes: Long): Unit = { } override def releaseStorageMemory(): Unit = { } From fc7f9f519852c2b3ef3eebcbc8e3f0ba63fcb3dc Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 8 Oct 2015 18:05:34 -0700 Subject: [PATCH 13/13] Address comments --- .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../apache/spark/memory/MemoryManager.scala | 21 ++++- .../spark/memory/StaticMemoryManager.scala | 89 +++++++------------ .../spark/shuffle/ShuffleMemoryManager.scala | 3 +- .../apache/spark/storage/BlockManager.scala | 1 + .../apache/spark/storage/MemoryStore.scala | 1 + .../memory/StaticMemoryManagerSuite.scala | 26 +++--- .../spark/storage/BlockManagerSuite.scala | 5 +- 8 files changed, 70 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 326e13fe88fcb..df3d84a1f08e9 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -70,7 +70,7 @@ class SparkEnv ( val httpFileServer: HttpFileServer, val sparkFilesDir: String, val metricsSystem: MetricsSystem, - // TODO: unify these *MemoryManager classes + // TODO: unify these *MemoryManager classes (SPARK-10984) val memoryManager: MemoryManager, val shuffleMemoryManager: ShuffleMemoryManager, val executorMemoryManager: ExecutorMemoryManager, diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 579a560f94ee5..4bf73b696920d 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.memory import scala.collection.mutable -import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} /** @@ -27,10 +27,27 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * * In this context, execution memory refers to that used for computation in shuffles, joins, * sorts and aggregations, while storage memory refers to that used for caching and propagating - * internal data across the cluster. + * internal data across the cluster. There exists one of these per JVM. */ private[spark] abstract class MemoryManager { + // The memory store used to evict cached blocks + private var _memoryStore: MemoryStore = _ + protected def memoryStore: MemoryStore = { + if (_memoryStore == null) { + throw new IllegalArgumentException("memory store not initialized yet") + } + _memoryStore + } + + /** + * Set the [[MemoryStore]] used by this manager to evict cached blocks. + * This must be set after construction due to initialization ordering constraints. + */ + def setMemoryStore(store: MemoryStore): Unit = { + _memoryStore = store + } + /** * Acquire N bytes of memory for execution. * @return number of bytes successfully granted (<= N). diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 80b045e4a961e..150445edb9578 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -19,8 +19,8 @@ package org.apache.spark.memory import scala.collection.mutable -import org.apache.spark.{Logging, SparkConf, SparkEnv} -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.storage.{BlockId, BlockStatus} /** @@ -41,27 +41,10 @@ private[spark] class StaticMemoryManager( (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - // Amount of execution memory in use. Accesses must be synchronized on `executionLock`. + // Amount of execution / storage memory in use + // Accesses must be synchronized on `this` private var _executionMemoryUsed: Long = 0 - private val executionLock = new Object - - // Amount of storage memory in use. Accesses must be synchronized on `storageLock`. private var _storageMemoryUsed: Long = 0 - private val storageLock = new Object - - // The memory store used to evict cached blocks - private var _memoryStore: MemoryStore = _ - private def memoryStore: MemoryStore = { - if (_memoryStore == null) { - _memoryStore = SparkEnv.get.blockManager.memoryStore - } - _memoryStore - } - - // For testing only - def setMemoryStore(store: MemoryStore): Unit = { - _memoryStore = store - } def this(conf: SparkConf) { this( @@ -74,13 +57,11 @@ private[spark] class StaticMemoryManager( * Acquire N bytes of memory for execution. * @return number of bytes successfully granted (<= N). */ - override def acquireExecutionMemory(numBytes: Long): Long = { - executionLock.synchronized { - assert(_executionMemoryUsed <= maxExecutionMemory) - val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed) - _executionMemoryUsed += bytesToGrant - bytesToGrant - } + override def acquireExecutionMemory(numBytes: Long): Long = synchronized { + assert(_executionMemoryUsed <= maxExecutionMemory) + val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed) + _executionMemoryUsed += bytesToGrant + bytesToGrant } /** @@ -130,7 +111,7 @@ private[spark] class StaticMemoryManager( evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { // Note: Keep this outside synchronized block to avoid potential deadlocks! memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) - storageLock.synchronized { + synchronized { assert(_storageMemoryUsed <= maxStorageMemory) val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory if (enoughMemory) { @@ -143,40 +124,34 @@ private[spark] class StaticMemoryManager( /** * Release N bytes of execution memory. */ - override def releaseExecutionMemory(numBytes: Long): Unit = { - executionLock.synchronized { - if (numBytes > _executionMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of execution " + - s"memory when we only have ${_executionMemoryUsed} bytes") - _executionMemoryUsed = 0 - } else { - _executionMemoryUsed -= numBytes - } + override def releaseExecutionMemory(numBytes: Long): Unit = synchronized { + if (numBytes > _executionMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of execution " + + s"memory when we only have ${_executionMemoryUsed} bytes") + _executionMemoryUsed = 0 + } else { + _executionMemoryUsed -= numBytes } } /** * Release N bytes of storage memory. */ - override def releaseStorageMemory(numBytes: Long): Unit = { - storageLock.synchronized { - if (numBytes > _storageMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of storage " + - s"memory when we only have ${_storageMemoryUsed} bytes") - _storageMemoryUsed = 0 - } else { - _storageMemoryUsed -= numBytes - } + override def releaseStorageMemory(numBytes: Long): Unit = synchronized { + if (numBytes > _storageMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of storage " + + s"memory when we only have ${_storageMemoryUsed} bytes") + _storageMemoryUsed = 0 + } else { + _storageMemoryUsed -= numBytes } } /** * Release all storage memory acquired. */ - override def releaseStorageMemory(): Unit = { - storageLock.synchronized { - _storageMemoryUsed = 0 - } + override def releaseStorageMemory(): Unit = synchronized { + _storageMemoryUsed = 0 } /** @@ -189,19 +164,15 @@ private[spark] class StaticMemoryManager( /** * Amount of execution memory currently in use, in bytes. */ - override def executionMemoryUsed: Long = { - executionLock.synchronized { - _executionMemoryUsed - } + override def executionMemoryUsed: Long = synchronized { + _executionMemoryUsed } /** * Amount of storage memory currently in use, in bytes. */ - override def storageMemoryUsed: Long = { - storageLock.synchronized { - _storageMemoryUsed - } + override def storageMemoryUsed: Long = synchronized { + _storageMemoryUsed } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index 472f2a741c83d..bb64bb3f35df0 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -165,7 +165,8 @@ private[spark] object ShuffleMemoryManager { */ def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = { val conf = new SparkConf - val memoryManager = new StaticMemoryManager(conf, maxMemory, Long.MaxValue) + val memoryManager = new StaticMemoryManager( + conf, maxExecutionMemory = maxMemory, maxStorageMemory = Long.MaxValue) new ShuffleMemoryManager(memoryManager, pageSizeBytes) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f82a58b3c0b0e..9f5bd2abbdc5d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -89,6 +89,7 @@ private[spark] class BlockManager( externalBlockStoreInitialized = true new ExternalBlockStore(this, executorId) } + memoryManager.setMemoryStore(memoryStore) private val maxMemory = memoryManager.maxStorageMemory diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7723e0e77ff2f..35c57b923c43a 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -496,6 +496,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo memory: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { accountingLock.synchronized { + // Note: all acquisitions of unroll memory must be synchronized on `accountingLock` val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) if (success) { val taskAttemptId = currentTaskAttemptId() diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 5de70e62b615e..c436a8b5c9f81 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -38,9 +38,9 @@ class StaticMemoryManagerSuite extends SparkFunSuite { assert(mm.acquireExecutionMemory(100L) === 100L) // Acquire up to the max assert(mm.acquireExecutionMemory(1000L) === 890L) - assert(mm.executionMemoryUsed === 1000L) + assert(mm.executionMemoryUsed === maxExecutionMem) assert(mm.acquireExecutionMemory(1L) === 0L) - assert(mm.executionMemoryUsed === 1000L) + assert(mm.executionMemoryUsed === maxExecutionMem) mm.releaseExecutionMemory(800L) assert(mm.executionMemoryUsed === 200L) // Acquire after release @@ -54,35 +54,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite { test("basic storage memory") { val maxStorageMem = 1000L val dummyBlock = TestBlockId("you can see the world you brought to live") - val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, dummyBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) // `ensureFreeSpace` should be called with the number of bytes requested assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L) assert(mm.storageMemoryUsed === 10L) - assert(dummyBlocks.isEmpty) - assert(mm.acquireStorageMemory(dummyBlock, 100L, dummyBlocks)) + assert(evictedBlocks.isEmpty) + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) + assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, 1000L, dummyBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L) assert(mm.storageMemoryUsed === 110L) - assert(mm.acquireStorageMemory(dummyBlock, 890L, dummyBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L) assert(mm.storageMemoryUsed === 1000L) - assert(!mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 201L) mm.releaseStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, dummyBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -150,7 +151,8 @@ class StaticMemoryManagerSuite extends SparkFunSuite { private def makeThings( maxExecutionMem: Long, maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { - val mm = new StaticMemoryManager(conf, maxExecutionMem, maxStorageMem) + val mm = new StaticMemoryManager( + conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem) val ms = mock(classOf[MemoryStore]) mm.setMemoryStore(ms) (mm, ms) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 05b9067abb475..f3fab33ca2e31 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1045,13 +1045,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("reserve/release unroll memory") { store = makeBlockManager(12000) val memoryStore = store.memoryStore - val dummyBlock = TestBlockId("") - val dummyEvictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] assert(memoryStore.currentUnrollMemory === 0) assert(memoryStore.currentUnrollMemoryForThisTask === 0) def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask(dummyBlock, memory, dummyEvictedBlocks) + memoryStore.reserveUnrollMemoryForThisTask( + TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)]) } // Reserve