[RdKafka] Allow acknowledge to not call commit #580

Closed
Steveb-p opened this issue Oct 23, 2018 · 15 comments

@Steveb-p
Contributor

Steveb-p commented Oct 23, 2018

According to a discussion on https://gitter.im/arnaud-lb/php-rdkafka, calling KafkaConsumer::commit (or commitAsync) is not really necessary, since the offset is moved on read from the topic (unless I'm mistaken, and provided the enable.auto.commit topic configuration is left set to true, its default).

I'd like to propose an option for configuration to disable calling commit when acknowledging messages from Kafka. Is it a good idea?
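A minimal sketch of what such an option could look like. Everything in it is hypothetical: the `OffsetCommitter` interface, the `RecordingCommitter` and `AckingConsumer` classes, and the `$commitOnAcknowledge` flag are illustration names, not existing enqueue or php-rdkafka API. The committer only records calls, so the sketch runs without a broker.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the two commit methods of a Kafka consumer.
interface OffsetCommitter
{
    public function commit(string $message): void;

    public function commitAsync(string $message): void;
}

// Records calls instead of talking to a broker.
final class RecordingCommitter implements OffsetCommitter
{
    /** @var string[] */
    public array $calls = [];

    public function commit(string $message): void
    {
        $this->calls[] = 'commit:' . $message;
    }

    public function commitAsync(string $message): void
    {
        $this->calls[] = 'commitAsync:' . $message;
    }
}

// Hypothetical consumer wrapper with an assumed "commit on acknowledge" switch.
final class AckingConsumer
{
    private OffsetCommitter $committer;
    private bool $commitOnAcknowledge;
    private bool $commitAsync;

    public function __construct(OffsetCommitter $committer, bool $commitOnAcknowledge = true, bool $commitAsync = false)
    {
        $this->committer = $committer;
        $this->commitOnAcknowledge = $commitOnAcknowledge;
        $this->commitAsync = $commitAsync;
    }

    public function acknowledge(string $message): void
    {
        if (!$this->commitOnAcknowledge) {
            return; // leave offset handling to enable.auto.commit
        }

        if ($this->commitAsync) {
            $this->committer->commitAsync($message);
        } else {
            $this->committer->commit($message);
        }
    }
}

$committer = new RecordingCommitter();
$consumer = new AckingConsumer($committer, false);
$consumer->acknowledge('message-1');
// $committer->calls is still empty here, because committing on acknowledge is disabled.
```

Defaulting the flag to true would keep the current behavior for anyone who does not opt out.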

@makasim
Member

makasim commented Oct 23, 2018

I don't know. I am not much into the Kafka world. We could try it if you think it makes sense.

@Steveb-p
Contributor Author

@makasim I'm going to run tests to check if what I understand of Kafka internals is correct anyway. Just looking for your opinion and / or disapproval before I start :) I'll gladly make a PR once I'm sure about it, since I'm going to use that functionality myself.

@makasim
Member

makasim commented Oct 23, 2018

@Steveb-p 👍

@Steveb-p
Contributor Author

Sorry for it being so late, had a lot of different project issues to solve, had to postpone anything related to Kafka.

According to my tests, there is no significant difference between calling commitAsync and not calling it, to the point of it being irrelevant. In my case, for a small number of events passed from Kafka to the rdkafka consumer, the two measured processing times were 0.10609292984009 and 0.1104040145874 seconds. There was no real data processing occurring other than passing it through Messenger Adapter.

As for using the "normal" (synchronous) commit, performance of the test degraded significantly, resulting in 15.112560987473 seconds.

In all three cases the offset was saved on the broker, whether the message was acknowledged or not.
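For reference, a timing comparison of this kind can be sketched as below. This is not the script behind the numbers above: `timeRun` is a hypothetical helper, and the two callbacks are placeholders (a no-op for "no commit", a short sleep for a blocking round trip to the broker).

```php
<?php

declare(strict_types=1);

/**
 * Runs $step $iterations times and returns the elapsed wall-clock seconds.
 */
function timeRun(callable $step, int $iterations): float
{
    $start = microtime(true);
    for ($i = 0; $i < $iterations; ++$i) {
        $step($i);
    }

    return microtime(true) - $start;
}

// Placeholder workloads, not real Kafka calls.
$withoutCommit = timeRun(static function (int $i): void {
}, 100);
$withBlockingCommit = timeRun(static function (int $i): void {
    usleep(1000); // pretend each commit blocks for about 1 ms
}, 100);

printf("without commit:  %.4f s\n", $withoutCommit);
printf("blocking commit: %.4f s\n", $withBlockingCommit);
```

In a real measurement the callbacks would wrap the consume and acknowledge calls of the transport under test.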

@makasim
Member

makasim commented Nov 22, 2018

@Steveb-p I think we should make async commit the default behavior then. https://github.com/php-enqueue/enqueue-dev/blob/master/pkg/rdkafka/RdKafkaConsumer.php#L54

@Steveb-p
Contributor Author

@makasim I'm looking into this further. As a matter of fact, it seems that calling commit doesn't really do anything apart from forcing Kafka to actually commit the message offset to disk, in the case of its "sync" variant.

Sample of subscription for base library does not include commit call:
https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html

I've run tests with both commit and commitAsync calls commented out, using the three configurations below.

array:3 [
  "global" => array:3 [
    "group.id" => "1"
    "metadata.broker.list" => "kafka1:9092,kafka2:9092,kafka3:9092"
    "enable.auto.commit" => "true"
  ]
  "commit_async" => true
  "topic" => array:1 [
    "auto.offset.reset" => "beginning"
  ]
]

On subsequent calls, I've not received previously seen messages, which is what I'd expect.

array:3 [
  "global" => array:3 [
    "group.id" => "2"
    "metadata.broker.list" => "kafka1:9092,kafka2:9092,kafka3:9092"
  ]
  "commit_async" => false
  "topic" => array:1 [
    "auto.offset.reset" => "beginning"
    "enable.auto.commit" => "false"
  ]
]

At first glance, despite explicitly asking Consumer to not commit automatically, I did not receive previously seen messages. It is possible that Kafka holds some information about Consumer group in memory, but I'm not sure atm. It looks as if enable.auto.commit option is ignored. (I'm pretty sure we should not rely on this, as it seems like an unintended behavior)

array:3 [
  "global" => array:3 [
    "group.id" => "3"
    "metadata.broker.list" => "kafka1:9092,kafka2:9092,kafka3:9092"
    "auto.offset.reset" => "beginning"
  ]
  "commit_async" => false
  "topic" => array:1 [
    "enable.auto.commit" => "false"
  ]
]

For the record, since I stumbled upon this when dealing with topic/global configuration: it seems the auto.offset.reset global configuration has no effect on the underlying librdkafka library. Only after I put it under the topic key did it start to be respected and, in the example above, messages were read from the beginning for new consumer groups.
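To illustrate the global/topic split, here is a rough sketch of how such a config array could be applied to php-rdkafka objects. It is not the actual RdKafkaContext code; `buildConf` is a hypothetical helper. `RdKafka\Conf`, `RdKafka\TopicConf` and `setDefaultTopicConf` are from the php-rdkafka releases of this period and may behave differently in newer ones. The function needs the rdkafka extension when called.

```php
<?php

declare(strict_types=1);

/**
 * Applies an enqueue-style config array to php-rdkafka objects.
 * Keys under 'global' go to Conf, keys under 'topic' go to TopicConf.
 */
function buildConf(array $config): \RdKafka\Conf
{
    $conf = new \RdKafka\Conf();
    foreach ($config['global'] ?? [] as $key => $value) {
        $conf->set($key, (string) $value);
    }

    $topicConf = new \RdKafka\TopicConf();
    foreach ($config['topic'] ?? [] as $key => $value) {
        $topicConf->set($key, (string) $value);
    }

    // Topic-level defaults only take effect once attached to the Conf.
    $conf->setDefaultTopicConf($topicConf);

    return $conf;
}
```

With this split, a key such as auto.offset.reset lands on whichever object its section maps to, which would explain why moving it between sections changes the behavior.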

Configuration was dumped near RdKafkaContext.php:155 (getConf method) from $this->config.

Either way, it seems that calling commit is not necessary at all. It appears as if KafkaConsumer::consume marks the message as read in its underlying implementation.
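A consume loop without any commit call, loosely modeled on the php-rdkafka high-level consumer example, might look like the sketch below. The helper names `buildAutoCommitConsumer` and `consumeWithoutCommit` are hypothetical, the broker list and group id are caller-supplied placeholders, and the code needs the rdkafka extension and a reachable broker when actually called.

```php
<?php

declare(strict_types=1);

/**
 * Builds a high-level consumer that relies on auto commit.
 */
function buildAutoCommitConsumer(string $brokers, string $groupId): \RdKafka\KafkaConsumer
{
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', $groupId);
    $conf->set('metadata.broker.list', $brokers);
    $conf->set('enable.auto.commit', 'true'); // already the default, set for clarity

    return new \RdKafka\KafkaConsumer($conf);
}

/**
 * Consumes up to $limit messages and hands each payload to $handler.
 * Neither commit() nor commitAsync() is called anywhere.
 */
function consumeWithoutCommit(\RdKafka\KafkaConsumer $consumer, callable $handler, int $limit, int $timeoutMs = 10000): int
{
    $handled = 0;
    while ($handled < $limit) {
        $message = $consumer->consume($timeoutMs);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                $handler($message->payload);
                ++$handled;
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                return $handled; // nothing more to read right now
            default:
                throw new \RuntimeException($message->errstr(), $message->err);
        }
    }

    return $handled;
}
```

Usage would be along the lines of creating the consumer with `buildAutoCommitConsumer('kafka1:9092', 'example-group')`, calling `subscribe(['example-topic'])` on it (the topic name is a placeholder), and then passing it to `consumeWithoutCommit`. One thing to keep in mind: with auto commit the offset is stored independently of whether the handler succeeded, so a failing handler does not cause redelivery.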

@makasim
Member

makasim commented Nov 22, 2018

Thank you for such a detailed overview. I think we can remove the commit call then.

@Steveb-p
Contributor Author

Steveb-p commented Nov 22, 2018

@makasim Wanted to ask this in a separate issue maybe, but this is as good a place as any: I've noticed that there was a recent patch removing Throwable exception class handling from Enqueue\MessengerAdapter\QueueInteropTransport::receive, since it hid possible errors when handling messages in Queue (I was wondering why my messages weren't handled in 0.8 :D).

However, it can also potentially cause messages with deserialization errors to get stuck in the Queue indefinitely (this is kinda also my case: the Serializer was unable to deserialize the message, tho Kafka consumed the message anyway. It might happen for other queue implementations tho, since acknowledge/reject is further in the code). I'm thinking maybe making QueueInteropTransport LoggerAware might help in cases where this happens?
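A rough sketch of that idea, using plain callables instead of the real Messenger and PSR logger interfaces. `receiveOne` and its parameters are hypothetical names, not QueueInteropTransport code. Only decoding failures are caught and logged; handler errors still propagate, so they are not hidden again.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical receive step: decode a raw body and, on failure, log the
 * error and report 'reject' so the message does not stay stuck.
 *
 * @param callable $decode turns the raw body into a decoded message
 * @param callable $handle processes the decoded message
 * @param callable $log    receives a human-readable error line
 *
 * @return string 'ack' or 'reject'
 */
function receiveOne(string $body, callable $decode, callable $handle, callable $log): string
{
    try {
        $decoded = $decode($body);
    } catch (\Throwable $e) {
        $log(sprintf('Could not deserialize message: %s', $e->getMessage()));

        return 'reject';
    }

    // Errors thrown by the handler are deliberately not caught here.
    $handle($decoded);

    return 'ack';
}

$errors = [];
$decode = static function (string $body): array {
    return json_decode($body, true, 512, JSON_THROW_ON_ERROR);
};
$handle = static function (array $message): void {
};
$log = static function (string $line) use (&$errors): void {
    $errors[] = $line;
};

$first = receiveOne('{"id": 1}', $decode, $handle, $log);  // valid JSON, gets 'ack'
$second = receiveOne('{not json', $decode, $handle, $log); // invalid JSON, gets 'reject'
```

In a real transport the returned value would map to the acknowledge or reject call of the underlying consumer, and `$log` would be the injected logger.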

As for the original issue, I'll provide a patch sometime tomorrow. In case anything breaks due to this, it might happen on my new message bus, so it should provide a nice testing environment :)

@makasim
Member

makasim commented Nov 22, 2018

@Steveb-p I am not using Symfony Messenger. IMO the messenger transport layer is very badly designed; for me it is broken in many places and should not be used at all.

Sorry, cannot help you here much.

@makasim
Member

makasim commented Nov 22, 2018

@Steveb-p I'd like to offer you the Kafka transport maintainer role. As I see it, you know far more about it than I do. Would you be interested?

@Steveb-p
Contributor Author

@makasim I'd say sure but I'm not familiar with underlying code of Symfony Messenger & Messenger Adapter for it.
As far as Symfony Messenger is concerned, it just suits my use-case and tools I'm already using / have to use, so I don't mind trying it out. Before looking into Symfony Messenger I didn't even know enqueue existed.

I've seen a couple of others stepping up for the job. I'd prefer to stay a collaborator or co-maintainer at this time. Not to mention, I doubt I'll be using it for anything other than Redis / Kafka.

@makasim
Member

makasim commented Nov 22, 2018

I was talking about the rdkafka transport, not Symfony Messenger.

@Steveb-p
Contributor Author

@makasim My bad. I'd do so gladly then, considering my company will probably use it a lot.

@makasim
Member

makasim commented Nov 22, 2018

@Steveb-p that's great

@makasim closed this as completed Feb 19, 2019
@Steveb-p
Contributor Author

@makasim can I reopen this since I intend to work on it? :)

Steveb-p added a commit that referenced this issue Mar 28, 2019
Provide addition configuration documentation for integrating with Symfony bundle

Fixes #580