HDDS-10488. Datanode OOM due to run out of mmap handler #6690

Merged: 6 commits, Sep 6, 2024

Changes from 3 commits
@@ -144,6 +144,10 @@ public final class ScmConfigKeys {
"ozone.chunk.read.mapped.buffer.threshold";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
"32KB";
public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY =
"ozone.chunk.read.mapped.buffer.max.count";
// This max count must not exceed the Linux vm.max_map_count limit, which defaults to 65530.
public static final int OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT = 0;
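Each MappedByteBuffer consumes one entry in the process's memory-map table, which is why this cap must stay below vm.max_map_count. A hypothetical, Linux-only helper (not part of this PR) that a datanode could use to sanity-check the configured value:

    // Reads the kernel's vm.max_map_count; ozone.chunk.read.mapped.buffer.max.count
    // should be configured well below this number.
    static long readMaxMapCount() throws java.io.IOException {
      final byte[] bytes = java.nio.file.Files.readAllBytes(
          java.nio.file.Paths.get("/proc/sys/vm/max_map_count"));
      return Long.parseLong(
          new String(bytes, java.nio.charset.StandardCharsets.UTF_8).trim());
    }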

public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
"ozone.scm.container.layout";
@@ -22,6 +22,7 @@
import java.nio.channels.GatheringByteChannel;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@@ -60,13 +61,24 @@ static ChunkBuffer wrap(ByteBuffer buffer) {
return new ChunkBufferImplWithByteBuffer(buffer);
}

/** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}, with a function that is called when the buffer is released. */
static ChunkBuffer wrap(ByteBuffer buffer, Consumer<Integer> function) {
return new ChunkBufferImplWithByteBuffer(buffer, function);
}

/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
static ChunkBuffer wrap(List<ByteBuffer> buffers) {
return wrap(buffers, null);
}

/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer},
 * with a function that is called when the buffers are released. */
static ChunkBuffer wrap(List<ByteBuffer> buffers, Consumer<Integer> function) {
Objects.requireNonNull(buffers, "buffers == null");
if (buffers.size() == 1) {
- return wrap(buffers.get(0));
+ return wrap(buffers.get(0), function);
}
- return new ChunkBufferImplWithByteBufferList(buffers);
+ return new ChunkBufferImplWithByteBufferList(buffers, function);
}

/** Similar to {@link ByteBuffer#position()}. */
@@ -19,12 +19,14 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -34,14 +36,24 @@
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
private final UncheckedAutoCloseable underlying;
private final Consumer<Integer> releaseCallback;

ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
- this(buffer, null);
+ this.buffer = Objects.requireNonNull(buffer, "buffer == null");
+ this.releaseCallback = null;
+ this.underlying = null;
}

ChunkBufferImplWithByteBuffer(ByteBuffer buffer, Consumer<Integer> releaseFunction) {
Contributor: Nit: use IntConsumer instead of Consumer<Integer>.
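For reference, a sketch of the nit applied (hypothetical; this revision of the PR keeps Consumer<Integer>):

    import java.util.function.IntConsumer;

    // Primitive-specialized callback: avoids boxing the buffer count on every release.
    ChunkBufferImplWithByteBuffer(ByteBuffer buffer, IntConsumer releaseFunction) {
      this.buffer = Objects.requireNonNull(buffer, "buffer == null");
      this.releaseCallback = releaseFunction; // field would be declared as IntConsumer
      this.underlying = null;
    }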

this.buffer = Objects.requireNonNull(buffer, "buffer == null");
this.releaseCallback = releaseFunction;
this.underlying = null;
}

ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
this.buffer = Objects.requireNonNull(buffer, "buffer == null");
this.underlying = underlying;
this.releaseCallback = null;
}

@Override
@@ -163,4 +175,12 @@ public String toString() {
return getClass().getSimpleName() + ":limit=" + buffer.limit()
+ "@" + Integer.toHexString(hashCode());
}

@Override
protected void finalize() throws Throwable {
Contributor: @ChenSammi, let's not override finalize(); it will greatly reduce performance. See HDDS-9295.

ChenSammi (Contributor, Author), May 21, 2024: I see the performance impact. The thing is, these MappedByteBuffers are wrapped into a ChunkBuffer, then into a ByteString; it is not easy to find a better time than finalize() to trace back to the MappedByteBuffers and release/unmap them.

Contributor: ChunkBuffer is an AutoCloseable, so can we add cleanup of the MappedByteBuffer in ChunkBuffer#close()?
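A minimal sketch of that suggestion (hypothetical; assumes the releaseCallback field added in this PR and that every caller reliably invokes close()):

    @Override
    public void close() {
      // Deterministic release: return the mmap permit as soon as the caller
      // is done with the buffer, instead of waiting for GC/finalize().
      if (releaseCallback != null && buffer instanceof MappedByteBuffer) {
        releaseCallback.accept(1);
      }
      if (underlying != null) {
        underlying.close();
      }
    }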

szetszwo (Contributor), May 21, 2024: @ChenSammi, as @adoroszlai mentioned, the buffer should be released in the close() method.

BTW, HDDS-7188 (#3730) may be a better solution, since Netty has better buffer management.

ChenSammi (Contributor, Author), May 22, 2024: ChunkBuffer#close() was tried before finalize(), but it does not get called when the buffer is automatically reclaimed.

Contributor: ChunkBuffer#close() must be called after use. If it is not, that seems like a bug.

ChenSammi (Contributor, Author), May 23, 2024: A ChunkBuffer is converted to one ByteString (the data can be copied or zero-copied, depending on whether unsafe conversion is enabled). Multiple ByteString data for a readChunk request may be concatenated into one whole ByteString or a list of ByteStrings, depending on request options. The whole response with data is passed to the gRPC call, which is async. I could be wrong, but when I last investigated whether it was feasible to adopt a buffer pool in the datanode for data reads, I did not find any gRPC notification or callback that tells when a response has been sent out successfully, so that the buffers associated with the response could be safely released.

When the response is finally sent out, the buffer will be reclaimed by GC some time later. GC will not call ChunkBuffer#close(), but it will call finalize().
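For readers following the thread: the copied vs. zero-copied conversion mentioned above boils down to roughly the following (a sketch against the shaded protobuf API this file already imports; toByteString is a hypothetical helper, not Ozone's actual code path):

    import java.nio.ByteBuffer;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
    import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;

    static ByteString toByteString(ByteBuffer buffer, boolean unsafeEnabled) {
      // Zero-copy: the ByteString aliases the buffer, so a MappedByteBuffer
      // stays mapped until the ByteString itself becomes unreachable.
      // Copy: data is duplicated onto the heap, so the mapping could be
      // released right away, at the cost of an extra copy.
      return unsafeEnabled ? UnsafeByteOperations.unsafeWrap(buffer)
          : ByteString.copyFrom(buffer);
    }

This is why the release point is hard to find: with unsafe wrap enabled, only GC knows when the last ByteString referencing the mapping dies.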

ChenSammi (Contributor, Author): I'm trying a WeakReference-based solution. Will update the patch later.

duongkame (Contributor), May 23, 2024: Please see LeakDetector (HDDS-9528), @ChenSammi.
Yet relying on GC activity to perform resource closure may not be a good idea, because we have no control over when GC will clean up unreachable objects. Usually people rely on finalizers/WeakReferences only to detect leaks.
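For completeness, java.lang.ref.Cleaner (Java 9+) is the usual finalize() replacement for this pattern. A hedged sketch, not what this PR ended up doing:

    import java.lang.ref.Cleaner;
    import java.util.concurrent.Semaphore;

    final class MappedBufferPermits {
      private static final Cleaner CLEANER = Cleaner.create();

      // Registers a GC-driven backstop that returns n permits when the wrapper
      // becomes unreachable. The cleanup lambda must not capture the wrapper
      // itself, or it would never become phantom-reachable.
      static Cleaner.Cleanable register(Object wrapper, Semaphore semaphore, int n) {
        return CLEANER.register(wrapper, () -> semaphore.release(n));
      }
    }

Calling the returned Cleanable.clean() from close() gives deterministic release, with the Cleaner acting only as a leak backstop.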

Contributor: gRPC is known to have some limits on the outbound path:

  1. It doesn't provide any callbacks or events to notify that it successfully sent a response (unlike when sending a request, where you can listen to the response observer).
  2. Even if the buffer being sent is a direct one (or a mapped buffer), gRPC will not do zero-copy when sending it as a response. It does another copy to re-frame the original buffer, through a reused heap buffer.

This may be out of scope of this PR discussion, but gRPC is not the right tool for the job, and we will keep making unnatural efforts to cope with it (like we're doing).

if (releaseCallback != null && buffer instanceof MappedByteBuffer) {
releaseCallback.accept(1);
}
super.finalize();
}
}
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.nio.MappedByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -32,6 +33,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
@@ -46,6 +48,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
/** Buffer list backing the ChunkBuffer. */
private final List<ByteBuffer> buffers;
private final int limit;
private final Consumer<Integer> releaseCallback;

private int limitPrecedingCurrent;
private int currentIndex;
@@ -55,7 +58,16 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
EMPTY_BUFFER;
this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
this.releaseCallback = null;
findCurrent();
}

ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers, Consumer<Integer> releaseFunction) {
Objects.requireNonNull(buffers, "buffers == null");
this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
EMPTY_BUFFER;
this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
this.releaseCallback = releaseFunction;
findCurrent();
}

@@ -245,6 +257,15 @@ public String toString() {
+ ":l=" + limit();
}

@Override
protected void finalize() throws Throwable {
// The assumption here is that all buffers in the list are of the same buffer type, so only the first buffer is checked.
if (releaseCallback != null && !buffers.isEmpty() && buffers.get(0) instanceof MappedByteBuffer) {
releaseCallback.accept(buffers.size());
}
super.finalize();
}

private static int relativeToRange(int value, int min, int max) {
final int pos;
if (value <= min) {
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -860,6 +860,15 @@
The default read threshold to use memory mapped buffers.
</description>
</property>
<property>
<name>ozone.chunk.read.mapped.buffer.max.count</name>
<value>0</value>
<tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
<description>
The maximum number of memory-mapped buffers allowed on a datanode.
The default of 0 means no mapped buffers are allowed for data reads.
</description>
</property>
<property>
<name>ozone.scm.container.layout</name>
<value>FILE_PER_BLOCK</value>
@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

@@ -187,11 +188,12 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
}
}

@SuppressWarnings("checkstyle:parameternumber")
public static ChunkBuffer readData(long len, int bufferCapacity,
- File file, long off, HddsVolume volume, int readMappedBufferThreshold)
- throws StorageContainerException {
- if (len > readMappedBufferThreshold) {
- return readData(file, bufferCapacity, off, len, volume);
+ File file, long off, HddsVolume volume, int readMappedBufferThreshold, boolean mmapEnabled,
+ Semaphore semaphore) throws StorageContainerException {
+ if (mmapEnabled && len > readMappedBufferThreshold && bufferCapacity > readMappedBufferThreshold) {
+ return readData(file, bufferCapacity, off, len, volume, semaphore);
} else if (len == 0) {
return ChunkBuffer.wrap(Collections.emptyList());
}
@@ -252,25 +254,39 @@ private static void readData(File file, long offset, long len,
* @return a list of {@link MappedByteBuffer} containing the data.
*/
private static ChunkBuffer readData(File file, int chunkSize,
- long offset, long length, HddsVolume volume)
+ long offset, long length, HddsVolume volume, Semaphore semaphore)
throws StorageContainerException {

- final List<ByteBuffer> buffers = new ArrayList<>(
- Math.toIntExact((length - 1) / chunkSize) + 1);
- readData(file, offset, length, channel -> {
- long readLen = 0;
- while (readLen < length) {
- final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
- final ByteBuffer mapped = channel.map(
- FileChannel.MapMode.READ_ONLY, offset + readLen, n);
- LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
- offset, readLen, n, mapped.getClass());
- readLen += mapped.remaining();
- buffers.add(mapped);
- }
- return readLen;
- }, volume);
- return ChunkBuffer.wrap(buffers);
+ final int bufferNum = Math.toIntExact((length - 1) / chunkSize) + 1;
+ if (!semaphore.tryAcquire(bufferNum)) {
+ // proceed with normal buffer
+ final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length,
+ chunkSize);
+ readData(file, offset, length, c -> c.position(offset).read(buffers), volume);
+ Arrays.stream(buffers).forEach(ByteBuffer::flip);
+ return ChunkBuffer.wrap(Arrays.asList(buffers));
+ } else {
+ // proceed with mapped buffer
+ LOG.debug("mapped buffer permits decreased by {} to total {}", bufferNum, semaphore.availablePermits());
+ final List<ByteBuffer> buffers = new ArrayList<>(bufferNum);
+ readData(file, offset, length, channel -> {
+ long readLen = 0;
+ while (readLen < length) {
+ final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
+ final ByteBuffer mapped = channel.map(
+ FileChannel.MapMode.READ_ONLY, offset + readLen, n);
+ LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
+ offset, readLen, n, mapped.getClass());
+ readLen += mapped.remaining();
+ buffers.add(mapped);
+ }
+ return readLen;
+ }, volume);
+ return ChunkBuffer.wrap(buffers, permits -> {
+ semaphore.release(permits);
+ LOG.debug("mapped buffer permits increased by {} to total {}", permits, semaphore.availablePermits());
+ });
+ }
}
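The control flow above reduces to a small reservation pattern: try to reserve one permit per mapped buffer, and fall back to plain heap buffers when the budget is exhausted. A self-contained sketch with hypothetical names (the real code threads the Semaphore through ChunkUtils and releases permits via the ChunkBuffer callback):

    import java.util.concurrent.Semaphore;

    final class MmapBudget {
      private final Semaphore permits;

      MmapBudget(int maxMappedBuffers) {
        this.permits = new Semaphore(maxMappedBuffers);
      }

      /** @return true if n more mapped buffers may be created now. */
      boolean tryReserve(int n) {
        // Non-blocking by design: a read never waits for mmap capacity,
        // it just degrades to ordinary heap ByteBuffers.
        return permits.tryAcquire(n);
      }

      /** Must be called exactly once for every successful tryReserve(n). */
      void release(int n) {
        permits.release(n);
      }
    }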

/**
@@ -60,6 +60,7 @@ public class BlockManagerImpl implements BlockManager {
// Default Read Buffer capacity when Checksum is not present
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;

/**
* Constructs a Block Manager.
@@ -75,6 +76,9 @@ public BlockManagerImpl(ConfigurationSource conf) {
this.readMappedBufferThreshold = config.getBufferSize(
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
this.readMappedBufferMaxCount = config.getInt(
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_KEY,
ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_MAX_COUNT_DEFAULT);
}

@Override
@@ -264,6 +268,11 @@ public int getReadMappedBufferThreshold() {
return readMappedBufferThreshold;
}

/** @return the max count of memory mapped buffers for read. */
public int getReadMappedBufferMaxCount() {
return readMappedBufferMaxCount;
}

/**
* Deletes an existing block.
* As Deletion is handled by BlockDeletingService,
@@ -54,6 +54,7 @@
import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
@@ -75,6 +76,8 @@ public class FilePerBlockStrategy implements ChunkManager {
private final OpenFiles files = new OpenFiles();
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;
private final Semaphore semaphore;
private final VolumeSet volumeSet;

public FilePerBlockStrategy(boolean sync, BlockManager manager,
@@ -84,7 +87,15 @@ public FilePerBlockStrategy(boolean sync, BlockManager manager,
manager.getDefaultReadBufferCapacity();
this.readMappedBufferThreshold = manager == null ? 0
: manager.getReadMappedBufferThreshold();
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
semaphore = new Semaphore(this.readMappedBufferMaxCount);
} else {
semaphore = null;
}
}

private static void checkLayoutVersion(Container container) {
@@ -192,10 +203,10 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,

final long len = info.getLen();
long offset = info.getOffset();
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
- readMappedBufferThreshold);
+ readMappedBufferThreshold, readMappedBufferMaxCount > 0, semaphore);
}

@Override
@@ -50,6 +50,7 @@
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK;
@@ -67,6 +68,8 @@ public class FilePerChunkStrategy implements ChunkManager {
private final BlockManager blockManager;
private final int defaultReadBufferCapacity;
private final int readMappedBufferThreshold;
private final int readMappedBufferMaxCount;
private final Semaphore semaphore;
private final VolumeSet volumeSet;

public FilePerChunkStrategy(boolean sync, BlockManager manager,
@@ -77,7 +80,15 @@ public FilePerChunkStrategy(boolean sync, BlockManager manager,
manager.getDefaultReadBufferCapacity();
this.readMappedBufferThreshold = manager == null ? 0
: manager.getReadMappedBufferThreshold();
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
semaphore = new Semaphore(this.readMappedBufferMaxCount);
} else {
semaphore = null;
}
}

private static void checkLayoutVersion(Container container) {
@@ -265,7 +276,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
long offset = info.getOffset() - chunkFileOffset;
Preconditions.checkState(offset >= 0);
return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
- readMappedBufferThreshold);
+ readMappedBufferThreshold, readMappedBufferMaxCount > 0, semaphore);
}
} catch (StorageContainerException ex) {
//UNABLE TO FIND chunk is not a problem as we will try with the
@@ -96,6 +96,9 @@ long getCommittedBlockLength(Container container, BlockID blockID)
/** @return the threshold to read using memory mapped buffers. */
int getReadMappedBufferThreshold();

/** @return the max count of memory mapped buffers to read. */
int getReadMappedBufferMaxCount();

/**
* Shutdown ContainerManager.
*/