-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Commit kafka offset in routine load #1734
Commit kafka offset in routine load #1734
Conversation
|
||
for (auto& kv : ctx->kafka_info->cmt_offset) { | ||
RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create( | ||
ctx->kafka_info->topic, kv.first, kv.second); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to delete this tp1
after commit?
switch (ctx->load_src_type) { | ||
case TLoadSourceType::KAFKA: { | ||
std::shared_ptr<DataConsumer> consumer; | ||
HANDLE_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer), "failed to get consumers"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better not return any error if we failed to do something after txn is committed.
@@ -133,6 +133,8 @@ class KafkaDataConsumer : public DataConsumer { | |||
// reassign partition topics | |||
virtual Status reset() override; | |||
virtual bool match(StreamLoadContext* ctx) override; | |||
// commit kafka offset | |||
Status commit(std::vector<RdKafka::TopicPartition*> offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status commit(std::vector<RdKafka::TopicPartition*> offset); | |
Status commit(const std::vector<RdKafka::TopicPartition*>& offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
librdkafka api is " ErrorCode commitSync (std::vector<TopicPartition*> &offsets)" , so here can't be const
a9cd126
to
cba0073
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…mysql jdbc connector (apache#1734)
Commit kafka offset in routine load
Kafka will decide whether to delete data based on whether all consumer group is commit offset or not.
If there is no commit offset, the kafka server disk may be full