Skip to content

Commit

Permalink
Fixed unnecessary copy to heap, see apache/pulsar#10330 (#2701)
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

Handling CompositeByteBuf in a way that avoids unnecessary data copy.

### Motivation

apache/pulsar#10330

apache/pulsar#10330 (comment)

### Changes

Handling CompositeByteBuf in a way that avoids unnecessary data copy.
  • Loading branch information
dlg99 authored Apr 26, 2021
1 parent a832ed5 commit 3c9c710
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;

import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -110,10 +112,21 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
headersBuffer.writeLong(length);

update(headersBuffer);
update(data);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(this::update);
} else {
update(unwrapped);
}
populateValueAndReset(headersBuffer);

return ByteBufList.get(headersBuffer, data);
return ByteBufList.get(headersBuffer, unwrapped);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -136,14 +137,43 @@ private static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
buffers.add(buf);
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
buffers.add(0, buf);
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
}

/**
Expand Down Expand Up @@ -259,7 +289,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
buffers.get(i).release();
ReferenceCountUtil.release(buffers.get(i));
}

buffers.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -88,6 +89,39 @@ public void testDouble() throws Exception {
assertEquals(b2.refCnt(), 0);
}

@Test
public void testComposite() throws Exception {
ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
b1.writerIndex(b1.capacity());
ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
b2.writerIndex(b2.capacity());

CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
composite.addComponent(b1);
composite.addComponent(b2);

ByteBufList buf = ByteBufList.get(composite);

// composite is unwrapped into two parts
assertEquals(2, buf.size());
// and released
assertEquals(composite.refCnt(), 0);

assertEquals(256, buf.readableBytes());
assertEquals(b1, buf.getBuffer(0));
assertEquals(b2, buf.getBuffer(1));

assertEquals(buf.refCnt(), 1);
assertEquals(b1.refCnt(), 1);
assertEquals(b2.refCnt(), 1);

buf.release();

assertEquals(buf.refCnt(), 0);
assertEquals(b1.refCnt(), 0);
assertEquals(b2.refCnt(), 0);
}

@Test
public void testClone() throws Exception {
ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
Expand Down

0 comments on commit 3c9c710

Please sign in to comment.