From e16f7f92214b905daa08ae4e2f8acecdff03d445 Mon Sep 17 00:00:00 2001 From: hanbing0715 Date: Tue, 11 Apr 2017 17:27:12 +0800 Subject: [PATCH] add kafka 0.10.2.0 in version list add apiversions in broker --- broker.go | 11 +++++++++++ broker_test.go | 13 +++++++++++++ utils.go | 1 + 3 files changed, 25 insertions(+) diff --git a/broker.go b/broker.go index ff066b8d8..f57a69094 100644 --- a/broker.go +++ b/broker.go @@ -355,6 +355,17 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups return response, nil } +func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) { + response := new(ApiVersionsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index 5adb46f10..fcbe627fa 100644 --- a/broker_test.go +++ b/broker_test.go @@ -284,6 +284,19 @@ var brokerTestTable = []struct { t.Error("DescribeGroups request got no response!") } }}, + + {"ApiVersionsRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := ApiVersionsRequest{} + response, err := broker.ApiVersions(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("ApiVersions request got no response!") + } + }}, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { diff --git a/utils.go b/utils.go index 3cbab2d92..d36db9210 100644 --- a/utils.go +++ b/utils.go @@ -148,5 +148,6 @@ var ( V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) minVersion = V0_8_2_0 )