From 7ed263c37cb899157c967e846ccccdb6bcbb608b Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 14:14:53 +0100 Subject: [PATCH 1/3] fix(proto): ensure req+resp requiredVersion match Add a test to ensure that the requiredVersion of the request and response pairs matches. Accordingly fix DescribeConfigs which had a V1_0_0_0 instead of V1_1_0_0 bug for V1 in the Response Signed-off-by: Dominic Evans --- describe_configs_request.go | 8 +++++--- describe_configs_response.go | 8 +++++--- request_test.go | 1 + 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/describe_configs_request.go b/describe_configs_request.go index 2d2d906c4..d0ab0d6ef 100644 --- a/describe_configs_request.go +++ b/describe_configs_request.go @@ -109,11 +109,13 @@ func (r *DescribeConfigsRequest) isValidVersion() bool { func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V1_1_0_0 case 2: return V2_0_0_0 - default: + case 1: + return V1_1_0_0 + case 0: return V0_11_0_0 + default: + return V2_0_0_0 } } diff --git a/describe_configs_response.go b/describe_configs_response.go index 772d06b85..8aed5de85 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -122,12 +122,14 @@ func (r *DescribeConfigsResponse) isValidVersion() bool { func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V1_0_0_0 case 2: return V2_0_0_0 - default: + case 1: + return V1_1_0_0 + case 0: return V0_11_0_0 + default: + return V2_0_0_0 } } diff --git a/request_test.go b/request_test.go index c1abd5bed..9c9739ede 100644 --- a/request_test.go +++ b/request_test.go @@ -241,6 +241,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { resp := allocateResponseBody(req) assert.NotNil(t, resp, fmt.Sprintf("%s has no matching response type in allocateResponseBody", reflect.TypeOf(req))) assert.Equal(t, req.isValidVersion(), resp.isValidVersion(), fmt.Sprintf("%s isValidVersion should match %s", reflect.TypeOf(req), reflect.TypeOf(resp))) + assert.Equal(t, req.requiredVersion(), resp.requiredVersion(), fmt.Sprintf("%s requiredVersion should match %s", reflect.TypeOf(req), reflect.TypeOf(resp))) for _, body := range []protocolBody{req, resp} { assert.Equal(t, key, body.key()) assert.Equal(t, version, body.version()) From c4255f469e3b9cd277a9360705a5f322a3cd96cb Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 14:25:01 +0100 Subject: [PATCH 2/3] chore(proto): permit CreatePartitionsRequest V1 This is identical in format to V0, but just influences the broker's response behaviour: > starting in version 1, on quota violation, brokers send out responses > before throttling. Signed-off-by: Dominic Evans --- admin.go | 3 +++ create_partitions_request.go | 11 +++++++++-- create_partitions_response.go | 11 +++++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/admin.go b/admin.go index 93ab7b863..a936fa5b3 100644 --- a/admin.go +++ b/admin.go @@ -468,6 +468,9 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ Timeout: ca.conf.Admin.Timeout, ValidateOnly: validateOnly, } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } return ca.retryOnError(isErrNoController, func() error { b, err := ca.Controller() diff --git a/create_partitions_request.go b/create_partitions_request.go index 68435d639..3f5512656 100644 --- a/create_partitions_request.go +++ b/create_partitions_request.go @@ -73,11 +73,18 @@ func (r *CreatePartitionsRequest) headerVersion() int16 { } func (r *CreatePartitionsRequest) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion { - return V1_0_0_0 + switch r.Version { + case 1: + return V2_0_0_0 + case 0: + return V1_0_0_0 + default: + return V2_0_0_0 + } } type TopicPartition struct { diff --git a/create_partitions_response.go b/create_partitions_response.go index b9695ae53..c9e7ea72c 100644 --- a/create_partitions_response.go +++ b/create_partitions_response.go @@ -69,11 +69,18 @@ func (r *CreatePartitionsResponse) headerVersion() int16 { } func (r *CreatePartitionsResponse) isValidVersion() bool { - return r.Version == 0 + return r.Version >= 0 && r.Version <= 1 } func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion { - return V1_0_0_0 + switch r.Version { + case 1: + return V2_0_0_0 + case 0: + return V1_0_0_0 + default: + return V2_0_0_0 + } } func (r *CreatePartitionsResponse) throttleTime() time.Duration { From 47c47ae6add6dac6bcf527b1f31e6e22595c7630 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 14:35:09 +0100 Subject: [PATCH 3/3] chore(proto): permit AlterConfigsRequest V1 This is identical in format to V0, but just influences the broker's response behaviour Signed-off-by: Dominic Evans --- admin.go | 3 +++ alter_configs_request.go | 11 +++++++++-- alter_configs_response.go | 11 +++++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/admin.go b/admin.go index a936fa5b3..d226b7623 100644 --- a/admin.go +++ b/admin.go @@ -716,6 +716,9 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string Resources: resources, ValidateOnly: validateOnly, } + if ca.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 1 + } var ( b *Broker diff --git a/alter_configs_request.go b/alter_configs_request.go index cf51beb67..ee1ab6445 100644 --- a/alter_configs_request.go +++ b/alter_configs_request.go @@ -123,9 +123,16 @@ func (a *AlterConfigsRequest) headerVersion() int16 { } func (a *AlterConfigsRequest) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 } func (a *AlterConfigsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 1: + return V2_0_0_0 + case 0: + return V0_11_0_0 + default: + return V2_0_0_0 + } } diff --git a/alter_configs_response.go b/alter_configs_response.go index 99888840e..658f32e9a 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -113,11 +113,18 @@ func (a *AlterConfigsResponse) headerVersion() int16 { } func (a *AlterConfigsResponse) isValidVersion() bool { - return a.Version == 0 + return a.Version >= 0 && a.Version <= 1 } func (a *AlterConfigsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch a.Version { + case 1: + return V2_0_0_0 + case 0: + return V0_11_0_0 + default: + return V2_0_0_0 + } } func (r *AlterConfigsResponse) throttleTime() time.Duration {