Skip to content

Commit

Permalink
HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel (#6652
Browse files Browse the repository at this point in the history
)
  • Loading branch information
duongkame authored Dec 4, 2024
1 parent d66c088 commit e17f92c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Objects;
import java.util.function.Function;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;

Expand Down Expand Up @@ -102,7 +103,7 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
return BufferUtils.writeFully(channel, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -246,9 +248,12 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
findCurrent();
return bytes;
return written;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -279,7 +280,11 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffers.toArray(new ByteBuffer[0]));
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
return written;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.hadoop.ozone.common.utils;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -136,4 +139,19 @@ public static int getNumberOfBins(long numElements, int maxElementsPerBin) {
}
return Math.toIntExact(n);
}

/**
* Write all remaining bytes in buffer to the given channel.
*/
public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException {
long written = 0;
while (bb.remaining() > 0) {
int n = ch.write(bb);
if (n <= 0) {
throw new IllegalStateException("no bytes written");
}
written += n;
}
return written;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,21 @@ public long write(ByteBuffer[] srcs) throws IOException {

@Override
public int write(ByteBuffer src) throws IOException {
return delegate.write(src);
// If src has more than 1 byte left, simulate partial write by adjusting limit.
// Remaining 1 byte should be written on next call.
// This helps verify that the caller ensures buffer is written fully.
final int adjustment = 1;
final boolean limitWrite = src.remaining() > adjustment;
if (limitWrite) {
src.limit(src.limit() - adjustment);
}
try {
return delegate.write(src);
} finally {
if (limitWrite) {
src.limit(src.limit() + adjustment);
}
}
}

@Override
Expand Down

0 comments on commit e17f92c

Please sign in to comment.