Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10956] Common MemoryManager interface for storage and execution #9000

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
Expand Down Expand Up @@ -69,6 +70,8 @@ class SparkEnv (
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
// TODO: unify these *MemoryManager classes (SPARK-10984)
val memoryManager: MemoryManager,
val shuffleMemoryManager: ShuffleMemoryManager,
val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
Expand Down Expand Up @@ -332,7 +335,8 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
val memoryManager = new StaticMemoryManager(conf)
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)

Expand All @@ -343,8 +347,8 @@ object SparkEnv extends Logging {

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
numUsableCores)
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

Expand Down Expand Up @@ -417,6 +421,7 @@ object SparkEnv extends Logging {
httpFileServer,
sparkFilesDir,
metricsSystem,
memoryManager,
shuffleMemoryManager,
executorMemoryManager,
outputCommitCoordinator,
Expand Down
117 changes: 117 additions & 0 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}


/**
 * An abstract memory manager that enforces how memory is shared between execution and storage.
 *
 * In this context, execution memory refers to that used for computation in shuffles, joins,
 * sorts and aggregations, while storage memory refers to that used for caching and propagating
 * internal data across the cluster. There exists one of these per JVM.
 */
private[spark] abstract class MemoryManager {

  // The memory store used to evict cached blocks. This cannot be a constructor parameter:
  // the MemoryStore is owned by the BlockManager, which is itself constructed with this
  // MemoryManager, so the reference must be injected after construction via `setMemoryStore`.
  private var _memoryStore: MemoryStore = _

  /**
   * The [[MemoryStore]] used by this manager to evict cached blocks.
   *
   * @throws IllegalStateException if accessed before [[setMemoryStore]] has been called.
   */
  protected def memoryStore: MemoryStore = {
    if (_memoryStore == null) {
      // Accessing the store before injection is a lifecycle bug in the caller, not a bad
      // argument, so IllegalStateException (not IllegalArgumentException) is the right signal.
      throw new IllegalStateException("memory store not initialized yet")
    }
    _memoryStore
  }

  /**
   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
   * This must be set after construction due to initialization ordering constraints.
   */
  def setMemoryStore(store: MemoryStore): Unit = {
    _memoryStore = store
  }

  /**
   * Acquire N bytes of memory for execution.
   * @return number of bytes successfully granted (<= N).
   */
  def acquireExecutionMemory(numBytes: Long): Long

  /**
   * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
   * @return whether all N bytes were successfully granted.
   */
  def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean

  /**
   * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
   * @return whether all N bytes were successfully granted.
   */
  def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean

  /**
   * Release N bytes of execution memory.
   */
  def releaseExecutionMemory(numBytes: Long): Unit

  /**
   * Release N bytes of storage memory.
   */
  def releaseStorageMemory(numBytes: Long): Unit

  /**
   * Release all storage memory acquired.
   */
  def releaseStorageMemory(): Unit

  /**
   * Release N bytes of unroll memory.
   */
  def releaseUnrollMemory(numBytes: Long): Unit

  /**
   * Total available memory for execution, in bytes.
   */
  def maxExecutionMemory: Long

  /**
   * Total available memory for storage, in bytes.
   */
  def maxStorageMemory: Long

  /**
   * Execution memory currently in use, in bytes.
   */
  def executionMemoryUsed: Long

  /**
   * Storage memory currently in use, in bytes.
   */
  def storageMemoryUsed: Long

}
202 changes: 202 additions & 0 deletions core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockId, BlockStatus}


/**
 * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
 *
 * The sizes of the execution and storage regions are determined through
 * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
 * regions are cleanly separated such that neither usage can borrow memory from the other.
 */
private[spark] class StaticMemoryManager(
    conf: SparkConf,
    override val maxExecutionMemory: Long,
    override val maxStorageMemory: Long)
  extends MemoryManager with Logging {

  // Max number of bytes worth of blocks to evict when unrolling.
  // This caps how much cached data a single unroll may push out of the store.
  private val maxMemoryToEvictForUnroll: Long = {
    (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
  }

  // Amount of execution / storage memory in use
  // Accesses must be synchronized on `this`
  private var _executionMemoryUsed: Long = 0
  private var _storageMemoryUsed: Long = 0

  // Convenience constructor that derives the region sizes from the SparkConf fractions.
  def this(conf: SparkConf) {
    this(
      conf,
      StaticMemoryManager.getMaxExecutionMemory(conf),
      StaticMemoryManager.getMaxStorageMemory(conf))
  }

  /**
   * Acquire N bytes of memory for execution.
   *
   * This is a best-effort grant: whatever portion of the request fits in the remaining
   * execution region is granted; the caller must check the returned amount.
   *
   * @return number of bytes successfully granted (<= N).
   */
  override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
    assert(_executionMemoryUsed <= maxExecutionMemory)
    val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
    _executionMemoryUsed += bytesToGrant
    bytesToGrant
  }

  /**
   * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
   * Blocks evicted in the process, if any, are added to `evictedBlocks`.
   * @return whether all N bytes were successfully granted.
   */
  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
    // For caching, we are willing to evict up to the full size of the new block.
    acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
  }

  /**
   * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
   *
   * This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
   * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
   * are added to `evictedBlocks`.
   *
   * @return whether all N bytes were successfully granted.
   */
  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
    // Unrolling is bounded more tightly than caching: we may only free up to the unroll
    // budget that is not already consumed by in-progress unrolls.
    val currentUnrollMemory = memoryStore.currentUnrollMemory
    val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
    val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
    acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
  }

  /**
   * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
   *
   * @param blockId the ID of the block we are acquiring storage memory for
   * @param numBytesToAcquire the size of this block
   * @param numBytesToFree the size of space to be freed through evicting blocks
   * @param evictedBlocks a holder for blocks evicted in the process
   * @return whether all N bytes were successfully granted.
   */
  private def acquireStorageMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
    // Note: Keep this outside synchronized block to avoid potential deadlocks!
    // ensureFreeSpace may take locks inside the MemoryStore; holding this manager's lock
    // while doing so could deadlock against callers that lock in the opposite order.
    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    synchronized {
      assert(_storageMemoryUsed <= maxStorageMemory)
      // All-or-nothing grant: either the full request fits in the storage region or we
      // grant nothing (unlike execution memory, which grants partially).
      val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
      if (enoughMemory) {
        _storageMemoryUsed += numBytesToAcquire
      }
      enoughMemory
    }
  }

  /**
   * Release N bytes of execution memory.
   */
  override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
    if (numBytes > _executionMemoryUsed) {
      // Over-release indicates an accounting bug in the caller; clamp to zero and warn
      // rather than going negative.
      logWarning(s"Attempted to release $numBytes bytes of execution " +
        s"memory when we only have ${_executionMemoryUsed} bytes")
      _executionMemoryUsed = 0
    } else {
      _executionMemoryUsed -= numBytes
    }
  }

  /**
   * Release N bytes of storage memory.
   */
  override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
    if (numBytes > _storageMemoryUsed) {
      // Same clamp-and-warn policy as execution memory: never let usage go negative.
      logWarning(s"Attempted to release $numBytes bytes of storage " +
        s"memory when we only have ${_storageMemoryUsed} bytes")
      _storageMemoryUsed = 0
    } else {
      _storageMemoryUsed -= numBytes
    }
  }

  /**
   * Release all storage memory acquired.
   */
  override def releaseStorageMemory(): Unit = synchronized {
    _storageMemoryUsed = 0
  }

  /**
   * Release N bytes of unroll memory.
   */
  override def releaseUnrollMemory(numBytes: Long): Unit = {
    // Unroll memory is accounted as storage memory in this static scheme.
    releaseStorageMemory(numBytes)
  }

  /**
   * Amount of execution memory currently in use, in bytes.
   */
  override def executionMemoryUsed: Long = synchronized {
    _executionMemoryUsed
  }

  /**
   * Amount of storage memory currently in use, in bytes.
   */
  override def storageMemoryUsed: Long = synchronized {
    _storageMemoryUsed
  }

}


private[spark] object StaticMemoryManager {

  /**
   * Return the total amount of memory available for the storage region, in bytes.
   */
  private def getMaxStorageMemory(conf: SparkConf): Long =
    scaledRegionSize(
      conf.getDouble("spark.storage.memoryFraction", 0.6),
      conf.getDouble("spark.storage.safetyFraction", 0.9))

  /**
   * Return the total amount of memory available for the execution region, in bytes.
   */
  private def getMaxExecutionMemory(conf: SparkConf): Long =
    scaledRegionSize(
      conf.getDouble("spark.shuffle.memoryFraction", 0.2),
      conf.getDouble("spark.shuffle.safetyFraction", 0.8))

  // Scale the JVM's max heap by a region fraction and a safety fraction.
  private def scaledRegionSize(fraction: Double, safety: Double): Long =
    (Runtime.getRuntime.maxMemory * fraction * safety).toLong

}
Loading