Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty:Fix Netty composite buffer merging to be compatible with Netty 4.1.111 #11294

Merged
merged 15 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 33 additions & 26 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;


/**
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
* compose strategies.
* <br><br>
*
* <p><b><font color="red">Avoid using</font></b>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this phrasing in red font makes a lot of sense here. It's not actionable for the user of this class. If it's for modifying the class itself, should we just note that on mergeWithCompositeTail method instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't just for mergeWithCompositeTail. It is for the entire class. So cumulate() and addTail are impacted as well.

* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
* to corruption, where the components' readable area are not equal to the Composite's capacity
* (see https://github.com/netty/netty/issues/12844).
*/

class NettyAdaptiveCumulator implements Cumulator {
private final int composeMinSize;

Expand Down Expand Up @@ -83,8 +95,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
composite.capacity(composite.writerIndex());
}
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
.addFlattenedComponents(true, cumulation);
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
}
addInput(alloc, composite, in);
in = null;
Expand All @@ -104,7 +115,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
@VisibleForTesting
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
if (shouldCompose(composite, in, composeMinSize)) {
composite.addFlattenedComponents(true, in);
composite.addComponent(true, in);
} else {
// The total size of the new data and the last component are below the threshold. Merge them.
mergeWithCompositeTail(alloc, composite, in);
Expand Down Expand Up @@ -150,31 +161,13 @@ static void mergeWithCompositeTail(
ByteBuf tail = composite.component(tailComponentIndex);
ByteBuf newTail = null;
try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
&& !isCompositeOrWrappedComposite(tail)) {
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.

// Take ownership of the tail.
newTail = tail.retain();

// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
// the issue fixed.
// In certain cases, removing the CompositeByteBuf component, and then adding it back
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
// This happens because the buffer returned by composite.component() has out-of-sync
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
// buffer, but doesn't set the indexes.
//
// To get the right indexes we use the fact that composite.internalComponent() returns
// the slice() into the readable portion of the underlying buffer.
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
// adjusted so they correspond to the to the readable portion of the slice.
//
// Hence composite.internalComponent().duplicate() returns a buffer with the
// indexes that should've been on the composite.component() in the first place.
// Until the issue is fixed, we manually adjust the indexes of the removed component.
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -188,20 +181,26 @@ static void mergeWithCompositeTail(
* as pronounced because the capacity is doubled with each reallocation.
*/
newTail.writeBytes(in);

} else {
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
// The tail satisfies one or more criteria:
// - Shared
// - Not expandable
// - Composite
// - Wrapped Composite
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
newTail.setBytes(0, composite, tailStart, tailSize)
.setBytes(tailSize, in, in.readerIndex(), inputSize)
.writerIndex(newTailSize);
in.readerIndex(in.writerIndex());
}

// Store readerIndex to avoid out of bounds writerIndex during component replacement.
int prevReader = composite.readerIndex();
// Remove the old tail, reset writer index.
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
// Add back the new tail.
composite.addFlattenedComponents(true, newTail);
composite.addComponent(true, newTail);
// New tail's ownership transferred to the composite buf.
newTail = null;
composite.readerIndex(prevReader);
Expand All @@ -216,4 +215,12 @@ static void mergeWithCompositeTail(
}
}
}

private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
ByteBuf cur = tail;
while (cur.unwrap() != null) {
cur = cur.unwrap();
}
return cur instanceof CompositeByteBuf;
}
}
54 changes: 33 additions & 21 deletions netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {
@Override
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
composite.addFlattenedComponents(true, in);
composite.addComponent(true, in);
}
};

Expand Down Expand Up @@ -208,8 +208,8 @@ public void setUp() {
in = ByteBufUtil.writeAscii(alloc, inData);
tail = ByteBufUtil.writeAscii(alloc, tailData);
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
// Note that addFlattenedComponents() will not add a new component when tail is not readable.
composite.addFlattenedComponents(true, tail);
// Note that addComponent() will not add a new component when tail is not readable.
composite.addComponent(true, tail);
}

@After
Expand Down Expand Up @@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() {
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());

// All fits, so tail capacity must stay the same.
composite.addFlattenedComponents(true, tail);
composite.addComponent(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
}

Expand All @@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() {
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);

// Tail capacity is extended to its fast capacity.
composite.addFlattenedComponents(true, tail);
composite.addComponent(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
}

Expand All @@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
composite.addFlattenedComponents(true, tail);
composite.addComponent(true, tail);

// Make input larger than tailFastCapacity
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
Expand Down Expand Up @@ -435,21 +435,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
composite.addFlattenedComponents(true, tail);
composite.addComponent(true, tail);
assertTailReplaced();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_shared() {
tail.retain();
composite.addFlattenedComponents(true, tail);
composite.addComponent(true, tail);
assertTailReplaced();
tail.release();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
composite.addFlattenedComponents(true, tail.asReadOnly());
composite.addComponent(true, tail.asReadOnly());
assertTailReplaced();
}

Expand Down Expand Up @@ -527,8 +527,7 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail) {
@Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -561,8 +560,7 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail.asReadOnly()) {
@Override
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -616,14 +614,14 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));

// Start with a regular cumulation and add the buf as the only component.
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf);
// Read composite1 buf to the beginning of the numbers.
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");

// Wrap composite1 into another cumulation. This is similar to
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
CompositeByteBuf composite2 =
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
alloc.compositeBuffer(8).addComponent(true, composite1);
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");

// The previous operation does not adjust the read indexes of the underlying buffers,
Expand All @@ -639,13 +637,27 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
ByteBufUtil.writeAscii(alloc, "56789"));
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
}

@Test
public void mergeWithNonCompositeTail() {
NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
ByteBufAllocator alloc = new PooledByteBufAllocator();
ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII));
ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII));

CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf);

// Correctness check: we still have a single component, and this component is still the
// original underlying buffer.
assertThat(cumulation.numComponents()).isEqualTo(1);
// Replace '2' with '*', and '8' with '$'.
buf.setByte(5, '*').setByte(11, '$');
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in);

assertEquals("tail-012345", cumulation.toString(US_ASCII));
assertEquals(0, in.refCnt());
assertEquals(1, cumulation.numComponents());

buf.setByte(2, '*').setByte(7, '$');
assertEquals("ta*l-01$345", cumulation.toString(US_ASCII));

composite.release();
}
}
}
Loading