chore(test): use t.Run subtests for version matrix
dnwe committed Jul 20, 2022
1 parent 5caa119 commit 8e96ffb
Showing 1 changed file with 46 additions and 36 deletions.
functional_consumer_test.go (82 changes: 46 additions & 36 deletions)
@@ -438,50 +438,60 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
 }
 
 func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
-	var wg sync.WaitGroup
-	var producedMessagesMu sync.Mutex
-	var producedMessages []*ProducerMessage
+	var (
+		wg                 sync.WaitGroup
+		producers          []SyncProducer
+		producedMessagesMu sync.Mutex
+		producedMessages   []*ProducerMessage
+	)
 	for _, prodVer := range clientVersions {
 		for _, codec := range codecs {
-			prodCfg := NewTestConfig()
-			prodCfg.Version = prodVer
-			prodCfg.Producer.Return.Successes = true
-			prodCfg.Producer.Return.Errors = true
-			prodCfg.Producer.Flush.MaxMessages = flush
-			prodCfg.Producer.Compression = codec
-			prodCfg.Producer.Idempotent = idempotent
-			if idempotent {
-				prodCfg.Producer.RequiredAcks = WaitForAll
-				prodCfg.Net.MaxOpenRequests = 1
-			}
+			t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) {
+				t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
+				prodCfg := NewTestConfig()
+				prodCfg.Version = prodVer
+				prodCfg.Producer.Return.Successes = true
+				prodCfg.Producer.Return.Errors = true
+				prodCfg.Producer.Flush.MaxMessages = flush
+				prodCfg.Producer.Compression = codec
+				prodCfg.Producer.Idempotent = idempotent
+				if idempotent {
+					prodCfg.Producer.RequiredAcks = WaitForAll
+					prodCfg.Net.MaxOpenRequests = 1
+				}
 
-			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
-			if err != nil {
-				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
-				continue
-			}
-			defer safeClose(t, p)
-			for i := 0; i < countPerVerCodec; i++ {
-				msg := &ProducerMessage{
-					Topic: "test.1",
-					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
+				p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
+				if err != nil {
+					t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
+					return
 				}
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					_, _, err := p.SendMessage(msg)
-					if err != nil {
-						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
+				producers = append(producers, p)
+				for i := 0; i < countPerVerCodec; i++ {
+					msg := &ProducerMessage{
+						Topic: "test.1",
+						Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
 					}
-					producedMessagesMu.Lock()
-					producedMessages = append(producedMessages, msg)
-					producedMessagesMu.Unlock()
-				}()
-			}
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						_, _, err := p.SendMessage(msg)
+						if err != nil {
+							t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
+						}
+						producedMessagesMu.Lock()
+						producedMessages = append(producedMessages, msg)
+						producedMessagesMu.Unlock()
+					}()
+				}
+			})
 		}
 	}
 	wg.Wait()
 
+	for _, p := range producers {
+		safeClose(t, p)
+	}
+
 	// Sort produced message in ascending offset order.
 	sort.Slice(producedMessages, func(i, j int) bool {
 		return producedMessages[i].Offset < producedMessages[j].Offset
@@ -495,7 +505,7 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
 	// Consume all produced messages with all client versions supported by the
 	// cluster.
 	for _, consVer := range clientVersions {
-		t.Run(consVer.String(), func(t *testing.T) {
+		t.Run("consumer-"+consVer.String(), func(t *testing.T) {
 			t.Logf("*** Consuming with client version %s\n", consVer)
 			// Create a partition consumer that should start from the first produced
 			// message.
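The change wraps each (version, codec) producer run, and each consumer version, in a named t.Run subtest, while the sync.WaitGroup is still awaited by the parent test before the producers are closed. A minimal standalone sketch of that pattern follows; it is not Sarama code, and the package name, version/codec values, and results slice are invented for illustration.

package example

import (
	"fmt"
	"sync"
	"testing"
)

// One named subtest per (version, codec) pair; goroutines spawned inside the
// subtests are awaited by the parent test after the loop.
func TestVersionMatrix(t *testing.T) {
	versions := []string{"2.8.2", "3.1.2"} // hypothetical values
	codecs := []string{"none", "gzip"}     // hypothetical values

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results []string
	)
	for _, v := range versions {
		for _, c := range codecs {
			v, c := v, c // capture loop variables for the closure (pre-Go 1.22)
			t.Run(fmt.Sprintf("producer-%s-%s", v, c), func(t *testing.T) {
				t.Logf("running with version %s codec %s", v, c)
				wg.Add(1)
				go func() {
					defer wg.Done()
					mu.Lock()
					results = append(results, v+"/"+c)
					mu.Unlock()
				}()
			})
		}
	}
	wg.Wait() // cleanup that depends on all goroutines goes after this point

	if got, want := len(results), len(versions)*len(codecs); got != want {
		t.Errorf("expected %d results, got %d", want, got)
	}
}

Naming the subtests also lets a single combination be selected with go test's -run flag, e.g. -run 'TestVersionMatrix/producer-3.1.2-gzip' (test and subtest names here are hypothetical).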
