diff --git a/Makefile b/Makefile index 7cefc2a2c3..160e28b822 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ default: fmt get update test lint GO := go GOBIN := $(shell pwd)/bin GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -GOTEST := $(GO) test -v -race -coverprofile=profile.out -covermode=atomic +GOTEST := $(GO) test -v -race -shuffle=on -coverprofile=profile.out -covermode=atomic FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go') TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') diff --git a/async_producer_test.go b/async_producer_test.go index d872f60262..9f524a4e54 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2323,12 +2323,3 @@ ProducerLoop: log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors) } - -// NewTestConfig returns a config meant to be used by tests. -// Due to inconsistencies with the request versions the clients send using the default Kafka version -// and the response versions our mocks use, we default to the minimum Kafka version in most tests -func NewTestConfig() *Config { - config := NewConfig() - config.Version = MinVersion - return config -} diff --git a/client_tls_test.go b/client_tls_test.go index 8d7a51a33c..aa01d63a6a 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -175,6 +175,7 @@ func TestTLS(t *testing.T) { } { tc := tc t.Run(tc.name, func(t *testing.T) { + t.Parallel() doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client) }) } diff --git a/config_test.go b/config_test.go index 8fdd94de7a..9477e6bd9e 100644 --- a/config_test.go +++ b/config_test.go @@ -9,6 +9,17 @@ import ( "github.com/rcrowley/go-metrics" ) +// NewTestConfig returns a config meant to be used by tests. +// Due to inconsistencies with the request versions the clients send using the default Kafka version +// and the response versions our mocks use, we default to the minimum Kafka version in most tests +func NewTestConfig() *Config { + config := NewConfig() + config.Consumer.Retry.Backoff = 0 + config.Producer.Retry.Backoff = 0 + config.Version = MinVersion + return config +} + func TestDefaultConfigValidates(t *testing.T) { config := NewTestConfig() if err := config.Validate(); err != nil { diff --git a/consumer_group_test.go b/consumer_group_test.go index 85b8108e31..912b6aa4f3 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -37,6 +37,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) { config.Version = V2_0_0_0 config.Consumer.Return.Errors = true config.Consumer.Group.Rebalance.Retry.Max = 2 + config.Consumer.Group.Rebalance.Retry.Backoff = 0 config.Consumer.Offsets.AutoCommit.Enable = false broker0 := NewMockBroker(t, 0) @@ -100,72 +101,46 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) { } func TestConsume_RaceTest(t *testing.T) { - const groupID = "test-group" - const topic = "test-topic" - const offsetStart = int64(1234) + const ( + groupID = "test-group" + topic = "test-topic" + offsetStart = int64(1234) + ) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V2_8_1_0 cfg.Consumer.Return.Errors = true + cfg.Metadata.Full = true seedBroker := NewMockBroker(t, 1) - - joinGroupResponse := &JoinGroupResponse{} - - syncGroupResponse := &SyncGroupResponse{ - Version: 3, // sarama > 2.3.0.0 uses version 3 - } - // Leverage mock response to get the MemberAssignment bytes - mockSyncGroupResponse := NewMockSyncGroupResponse(t).SetMemberAssignment(&ConsumerGroupMemberAssignment{ - Version: 1, - Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0 - UserData: []byte{0x01}, - }) - syncGroupResponse.MemberAssignment = mockSyncGroupResponse.MemberAssignment - - heartbeatResponse := &HeartbeatResponse{ - Err: ErrNoError, - } - offsetFetchResponse := &OffsetFetchResponse{ - Version: 1, - ThrottleTimeMs: 0, - Err: ErrNoError, - } - offsetFetchResponse.AddBlock(topic, 0, &OffsetFetchResponseBlock{ - Offset: offsetStart, - LeaderEpoch: 0, - Metadata: "", - Err: ErrNoError, - }) - - offsetResponse := &OffsetResponse{ - Version: 1, - } - offsetResponse.AddTopicPartition(topic, 0, offsetStart) - - metadataResponse := new(MetadataResponse) - metadataResponse.Version = 10 - metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID()) - metadataResponse.AddTopic("mismatched-topic", ErrUnknownTopicOrPartition) + defer seedBroker.Close() handlerMap := map[string]MockResponse{ "ApiVersionsRequest": NewMockApiVersionsResponse(t), - "MetadataRequest": NewMockSequence(metadataResponse), - "OffsetRequest": NewMockSequence(offsetResponse), - "OffsetFetchRequest": NewMockSequence(offsetFetchResponse), - "FindCoordinatorRequest": NewMockSequence(NewMockFindCoordinatorResponse(t). - SetCoordinator(CoordinatorGroup, groupID, seedBroker)), - "JoinGroupRequest": NewMockSequence(joinGroupResponse), - "SyncGroupRequest": NewMockSequence(syncGroupResponse), - "HeartbeatRequest": NewMockSequence(heartbeatResponse), + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetError("mismatched-topic", ErrUnknownTopicOrPartition), + "OffsetRequest": NewMockOffsetResponse(t). + SetOffset(topic, 0, -1, offsetStart), + "OffsetFetchRequest": NewMockOffsetFetchResponse(t). + SetOffset(groupID, topic, 0, offsetStart, "", ErrNoError), + "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t). + SetCoordinator(CoordinatorGroup, groupID, seedBroker), + "JoinGroupRequest": NewMockJoinGroupResponse(t), + "SyncGroupRequest": NewMockSyncGroupResponse(t).SetMemberAssignment( + &ConsumerGroupMemberAssignment{ + Version: 1, + Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0 + UserData: []byte{0x01}, + }, + ), + "HeartbeatRequest": NewMockHeartbeatResponse(t), } seedBroker.SetHandlerByMap(handlerMap) - cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second)) - - defer seedBroker.Close() + cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) - retryWait := 20 * time.Millisecond + retryWait := 10 * time.Millisecond var err error clientRetries := 0 outerFor: @@ -195,8 +170,8 @@ outerFor: t.Fatalf("should not proceed to Consume") } - if clientRetries <= 0 { - t.Errorf("clientRetries = %v; want > 0", clientRetries) + if clientRetries <= 1 { + t.Errorf("clientRetries = %v; want > 1", clientRetries) } if err != nil && !errors.Is(err, context.DeadlineExceeded) { diff --git a/consumer_test.go b/consumer_test.go index c8731a58ab..4096bdd734 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -860,7 +860,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { block4 := fetchResponse4.GetBlock("my_topic", 0) block4.PreferredReadReplica = -1 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V2_3_0_0 cfg.RackID = "consumer_rack" @@ -925,7 +925,7 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { block2 := fetchResponse2.GetBlock("my_topic", 0) block2.PreferredReadReplica = -1 - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V2_3_0_0 cfg.RackID = "consumer_rack" @@ -981,7 +981,7 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3) fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V2_3_0_0 cfg.RackID = "consumer_rack" @@ -1051,7 +1051,7 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3) fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4) - cfg := NewConfig() + cfg := NewTestConfig() cfg.Version = V2_3_0_0 cfg.RackID = "consumer_rack" @@ -1109,9 +1109,10 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { // // See https://github.com/IBM/sarama/issues/1927 func TestConsumeMessagesTrackLeader(t *testing.T) { - cfg := NewConfig() + cfg := NewTestConfig() cfg.ClientID = t.Name() cfg.Metadata.RefreshFrequency = time.Millisecond * 50 + cfg.Consumer.Retry.Backoff = 0 cfg.Net.MaxOpenRequests = 1 cfg.Version = V2_1_0_0 @@ -1996,7 +1997,7 @@ func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) { broker: &brokerConsumer{ broker: &Broker{}, }, - conf: NewConfig(), + conf: NewTestConfig(), topic: "my_topic", partition: 0, } diff --git a/mockbroker.go b/mockbroker.go index 9b7682c9e4..8b73074fb3 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -178,7 +178,9 @@ func (b *MockBroker) serverLoop() { i++ } wg.Wait() - Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err) + if !isConnectionClosedError(err) { + Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err) + } } func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) { @@ -243,8 +245,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W for { buffer, err := b.readToBytes(conn) if err != nil { - Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer)) - b.serverError(err) + if !isConnectionClosedError(err) { + Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer)) + b.serverError(err) + } break } @@ -253,8 +257,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W req, br, err := decodeRequest(bytes.NewReader(buffer)) bytesRead = br if err != nil { - Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req)) - b.serverError(err) + if !isConnectionClosedError(err) { + Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req)) + b.serverError(err) + } break } @@ -358,22 +364,25 @@ func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) } } -func (b *MockBroker) serverError(err error) { - b.t.Helper() - isConnectionClosedError := false +func isConnectionClosedError(err error) bool { + var result bool opError := &net.OpError{} if errors.As(err, &opError) { - isConnectionClosedError = true + result = true } else if errors.Is(err, io.EOF) { - isConnectionClosedError = true + result = true } else if err.Error() == "use of closed network connection" { - isConnectionClosedError = true + result = true } - if isConnectionClosedError { + return result +} + +func (b *MockBroker) serverError(err error) { + b.t.Helper() + if isConnectionClosedError(err) { return } - b.t.Errorf(err.Error()) } diff --git a/mockresponses.go b/mockresponses.go index bd2902b1d3..07a636740a 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -135,6 +135,7 @@ func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHe // MockMetadataResponse is a `MetadataResponse` builder. type MockMetadataResponse struct { controllerID int32 + errors map[string]KError leaders map[string]map[int32]int32 brokers map[string]int32 t TestReporter @@ -142,12 +143,18 @@ type MockMetadataResponse struct { func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse { return &MockMetadataResponse{ + errors: make(map[string]KError), leaders: make(map[string]map[int32]int32), brokers: make(map[string]int32), t: t, } } +func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse { + mmr.errors[topic] = kerror + return mmr +} + func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse { partitions := mmr.leaders[topic] if partitions == nil { @@ -191,10 +198,22 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError) } } + for topic, err := range mmr.errors { + metadataResponse.AddTopic(topic, err) + } return metadataResponse } for _, topic := range metadataRequest.Topics { - for partition, brokerID := range mmr.leaders[topic] { + leaders, ok := mmr.leaders[topic] + if !ok { + if err, ok := mmr.errors[topic]; ok { + metadataResponse.AddTopic(topic, err) + } else { + metadataResponse.AddTopic(topic, ErrUnknownTopicOrPartition) + } + continue + } + for partition, brokerID := range leaders { metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError) } } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index f132290676..69510d1446 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -109,7 +109,7 @@ func TestProducerWithTooManyExpectations(t *testing.T) { } func TestProducerFailTxn(t *testing.T) { - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Transaction.ID = "test" config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Backoff = 0 @@ -130,7 +130,7 @@ func TestProducerFailTxn(t *testing.T) { } func TestProducerWithTxn(t *testing.T) { - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Transaction.ID = "test" config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Backoff = 0 @@ -208,7 +208,7 @@ func TestProducerWithCheckerFunction(t *testing.T) { func TestProducerWithBrokenPartitioner(t *testing.T) { trm := newTestReporterMock() - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Partitioner = func(string) sarama.Partitioner { return brokePartitioner{} } diff --git a/mocks/mocks.go b/mocks/mocks.go index fa95952295..bd9d630ddb 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -103,6 +103,8 @@ func (pc *TopicConfig) partitions(topic string) int32 { // and the response versions our mocks use, we default to the minimum Kafka version in most tests func NewTestConfig() *sarama.Config { config := sarama.NewConfig() + config.Consumer.Retry.Backoff = 0 + config.Producer.Retry.Backoff = 0 config.Version = sarama.MinVersion return config } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index f141edebb5..fb6f058bc4 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -56,7 +56,7 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { } func TestSyncProducerFailTxn(t *testing.T) { - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Transaction.ID = "test" config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Backoff = 0 @@ -86,7 +86,7 @@ func TestSyncProducerFailTxn(t *testing.T) { } func TestSyncProducerUseTxn(t *testing.T) { - config := sarama.NewConfig() + config := NewTestConfig() config.Producer.Transaction.ID = "test" config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Backoff = 0 diff --git a/sarama_test.go b/sarama_test.go index 5e3d7ba9b0..5ab84cf236 100644 --- a/sarama_test.go +++ b/sarama_test.go @@ -13,7 +13,7 @@ import ( func TestMain(m *testing.M) { flag.Parse() if f := flag.Lookup("test.v"); f != nil && f.Value.String() == "true" { - Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags) + Logger = log.New(os.Stderr, "[DEBUG] ", log.Lmicroseconds|log.Ltime) } os.Exit(m.Run()) }