
[Bug] Cannot determine whether the message is a duplicate at this time #21892

Closed · 2 tasks done
graysonzeng opened this issue Jan 12, 2024 · 5 comments · Fixed by #22892
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@graysonzeng
Contributor

graysonzeng commented Jan 12, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar version: 3.1.1, master

Minimal reproduce step

broker count: 2
bookie count: 5

broker config:
managedLedgerDefaultAckQuorum: "2"
managedLedgerDefaultEnsembleSize: "4"
managedLedgerDefaultWriteQuorum: "3"

// Enable deduplication
brokerDeduplicationEnabled: "true"

// enable Interceptor
brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

The producer uses batching (enabled by default).

Use pulsar-perf to publish at a rate of 200,000 messages/sec for a total of 100,000,000 messages, and consume them at the same time:

bin/pulsar-perf produce persistent://pulsar/default/input_test -r 200000 -m 10000000

At the same time, use a Pulsar Function to consume and re-produce the messages, setting the sequenceId on the producer inside the function (EFFECTIVELY_ONCE mode); a minimal sketch is shown below.
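The sketch below is only illustrative: the output topic name and the derivation of the sequenceId from the record sequence are assumptions, and the real function used in the test may differ.

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class RepublishWithSequenceId implements Function<byte[], Void> {
    @Override
    public Void process(byte[] input, Context context) throws Exception {
        // Derive a monotonically increasing sequenceId from the consumed record
        long seqId = context.getCurrentRecord()
                .getRecordSequence()
                .orElseThrow(() -> new IllegalStateException("record has no sequence"));

        // Re-produce the message with an explicit sequenceId so that broker-side
        // deduplication can detect duplicates after a reconnect
        context.newOutputMessage("persistent://pulsar/default/output_test", Schema.BYTES)
                .value(input)
                .sequenceId(seqId)
                .sendAsync();
        return null;
    }
}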

What did you expect to see?

All messages are produced and consumed successfully.

What did you see instead?

The producer falls into the following error and remains stuck until the broker is restarted:

2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-3] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Re-Sending 1142 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 1 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-9] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 4340 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 395 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 1482 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 10 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 454 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 - R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [21.24.16.52/21.24.16.52:6650] Got exception io.netty.channel.StacklessClosedChannelException
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Disconnected
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-8] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-6-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s

Anything else?

After the producer got stuck, a heap dump of the broker was taken and something unusual was discovered.

[screenshot: broker heap dump]

The pendingAddOps queue of the LedgerHandle also retains a lot of requests. The first request in the queue is not completed, its pendingWriteRequests is 0, and its addEntrySuccessBookies set is empty.

[screenshot: first pending add request in pendingAddOps]

But the second request is in completed status.

In MessageDeduplication.isDuplicate(), the sequenceId falls between lastSequenceIdPersisted and highestSequencedPushed, which is why we receive the "Cannot determine whether the message is a duplicate at this time" error:

                if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
                    return MessageDupStatus.Dup;
                } else {
                    return MessageDupStatus.Unknown;
                }

The client receives this error, disconnects, and resends the messages. The resent messages still have sequenceId > lastSequenceIdPersisted, so the producer falls into a loop.
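To make the loop easier to see, here is a simplified, self-contained paraphrase of the broker-side decision (not the exact MessageDeduplication source): Unknown is returned exactly when the sequenceId is above the last persisted snapshot but not above the highest sequenceId already pushed to the managed ledger.

class DedupDecisionSketch {
    // mirrors the values used by the broker's MessageDupStatus
    enum MessageDupStatus { NotDup, Dup, Unknown }

    static MessageDupStatus isDuplicate(long sequenceId,
                                        long highestSequencedPushed,
                                        Long lastSequenceIdPersisted) {
        if (sequenceId > highestSequencedPushed) {
            // genuinely new message
            return MessageDupStatus.NotDup;
        }
        if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
            // already persisted, safe to report as a duplicate
            return MessageDupStatus.Dup;
        }
        // pushed but never persisted because the pending add is stuck:
        // the broker cannot decide, the client resends, and the same
        // sequenceId keeps landing in this branch
        return MessageDupStatus.Unknown;
    }
}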

Update

An important log message was found:

14:29:19.961 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
	at io.netty.buffer.AbstractByteBuf.checkIndexBounds(AbstractByteBuf.java:112) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:144) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:56) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:42) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.UnpooledDuplicatedByteBuf.<init>(UnpooledDuplicatedByteBuf.java:24) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedDuplicateByteBuf.<init>(AbstractPooledDerivedByteBuf.java:164) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf.duplicate0(AbstractPooledDerivedByteBuf.java:157) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.PooledSlicedByteBuf.duplicate(PooledSlicedByteBuf.java:118) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf$Component.duplicate(CompositeByteBuf.java:1947) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf.component(CompositeByteBuf.java:1556) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSendingV2(DigestManager.java:149) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSending(DigestManager.java:106) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.PendingAddOp.initiate(PendingAddOp.java:246) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.doAsyncAddEntry(LedgerHandle.java:1363) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.asyncAddEntry(LedgerHandle.java:1061) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:144) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:862) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:794) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

It points to BookKeeper's DigestManager.computeDigestAndPackageForSendingV2():

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L149

private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length,
                                                              ByteBuf data, byte[] masterKey, int flags) {
        // ... (digest initialization and unwrapping of 'data' into 'unwrapped' elided) ...
        if (unwrapped instanceof CompositeByteBuf) {
            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
            for (int i = 0; i < cbb.numComponents(); i++) {
                // cbb.component(i) throws the IndexOutOfBoundsException seen above
                ByteBuf b = cbb.component(i);
                digest = update(digest, b, b.readerIndex(), b.readableBytes());
            }
        } else {
            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
        }

    }
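The bounds check that fails here is Netty's AbstractByteBuf.setIndex() validation: the duplicated component carried a readerIndex greater than its writerIndex. The tiny standalone demo below reproduces the same check (not the bug itself, which needs the racing composite buffer):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

class IndexBoundsDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(64);
        buf.writerIndex(32).readerIndex(16);
        // A readerIndex greater than the writerIndex fails the same bounds
        // check reported in the stack trace above
        buf.setIndex(48, 32); // throws IndexOutOfBoundsException
    }
}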

Under normal circumstances, after the digest is computed, the result is assigned to toSend and payload is set to null.

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L231

public synchronized void initiate() {
       ......
        this.toSend = lh.macManager.computeDigestAndPackageForSending(
                entryId, lh.lastAddConfirmed, currentLedgerLength,
                payload, lh.ledgerKey, flags);
        // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
        payload = null;
       ......
    }

Therefore, we can see that the head of pendingAddOps still retains its payload while toSend is null:

[screenshot: the stuck PendingAddOp, with payload retained and toSend null]

In BookKeeper's PendingAddOp.unsetSuccessAndSendWriteRequest(), if toSend is null the method returns immediately, so this request has been stuck in pendingAddOps ever since computeDigestAndPackageForSending failed:
https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L183
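A self-contained sketch of that guard (paraphrased; see the linked PendingAddOp source for the real code) shows why the op can never make progress once the digest computation has thrown:

import io.netty.buffer.ByteBuf;

class PendingAddOpSketch {
    // stays null because computeDigestAndPackageForSending threw before assigning it
    private ByteBuf toSend;

    synchronized void unsetSuccessAndSendWriteRequest(int bookieIndex) {
        if (toSend == null) {
            // nothing was ever packaged for sending, so there is nothing to retry;
            // the op remains at the head of pendingAddOps and blocks everything behind it
            return;
        }
        // ... the real code re-issues the write request to the given bookie ...
    }
}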

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@graysonzeng graysonzeng added the type/bug The PR fixed a bug or issue reported a bug label Jan 12, 2024
@lhotari
Member

lhotari commented Jan 12, 2024

broker config:
managedLedgerDefaultAckQuorum: "2"
managedLedgerDefaultEnsembleSize: "4"
managedLedgerDefaultWriteQuorum: "3"

@graysonzeng Not related to the reported issue, but it's good to be aware that when using this type of config, it won't be optimal for read performance, since sticky reads aren't used with bookies when E != Qw. More details in #18003 and apache/bookkeeper#4131. Related Pulsar Slack thread: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1699487100764749?thread_ts=1698225686.705339&cid=C5Z4T36F7 ("since ensemble=Qw, reads rates have increased x30").

@graysonzeng
Contributor Author

@graysonzeng Not related to the reported issue, but it's good to be aware that when using this type of config, it won't be optimal for read performance, since sticky reads aren't used with bookies when E != Qw. More details in #18003 and apache/bookkeeper#4131. Related Pulsar Slack thread: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1699487100764749?thread_ts=1698225686.705339&cid=C5Z4T36F7 ("since ensemble=Qw, reads rates have increased x30").

Thanks so much for the heads up! I definitely missed it. I will take the time to read it.

@lhotari
Member

lhotari commented Feb 12, 2024

// enable Interceptor
brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

@graysonzeng does it reproduce without brokerEntryMetadataInterceptors?

@graysonzeng
Contributor Author

graysonzeng commented Feb 21, 2024

@lhotari I haven't tested it again, but I think it's not reproducible without brokerEntryMetadataInterceptors. Currently it seems that a CompositeByteBuf is generated in addBrokerEntryMetadata only when brokerEntryMetadataInterceptors are enabled:

compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
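Roughly, the difference in buffer shape can be illustrated as follows (names are illustrative, not the exact broker code); this composite is what DigestManager.computeDigestAndPackageForSendingV2() later iterates component by component:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

class BrokerEntryShapeSketch {
    static ByteBuf withBrokerEntryMetadata(ByteBuf brokerMeta, ByteBuf headerAndPayload) {
        // With the interceptor enabled, the entry handed to BookKeeper is a composite
        // of [broker entry metadata][header + payload] instead of a single buffer
        CompositeByteBuf compositeByteBuf =
                PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
        compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
        return compositeByteBuf;
    }
}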

@lhotari
Member

lhotari commented May 31, 2024

This issue is most likely related to #22601 / #22810.
