chore(test): use t.Run subtests for version matrix
dnwe committed Jul 20, 2022
1 parent 5caa119 commit 3c6e6ce
Showing 1 changed file with 36 additions and 33 deletions.
69 changes: 36 additions & 33 deletions functional_consumer_test.go
@@ -443,41 +443,44 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec
     var producedMessages []*ProducerMessage
     for _, prodVer := range clientVersions {
         for _, codec := range codecs {
-            prodCfg := NewTestConfig()
-            prodCfg.Version = prodVer
-            prodCfg.Producer.Return.Successes = true
-            prodCfg.Producer.Return.Errors = true
-            prodCfg.Producer.Flush.MaxMessages = flush
-            prodCfg.Producer.Compression = codec
-            prodCfg.Producer.Idempotent = idempotent
-            if idempotent {
-                prodCfg.Producer.RequiredAcks = WaitForAll
-                prodCfg.Net.MaxOpenRequests = 1
-            }
+            t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) {
+                t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
+                prodCfg := NewTestConfig()
+                prodCfg.Version = prodVer
+                prodCfg.Producer.Return.Successes = true
+                prodCfg.Producer.Return.Errors = true
+                prodCfg.Producer.Flush.MaxMessages = flush
+                prodCfg.Producer.Compression = codec
+                prodCfg.Producer.Idempotent = idempotent
+                if idempotent {
+                    prodCfg.Producer.RequiredAcks = WaitForAll
+                    prodCfg.Net.MaxOpenRequests = 1
+                }

-            p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
-            if err != nil {
-                t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
-                continue
-            }
-            defer safeClose(t, p)
-            for i := 0; i < countPerVerCodec; i++ {
-                msg := &ProducerMessage{
-                    Topic: "test.1",
-                    Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
+                p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
+                if err != nil {
+                    t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
+                    return
                 }
-                wg.Add(1)
-                go func() {
-                    defer wg.Done()
-                    _, _, err := p.SendMessage(msg)
-                    if err != nil {
-                        t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
+                defer safeClose(t, p)
+                for i := 0; i < countPerVerCodec; i++ {
+                    msg := &ProducerMessage{
+                        Topic: "test.1",
+                        Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
                     }
-                    producedMessagesMu.Lock()
-                    producedMessages = append(producedMessages, msg)
-                    producedMessagesMu.Unlock()
-                }()
-            }
+                    wg.Add(1)
+                    go func() {
+                        defer wg.Done()
+                        _, _, err := p.SendMessage(msg)
+                        if err != nil {
+                            t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
+                        }
+                        producedMessagesMu.Lock()
+                        producedMessages = append(producedMessages, msg)
+                        producedMessagesMu.Unlock()
+                    }()
+                }
+            })
         }
     }
     wg.Wait()
@@ -495,7 +498,7 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage
     // Consume all produced messages with all client versions supported by the
     // cluster.
     for _, consVer := range clientVersions {
-        t.Run(consVer.String(), func(t *testing.T) {
+        t.Run("consumer-"+consVer.String(), func(t *testing.T) {
             t.Logf("*** Consuming with client version %s\n", consVer)
             // Create a partition consumer that should start from the first produced
             // message.
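
For readers unfamiliar with the pattern the commit adopts, below is a minimal, self-contained sketch of Go's t.Run subtest naming applied to a version/codec matrix. The package, test name, and string values are hypothetical stand-ins for Sarama's KafkaVersion and CompressionCodec types; it only illustrates the structure, not the real functional test.

package example

import (
    "fmt"
    "testing"
)

// TestVersionCodecMatrix mirrors the shape of produceMsgs after this change:
// one named t.Run subtest per (version, codec) cell of the matrix.
func TestVersionCodecMatrix(t *testing.T) {
    versions := []string{"2.8.2", "3.2.0"}       // stand-ins for KafkaVersion values
    codecs := []string{"none", "gzip", "snappy"} // stand-ins for CompressionCodec values
    for _, ver := range versions {
        for _, codec := range codecs {
            t.Run("producer-"+ver+"-"+codec, func(t *testing.T) {
                t.Logf("producing with client version %s codec %s", ver, codec)
                // Real producer setup and SendMessage assertions would go here;
                // a t.Errorf in one subtest fails that cell without aborting the others.
                if msg := fmt.Sprintf("msg:%s:%s:0", ver, codec); msg == "" {
                    t.Errorf("unexpected empty message")
                }
            })
        }
    }
}

Because each cell is a named subtest, a single combination can be selected from the command line, for example go test -run 'TestVersionCodecMatrix/producer-3.2.0-gzip', and go test reports pass/fail per cell rather than once for the whole matrix.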
