Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kafka 0.9 protocol additions #577

Merged
merged 11 commits into from
Dec 9, 2015
Merged

Conversation

wvanbergen
Copy link
Contributor

@eapache Am I on the right track here?

type GroupProtocol interface {
encodeGroupProtocol(packetEncoder) error
decodeGroupProtocol(packetDecoder) error
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is a bit tricky; the protocol defines this to be extensible based on the ProtocolType. It continues to only define the specifics for the "consumer" protocol.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am tempted to just make this []byte and handle the consumer-specific implementation via Encoder interface? Not sure, but I dislike a public interface with no public methods.

@@ -37,6 +37,9 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

// ErrMessageTooLarge is returned when a JoinGroup request returns a protocol type that is not supported by sarama.
var ErrUnknownGroupProtocol = errors.New("kafka: encountered an unknown group protocol")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't happen, the broker selects the protocols supported by all members, and if no such set exists throws a InconsistentGroupProtocol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this error is only used when the ProtocolType on the request is set (by the user) to an unknown value (i.e. anything but consumer).

@eapache
Copy link
Contributor

eapache commented Dec 7, 2015

You're going to need to add all the new KError values to this as well.

}

func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
r.GroupId, err = pd.getString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these can all be collapsed into their if lines because err is predefined

@wvanbergen wvanbergen changed the title [WIP] Implement new Group protocol messages. [WIP] Implement Kafka 0.9 protocol additions Dec 7, 2015
@wvanbergen
Copy link
Contributor Author

Added new KErrors, and Heartbeat request and response pair.

case ErrGroupAuthorizationFailed:
return "kafka server: The client is not authorized to access this group"
case ErrClusterAuthorizationFailed:
return "kafka server: The client is not authorized to send this request type"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for consistency, the existing errors end in punctuation

@wvanbergen
Copy link
Contributor Author

I've implemented all the new response/request pairs. I have also removed the ConsumerGroup specialization stuff and left it at just byte-arrays. We can later decide how to handle that best.

@eapache
Copy link
Contributor

eapache commented Dec 7, 2015

LGTM. Collapse all the decode if statements and add tests, then :shipit:

@wvanbergen wvanbergen force-pushed the kafka_09_protocol branch 2 times, most recently from 39e4207 to 611688b Compare December 8, 2015 14:15
0, 3, 't', 'w', 'o', // Protocol name
0, 0, 0, 3, 0x04, 0x05, 0x06, // protocol metadata
0, 3, 'o', 'n', 'e', // Protocol name
0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets assembled from a map, so the order is not deterministic. @eapache how have you gotten around this elsewhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only test the 0/1 cases :P

@wvanbergen wvanbergen changed the title [WIP] Implement Kafka 0.9 protocol additions Implement Kafka 0.9 protocol additions Dec 8, 2015
@wvanbergen
Copy link
Contributor Author

I changed some slice types into maps to make the types easier to use, and added tests to all the things. This is ready for final review.

@wvanbergen
Copy link
Contributor Author

Note: some messages, fields, and errors are also renamed as part of the 0.9 release. For backwards compatibility reasons, these are not included.

package sarama

type DescribeGroupsResponse struct {
Groups []*GroupDescription
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda/almost/sorta makes sense for this to be a map keyed by group ID, but not sure if it's worth it at this point

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would make decode rather more complex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also like that this matches the request: you ask for a slice of groups, you get a slice of group descriptions back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair

@eapache
Copy link
Contributor

eapache commented Dec 9, 2015

:shipit:

wvanbergen added a commit that referenced this pull request Dec 9, 2015
Implement Kafka 0.9 protocol additions
@wvanbergen wvanbergen merged commit 159e999 into master Dec 9, 2015
@wvanbergen wvanbergen deleted the kafka_09_protocol branch December 9, 2015 15:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants