diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 80ca05b63..12f87518a 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -438,50 +438,60 @@ func versionRange(lower KafkaVersion) []KafkaVersion { } func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { - var wg sync.WaitGroup - var producedMessagesMu sync.Mutex - var producedMessages []*ProducerMessage + var ( + wg sync.WaitGroup + producers []SyncProducer + producedMessagesMu sync.Mutex + producedMessages []*ProducerMessage + ) for _, prodVer := range clientVersions { for _, codec := range codecs { - prodCfg := NewTestConfig() - prodCfg.Version = prodVer - prodCfg.Producer.Return.Successes = true - prodCfg.Producer.Return.Errors = true - prodCfg.Producer.Flush.MaxMessages = flush - prodCfg.Producer.Compression = codec - prodCfg.Producer.Idempotent = idempotent - if idempotent { - prodCfg.Producer.RequiredAcks = WaitForAll - prodCfg.Net.MaxOpenRequests = 1 - } + t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) { + t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec) + prodCfg := NewTestConfig() + prodCfg.Version = prodVer + prodCfg.Producer.Return.Successes = true + prodCfg.Producer.Return.Errors = true + prodCfg.Producer.Flush.MaxMessages = flush + prodCfg.Producer.Compression = codec + prodCfg.Producer.Idempotent = idempotent + if idempotent { + prodCfg.Producer.RequiredAcks = WaitForAll + prodCfg.Net.MaxOpenRequests = 1 + } - p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) - if err != nil { - t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) - continue - } - defer safeClose(t, p) - for i := 0; i < countPerVerCodec; i++ { - msg := &ProducerMessage{ - Topic: "test.1", - Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)), + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) + if err != nil { + t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) + return } - wg.Add(1) - go func() { - defer wg.Done() - _, _, err := p.SendMessage(msg) - if err != nil { - t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err) + producers = append(producers, p) + for i := 0; i < countPerVerCodec; i++ { + msg := &ProducerMessage{ + Topic: "test.1", + Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)), } - producedMessagesMu.Lock() - producedMessages = append(producedMessages, msg) - producedMessagesMu.Unlock() - }() - } + wg.Add(1) + go func() { + defer wg.Done() + _, _, err := p.SendMessage(msg) + if err != nil { + t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err) + } + producedMessagesMu.Lock() + producedMessages = append(producedMessages, msg) + producedMessagesMu.Unlock() + }() + } + }) } } wg.Wait() + for _, p := range producers { + safeClose(t, p) + } + // Sort produced message in ascending offset order. sort.Slice(producedMessages, func(i, j int) bool { return producedMessages[i].Offset < producedMessages[j].Offset @@ -495,7 +505,7 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [ // Consume all produced messages with all client versions supported by the // cluster. for _, consVer := range clientVersions { - t.Run(consVer.String(), func(t *testing.T) { + t.Run("consumer-"+consVer.String(), func(t *testing.T) { t.Logf("*** Consuming with client version %s\n", consVer) // Create a partition consumer that should start from the first produced // message.