Skip to content

Commit

Permalink
chore(test): speedup some slow tests
Browse files Browse the repository at this point in the history
- make consumer and produce retries zero backoff
- use parallel on TLS test
- reduce mockbroker noise
- add mockresponses support for per-topic Err in MetadataResponse
- use microseconds in test debug logger

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 9, 2023
1 parent fa7db9a commit d0f0bc5
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ default: fmt get update test lint
GO := go
GOBIN := $(shell pwd)/bin
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := $(GO) test -v -race -coverprofile=profile.out -covermode=atomic
GOTEST := $(GO) test -v -race -shuffle=on -coverprofile=profile.out -covermode=atomic

FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
Expand Down
9 changes: 0 additions & 9 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2323,12 +2323,3 @@ ProducerLoop:

log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
}

// NewTestConfig returns a config meant to be used by tests.
// Due to inconsistencies with the request versions the clients send using the default Kafka version
// and the response versions our mocks use, we default to the minimum Kafka version in most tests
func NewTestConfig() *Config {
config := NewConfig()
config.Version = MinVersion
return config
}
1 change: 1 addition & 0 deletions client_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func TestTLS(t *testing.T) {
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client)
})
}
Expand Down
11 changes: 11 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"github.com/rcrowley/go-metrics"
)

// NewTestConfig returns a config meant to be used by tests.
// Due to inconsistencies with the request versions the clients send using the default Kafka version
// and the response versions our mocks use, we default to the minimum Kafka version in most tests
func NewTestConfig() *Config {
config := NewConfig()
config.Consumer.Retry.Backoff = 0
config.Producer.Retry.Backoff = 0
config.Version = MinVersion
return config
}

func TestDefaultConfigValidates(t *testing.T) {
config := NewTestConfig()
if err := config.Validate(); err != nil {
Expand Down
87 changes: 31 additions & 56 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
config.Version = V2_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Retry.Max = 2
config.Consumer.Group.Rebalance.Retry.Backoff = 0
config.Consumer.Offsets.AutoCommit.Enable = false

broker0 := NewMockBroker(t, 0)
Expand Down Expand Up @@ -100,72 +101,46 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
}

func TestConsume_RaceTest(t *testing.T) {
const groupID = "test-group"
const topic = "test-topic"
const offsetStart = int64(1234)
const (
groupID = "test-group"
topic = "test-topic"
offsetStart = int64(1234)
)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_8_1_0
cfg.Consumer.Return.Errors = true
cfg.Metadata.Full = true

seedBroker := NewMockBroker(t, 1)

joinGroupResponse := &JoinGroupResponse{}

syncGroupResponse := &SyncGroupResponse{
Version: 3, // sarama > 2.3.0.0 uses version 3
}
// Leverage mock response to get the MemberAssignment bytes
mockSyncGroupResponse := NewMockSyncGroupResponse(t).SetMemberAssignment(&ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
UserData: []byte{0x01},
})
syncGroupResponse.MemberAssignment = mockSyncGroupResponse.MemberAssignment

heartbeatResponse := &HeartbeatResponse{
Err: ErrNoError,
}
offsetFetchResponse := &OffsetFetchResponse{
Version: 1,
ThrottleTimeMs: 0,
Err: ErrNoError,
}
offsetFetchResponse.AddBlock(topic, 0, &OffsetFetchResponseBlock{
Offset: offsetStart,
LeaderEpoch: 0,
Metadata: "",
Err: ErrNoError,
})

offsetResponse := &OffsetResponse{
Version: 1,
}
offsetResponse.AddTopicPartition(topic, 0, offsetStart)

metadataResponse := new(MetadataResponse)
metadataResponse.Version = 10
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
metadataResponse.AddTopic("mismatched-topic", ErrUnknownTopicOrPartition)
defer seedBroker.Close()

handlerMap := map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockSequence(metadataResponse),
"OffsetRequest": NewMockSequence(offsetResponse),
"OffsetFetchRequest": NewMockSequence(offsetFetchResponse),
"FindCoordinatorRequest": NewMockSequence(NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, groupID, seedBroker)),
"JoinGroupRequest": NewMockSequence(joinGroupResponse),
"SyncGroupRequest": NewMockSequence(syncGroupResponse),
"HeartbeatRequest": NewMockSequence(heartbeatResponse),
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetError("mismatched-topic", ErrUnknownTopicOrPartition),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset(topic, 0, -1, offsetStart),
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).
SetOffset(groupID, topic, 0, offsetStart, "", ErrNoError),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
SetCoordinator(CoordinatorGroup, groupID, seedBroker),
"JoinGroupRequest": NewMockJoinGroupResponse(t),
"SyncGroupRequest": NewMockSyncGroupResponse(t).SetMemberAssignment(
&ConsumerGroupMemberAssignment{
Version: 1,
Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0
UserData: []byte{0x01},
},
),
"HeartbeatRequest": NewMockHeartbeatResponse(t),
}
seedBroker.SetHandlerByMap(handlerMap)

cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))

defer seedBroker.Close()
cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))

retryWait := 20 * time.Millisecond
retryWait := 10 * time.Millisecond
var err error
clientRetries := 0
outerFor:
Expand Down Expand Up @@ -195,8 +170,8 @@ outerFor:
t.Fatalf("should not proceed to Consume")
}

if clientRetries <= 0 {
t.Errorf("clientRetries = %v; want > 0", clientRetries)
if clientRetries <= 1 {
t.Errorf("clientRetries = %v; want > 1", clientRetries)
}

if err != nil && !errors.Is(err, context.DeadlineExceeded) {
Expand Down
13 changes: 7 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) {
block4 := fetchResponse4.GetBlock("my_topic", 0)
block4.PreferredReadReplica = -1

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -925,7 +925,7 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) {
block2 := fetchResponse2.GetBlock("my_topic", 0)
block2.PreferredReadReplica = -1

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -981,7 +981,7 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) {
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -1051,7 +1051,7 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 3)
fetchResponse4.AddMessage("my_topic", 0, nil, testMsg, 4)

cfg := NewConfig()
cfg := NewTestConfig()
cfg.Version = V2_3_0_0
cfg.RackID = "consumer_rack"

Expand Down Expand Up @@ -1109,9 +1109,10 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
//
// See https://github.com/IBM/sarama/issues/1927
func TestConsumeMessagesTrackLeader(t *testing.T) {
cfg := NewConfig()
cfg := NewTestConfig()
cfg.ClientID = t.Name()
cfg.Metadata.RefreshFrequency = time.Millisecond * 50
cfg.Consumer.Retry.Backoff = 0
cfg.Net.MaxOpenRequests = 1
cfg.Version = V2_1_0_0

Expand Down Expand Up @@ -1996,7 +1997,7 @@ func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) {
broker: &brokerConsumer{
broker: &Broker{},
},
conf: NewConfig(),
conf: NewTestConfig(),
topic: "my_topic",
partition: 0,
}
Expand Down
35 changes: 22 additions & 13 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ func (b *MockBroker) serverLoop() {
i++
}
wg.Wait()
Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
if !isConnectionClosedError(err) {
Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
}
}

func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
Expand Down Expand Up @@ -243,8 +245,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
for {
buffer, err := b.readToBytes(conn)
if err != nil {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
b.serverError(err)
if !isConnectionClosedError(err) {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
b.serverError(err)
}
break
}

Expand All @@ -253,8 +257,10 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
req, br, err := decodeRequest(bytes.NewReader(buffer))
bytesRead = br
if err != nil {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
b.serverError(err)
if !isConnectionClosedError(err) {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
b.serverError(err)
}
break
}

Expand Down Expand Up @@ -358,22 +364,25 @@ func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader)
}
}

func (b *MockBroker) serverError(err error) {
b.t.Helper()
isConnectionClosedError := false
func isConnectionClosedError(err error) bool {
var result bool
opError := &net.OpError{}
if errors.As(err, &opError) {
isConnectionClosedError = true
result = true
} else if errors.Is(err, io.EOF) {
isConnectionClosedError = true
result = true
} else if err.Error() == "use of closed network connection" {
isConnectionClosedError = true
result = true
}

if isConnectionClosedError {
return result
}

func (b *MockBroker) serverError(err error) {
b.t.Helper()
if isConnectionClosedError(err) {
return
}

b.t.Errorf(err.Error())
}

Expand Down
21 changes: 20 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,26 @@ func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHe
// MockMetadataResponse is a `MetadataResponse` builder.
type MockMetadataResponse struct {
controllerID int32
errors map[string]KError
leaders map[string]map[int32]int32
brokers map[string]int32
t TestReporter
}

func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
return &MockMetadataResponse{
errors: make(map[string]KError),
leaders: make(map[string]map[int32]int32),
brokers: make(map[string]int32),
t: t,
}
}

func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse {
mmr.errors[topic] = kerror
return mmr
}

func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
partitions := mmr.leaders[topic]
if partitions == nil {
Expand Down Expand Up @@ -191,10 +198,22 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader
metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
for topic, err := range mmr.errors {
metadataResponse.AddTopic(topic, err)
}
return metadataResponse
}
for _, topic := range metadataRequest.Topics {
for partition, brokerID := range mmr.leaders[topic] {
leaders, ok := mmr.leaders[topic]
if !ok {
if err, ok := mmr.errors[topic]; ok {
metadataResponse.AddTopic(topic, err)
} else {
metadataResponse.AddTopic(topic, ErrUnknownTopicOrPartition)
}
continue
}
for partition, brokerID := range leaders {
metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
Expand Down
6 changes: 3 additions & 3 deletions mocks/async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
}

func TestProducerFailTxn(t *testing.T) {
config := sarama.NewConfig()
config := NewTestConfig()
config.Producer.Transaction.ID = "test"
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Backoff = 0
Expand All @@ -130,7 +130,7 @@ func TestProducerFailTxn(t *testing.T) {
}

func TestProducerWithTxn(t *testing.T) {
config := sarama.NewConfig()
config := NewTestConfig()
config.Producer.Transaction.ID = "test"
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Backoff = 0
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestProducerWithCheckerFunction(t *testing.T) {

func TestProducerWithBrokenPartitioner(t *testing.T) {
trm := newTestReporterMock()
config := sarama.NewConfig()
config := NewTestConfig()
config.Producer.Partitioner = func(string) sarama.Partitioner {
return brokePartitioner{}
}
Expand Down
2 changes: 2 additions & 0 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (pc *TopicConfig) partitions(topic string) int32 {
// and the response versions our mocks use, we default to the minimum Kafka version in most tests
func NewTestConfig() *sarama.Config {
config := sarama.NewConfig()
config.Consumer.Retry.Backoff = 0
config.Producer.Retry.Backoff = 0
config.Version = sarama.MinVersion
return config
}
Loading

0 comments on commit d0f0bc5

Please sign in to comment.