User-space multi-version support #617
Comments
@eapache I have recently started looking into how we can implement this. Consider:

    func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
        var r *OffsetCommitRequest
        if bom.parent.conf.Consumer.Offsets.Retention == 0 {
            r = &OffsetCommitRequest{
                Version:                 1,
                ConsumerGroup:           bom.parent.group,
                ConsumerGroupGeneration: GroupGenerationUndefined,
            }
        } else {
            r = &OffsetCommitRequest{
                Version:                 2,
                RetentionTime:           int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
                ConsumerGroup:           bom.parent.group,
                ConsumerGroupGeneration: GroupGenerationUndefined,
            }
        }
        ...
        return nil
    }

How would you prefer to improve this if we implement something like:

    // Config is used to pass multiple configuration options to Sarama's constructors.
    type Config struct {
        // KafkaVersion is the string defining the version of the Kafka cluster. This version string is used to
        // indicate client behaviour such as the ApiVersion to use when communicating with the server.
        KafkaVersion string
        ...
    }

I am relatively new to Sarama and Go in general, but would be more than happy to contribute to this. |
Something like:

    var r *OffsetCommitRequest
    if kafkaVersionAtLeast(bom.parent.conf.KafkaVersion, "0.9.0") {
        r = &OffsetCommitRequest{
            Version:                 2,
            RetentionTime:           int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
            ConsumerGroup:           bom.parent.group,
            ConsumerGroupGeneration: GroupGenerationUndefined,
        }
    } else {
        r = &OffsetCommitRequest{
            Version:                 1,
            ConsumerGroup:           bom.parent.group,
            ConsumerGroupGeneration: GroupGenerationUndefined,
        }
    }

Although now I'm leaning towards a struct with methods so we could write |
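To make the struct-with-methods idea concrete, here is a minimal sketch in Go. Everything in it is an assumption for illustration: the `KafkaVersion` struct, its `IsAtLeast` method, and the `offsetCommitVersion` helper are hypothetical names, not an API confirmed anywhere in this thread. The version numbers 1 and 2 and the 0.9.0 threshold are taken from the snippet above.

```go
package main

import "fmt"

// KafkaVersion is a hypothetical struct form of the broker version,
// e.g. KafkaVersion{0, 8, 2} for Kafka 0.8.2.
type KafkaVersion struct {
	Major, Minor, Patch uint
}

// IsAtLeast reports whether v is the same as or newer than other,
// comparing major, then minor, then patch.
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	if v.Major != other.Major {
		return v.Major > other.Major
	}
	if v.Minor != other.Minor {
		return v.Minor > other.Minor
	}
	return v.Patch >= other.Patch
}

// offsetCommitVersion is a hypothetical helper that picks the
// OffsetCommitRequest version from the broker version, mirroring the
// 0.9.0 threshold used in the snippet above.
func offsetCommitVersion(v KafkaVersion) int16 {
	if v.IsAtLeast(KafkaVersion{0, 9, 0}) {
		return 2
	}
	return 1
}

func main() {
	fmt.Println(offsetCommitVersion(KafkaVersion{0, 8, 2}))
	fmt.Println(offsetCommitVersion(KafkaVersion{0, 9, 0}))
}
```

A struct keeps the comparison numeric and moves any parsing to the point where the config is set, so call sites only need a single method call.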
Did you see KIP-35, which aims to provide an API to query the broker for supported API requests and versions? |
I hadn't seen that, but since it isn't in existing versions I imagine it won't be practically useful until after we've dropped support for versions prior to 0.9.1. Still, good for future planning. |
@eapache Can you please have a look at this and leave some comments: https://github.com/mrafayaleem/sarama/pull/2/files I have used semver in a slightly hacky way to do version comparison. Also, I am thinking of implementing timestamps as part of this. I will open a PR once I have everything in place, so I will keep bugging you for code reviews if that is okay with you. |
If you're going to do timestamps, please do it as a separate PR; things are much easier to review when we only have to worry about one concern at a time. As for the existing state of your branch, it looks pretty good, but I am concerned about the semver library. I appreciate the desire to use a library, but it seems like a problem; Kafka doesn't follow semver, and they do intend to release a real v1 at some point as far as I know. A more generic library (or just implementing a small helper to split on |
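As a rough illustration of a small hand-written helper in place of a semver library, the sketch below compares version strings numerically, component by component. It assumes versions are dot-separated non-negative integers such as "0.8.2"; the `parseVersion` function and the `(bool, error)` signature of `kafkaVersionAtLeast` are hypothetical choices for this example, not the code from the linked branch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion splits a version string such as "0.8.2" into its
// numeric components. It assumes dot-separated non-negative integers.
func parseVersion(s string) ([]int, error) {
	parts := strings.Split(s, ".")
	nums := make([]int, len(parts))
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil || n < 0 {
			return nil, fmt.Errorf("invalid version component %q in %q", p, s)
		}
		nums[i] = n
	}
	return nums, nil
}

// kafkaVersionAtLeast reports whether have is the same as or newer
// than want. Missing trailing components are treated as zero, so
// "0.9" compares equal to "0.9.0".
func kafkaVersionAtLeast(have, want string) (bool, error) {
	h, err := parseVersion(have)
	if err != nil {
		return false, err
	}
	w, err := parseVersion(want)
	if err != nil {
		return false, err
	}
	for i := 0; i < len(h) || i < len(w); i++ {
		var a, b int
		if i < len(h) {
			a = h[i]
		}
		if i < len(w) {
			b = w[i]
		}
		if a != b {
			return a > b, nil
		}
	}
	return true, nil
}

func main() {
	ok, err := kafkaVersionAtLeast("0.8.2", "0.9.0")
	fmt.Println(ok, err)
}
```

Comparing components as integers means "0.10.0" sorts after "0.9.0", which a plain string comparison would get wrong, and nothing here assumes the versioning scheme follows semver.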
Thanks for the feedback. Agreed on both points. I just wanted to get an initial GTG on the PR. I will make the changes and finalise it soon. Thanks! |
Tries to address IBM#617
This exists; now we just need to use it. |
Problem
The Kafka spec was recently updated again to list new versions for several of the protocol messages (specifically produce and fetch requests), which we should add support for.
However, this raises a more general question of, e.g., which versions of these messages we use in the consumer, producer, etc. We currently have a mixed bag of config options (such as Consumer.Offsets.Retention) which affect this one way or another, but nothing consistent, and it's hard to tell which config settings make you compatible (or not) with which version of Kafka.
Proposal
Implement a Config.KafkaVersion as either a string ("0.8.2") or a struct (0, 8, 2) which specifies the broker version Sarama is communicating with. Deprecate all the version-specific config options and just do the right thing everywhere depending on the version of Kafka in use. This will mean automatically using the latest supported API version everywhere, except when that has serious semantic differences (e.g. offset requests going to Kafka vs. ZooKeeper).
Open Questions