Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change consumeFn and batchConsumeFn as pointer signature #62

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
type batchConsumer struct {
*base

consumeFn func([]Message) error
consumeFn func([]*Message) error

messageGroupLimit int
messageGroupDuration time.Duration
Expand All @@ -30,7 +30,7 @@

if cfg.RetryEnabled {
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
return c.consumeFn([]Message{toMessage(message)})
return c.consumeFn([]*Message{toMessage(message)})

Check warning on line 33 in batch_consumer.go

View check run for this annotation

Codecov / codecov/patch

batch_consumer.go#L33

Added line #L33 was not covered by tests
})
}

Expand Down Expand Up @@ -62,7 +62,7 @@
ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()

messages := make([]Message, 0, b.messageGroupLimit)
messages := make([]*Message, 0, b.messageGroupLimit)

for {
select {
Expand All @@ -88,7 +88,7 @@
}
}

func (b *batchConsumer) process(messages []Message) {
func (b *batchConsumer) process(messages []*Message) {
consumeErr := b.consumeFn(messages)

if consumeErr != nil {
Expand Down
32 changes: 16 additions & 16 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ func Test_batchConsumer_startBatch(t *testing.T) {

bc := batchConsumer{
base: &base{
messageCh: make(chan Message),
messageCh: make(chan *Message),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
},
messageGroupLimit: 3,
messageGroupDuration: 500 * time.Millisecond,
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
}
go func() {
// Simulate messageGroupLimit
bc.base.messageCh <- Message{}
bc.base.messageCh <- Message{}
bc.base.messageCh <- Message{}
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.messageCh <- Message{}
bc.base.messageCh <- &Message{}

time.Sleep(1 * time.Second)

Expand All @@ -64,13 +64,13 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func([]Message) error {
consumeFn: func([]*Message) error {
return nil
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
Expand All @@ -85,7 +85,7 @@ func Test_batchConsumer_process(t *testing.T) {
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
Expand All @@ -95,7 +95,7 @@ func Test_batchConsumer_process(t *testing.T) {
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
Expand All @@ -109,13 +109,13 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -130,13 +130,13 @@ func Test_batchConsumer_process(t *testing.T) {
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -151,13 +151,13 @@ func Test_batchConsumer_process(t *testing.T) {
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
type consumer struct {
*base

consumeFn func(Message) error
consumeFn func(*Message) error
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -51,7 +51,7 @@ func (c *consumer) Consume() {
}
}

func (c *consumer) process(message Message) {
func (c *consumer) process(message *Message) {
consumeErr := c.consumeFn(message)

if consumeErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan Message
messageCh chan *Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
Expand Down Expand Up @@ -69,7 +69,7 @@

c := base{
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
messageCh: make(chan *Message, cfg.Concurrency),

Check warning on line 72 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L72

Added line #L72 was not covered by tests
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
Expand Down
7 changes: 4 additions & 3 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ func Test_base_startConsume(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{
wg: sync.WaitGroup{}, r: &mc,
messageCh: make(chan Message), quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
messageCh: make(chan *Message),
quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand All @@ -33,7 +34,7 @@ func Test_base_startConsume(t *testing.T) {
t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) {
// Given
mc := mockReader{}
b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan Message)}
b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan *Message)}
b.wg.Add(1)

// When
Expand Down
4 changes: 2 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

type ReaderConfig kafka.ReaderConfig

type BatchConsumeFn func([]Message) error
type BatchConsumeFn func([]*Message) error

type ConsumeFn func(Message) error
type ConsumeFn func(*Message) error

type DialConfig struct {
Timeout time.Duration
Expand Down
20 changes: 10 additions & 10 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ func Test_consumer_process(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return nil
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
Expand All @@ -31,7 +31,7 @@ func Test_consumer_process(t *testing.T) {
gotOnlyOneTimeException := true
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
Expand All @@ -41,7 +41,7 @@ func Test_consumer_process(t *testing.T) {
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
Expand All @@ -55,13 +55,13 @@ func Test_consumer_process(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -76,13 +76,13 @@ func Test_consumer_process(t *testing.T) {
mc := mockCronsumer{}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -97,13 +97,13 @@ func Test_consumer_process(t *testing.T) {
mc := mockCronsumer{wantErr: true}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/with-deadletter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
// returns error to be sent to dead-letter topic
return errors.New("consumer error")
Expand Down
2 changes: 1 addition & 1 deletion examples/with-grafana/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
ConsumeFn: func(message *kafka.Message) error {
// mocking some background task
time.Sleep(1 * time.Second)

Expand Down
10 changes: 9 additions & 1 deletion examples/with-kafka-batch-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func main() {
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
Expand All @@ -35,7 +43,7 @@ func main() {

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt
func batchConsumeFn(messages []kafka.Message) error {
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}
2 changes: 1 addition & 1 deletion examples/with-kafka-cronsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
return nil
}
2 changes: 1 addition & 1 deletion examples/with-prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
return nil
}
Loading