Skip to content

Commit

Permalink
[SPARK-10984] Simplify *MemoryManager class structure
Browse files Browse the repository at this point in the history
This patch refactors the MemoryManager class structure. After #9000, Spark had the following classes:

- MemoryManager
- StaticMemoryManager
- ExecutorMemoryManager
- TaskMemoryManager
- ShuffleMemoryManager

This is fairly confusing. To simplify things, this patch consolidates several of these classes:

- ShuffleMemoryManager and ExecutorMemoryManager were merged into MemoryManager.
- TaskMemoryManager is moved into Spark Core.

**Key changes and tasks**:

- [x] Merge ExecutorMemoryManager into MemoryManager.
  - [x] Move pooling logic into Allocator.
- [x] Move TaskMemoryManager from `spark-unsafe` to `spark-core`.
- [x] Refactor the existing Tungsten TaskMemoryManager interactions so that Tungsten code uses only TaskMemoryManager, rather than both it and ShuffleMemoryManager.
- [x] Refactor non-Tungsten code to use the TaskMemoryManager instead of ShuffleMemoryManager.
- [x] Merge ShuffleMemoryManager into MemoryManager.
  - [x] Move code
  - [x] ~~Simplify 1/n calculation.~~ **Will defer to followup, since this needs more work.**
- [x] Port ShuffleMemoryManagerSuite tests.
- [x] Move classes from `unsafe` package to `memory` package.
- [ ] Figure out how to handle the hacky use of the memory managers in HashedRelation's broadcast variable construction.
- [x] Test porting and cleanup: several tests relied on mock functionality (such as `TestShuffleMemoryManager.markAsOutOfMemory`) which has been changed or broken during the memory manager consolidation
  - [x] AbstractBytesToBytesMapSuite
  - [x] UnsafeExternalSorterSuite
  - [x] UnsafeFixedWidthAggregationMapSuite
  - [x] UnsafeKVExternalSorterSuite

**Compatibility notes**:

- This patch introduces breaking changes in `ExternalAppendOnlyMap`, which is marked as `DeveloperApi` (likely for legacy reasons): this class can no longer be used outside of a task.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9127 from JoshRosen/SPARK-10984.
  • Loading branch information
JoshRosen committed Oct 26, 2015
1 parent 63accc7 commit 85e654c
Show file tree
Hide file tree
Showing 58 changed files with 888 additions and 1,255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;
package org.apache.spark.memory;

import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* Manages the memory allocated by an individual task.
* <p>
Expand Down Expand Up @@ -87,13 +89,9 @@ public class TaskMemoryManager {
*/
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

/**
* Tracks memory allocated with {@link TaskMemoryManager#allocate(long)}, used to detect / clean
* up leaked memory.
*/
private final HashSet<MemoryBlock> allocatedNonPageMemory = new HashSet<MemoryBlock>();
private final MemoryManager memoryManager;

private final ExecutorMemoryManager executorMemoryManager;
private final long taskAttemptId;

/**
* Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
Expand All @@ -103,16 +101,38 @@ public class TaskMemoryManager {
private final boolean inHeap;

/**
* Construct a new MemoryManager.
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(ExecutorMemoryManager executorMemoryManager) {
this.inHeap = executorMemoryManager.inHeap;
this.executorMemoryManager = executorMemoryManager;
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
}

/**
* Acquire N bytes of memory for execution, evicting cached blocks if necessary.
* @return number of bytes successfully granted (<= N).
*/
public long acquireExecutionMemory(long size) {
return memoryManager.acquireExecutionMemory(size, taskAttemptId);
}

/**
* Release N bytes of execution memory.
*/
public void releaseExecutionMemory(long size) {
memoryManager.releaseExecutionMemory(size, taskAttemptId);
}

public long pageSizeBytes() {
return memoryManager.pageSizeBytes();
}

/**
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of memory that will be shared between operators.
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
* Returns `null` if there was not enough memory to allocate the page.
*/
public MemoryBlock allocatePage(long size) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
Expand All @@ -129,7 +149,15 @@ public MemoryBlock allocatePage(long size) {
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = executorMemoryManager.allocate(size);
final long acquiredExecutionMemory = acquireExecutionMemory(size);
if (acquiredExecutionMemory != size) {
releaseExecutionMemory(acquiredExecutionMemory);
synchronized (this) {
allocatedPages.clear(pageNumber);
}
return null;
}
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand All @@ -152,45 +180,16 @@ public void freePage(MemoryBlock page) {
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
// Cannot access a page once it's freed.
executorMemoryManager.free(page);
}

/**
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
* to be zeroed out (call `zero()` on the result if this is necessary). This method is intended
* to be used for allocating operators' internal data structures. For data pages that you want to
* exchange between operators, consider using {@link TaskMemoryManager#allocatePage(long)}, since
* that will enable intra-memory pointers (see
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} and this class's
* top-level Javadoc for more details).
*/
public MemoryBlock allocate(long size) throws OutOfMemoryError {
assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = executorMemoryManager.allocate(size);
synchronized(allocatedNonPageMemory) {
allocatedNonPageMemory.add(memory);
}
return memory;
}

/**
* Free memory allocated by {@link TaskMemoryManager#allocate(long)}.
*/
public void free(MemoryBlock memory) {
assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
executorMemoryManager.free(memory);
synchronized(allocatedNonPageMemory) {
final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize);
}

/**
* Given a memory page and offset within that page, encode this address into a 64-bit long.
* This address will remain valid as long as the corresponding page has not been freed.
*
* @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
* @param page a data page allocated by {@link TaskMemoryManager#allocatePage(long)}.
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
* this should be the value that you would pass as the base offset into an
* UNSAFE call (e.g. page.baseOffset() + something).
Expand Down Expand Up @@ -270,17 +269,15 @@ public long cleanUpAllAllocatedMemory() {
}
}

synchronized (allocatedNonPageMemory) {
final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
while (iter.hasNext()) {
final MemoryBlock memory = iter.next();
freedBytes += memory.size();
// We don't call free() here because that calls Set.remove, which would lead to a
// ConcurrentModificationException here.
executorMemoryManager.free(memory);
iter.remove();
}
}
freedBytes += memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);

return freedBytes;
}

/**
* Returns the memory consumption, in bytes, for the current task
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle.sort;

import org.apache.spark.memory.TaskMemoryManager;

/**
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
* <p>
Expand All @@ -26,7 +28,7 @@
* </pre>
* This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
* our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
* 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
* 13-bit page numbers assigned by {@link TaskMemoryManager}), this
* implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
* <p>
* Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

/**
Expand Down Expand Up @@ -72,7 +71,6 @@ final class ShuffleExternalSorter {
@VisibleForTesting
final int maxRecordSizeBytes;
private final TaskMemoryManager taskMemoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
private final ShuffleWriteMetrics writeMetrics;
Expand Down Expand Up @@ -105,15 +103,13 @@ final class ShuffleExternalSorter {

public ShuffleExternalSorter(
TaskMemoryManager memoryManager,
ShuffleMemoryManager shuffleMemoryManager,
BlockManager blockManager,
TaskContext taskContext,
int initialSize,
int numPartitions,
SparkConf conf,
ShuffleWriteMetrics writeMetrics) throws IOException {
this.taskMemoryManager = memoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
this.initialSize = initialSize;
Expand All @@ -124,7 +120,7 @@ public ShuffleExternalSorter(
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
Expand All @@ -140,9 +136,9 @@ public ShuffleExternalSorter(
private void initializeForWriting() throws IOException {
// TODO: move this sizing calculation logic into a static method of sorter:
final long memoryRequested = initialSize * 8L;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryRequested);
if (memoryAcquired != memoryRequested) {
shuffleMemoryManager.release(memoryAcquired);
taskMemoryManager.releaseExecutionMemory(memoryAcquired);
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}

Expand Down Expand Up @@ -272,6 +268,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
*/
@VisibleForTesting
void spill() throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
Expand All @@ -281,7 +278,7 @@ void spill() throws IOException {
writeSortedFile(false);
final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
shuffleMemoryManager.release(inMemSorterMemoryUsage);
taskMemoryManager.releaseExecutionMemory(inMemSorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

Expand Down Expand Up @@ -316,9 +313,13 @@ private long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
taskMemoryManager.freePage(block);
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
if (inMemSorter != null) {
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
}
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
Expand All @@ -337,8 +338,9 @@ public void cleanupResources() {
}
}
if (inMemSorter != null) {
shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
}
}

Expand All @@ -353,21 +355,20 @@ private void growPointerArrayIfNecessary() throws IOException {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
taskMemoryManager.releaseExecutionMemory(memoryAcquired);
spill();
} else {
inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
taskMemoryManager.releaseExecutionMemory(oldPointerArrayMemoryUsage);
}
}
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
* memory from the memory manager and spill if the requested memory can not be obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
Expand All @@ -386,17 +387,14 @@ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
if (currentPage == null) {
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
if (currentPage == null) {
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
Expand Down Expand Up @@ -430,17 +428,14 @@ public void insertRecord(
long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
// The record is larger than the page size, so allocate a special overflow page just to hold
// that record.
final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGranted != overflowPageSize) {
shuffleMemoryManager.release(memoryGranted);
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
if (overflowPage == null) {
spill();
final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGrantedAfterSpill != overflowPageSize) {
shuffleMemoryManager.release(memoryGrantedAfterSpill);
overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
if (overflowPage == null) {
throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
}
}
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
allocatedPages.add(overflowPage);
dataPage = overflowPage;
dataPagePosition = overflowPage.getBaseOffset();
Expand Down
Loading

0 comments on commit 85e654c

Please sign in to comment.