From af8af575da9ce8a04357a0c6eaa8f5038c5cd161 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sat, 4 Nov 2023 19:12:02 +0300 Subject: [PATCH] feat: change consumeFn and batchConsumeFn as pointer signature --- batch_consumer.go | 8 +- batch_consumer_test.go | 32 +++--- consumer.go | 4 +- consumer_base.go | 4 +- consumer_base_test.go | 7 +- consumer_config.go | 4 +- consumer_test.go | 20 ++-- examples/with-deadletter/main.go | 2 +- examples/with-grafana/main.go | 2 +- examples/with-kafka-batch-consumer/main.go | 10 +- examples/with-kafka-cronsumer/main.go | 2 +- examples/with-prometheus/main.go | 2 +- examples/with-sasl-plaintext/go.mod | 24 ++--- examples/with-sasl-plaintext/go.sum | 16 +++ examples/with-sasl-plaintext/main.go | 2 +- examples/with-standalone-consumer/main.go | 2 +- examples/with-tracing/main.go | 2 +- message.go | 26 +++-- message_test.go | 114 ++++++++++++++------- test/integration/go.mod | 12 +-- test/integration/go.sum | 24 ++--- test/integration/integration_test.go | 73 ++++++++++++- 22 files changed, 268 insertions(+), 124 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 82de420..8a3ecf8 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -9,7 +9,7 @@ import ( type batchConsumer struct { *base - consumeFn func([]Message) error + consumeFn func([]*Message) error messageGroupLimit int messageGroupDuration time.Duration @@ -30,7 +30,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { if cfg.RetryEnabled { c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.consumeFn([]Message{toMessage(message)}) + return c.consumeFn([]*Message{toMessage(message)}) }) } @@ -62,7 +62,7 @@ func (b *batchConsumer) startBatch() { ticker := time.NewTicker(b.messageGroupDuration) defer ticker.Stop() - messages := make([]Message, 0, b.messageGroupLimit) + messages := make([]*Message, 0, b.messageGroupLimit) for { select { @@ -88,7 +88,7 @@ func (b *batchConsumer) startBatch() { } } -func (b *batchConsumer) process(messages []Message) { +func (b *batchConsumer) process(messages []*Message) { consumeErr := b.consumeFn(messages) if consumeErr != nil { diff --git a/batch_consumer_test.go b/batch_consumer_test.go index aabdcfa..ccd5918 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -17,27 +17,27 @@ func Test_batchConsumer_startBatch(t *testing.T) { bc := batchConsumer{ base: &base{ - messageCh: make(chan Message), + messageCh: make(chan *Message), metric: &ConsumerMetric{}, wg: sync.WaitGroup{}, }, messageGroupLimit: 3, messageGroupDuration: 500 * time.Millisecond, - consumeFn: func(messages []Message) error { + consumeFn: func(messages []*Message) error { numberOfBatch++ return nil }, } go func() { // Simulate messageGroupLimit - bc.base.messageCh <- Message{} - bc.base.messageCh <- Message{} - bc.base.messageCh <- Message{} + bc.base.messageCh <- &Message{} + bc.base.messageCh <- &Message{} + bc.base.messageCh <- &Message{} time.Sleep(1 * time.Second) // Simulate messageGroupDuration - bc.base.messageCh <- Message{} + bc.base.messageCh <- &Message{} time.Sleep(1 * time.Second) @@ -64,13 +64,13 @@ func Test_batchConsumer_process(t *testing.T) { // Given bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}}, - consumeFn: func([]Message) error { + consumeFn: func([]*Message) error { return nil }, } // When - bc.process([]Message{{}, {}, {}}) + bc.process([]*Message{{}, {}, {}}) // Then if bc.metric.TotalProcessedMessagesCounter != 3 { @@ -85,7 +85,7 @@ func Test_batchConsumer_process(t *testing.T) { gotOnlyOneTimeException := true bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(messages []Message) error { + consumeFn: func(messages []*Message) error { if gotOnlyOneTimeException { gotOnlyOneTimeException = false return errors.New("simulate only one time exception") @@ -95,7 +95,7 @@ func Test_batchConsumer_process(t *testing.T) { } // When - bc.process([]Message{{}, {}, {}}) + bc.process([]*Message{{}, {}, {}}) // Then if bc.metric.TotalProcessedMessagesCounter != 3 { @@ -109,13 +109,13 @@ func Test_batchConsumer_process(t *testing.T) { // Given bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(messages []Message) error { + consumeFn: func(messages []*Message) error { return errors.New("error case") }, } // When - bc.process([]Message{{}, {}, {}}) + bc.process([]*Message{{}, {}, {}}) // Then if bc.metric.TotalProcessedMessagesCounter != 0 { @@ -130,13 +130,13 @@ func Test_batchConsumer_process(t *testing.T) { mc := mockCronsumer{} bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, - consumeFn: func(messages []Message) error { + consumeFn: func(messages []*Message) error { return errors.New("error case") }, } // When - bc.process([]Message{{}, {}, {}}) + bc.process([]*Message{{}, {}, {}}) // Then if bc.metric.TotalProcessedMessagesCounter != 0 { @@ -151,13 +151,13 @@ func Test_batchConsumer_process(t *testing.T) { mc := mockCronsumer{wantErr: true} bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, - consumeFn: func(messages []Message) error { + consumeFn: func(messages []*Message) error { return errors.New("error case") }, } // When - bc.process([]Message{{}, {}, {}}) + bc.process([]*Message{{}, {}, {}}) // Then if bc.metric.TotalProcessedMessagesCounter != 0 { diff --git a/consumer.go b/consumer.go index fa981bc..6c5c6b8 100644 --- a/consumer.go +++ b/consumer.go @@ -7,7 +7,7 @@ import ( type consumer struct { *base - consumeFn func(Message) error + consumeFn func(*Message) error } func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -51,7 +51,7 @@ func (c *consumer) Consume() { } } -func (c *consumer) process(message Message) { +func (c *consumer) process(message *Message) { consumeErr := c.consumeFn(message) if consumeErr != nil { diff --git a/consumer_base.go b/consumer_base.go index da1a59c..1ff1365 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -36,7 +36,7 @@ type base struct { logger LoggerInterface metric *ConsumerMetric context context.Context - messageCh chan Message + messageCh chan *Message quit chan struct{} cancelFn context.CancelFunc r Reader @@ -69,7 +69,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) { c := base{ metric: &ConsumerMetric{}, - messageCh: make(chan Message, cfg.Concurrency), + messageCh: make(chan *Message, cfg.Concurrency), quit: make(chan struct{}), concurrency: cfg.Concurrency, retryEnabled: cfg.RetryEnabled, diff --git a/consumer_base_test.go b/consumer_base_test.go index 214bcf0..c63ae2a 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -15,8 +15,9 @@ func Test_base_startConsume(t *testing.T) { mc := mockReader{wantErr: true} b := base{ wg: sync.WaitGroup{}, r: &mc, - messageCh: make(chan Message), quit: make(chan struct{}), - logger: NewZapLogger(LogLevelDebug), + messageCh: make(chan *Message), + quit: make(chan struct{}), + logger: NewZapLogger(LogLevelDebug), } b.context, b.cancelFn = context.WithCancel(context.Background()) @@ -33,7 +34,7 @@ func Test_base_startConsume(t *testing.T) { t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) { // Given mc := mockReader{} - b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan Message)} + b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan *Message)} b.wg.Add(1) // When diff --git a/consumer_config.go b/consumer_config.go index 468232f..79ce01a 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -15,9 +15,9 @@ import ( type ReaderConfig kafka.ReaderConfig -type BatchConsumeFn func([]Message) error +type BatchConsumeFn func([]*Message) error -type ConsumeFn func(Message) error +type ConsumeFn func(*Message) error type DialConfig struct { Timeout time.Duration diff --git a/consumer_test.go b/consumer_test.go index 89b5d95..65a1813 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -10,13 +10,13 @@ func Test_consumer_process(t *testing.T) { // Given c := consumer{ base: &base{metric: &ConsumerMetric{}}, - consumeFn: func(Message) error { + consumeFn: func(*Message) error { return nil }, } // When - c.process(Message{}) + c.process(&Message{}) // Then if c.metric.TotalProcessedMessagesCounter != 1 { @@ -31,7 +31,7 @@ func Test_consumer_process(t *testing.T) { gotOnlyOneTimeException := true c := consumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(Message) error { + consumeFn: func(*Message) error { if gotOnlyOneTimeException { gotOnlyOneTimeException = false return errors.New("simulate only one time exception") @@ -41,7 +41,7 @@ func Test_consumer_process(t *testing.T) { } // When - c.process(Message{}) + c.process(&Message{}) // Then if c.metric.TotalProcessedMessagesCounter != 1 { @@ -55,13 +55,13 @@ func Test_consumer_process(t *testing.T) { // Given c := consumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(Message) error { + consumeFn: func(*Message) error { return errors.New("error case") }, } // When - c.process(Message{}) + c.process(&Message{}) // Then if c.metric.TotalProcessedMessagesCounter != 0 { @@ -76,13 +76,13 @@ func Test_consumer_process(t *testing.T) { mc := mockCronsumer{} c := consumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, - consumeFn: func(Message) error { + consumeFn: func(*Message) error { return errors.New("error case") }, } // When - c.process(Message{}) + c.process(&Message{}) // Then if c.metric.TotalProcessedMessagesCounter != 0 { @@ -97,13 +97,13 @@ func Test_consumer_process(t *testing.T) { mc := mockCronsumer{wantErr: true} c := consumer{ base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, - consumeFn: func(Message) error { + consumeFn: func(*Message) error { return errors.New("error case") }, } // When - c.process(Message{}) + c.process(&Message{}) // Then if c.metric.TotalProcessedMessagesCounter != 0 { diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go index fce7eb0..ede7246 100644 --- a/examples/with-deadletter/main.go +++ b/examples/with-deadletter/main.go @@ -59,7 +59,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) // returns error to be sent to dead-letter topic return errors.New("consumer error") diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index 82a0605..3fd9063 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -70,7 +70,7 @@ func main() { WorkDuration: 50 * time.Second, MaxRetry: 3, }, - ConsumeFn: func(message kafka.Message) error { + ConsumeFn: func(message *kafka.Message) error { // mocking some background task time.Sleep(1 * time.Second) diff --git a/examples/with-kafka-batch-consumer/main.go b/examples/with-kafka-batch-consumer/main.go index 9982031..bbd550f 100644 --- a/examples/with-kafka-batch-consumer/main.go +++ b/examples/with-kafka-batch-consumer/main.go @@ -20,6 +20,14 @@ func main() { MessageGroupDuration: time.Second, BatchConsumeFn: batchConsumeFn, }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, } consumer, _ := kafka.NewConsumer(consumerCfg) @@ -35,7 +43,7 @@ func main() { // In order to load topic with data, use: // kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt -func batchConsumeFn(messages []kafka.Message) error { +func batchConsumeFn(messages []*kafka.Message) error { fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value) return nil } diff --git a/examples/with-kafka-cronsumer/main.go b/examples/with-kafka-cronsumer/main.go index 5314e59..4b97ee3 100644 --- a/examples/with-kafka-cronsumer/main.go +++ b/examples/with-kafka-cronsumer/main.go @@ -39,7 +39,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) return nil } diff --git a/examples/with-prometheus/main.go b/examples/with-prometheus/main.go index 5889c6d..81e793e 100644 --- a/examples/with-prometheus/main.go +++ b/examples/with-prometheus/main.go @@ -41,7 +41,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) return nil } diff --git a/examples/with-sasl-plaintext/go.mod b/examples/with-sasl-plaintext/go.mod index 125416b..e2e11fa 100644 --- a/examples/with-sasl-plaintext/go.mod +++ b/examples/with-sasl-plaintext/go.mod @@ -7,33 +7,33 @@ replace github.com/Trendyol/kafka-konsumer => ../.. require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 require ( - github.com/Trendyol/kafka-cronsumer v1.0.0 // indirect + github.com/Trendyol/kafka-cronsumer v1.3.4 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect - github.com/gofiber/fiber/v2 v2.44.0 // indirect + github.com/gofiber/fiber/v2 v2.48.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.16.5 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.18 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/philhofer/fwd v1.1.2 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect - github.com/prometheus/client_golang v1.15.1 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect - github.com/segmentio/kafka-go v0.4.40 // indirect + github.com/segmentio/kafka-go v0.4.43 // indirect github.com/tinylib/msgp v1.1.8 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.47.0 // indirect + github.com/valyala/fasthttp v1.48.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -42,7 +42,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/examples/with-sasl-plaintext/go.sum b/examples/with-sasl-plaintext/go.sum index 09872cd..cb064c1 100644 --- a/examples/with-sasl-plaintext/go.sum +++ b/examples/with-sasl-plaintext/go.sum @@ -1,5 +1,6 @@ github.com/Trendyol/kafka-cronsumer v1.0.0 h1:35ZaChsCPmesBPY81zSUZWhKkCChJbgdfzH/XoxVT28= github.com/Trendyol/kafka-cronsumer v1.0.0/go.mod h1:GzV1DUvjUTco+Qk4zR2GLfWblFszKTuYY9Epx8d7ROM= +github.com/Trendyol/kafka-cronsumer v1.3.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -14,6 +15,7 @@ github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9 github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= github.com/gofiber/fiber/v2 v2.44.0 h1:Z90bEvPcJM5GFJnu1py0E1ojoerkyew3iiNJ78MQCM8= github.com/gofiber/fiber/v2 v2.44.0/go.mod h1:VTMtb/au8g01iqvHyaCzftuM/xmZgKOZCtFzz6CdV9w= +github.com/gofiber/fiber/v2 v2.48.0/go.mod h1:xqJgfqrc23FJuqGOW6DVgi3HyZEm2Mn9pRqUb2kHSX8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -26,11 +28,13 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -41,17 +45,22 @@ github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -64,6 +73,7 @@ github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1Avp github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/segmentio/kafka-go v0.4.40 h1:sszW7c0/uyv7+VcTW5trx2ZC7kMWDTxuR/6Zn8U1bm8= github.com/segmentio/kafka-go v0.4.40/go.mod h1:naFEZc5MQKdeL3W6NkZIAn48Y6AazqjRFDhnXeg3h94= +github.com/segmentio/kafka-go v0.4.43/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -76,6 +86,7 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c= github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -106,6 +117,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -122,17 +134,21 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/examples/with-sasl-plaintext/main.go b/examples/with-sasl-plaintext/main.go index b742d07..df0f5ff 100644 --- a/examples/with-sasl-plaintext/main.go +++ b/examples/with-sasl-plaintext/main.go @@ -39,7 +39,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) return nil } diff --git a/examples/with-standalone-consumer/main.go b/examples/with-standalone-consumer/main.go index f192d17..25cb6ba 100644 --- a/examples/with-standalone-consumer/main.go +++ b/examples/with-standalone-consumer/main.go @@ -32,7 +32,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) return nil } diff --git a/examples/with-tracing/main.go b/examples/with-tracing/main.go index 2c3b1b2..3fb783f 100644 --- a/examples/with-tracing/main.go +++ b/examples/with-tracing/main.go @@ -70,7 +70,7 @@ func main() { <-c } -func consumeFn(message kafka.Message) error { +func consumeFn(message *kafka.Message) error { fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) tr := otel.Tracer("consumer") diff --git a/message.go b/message.go index bc79210..1119eae 100644 --- a/message.go +++ b/message.go @@ -9,6 +9,8 @@ import ( "github.com/segmentio/kafka-go/protocol" ) +type Header = protocol.Header + type Message struct { Topic string Partition int @@ -16,7 +18,7 @@ type Message struct { HighWaterMark int64 Key []byte Value []byte - Headers []kafka.Header + Headers []Header WriterData interface{} Time time.Time // Context To enable distributed tracing support @@ -37,8 +39,8 @@ func (m *Message) toKafkaMessage() kafka.Message { } } -func fromKafkaMessage(message *kafka.Message) Message { - return Message{ +func fromKafkaMessage(message *kafka.Message) *Message { + return &Message{ Topic: message.Topic, Partition: message.Partition, Offset: message.Offset, @@ -71,7 +73,7 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { Build() } -func toMessage(message kcronsumer.Message) Message { +func toMessage(message kcronsumer.Message) *Message { headers := make([]protocol.Header, 0, len(message.Headers)) for i := range message.Headers { headers = append(headers, protocol.Header{ @@ -80,7 +82,7 @@ func toMessage(message kcronsumer.Message) Message { }) } - return Message{ + return &Message{ Topic: message.Topic, Partition: message.Partition, Offset: message.Offset, @@ -102,11 +104,19 @@ func (m *Message) Header(key string) *kafka.Header { return nil } -func (m *Message) AddHeader(header ...kafka.Header) { - m.Headers = append(m.Headers, header...) +// AddHeader works as a idempotent function +func (m *Message) AddHeader(header Header) { + for i := range m.Headers { + if m.Headers[i].Key == header.Key { + m.Headers[i].Value = header.Value + return + } + } + + m.Headers = append(m.Headers, header) } -func (m *Message) RemoveHeader(header kafka.Header) { +func (m *Message) RemoveHeader(header Header) { for i, h := range m.Headers { if h.Key == header.Key { m.Headers = append(m.Headers[:i], m.Headers[i+1:]...) diff --git a/message_test.go b/message_test.go index 9e5bd28..ee10e58 100644 --- a/message_test.go +++ b/message_test.go @@ -8,48 +8,92 @@ import ( ) func TestMessage_Header(t *testing.T) { - // Given - m := Message{ - Headers: []kafka.Header{ - {Key: "foo", Value: []byte("fooValue")}, - {Key: "another", Value: []byte("anotherValue")}, - }, - } + t.Run("When_Header_Exist", func(t *testing.T) { + // Given + m := Message{ + Headers: []kafka.Header{ + {Key: "foo", Value: []byte("fooValue")}, + {Key: "another", Value: []byte("anotherValue")}, + }, + } - // When - header := m.Header("foo") + // When + header := m.Header("foo") - // Then - if header.Key != "foo" { - t.Fatalf("Header must be equal to foo") - } - if !bytes.Equal(header.Value, []byte("fooValue")) { - t.Fatalf("Header value must be equal to fooValue") - } + // Then + if header.Key != "foo" { + t.Fatalf("Header must be equal to foo") + } + if !bytes.Equal(header.Value, []byte("fooValue")) { + t.Fatalf("Header value must be equal to fooValue") + } + }) + t.Run("When_Header_Does_Not_Exist", func(t *testing.T) { + // Given + m := Message{ + Headers: []kafka.Header{ + {Key: "foo", Value: []byte("fooValue")}, + {Key: "another", Value: []byte("anotherValue")}, + }, + } + + // When + header := m.Header("notexist") + + // Then + if header != nil { + t.Fatalf("Header must be equal to nil") + } + }) } func TestMessage_AddHeader(t *testing.T) { - // Given - m := Message{ - Headers: []kafka.Header{ - {Key: "foo", Value: []byte("fooValue")}, - }, - } + t.Run("When_New_Header_Comes", func(t *testing.T) { + // Given + m := Message{ + Headers: []kafka.Header{ + {Key: "foo", Value: []byte("fooValue")}, + }, + } - // When - m.AddHeader(kafka.Header{Key: "bar", Value: []byte("barValue")}) + // When + m.AddHeader(kafka.Header{Key: "bar", Value: []byte("barValue")}) - // Then - headers := m.Headers - if len(headers) != 2 { - t.Fatalf("Header length must be equal to 2") - } - if headers[1].Key != "bar" { - t.Fatalf("Header key must be equal to bar") - } - if !bytes.Equal(headers[1].Value, []byte("barValue")) { - t.Fatalf("Header value must be equal to barValue") - } + // Then + headers := m.Headers + if len(headers) != 2 { + t.Fatalf("Header length must be equal to 2") + } + if headers[1].Key != "bar" { + t.Fatalf("Header key must be equal to bar") + } + if !bytes.Equal(headers[1].Value, []byte("barValue")) { + t.Fatalf("Header value must be equal to barValue") + } + }) + t.Run("When_Same_Header_Comes", func(t *testing.T) { + // Given + m := Message{ + Headers: []kafka.Header{ + {Key: "foo", Value: []byte("fooValue")}, + }, + } + + // When + m.AddHeader(kafka.Header{Key: "foo", Value: []byte("barValue")}) + + // Then + headers := m.Headers + if len(headers) != 1 { + t.Fatalf("Header length must be equal to 1") + } + if headers[0].Key != "foo" { + t.Fatalf("Header key must be equal to foo") + } + if !bytes.Equal(headers[0].Value, []byte("barValue")) { + t.Fatalf("Header value must be equal to barValue") + } + }) } func TestMessage_RemoveHeader(t *testing.T) { diff --git a/test/integration/go.mod b/test/integration/go.mod index e5292b3..e70f23e 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,8 +10,8 @@ require ( ) require ( - github.com/Trendyol/otel-kafka-konsumer v0.0.0 // indirect - github.com/Trendyol/kafka-cronsumer v1.3.3 // indirect + github.com/Trendyol/kafka-cronsumer v1.3.4 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -40,13 +40,13 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect - go.opentelemetry.io/otel v1.18.0 // indirect - go.opentelemetry.io/otel/metric v1.18.0 // indirect - go.opentelemetry.io/otel/trace v1.18.0 // indirect + go.opentelemetry.io/otel v1.19.0 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/sys v0.10.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index c582be8..7d89770 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,7 +1,7 @@ -github.com/Trendyol/otel-kafka-konsumer v0.0.0 h1:5iLegl3ZoOVAIgua/ne5OrTwYITLz+4EhbK7i/orXOw= -github.com/Trendyol/otel-kafka-konsumer v0.0.0/go.mod h1:SOtgXp7znhCuI/+F4by91/0JWLfFqcZyYd1x9EtoxUU= -github.com/Trendyol/kafka-cronsumer v1.3.3 h1:KNlX/L4bDkpNAXTqHUQNrnkC/kauL6djKp0tuOFny8Y= -github.com/Trendyol/kafka-cronsumer v1.3.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.3.4 h1:H1PmXfNtzCQm6pYsERUHlSTaib/WaICES+GJvl2RX8U= +github.com/Trendyol/kafka-cronsumer v1.3.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= +github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= @@ -82,12 +82,12 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= -go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= -go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= -go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= -go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= -go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= @@ -114,8 +114,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 7a2af80..e33756b 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -1,6 +1,7 @@ package integration import ( + "bytes" "context" "errors" "fmt" @@ -74,11 +75,11 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) { conn, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{{Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}) defer cleanUp() - messageCh := make(chan kafka.Message) + messageCh := make(chan *kafka.Message) consumerCfg := &kafka.ConsumerConfig{ Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, - ConsumeFn: func(message kafka.Message) error { + ConsumeFn: func(message *kafka.Message) error { messageCh <- message return nil }, @@ -128,7 +129,7 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { BatchConfiguration: &kafka.BatchConfiguration{ MessageGroupLimit: 100, MessageGroupDuration: time.Second, - BatchConsumeFn: func(messages []kafka.Message) error { + BatchConsumeFn: func(messages []*kafka.Message) error { messagesLen <- len(messages) return nil }, @@ -178,7 +179,7 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { MaxRetry: 3, LogLevel: "error", }, - ConsumeFn: func(message kafka.Message) error { + ConsumeFn: func(message *kafka.Message) error { return errors.New("err occurred") }, LogLevel: kafka.LogLevelError, @@ -199,6 +200,70 @@ func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { assertEventually(t, conditionFunc, 45*time.Second, time.Second) } +func Test_Should_Progate_Custom_Headers_With_Kafka_Cronsumer_Successfully(t *testing.T) { + // Given + topic := "cronsumer-header-topic" + consumerGroup := "cronsumer-header-cg" + brokerAddress := "localhost:9092" + + retryTopic := "exception-topic" + + _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{ + {Topic: topic, Key: []byte("1"), Value: []byte(`foo`)}}, + ) + defer cleanUp() + + retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil) + defer cleanUpThisToo() + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{brokerAddress}, + Topic: retryTopic, + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + LogLevel: "error", + }, + ConsumeFn: func(message *kafka.Message) error { + message.AddHeader(kafka.Header{Key: "custom_exception_header", Value: []byte("custom_exception_value")}) + + return errors.New("err occurred") + }, + LogLevel: kafka.LogLevelError, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + var expectedOffset int64 = 1 + conditionFunc := func() bool { + lastOffset, _ := retryConn.ReadLastOffset() + return lastOffset == expectedOffset + } + + assertEventually(t, conditionFunc, 45*time.Second, time.Second) + msg, err := retryConn.ReadMessage(10_000) + if err != nil { + t.Fatalf("error reading message") + } + if len(msg.Headers) != 1 { + t.Fatalf("msg header must be length of 1") + } + if msg.Headers[0].Key != "custom_exception_header" { + t.Fatalf("key must be custom_exception_header") + } + if !bytes.Equal(msg.Headers[0].Value, []byte("custom_exception_value")) { + t.Fatalf("value must be custom_exception_value") + } + _ = msg +} + func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) { t.Helper()