diff --git a/.github/workflows/fvt.yml b/.github/workflows/fvt.yml index 24f97b1ff..3f77a8b87 100644 --- a/.github/workflows/fvt.yml +++ b/.github/workflows/fvt.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: go-version: [1.18.x] - kafka-version: [2.8.1, 3.0.1, 3.1.0] + kafka-version: [2.8.1, 3.0.1, 3.1.0, 3.2.0] env: DEBUG: true GOFLAGS: -trimpath diff --git a/Dockerfile.kafka b/Dockerfile.kafka index f501f007a..37aa1aa1d 100644 --- a/Dockerfile.kafka +++ b/Dockerfile.kafka @@ -20,6 +20,7 @@ ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1" RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1" RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0" +RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0" COPY entrypoint.sh / diff --git a/Makefile b/Makefile index 3e8b7dc81..7cefc2a2c 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ test: $(GOBIN)/tparse || NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}" .PHONY: test_functional test_functional: $(GOBIN)/tparse - $(GOTEST) -timeout 12m -tags=functional -json ./... \ + $(GOTEST) -timeout 15m -tags=functional -json ./... \ | tee output.json | $(GOBIN)/tparse -follow -all [ -z "$${GITHUB_STEP_SUMMARY:-}" ] \ || NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}" diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 12f87518a..346479b2d 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -16,6 +16,9 @@ import ( "testing" "time" + "golang.org/x/sync/errgroup" + + "github.com/rcrowley/go-metrics" "github.com/stretchr/testify/require" ) @@ -83,6 +86,11 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) { // from this test case. It has a similar version matrix test case below that // only checks versions from v0.10.0.0 until KAFKA_VERSION. func TestVersionMatrix(t *testing.T) { + metrics.UseNilMetrics = true // disable Sarama's go-metrics library + t.Cleanup(func() { + metrics.UseNilMetrics = false + }) + setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -99,6 +107,10 @@ func TestVersionMatrix(t *testing.T) { // Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to // test LZ4 should start with v0.10.0.0. func TestVersionMatrixLZ4(t *testing.T) { + metrics.UseNilMetrics = true // disable Sarama's go-metrics library + t.Cleanup(func() { + metrics.UseNilMetrics = false + }) setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -115,6 +127,10 @@ func TestVersionMatrixLZ4(t *testing.T) { // Support for zstd codec was introduced in v2.1.0.0 func TestVersionMatrixZstd(t *testing.T) { + metrics.UseNilMetrics = true // disable Sarama's go-metrics library + t.Cleanup(func() { + metrics.UseNilMetrics = false + }) setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -129,6 +145,10 @@ func TestVersionMatrixZstd(t *testing.T) { } func TestVersionMatrixIdempotent(t *testing.T) { + metrics.UseNilMetrics = true // disable Sarama's go-metrics library + t.Cleanup(func() { + metrics.UseNilMetrics = false + }) setupFunctionalTest(t) defer teardownFunctionalTest(t) @@ -424,8 +444,8 @@ func versionRange(lower KafkaVersion) []KafkaVersion { upper = MaxVersion } - versions := make([]KafkaVersion, 0, len(SupportedVersions)) - for _, v := range SupportedVersions { + versions := make([]KafkaVersion, 0, len(fvtRangeVersions)) + for _, v := range fvtRangeVersions { if !v.IsAtLeast(lower) { continue } @@ -439,33 +459,44 @@ func versionRange(lower KafkaVersion) []KafkaVersion { func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { var ( - wg sync.WaitGroup producers []SyncProducer producedMessagesMu sync.Mutex producedMessages []*ProducerMessage ) + g := errgroup.Group{} for _, prodVer := range clientVersions { for _, codec := range codecs { - t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) { - t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec) - prodCfg := NewTestConfig() - prodCfg.Version = prodVer - prodCfg.Producer.Return.Successes = true - prodCfg.Producer.Return.Errors = true - prodCfg.Producer.Flush.MaxMessages = flush - prodCfg.Producer.Compression = codec - prodCfg.Producer.Idempotent = idempotent - if idempotent { - prodCfg.Producer.RequiredAcks = WaitForAll - prodCfg.Net.MaxOpenRequests = 1 - } + prodCfg := NewTestConfig() + prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String() + if idempotent { + prodCfg.ClientID += "-idempotent" + } + if codec > 0 { + prodCfg.ClientID += "-" + codec.String() + } + prodCfg.Metadata.Full = false + prodCfg.Version = prodVer + prodCfg.Producer.Return.Successes = true + prodCfg.Producer.Return.Errors = true + prodCfg.Producer.Flush.MaxMessages = flush + prodCfg.Producer.Compression = codec + prodCfg.Producer.Idempotent = idempotent + if idempotent { + prodCfg.Producer.RequiredAcks = WaitForAll + prodCfg.Net.MaxOpenRequests = 1 + } - p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) - if err != nil { - t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) - return - } - producers = append(producers, p) + p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) + if err != nil { + t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) + } + producers = append(producers, p) + + prodVer := prodVer + codec := codec + g.Go(func() error { + t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec) + var wg sync.WaitGroup for i := 0; i < countPerVerCodec; i++ { msg := &ProducerMessage{ Topic: "test.1", @@ -483,10 +514,14 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi producedMessagesMu.Unlock() }() } + wg.Wait() + return nil }) } } - wg.Wait() + if err := g.Wait(); err != nil { + t.Fatal(err) + } for _, p := range producers { safeClose(t, p) @@ -496,6 +531,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi sort.Slice(producedMessages, func(i, j int) bool { return producedMessages[i].Offset < producedMessages[j].Offset }) + require.NotEmpty(t, producedMessages, "should have produced >0 messages") t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n", len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset) return producedMessages @@ -504,26 +540,33 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) { // Consume all produced messages with all client versions supported by the // cluster. + g := errgroup.Group{} for _, consVer := range clientVersions { - t.Run("consumer-"+consVer.String(), func(t *testing.T) { - t.Logf("*** Consuming with client version %s\n", consVer) - // Create a partition consumer that should start from the first produced - // message. - consCfg := NewTestConfig() - consCfg.Version = consVer - c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, c) - pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) - if err != nil { - t.Fatal(err) - } - defer safeClose(t, pc) + // Create a partition consumer that should start from the first produced + // message. + consCfg := NewTestConfig() + consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String() + consCfg.Consumer.MaxProcessingTime = time.Second + consCfg.Metadata.Full = false + consCfg.Version = consVer + c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, c) + pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, pc) + var wg sync.WaitGroup + wg.Add(1) + consVer := consVer + g.Go(func() error { // Consume as many messages as there have been produced and make sure that // order is preserved. + t.Logf("*** Consuming with client version %s\n", consVer) for i, prodMsg := range producedMessages { select { case consMsg := <-pc.Messages(): @@ -535,10 +578,25 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [ t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s", consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) } - case <-time.After(3 * time.Second): - t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value) + if i == 0 { + t.Logf("Consumed first msg: version=%s, index=%d, got=%s", + consVer, i, consMsg2Str(consMsg)) + wg.Done() + } + if i%1000 == 0 { + t.Logf("Consumed messages: version=%s, index=%d, got=%s", + consVer, i, consMsg2Str(consMsg)) + } + case <-time.After(15 * time.Second): + t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s", + consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value) } } + return nil }) + wg.Wait() // wait for first message to be consumed before starting next consumer + } + if err := g.Wait(); err != nil { + t.Fatal(err) } } diff --git a/go.mod b/go.mod index aeb711931..8911cd6a4 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/xdg-go/scram v1.1.1 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect golang.org/x/net v0.0.0-20220708220712-1185a9018129 + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 59f77638b..3a08aacbb 100644 --- a/go.sum +++ b/go.sum @@ -333,6 +333,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/utils.go b/utils.go index a9e706f10..c8723d381 100644 --- a/utils.go +++ b/utils.go @@ -182,6 +182,7 @@ var ( V3_0_0_0 = newKafkaVersion(3, 0, 0, 0) V3_0_1_0 = newKafkaVersion(3, 0, 1, 0) V3_1_0_0 = newKafkaVersion(3, 1, 0, 0) + V3_2_0_0 = newKafkaVersion(3, 2, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -227,10 +228,31 @@ var ( V3_0_0_0, V3_0_1_0, V3_1_0_0, + V3_2_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_1_0_0 + MaxVersion = V3_2_0_0 DefaultVersion = V1_0_0_0 + + // reduced set of versions to matrix test + fvtRangeVersions = []KafkaVersion{ + V0_8_2_2, + V0_10_2_2, + V1_0_2_0, + V1_1_1_0, + V2_0_1_0, + V2_1_1_0, + V2_2_2_0, + V2_3_1_0, + V2_4_1_0, + V2_5_1_0, + V2_6_2_0, + V2_7_1_0, + V2_8_1_0, + V3_0_1_0, + V3_1_0_0, + V3_2_0_0, + } ) // ParseKafkaVersion parses and returns kafka version or error from a string