diff --git a/Makefile b/Makefile
index 7cefc2a2c3..160e28b822 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ default: fmt get update test lint
 GO := go
 GOBIN := $(shell pwd)/bin
 GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
-GOTEST := $(GO) test -v -race -coverprofile=profile.out -covermode=atomic
+GOTEST := $(GO) test -v -race -shuffle=on -coverprofile=profile.out -covermode=atomic
 FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
 TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
diff --git a/async_producer_test.go b/async_producer_test.go
index d872f60262..9f524a4e54 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -2323,12 +2323,3 @@ ProducerLoop:
 	log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
 }
-
-// NewTestConfig returns a config meant to be used by tests.
-// Due to inconsistencies with the request versions the clients send using the default Kafka version
-// and the response versions our mocks use, we default to the minimum Kafka version in most tests
-func NewTestConfig() *Config {
-	config := NewConfig()
-	config.Version = MinVersion
-	return config
-}
diff --git a/client_tls_test.go b/client_tls_test.go
index 8d7a51a33c..aa01d63a6a 100644
--- a/client_tls_test.go
+++ b/client_tls_test.go
@@ -175,6 +175,7 @@ func TestTLS(t *testing.T) {
 	} {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
 			doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client)
 		})
 	}
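
Aside (not part of the patch): the client_tls_test.go hunk above combines two standard pieces of Go test plumbing: copying the loop variable (tc := tc) so each subtest closure captures its own value, and t.Parallel() so the table entries run concurrently, which matters more now that the Makefile runs tests with -shuffle=on. A minimal sketch of the pattern, with placeholder case data:

func TestTableDrivenInParallel(t *testing.T) {
	cases := []struct {
		name string
		in   int
	}{
		{name: "zero", in: 0},
		{name: "one", in: 1},
	}
	for _, tc := range cases {
		tc := tc // capture: every parallel subtest needs its own copy
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel() // subtests pause here, then run concurrently
			_ = tc.in    // exercise the case
		})
	}
}
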
diff --git a/config_test.go b/config_test.go
index 8fdd94de7a..9477e6bd9e 100644
--- a/config_test.go
+++ b/config_test.go
@@ -9,6 +9,17 @@ import (
 	"github.com/rcrowley/go-metrics"
 )
+// NewTestConfig returns a config meant to be used by tests.
+// Due to inconsistencies with the request versions the clients send using the default Kafka version
+// and the response versions our mocks use, we default to the minimum Kafka version in most tests
+func NewTestConfig() *Config {
+	config := NewConfig()
+	config.Consumer.Retry.Backoff = 0
+	config.Producer.Retry.Backoff = 0
+	config.Version = MinVersion
+	return config
+}
+
 func TestDefaultConfigValidates(t *testing.T) {
 	config := NewTestConfig()
 	if err := config.Validate(); err != nil {
diff --git a/consumer_group_test.go b/consumer_group_test.go
index 85b8108e31..912b6aa4f3 100644
--- a/consumer_group_test.go
+++ b/consumer_group_test.go
@@ -37,6 +37,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
 	config.Version = V2_0_0_0
 	config.Consumer.Return.Errors = true
 	config.Consumer.Group.Rebalance.Retry.Max = 2
+	config.Consumer.Group.Rebalance.Retry.Backoff = 0
 	config.Consumer.Offsets.AutoCommit.Enable = false
 	broker0 := NewMockBroker(t, 0)
@@ -100,72 +101,46 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
 }
 func TestConsume_RaceTest(t *testing.T) {
-	const groupID = "test-group"
-	const topic = "test-topic"
-	const offsetStart = int64(1234)
+	const (
+		groupID     = "test-group"
+		topic       = "test-topic"
+		offsetStart = int64(1234)
+	)
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.Version = V2_8_1_0
 	cfg.Consumer.Return.Errors = true
+	cfg.Metadata.Full = true
 	seedBroker := NewMockBroker(t, 1)
-
-	joinGroupResponse := &JoinGroupResponse{}
-
-	syncGroupResponse := &SyncGroupResponse{
-		Version: 3, // sarama > 2.3.0.0 uses version 3
-	}
-	// Leverage mock response to get the MemberAssignment bytes
-	mockSyncGroupResponse := NewMockSyncGroupResponse(t).SetMemberAssignment(&ConsumerGroupMemberAssignment{
-		Version: 1,
-		Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
-		UserData: []byte{0x01},
-	})
-	syncGroupResponse.MemberAssignment = mockSyncGroupResponse.MemberAssignment
-
-	heartbeatResponse := &HeartbeatResponse{
-		Err: ErrNoError,
-	}
-	offsetFetchResponse := &OffsetFetchResponse{
-		Version: 1,
-		ThrottleTimeMs: 0,
-		Err: ErrNoError,
-	}
-	offsetFetchResponse.AddBlock(topic, 0, &OffsetFetchResponseBlock{
-		Offset: offsetStart,
-		LeaderEpoch: 0,
-		Metadata: "",
-		Err: ErrNoError,
-	})
-
-	offsetResponse := &OffsetResponse{
-		Version: 1,
-	}
-	offsetResponse.AddTopicPartition(topic, 0, offsetStart)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.Version = 10
-	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
-	metadataResponse.AddTopic("mismatched-topic", ErrUnknownTopicOrPartition)
+	defer seedBroker.Close()
 	handlerMap := map[string]MockResponse{
 		"ApiVersionsRequest": NewMockApiVersionsResponse(t),
-		"MetadataRequest": NewMockSequence(metadataResponse),
-		"OffsetRequest": NewMockSequence(offsetResponse),
-		"OffsetFetchRequest": NewMockSequence(offsetFetchResponse),
-		"FindCoordinatorRequest": NewMockSequence(NewMockFindCoordinatorResponse(t).
-			SetCoordinator(CoordinatorGroup, groupID, seedBroker)),
-		"JoinGroupRequest": NewMockSequence(joinGroupResponse),
-		"SyncGroupRequest": NewMockSequence(syncGroupResponse),
-		"HeartbeatRequest": NewMockSequence(heartbeatResponse),
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetError("mismatched-topic", ErrUnknownTopicOrPartition),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset(topic, 0, -1, offsetStart),
+		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).
+			SetOffset(groupID, topic, 0, offsetStart, "", ErrNoError),
+		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
+			SetCoordinator(CoordinatorGroup, groupID, seedBroker),
+		"JoinGroupRequest": NewMockJoinGroupResponse(t),
+		"SyncGroupRequest": NewMockSyncGroupResponse(t).SetMemberAssignment(
+			&ConsumerGroupMemberAssignment{
+				Version: 1,
+				Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
+				UserData: []byte{0x01},
+			},
+		),
+		"HeartbeatRequest": NewMockHeartbeatResponse(t),
 	}
 	seedBroker.SetHandlerByMap(handlerMap)
-	cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
-
-	defer seedBroker.Close()
+	cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
-	retryWait := 20 * time.Millisecond
+	retryWait := 10 * time.Millisecond
 	var err error
 	clientRetries := 0
 outerFor:
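
Aside (not part of the patch): the rewritten TestConsume_RaceTest above swaps hand-built response structs for the reusable MockResponse builders, which answer with versions that match what the client actually sends. The same pattern works for any MockBroker-based unit test; a rough sketch using only builders that appear in this change (the topic names are placeholders):

func TestWithMockResponseBuilders(t *testing.T) {
	broker := NewMockBroker(t, 1)
	defer broker.Close()

	broker.SetHandlerByMap(map[string]MockResponse{
		"ApiVersionsRequest": NewMockApiVersionsResponse(t),
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()).
			SetLeader("my-topic", 0, broker.BrokerID()).
			SetError("missing-topic", ErrUnknownTopicOrPartition),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my-topic", 0, OffsetOldest, 0).
			SetOffset("my-topic", 0, OffsetNewest, 1),
	})

	client, err := NewClient([]string{broker.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
}
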
@@ -195,8 +170,8 @@
 		t.Fatalf("should not proceed to Consume")
 	}
-	if clientRetries <= 0 {
-		t.Errorf("clientRetries = %v; want > 0", clientRetries)
+	if clientRetries <= 1 {
+		t.Errorf("clientRetries = %v; want > 1", clientRetries)
 	}
 	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
diff --git a/consumer_test.go b/consumer_test.go
index c8731a58ab..4096bdd734 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -860,7 +860,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
 	block4 := fetchResponse4.GetBlock("my_topic", 0)
 	block4.PreferredReadReplica = -1
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.Version = V2_3_0_0
 	cfg.RackID = "consumer_rack"
@@ -925,7 +925,7 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) {
 	block2 := fetchResponse2.GetBlock("my_topic", 0)
 	block2.PreferredReadReplica = -1
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.Version = V2_3_0_0
 	cfg.RackID = "consumer_rack"
@@ -981,7 +981,7 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
 	fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
 	fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.Version = V2_3_0_0
 	cfg.RackID = "consumer_rack"
@@ -1051,7 +1051,7 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
 	fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
 	fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.Version = V2_3_0_0
 	cfg.RackID = "consumer_rack"
@@ -1109,9 +1109,10 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
 //
 // See https://github.com/IBM/sarama/issues/1927
 func TestConsumeMessagesTrackLeader(t *testing.T) {
-	cfg := NewConfig()
+	cfg := NewTestConfig()
 	cfg.ClientID = t.Name()
 	cfg.Metadata.RefreshFrequency = time.Millisecond * 50
+	cfg.Consumer.Retry.Backoff = 0
 	cfg.Net.MaxOpenRequests = 1
 	cfg.Version = V2_1_0_0
@@ -1996,7 +1997,7 @@ func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) {
 		broker: &brokerConsumer{
 			broker: &Broker{},
 		},
-		conf: NewConfig(),
+		conf: NewTestConfig(),
 		topic: "my_topic",
 		partition: 0,
 	}
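
Aside (not part of the patch): from here on the functional tests switch from NewTestConfig to NewFunctionalTestConfig, which is added to functional_test.go further down. The intent of the split, roughly: unit tests keep the mock-friendly defaults (MinVersion plus zeroed consumer and producer retry backoffs), while functional tests start from a plain NewConfig pinned to MinVersion and raise Version per test as needed. A sketch of how the two helpers are meant to be used:

// Unit test: mock broker, no real cluster, no retry sleeps.
func TestSomethingWithMocks(t *testing.T) {
	cfg := NewTestConfig() // MinVersion, retry backoffs zeroed
	cfg.Consumer.Return.Errors = true
	broker := NewMockBroker(t, 1)
	defer broker.Close()
	_ = cfg // drive the mock broker here
}

// Functional test: talks to the docker-compose cluster behind FunctionalTestEnv.
func TestSomethingAgainstRealKafka(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	cfg := NewFunctionalTestConfig()
	cfg.Version = V2_3_0_0 // individual tests bump the version they need
	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()
}
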
diff --git a/functional_admin_test.go b/functional_admin_test.go
index 6d12261fe4..64e98182ee 100644
--- a/functional_admin_test.go
+++ b/functional_admin_test.go
@@ -17,7 +17,7 @@ func TestFuncAdminQuotas(t *testing.T) {
 		t.Fatal(err)
 	}
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version = kafkaVersion
 	adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
@@ -137,21 +137,21 @@ func TestFuncAdminDescribeGroups(t *testing.T) {
 		t.Fatal(err)
 	}
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version = kafkaVersion
 	adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	config1 := NewTestConfig()
+	config1 := NewFunctionalTestConfig()
 	config1.ClientID = "M1"
 	config1.Version = V2_3_0_0
 	config1.Consumer.Offsets.Initial = OffsetNewest
 	m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
 	defer m1.Close()
-	config2 := NewTestConfig()
+	config2 := NewFunctionalTestConfig()
 	config2.ClientID = "M2"
 	config2.Version = V2_3_0_0
 	config2.Consumer.Offsets.Initial = OffsetNewest
diff --git a/functional_client_test.go b/functional_client_test.go
index 75eedac4c7..da9ad7b544 100644
--- a/functional_client_test.go
+++ b/functional_client_test.go
@@ -17,7 +17,7 @@ func TestFuncConnectionFailure(t *testing.T) {
 	FunctionalTestEnv.Proxies["kafka1"].Enabled = false
 	SaveProxy(t, "kafka1")
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Metadata.Retry.Max = 1
 	_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
@@ -30,7 +30,7 @@ func TestFuncClientMetadata(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Backoff = 10 * time.Millisecond
 	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
@@ -74,7 +74,7 @@ func TestFuncClientCoordinator(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/functional_consumer_follower_fetch_test.go b/functional_consumer_follower_fetch_test.go
index 8b1db3e24d..ea66512fe0 100644
--- a/functional_consumer_follower_fetch_test.go
+++ b/functional_consumer_follower_fetch_test.go
@@ -19,7 +19,7 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
 	)
 	newConfig := func() *Config {
-		config := NewConfig()
+		config := NewFunctionalTestConfig()
 		config.ClientID = t.Name()
 		config.Version = V2_8_0_0
 		config.Producer.Return.Successes = true
@@ -81,7 +81,8 @@ func TestConsumerFetchFollowerFailover(t *testing.T) {
 	go func() {
 		for i := 0; i < numMsg; i++ {
 			msg := &ProducerMessage{
-				Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i))}
+				Topic: topic, Key: nil, Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i)),
+			}
 			if _, offset, err := producer.SendMessage(msg); err != nil {
 				t.Error(i, err)
 			} else if offset%50 == 0 {
diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go
index 19ca48348e..683de91008 100644
--- a/functional_consumer_group_test.go
+++ b/functional_consumer_group_test.go
@@ -140,7 +140,7 @@ func TestFuncConsumerGroupRebalanceAfterAddingPartitions(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version = V2_3_0_0
 	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
@@ -245,7 +245,7 @@ func TestFuncConsumerGroupOffsetDeletion(t *testing.T) {
 	defer teardownFunctionalTest(t)
 	// create a client with 2.4.0 version as it is the minimal version
 	// that supports DeleteOffsets request
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version = V2_4_0_0
 	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	defer safeClose(t, client)
@@ -315,7 +315,7 @@ func markOffset(t *testing.T, offsetMgr OffsetManager, topic string, partition i
 }
 func testFuncConsumerGroupFuzzySeed(topic string) error {
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		return err
 	}
@@ -399,7 +399,7 @@ type testFuncConsumerGroupMember struct {
 }
 func defaultConfig(clientID string) *Config {
-	config := NewConfig()
+	config := NewFunctionalTestConfig()
 	config.ClientID = clientID
 	config.Version = V0_10_2_0
 	config.Consumer.Return.Errors = true
diff --git a/functional_consumer_staticmembership_test.go b/functional_consumer_staticmembership_test.go
index 6b8f3245d0..54f1ae43a0 100644
--- a/functional_consumer_staticmembership_test.go
+++ b/functional_consumer_staticmembership_test.go
@@ -20,7 +20,7 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) {
 	t.Helper()
-	config1 := NewTestConfig()
+	config1 := NewFunctionalTestConfig()
 	config1.ClientID = "M1"
 	config1.Version = V2_3_0_0
 	config1.Consumer.Offsets.Initial = OffsetNewest
@@ -28,7 +28,7 @@
 	m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, 100, nil, "test.4")
 	defer m1.Close()
-	config2 := NewTestConfig()
+	config2 := NewFunctionalTestConfig()
 	config2.ClientID = "M2"
 	config2.Version = V2_3_0_0
 	config2.Consumer.Offsets.Initial = OffsetNewest
@@ -74,7 +74,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {
 	t.Helper()
-	config1 := NewTestConfig()
+	config1 := NewFunctionalTestConfig()
 	config1.ClientID = "M1"
 	config1.Version = V2_4_0_0
 	config1.Consumer.Offsets.Initial = OffsetNewest
@@ -82,7 +82,7 @@
 	m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
 	defer m1.Close()
-	config2 := NewTestConfig()
+	config2 := NewFunctionalTestConfig()
 	config2.ClientID = "M2"
 	config2.Version = V2_4_0_0
 	config2.Consumer.Offsets.Initial = OffsetNewest
@@ -177,7 +177,7 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
 	t.Helper()
-	config1 := NewTestConfig()
+	config1 := NewFunctionalTestConfig()
 	config1.ClientID = "M1"
 	config1.Version = V2_3_0_0
 	config1.Consumer.Offsets.Initial = OffsetNewest
@@ -185,7 +185,7 @@
 	m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
 	defer m1.Close()
-	config2 := NewTestConfig()
+	config2 := NewFunctionalTestConfig()
 	config2.ClientID = "M2"
 	config2.Version = V2_3_0_0
 	config2.Consumer.Offsets.Initial = OffsetNewest
@@ -196,7 +196,7 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
 	m1.WaitForState(2)
 	m2.WaitForState(2)
-	config3 := NewTestConfig()
+	config3 := NewFunctionalTestConfig()
 	config3.ClientID = "M3"
 	config3.Version = V2_3_0_0
 	config3.Consumer.Offsets.Initial = OffsetNewest
@@ -222,7 +222,7 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
 // --------------------------------------------------------------------
 func testFuncConsumerGroupProduceMessage(topic string, count int) error {
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		return err
 	}
diff --git a/functional_consumer_test.go b/functional_consumer_test.go
index 5f63ee93a4..d907dc14c8 100644
--- a/functional_consumer_test.go
+++ b/functional_consumer_test.go
@@ -26,7 +26,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -57,7 +57,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 		t.Fatal(err)
 	}
-	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -167,7 +167,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ClientID = t.Name()
 	config.Net.MaxOpenRequests = 1
 	config.Consumer.IsolationLevel = ReadCommitted
@@ -317,7 +317,7 @@ func TestConsumerGroupDeadlock(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	config := NewConfig()
+	config := NewFunctionalTestConfig()
 	config.ClientID = t.Name()
 	config.Producer.Return.Successes = true
 	config.ChannelBufferSize = 2 * msgQty
@@ -467,7 +467,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 	g := errgroup.Group{}
 	for _, prodVer := range clientVersions {
 		for _, codec := range codecs {
-			prodCfg := NewTestConfig()
+			prodCfg := NewFunctionalTestConfig()
 			prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String()
 			if idempotent {
 				prodCfg.ClientID += "-idempotent"
@@ -545,7 +545,7 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
 	for _, consVer := range clientVersions {
 		// Create a partition consumer that should start from the first produced
 		// message.
-		consCfg := NewTestConfig()
+		consCfg := NewFunctionalTestConfig()
 		consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String()
 		consCfg.Consumer.MaxProcessingTime = time.Second
 		consCfg.Metadata.Full = false
diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go
index 2d983f2874..7f324b22bd 100644
--- a/functional_offset_manager_test.go
+++ b/functional_offset_manager_test.go
@@ -12,7 +12,7 @@ func TestFuncOffsetManager(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/functional_producer_test.go b/functional_producer_test.go
index 2b53315642..a7aa2c6f67 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -22,37 +22,37 @@ import (
 const TestBatchSize = 1000
 func TestFuncProducing(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	testProducingMessages(t, config)
 }
 func TestFuncProducingGzip(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionGZIP
 	testProducingMessages(t, config)
 }
 func TestFuncProducingSnappy(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionSnappy
 	testProducingMessages(t, config)
 }
 func TestFuncProducingZstd(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version = V2_1_0_0
 	config.Producer.Compression = CompressionZSTD
 	testProducingMessages(t, config)
 }
 func TestFuncProducingNoResponse(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Producer.RequiredAcks = NoResponse
 	testProducingMessages(t, config)
 }
 func TestFuncProducingFlushing(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Producer.Flush.Messages = TestBatchSize / 8
 	config.Producer.Flush.Frequency = 250 * time.Millisecond
 	testProducingMessages(t, config)
@@ -62,7 +62,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -96,7 +96,7 @@ func TestFuncTxnProduceNoBegin(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -123,7 +123,7 @@ func TestFuncTxnCommitNoMessages(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -158,7 +158,7 @@ func TestFuncTxnProduce(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -179,7 +179,7 @@ func TestFuncTxnProduce(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
-	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	require.NoError(t, err)
 	defer nonTransactionalProducer.Close()
@@ -212,7 +212,7 @@ func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -233,7 +233,7 @@ func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
-	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	require.NoError(t, err)
 	defer nonTransactionalProducer.Close()
@@ -279,7 +279,7 @@ func TestFuncTxnProduceEpochBump(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -300,7 +300,7 @@ func TestFuncTxnProduceEpochBump(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
-	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	require.NoError(t, err)
 	defer nonTransactionalProducer.Close()
@@ -348,7 +348,7 @@ func TestFuncInitProducerId3(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -390,7 +390,7 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -442,7 +442,7 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) {
 	handler.started.Wait()
-	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewTestConfig())
+	nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
 	require.NoError(t, err)
 	defer nonTransactionalProducer.Close()
@@ -489,7 +489,7 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -501,7 +501,7 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
 	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
-	configSecond := NewTestConfig()
+	configSecond := NewFunctionalTestConfig()
 	configSecond.ChannelBufferSize = 20
 	configSecond.Producer.Flush.Frequency = 50 * time.Millisecond
 	configSecond.Producer.Flush.Messages = 200
@@ -522,7 +522,7 @@ func TestFuncTxnProduceMultiTxn(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
-	nonTransactionalConfig := NewTestConfig()
+	nonTransactionalConfig := NewFunctionalTestConfig()
 	nonTransactionalConfig.Producer.Return.Successes = true
 	nonTransactionalConfig.Producer.Return.Errors = true
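
Aside (not part of the patch): the transactional producer tests above all share the same setup boilerplate. For readers unfamiliar with sarama's transactional API, a minimal sketch of what such a producer needs; the transactional ID and topic are placeholders, and the option values mirror what the surrounding tests configure:

config := NewFunctionalTestConfig()
config.Producer.RequiredAcks = WaitForAll
config.Producer.Return.Successes = true
config.Producer.Idempotent = true         // transactions build on the idempotent producer
config.Producer.Transaction.ID = "txn-id" // placeholder transactional ID
config.Net.MaxOpenRequests = 1            // required when Idempotent is enabled
config.Version = V0_11_0_0                // transactions need at least Kafka 0.11

producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
	panic(err) // a test would use t.Fatal(err)
}
defer producer.Close()

if err := producer.BeginTxn(); err != nil {
	panic(err)
}
producer.Input() <- &ProducerMessage{Topic: "test.1", Value: StringEncoder("hello")}
if err := producer.CommitTxn(); err != nil {
	panic(err)
}
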
@@ -574,7 +574,7 @@ func TestFuncTxnAbortedProduce(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
@@ -599,7 +599,7 @@ func TestFuncTxnAbortedProduce(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
-	nonTransactionalConfig := NewTestConfig()
+	nonTransactionalConfig := NewFunctionalTestConfig()
 	nonTransactionalConfig.Producer.Return.Successes = true
 	nonTransactionalConfig.Producer.Return.Errors = true
@@ -661,7 +661,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Producer.Flush.Frequency = 250 * time.Millisecond
 	config.Producer.Idempotent = true
 	config.Producer.Timeout = 500 * time.Millisecond
@@ -735,7 +735,7 @@
 }
 func TestInterceptors(t *testing.T) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
@@ -900,7 +900,7 @@ func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.ClientID = t.Name()
 	config.Net.MaxOpenRequests = 1
 	config.Producer.Flush.MaxMessages = 1
@@ -1090,7 +1090,7 @@ func BenchmarkProducerSmallSinglePartition(b *testing.B) {
 }
 func BenchmarkProducerMediumSnappy(b *testing.B) {
-	conf := NewTestConfig()
+	conf := NewFunctionalTestConfig()
 	conf.Producer.Compression = CompressionSnappy
 	benchmarkProducer(b, conf, "test.1", ByteEncoder(make([]byte, 1024)))
 }
diff --git a/functional_test.go b/functional_test.go
index 602c1c45a8..f51b2edb1e 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -72,7 +72,7 @@ func testMain(m *testing.M) int {
 	var env testEnvironment
 	if os.Getenv("DEBUG") == "true" {
-		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
+		Logger = log.New(os.Stderr, "[DEBUG] ", log.Lmicroseconds|log.Ltime)
 	}
 	usingExisting, err := existingEnvironment(ctx, &env)
@@ -94,6 +94,13 @@ func testMain(m *testing.M) int {
 	return m.Run()
 }
+// NewFunctionalTestConfig returns a config meant to be used by functional tests.
+func NewFunctionalTestConfig() *Config {
+	config := NewConfig()
+	config.Version = MinVersion
+	return config
+}
+
 type testEnvironment struct {
 	ToxiproxyClient *toxiproxy.Client
 	Proxies map[string]*toxiproxy.Proxy
@@ -139,7 +146,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 	if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
 		env.KafkaVersion = version
 	} else {
-		env.KafkaVersion = "3.1.2"
+		env.KafkaVersion = "3.3.2"
 	}
 	c := exec.Command("docker-compose", "up", "-d")
@@ -163,7 +170,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 		return conn.Close()
 	}
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
 	if err != nil {
 		return err
 	}
@@ -304,7 +311,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 	}
 	Logger.Println("Creating topics")
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Metadata.Retry.Max = 5
 	config.Metadata.Retry.Backoff = 10 * time.Second
 	config.ClientID = "sarama-prepareTestTopics"
@@ -454,7 +461,7 @@ func teardownFunctionalTest(t testing.TB) {
 }
 func ensureFullyReplicated(t testing.TB, timeout time.Duration, retry time.Duration) {
-	config := NewTestConfig()
+	config := NewFunctionalTestConfig()
 	config.Metadata.Retry.Max = 5
 	config.Metadata.Retry.Backoff = 10 * time.Second
 	config.ClientID = "sarama-ensureFullyReplicated"
diff --git a/mockbroker.go b/mockbroker.go
index 9b7682c9e4..8b73074fb3 100644
--- a/mockbroker.go
+++ b/mockbroker.go
@@ -178,7 +178,9 @@ func (b *MockBroker) serverLoop() {
 		i++
 	}
 	wg.Wait()
-	Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
+	if !isConnectionClosedError(err) {
+		Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
+	}
 }
 func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
@@ -243,8 +245,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
 	for {
 		buffer, err := b.readToBytes(conn)
 		if err != nil {
-			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
-			b.serverError(err)
+			if !isConnectionClosedError(err) {
+				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
+				b.serverError(err)
+			}
 			break
 		}
@@ -253,8 +257,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
 		req, br, err := decodeRequest(bytes.NewReader(buffer))
 		bytesRead = br
 		if err != nil {
-			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
-			b.serverError(err)
+			if !isConnectionClosedError(err) {
+				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+				b.serverError(err)
+			}
 			break
 		}
@@ -358,22 +364,25 @@ func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader)
 	}
 }
-func (b *MockBroker) serverError(err error) {
-	b.t.Helper()
-	isConnectionClosedError := false
+func isConnectionClosedError(err error) bool {
+	var result bool
 	opError := &net.OpError{}
 	if errors.As(err, &opError) {
-		isConnectionClosedError = true
+		result = true
 	} else if errors.Is(err, io.EOF) {
-		isConnectionClosedError = true
+		result = true
 	} else if err.Error() == "use of closed network connection" {
-		isConnectionClosedError = true
+		result = true
 	}
-	if isConnectionClosedError {
+	return result
+}
+
+func (b *MockBroker) serverError(err error) {
+	b.t.Helper()
+	if isConnectionClosedError(err) {
 		return
 	}
-
 	b.t.Errorf(err.Error())
 }
diff --git a/mockresponses.go b/mockresponses.go
index bd2902b1d3..07a636740a 100644
--- a/mockresponses.go
+++ b/mockresponses.go
@@ -135,6 +135,7 @@ func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHe
 // MockMetadataResponse is a `MetadataResponse` builder.
 type MockMetadataResponse struct {
 	controllerID int32
+	errors map[string]KError
 	leaders map[string]map[int32]int32
 	brokers map[string]int32
 	t TestReporter
 }
 func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
 	return &MockMetadataResponse{
+		errors: make(map[string]KError),
 		leaders: make(map[string]map[int32]int32),
 		brokers: make(map[string]int32),
 		t: t,
 	}
 }
+func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse {
+	mmr.errors[topic] = kerror
+	return mmr
+}
+
 func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
 	partitions := mmr.leaders[topic]
 	if partitions == nil {
@@ -191,10 +198,22 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader
 			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
 			}
 		}
+		for topic, err := range mmr.errors {
+			metadataResponse.AddTopic(topic, err)
+		}
 		return metadataResponse
 	}
 	for _, topic := range metadataRequest.Topics {
-		for partition, brokerID := range mmr.leaders[topic] {
+		leaders, ok := mmr.leaders[topic]
+		if !ok {
+			if err, ok := mmr.errors[topic]; ok {
+				metadataResponse.AddTopic(topic, err)
+			} else {
+				metadataResponse.AddTopic(topic, ErrUnknownTopicOrPartition)
+			}
+			continue
+		}
+		for partition, brokerID := range leaders {
 			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
 		}
 	}
diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go
index f132290676..69510d1446 100644
--- a/mocks/async_producer_test.go
+++ b/mocks/async_producer_test.go
@@ -109,7 +109,7 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
 }
 func TestProducerFailTxn(t *testing.T) {
-	config := sarama.NewConfig()
+	config := NewTestConfig()
 	config.Producer.Transaction.ID = "test"
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Backoff = 0
@@ -130,7 +130,7 @@ func TestProducerFailTxn(t *testing.T) {
 }
 func TestProducerWithTxn(t *testing.T) {
-	config := sarama.NewConfig()
+	config := NewTestConfig()
 	config.Producer.Transaction.ID = "test"
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Backoff = 0
@@ -208,7 +208,7 @@ func TestProducerWithCheckerFunction(t *testing.T) {
 func TestProducerWithBrokenPartitioner(t *testing.T) {
 	trm := newTestReporterMock()
-	config := sarama.NewConfig()
+	config := NewTestConfig()
 	config.Producer.Partitioner = func(string) sarama.Partitioner {
 		return brokePartitioner{}
 	}
diff --git a/mocks/mocks.go b/mocks/mocks.go
index fa95952295..bd9d630ddb 100644
--- a/mocks/mocks.go
+++ b/mocks/mocks.go
@@ -103,6 +103,8 @@ func (pc *TopicConfig) partitions(topic string) int32 {
 // and the response versions our mocks use, we default to the minimum Kafka version in most tests
 func NewTestConfig() *sarama.Config {
 	config := sarama.NewConfig()
+	config.Consumer.Retry.Backoff = 0
+	config.Producer.Retry.Backoff = 0
 	config.Version = sarama.MinVersion
 	return config
 }
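
Aside (not part of the patch): the mocks package change mirrors config_test.go, zeroing the retry backoffs in mocks.NewTestConfig so expectation failures surface without retry sleeps. A typical caller of that helper, sketched from outside the repository (the package name, topic, and payload are placeholders):

package yourpkg_test

import (
	"testing"

	"github.com/IBM/sarama"
	"github.com/IBM/sarama/mocks"
)

func TestWithMockSyncProducer(t *testing.T) {
	sp := mocks.NewSyncProducer(t, mocks.NewTestConfig())
	defer sp.Close()

	sp.ExpectSendMessageAndSucceed()

	msg := &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("hello")}
	if _, _, err := sp.SendMessage(msg); err != nil {
		t.Fatal(err)
	}
}
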
diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go
index f141edebb5..fb6f058bc4 100644
--- a/mocks/sync_producer_test.go
+++ b/mocks/sync_producer_test.go
@@ -56,7 +56,7 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
 }
 func TestSyncProducerFailTxn(t *testing.T) {
-	config := sarama.NewConfig()
+	config := NewTestConfig()
 	config.Producer.Transaction.ID = "test"
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Backoff = 0
@@ -86,7 +86,7 @@ func TestSyncProducerFailTxn(t *testing.T) {
 }
 func TestSyncProducerUseTxn(t *testing.T) {
-	config := sarama.NewConfig()
+	config := NewTestConfig()
 	config.Producer.Transaction.ID = "test"
 	config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Backoff = 0
diff --git a/sarama_test.go b/sarama_test.go
index 5e3d7ba9b0..5ab84cf236 100644
--- a/sarama_test.go
+++ b/sarama_test.go
@@ -13,7 +13,7 @@ import (
 func TestMain(m *testing.M) {
 	flag.Parse()
 	if f := flag.Lookup("test.v"); f != nil && f.Value.String() == "true" {
-		Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
+		Logger = log.New(os.Stderr, "[DEBUG] ", log.Lmicroseconds|log.Ltime)
 	}
 	os.Exit(m.Run())
 }
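
Aside (not part of the patch): with these TestMain changes, both suites log through a microsecond-resolution "[DEBUG]" logger on stderr when verbose testing (go test -v) or DEBUG=true is in effect. Applications can enable the same logging outside the test suite; a minimal sketch:

package main

import (
	"log"
	"os"

	"github.com/IBM/sarama"
)

func main() {
	// Same shape as the test suites' logger: stderr, "[DEBUG] " prefix, microsecond timestamps.
	sarama.Logger = log.New(os.Stderr, "[DEBUG] ", log.Lmicroseconds|log.Ltime)
}
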