From e17f92c4fce90c30ab6f3eb17b4d4e38e363c2e8 Mon Sep 17 00:00:00 2001
From: Duong Nguyen
Date: Wed, 4 Dec 2024 11:06:36 -0800
Subject: [PATCH] HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel (#6652)

---
 .../common/ChunkBufferImplWithByteBuffer.java      |  3 ++-
 .../ChunkBufferImplWithByteBufferList.java         |  9 +++++++--
 .../ozone/common/IncrementalChunkBuffer.java       |  7 ++++++-
 .../hadoop/ozone/common/utils/BufferUtils.java     | 18 ++++++++++++++++++
 .../hdds/utils/MockGatheringChannel.java           | 16 +++++++++++++++-
 5 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 782476eb56d..254be93dc4a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -27,6 +27,7 @@
 import java.util.Objects;
 import java.util.function.Function;
 
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 
@@ -102,7 +103,7 @@ public List<ByteBuffer> asByteBufferList() {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    return channel.write(buffer);
+    return BufferUtils.writeFully(channel, buffer);
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index a3b5f9d2eef..f9992c9442d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -23,6 +23,8 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -246,9 +248,12 @@ public List<ByteBuffer> asByteBufferList() {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
+    long written = 0;
+    for (ByteBuffer buf : buffers) {
+      written += BufferUtils.writeFully(channel, buf);
+    }
     findCurrent();
-    return bytes;
+    return written;
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index dda4fae0d2b..249c67e4dd3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -279,7 +280,11 @@ public List<ByteBuffer> asByteBufferList() {
 
   @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
-    return channel.write(buffers.toArray(new ByteBuffer[0]));
+    long written = 0;
+    for (ByteBuffer buf : buffers) {
+      written += BufferUtils.writeFully(channel, buf);
+    }
+    return written;
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
index c6ad754f19b..01b2ec0af10 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.ozone.common.utils;
 
 import com.google.common.base.Preconditions;
+
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -136,4 +139,19 @@ public static int getNumberOfBins(long numElements, int maxElementsPerBin) {
     }
     return Math.toIntExact(n);
   }
+
+  /**
+   * Write all remaining bytes in buffer to the given channel.
+   */
+  public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException {
+    long written = 0;
+    while (bb.remaining() > 0) {
+      int n = ch.write(bb);
+      if (n <= 0) {
+        throw new IllegalStateException("no bytes written");
+      }
+      written += n;
+    }
+    return written;
+  }
 }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
index ce6f58dadcb..8f9256cd778 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/MockGatheringChannel.java
@@ -59,7 +59,21 @@ public long write(ByteBuffer[] srcs) throws IOException {
 
   @Override
   public int write(ByteBuffer src) throws IOException {
-    return delegate.write(src);
+    // If src has more than 1 byte left, simulate partial write by adjusting limit.
+    // Remaining 1 byte should be written on next call.
+    // This helps verify that the caller ensures buffer is written fully.
+    final int adjustment = 1;
+    final boolean limitWrite = src.remaining() > adjustment;
+    if (limitWrite) {
+      src.limit(src.limit() - adjustment);
+    }
+    try {
+      return delegate.write(src);
+    } finally {
+      if (limitWrite) {
+        src.limit(src.limit() + adjustment);
+      }
+    }
   }
 
   @Override
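
Note (not part of the patch): the change matters because GatheringByteChannel.write() is allowed to write fewer bytes than buffer.remaining(), so a single write() call can silently drop the tail of a chunk; the new BufferUtils.writeFully() loops until the buffer is drained, and MockGatheringChannel now simulates such partial writes in tests. The following is a minimal, self-contained sketch of that same retry-until-drained pattern against a FileChannel; the class name WriteFullyExample and the path example.bin are illustrative assumptions, not code from this change.

// Illustrative sketch only; mirrors the loop added to BufferUtils in this patch.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class WriteFullyExample {

  /** Same pattern as BufferUtils#writeFully above: loop because write() may be partial. */
  static long writeFully(GatheringByteChannel channel, ByteBuffer buffer) throws IOException {
    long written = 0;
    while (buffer.hasRemaining()) {
      int n = channel.write(buffer);   // may write fewer bytes than buffer.remaining()
      if (n <= 0) {
        throw new IllegalStateException("no bytes written");
      }
      written += n;
    }
    return written;
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer data = ByteBuffer.wrap("hello, ozone".getBytes(StandardCharsets.UTF_8));
    // FileChannel implements GatheringByteChannel, the channel type taken by ChunkBuffer#writeTo.
    try (FileChannel channel = FileChannel.open(Paths.get("example.bin"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      long n = writeFully(channel, data);  // drains the whole buffer even across partial writes
      System.out.println("wrote " + n + " bytes");
    }
  }
}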