diff --git a/functional_producer_test.go b/functional_producer_test.go index 003a48c12..ebd206b13 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -24,46 +24,38 @@ const TestBatchSize = 1000 func TestFuncProducing(t *testing.T) { config := NewFunctionalTestConfig() - // FIXME: KAFKA_VERSION seems to break this test - config.Version = MinVersion - testProducingMessages(t, config) + testProducingMessages(t, config, MinVersion) } func TestFuncProducingGzip(t *testing.T) { config := NewFunctionalTestConfig() - // FIXME: KAFKA_VERSION seems to break this test - config.Version = MinVersion config.Producer.Compression = CompressionGZIP - testProducingMessages(t, config) + testProducingMessages(t, config, MinVersion) } func TestFuncProducingSnappy(t *testing.T) { config := NewFunctionalTestConfig() config.Producer.Compression = CompressionSnappy - testProducingMessages(t, config) + testProducingMessages(t, config, MinVersion) } func TestFuncProducingZstd(t *testing.T) { config := NewFunctionalTestConfig() config.Producer.Compression = CompressionZSTD - testProducingMessages(t, config) + testProducingMessages(t, config, V2_1_0_0) // must be at least 2.1.0.0 for zstd } func TestFuncProducingNoResponse(t *testing.T) { config := NewFunctionalTestConfig() - // FIXME: KAFKA_VERSION seems to break this test - config.Version = MinVersion config.Producer.RequiredAcks = NoResponse - testProducingMessages(t, config) + testProducingMessages(t, config, MinVersion) } func TestFuncProducingFlushing(t *testing.T) { config := NewFunctionalTestConfig() - // FIXME: KAFKA_VERSION seems to break this test - config.Version = MinVersion config.Producer.Flush.Messages = TestBatchSize / 8 config.Producer.Flush.Frequency = 250 * time.Millisecond - testProducingMessages(t, config) + testProducingMessages(t, config, MinVersion) } func TestFuncMultiPartitionProduce(t *testing.T) { @@ -804,7 +796,7 @@ func TestInterceptors(t *testing.T) { safeClose(t, consumer) } -func testProducingMessages(t *testing.T, config *Config) { +func testProducingMessages(t *testing.T, config *Config, minVersion KafkaVersion) { setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -815,78 +807,95 @@ func testProducingMessages(t *testing.T, config *Config) { } } + config.ClientID = t.Name() config.Producer.Return.Successes = true config.Consumer.Return.Errors = true - client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) - if err != nil { - t.Fatal(err) + kafkaVersions := map[KafkaVersion]bool{} + for _, v := range []KafkaVersion{MinVersion, V0_10_0_0, V0_11_0_0, V1_0_0_0, V2_0_0_0, V2_1_0_0} { + if v.IsAtLeast(minVersion) { + kafkaVersions[v] = true + } } - defer safeClose(t, client) - - // Keep in mind the current offset - initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) - if err != nil { - t.Fatal(err) + if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err != nil { + kafkaVersions[upper] = true } - producer, err := NewAsyncProducerFromClient(client) - if err != nil { - t.Fatal(err) - } + for version := range kafkaVersions { + t.Run(t.Name()+"-v"+version.String(), func(t *testing.T) { + checkKafkaVersion(t, version.String()) + config.Version = version + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, client) - expectedResponses := TestBatchSize - for i := 1; i <= TestBatchSize; { - msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))} - select { - case producer.Input() <- msg: - i++ - case ret := <-producer.Errors(): - t.Fatal(ret.Err) - case <-producer.Successes(): - expectedResponses-- - } - } - for expectedResponses > 0 { - select { - case ret := <-producer.Errors(): - t.Fatal(ret.Err) - case <-producer.Successes(): - expectedResponses-- - } - } - safeClose(t, producer) + // Keep in mind the current offset + initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) + if err != nil { + t.Fatal(err) + } - // Validate producer metrics before using the consumer minus the offset request - validateProducerMetrics(t, client) + producer, err := NewAsyncProducerFromClient(client) + if err != nil { + t.Fatal(err) + } - master, err := NewConsumerFromClient(client) - if err != nil { - t.Fatal(err) - } - consumer, err := master.ConsumePartition("test.1", 0, initialOffset) - if err != nil { - t.Fatal(err) - } + expectedResponses := TestBatchSize + for i := 1; i <= TestBatchSize; { + msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))} + select { + case producer.Input() <- msg: + i++ + case ret := <-producer.Errors(): + t.Fatal(ret.Err) + case <-producer.Successes(): + expectedResponses-- + } + } + for expectedResponses > 0 { + select { + case ret := <-producer.Errors(): + t.Fatal(ret.Err) + case <-producer.Successes(): + expectedResponses-- + } + } + safeClose(t, producer) - for i := 1; i <= TestBatchSize; i++ { - select { - case <-time.After(10 * time.Second): - t.Fatal("Not received any more events in the last 10 seconds.") + // Validate producer metrics before using the consumer minus the offset request + validateProducerMetrics(t, client) - case err := <-consumer.Errors(): - t.Error(err) + master, err := NewConsumerFromClient(client) + if err != nil { + t.Fatal(err) + } + consumer, err := master.ConsumePartition("test.1", 0, initialOffset) + if err != nil { + t.Fatal(err) + } - case message := <-consumer.Messages(): - if string(message.Value) != fmt.Sprintf("testing %d", i) { - t.Fatalf("Unexpected message with index %d: %s", i, message.Value) + for i := 1; i <= TestBatchSize; i++ { + select { + case <-time.After(10 * time.Second): + t.Fatal("Not received any more events in the last 10 seconds.") + + case err := <-consumer.Errors(): + t.Error(err) + + case message := <-consumer.Messages(): + if string(message.Value) != fmt.Sprintf("testing %d", i) { + t.Fatalf("Unexpected message with index %d: %s", i, message.Value) + } + } } - } - } - validateConsumerMetrics(t, client) + validateConsumerMetrics(t, client) - safeClose(t, consumer) + safeClose(t, consumer) + }) + } } // TestAsyncProducerRemoteBrokerClosed ensures that the async producer can @@ -996,11 +1005,21 @@ func validateProducerMetrics(t *testing.T, client Client) { if compressionEnabled { // We record compression ratios between [0.50,-10.00] (50-1000 with a histogram) for at least one "fake" record metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1)) - metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50)) + if client.Config().Version.IsAtLeast(V0_11_0_0) { + // slightly better compression with batching + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 30)) + } else { + metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50)) + } metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000)) } else { // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record - metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize)) + if client.Config().Version.IsAtLeast(V0_11_0_0) { + // records will be grouped in batchSet rather than msgSet + metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 4)) + } else { + metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize)) + } metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100)) metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100)) } diff --git a/metrics_test.go b/metrics_test.go index ee7aea374..c9144df38 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -105,6 +105,7 @@ func countMeterValidator(name string, expectedCount int) *metricValidator { func minCountMeterValidator(name string, minCount int) *metricValidator { return meterValidator(name, func(t *testing.T, meter metrics.Meter) { + t.Helper() count := meter.Count() if count < int64(minCount) { t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count) @@ -116,6 +117,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His return &metricValidator{ name: name, validator: func(t *testing.T, metric interface{}) { + t.Helper() if histogram, ok := metric.(metrics.Histogram); !ok { t.Errorf("Expected histogram metric for '%s', got %T", name, metric) } else { @@ -127,6 +129,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.His func countHistogramValidator(name string, expectedCount int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() count := histogram.Count() if count != int64(expectedCount) { t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count) @@ -136,6 +139,7 @@ func countHistogramValidator(name string, expectedCount int) *metricValidator { func minCountHistogramValidator(name string, minCount int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() count := histogram.Count() if count < int64(minCount) { t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count) @@ -145,6 +149,7 @@ func minCountHistogramValidator(name string, minCount int) *metricValidator { func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() min := int(histogram.Min()) if min != expectedMin { t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min) @@ -158,6 +163,7 @@ func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *me func minValHistogramValidator(name string, minMin int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() min := int(histogram.Min()) if min < minMin { t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min) @@ -167,6 +173,7 @@ func minValHistogramValidator(name string, minMin int) *metricValidator { func maxValHistogramValidator(name string, maxMax int) *metricValidator { return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) { + t.Helper() max := int(histogram.Max()) if max > maxMax { t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max) @@ -178,6 +185,7 @@ func counterValidator(name string, expectedCount int) *metricValidator { return &metricValidator{ name: name, validator: func(t *testing.T, metric interface{}) { + t.Helper() if counter, ok := metric.(metrics.Counter); !ok { t.Errorf("Expected counter metric for '%s', got %T", name, metric) } else {