From 4d1eceb89058cdc687330317003a540e1fccf7e6 Mon Sep 17 00:00:00 2001 From: Michael Herstine Date: Thu, 15 Jun 2017 12:19:57 -0700 Subject: [PATCH 1/3] Added support for FetchRequest protocol version 3. This commit will add the 'MaxBytes' field to FetchRequests when the Kafka version is 0.10.1 or better. On send, it will be set to MaxResponseSize. No tests, yet. --- consumer.go | 4 ++++ fetch_request.go | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/consumer.go b/consumer.go index 78d7fa2ca..5479c024d 100644 --- a/consumer.go +++ b/consumer.go @@ -726,6 +726,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { request.Version = 2 } + if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) { + request.Version = 3 + request.MaxBytes = MaxResponseSize - 47 // TODO(mherstin): WTF?! + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/fetch_request.go b/fetch_request.go index ab817a06e..a6d19501f 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -21,9 +21,13 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { return nil } +// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See +// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at +// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + MaxBytes int32 Version int16 blocks map[string]map[int32]*fetchRequestBlock } @@ -32,6 +36,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { pe.putInt32(-1) // replica ID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) + if 3 == r.Version { + pe.putInt32(r.MaxBytes) + } err = pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -67,6 +74,11 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { if r.MinBytes, err = pd.getInt32(); err != nil { return err } + if r.Version == 3 { + if r.MaxBytes, err = pd.getInt32(); err != nil { + return err + } + } topicCount, err := pd.getArrayLength() if err != nil { return err From b69551c88b4da0500555051fca1cd15c5c074b07 Mon Sep 17 00:00:00 2001 From: Michael Herstine Date: Tue, 20 Jun 2017 11:06:01 -0700 Subject: [PATCH 2/3] Add support for FetchRequest API V3. This commit adds support for version 3 of the FetchRequest API. The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes the PR here: https://github.com/apache/kafka/pull/1812 and the JIRA here: https://issues.apache.org/jira/browse/KAFKA-2063 Should document the fact that the per-partition limits take precedence (so the returned message may be larger than the requested limit). --- consumer.go | 2 +- fetch_request.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 5479c024d..6e21f81a0 100644 --- a/consumer.go +++ b/consumer.go @@ -728,7 +728,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { } if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) { request.Version = 3 - request.MaxBytes = MaxResponseSize - 47 // TODO(mherstin): WTF?! + request.MaxBytes = MaxResponseSize } for child := range bc.subscriptions { diff --git a/fetch_request.go b/fetch_request.go index a6d19501f..f4be46641 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -126,6 +126,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_10_1_0 default: return minVersion } From d6b6145ea673ef2f3ed3a984051c3bba75b03ceb Mon Sep 17 00:00:00 2001 From: Michael Herstine Date: Fri, 30 Jun 2017 10:37:26 -0700 Subject: [PATCH 3/3] Changing if condition from C-style in response to PR feedback. --- fetch_request.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fetch_request.go b/fetch_request.go index f4be46641..65600e86e 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -36,7 +36,7 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { pe.putInt32(-1) // replica ID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) - if 3 == r.Version { + if r.Version == 3 { pe.putInt32(r.MaxBytes) } err = pe.putArrayLength(len(r.blocks))