From f4e645379d8b281b89674f096092bcaf9eabee29 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 9 Aug 2023 22:15:01 +0100 Subject: [PATCH] chore(test): use modern protocol versions in FVT Signed-off-by: Dominic Evans --- functional_consumer_follower_fetch_test.go | 1 - functional_consumer_group_test.go | 5 ----- functional_consumer_staticmembership_test.go | 7 ------- functional_consumer_test.go | 1 - functional_producer_test.go | 13 ------------- functional_test.go | 15 ++++++++------- 6 files changed, 8 insertions(+), 34 deletions(-) diff --git a/functional_consumer_follower_fetch_test.go b/functional_consumer_follower_fetch_test.go index ea66512fe..09329daf5 100644 --- a/functional_consumer_follower_fetch_test.go +++ b/functional_consumer_follower_fetch_test.go @@ -21,7 +21,6 @@ func TestConsumerFetchFollowerFailover(t *testing.T) { newConfig := func() *Config { config := NewFunctionalTestConfig() config.ClientID = t.Name() - config.Version = V2_8_0_0 config.Producer.Return.Successes = true return config } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 683de9100..5589ad195 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -141,7 +141,6 @@ func TestFuncConsumerGroupRebalanceAfterAddingPartitions(t *testing.T) { defer teardownFunctionalTest(t) config := NewFunctionalTestConfig() - config.Version = V2_3_0_0 admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { t.Fatal(err) @@ -243,10 +242,7 @@ func TestFuncConsumerGroupOffsetDeletion(t *testing.T) { checkKafkaVersion(t, "2.4.0") setupFunctionalTest(t) defer teardownFunctionalTest(t) - // create a client with 2.4.0 version as it is the minimal version - // that supports DeleteOffsets request config := NewFunctionalTestConfig() - config.Version = V2_4_0_0 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) defer safeClose(t, client) if err != nil { @@ -401,7 +397,6 @@ type testFuncConsumerGroupMember struct { func defaultConfig(clientID string) *Config { config := NewFunctionalTestConfig() config.ClientID = clientID - config.Version = V0_10_2_0 config.Consumer.Return.Errors = true config.Consumer.Offsets.Initial = OffsetOldest config.Consumer.Group.Rebalance.Timeout = 10 * time.Second diff --git a/functional_consumer_staticmembership_test.go b/functional_consumer_staticmembership_test.go index 54f1ae43a..8da7b25d9 100644 --- a/functional_consumer_staticmembership_test.go +++ b/functional_consumer_staticmembership_test.go @@ -22,7 +22,6 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) { config1 := NewFunctionalTestConfig() config1.ClientID = "M1" - config1.Version = V2_3_0_0 config1.Consumer.Offsets.Initial = OffsetNewest config1.Consumer.Group.InstanceId = "Instance1" m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, 100, nil, "test.4") @@ -30,7 +29,6 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) { config2 := NewFunctionalTestConfig() config2.ClientID = "M2" - config2.Version = V2_3_0_0 config2.Consumer.Offsets.Initial = OffsetNewest config2.Consumer.Group.InstanceId = "Instance2" m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, 100, nil, "test.4") @@ -76,7 +74,6 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { config1 := NewFunctionalTestConfig() config1.ClientID = "M1" - config1.Version = V2_4_0_0 config1.Consumer.Offsets.Initial = OffsetNewest config1.Consumer.Group.InstanceId = "Instance1" m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4") @@ -84,7 +81,6 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) { config2 := NewFunctionalTestConfig() config2.ClientID = "M2" - config2.Version = V2_4_0_0 config2.Consumer.Offsets.Initial = OffsetNewest config2.Consumer.Group.InstanceId = "Instance2" m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4") @@ -179,7 +175,6 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) { config1 := NewFunctionalTestConfig() config1.ClientID = "M1" - config1.Version = V2_3_0_0 config1.Consumer.Offsets.Initial = OffsetNewest config1.Consumer.Group.InstanceId = "Instance1" m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4") @@ -187,7 +182,6 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) { config2 := NewFunctionalTestConfig() config2.ClientID = "M2" - config2.Version = V2_3_0_0 config2.Consumer.Offsets.Initial = OffsetNewest config2.Consumer.Group.InstanceId = "Instance2" m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4") @@ -198,7 +192,6 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) { config3 := NewFunctionalTestConfig() config3.ClientID = "M3" - config3.Version = V2_3_0_0 config3.Consumer.Offsets.Initial = OffsetNewest config3.Consumer.Group.InstanceId = "Instance2" // same instance id as config2 diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 14f29653d..167398b55 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -174,7 +174,6 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) { config.Producer.Idempotent = true config.Producer.Return.Successes = true config.Producer.RequiredAcks = WaitForAll - config.Version = V0_11_0_0 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { diff --git a/functional_producer_test.go b/functional_producer_test.go index a7aa2c6f6..ffaf81e49 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -40,7 +40,6 @@ func TestFuncProducingSnappy(t *testing.T) { func TestFuncProducingZstd(t *testing.T) { config := NewFunctionalTestConfig() - config.Version = V2_1_0_0 config.Producer.Compression = CompressionZSTD testProducingMessages(t, config) } @@ -108,7 +107,6 @@ func TestFuncTxnProduceNoBegin(t *testing.T) { config.Producer.Return.Errors = true config.Producer.Transaction.Retry.Max = 200 config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) defer producer.Close() @@ -135,7 +133,6 @@ func TestFuncTxnCommitNoMessages(t *testing.T) { config.Producer.Return.Errors = true config.Producer.Transaction.Retry.Max = 200 config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) defer producer.Close() @@ -168,7 +165,6 @@ func TestFuncTxnProduce(t *testing.T) { config.Producer.Transaction.Retry.Max = 200 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -222,7 +218,6 @@ func TestFuncTxnProduceWithBrokerFailure(t *testing.T) { config.Producer.Transaction.Retry.Max = 200 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -289,7 +284,6 @@ func TestFuncTxnProduceEpochBump(t *testing.T) { config.Producer.Transaction.Retry.Max = 200 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V2_6_0_0 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -358,7 +352,6 @@ func TestFuncInitProducerId3(t *testing.T) { config.Producer.Retry.Max = 50 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V2_6_0_0 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -401,7 +394,6 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) { config.Consumer.IsolationLevel = ReadCommitted config.Consumer.Offsets.AutoCommit.Enable = false config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -499,7 +491,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) { config.Producer.Transaction.Retry.Max = 200 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 configSecond := NewFunctionalTestConfig() configSecond.ChannelBufferSize = 20 @@ -511,7 +502,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) { configSecond.Producer.Retry.Max = 50 configSecond.Consumer.IsolationLevel = ReadCommitted configSecond.Net.MaxOpenRequests = 1 - configSecond.Version = V0_11_0_0 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -585,7 +575,6 @@ func TestFuncTxnAbortedProduce(t *testing.T) { config.Producer.Transaction.Retry.Max = 200 config.Consumer.IsolationLevel = ReadCommitted config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) require.NoError(t, err) @@ -671,7 +660,6 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { config.Producer.Return.Errors = true config.Producer.RequiredAcks = WaitForAll config.Net.MaxOpenRequests = 1 - config.Version = V0_11_0_0 producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) if err != nil { @@ -907,7 +895,6 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { config.Producer.Return.Successes = true config.Producer.Retry.Max = 1024 config.Producer.Retry.Backoff = time.Millisecond * 50 - config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) producer, err := NewAsyncProducer( FunctionalTestEnv.KafkaBrokerAddrs, diff --git a/functional_test.go b/functional_test.go index f51b2edb1..4883dbfe2 100644 --- a/functional_test.go +++ b/functional_test.go @@ -97,7 +97,15 @@ func testMain(m *testing.M) int { // NewFunctionalTestConfig returns a config meant to be used by functional tests. func NewFunctionalTestConfig() *Config { config := NewConfig() + // config.Consumer.Retry.Backoff = 0 + // config.Producer.Retry.Backoff = 0 config.Version = MinVersion + version, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")) + if err != nil { + config.Version = DefaultVersion + } else { + config.Version = version + } return config } @@ -171,7 +179,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err } config := NewFunctionalTestConfig() - config.Version, err = ParseKafkaVersion(env.KafkaVersion) if err != nil { return err } @@ -315,11 +322,6 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error { config.Metadata.Retry.Max = 5 config.Metadata.Retry.Backoff = 10 * time.Second config.ClientID = "sarama-prepareTestTopics" - var err error - config.Version, err = ParseKafkaVersion(env.KafkaVersion) - if err != nil { - return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err) - } client, err := NewClient(env.KafkaBrokerAddrs, config) if err != nil { @@ -465,7 +467,6 @@ func ensureFullyReplicated(t testing.TB, timeout time.Duration, retry time.Durat config.Metadata.Retry.Max = 5 config.Metadata.Retry.Backoff = 10 * time.Second config.ClientID = "sarama-ensureFullyReplicated" - config.Version = V2_6_0_0 var testTopicNames []string for topic := range testTopicDetails {