fix: pass offsets to partitions as milliseconds instead of seconds #106
# Get the offset from the epoch in seconds.
offset_timestamp_s = int(offset_timestamp.timestamp())
# Get the offset from the epoch (Kafka expects offsets in milliseconds for offsets_for_times, see
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.offsets_for_times)
Might be misleading to link to the kafka-python library's docs here, since we're not using it.
Good catch. I'll see if I can find it in the library we're actually using.
As far as I can tell it's undocumented, both for confluent-kafka and the underlying librdkafka. I have a PR open to add a note about it in the latter.
Co-authored-by: Tim McCormack <tmccormack@edx.org>
Kafka partitions expect time offsets to be in milliseconds instead of seconds, so multiply the timestamp by 1000 before setting it on the partition.
Tested locally against the dev cluster.
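For reviewers, here is a minimal sketch of the conversion described above. It is not the exact code in this PR: the helper names `to_epoch_millis` and `partitions_at_time` are hypothetical, and the sketch assumes the consumer is a confluent-kafka `Consumer`. It scales to milliseconds before truncating, so sub-second precision is preserved.

```python
from datetime import datetime, timezone


def to_epoch_millis(offset_timestamp: datetime) -> int:
    """Convert a datetime to whole milliseconds since the Unix epoch."""
    # datetime.timestamp() returns float seconds; multiply before truncating
    # so the fractional part of a second is not thrown away.
    return int(offset_timestamp.timestamp() * 1000)


def partitions_at_time(consumer, topic, partition_ids, offset_timestamp):
    """Hypothetical helper: resolve a timestamp to an offset on each partition."""
    # Imported lazily so the conversion above works without the client installed.
    from confluent_kafka import TopicPartition

    offset_ms = to_epoch_millis(offset_timestamp)
    # For offsets_for_times, the offset field of each TopicPartition carries
    # the timestamp to look up, in milliseconds.
    query = [TopicPartition(topic, p, offset_ms) for p in partition_ids]
    return consumer.offsets_for_times(query, timeout=10.0)


if __name__ == "__main__":
    print(to_epoch_millis(datetime(2023, 1, 1, tzinfo=timezone.utc)))
```

Passing a timezone-aware datetime avoids the ambiguity of naive datetimes, which `timestamp()` interprets as local time.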
Merge checklist:
Check off if complete or not applicable: