
Fix message buffer being copied from direct memory to heap memory #10330

Conversation

@BewareMyPower commented Apr 22, 2021

Motivation

When I tested Pulsar with AppendIndexMetadataInterceptor configured, i.e.

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

I found that heap memory usage grew very quickly, causing frequent GC. After analyzing the heap dump, I found that many messages, which have BrokerEntryMetadata at the head, were stored in heap memory instead of direct memory.

When I removed the above configuration, heap memory grew much more slowly.

Modifications

  • Copy the two buffers into a single buffer in direct memory instead of using a CompositeByteBuf. When a CompositeByteBuf with more than one component reaches BookKeeper's checksum logic, its nioBuffer() method is called and its internal buffers are concatenated into a single buffer in heap memory.
  • Add a reference count check to ensure that, after this change, the input and output buffers of addBrokerEntryMetadata are ultimately released once the output buffer is released.
  • Slightly refactor the code so that the return value of ManagedLedgerInterceptor#beforeAddEntry is actually used.
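The idea behind the first bullet can be sketched as follows. This is an illustrative analog only: it uses plain java.nio buffers and a hypothetical method name, while the actual patch works on Netty's ByteBuf.

```java
import java.nio.ByteBuffer;

public class DirectBufferMerge {
    // Illustrative sketch (hypothetical name; the real patch uses Netty's
    // ByteBuf): copy the metadata header and the payload into one direct
    // buffer so the result is a single contiguous off-heap region instead of
    // a composite that the checksum path would later flatten onto the heap.
    static ByteBuffer mergeIntoDirect(ByteBuffer header, ByteBuffer payload) {
        ByteBuffer merged = ByteBuffer.allocateDirect(header.remaining() + payload.remaining());
        merged.put(header).put(payload); // the one deliberate copy, off-heap
        merged.flip();                   // prepare the merged buffer for reading
        return merged;
    }
}
```

The trade-off is one up-front copy in direct memory in exchange for avoiding a later hidden copy into heap memory.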

Verifying this change

  • Make sure that the change passes the CI checks.


This change is a trivial rework / code cleanup without any test coverage.

After this change, the heap memory growth problem was solved.

@BewareMyPower commented Apr 22, 2021

Here are the monitoring statistics with and without this change. The first case (before 00:15) uses this PR's addBrokerEntryMetadata method.

[screenshot: monitoring chart, 2021-04-23]

[screenshot: monitoring chart, 2021-04-23]

I used a producer with nearly 200 MB/s write speed.

./bin/pulsar-perf produce -r 300000 my-topic

As we can see, the heap memory of the second case (after 00:15) increased very quickly and GC happened very frequently. The only difference between the two cases is the pulsar-common.jar.

I'm not a Netty expert and am quite confused about why the CompositeByteBuf causes the whole buffer to be copied into heap memory.

Here's a dump generated by jmap -dump:format=b.

[screenshot: heap dump analysis showing many message buffers in heap memory]

We can see there are many messages in heap memory. The first two bytes are [14, 2], which correspond to magicBrokerEntryMetadata. See

public static final short magicBrokerEntryMetadata = 0x0e02;
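Splitting the 16-bit magic into its two bytes shows why the dumped buffers start with [14, 2] (the wrapper class and method name below are illustrative):

```java
public class BrokerEntryMetadataMagic {
    public static final short magicBrokerEntryMetadata = 0x0e02;

    // Split the 16-bit magic into the two bytes that appear at the start of
    // each entry carrying BrokerEntryMetadata: 0x0e == 14, 0x02 == 2.
    static byte[] magicBytes() {
        return new byte[]{
            (byte) ((magicBrokerEntryMetadata >>> 8) & 0xff), // high byte: 14
            (byte) (magicBrokerEntryMetadata & 0xff)          // low byte: 2
        };
    }
}
```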

@BewareMyPower

@codelipenghui @hangc0276 @wuzhanpeng @dockerzhang @aloyszhang PTAL. Also, it would be appreciated if you could compare the heap memory usage before and after this PR.

I'm not sure whether something is wrong with my test environment. The VM is an AWS EC2 Ubuntu Server 18.04 LTS (HVM) ami-05248307900d52e3a i3.4xlarge.

@BewareMyPower

It looks like the heap memory problem is on the BK side. I suspect that when LedgerHandle#asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx) accepts a CompositeByteBuf as the data, it eventually concatenates the CompositeByteBuf's buffers into a single buffer in heap memory.

@eolivelli left a comment

It looks like the heap memory problem is on the BK side. I suspect that when LedgerHandle#asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx) accepts a CompositeByteBuf as the data, it eventually concatenates the CompositeByteBuf's buffers into a single buffer in heap memory.

Probably you are right.

In this patch we are forcibly copying the data, and we should avoid it.

@eolivelli

@dlg99 PTAL

@codelipenghui

It seems the problem is related to the crc32 checksum in the BookKeeper client. Since we have more than one component in the CompositeByteBuf, CompositeByteBuf.hasMemoryAddress() returns false; see the implementation in CompositeByteBuf:

public boolean hasMemoryAddress() {
    switch (this.componentCount) {
    case 0:
        return Unpooled.EMPTY_BUFFER.hasMemoryAddress();
    case 1:
        return this.components[0].buf.hasMemoryAddress();
    default:
        return false;
    }
}

The crc32 checksum in BookKeeper then falls back to nioBuffer() to resume the CRC hash, and CompositeByteBuf.nioBuffer() copies the data into a HeapByteBuffer when the component count is greater than 1.

public static int resumeChecksum(int previousChecksum, ByteBuf payload) {
    if (payload.hasMemoryAddress() && (CRC32C_HASH instanceof Sse42Crc32C)) {
        return CRC32C_HASH.resume(previousChecksum, payload.memoryAddress() + payload.readerIndex(),
            payload.readableBytes());
    } else if (payload.hasArray()) {
        return CRC32C_HASH.resume(previousChecksum, payload.array(), payload.arrayOffset() + payload.readerIndex(),
            payload.readableBytes());
    } else {
        return CRC32C_HASH.resume(previousChecksum, payload.nioBuffer());
    }
}

@BewareMyPower

@codelipenghui Thanks for your help. I'll update the PR description after I change beforeAddEntry back.

@eolivelli As @codelipenghui mentioned, we cannot avoid copying data here because CompositeByteBuf#nioBuffer will be called on the BookKeeper side. I think we can open an issue in https://github.com/apache/bookkeeper. After the BookKeeper side fixes the problem, we could change the code here to avoid copying data.

@BewareMyPower

I've updated the code and PR description. PTAL again. @eolivelli @codelipenghui

@codelipenghui

@BewareMyPower Could you please add one issue to track the Pulsar side and another to track the BookKeeper side? We need to fix both before the 2.8.0 release, which also requires a BookKeeper release containing the fix. I agree we can copy the data for now to avoid copying data into the JVM heap, since this only happens when broker entry metadata is enabled. If this cannot be fixed completely, we'd better disable the feature by default in 2.8.0.

@eolivelli WDYT?

@eolivelli

@dlg99 please follow up on BK.

@dlg99 commented Apr 23, 2021

@codelipenghui nice catch. The checksum part should be easy to fix: DigestManager.computeDigestAndPackageForSending can check whether the ByteBuf is an instance of CompositeByteBuf, treat it as an Iterable (which CompositeByteBuf implements), and update the digest in a loop, or something along those lines.
PCBC.addEntry does something similar with ByteBufList.
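The suggested loop can be sketched with the JDK's CRC32C standing in for BookKeeper's digest code. This is a sketch of the technique only; the real fix lives in BookKeeper's DigestManager and operates on Netty buffer components.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

public class ComponentChecksum {
    // Sketch of the suggested fix (JDK CRC32C standing in for BookKeeper's
    // digest): update the checksum from each component in a loop instead of
    // flattening a composite buffer, which is what copies data to the heap.
    static long checksumComponents(Iterable<ByteBuffer> components) {
        CRC32C crc = new CRC32C();
        for (ByteBuffer component : components) {
            crc.update(component); // incremental update, no concatenation
        }
        return crc.getValue();
    }
}
```

Because CRC is resumable, checksumming the components one by one yields the same value as checksumming the concatenated bytes, which is what makes the loop a drop-in replacement for the flattening path.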

@BewareMyPower

since this only happens when broker entry metadata is enabled, if this cannot be fixed completely, we'd better disable this feature by default in 2.8.0.

This feature is already disabled by default, because the default brokerEntryMetadataInterceptors is empty and managedLedgerInterceptor is null.

@BewareMyPower

BTW, I'm not sure whether BK can be released in time so that Pulsar 2.8.0 isn't delayed too long. For now I suggest applying this PR's patch first. If BK can't be released in time, we could pick up the BK fix in Pulsar 2.8.1.

@dlg99 commented Apr 23, 2021

apache/bookkeeper#2701 should fix it.
The existing tests should be enough; we are not unit-testing memory allocations.
I didn't wait for the test run locally; let's see what the CI checks say.

@BewareMyPower

@dlg99 Thanks for your quick fix.

My concern is: once the PR is merged, when will BK be released so that we can upgrade Pulsar's BK dependency?

@dlg99 commented Apr 23, 2021

@BewareMyPower I think we have a few other interesting changes since BK 4.13.0 to justify a 4.13.1 release.
@eolivelli what do you think? We'll need to review pending PRs, decide what can be merged, etc. The release process (voting etc.) easily takes a week.

@BewareMyPower meanwhile, can I ask you to build BK locally and confirm that the fix in fact improves the heap allocation situation in your scenario? Let's treat this as a prerequisite for starting the release process.

@eolivelli

@dlg99 I agree with you

@BewareMyPower

@dlg99 OK, I'll verify it soon.

@BewareMyPower

@dlg99 It looks like there's no significant improvement.

[screenshot: monitoring chart showing heap memory usage]

I'll add some logs to check whether the BK dependency is updated. BTW, my build steps are:

  1. Build the latest BK with PR 2701:
mvn clean install -DskipTests
  2. Upgrade Pulsar's BK version:
<bookkeeper.version>4.14.0-SNAPSHOT</bookkeeper.version>
  3. Build Pulsar:
mvn clean install -DskipTests -Pcore-modules
  4. Upload distribution/server/target/apache-pulsar-2.8.0-SNAPSHOT-bin.tar.gz to my VM and start it.

@dlg99 commented Apr 23, 2021

@BewareMyPower Looking at the original chart, heap utilization used to go up to 4.5G and is now about 3G (assuming I'm interpreting these charts correctly, the tests and configuration are the same, etc.). I'd expect heap usage to peak at the same level but grow more slowly, with less frequent GC pauses; however, I have no idea what these charts actually show.

You may need to run a profiler to check where the allocations happen. IIRC, JFR can record data over some interval, and then JMC can show you where the most allocations happened, with stack traces etc.

@BewareMyPower

I'll try to debug BK dependencies first. Maybe it was not caused by the checksum.

@BewareMyPower commented Apr 23, 2021

@dlg99 I just found something wrong with @codelipenghui's analysis, which explains why your PR didn't help.

We can debug BrokerEntryMetadataE2ETest and go to

ByteBuf duplicateBuffer = data.retainedDuplicate();
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
lastInitTime = System.nanoTime();
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);

  • The original data is io.netty.buffer.CompositeByteBuf
  • However, duplicateBuffer is io.netty.buffer.UnpooledDuplicatedByteBuf

So the argument passed to LedgerHandle#asyncAddEntry is an UnpooledDuplicatedByteBuf, not a CompositeByteBuf.


I tried to change the code above to

            ByteBuf duplicateBuffer = data.retainedDuplicate();
            final ByteBuf originalData = data;
            data = duplicateBuffer;

            // internally asyncAddEntry() will take the ownership of the buffer and release it at the end
            addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
            lastInitTime = System.nanoTime();
            ledger.asyncAddEntry(originalData, this, addOpCount);

But it still didn't work (even worse). I've added some debug logs to BK:

    public static int resumeChecksum(int previousChecksum, ByteBuf payload) {
        log.info("XYZ resumeChecksum payload.hasMemoryAddress(): {}, hasArray(): {}, is CompositeByteBuf: {},"
                        + " class name: {}",
                payload.hasMemoryAddress(), payload.hasArray(), (payload instanceof CompositeByteBuf),
                payload.getClass().getName());

Then I sent one message, and the logs were:

19:41:21.303 [BookKeeperClientWorker-OrderedExecutor-12-0] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - XYZ resumeChecksum payload.hasMemoryAddress(): true, hasArray(): false, is CompositeByteBuf: false, class name: io.netty.buffer.PooledUnsafeDirectByteBuf
19:41:21.304 [BookKeeperClientWorker-OrderedExecutor-12-0] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - XYZ resumeChecksum payload.hasMemoryAddress(): true, hasArray(): false, is CompositeByteBuf: false, class name: io.netty.buffer.PooledUnsafeDirectByteBuf
19:41:21.305 [BookKeeperClientWorker-OrderedExecutor-12-0] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - XYZ resumeChecksum payload.hasMemoryAddress(): true, hasArray(): false, is CompositeByteBuf: false, class name: io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf

The CompositeByteBuf was split into a PooledUnsafeDirectByteBuf and an AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf. It seems nioBuffer() wasn't called, but the heap memory problem still existed and might even have gotten worse.

@merlimat

So the argument passed to LedgerHandle#asyncAddEntry is an UnpooledDuplicatedByteBuf, not a CompositeByteBuf.

@BewareMyPower We might need to use ByteBuf.unwrap() to access the wrapped buffer.

I tried to change the code above to
ByteBuf duplicateBuffer = data.retainedDuplicate();
But it still didn't work (even worse). I've added some debug logs to BK:

There's an extra retain there, which would also require another release on the buffer.
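The unwrap() approach could look roughly like this. This is a sketch under the assumption that Netty is on the classpath; unwrapToComposite is a hypothetical helper name, not BookKeeper's actual code.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;

public class UnwrapHelper {
    // Hypothetical helper: walk unwrap() until a CompositeByteBuf (or the
    // bottom of the wrapper chain) is reached, so derived buffers such as the
    // UnpooledDuplicatedByteBuf returned by retainedDuplicate() don't hide
    // the composite from instanceof checks.
    static ByteBuf unwrapToComposite(ByteBuf buf) {
        ByteBuf current = buf;
        while (!(current instanceof CompositeByteBuf) && current.unwrap() != null) {
            current = current.unwrap();
        }
        return current;
    }
}
```

Note that unwrapping only recovers the composite's components for iteration; reference-count ownership still belongs to the outer derived buffer.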

@dlg99 commented Apr 23, 2021

@BewareMyPower I updated the PR to unwrap the DuplicatedByteBuf.

@michaeljmarshall left a comment

Nice find!

 }
 }

-private boolean beforeAddEntry(OpAddEntry addOperation) {
+private Optional<OpAddEntry> beforeAddEntry(OpAddEntry addOperation) {
     // if no interceptor, just return true to make sure addOperation will be initiate()
@michaeljmarshall
This comment should be updated to reflect the new return type.

@BewareMyPower

OK, but this PR may not be merged, since the BookKeeper side could fix the issue; I may do this code refactor in another PR.

@BewareMyPower commented Apr 24, 2021

@dlg99 Thanks a lot for your help. It should work now.

But there's still a small problem that is not related to your code change. When I run Pulsar with BK 4.14-SNAPSHOT, it looks like the JNI library can't be loaded:

WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider

This is why I still can't get a good test result. However, when I run Pulsar with BK 4.13, it loads the JNI library successfully:

INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized

Both run on the same VM, so I wonder what could cause this?

@BewareMyPower

Also, I have a question: should we add an option to configure whether the CompositeByteBuf is merged? On platforms that can't load the Circe JNI library, the performance impact of frequent GC is much greater than that of the in-memory data copy. With the Java-based CRC32c provider, the merged ByteBuf stays in direct memory, but a CompositeByteBuf will hit nioBuffer() and copy data into heap memory.

@eolivelli

BK 4.14 is basically equal to 4.13 in this scope, so it's probably a problem with how you built it locally.
Which platform are you on? Are you on Linux? Which JDK?

@BewareMyPower

@eolivelli I built BK on macOS with JDK 8; see #10330 (comment)

And I run Pulsar with BK 4.14 on an AWS EC2 Ubuntu Server 18.04 LTS (HVM) ami-05248307900d52e3a instance, with JDK 11.

@eolivelli

Probably (I am not sure) the problem is that you have to build BK on Linux in order to build the CirceChecksum library correctly.

When we release BK, even while using Macs, we run the build in a Docker environment; you can use the script in the 'dev' folder, for instance.

@BewareMyPower

@eolivelli Sounds reasonable. I'll give it a shot.

jiazhai pushed a commit to apache/bookkeeper that referenced this pull request Apr 26, 2021
Descriptions of the changes in this PR:

Handling CompositeByteBuf in a way that avoids unnecessary data copy.

### Motivation

apache/pulsar#10330

apache/pulsar#10330 (comment)

### Changes

Handling CompositeByteBuf in a way that avoids unnecessary data copy.
@dlg99

dlg99 commented Apr 26, 2021

@BewareMyPower can you confirm that the copy to heap issue is resolved by the bookkeeper change?

@BewareMyPower

@dlg99 Yes, it's resolved. Sorry, I forgot to update the result. Here's the comparison with the same workload as #10330 (comment):

[screenshot: monitoring chart showing stable heap memory usage]

The heap memory is stable and there's no GC.

@BewareMyPower

Hi @eolivelli, I see apache/bookkeeper#2701 has been merged for a few days. Is there any plan for a BookKeeper release now? Then Pulsar's BK dependency could be upgraded to fix #10330.

@eolivelli

Can you please start a discussion on dev@bookkeeper.apache.org?
I am fine with cutting a release.

@BewareMyPower

@eolivelli Sorry for the late reply; I'm on vacation now. I sent an email just now, PTAL.
