Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty:Fix Netty composite buffer merging to be compatible with Netty 4.1.111 #11294

Merged
merged 15 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 28 additions & 24 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;


/**
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
* compose strategies.
* <br><br>
*
* <p><b><font color="red">Avoid using</font></b>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this phrasing in red font makes a lot of sense here. It's not actionable for the user of this class. If it's for modifying the class itself, should we just note that on mergeWithCompositeTail method instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't just for mergeWithCompositeTail. It is for the entire class. So cumulate() and addTail are impacted as well.

* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
* to corruption, where the components' readable area are not equal to the Composite's capacity
* (see https://github.com/netty/netty/issues/12844).
*/

class NettyAdaptiveCumulator implements Cumulator {
private final int composeMinSize;

Expand Down Expand Up @@ -84,7 +96,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
}
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
.addFlattenedComponents(true, cumulation);
.addComponent(true, cumulation);
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
}
addInput(alloc, composite, in);
in = null;
Expand All @@ -104,7 +116,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
@VisibleForTesting
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
if (shouldCompose(composite, in, composeMinSize)) {
composite.addFlattenedComponents(true, in);
composite.addComponent(true, in);
} else {
// The total size of the new data and the last component are below the threshold. Merge them.
mergeWithCompositeTail(alloc, composite, in);
Expand Down Expand Up @@ -150,31 +162,13 @@ static void mergeWithCompositeTail(
ByteBuf tail = composite.component(tailComponentIndex);
ByteBuf newTail = null;
try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
&& !isWrappedComposite(tail)) {
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.

// Take ownership of the tail.
newTail = tail.retain();

// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
// the issue fixed.
// In certain cases, removing the CompositeByteBuf component, and then adding it back
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
// This happens because the buffer returned by composite.component() has out-of-sync
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
// buffer, but doesn't set the indexes.
//
// To get the right indexes we use the fact that composite.internalComponent() returns
// the slice() into the readable portion of the underlying buffer.
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
// adjusted so they correspond to the to the readable portion of the slice.
//
// Hence composite.internalComponent().duplicate() returns a buffer with the
// indexes that should've been on the composite.component() in the first place.
// Until the issue is fixed, we manually adjust the indexes of the removed component.
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -188,6 +182,7 @@ static void mergeWithCompositeTail(
* as pronounced because the capacity is doubled with each reallocation.
*/
newTail.writeBytes(in);

} else {
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
Expand All @@ -196,12 +191,13 @@ static void mergeWithCompositeTail(
.writerIndex(newTailSize);
in.readerIndex(in.writerIndex());
}

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the old tail, reset writer index.
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
// Add back the new tail.
composite.addFlattenedComponents(true, newTail);
composite.addComponent(true, newTail);
// New tail's ownership transferred to the composite buf.
newTail = null;
composite.readerIndex(prevReader);
Expand All @@ -216,4 +212,12 @@ static void mergeWithCompositeTail(
}
}
}

private static boolean isWrappedComposite(ByteBuf tail) {
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
ByteBuf cur = tail;
while (cur.unwrap() != null) {
cur = cur.unwrap();
}
return cur instanceof CompositeByteBuf;
}
}
15 changes: 3 additions & 12 deletions netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,7 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail) {
@Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -561,8 +560,7 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail.asReadOnly()) {
@Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -623,7 +621,7 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
// Wrap composite1 into another cumulation. This is similar to
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
CompositeByteBuf composite2 =
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
alloc.compositeBuffer(8).addComponent(true, composite1);
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");

// The previous operation does not adjust the read indexes of the underlying buffers,
Expand All @@ -639,13 +637,6 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
ByteBufUtil.writeAscii(alloc, "56789"));
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");

// Correctness check: we still have a single component, and this component is still the
// original underlying buffer.
assertThat(cumulation.numComponents()).isEqualTo(1);
// Replace '2' with '*', and '8' with '$'.
buf.setByte(5, '*').setByte(11, '$');
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading