From 86816212a7aca81d7d64c5603bb1aeb7f8248a7a Mon Sep 17 00:00:00 2001
From: Dominic Evans
Date: Thu, 17 Aug 2023 00:55:41 +0100
Subject: [PATCH] fix(fvt): handle msgset vs batchset

- update the TestFuncProducing tests to exercise a number of different
  client versions so we cover both the old msgset format (which the
  tests were originally written for) and the newer batchset format, as
  well as all the currently supported versions of ProduceRequest
- relax some of the metrics validation to account for messages being
  sent in batches on the higher kafkaVersions

Signed-off-by: Dominic Evans
---
 functional_producer_test.go | 167 ++++++++++++++++++++----------
 metrics_test.go             |   8 ++
 2 files changed, 101 insertions(+), 74 deletions(-)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index 003a48c12..ebd206b13 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -24,46 +24,38 @@ const TestBatchSize = 1000
 
 func TestFuncProducing(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingGzip(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.Compression = CompressionGZIP
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingSnappy(t *testing.T) {
 	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionSnappy
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingZstd(t *testing.T) {
 	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionZSTD
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, V2_1_0_0) // must be at least 2.1.0.0 for zstd
 }
 
 func TestFuncProducingNoResponse(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.RequiredAcks = NoResponse
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingFlushing(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.Flush.Messages = TestBatchSize / 8
 	config.Producer.Flush.Frequency = 250 * time.Millisecond
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncMultiPartitionProduce(t *testing.T) {
@@ -804,7 +796,7 @@ func TestInterceptors(t *testing.T) {
 	safeClose(t, consumer)
 }
 
-func testProducingMessages(t *testing.T, config *Config) {
+func testProducingMessages(t *testing.T, config *Config, minVersion KafkaVersion) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -815,78 +807,95 @@ func testProducingMessages(t *testing.T, config *Config) {
 		}
 	}
 
+	config.ClientID = t.Name()
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
-	if err != nil {
-		t.Fatal(err)
+	kafkaVersions := map[KafkaVersion]bool{}
+	for _, v := range []KafkaVersion{MinVersion, V0_10_0_0, V0_11_0_0, V1_0_0_0, V2_0_0_0, V2_1_0_0} {
+		if v.IsAtLeast(minVersion) {
+			kafkaVersions[v] = true
+		}
 	}
-	defer safeClose(t, client)
-
-	// Keep in mind the current offset
-	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
-	if err != nil {
-		t.Fatal(err)
+	if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err == nil {
+		kafkaVersions[upper] = true
 	}
 
-	producer, err := NewAsyncProducerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
+	for version := range kafkaVersions {
+		t.Run(t.Name()+"-v"+version.String(), func(t *testing.T) {
+			checkKafkaVersion(t, version.String())
+			config.Version = version
+			client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer safeClose(t, client)
 
-	expectedResponses := TestBatchSize
-	for i := 1; i <= TestBatchSize; {
-		msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
-		select {
-		case producer.Input() <- msg:
-			i++
-		case ret := <-producer.Errors():
-			t.Fatal(ret.Err)
-		case <-producer.Successes():
-			expectedResponses--
-		}
-	}
-	for expectedResponses > 0 {
-		select {
-		case ret := <-producer.Errors():
-			t.Fatal(ret.Err)
-		case <-producer.Successes():
-			expectedResponses--
-		}
-	}
-	safeClose(t, producer)
+			// Keep in mind the current offset
+			initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	// Validate producer metrics before using the consumer minus the offset request
-	validateProducerMetrics(t, client)
+			producer, err := NewAsyncProducerFromClient(client)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	master, err := NewConsumerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
-	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
-	if err != nil {
-		t.Fatal(err)
-	}
+			expectedResponses := TestBatchSize
+			for i := 1; i <= TestBatchSize; {
+				msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
+				select {
+				case producer.Input() <- msg:
+					i++
+				case ret := <-producer.Errors():
+					t.Fatal(ret.Err)
+				case <-producer.Successes():
+					expectedResponses--
+				}
+			}
+			for expectedResponses > 0 {
+				select {
+				case ret := <-producer.Errors():
+					t.Fatal(ret.Err)
+				case <-producer.Successes():
+					expectedResponses--
+				}
+			}
+			safeClose(t, producer)
 
-	for i := 1; i <= TestBatchSize; i++ {
-		select {
-		case <-time.After(10 * time.Second):
-			t.Fatal("Not received any more events in the last 10 seconds.")
+			// Validate producer metrics before using the consumer minus the offset request
+			validateProducerMetrics(t, client)
 
-		case err := <-consumer.Errors():
-			t.Error(err)
+			master, err := NewConsumerFromClient(client)
+			if err != nil {
+				t.Fatal(err)
+			}
+			consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-		case message := <-consumer.Messages():
-			if string(message.Value) != fmt.Sprintf("testing %d", i) {
-				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
+			for i := 1; i <= TestBatchSize; i++ {
+				select {
+				case <-time.After(10 * time.Second):
+					t.Fatal("Not received any more events in the last 10 seconds.")
+
+				case err := <-consumer.Errors():
+					t.Error(err)
+
+				case message := <-consumer.Messages():
+					if string(message.Value) != fmt.Sprintf("testing %d", i) {
+						t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
+					}
+				}
 			}
-		}
-	}
 
-	validateConsumerMetrics(t, client)
+			validateConsumerMetrics(t, client)
 
-	safeClose(t, consumer)
+			safeClose(t, consumer)
+		})
+	}
 }
 
 // TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
@@ -996,11 +1005,21 @@ func validateProducerMetrics(t *testing.T, client Client) {
 	if compressionEnabled {
 		// We record compression ratios between [0.50, 10.00] (50-1000 with a histogram) for at least one "fake" record
 		metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
-		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
+		if client.Config().Version.IsAtLeast(V0_11_0_0) {
+			// slightly better compression with batching
+			metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 30))
+		} else {
+			metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
+		}
 		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
 	} else {
 		// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
-		metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
+		if client.Config().Version.IsAtLeast(V0_11_0_0) {
+			// records will be grouped in batchSet rather than msgSet
+			metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 4))
+		} else {
+			metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
+		}
 		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
 		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
 	}
diff --git a/metrics_test.go b/metrics_test.go
index ee7aea374..c9144df38 100644
--- a/metrics_test.go
+++ b/metrics_test.go
@@ -105,6 +105,7 @@ func countMeterValidator(name string, expectedCount int) *metricValidator {
 
 func minCountMeterValidator(name string, minCount int) *metricValidator {
 	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		t.Helper()
 		count := meter.Count()
 		if count < int64(minCount) {
 			t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
@@ -116,6 +117,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His
 	return &metricValidator{
 		name: name,
 		validator: func(t *testing.T, metric interface{}) {
+			t.Helper()
 			if histogram, ok := metric.(metrics.Histogram); !ok {
 				t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
 			} else {
@@ -127,6 +129,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His
 
 func countHistogramValidator(name string, expectedCount int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		count := histogram.Count()
 		if count != int64(expectedCount) {
 			t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count)
@@ -136,6 +139,7 @@ func countHistogramValidator(name string, expectedCount int) *metricValidator {
 
 func minCountHistogramValidator(name string, minCount int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		count := histogram.Count()
 		if count < int64(minCount) {
 			t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
@@ -145,6 +149,7 @@
 
 func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		min := int(histogram.Min())
 		if min != expectedMin {
t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min) @@ -158,6 +163,7 @@ func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *me func minValHistogramValidator(name string, minMin int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() min := int(histogram.Min()) if min < minMin { t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min) @@ -167,6 +173,7 @@ func minValHistogramValidator(name string, minMin int) *metricValidator { func maxValHistogramValidator(name string, maxMax int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() max := int(histogram.Max()) if max > maxMax { t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max) @@ -178,6 +185,7 @@ func counterValidator(name string, expectedCount int) *metricValidator { return &metricValidator{ name: name, validator: func(t *testing.T, metric interface{}) { + t.Helper() if counter, ok := metric.(metrics.Counter); !ok { t.Errorf("Expected counter metric for '%s', got %T", name, metric) } else {