diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java index 25f4f9232cf..13f55226483 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java @@ -34,6 +34,9 @@ */ @Internal public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler { + static final int ADAPTIVE_CUMULATOR_COMPOSE_MIN_SIZE_DEFAULT = 1024; + static final Cumulator ADAPTIVE_CUMULATOR = + new NettyAdaptiveCumulator(ADAPTIVE_CUMULATOR_COMPOSE_MIN_SIZE_DEFAULT); @Nullable protected final ChannelPromise channelUnused; @@ -48,6 +51,7 @@ protected GrpcHttp2ConnectionHandler( super(decoder, encoder, initialSettings); this.channelUnused = channelUnused; this.negotiationLogger = negotiationLogger; + setCumulator(ADAPTIVE_CUMULATOR); } /** diff --git a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java new file mode 100644 index 00000000000..b3a28c55c79 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java @@ -0,0 +1,224 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.handler.codec.ByteToMessageDecoder.Cumulator; + +class NettyAdaptiveCumulator implements Cumulator { + private final int composeMinSize; + + /** + * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and + * compose strategies. + * + * @param composeMinSize Determines the minimal size of the buffer that should be composed (added + * as a new component of the {@link CompositeByteBuf}). If the total size + * of the last component (tail) and the incoming buffer is below this value, + * the incoming buffer is appended to the tail, and the new component is not + * added. + */ + NettyAdaptiveCumulator(int composeMinSize) { + Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative"); + this.composeMinSize = composeMinSize; + } + + /** + * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and + * compose strategies. + * + *

This cumulator applies a heuristic to make a decision whether to track a reference to the + * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into + * the last component (the tail) by performing a memory copy. + * + *

It is necessary as a protection from a potential attack on the {@link + * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case + * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator + * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra + * compute to read the cumulation. + * + *

Implemented heuristic establishes a minimal threshold for the total size of the tail and + * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is + * used to avoid a case where attacker alternates the size of data packets to trick the cumulator + * into always selecting compose strategy. + * + *

Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands + * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when + * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new + * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure + * runtime {@code O(n^2)} is amortized to {@code O(n)}. + */ + @Override + @SuppressWarnings("ReferenceEquality") + public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + if (!cumulation.isReadable()) { + cumulation.release(); + return in; + } + CompositeByteBuf composite = null; + try { + if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) { + composite = (CompositeByteBuf) cumulation; + // Writer index must equal capacity if we are going to "write" + // new components to the end + if (composite.writerIndex() != composite.capacity()) { + composite.capacity(composite.writerIndex()); + } + } else { + composite = alloc.compositeBuffer(Integer.MAX_VALUE) + .addFlattenedComponents(true, cumulation); + } + addInput(alloc, composite, in); + in = null; + return composite; + } finally { + if (in != null) { + // We must release if the ownership was not transferred as otherwise it may produce a leak + in.release(); + // Also release any new buffer allocated if we're not returning it + if (composite != null && composite != cumulation) { + composite.release(); + } + } + } + } + + @VisibleForTesting + void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { + if (shouldCompose(composite, in, composeMinSize)) { + composite.addFlattenedComponents(true, in); + } else { + // The total size of the new data and the last component are below the threshold. Merge them. + mergeWithCompositeTail(alloc, composite, in); + } + } + + @VisibleForTesting + static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) { + int componentCount = composite.numComponents(); + if (composite.numComponents() == 0) { + return true; + } + int inputSize = in.readableBytes(); + int tailStart = composite.toByteIndex(componentCount - 1); + int tailSize = composite.writerIndex() - tailStart; + return tailSize + inputSize >= composeMinSize; + } + + /** + * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by + * expanding or replacing the tail component of the {@link CompositeByteBuf}. + * + *

The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying + * the tail component into a new buffer, for each incoming single-byte buffer. We append the new + * bytes to the tail, when a write (or a fast write) is possible. + * + *

Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to + * achieve runtime amortization. + * + *

We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)}, + * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)}, + * which doubles buffer capacity by normalizing it to the closest power of two. This assumption + * is verified in unit tests for this method. + */ + @VisibleForTesting + static void mergeWithCompositeTail( + ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { + int inputSize = in.readableBytes(); + int tailComponentIndex = composite.numComponents() - 1; + int tailStart = composite.toByteIndex(tailComponentIndex); + int tailSize = composite.writerIndex() - tailStart; + int newTailSize = inputSize + tailSize; + ByteBuf tail = composite.component(tailComponentIndex); + ByteBuf newTail = null; + try { + if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) { + // Ideal case: the tail isn't shared, and can be expanded to the required capacity. + // Take ownership of the tail. + newTail = tail.retain(); + + // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with + // the issue fixed. + // In certain cases, removing the CompositeByteBuf component, and then adding it back + // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844. + // This happens because the buffer returned by composite.component() has out-of-sync + // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying + // buffer, but doesn't set the indexes. + // + // To get the right indexes we use the fact that composite.internalComponent() returns + // the slice() into the readable portion of the underlying buffer. + // We use this implementation detail (internalComponent() returning a *SlicedByteBuf), + // and combine it with the fact that SlicedByteBuf duplicates have their indexes + // adjusted so they correspond to the to the readable portion of the slice. + // + // Hence composite.internalComponent().duplicate() returns a buffer with the + // indexes that should've been on the composite.component() in the first place. + // Until the issue is fixed, we manually adjust the indexes of the removed component. + ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate(); + newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex()); + + /* + * The tail is a readable non-composite buffer, so writeBytes() handles everything for us. + * + * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply + * updates its boundary to the end of consecutive memory run assigned to this buffer) + * - when the required size doesn't fit into writableBytes(), a new buffer is + * allocated, and the capacity calculated with alloc.calculateNewCapacity() + * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf + * is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer. + * Unwrapping buffers is unsafe, and potential benefit of fast writes may not be + * as pronounced because the capacity is doubled with each reallocation. + */ + newTail.writeBytes(in); + } else { + // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity. + newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); + newTail.setBytes(0, composite, tailStart, tailSize) + .setBytes(tailSize, in, in.readerIndex(), inputSize) + .writerIndex(newTailSize); + in.readerIndex(in.writerIndex()); + } + // Store readerIndex to avoid out of bounds writerIndex during component replacement. + int prevReader = composite.readerIndex(); + // Remove the old tail, reset writer index. + composite.removeComponent(tailComponentIndex).setIndex(0, tailStart); + // Add back the new tail. + composite.addFlattenedComponents(true, newTail); + // New tail's ownership transferred to the composite buf. + newTail = null; + in.release(); + in = null; + // Restore the reader. In case it fails we restore the reader after releasing/forgetting + // the input and the new tail so that finally block can handles them properly. + composite.readerIndex(prevReader); + } finally { + // Input buffer was merged with the tail. + if (in != null) { + in.release(); + } + // If new tail's ownership isn't transferred to the composite buf. + // Release it to prevent a leak. + if (newTail != null) { + newTail.release(); + } + } + } +} diff --git a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java new file mode 100644 index 00000000000..6a0c00bac0e --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java @@ -0,0 +1,641 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static com.google.common.truth.TruthJUnit.assume; +import static io.netty.util.CharsetUtil.US_ASCII; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Enclosed.class) +public class NettyAdaptiveCumulatorTest { + + private static Collection cartesianProductParams(List... lists) { + return Lists.cartesianProduct(lists).stream().map(List::toArray).collect(Collectors.toList()); + } + + @RunWith(JUnit4.class) + public static class CumulateTests { + // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions. + private static final String DATA_INITIAL = "0123"; + private static final String DATA_INCOMING = "456789"; + private static final String DATA_CUMULATED = "0123456789"; + + private static final ByteBufAllocator alloc = new UnpooledByteBufAllocator(false); + private NettyAdaptiveCumulator cumulator; + private NettyAdaptiveCumulator throwingCumulator; + private final UnsupportedOperationException throwingCumulatorError = + new UnsupportedOperationException(); + + // Buffers for testing + private final ByteBuf contiguous = ByteBufUtil.writeAscii(alloc, DATA_INITIAL); + private final ByteBuf in = ByteBufUtil.writeAscii(alloc, DATA_INCOMING); + + @Before + public void setUp() { + cumulator = new NettyAdaptiveCumulator(0) { + @Override + void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { + // To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose + composite.addFlattenedComponents(true, in); + } + }; + + // Throws an error on adding incoming buffer. + throwingCumulator = new NettyAdaptiveCumulator(0) { + @Override + void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { + throw throwingCumulatorError; + } + }; + } + + @Test + public void cumulate_notReadableCumulation_replacedWithInputAndReleased() { + contiguous.readerIndex(contiguous.writerIndex()); + assertFalse(contiguous.isReadable()); + ByteBuf cumulation = cumulator.cumulate(alloc, contiguous, in); + assertEquals(DATA_INCOMING, cumulation.toString(US_ASCII)); + assertEquals(0, contiguous.refCnt()); + // In retained by cumulation. + assertEquals(1, in.refCnt()); + assertEquals(1, cumulation.refCnt()); + cumulation.release(); + } + + @Test + public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() { + CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, contiguous, in); + assertEquals(DATA_INITIAL, cumulation.component(0).toString(US_ASCII)); + assertEquals(DATA_INCOMING, cumulation.component(1).toString(US_ASCII)); + assertEquals(DATA_CUMULATED, cumulation.toString(US_ASCII)); + // Both in and contiguous are retained by cumulation. + assertEquals(1, contiguous.refCnt()); + assertEquals(1, in.refCnt()); + assertEquals(1, cumulation.refCnt()); + cumulation.release(); + } + + @Test + public void cumulate_compositeCumulation_inputAppendedAsANewComponent() { + CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); + assertSame(composite, cumulator.cumulate(alloc, composite, in)); + assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII)); + assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII)); + assertEquals(DATA_CUMULATED, composite.toString(US_ASCII)); + // Both in and contiguous are retained by cumulation. + assertEquals(1, contiguous.refCnt()); + assertEquals(1, in.refCnt()); + assertEquals(1, composite.refCnt()); + composite.release(); + } + + @Test + public void cumulate_compositeCumulation_inputReleasedOnError() { + CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); + try { + throwingCumulator.cumulate(alloc, composite, in); + fail("Cumulator didn't throw"); + } catch (UnsupportedOperationException actualError) { + assertSame(throwingCumulatorError, actualError); + // Input must be released unless its ownership has been to the composite cumulation. + assertEquals(0, in.refCnt()); + // Initial composite cumulation owned by the caller in this case, so it isn't released. + assertEquals(1, composite.refCnt()); + // Contiguous still managed by the cumulation + assertEquals(1, contiguous.refCnt()); + } finally { + composite.release(); + } + } + + @Test + public void cumulate_contiguousCumulation_inputAndNewCompositeReleasedOnError() { + // Return our instance of new composite to ensure it's released. + CompositeByteBuf newComposite = alloc.compositeBuffer(Integer.MAX_VALUE); + ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class); + when(mockAlloc.compositeBuffer(anyInt())).thenReturn(newComposite); + + try { + // Previous cumulation is non-composite, so cumulator will create anew composite and add + // both buffers to it. + throwingCumulator.cumulate(mockAlloc, contiguous, in); + fail("Cumulator didn't throw"); + } catch (UnsupportedOperationException actualError) { + assertSame(throwingCumulatorError, actualError); + // Input must be released unless its ownership has been to the composite cumulation. + assertEquals(0, in.refCnt()); + // New composite cumulation hasn't been returned to the caller, so it must be released. + assertEquals(0, newComposite.refCnt()); + // Previous cumulation released because it was owned by the new composite cumulation. + assertEquals(0, contiguous.refCnt()); + } + } + } + + @RunWith(Parameterized.class) + public static class ShouldComposeTests { + // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions. + private static final String DATA_INITIAL = "0123"; + private static final String DATA_INCOMING = "456789"; + + /** + * Cartesian product of the test values. + */ + @Parameters(name = "composeMinSize={0}, tailData=\"{1}\", inData=\"{2}\"") + public static Collection params() { + List composeMinSize = ImmutableList.of(0, 9, 10, 11, Integer.MAX_VALUE); + List tailData = ImmutableList.of("", DATA_INITIAL); + List inData = ImmutableList.of("", DATA_INCOMING); + return cartesianProductParams(composeMinSize, tailData, inData); + } + + @Parameter public int composeMinSize; + @Parameter(1) public String tailData; + @Parameter(2) public String inData; + + private CompositeByteBuf composite; + private ByteBuf tail; + private ByteBuf in; + + @Before + public void setUp() { + ByteBufAllocator alloc = new UnpooledByteBufAllocator(false); + in = ByteBufUtil.writeAscii(alloc, inData); + tail = ByteBufUtil.writeAscii(alloc, tailData); + composite = alloc.compositeBuffer(Integer.MAX_VALUE); + // Note that addFlattenedComponents() will not add a new component when tail is not readable. + composite.addFlattenedComponents(true, tail); + } + + @After + public void tearDown() { + in.release(); + composite.release(); + } + + @Test + public void shouldCompose_emptyComposite() { + assume().that(composite.numComponents()).isEqualTo(0); + assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); + } + + @Test + public void shouldCompose_composeMinSizeReached() { + assume().that(composite.numComponents()).isGreaterThan(0); + assume().that(tail.readableBytes() + in.readableBytes()).isAtLeast(composeMinSize); + assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); + } + + @Test + public void shouldCompose_composeMinSizeNotReached() { + assume().that(composite.numComponents()).isGreaterThan(0); + assume().that(tail.readableBytes() + in.readableBytes()).isLessThan(composeMinSize); + assertFalse(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); + } + } + + @RunWith(Parameterized.class) + public static class MergeWithCompositeTailTests { + private static final String INCOMING_DATA_READABLE = "+incoming"; + private static final String INCOMING_DATA_DISCARDABLE = "discard"; + + private static final String TAIL_DATA_DISCARDABLE = "---"; + private static final String TAIL_DATA_READABLE = "tail"; + private static final String TAIL_DATA = TAIL_DATA_DISCARDABLE + TAIL_DATA_READABLE; + private static final int TAIL_READER_INDEX = TAIL_DATA_DISCARDABLE.length(); + private static final int TAIL_MAX_CAPACITY = 128; + + // DRY sacrificed to improve readability. + private static final String EXPECTED_TAIL_DATA = "tail+incoming"; + + /** + * Cartesian product of the test values. + * + *

Test cases when the cumulation contains components, other than tail, and could be + * partially read. This is needed to verify the correctness if reader and writer indexes of the + * composite cumulation after the merge. + */ + @Parameters(name = "compositeHeadData=\"{0}\", compositeReaderIndex={1}") + public static Collection params() { + String headData = "head"; + + List compositeHeadData = ImmutableList.of( + // Test without the "head" component. Empty string is equivalent of fully read buffer, + // so it's not added to the composite byte buf. The tail is added as the first component. + "", + // Test with the "head" component, so the tail is added as the second component. + headData + ); + + // After the tail is added to the composite cumulator, advance the reader index to + // cover different cases. + // The reader index only looks at what's readable in the composite byte buf, so + // discardable bytes of head and tail doesn't count. + List compositeReaderIndex = ImmutableList.of( + // Reader in the beginning + 0, + // Within the head (when present) or the tail + headData.length() - 2, + // Within the tail, even if the head is present + headData.length() + 2 + ); + return cartesianProductParams(compositeHeadData, compositeReaderIndex); + } + + @Parameter public String compositeHeadData; + @Parameter(1) public int compositeReaderIndex; + + // Use pooled allocator to have maxFastWritableBytes() behave differently than writableBytes(). + private final ByteBufAllocator alloc = new PooledByteBufAllocator(); + + // Composite buffer to be used in tests. + private CompositeByteBuf composite; + private ByteBuf tail; + private ByteBuf in; + + @Before + public void setUp() { + composite = alloc.compositeBuffer(); + + // The "head" component. It represents existing data in the cumulator. + // Note that addFlattenedComponents() does not add completely read buffer, which covers + // the case when compositeHeadData parameter is an empty string. + ByteBuf head = alloc.buffer().writeBytes(compositeHeadData.getBytes(US_ASCII)); + composite.addFlattenedComponents(true, head); + + // The "tail" component. It also represents existing data in the cumulator, but it's + // not added to the cumulator during setUp() stage. It is to be manipulated by tests to + // produce different buffer write scenarios based on different tail's capacity. + // After tail is changes for each test scenario, it's added to the composite buffer. + // + // The default state of the tail before each test: tail is full, but expandable (the data uses + // all initial capacity, but not maximum capacity). + // Tail data and indexes: + // ----tail + // r w + tail = alloc.buffer(TAIL_DATA.length(), TAIL_MAX_CAPACITY) + .writeBytes(TAIL_DATA.getBytes(US_ASCII)) + .readerIndex(TAIL_READER_INDEX); + + // Incoming data and indexes: + // discard+incoming + // r w + in = alloc.buffer() + .writeBytes(INCOMING_DATA_DISCARDABLE.getBytes(US_ASCII)) + .writeBytes(INCOMING_DATA_READABLE.getBytes(US_ASCII)) + .readerIndex(INCOMING_DATA_DISCARDABLE.length()); + } + + @After + public void tearDown() { + composite.release(); + } + + @Test + public void mergeWithCompositeTail_tailExpandable_write() { + // Make incoming data fit into tail capacity. + int fitCapacity = tail.capacity() + INCOMING_DATA_READABLE.length(); + tail.capacity(fitCapacity); + // Confirm it fits. + assertThat(in.readableBytes()).isAtMost(tail.writableBytes()); + + // All fits, so tail capacity must stay the same. + composite.addFlattenedComponents(true, tail); + assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity); + } + + @Test + public void mergeWithCompositeTail_tailExpandable_fastWrite() { + // Confirm that the tail can be expanded fast to fit the incoming data. + assertThat(in.readableBytes()).isAtMost(tail.maxFastWritableBytes()); + + // To avoid undesirable buffer unwrapping, at the moment adaptive cumulator is set not + // apply fastWrite technique. Even when fast write is possible, it will fall back to + // reallocating a larger buffer. + // int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes(); + int tailFastCapacity = + alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE); + + // Tail capacity is extended to its fast capacity. + composite.addFlattenedComponents(true, tail); + assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity); + } + + @Test + public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { + int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes(); + String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1); + int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length(); + composite.addFlattenedComponents(true, tail); + + // Make input larger than tailFastCapacity + in.writeCharSequence(inSuffixOverFastBytes, US_ASCII); + // Confirm that the tail can only fit incoming data via reallocation. + assertThat(in.readableBytes()).isGreaterThan(tail.maxFastWritableBytes()); + assertThat(in.readableBytes()).isAtMost(tail.maxWritableBytes()); + + // Confirm the assumption that new capacity is produced by alloc.calculateNewCapacity(). + int expectedTailCapacity = alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE); + assertTailExpanded(EXPECTED_TAIL_DATA.concat(inSuffixOverFastBytes), expectedTailCapacity); + } + + private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) { + int originalNumComponents = composite.numComponents(); + + // Handle the case when reader index is beyond all readable bytes of the cumulation. + int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex()); + composite.readerIndex(compositeReaderIndexBounded); + + // Execute the merge logic. + NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in); + + // Composite component count shouldn't change. + assertWithMessage( + "When tail is expanded, the number of components in the cumulation must not change") + .that(composite.numComponents()).isEqualTo(originalNumComponents); + + ByteBuf newTail = composite.component(composite.numComponents() - 1); + + // Verify the readable part of the expanded tail: + // 1. Initial readable bytes of the tail not changed + // 2. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded. + // 3. Readable bytes of the incoming buffer are fully read and appended to the tail. + assertEquals(expectedTailReadableData, newTail.toString(US_ASCII)); + // Verify expanded capacity. + assertEquals(expectedNewTailCapacity, newTail.capacity()); + + // Discardable bytes (0 < discardable < readerIndex) of the tail are kept as is. + String newTailDataDiscardable = newTail.toString(0, newTail.readerIndex(), US_ASCII); + assertWithMessage("After tail expansion, its discardable bytes should be unchanged") + .that(newTailDataDiscardable).isEqualTo(TAIL_DATA_DISCARDABLE); + + // Reader index must stay where it was + assertEquals(TAIL_READER_INDEX, newTail.readerIndex()); + // Writer index at the end + assertEquals(TAIL_READER_INDEX + expectedTailReadableData.length(), + newTail.writerIndex()); + + // Verify resulting cumulation. + assertExpectedCumulation(newTail, expectedTailReadableData, compositeReaderIndexBounded); + + // Verify incoming buffer. + assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse(); + assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0); + } + + @Test + public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() { + // Fill in tail to the maxCapacity. + String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes()); + tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII); + composite.addFlattenedComponents(true, tail); + assertTailReplaced(); + } + + @Test + public void mergeWithCompositeTail_tailNotExpandable_shared() { + tail.retain(); + composite.addFlattenedComponents(true, tail); + assertTailReplaced(); + tail.release(); + } + + @Test + public void mergeWithCompositeTail_tailNotExpandable_readOnly() { + composite.addFlattenedComponents(true, tail.asReadOnly()); + assertTailReplaced(); + } + + private void assertTailReplaced() { + int cumulationOriginalComponentsNum = composite.numComponents(); + int taiOriginalRefCount = tail.refCnt(); + String expectedTailReadable = tail.toString(US_ASCII) + in.toString(US_ASCII); + int expectedReallocatedTailCapacity = alloc + .calculateNewCapacity(expectedTailReadable.length(), Integer.MAX_VALUE); + + int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex()); + composite.readerIndex(compositeReaderIndexBounded); + NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in); + + // Composite component count shouldn't change. + assertEquals(cumulationOriginalComponentsNum, composite.numComponents()); + ByteBuf replacedTail = composite.component(composite.numComponents() - 1); + + // Verify the readable part of the expanded tail: + // 1. Discardable bytes (0 < discardable < readerIndex) of the tail are discarded. + // 2. Readable bytes of the tail are kept as is + // 3. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded. + // 4. Readable bytes of the incoming buffer are fully read and appended to the tail. + assertEquals(0, in.readableBytes()); + assertEquals(expectedTailReadable, replacedTail.toString(US_ASCII)); + + // Since tail discardable bytes are discarded, new reader index must be reset to 0. + assertEquals(0, replacedTail.readerIndex()); + // And new writer index at the new data's length. + assertEquals(expectedTailReadable.length(), replacedTail.writerIndex()); + // Verify the capacity of reallocated tail. + assertEquals(expectedReallocatedTailCapacity, replacedTail.capacity()); + + // Verify resulting cumulation. + assertExpectedCumulation(replacedTail, expectedTailReadable, compositeReaderIndexBounded); + + // Verify incoming buffer. + assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse(); + assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0); + + // The old tail must be released once (have one less reference). + assertWithMessage("Replaced tail released once.") + .that(tail.refCnt()).isEqualTo(taiOriginalRefCount - 1); + } + + private void assertExpectedCumulation( + ByteBuf newTail, String expectedTailReadable, int expectedReaderIndex) { + // Verify the readable part of the cumulation: + // 1. Readable composite head (initial) data + // 2. Readable part of the tail + // 3. Readable part of the incoming data + String expectedCumulationData = + compositeHeadData.concat(expectedTailReadable).substring(expectedReaderIndex); + assertEquals(expectedCumulationData, composite.toString(US_ASCII)); + + // Cumulation capacity includes: + // 1. Full composite head, including discardable bytes + // 2. Expanded tail readable bytes + int expectedCumulationCapacity = compositeHeadData.length() + expectedTailReadable.length(); + assertEquals(expectedCumulationCapacity, composite.capacity()); + + // Composite Reader index must stay where it was. + assertEquals(expectedReaderIndex, composite.readerIndex()); + // Composite writer index must be at the end. + assertEquals(expectedCumulationCapacity, composite.writerIndex()); + + // Composite cumulation is retained and owns the new tail. + assertEquals(1, composite.refCnt()); + assertEquals(1, newTail.refCnt()); + } + + @Test + public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() { + final UnsupportedOperationException expectedError = new UnsupportedOperationException(); + CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, + tail) { + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, + ByteBuf buffer) { + throw expectedError; + } + }; + + try { + NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, compositeThrows, in); + fail("Cumulator didn't throw"); + } catch (UnsupportedOperationException actualError) { + assertSame(expectedError, actualError); + // Input must be released unless its ownership has been to the composite cumulation. + assertEquals(0, in.refCnt()); + // Tail released + assertEquals(0, tail.refCnt()); + // Composite cumulation is retained + assertEquals(1, compositeThrows.refCnt()); + // Composite cumulation loses the tail + assertEquals(0, compositeThrows.numComponents()); + } finally { + compositeThrows.release(); + } + } + + @Test + public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() { + final UnsupportedOperationException expectedError = new UnsupportedOperationException(); + CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, + tail.asReadOnly()) { + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, + ByteBuf buffer) { + throw expectedError; + } + }; + + // Return our instance of the new buffer to ensure it's released. + int newTailSize = tail.readableBytes() + in.readableBytes(); + ByteBuf newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); + ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class); + when(mockAlloc.buffer(anyInt())).thenReturn(newTail); + + try { + NettyAdaptiveCumulator.mergeWithCompositeTail(mockAlloc, compositeRo, in); + fail("Cumulator didn't throw"); + } catch (UnsupportedOperationException actualError) { + assertSame(expectedError, actualError); + // Input must be released unless its ownership has been to the composite cumulation. + assertEquals(0, in.refCnt()); + // New buffer released + assertEquals(0, newTail.refCnt()); + // Composite cumulation is retained + assertEquals(1, compositeRo.refCnt()); + // Composite cumulation loses the tail + assertEquals(0, compositeRo.numComponents()); + } finally { + compositeRo.release(); + } + } + } + + /** + * Miscellaneous tests for {@link NettyAdaptiveCumulator#mergeWithCompositeTail} that don't + * fit into {@link MergeWithCompositeTailTests}, and require custom-crafted scenarios. + */ + @RunWith(JUnit4.class) + public static class MergeWithCompositeTailMiscTests { + private final ByteBufAllocator alloc = new PooledByteBufAllocator(); + + /** + * Test the issue with {@link CompositeByteBuf#component(int)} returning a ByteBuf with + * the indexes out-of-sync with {@code CompositeByteBuf.Component} offsets. + */ + @Test + public void mergeWithCompositeTail_outOfSyncComposite() { + NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024); + + // Create underlying buffer spacious enough for the test data. + ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII)); + + // Start with a regular cumulation and add the buf as the only component. + CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf); + // Read composite1 buf to the beginning of the numbers. + assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---"); + + // Wrap composite1 into another cumulation. This is similar to + // what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1. + CompositeByteBuf composite2 = + alloc.compositeBuffer(8).addFlattenedComponents(true, composite1); + assertThat(composite2.toString(US_ASCII)).isEqualTo("01234"); + + // The previous operation does not adjust the read indexes of the underlying buffers, + // only the internal Component offsets. When the cumulator attempts to append the input to + // the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back. + // Because the readerIndex on the tail buffer is not adjusted during the read operation + // on the CompositeByteBuf, adding the tail back results in the discarded bytes of the tail + // to be added back to the cumulator as if they were never read. + // + // If the reader index of the tail is not manually corrected, the resulting + // cumulation will contain the discarded part of the tail: "---". + // If it's corrected, it will only contain the numbers. + CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2, + ByteBufUtil.writeAscii(alloc, "56789")); + assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789"); + + // Correctness check: we still have a single component, and this component is still the + // original underlying buffer. + assertThat(cumulation.numComponents()).isEqualTo(1); + // Replace '2' with '*', and '8' with '$'. + buf.setByte(5, '*').setByte(11, '$'); + assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9"); + } + } +}