Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 #4196

Merged
merged 39 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9fb47f5
Add a test that reproduces a bug in checksum calculation
lhotari Jan 31, 2024
0707710
Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList
lhotari Jan 30, 2024
fe9301d
Remove CompositeBuffer unwrapping in DigestManager
lhotari Jan 30, 2024
532de4f
Rename update -> internalUpdate so that unwrapping logic could be add…
lhotari Jan 30, 2024
9a39d6b
Remove unnecessary unwrapping logic in Java9IntHash
lhotari Jan 30, 2024
986942d
Add safe way to handle CompositeByteBuf
lhotari Jan 31, 2024
0669a66
Add license header
lhotari Jan 31, 2024
4036a3a
Fix checkstyle
lhotari Jan 31, 2024
7a8c200
Refactor ByteBuf visitor solution
lhotari Jan 31, 2024
b5b42ee
Fix checkstyle
lhotari Jan 31, 2024
81e9d2d
Reformat
lhotari Jan 31, 2024
6c7e6e0
Refactor recursive visiting
lhotari Jan 31, 2024
c19ae5d
Revisit equals, hashCode and toString
lhotari Jan 31, 2024
8928f98
Refactor test case
lhotari Feb 1, 2024
19e33ca
Add support for UnpooledHeapByteBuf.getBytes which passes an array
lhotari Feb 1, 2024
86d002d
Add support for visiting buffers backed by byte[] arrays
lhotari Feb 1, 2024
e793872
Move ByteBufVisitor to org.apache.bookkeeper.util package
lhotari Feb 1, 2024
6508bf5
Update javadoc
lhotari Feb 1, 2024
5cd2dd8
Refactor to use stateless visitor so that instance can be shared
lhotari Feb 1, 2024
fc93b5e
Improve test so that a single scenario can be used for debugging
lhotari Feb 2, 2024
272e962
Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == …
lhotari Feb 2, 2024
b4d0b18
Fix checkstyle
lhotari Feb 2, 2024
9c1b68f
Fix checkstyle
lhotari Feb 2, 2024
a751294
Fix missing depth increment that prevents StackOverflowException
lhotari Feb 3, 2024
5f8fef5
Properly handle the depth increase and decrease
lhotari Feb 3, 2024
bb703ab
Remove unnecessary condition
lhotari Feb 3, 2024
2b79716
Use more efficient way to read bytes to the target array
lhotari Feb 3, 2024
f650b9b
Don't use ByteBufVisitor if it's not necessary
lhotari Feb 3, 2024
d21a5cd
Revert "Fix bug in Java9IntHash calculation that assumed crc32c_updat…
lhotari Feb 3, 2024
a9ed5e4
Fix issue in resume byte[] version that was added
lhotari Feb 3, 2024
b7d5795
Polish ByteBufVisitor
lhotari Feb 4, 2024
34c1c8a
Use extracted method
lhotari Feb 4, 2024
be20948
Fix bug with array handling
lhotari Feb 4, 2024
7998659
Polish ByteBufVisitor
lhotari Feb 4, 2024
b691316
Optimize the buffer copying in the case where array or memory address…
lhotari Feb 4, 2024
0af1556
Check if memory address is accepted
lhotari Feb 4, 2024
317aab7
Improve comments about complement (current = ~current) in resume
lhotari Feb 4, 2024
2c73dca
Print thread dump when build is cancelled
lhotari Feb 1, 2024
c215db8
Filter empty buffers and arrays in ByteBufVisitor
lhotari Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.DuplicatedByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
Expand All @@ -34,6 +33,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,14 +49,89 @@ public abstract class DigestManager {

public static final int METADATA_LENGTH = 32;
public static final int LAC_METADATA_LENGTH = 16;
private static final int MAX_SUB_BUFFER_VISIT_RECURSION_DEPTH = 10;

final long ledgerId;
final boolean useV2Protocol;
private final ByteBufAllocator allocator;

abstract int getMacCodeLength();

abstract int update(int digest, ByteBuf buffer, int offset, int len);
abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
return recursiveSubBufferVisitForDigestUpdate(digest, buffer, offset, len, 0);
}

private int recursiveSubBufferVisitForDigestUpdate(int digest, ByteBuf buffer, int offset, int len, int depth) {
if (len == 0) {
return digest;
}
if (depth < MAX_SUB_BUFFER_VISIT_RECURSION_DEPTH && !buffer.hasMemoryAddress() && !buffer.hasArray()) {
return visitWrappedBuffersAndCallInternalUpdateForEach(digest, buffer, offset, len, depth);
}
return internalUpdate(digest, buffer, offset, len);
}

/**
* This method is used to visit the wrapped buffers and call internalUpdate for each of them.
* CompositeByteBuf is one of the wrapped buffer types that will be visited. It can contain multiple
* sub buffers. The sub buffers can be wrapped buffers as well.
*
* Netty doesn't provide an API to visit the wrapped buffers, so we have to use the a hack here.
*
* @param digest current digest value
* @param buffer the buffer to visit
* @param offset the offset in the buffer
* @param len the length in the buffer
* @param depth the recursion depth of the wrapped buffer visit
* @return updated digest value
*/
private int visitWrappedBuffersAndCallInternalUpdateForEach(int digest, ByteBuf buffer, int offset, int len,
int depth) {
// hold the digest in a MutableInt so that it can be updated in the wrapped buffer visit
MutableInt digestRef = new MutableInt(digest);
ByteBuf sliced = buffer.slice(offset, len);
// call getBytes to trigger the wrapped buffer visit
sliced.getBytes(0, new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
if (length > 0) {
// recursively visit the sub buffer and update the digest
int updatedDigest = recursiveSubBufferVisitForDigestUpdate(digestRef.intValue(), src, srcIndex,
length, depth + 1);
digestRef.setValue(updatedDigest);
}
return this;
}

@Override
public boolean hasArray() {
// return false so that the wrapped buffer is visited
return false;
}

@Override
public boolean hasMemoryAddress() {
// return false so that the wrapped buffer is visited
return false;
}

@Override
public int nioBufferCount() {
// return 0 so that the wrapped buffer is visited
return 0;
}

@Override
public int capacity() {
// should return sufficient capacity for the total length
return len;
}
});
// return updated digest value
return digestRef.intValue();
}

abstract void populateValueAndReset(int digest, ByteBuf buffer);

Expand Down Expand Up @@ -136,34 +211,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long

// Compute checksum over the headers
int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());

populateValueAndReset(digest, buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
unwrapped.release();
buf.writeBytes(data, data.readerIndex(), data.readableBytes());
data.release();
return buf;
} else {
return ByteBufList.get(buf, unwrapped);
return ByteBufList.get(buf, data);
}
}

Expand All @@ -176,25 +236,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
headersBuffer.writeLong(length);

int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
return ByteBufList.get(headersBuffer, data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int getMacCodeLength() {
}

@Override
int update(int digest, ByteBuf buffer, int offset, int len) {
int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -133,43 +132,14 @@ public static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
buffers.add(buf);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
buffers.add(0, buf);
}

/**
Expand Down Expand Up @@ -285,7 +255,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
ReferenceCountUtil.release(buffers.get(i));
buffers.get(i).release();
}

buffers.clear();
Expand Down
Loading
Loading