fix(fvt): handle msgset vs batchset #2603

Merged · 1 commit · Aug 18, 2023
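This PR removes the "FIXME: KAFKA_VERSION seems to break this test" workarounds in the functional producer tests. Kafka 0.11.0.0 replaced the old MessageSet wire format ("msgSet") with RecordBatch ("batchSet"), and Sarama encodes produce requests in whichever format config.Version selects, so metric expectations written for one-sample-per-message msgSets broke against newer brokers. The tests now fan out across protocol versions and assert version-appropriate expectations. As a rough sketch of the version gate involved (not code from this PR; the import path is an assumption, adjust to your fork/version):

package main

import (
	"fmt"

	"github.com/IBM/sarama" // import path assumed
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // 2.1.0.0 is also the floor for zstd compression

	if config.Version.IsAtLeast(sarama.V0_11_0_0) {
		fmt.Println("produce requests will carry RecordBatches (batchSet)")
	} else {
		fmt.Println("produce requests will carry MessageSets (msgSet)")
	}
}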
167 changes: 93 additions & 74 deletions functional_producer_test.go
@@ -24,46 +24,38 @@ const TestBatchSize = 1000
 
 func TestFuncProducing(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingGzip(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.Compression = CompressionGZIP
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingSnappy(t *testing.T) {
 	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionSnappy
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingZstd(t *testing.T) {
 	config := NewFunctionalTestConfig()
 	config.Producer.Compression = CompressionZSTD
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, V2_1_0_0) // must be at least 2.1.0.0 for zstd
 }
 
 func TestFuncProducingNoResponse(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.RequiredAcks = NoResponse
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncProducingFlushing(t *testing.T) {
 	config := NewFunctionalTestConfig()
-	// FIXME: KAFKA_VERSION seems to break this test
-	config.Version = MinVersion
 	config.Producer.Flush.Messages = TestBatchSize / 8
 	config.Producer.Flush.Frequency = 250 * time.Millisecond
-	testProducingMessages(t, config)
+	testProducingMessages(t, config, MinVersion)
 }
 
 func TestFuncMultiPartitionProduce(t *testing.T) {
@@ -804,7 +796,7 @@ func TestInterceptors(t *testing.T) {
 	safeClose(t, consumer)
 }
 
-func testProducingMessages(t *testing.T, config *Config) {
+func testProducingMessages(t *testing.T, config *Config, minVersion KafkaVersion) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
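A hypothetical additional caller (illustrative only, not part of this diff) shows how the new minVersion parameter is meant to be used: a feature with a protocol floor passes that floor instead of pinning config.Version itself.

// Hypothetical caller (illustrative only): idempotent producing requires the
// 0.11.0.0 RecordBatch format, so the test passes that floor and lets
// testProducingMessages fan out over that version and anything newer.
func TestFuncProducingIdempotent(t *testing.T) {
	config := NewFunctionalTestConfig()
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = WaitForAll
	config.Net.MaxOpenRequests = 1 // required by Sarama when Idempotent is set
	testProducingMessages(t, config, V0_11_0_0)
}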
@@ -815,78 +807,95 @@ func testProducingMessages(t *testing.T, config *Config) {
 		}
 	}
 
 	config.ClientID = t.Name()
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
-	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
-	if err != nil {
-		t.Fatal(err)
+	kafkaVersions := map[KafkaVersion]bool{}
+	for _, v := range []KafkaVersion{MinVersion, V0_10_0_0, V0_11_0_0, V1_0_0_0, V2_0_0_0, V2_1_0_0} {
+		if v.IsAtLeast(minVersion) {
+			kafkaVersions[v] = true
+		}
 	}
-	defer safeClose(t, client)
 
-	// Keep in mind the current offset
-	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
-	if err != nil {
-		t.Fatal(err)
+	if upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")); err == nil {
+		kafkaVersions[upper] = true
 	}
 
-	producer, err := NewAsyncProducerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
+	for version := range kafkaVersions {
+		t.Run(t.Name()+"-v"+version.String(), func(t *testing.T) {
+			checkKafkaVersion(t, version.String())
+			config.Version = version
+			client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer safeClose(t, client)
 
-	expectedResponses := TestBatchSize
-	for i := 1; i <= TestBatchSize; {
-		msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
-		select {
-		case producer.Input() <- msg:
-			i++
-		case ret := <-producer.Errors():
-			t.Fatal(ret.Err)
-		case <-producer.Successes():
-			expectedResponses--
-		}
-	}
-	for expectedResponses > 0 {
-		select {
-		case ret := <-producer.Errors():
-			t.Fatal(ret.Err)
-		case <-producer.Successes():
-			expectedResponses--
-		}
-	}
-	safeClose(t, producer)
+			// Keep in mind the current offset
+			initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	// Validate producer metrics before using the consumer minus the offset request
-	validateProducerMetrics(t, client)
+			producer, err := NewAsyncProducerFromClient(client)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	master, err := NewConsumerFromClient(client)
-	if err != nil {
-		t.Fatal(err)
-	}
-	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
-	if err != nil {
-		t.Fatal(err)
-	}
+			expectedResponses := TestBatchSize
+			for i := 1; i <= TestBatchSize; {
+				msg := &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
+				select {
+				case producer.Input() <- msg:
+					i++
+				case ret := <-producer.Errors():
+					t.Fatal(ret.Err)
+				case <-producer.Successes():
+					expectedResponses--
+				}
+			}
+			for expectedResponses > 0 {
+				select {
+				case ret := <-producer.Errors():
+					t.Fatal(ret.Err)
+				case <-producer.Successes():
+					expectedResponses--
+				}
+			}
+			safeClose(t, producer)
 
-	for i := 1; i <= TestBatchSize; i++ {
-		select {
-		case <-time.After(10 * time.Second):
-			t.Fatal("Not received any more events in the last 10 seconds.")
+			// Validate producer metrics before using the consumer minus the offset request
+			validateProducerMetrics(t, client)
 
-		case err := <-consumer.Errors():
-			t.Error(err)
+			master, err := NewConsumerFromClient(client)
+			if err != nil {
+				t.Fatal(err)
+			}
+			consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
+			if err != nil {
+				t.Fatal(err)
+			}
 
-		case message := <-consumer.Messages():
-			if string(message.Value) != fmt.Sprintf("testing %d", i) {
-				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
-			}
-		}
-	}
+			for i := 1; i <= TestBatchSize; i++ {
+				select {
+				case <-time.After(10 * time.Second):
+					t.Fatal("Not received any more events in the last 10 seconds.")
+
+				case err := <-consumer.Errors():
+					t.Error(err)
+
+				case message := <-consumer.Messages():
+					if string(message.Value) != fmt.Sprintf("testing %d", i) {
+						t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
+					}
+				}
+			}
 
-	validateConsumerMetrics(t, client)
+			validateConsumerMetrics(t, client)
 
-	safeClose(t, consumer)
+			safeClose(t, consumer)
+		})
+	}
 }
 
 // TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
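Stripped of the Kafka specifics, the new helper body above is a version-matrix fan-out: build the candidate set, then run the whole produce/consume cycle once per version as a named subtest, so each version passes or fails independently. A minimal sketch of the pattern, with illustrative names:

// Version-matrix fan-out (sketch with illustrative names): run the same body
// once per candidate version at or above the feature's floor.
func testAcrossVersions(t *testing.T, floor KafkaVersion, body func(*testing.T, KafkaVersion)) {
	for _, v := range []KafkaVersion{MinVersion, V0_11_0_0, V2_1_0_0} {
		if !v.IsAtLeast(floor) {
			continue // below the floor: feature unsupported, skip entirely
		}
		v := v // capture the loop variable (pre-Go 1.22 semantics)
		t.Run("v"+v.String(), func(t *testing.T) {
			body(t, v) // each version gets its own pass/fail entry
		})
	}
}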
@@ -996,11 +1005,21 @@ func validateProducerMetrics(t *testing.T, client Client) {
 	if compressionEnabled {
 		// We record compression ratios between [0.50,10.00] (50-1000 with a histogram) for at least one "fake" record
 		metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1))
-		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
+		if client.Config().Version.IsAtLeast(V0_11_0_0) {
+			// slightly better compression with batching
+			metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 30))
+		} else {
+			metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50))
+		}
 		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000))
 	} else {
 		// We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record
-		metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
+		if client.Config().Version.IsAtLeast(V0_11_0_0) {
+			// records will be grouped in batchSet rather than msgSet
+			metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 4))
+		} else {
+			metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize))
+		}
 		metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 100))
 		metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 100))
 	}
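The reasoning behind these expectations: on pre-0.11 protocols the producer records one compression-ratio sample per msgSet message, so an uncompressed run yields exactly TestBatchSize samples at a ratio of 1.00 (stored as 100). On 0.11+ the records are grouped into RecordBatches, giving one sample per batch; since the batch count depends on flushing and timing, the test only asserts a floor of 4. Compressed batches also tend to compress a little better than individually compressed messages, hence the relaxed minimum ratio of 30 rather than 50. A back-of-envelope sketch (the one-sample-per-set behaviour is the assumption here):

// Back-of-envelope for the asserted histogram counts. Assumption: Sarama
// records one compression-ratio sample per msgSet message (pre-0.11) or per
// RecordBatch (0.11+).
const TestBatchSize = 1000

func expectedSampleCount(usesBatchSet bool, batches int) int {
	if usesBatchSet {
		return batches // one sample per RecordBatch; the test asserts >= 4
	}
	return TestBatchSize // one sample per message: exactly 1000
}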
8 changes: 8 additions & 0 deletions metrics_test.go
@@ -105,6 +105,7 @@ func countMeterValidator(name string, expectedCount int) *metricValidator {
 
 func minCountMeterValidator(name string, minCount int) *metricValidator {
 	return meterValidator(name, func(t *testing.T, meter metrics.Meter) {
+		t.Helper()
 		count := meter.Count()
 		if count < int64(minCount) {
 			t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
@@ -116,6 +117,7 @@ func histogramValidator(name string, extraValidator func(*testing.T, metrics.Histogram)) *metricValidator {
 	return &metricValidator{
 		name: name,
 		validator: func(t *testing.T, metric interface{}) {
+			t.Helper()
 			if histogram, ok := metric.(metrics.Histogram); !ok {
 				t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
 			} else {
@@ -127,6 +129,7 @@
 
 func countHistogramValidator(name string, expectedCount int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		count := histogram.Count()
 		if count != int64(expectedCount) {
 			t.Errorf("Expected histogram metric '%s' count = %d, got %d", name, expectedCount, count)
@@ -136,6 +139,7 @@ func countHistogramValidator(name string, expectedCount int) *metricValidator {
 
 func minCountHistogramValidator(name string, minCount int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		count := histogram.Count()
 		if count < int64(minCount) {
 			t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
@@ -145,6 +149,7 @@ func minCountHistogramValidator(name string, minCount int) *metricValidator {
 
 func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		min := int(histogram.Min())
 		if min != expectedMin {
 			t.Errorf("Expected histogram metric '%s' min = %d, got %d", name, expectedMin, min)
@@ -158,6 +163,7 @@ func minMaxHistogramValidator(name string, expectedMin int, expectedMax int) *metricValidator {
 
 func minValHistogramValidator(name string, minMin int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		min := int(histogram.Min())
 		if min < minMin {
 			t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min)
@@ -167,6 +173,7 @@ func minValHistogramValidator(name string, minMin int) *metricValidator {
 
 func maxValHistogramValidator(name string, maxMax int) *metricValidator {
 	return histogramValidator(name, func(t *testing.T, histogram metrics.Histogram) {
+		t.Helper()
 		max := int(histogram.Max())
 		if max > maxMax {
 			t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
@@ -178,6 +185,7 @@ func counterValidator(name string, expectedCount int) *metricValidator {
 	return &metricValidator{
 		name: name,
 		validator: func(t *testing.T, metric interface{}) {
+			t.Helper()
 			if counter, ok := metric.(metrics.Counter); !ok {
 				t.Errorf("Expected counter metric for '%s', got %T", name, metric)
 			} else {