
Make kafka output store offset for successfully delivered events #516

Draft: wants to merge 9 commits into main

Conversation

@ppcad (Collaborator) commented Feb 1, 2024

No description provided.

@ppcad ppcad requested review from dtrai2 and ekneg54 February 1, 2024 11:17
@ppcad ppcad self-assigned this Feb 1, 2024
@ppcad ppcad linked an issue Feb 1, 2024 that may be closed by this pull request
@codecov-commenter commented

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (9e19caa) 91.56% compared to head (8c484b9) 91.65%.

Files                                          Patch %   Missing lines
logprep/connector/confluent_kafka/input.py     87.09%    4 ⚠️
logprep/connector/confluent_kafka/output.py    97.29%    1 ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #516      +/-   ##
==========================================
+ Coverage   91.56%   91.65%   +0.09%     
==========================================
  Files         130      130              
  Lines        9496     9551      +55     
==========================================
+ Hits         8695     8754      +59     
+ Misses        801      797       -4     


@ekneg54 (Collaborator) left a comment


The idea is clear to me, but the mechanic is not, and I think the problem is spread across the output AND the input.

Getting the possibility to add meta fields to the event is pretty cool, as we need this in the http_input_connector too.

The guarantee of delivery is not optional: there is no possibility to opt out and have a fire-and-forget kafka output as before. Please consider making the whole mechanic configurable.

As implemented for now, I do not think this will do the job, and I have big doubts about the performance of this solution. Please have a look at my remarks.

logprep/connector/confluent_kafka/input.py (review thread, outdated, resolved)
Comment on lines 439 to 443
for meta_field in ("last_partition", "last_offset"):
    try:
        del event_dict["_metadata"][meta_field]
    except (TypeError, KeyError):
        pass
@ekneg54 (Collaborator):

Suggested change
- for meta_field in ("last_partition", "last_offset"):
-     try:
-         del event_dict["_metadata"][meta_field]
-     except (TypeError, KeyError):
-         pass
+ metadata.pop("last_partition", None)
+ metadata.pop("last_offset", None)

Easier to read. (You have to adjust the tests, because you pass string data where a dict is expected.)

@ppcad (Collaborator, Author):

I have changed it, but I kept the try/except to cover the case where _metadata is not a dict, since this field might already exist in the event and be of any type. I did not check for dict directly, since the non-dict case is unlikely to occur, and the try/except is more performant that way.
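
For illustration, a minimal sketch of the merged approach described above, assuming the variable names from the diff context (event_dict, _metadata); the exact exception tuple depends on the final code:

    event_dict = {"_metadata": "not a dict"}  # worst case: field exists with a non-dict type

    metadata = event_dict.get("_metadata")
    try:
        # pop() tolerates missing keys; the try/except tolerates a
        # _metadata field that is not a dict at all
        metadata.pop("last_partition", None)
        metadata.pop("last_offset", None)
    except (AttributeError, TypeError):
        pass  # _metadata was present but not a dict, so there is nothing to remove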

Comment on lines 466 to 467
if metadata is None:
    raise FatalInputError(self, "Metadata for setting offsets can't be 'None'")
@ekneg54 (Collaborator):

Suggested change
- if metadata is None:
-     raise FatalInputError(self, "Metadata for setting offsets can't be 'None'")

If it can't be None, we should ensure it is not None to reflect the non-optional type hint. But this seems to be the wrong place to check it; we should check earlier to fail faster.

My suggestion is to set metadata to an empty dict in batch_finished_callback if it is None.

@ppcad (Collaborator, Author):

Thanks, I changed it to be set in batch_finished_callback.

Comment on lines +486 to 487
Should be called by output connectors if they are finished processing a batch of records.
"""
@ekneg54 (Collaborator):

Suggested change
      Should be called by output connectors if they are finished processing a batch of records.
      """
+     metadata = {} if metadata is None else metadata

This way we ensure it can't be 'None' in further processing. Please adjust the type hints accordingly.
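
A sketch of what the adjusted signature could look like; the stub class is an assumption for illustration, the real method lives in logprep's kafka input connector:

    from typing import Optional

    class KafkaInputStub:  # stand-in for the real input connector class
        def batch_finished_callback(self, metadata: Optional[dict] = None) -> None:
            """Should be called by output connectors if they are finished
            processing a batch of records."""
            metadata = {} if metadata is None else metadata
            # from here on, metadata is guaranteed to be a dict, so no
            # later "can't be None" check is required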

@ppcad (Collaborator, Author):

Thanks, I've added that suggestion.


@Metric.measure_time()
def _write_backlog(self):
    self._producer.flush(self._config.flush_timeout)
@ekneg54 (Collaborator):

The flush_timeout in opensearch and elasticsearch is the time to guarantee message delivery, so this confuses me.

@ppcad (Collaborator, Author):

flush_timeout was already used for flush in shut_down and in case of a BufferError.
flush internally calls poll until the internal buffer is empty, ensuring that all messages get sent.
We could rename it, but calling it flush_timeout for the flush method makes sense in my opinion.

@ekneg54 (Collaborator):

Yes, it makes sense. But the other option is to rename the parameters in elasticsearch and opensearch so that the term "flush_timeout" means the same thing globally in logprep. I would prefer to change it here and now, to get rid of this inconsistency and not raise another pull request to change it anywhere else.

if error:
    raise FatalOutputError(output=self, message=error)
self.metrics.number_of_successfully_delivered_events += 1
self.input_connector.batch_finished_callback(metadata=partition_offset)
@ekneg54 (Collaborator):

As I understand it, batch_finished_callback is called on every successfully delivered message, right? If so, this would decrease performance drastically, because on every successful delivery the GIL is held by this method.

Also, the called method is named BATCH_finished_callback, but now it is called on every single message?

Consider using the equivalent mechanic as in the opensearch_output: write all successful deliveries into a list (you should use a deque for this). Then, when the list is full, get the last committable offset for each partition and call batch_finished_callback with it.

As implemented here, it is also possible that you commit offsets for messages that are not yet delivered. Example, with the kafka topic partition offsets:

current: 0, committed: 0
you consume a message -> current: 1, committed: 0
you consume a further message -> current: 2, committed: 0
you deliver message 2 and the callback is called -> current: 2, committed: 2

What about the first message, which was not delivered? Kafka thinks it is delivered now, but it actually is not.
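
A rough sketch of the suggested deque-based mechanic; the class name, batch size, callback wiring, and metadata shape are assumptions for illustration, and logprep's actual opensearch_output mechanic may differ in detail:

    from collections import deque

    class DeliveryTracker:
        """Illustrative only: collects delivery confirmations, commits per batch."""

        def __init__(self, input_connector, batch_size=500):
            self.input_connector = input_connector
            self.batch_size = batch_size
            self._delivered = deque()  # (partition, offset) of each delivered message

        def on_delivery(self, error, partition, offset):
            # registered as the producer's delivery callback
            if error is not None:
                raise RuntimeError(f"delivery failed: {error}")
            self._delivered.append((partition, offset))
            if len(self._delivered) >= self.batch_size:
                self._commit_batch()

        def _commit_batch(self):
            # reduce the batch to the last delivered offset per partition and
            # hand it over in a single call instead of one call per message
            last_offsets = {}
            while self._delivered:
                partition, offset = self._delivered.popleft()
                last_offsets[partition] = max(offset, last_offsets.get(partition, -1))
            self.input_connector.batch_finished_callback(metadata=last_offsets)

As the offset example above shows, committing the highest delivered offset per partition is only safe once all lower offsets have been delivered as well, so a real implementation would also have to account for gaps.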

name="number_of_successfully_delivered_events",
)
)
"""Number of events that were successfully delivered to Kafka"""
@ekneg54 (Collaborator):

Yes, OK, it is not part of your issue now, but could you please add the documentation for the other kafka_output config parameters?

@ppcad (Collaborator, Author):

I have added some documentation now.

Development

Successfully merging this pull request may close these issues:

Ensure Correct Offsets for Kafka Output
3 participants