diff --git a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java index 58eabb2cf8d..e56b97b799d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java +++ b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java @@ -23,6 +23,18 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.handler.codec.ByteToMessageDecoder.Cumulator; + +/** + * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and + * compose strategies. + *

+ * + *

Avoid using + * {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead + * to corruption, where the components' readable area are not equal to the Composite's capacity + * (see https://github.com/netty/netty/issues/12844). + */ + class NettyAdaptiveCumulator implements Cumulator { private final int composeMinSize; @@ -83,8 +95,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu composite.capacity(composite.writerIndex()); } } else { - composite = alloc.compositeBuffer(Integer.MAX_VALUE) - .addFlattenedComponents(true, cumulation); + composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation); } addInput(alloc, composite, in); in = null; @@ -104,7 +115,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu @VisibleForTesting void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { if (shouldCompose(composite, in, composeMinSize)) { - composite.addFlattenedComponents(true, in); + composite.addComponent(true, in); } else { // The total size of the new data and the last component are below the threshold. Merge them. mergeWithCompositeTail(alloc, composite, in); @@ -150,31 +161,13 @@ static void mergeWithCompositeTail( ByteBuf tail = composite.component(tailComponentIndex); ByteBuf newTail = null; try { - if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) { + if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity() + && !isCompositeOrWrappedComposite(tail)) { // Ideal case: the tail isn't shared, and can be expanded to the required capacity. + // Take ownership of the tail. newTail = tail.retain(); - // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with - // the issue fixed. - // In certain cases, removing the CompositeByteBuf component, and then adding it back - // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844. - // This happens because the buffer returned by composite.component() has out-of-sync - // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying - // buffer, but doesn't set the indexes. - // - // To get the right indexes we use the fact that composite.internalComponent() returns - // the slice() into the readable portion of the underlying buffer. - // We use this implementation detail (internalComponent() returning a *SlicedByteBuf), - // and combine it with the fact that SlicedByteBuf duplicates have their indexes - // adjusted so they correspond to the to the readable portion of the slice. - // - // Hence composite.internalComponent().duplicate() returns a buffer with the - // indexes that should've been on the composite.component() in the first place. - // Until the issue is fixed, we manually adjust the indexes of the removed component. - ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate(); - newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex()); - /* * The tail is a readable non-composite buffer, so writeBytes() handles everything for us. * @@ -188,20 +181,26 @@ static void mergeWithCompositeTail( * as pronounced because the capacity is doubled with each reallocation. */ newTail.writeBytes(in); + } else { - // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity. + // The tail satisfies one or more criteria: + // - Shared + // - Not expandable + // - Composite + // - Wrapped Composite newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); newTail.setBytes(0, composite, tailStart, tailSize) .setBytes(tailSize, in, in.readerIndex(), inputSize) .writerIndex(newTailSize); in.readerIndex(in.writerIndex()); } + // Store readerIndex to avoid out of bounds writerIndex during component replacement. int prevReader = composite.readerIndex(); // Remove the old tail, reset writer index. composite.removeComponent(tailComponentIndex).setIndex(0, tailStart); // Add back the new tail. - composite.addFlattenedComponents(true, newTail); + composite.addComponent(true, newTail); // New tail's ownership transferred to the composite buf. newTail = null; composite.readerIndex(prevReader); @@ -216,4 +215,12 @@ static void mergeWithCompositeTail( } } } + + private static boolean isCompositeOrWrappedComposite(ByteBuf tail) { + ByteBuf cur = tail; + while (cur.unwrap() != null) { + cur = cur.unwrap(); + } + return cur instanceof CompositeByteBuf; + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java index c4c2f95a2a9..a09e2e08e56 100644 --- a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java @@ -81,7 +81,7 @@ public void setUp() { @Override void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { // To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose - composite.addFlattenedComponents(true, in); + composite.addComponent(true, in); } }; @@ -208,8 +208,8 @@ public void setUp() { in = ByteBufUtil.writeAscii(alloc, inData); tail = ByteBufUtil.writeAscii(alloc, tailData); composite = alloc.compositeBuffer(Integer.MAX_VALUE); - // Note that addFlattenedComponents() will not add a new component when tail is not readable. - composite.addFlattenedComponents(true, tail); + // Note that addComponent() will not add a new component when tail is not readable. + composite.addComponent(true, tail); } @After @@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() { assertThat(in.readableBytes()).isAtMost(tail.writableBytes()); // All fits, so tail capacity must stay the same. - composite.addFlattenedComponents(true, tail); + composite.addComponent(true, tail); assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity); } @@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() { alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE); // Tail capacity is extended to its fast capacity. - composite.addFlattenedComponents(true, tail); + composite.addComponent(true, tail); assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity); } @@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { @SuppressWarnings("InlineMeInliner") // Requires Java 11 String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1); int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length(); - composite.addFlattenedComponents(true, tail); + composite.addComponent(true, tail); // Make input larger than tailFastCapacity in.writeCharSequence(inSuffixOverFastBytes, US_ASCII); @@ -435,21 +435,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() { @SuppressWarnings("InlineMeInliner") // Requires Java 11 String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes()); tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII); - composite.addFlattenedComponents(true, tail); + composite.addComponent(true, tail); assertTailReplaced(); } @Test public void mergeWithCompositeTail_tailNotExpandable_shared() { tail.retain(); - composite.addFlattenedComponents(true, tail); + composite.addComponent(true, tail); assertTailReplaced(); tail.release(); } @Test public void mergeWithCompositeTail_tailNotExpandable_readOnly() { - composite.addFlattenedComponents(true, tail.asReadOnly()); + composite.addComponent(true, tail.asReadOnly()); assertTailReplaced(); } @@ -527,8 +527,7 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() { CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, tail) { @Override - public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, - ByteBuf buffer) { + public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { throw expectedError; } }; @@ -561,8 +560,7 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() { CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, tail.asReadOnly()) { @Override - public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, - ByteBuf buffer) { + public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { throw expectedError; } }; @@ -616,14 +614,14 @@ public void mergeWithCompositeTail_outOfSyncComposite() { ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII)); // Start with a regular cumulation and add the buf as the only component. - CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf); + CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf); // Read composite1 buf to the beginning of the numbers. assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---"); // Wrap composite1 into another cumulation. This is similar to // what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1. CompositeByteBuf composite2 = - alloc.compositeBuffer(8).addFlattenedComponents(true, composite1); + alloc.compositeBuffer(8).addComponent(true, composite1); assertThat(composite2.toString(US_ASCII)).isEqualTo("01234"); // The previous operation does not adjust the read indexes of the underlying buffers, @@ -639,13 +637,27 @@ public void mergeWithCompositeTail_outOfSyncComposite() { CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2, ByteBufUtil.writeAscii(alloc, "56789")); assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789"); + } + + @Test + public void mergeWithNonCompositeTail() { + NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024); + ByteBufAllocator alloc = new PooledByteBufAllocator(); + ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII)); + ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII)); + + CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf); - // Correctness check: we still have a single component, and this component is still the - // original underlying buffer. - assertThat(cumulation.numComponents()).isEqualTo(1); - // Replace '2' with '*', and '8' with '$'. - buf.setByte(5, '*').setByte(11, '$'); - assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9"); + CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in); + + assertEquals("tail-012345", cumulation.toString(US_ASCII)); + assertEquals(0, in.refCnt()); + assertEquals(1, cumulation.numComponents()); + + buf.setByte(2, '*').setByte(7, '$'); + assertEquals("ta*l-01$345", cumulation.toString(US_ASCII)); + + composite.release(); } } }