diff --git a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java index 9baec34b189..ea654c5b9ba 100644 --- a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java @@ -23,6 +23,7 @@ import java.nio.InvalidMarkException; import java.util.ArrayDeque; import java.util.Deque; +import java.util.Queue; import javax.annotation.Nullable; /** @@ -38,6 +39,7 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { private final Deque readableBuffers; private Deque rewindableBuffers; private int readableBytes; + private final Queue buffers = new ArrayDeque(2); private boolean marked; public CompositeReadableBuffer(int initialCapacity) { @@ -159,6 +161,31 @@ public void readBytes(OutputStream dest, int length) throws IOException { execute(STREAM_OP, length, dest, 0); } + /** + * Reads {@code length} bytes from this buffer and writes them to the destination buffer. + * Increments the read position by {@code length}. If the required bytes are not readable, throws + * {@link IndexOutOfBoundsException}. + * + * @param dest the destination buffer to receive the bytes. + * @param length the number of bytes to be copied. + * @throws IndexOutOfBoundsException if required bytes are not readable + */ + public void readBytes(CompositeReadableBuffer dest, int length) { + checkReadable(length); + readableBytes -= length; + + while (length > 0) { + ReadableBuffer buffer = buffers.peek(); + if (buffer.readableBytes() > length) { + dest.addBuffer(buffer.readBytes(length)); + length = 0; + } else { + dest.addBuffer(buffers.poll()); + length -= buffer.readableBytes(); + } + } + } + @Override public ReadableBuffer readBytes(int length) { if (length <= 0) {