Use rdkafka event API instead of the callback API #617
Conversation
Force-pushed from 94da831 to d25c7d9
Hey, could you explain to a fellow maintainer (rdkafka-ruby here) what benefits it yields for the consumer? I clearly see benefits for the admin and producer (where the delivery reports materialization is easier to handle), but I wonder about the consumer. I would appreciate some explanation, as it has been a while since I started thinking about this and I have problems wrapping my head around the consumer part. Is it not possible to yield from Rust in the context of the rebalance callback? How will it operate on rebalances where the revocation happens first and, prior to it, an action like offset storage could take place? Thanks 🙏
Force-pushed from d25c7d9 to 113af85
Hello @mensfeld! The rebalance handler does not really change here for rust-rdkafka. Is there anything in particular that gives that impression? Could you elaborate a little bit more on this?
LGTM in general!
src/consumer/base_consumer.rs (outdated)
```rust
            }
        }

        if op_timeout >= timeout {
```
We set `op_timeout` to `std::cmp::min(timeout, min_poll_interval)`, so it's never going to be larger than `timeout`. Shouldn't we do something like:
```rust
let now = Instant::now();
let op_timeout = std::cmp::min(timeout, min_poll_interval);
// ...
let timeout = timeout.saturating_sub(now.elapsed());
if timeout.is_zero() {
    return None;
}
```
It seems to work fine for the following common cases. Did I miss something?
- If the user provides a `timeout` set to 100ms, then the `op_timeout` will be 100ms (as `min_poll_interval` is 1 sec by default). If no msgs or errors have been returned, then on this check `op_timeout` is equal to `timeout` and we return `None`.
- If the user provides a `timeout` set to 2 seconds (over the default for `min_poll_interval`), then `op_timeout` will be 1 second. Let's say we poll for 1 second and it does not return a msg/error; then `op_timeout` is less than `timeout` and we do `timeout -= op_timeout`, which sets `timeout` to 1 second. If we get no messages/errors again, then `op_timeout` is equal to `timeout` and we return `None`.
- If the user provides `timeout` set to `Never`, then `op_timeout` is set to the default of 1 sec and we keep polling every second until we get a message or error.
I was concerned about something like the following:

- The user sets the `timeout` to 100ms.
- After 1ms, we get an `RD_KAFKA_EVENT_OFFSET_COMMIT` event.
- We do `timeout -= op_timeout`, which puts `timeout` at 0.
- We exit.
Ah, that's a good catch. OK, I'll fix this.
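For reference, a minimal sketch of the elapsed-time accounting discussed above; `poll_once` and `Message` are hypothetical stand-ins, not rust-rdkafka's actual internals:

```rust
use std::time::{Duration, Instant};

struct Message; // hypothetical placeholder for a polled message

fn poll_once(_op_timeout: Duration) -> Option<Message> {
    None // hypothetical: serve at most one event from the queue
}

// Deduct the time *actually* spent on each iteration, so an event that
// returns early (e.g. RD_KAFKA_EVENT_OFFSET_COMMIT after 1ms) doesn't
// charge the whole op_timeout against the user's budget.
fn poll_with_budget(mut remaining: Duration, min_poll_interval: Duration) -> Option<Message> {
    loop {
        let op_timeout = std::cmp::min(remaining, min_poll_interval);
        let start = Instant::now();
        if let Some(msg) = poll_once(op_timeout) {
            return Some(msg);
        }
        remaining = remaining.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return None;
        }
    }
}
```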
```rust
} else {
    while !self.closed() {
        self.poll(Duration::from_millis(100));
    }
```
Is this safe? Could we drop messages here and have their offsets committed to Kafka?
That's a good question. My understanding is that messages won't be forwarded here anymore. In this particular case we had to replace `rd_kafka_consumer_close` with `rd_kafka_consumer_close_queue`, because `rd_kafka_consumer_close` blocks until the consumer has revoked its assignment by calling `Unassign` or `IncrementalUnassign` in the rebalance handler. That would leave the consumer blocked forever, since there's no rebalance callback registered anymore. Hence we had to move to `rd_kafka_consumer_close_queue`, which allows us to close the consumer asynchronously. In this particular case, by calling `Poll` we can forward the rebalance event to the `rebalance` method on the `ConsumerContext`, following the API doc:

> The application must poll/serve this queue until rd_kafka_consumer_closed() returns true.
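A rough sketch of that close sequence; `close_queue` and `closed` are assumed wrappers around `rd_kafka_consumer_close_queue` and `rd_kafka_consumer_closed`, not necessarily the exact methods this PR introduces:

```rust
use std::time::Duration;

// Assumed wrappers around librdkafka's consumer-close API; the real
// methods in this PR may differ in name and signature.
trait AsyncClose {
    fn close_queue(&self);             // rd_kafka_consumer_close_queue
    fn closed(&self) -> bool;          // rd_kafka_consumer_closed
    fn poll(&self, timeout: Duration); // serves events to the ConsumerContext
}

// Start the close, then keep serving the consumer queue so the final
// rebalance (revoke) event reaches the context instead of blocking
// inside librdkafka.
fn close_consumer<C: AsyncClose>(consumer: &C) {
    consumer.close_queue();
    while !consumer.closed() {
        consumer.poll(Duration::from_millis(100));
    }
}
```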
Alright, sounds good to me.
Force-pushed from 113af85 to f3a0a53
Passing the `Arc<BaseConsumer<C>>` to `MessageStream` isn't necessary anymore, and the changes to `split_partition_queue` can be reverted as well, I think.
This is required as multiple write acks are tied to a single event.
One poll call might not be enough to serve the delivery report callbacks of the purged messages. The current flush impl will call poll multiple times until the queue is empty or the timeout is reached (see the sketch below).
If `Timeout::Never` is used, poll should eventually return a `Message` or `Error` rather than `None` when handling other events like stats, rebalances, etc.
rdkafka reports consumer errors via `RD_KAFKA_EVENT_ERROR`, but producer errors get embedded in the ack returned via `RD_KAFKA_EVENT_DR`. Hence we need to return this event in the consumer case in order to surface the error to the user.
Currently, if a group.id is not specified, we allow the use of the consumer for fetching metadata and watermarks. We keep this behaviour.
If you have a consumer wrapping this one (FFI cases), the outer consumer must close the queue and serve the events via `Poll`. Otherwise it will hang forever, as prior to calling close there's a rebalance, and rdkafka awaits a response before continuing.
With the Event API we propagate generic client instance-level errors, such as broker connection failures, authentication issues, etc. However, fatal errors are also propagated via the Event API. These indicate that the particular instance of the client (producer/consumer) has become non-functional.
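A hedged sketch of the flush loop mentioned above; the `Flushable` trait and `pending_count` helper are assumptions for illustration, not the PR's actual implementation:

```rust
use std::time::{Duration, Instant};

// Assumed minimal producer surface for this illustration.
trait Flushable {
    fn pending_count(&self) -> usize;  // messages awaiting delivery reports
    fn poll(&self, timeout: Duration); // serves delivery-report events
}

// One poll may not serve all delivery reports (e.g. for purged messages),
// so flush keeps polling until the queue drains or the deadline passes.
fn flush<P: Flushable>(producer: &P, timeout: Duration) -> Result<(), &'static str> {
    let deadline = Instant::now() + timeout;
    while producer.pending_count() > 0 {
        if Instant::now() >= deadline {
            return Err("flush timed out with messages still in flight");
        }
        producer.poll(Duration::from_millis(100));
    }
    Ok(())
}
```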
Force-pushed from f3a0a53 to 978c964
Looks great!
This: "it allows handling the rebalance events outside of the scope of the registered callback. This is especially useful for services using rust-rdkafka to build FFI libraries on top of it." Since rebalance callbacks are triggered from the thread that is polling the consumer queue (one or both with main), a closure in the callback should allow running code "outside" (in a way). So I wondered how this is done via the Event API. I am myself looking to move to the IO event API for the producer and admin, but I'm not sure what benefits it yields for regular high-level consumers.
Let me give some more context. Let's assume you provide some FFI layer on top of rust-rdkafka that allows you to create bindings for other languages (like Go, Python, etc). Using the callback approach, we register a rebalance callback with librdkafka, and that callback has to run to completion (including the *assign calls) on the thread that polls the consumer, which makes it hard to hand control to an external caller across the FFI boundary. With the Event API approach, on the other hand, there are no such limitations. You receive the assign/revoke event and can delegate the *assign calls to an external caller. For rust-rdkafka, this translates to overriding the rebalance method of the `ConsumerContext` trait (a sketch follows below).
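To illustrate, a minimal sketch of a custom context; it uses the `pre_rebalance`/`post_rebalance` hooks (signatures vary across rust-rdkafka versions) rather than the lower-level `rebalance` override, and the "notify the FFI caller" parts are placeholders:

```rust
use rdkafka::ClientContext;
use rdkafka::consumer::{ConsumerContext, Rebalance};

// Context that surfaces rebalance events to the embedding application
// instead of handling everything inside the polling thread's callback.
struct FfiContext;

impl ClientContext for FfiContext {}

impl ConsumerContext for FfiContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        match rebalance {
            // Placeholder: forward the new assignment to the FFI caller.
            Rebalance::Assign(tpl) => println!("assigning: {:?}", tpl),
            // Placeholder: let the caller store offsets before revocation.
            Rebalance::Revoke(tpl) => println!("revoking: {:?}", tpl),
            Rebalance::Error(e) => eprintln!("rebalance error: {}", e),
        }
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        let _ = rebalance; // placeholder: notify the caller it completed
    }
}
```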
@scanterog thank you for your explanation. That simplified things for me a lot 🙏
Is there any chance that the consumer is never closed in the `BaseConsumer` drop method? I ran into a situation where my program never exits the while loop that checks whether the consumer is closed.
@Millione are you using the cooperative-sticky assignment strategy?
No, will it make a difference?
@Millione I thought you had ended up with an infinite-loop error that can happen with a certain combination of cooperative-sticky and some specific partition counts. Please ignore this if you're not using it, because then that's not the case.
@mensfeld Even in the case you describe, I still think it's not reasonable to loop forever here. Shouldn't the implementation prevent this?
@Millione I cannot tell you much about the implementation in Rust because I maintain the implementation in Ruby. The issue here is that polling delegates the responsibility to the C layer, and it is the C layer that may hang. If that happens, it happens. Maybe there are tools in Rust to exit out of such hanging code; at least in Ruby, they are of low quality and do not fix anything. I do not know what is happening in your particular case, but I hold the Rust bindings in high regard in terms of stability and quality.
@mensfeld Thanks for your patient explanation. I finally found that I am stuck in the poll stage for rebalancing while closing; the last frame in the stack is pthread_cond_wait.
This PR changes how rust-rdkafka interacts with librdkafka: instead of using the callback API, it moves to the event API. This PR does not change any of the public rdkafka API, but it gives us the chance to introduce some optimizations. Some of them might break the current API, though, and hence this PR does not go down that route yet. For example, consider #273.
Some other benefits:

- The `FutureProducer`, where we need to detach the `BorrowedMessage` on errors.

Performance-wise, I've run kafka-benchmark and got a pretty similar outcome for both the callback and the event based approach; I didn't find many differences here. I've also run long-running services on our staging infra and we didn't find noticeable changes in CPU/RSS on the producer/consumer side.
There's one main difference here, though. Operational errors not related to consumption (like transient broker transport failures) are passed back to the user with the Event API; this is not the case with the Callback API. Details can be found in confluentinc/librdkafka#4493. These errors should generally be considered informational, as the underlying client will automatically try to recover from any errors encountered; the application does not need to take action on them. However, fatal errors are also propagated via the Event API. These indicate that the particular instance of the client (producer/consumer) has become non-functional.
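By way of illustration, a hedged sketch of observing these errors through rust-rdkafka's `ClientContext::error` hook; the transient/fatal comments restate the paragraph above, and the logging is a placeholder for whatever the application would do:

```rust
use rdkafka::{error::KafkaError, ClientContext};

struct ObservingContext;

impl ClientContext for ObservingContext {
    fn error(&self, error: KafkaError, reason: &str) {
        // Transient operational errors (e.g. broker transport failures) are
        // informational: librdkafka retries internally, so the application
        // usually doesn't need to act on them.
        // A fatal error, by contrast, means this client instance has become
        // non-functional and should be torn down and recreated.
        eprintln!("client error: {error}: {reason}");
    }
}
```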