
Consume performance drops when EnableBatchIndexAcknowledgment is set to true #949

Closed
panszobe opened this issue Feb 2, 2023 · 13 comments · Fixed by #957
@panszobe (Contributor) commented Feb 2, 2023

Following PR #938, I used the master version (v0.9.1-0.20230117072740-d9b18d0690c1) to consume messages with EnableBatchIndexAcknowledgment set to true, but consume performance dropped to roughly a third of the previous rate.

The test setup is as follows:
The topic has 5 partitions, and the producer's production rate is 20 MB/s (300,000 rows/s).

The consumers' results:

| SDK Version | Enable Batch Index Ack | Consumer Instances | Consume Rate |
| --- | --- | --- | --- |
| v0.9.1-0.20230117072740-d9b18d0690c1 | Yes | 3 | 100,000 rows/s |
| v0.9.1-0.20230117072740-d9b18d0690c1 | No | 3 | 300,000+ rows/s |

Analyzing the problem with pprof, we found that internal.(*connection).internalSendRequest and pulsar.(*partitionConsumer).internalAck are much more resource intensive when EnableBatchIndexAcknowledgment is set to true.
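For reference, here is a minimal sketch of how such a CPU profile can be captured with Go's standard `net/http/pprof` package; the port and program layout are illustrative only, not the exact setup used for the numbers above:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof handlers on http.DefaultServeMux
)

func main() {
	// Expose pprof while the consumer workload is running, then grab a
	// 30-second CPU profile with:
	//   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// ... start the Pulsar consumer workload here ...
	select {}
}
```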

Review the code:

```go
func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return errors.New("consumer state is closed")
	}

	if cmid, ok := toChunkedMessageID(msgID); ok {
		return pc.unAckChunksTracker.ack(cmid)
	}

	trackingID, ok := toTrackingMessageID(msgID)
	if !ok {
		return errors.New("failed to convert trackingMessageID")
	}

	ackReq := new(ackRequest)
	ackReq.doneCh = make(chan struct{})
	ackReq.ackType = individualAck
	if !trackingID.Undefined() && trackingID.ack() {
		// the ack tracker reports the whole batch as acked: send one ack request
		pc.metrics.AcksCounter.Inc()
		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
		ackReq.msgID = trackingID
		// send ack request to eventsCh
		pc.eventsCh <- ackReq

		if withResponse {
			<-ackReq.doneCh
		}

		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
	} else if pc.options.enableBatchIndexAck {
		// with batch index ack enabled, an ack request is sent for every single
		// MessageID, even before the rest of the batch has been acked
		ackReq.msgID = trackingID
		pc.eventsCh <- ackReq
	}

	if withResponse {
		return ackReq.err
	}
	return nil
}
```

The problem may be that partitionConsumer sends an ack request to the Pulsar server for every MessageID, without waiting for all messages of a batch to be acked by the ackTracker. This leads to many more ack requests than when batch index ack is disabled, and performance drops because of the extra requests being processed. The backlog keeps increasing and cannot catch up with the production rate.

So, enableBatchIndexAck should either follow the previous processing method, or another way is needed to reduce the number of ack requests.
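One possible direction is to buffer MessageIDs and flush them together once a size or time threshold is reached, instead of sending one request per MessageID. The following is only a rough sketch of that idea, not the client's actual code; the names `ackBuffer`, `add`, and `flush` are made up for illustration:

```go
package acksketch

import (
	"sync"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// ackBuffer groups MessageIDs and flushes them together once a size or time
// threshold is reached, instead of sending one ack request per MessageID.
type ackBuffer struct {
	mu      sync.Mutex
	pending []pulsar.MessageID
	maxSize int
	flush   func(ids []pulsar.MessageID) // e.g. send a single ack command for the whole group
}

// newAckBuffer starts a time-based flusher so acks are never delayed longer
// than maxTime, even when fewer than maxSize acks are pending.
// (time.Tick is fine for a sketch; a real version would stop the ticker on close.)
func newAckBuffer(maxSize int, maxTime time.Duration, flush func([]pulsar.MessageID)) *ackBuffer {
	b := &ackBuffer{maxSize: maxSize, flush: flush}
	go func() {
		for range time.Tick(maxTime) {
			b.flushNow()
		}
	}()
	return b
}

// add buffers a single MessageID and triggers a flush once the buffer is full.
func (b *ackBuffer) add(id pulsar.MessageID) {
	b.mu.Lock()
	b.pending = append(b.pending, id)
	full := len(b.pending) >= b.maxSize
	b.mu.Unlock()
	if full {
		b.flushNow()
	}
}

// flushNow hands all buffered MessageIDs to the flush callback in one call.
func (b *ackBuffer) flushNow() {
	b.mu.Lock()
	ids := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(ids) > 0 {
		b.flush(ids)
	}
}
```

A real implementation would also need to handle AckWithResponse and consumer shutdown; the point here is just to show how grouping reduces the number of requests.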

@BewareMyPower Thank you for developing this feature. Could you take a look at this problem? Thanks a lot!

@BewareMyPower (Contributor)

I think the root cause is the lack of the ACK grouping tracker feature in the Go client. Here is a similar issue and fix for the C++ client: apache/pulsar#6534

@BewareMyPower (Contributor)

It might take some time; I will start catching up on this feature next week. Assign this issue to me for now; if someone else is interested, please ping me in this issue.

@panszobe (Contributor, Author) commented Feb 2, 2023

> It might take some time; I will start catching up on this feature next week. Assign this issue to me for now; if someone else is interested, please ping me in this issue.

OK, thanks. Looking forward to the new PR.

@panszobe (Contributor, Author)

@BewareMyPower Hi, is there any progress?

@BewareMyPower (Contributor) commented Feb 15, 2023

It's almost complete except for a few failing tests: BewareMyPower#1

And I found a problem: the Go client only supports synchronous ACK APIs. If you enable ACK with response, the ACK grouping tracker won't work.
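To make that constraint concrete, here is a minimal sketch of the consumer options involved; `AckWithResponse` and `EnableBatchIndexAcknowledgment` are existing `ConsumerOptions` fields, while the broker URL, topic, and subscription names are placeholders:

```go
package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                          "my-topic",
		SubscriptionName:               "my-sub",
		EnableBatchIndexAcknowledgment: true,
		// When AckWithResponse is true, every Ack waits for the broker's
		// response, so acks are sent one by one and cannot be grouped;
		// keep it false (the default) if you want ack grouping to take effect.
		AckWithResponse: false,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}
```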

@panszobe (Contributor, Author)

> It's almost complete except for a few failing tests: BewareMyPower#1
>
> And I found a problem: the Go client only supports synchronous ACK APIs. If you enable ACK with response, the ACK grouping tracker won't work.

OK, we have disabled the AckWithResponse option now, so that's not a problem for us.

BewareMyPower added a commit to BewareMyPower/pulsar-client-go that referenced this issue Feb 16, 2023
Fixes apache#949

### Motivation

Currently the Go client does not support grouping ACK requests, so each
time `Ack` (or a similar API) is called, an ACK request is sent, which
can degrade performance. We need to support configuring the time and
size limits for caching `MessageID`s before sending ACK requests.

### Modifications
- Add an `AckGroupingOptions` field to `ConsumerOptions`; when it's nil,
  use 100ms as the max time and 1000 as the max size.
- Add an `ackGroupingTracker` interface to support grouping ACK requests.
- When `AckWithResponse` is false, add the `MessageID` instance to the
  tracker instead of sending the request to `eventsCh`.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
  - Added `ack_grouping_tracker_test.go` to verify `ackGroupingTracker`
    in various cases
  - The consumer side change can be covered by existing tests because
    the default `AckGroupingOptions` config is
    `{ MaxSize: 1000, MaxTime: 100*time.Millisecond }`.
shibd pushed a commit that referenced this issue Feb 22, 2023
* Support grouping ACK requests by time and size

Fixes #949

* Fix flushAndClean race

* Use unbuffered channel for flush operations

* Apply different AckGroupingOptions and expose this config
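For readers arriving here later: based on the commit description above, the new option can be configured roughly as follows. This is a sketch that assumes an existing `pulsar.Client` named `client`; the `AckGroupingOptions` field with `MaxSize`/`MaxTime` is taken from the PR description, so check the released API for the exact shape before relying on it:

```go
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:                          "my-topic",
	SubscriptionName:               "my-sub",
	EnableBatchIndexAcknowledgment: true,
	// Group up to 1000 acks and flush at least every 100ms; these mirror the
	// defaults described above and can be tuned for higher throughput.
	AckGroupingOptions: &pulsar.AckGroupingOptions{
		MaxSize: 1000,
		MaxTime: 100 * time.Millisecond,
	},
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
```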
BewareMyPower reopened this Mar 6, 2023
@BewareMyPower (Contributor)

Let's reopen this issue for further discussion under #957.

I tested acknowledging by list in my local branch; it does not make a significant difference (about 48 Mbps vs. 54 Mbps). I'm going to investigate more this week.

@panszobe (Contributor, Author) commented Mar 6, 2023

> Let's reopen this issue for further discussion under #957.
>
> I tested acknowledging by list in my local branch; it does not make a significant difference (about 48 Mbps vs. 54 Mbps). I'm going to investigate more this week.

OK.

@BewareMyPower (Contributor)

Updated

In my latest code, I changed the grouping config:

```
./perf consume --profile \
       --enable-batch-index-ack \
       --ack-group-max-ms 100 \
       --ack-group-max-size 10000000 \
       my-topic
```

And now the consumer can catch up with the producer in my local env, though the produce rate is only about 20 MB/s.

```
INFO[22:53:30.112] Stats - Publish rate: 195124.5 msg/s -  156.3 Mbps -
                                Latency ms: 50%   2.4 -95%   5.0 - 99%  19.5 - 99.9%  19.5 - max   19.5
INFO[22:53:40.113] Stats - Publish rate: 190353.9 msg/s -  152.5 Mbps -
                                Latency ms: 50%   2.4 -95%   5.3 - 99%  20.8 - 99.9%  20.8 - max   20.8
INFO[22:53:50.112] Stats - Publish rate: 178765.6 msg/s -  143.2 Mbps -
                                Latency ms: 50%   2.6 -95%   6.3 - 99%  19.0 - 99.9%  19.0 - max   19.0
INFO[22:54:00.113] Stats - Publish rate: 163200.0 msg/s -  130.7 Mbps -
                                Latency ms: 50%   2.7 -95%   6.7 - 99%  21.9 - 99.9%  21.9 - max   21.9
INFO[22:54:10.112] Stats - Publish rate: 147459.4 msg/s -  118.1 Mbps -
                                Latency ms: 50%   2.9 -95%   7.6 - 99%  25.2 - 99.9%  25.2 - max   25.2
INFO[22:54:20.112] Stats - Publish rate: 157722.7 msg/s -  126.3 Mbps -
                                Latency ms: 50%   2.7 -95%   7.6 - 99%  21.7 - 99.9%  21.7 - max   21.7
INFO[22:54:30.112] Stats - Publish rate: 159618.6 msg/s -  127.9 Mbps -
                                Latency ms: 50%   2.9 -95%   6.7 - 99%  20.4 - 99.9%  20.4 - max   20.4
INFO[22:53:27.683] Stats - Consume rate: 148102.4 msg/s -  118.6 Mbps
INFO[22:53:37.683] Stats - Consume rate: 192455.4 msg/s -  154.2 Mbps
INFO[22:53:47.684] Stats - Consume rate: 176964.8 msg/s -  141.8 Mbps
INFO[22:53:57.683] Stats - Consume rate: 169696.7 msg/s -  135.9 Mbps
INFO[22:54:07.683] Stats - Consume rate: 156973.3 msg/s -  125.7 Mbps
INFO[22:54:17.683] Stats - Consume rate: 148775.7 msg/s -  119.2 Mbps
INFO[22:54:27.683] Stats - Consume rate: 160154.4 msg/s -  128.3 Mbps
INFO[22:54:37.683] Stats - Consume rate: 141378.7 msg/s -  113.3 Mbps
```

I won't push the PR at the moment because I think there is something wrong with the ACK grouping tracker implementation. Ideally, we should not need to configure such a large MaxSize value.

@panszobe (Contributor, Author)

The new PR has been merged; I will run a test.

@panszobe (Contributor, Author)

@BewareMyPower I ran a test today.
The new PR performs better, as shown below:

[screenshots: consume-rate comparison between the enable_batch_ack and disable_batch_ack cases]

enable_batch_ack_in_master_version can always catch up with the producer's production rate.
We can see that disable_batch_ack_in_master_version consumes more slowly than enable_batch_ack_in_master_version. Did you see a similar situation in your local env test?

A lot of effort has gone into the improvements for the enable_batch_ack case; could you take a look at the disable_batch_ack case? Normally, I would expect disable_batch_ack_in_master_version to consume faster.

@BewareMyPower (Contributor) commented Mar 15, 2023

The performance test results are close between these two cases in my local env.

BTW, this issue has been resolved, so you can open a new issue for that. Please describe how you tested.

@panszobe (Contributor, Author)

> The performance test results are close between these two cases in my local env.
>
> BTW, this issue has been resolved, so you can open a new issue for that. Please describe how you tested.

OK, I will do more tests in my local env.
