Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 #4196

Merged
merged 39 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9fb47f5
Add a test that reproduces a bug in checksum calculation
lhotari Jan 31, 2024
0707710
Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList
lhotari Jan 30, 2024
fe9301d
Remove CompositeBuffer unwrapping in DigestManager
lhotari Jan 30, 2024
532de4f
Rename update -> internalUpdate so that unwrapping logic could be add…
lhotari Jan 30, 2024
9a39d6b
Remove unnecessary unwrapping logic in Java9IntHash
lhotari Jan 30, 2024
986942d
Add safe way to handle CompositeByteBuf
lhotari Jan 31, 2024
0669a66
Add license header
lhotari Jan 31, 2024
4036a3a
Fix checkstyle
lhotari Jan 31, 2024
7a8c200
Refactor ByteBuf visitor solution
lhotari Jan 31, 2024
b5b42ee
Fix checkstyle
lhotari Jan 31, 2024
81e9d2d
Reformat
lhotari Jan 31, 2024
6c7e6e0
Refactor recursive visiting
lhotari Jan 31, 2024
c19ae5d
Revisit equals, hashCode and toString
lhotari Jan 31, 2024
8928f98
Refactor test case
lhotari Feb 1, 2024
19e33ca
Add support for UnpooledHeapByteBuf.getBytes which passes an array
lhotari Feb 1, 2024
86d002d
Add support for visiting buffers backed by byte[] arrays
lhotari Feb 1, 2024
e793872
Move ByteBufVisitor to org.apache.bookkeeper.util package
lhotari Feb 1, 2024
6508bf5
Update javadoc
lhotari Feb 1, 2024
5cd2dd8
Refactor to use stateless visitor so that instance can be shared
lhotari Feb 1, 2024
fc93b5e
Improve test so that a single scenario can be used for debugging
lhotari Feb 2, 2024
272e962
Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == …
lhotari Feb 2, 2024
b4d0b18
Fix checkstyle
lhotari Feb 2, 2024
9c1b68f
Fix checkstyle
lhotari Feb 2, 2024
a751294
Fix missing depth increment that prevents StackOverflowException
lhotari Feb 3, 2024
5f8fef5
Properly handle the depth increase and decrease
lhotari Feb 3, 2024
bb703ab
Remove unnecessary condition
lhotari Feb 3, 2024
2b79716
Use more efficient way to read bytes to the target array
lhotari Feb 3, 2024
f650b9b
Don't use ByteBufVisitor if it's not necessary
lhotari Feb 3, 2024
d21a5cd
Revert "Fix bug in Java9IntHash calculation that assumed crc32c_updat…
lhotari Feb 3, 2024
a9ed5e4
Fix issue in resume byte[] version that was added
lhotari Feb 3, 2024
b7d5795
Polish ByteBufVisitor
lhotari Feb 4, 2024
34c1c8a
Use extracted method
lhotari Feb 4, 2024
be20948
Fix bug with array handling
lhotari Feb 4, 2024
7998659
Polish ByteBufVisitor
lhotari Feb 4, 2024
b691316
Optimize the buffer copying in the case where array or memory address…
lhotari Feb 4, 2024
0af1556
Check if memory address is accepted
lhotari Feb 4, 2024
317aab7
Improve comments about complement (current = ~current) in resume
lhotari Feb 4, 2024
2c73dca
Print thread dump when build is cancelled
lhotari Feb 1, 2024
c215db8
Filter empty buffers and arrays in ByteBufVisitor
lhotari Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void populateValueAndReset(int digest, ByteBuf buf) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
crc.get().update(data, offset, len);
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
Expand All @@ -34,6 +32,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,7 +55,20 @@ public abstract class DigestManager {

abstract int getMacCodeLength();

abstract int update(int digest, ByteBuf buffer, int offset, int len);
abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
MutableInt digestRef = new MutableInt(digest);
ByteBufVisitor.visitBuffers(buffer, offset, len,
(ByteBuf childBuffer, int childIndex, int childLength) -> {
if (childLength > 0) {
// recursively visit the sub buffer and update the digest
int updatedDigest = internalUpdate(digestRef.intValue(), childBuffer, childIndex, childLength);
digestRef.setValue(updatedDigest);
}
});
return digestRef.intValue();
}

abstract void populateValueAndReset(int digest, ByteBuf buffer);

Expand Down Expand Up @@ -136,34 +148,19 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long

// Compute checksum over the headers
int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.safeRelease(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());

populateValueAndReset(digest, buf);

// Reset the reader index to the beginning
buf.readerIndex(0);

if (isSmallEntry) {
buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
unwrapped.release();
buf.writeBytes(data, data.readerIndex(), data.readableBytes());
data.release();
return buf;
} else {
return ByteBufList.get(buf, unwrapped);
return ByteBufList.get(buf, data);
}
}

Expand All @@ -176,25 +173,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA
headersBuffer.writeLong(length);

int digest = update(0, headersBuffer, 0, METADATA_LENGTH);

// don't unwrap slices
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
? data.unwrap() : data;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(data);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
for (int i = 0; i < cbb.numComponents(); i++) {
ByteBuf b = cbb.component(i);
digest = update(digest, b, b.readerIndex(), b.readableBytes());
}
} else {
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
}
digest = update(digest, data, data.readerIndex(), data.readableBytes());
populateValueAndReset(digest, headersBuffer);

return ByteBufList.get(headersBuffer, unwrapped);
return ByteBufList.get(headersBuffer, data);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int getMacCodeLength() {
}

@Override
int update(int digest, ByteBuf buffer, int offset, int len) {
int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void populateValueAndReset(int digest, ByteBuf buffer) {
}

@Override
int update(int digest, ByteBuf data, int offset, int len) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
mac.get().update(data.slice(offset, len).nioBuffer());
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -133,43 +132,14 @@ public static ByteBufList get() {
* Append a {@link ByteBuf} at the end of this {@link ByteBufList}.
*/
public void add(ByteBuf buf) {
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
((CompositeByteBuf) unwrapped).forEach(b -> {
ReferenceCountUtil.retain(b);
buffers.add(b);
});
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(unwrapped);
}
buffers.add(buf);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
public void prepend(ByteBuf buf) {
// don't unwrap slices
final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf
? buf.unwrap() : buf;
ReferenceCountUtil.retain(unwrapped);
ReferenceCountUtil.release(buf);

if (unwrapped instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) unwrapped;
for (int i = composite.numComponents() - 1; i >= 0; i--) {
ByteBuf b = composite.component(i);
ReferenceCountUtil.retain(b);
buffers.add(0, b);
}
ReferenceCountUtil.release(unwrapped);
} else {
buffers.add(0, unwrapped);
}
buffers.add(0, buf);
}

/**
Expand Down Expand Up @@ -285,7 +255,7 @@ public ByteBufList retain() {
@Override
protected void deallocate() {
for (int i = 0; i < buffers.size(); i++) {
ReferenceCountUtil.release(buffers.get(i));
buffers.get(i).release();
}

buffers.clear();
Expand Down
Loading
Loading