Skip to content

Commit

Permalink
Merge pull request #122 from Trendyol/fix/issue-nontransactional-erro…
Browse files Browse the repository at this point in the history
…rmessage

fix: Transactional Retry false x-error-message bug
  • Loading branch information
dilaragorum authored Mar 28, 2024
2 parents 474c038 + 532d1a4 commit f9bc1d6
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 42 deletions.
20 changes: 15 additions & 5 deletions batch_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"errors"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -41,9 +42,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.RetryEnabled {
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
return c.consumeFn([]*Message{toMessage(message)})
})
c.base.setupCronsumer(cfg, c.runKonsumerFn)
}

if cfg.APIEnabled {
Expand All @@ -53,6 +52,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return &c, nil
}

func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error {
msgList := []*Message{toMessage(message)}

err := b.consumeFn(msgList)
if msgList[0].ErrDescription != "" {
err = errors.New(msgList[0].ErrDescription)
}
return err
}

func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
return b.base.GetMetricCollectors()
}
Expand Down Expand Up @@ -176,14 +185,15 @@ func (b *batchConsumer) process(chunkMessages []*Message) {

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
errorMessage := consumeErr.Error()
if b.transactionalRetry {
for i := range chunkMessages {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
}
} else {
for i := range chunkMessages {
if chunkMessages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) {
}
}

func Test_batchConsumer_runKonsumerFn(t *testing.T) {
t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("default error")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
return expectedError
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})

t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("message error description")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
messages[0].ErrDescription = "message error description"
return errors.New("default error")
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *consumer) process(message *Message) {
}

if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic)
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
Expand Down
29 changes: 23 additions & 6 deletions examples/with-kafka-transactional-retry-disabled/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
Expand All @@ -10,6 +11,16 @@ import (
)

func main() {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"},
})

producer.ProduceBatch(context.Background(), []kafka.Message{
{Key: []byte("key1"), Value: []byte("message1")},
{Key: []byte("key2"), Value: []byte("message2")},
{Key: []byte("key3"), Value: []byte("message3")},
})

consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Expand All @@ -25,11 +36,11 @@ func main() {
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/5 * * * *",
WorkDuration: 4 * time.Minute,
StartTimeCron: "*/1 * * * *",
WorkDuration: 20 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
MessageGroupDuration: 5 * time.Second,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
Expand All @@ -43,13 +54,19 @@ func main() {
<-c
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
// you can add custom error handling here & flag messages
for i := range messages {
if i%2 == 0 {
if i < 2 {
messages[i].IsFailed = true

var retryCount string
retryCountHeader := messages[i].Header("x-retry-count")
if retryCountHeader != nil {
retryCount = string(retryCountHeader.Value)
}

messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount)
}
}

Expand Down
24 changes: 23 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/segmentio/kafka-go/protocol"
)

const (
errMessageKey = "x-error-message"
)

type Header = protocol.Header

type Message struct {
Expand All @@ -27,6 +31,12 @@ type Message struct {

// IsFailed Is only used on transactional retry disabled
IsFailed bool

// ErrDescription specifies the IsFailed message's error

// If available, kafka-konsumer writes this description into the failed message's
// headers as `x-error-message` key when producing retry topic
ErrDescription string
}

type IncomingMessage struct {
Expand Down Expand Up @@ -63,7 +73,7 @@ func fromKafkaMessage(kafkaMessage *kafka.Message) *Message {
return message
}

func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message {
func (m *Message) toRetryableMessage(retryTopic, consumeError string) kcronsumer.Message {
headers := make([]kcronsumer.Header, 0, len(m.Headers))
for i := range m.Headers {
headers = append(headers, kcronsumer.Header{
Expand All @@ -72,6 +82,18 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message {
})
}

if m.ErrDescription == "" {
headers = append(headers, kcronsumer.Header{
Key: errMessageKey,
Value: []byte(consumeError),
})
} else {
headers = append(headers, kcronsumer.Header{
Key: errMessageKey,
Value: []byte(m.ErrDescription),
})
}

return kcronsumer.NewMessageBuilder().
WithKey(m.Key).
WithValue(m.Value).
Expand Down
134 changes: 134 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"testing"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"

"github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -113,3 +115,135 @@ func TestMessage_RemoveHeader(t *testing.T) {
t.Fatalf("Header length must be equal to 0")
}
}

func TestMessage_toRetryableMessage(t *testing.T) {
t.Run("When_error_description_exist", func(t *testing.T) {
// Given
message := Message{
Key: []byte("key"),
Value: []byte("value"),
Headers: []Header{
{
Key: "x-custom-client-header",
Value: []byte("bar"),
},
},
ErrDescription: "some error description",
}
expected := kcronsumer.Message{
Topic: "retry-topic",
Key: []byte("key"),
Value: []byte("value"),
Headers: []kcronsumer.Header{
{
Key: "x-custom-client-header",
Value: []byte("bar"),
},
{
Key: "x-error-message",
Value: []byte("some error description"),
},
},
}

// When
actual := message.toRetryableMessage("retry-topic", "consumeFn error")

// Then
if actual.Topic != expected.Topic {
t.Errorf("topic must be %q", expected.Topic)
}

if !bytes.Equal(actual.Key, expected.Key) {
t.Errorf("Key must be equal to %q", string(expected.Key))
}

if !bytes.Equal(actual.Value, expected.Value) {
t.Errorf("Value must be equal to %q", string(expected.Value))
}

if len(actual.Headers) != 2 {
t.Error("Header length must be equal to 2")
}

if actual.Headers[0].Key != expected.Headers[0].Key {
t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key)
}

if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) {
t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value)
}

if actual.Headers[1].Key != expected.Headers[1].Key {
t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key)
}

if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) {
t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value)
}
})
t.Run("When_error_description_does_not_exist", func(t *testing.T) {
// Given
message := Message{
Key: []byte("key"),
Value: []byte("value"),
Headers: []Header{
{
Key: "x-custom-client-header",
Value: []byte("bar"),
},
},
}
expected := kcronsumer.Message{
Topic: "retry-topic",
Key: []byte("key"),
Value: []byte("value"),
Headers: []kcronsumer.Header{
{
Key: "x-custom-client-header",
Value: []byte("bar"),
},
{
Key: "x-error-message",
Value: []byte("consumeFn error"),
},
},
}

// When
actual := message.toRetryableMessage("retry-topic", "consumeFn error")

// Then
if actual.Topic != expected.Topic {
t.Errorf("topic must be %q", expected.Topic)
}

if !bytes.Equal(actual.Key, expected.Key) {
t.Errorf("Key must be equal to %q", string(expected.Key))
}

if !bytes.Equal(actual.Value, expected.Value) {
t.Errorf("Value must be equal to %q", string(expected.Value))
}

if len(actual.Headers) != 2 {
t.Error("Header length must be equal to 2")
}

if actual.Headers[0].Key != expected.Headers[0].Key {
t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key)
}

if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) {
t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value)
}

if actual.Headers[1].Key != expected.Headers[1].Key {
t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key)
}

if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) {
t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value)
}
})
}
2 changes: 1 addition & 1 deletion test/integration/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.8"
services:
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:v23.2.4
image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64
container_name: redpanda-1
command:
- redpanda
Expand Down
Loading

0 comments on commit f9bc1d6

Please sign in to comment.