Skip to content

Commit

Permalink
Merge pull request #905 from sp1ff/master
Browse files Browse the repository at this point in the history
Added support for FetchRequest protocol version 3.
  • Loading branch information
eapache authored Jun 30, 2017
2 parents d81f250 + d6b6145 commit 2fd980e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
4 changes: 4 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 3
request.MaxBytes = MaxResponseSize
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
14 changes: 14 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
return nil
}

// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
blocks map[string]map[int32]*fetchRequestBlock
}
Expand All @@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
if r.Version == 3 {
pe.putInt32(r.MaxBytes)
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand Down Expand Up @@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if r.MinBytes, err = pd.getInt32(); err != nil {
return err
}
if r.Version == 3 {
if r.MaxBytes, err = pd.getInt32(); err != nil {
return err
}
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -114,6 +126,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
default:
return minVersion
}
Expand Down

0 comments on commit 2fd980e

Please sign in to comment.