diff --git a/consumer.go b/consumer.go index 78d7fa2ca..6e21f81a0 100644 --- a/consumer.go +++ b/consumer.go @@ -726,6 +726,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { request.Version = 2 } + if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) { + request.Version = 3 + request.MaxBytes = MaxResponseSize + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/fetch_request.go b/fetch_request.go index ab817a06e..65600e86e 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { return nil } +// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See +// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at +// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + MaxBytes int32 Version int16 blocks map[string]map[int32]*fetchRequestBlock } @@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { pe.putInt32(-1) // replica ID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) + if r.Version == 3 { + pe.putInt32(r.MaxBytes) + } err = pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { if r.MinBytes, err = pd.getInt32(); err != nil { return err } + if r.Version == 3 { + if r.MaxBytes, err = pd.getInt32(); err != nil { + return err + } + } topicCount, err := pd.getArrayLength() if err != nil { return err @@ -114,6 +126,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_10_1_0 default: return minVersion }