Skip to content

Commit

Permalink
chore(test): use modern protocol versions in FVT
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 10, 2023
1 parent 991b2b0 commit f4e6453
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 34 deletions.
1 change: 0 additions & 1 deletion functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
newConfig := func() *Config {
config := NewFunctionalTestConfig()
config.ClientID = t.Name()
config.Version = V2_8_0_0
config.Producer.Return.Successes = true
return config
}
Expand Down
5 changes: 0 additions & 5 deletions functional_consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func TestFuncConsumerGroupRebalanceAfterAddingPartitions(t *testing.T) {
defer teardownFunctionalTest(t)

config := NewFunctionalTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -243,10 +242,7 @@ func TestFuncConsumerGroupOffsetDeletion(t *testing.T) {
checkKafkaVersion(t, "2.4.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
// create a client with 2.4.0 version as it is the minimal version
// that supports DeleteOffsets request
config := NewFunctionalTestConfig()
config.Version = V2_4_0_0
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
defer safeClose(t, client)
if err != nil {
Expand Down Expand Up @@ -401,7 +397,6 @@ type testFuncConsumerGroupMember struct {
func defaultConfig(clientID string) *Config {
config := NewFunctionalTestConfig()
config.ClientID = clientID
config.Version = V0_10_2_0
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = OffsetOldest
config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
Expand Down
7 changes: 0 additions & 7 deletions functional_consumer_staticmembership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, 100, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, 100, nil, "test.4")
Expand Down Expand Up @@ -76,15 +74,13 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_4_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_4_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
Expand Down Expand Up @@ -179,15 +175,13 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {

config1 := NewFunctionalTestConfig()
config1.ClientID = "M1"
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewFunctionalTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
Expand All @@ -198,7 +192,6 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {

config3 := NewFunctionalTestConfig()
config3.ClientID = "M3"
config3.Version = V2_3_0_0
config3.Consumer.Offsets.Initial = OffsetNewest
config3.Consumer.Group.InstanceId = "Instance2" // same instance id as config2

Expand Down
1 change: 0 additions & 1 deletion functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = WaitForAll
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down
13 changes: 0 additions & 13 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestFuncProducingSnappy(t *testing.T) {

func TestFuncProducingZstd(t *testing.T) {
config := NewFunctionalTestConfig()
config.Version = V2_1_0_0
config.Producer.Compression = CompressionZSTD
testProducingMessages(t, config)
}
Expand Down Expand Up @@ -108,7 +107,6 @@ func TestFuncTxnProduceNoBegin(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.Transaction.Retry.Max = 200
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
defer producer.Close()
Expand All @@ -135,7 +133,6 @@ func TestFuncTxnCommitNoMessages(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.Transaction.Retry.Max = 200
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
defer producer.Close()
Expand Down Expand Up @@ -168,7 +165,6 @@ func TestFuncTxnProduce(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -222,7 +218,6 @@ func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -289,7 +284,6 @@ func TestFuncTxnProduceEpochBump(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V2_6_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -358,7 +352,6 @@ func TestFuncInitProducerId3(t *testing.T) {
config.Producer.Retry.Max = 50
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V2_6_0_0

producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -401,7 +394,6 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) {
config.Consumer.IsolationLevel = ReadCommitted
config.Consumer.Offsets.AutoCommit.Enable = false
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -499,7 +491,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

configSecond := NewFunctionalTestConfig()
configSecond.ChannelBufferSize = 20
Expand All @@ -511,7 +502,6 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
configSecond.Producer.Retry.Max = 50
configSecond.Consumer.IsolationLevel = ReadCommitted
configSecond.Net.MaxOpenRequests = 1
configSecond.Version = V0_11_0_0

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -585,7 +575,6 @@ func TestFuncTxnAbortedProduce(t *testing.T) {
config.Producer.Transaction.Retry.Max = 200
config.Consumer.IsolationLevel = ReadCommitted
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(t, err)
Expand Down Expand Up @@ -671,7 +660,6 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = WaitForAll
config.Net.MaxOpenRequests = 1
config.Version = V0_11_0_0

producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -907,7 +895,6 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 1024
config.Producer.Retry.Backoff = time.Millisecond * 50
config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)

producer, err := NewAsyncProducer(
FunctionalTestEnv.KafkaBrokerAddrs,
Expand Down
15 changes: 8 additions & 7 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,15 @@ func testMain(m *testing.M) int {
// NewFunctionalTestConfig returns a config meant to be used by functional tests.
func NewFunctionalTestConfig() *Config {
config := NewConfig()
// config.Consumer.Retry.Backoff = 0
// config.Producer.Retry.Backoff = 0
config.Version = MinVersion
version, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
if err != nil {
config.Version = DefaultVersion
} else {
config.Version = version
}
return config
}

Expand Down Expand Up @@ -171,7 +179,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
}

config := NewFunctionalTestConfig()
config.Version, err = ParseKafkaVersion(env.KafkaVersion)
if err != nil {
return err
}
Expand Down Expand Up @@ -315,11 +322,6 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
config.Metadata.Retry.Max = 5
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "sarama-prepareTestTopics"
var err error
config.Version, err = ParseKafkaVersion(env.KafkaVersion)
if err != nil {
return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
}

client, err := NewClient(env.KafkaBrokerAddrs, config)
if err != nil {
Expand Down Expand Up @@ -465,7 +467,6 @@ func ensureFullyReplicated(t testing.TB, timeout time.Duration, retry time.Durat
config.Metadata.Retry.Max = 5
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "sarama-ensureFullyReplicated"
config.Version = V2_6_0_0

var testTopicNames []string
for topic := range testTopicDetails {
Expand Down

0 comments on commit f4e6453

Please sign in to comment.