From 18a21a234ff2a5f6e09f721839224ff82def0850 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Mon, 5 Aug 2024 11:32:52 +0300 Subject: [PATCH 1/3] feat: enhance Message Could not read error with topic and cg --- consumer_base.go | 4 +++- consumer_base_test.go | 1 + examples/with-standalone-consumer/main.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/consumer_base.go b/consumer_base.go index 87a3bbd..1fba290 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -84,6 +84,7 @@ type base struct { consumerState state metricPrefix string mu sync.Mutex + consumerCfg *ConsumerConfig } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -138,6 +139,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, metricPrefix: cfg.MetricPrefix, mu: sync.Mutex{}, + consumerCfg: cfg, } if cfg.DistributedTracingEnabled { @@ -201,7 +203,7 @@ func (c *base) startConsume() { } c.metric.TotalErrorCountDuringFetchingMessage++ - c.logger.Warnf("Message could not read, err %s", err.Error()) + c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID) continue } diff --git a/consumer_base_test.go b/consumer_base_test.go index c0c2c70..40e0000 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -23,6 +23,7 @@ func Test_base_startConsume(t *testing.T) { logger: NewZapLogger(LogLevelError), consumerState: stateRunning, metric: &ConsumerMetric{}, + consumerCfg: &ConsumerConfig{}, } b.context, b.cancelFn = context.WithCancel(context.Background()) diff --git a/examples/with-standalone-consumer/main.go b/examples/with-standalone-consumer/main.go index d4f9f9e..37f7e8c 100644 --- a/examples/with-standalone-consumer/main.go +++ b/examples/with-standalone-consumer/main.go @@ -12,7 +12,7 @@ func main() { Concurrency: 1, Reader: kafka.ReaderConfig{ Brokers: []string{"localhost:29092"}, - Topic: "standart-topic", + Topic: "standart-topicxxxx", GroupID: "standart-cg", }, RetryEnabled: false, From b47cbb711d078e8eaac9a014c0a8d62d6f167174 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Mon, 5 Aug 2024 11:33:28 +0300 Subject: [PATCH 2/3] chore: remove prefix xxx from example topic --- examples/with-standalone-consumer/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/with-standalone-consumer/main.go b/examples/with-standalone-consumer/main.go index 37f7e8c..d4f9f9e 100644 --- a/examples/with-standalone-consumer/main.go +++ b/examples/with-standalone-consumer/main.go @@ -12,7 +12,7 @@ func main() { Concurrency: 1, Reader: kafka.ReaderConfig{ Brokers: []string{"localhost:29092"}, - Topic: "standart-topicxxxx", + Topic: "standart-topic", GroupID: "standart-cg", }, RetryEnabled: false, From 76a6a0cd91ed3a3046af9d63e8ee0d500c203f32 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Mon, 5 Aug 2024 11:39:51 +0300 Subject: [PATCH 3/3] chore: fix lint --- batch_consumer_test.go | 6 +++--- consumer_base.go | 1 + consumer_config_test.go | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 9cccaae..0981922 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -34,7 +34,7 @@ func Test_batchConsumer_startBatch(t *testing.T) { concurrency: 1, }, messageGroupLimit: 3, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { numberOfBatch++ return nil }, @@ -101,7 +101,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) { concurrency: 1, }, messageGroupLimit: 2, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { numberOfBatch++ return nil }, @@ -179,7 +179,7 @@ func Test_batchConsumer_process(t *testing.T) { gotOnlyOneTimeException := true bc := batchConsumer{ base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)}, - consumeFn: func(messages []*Message) error { + consumeFn: func(_ []*Message) error { if gotOnlyOneTimeException { gotOnlyOneTimeException = false return errors.New("simulate only one time exception") diff --git a/consumer_base.go b/consumer_base.go index 1fba290..083e2df 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -203,6 +203,7 @@ func (c *base) startConsume() { } c.metric.TotalErrorCountDuringFetchingMessage++ + //nolint:lll c.logger.Warnf("Message could not read, err %s, from topics %s with consumer group %s", err.Error(), c.consumerCfg.getTopics(), c.consumerCfg.Reader.GroupID) continue } diff --git a/consumer_config_test.go b/consumer_config_test.go index 97e03ad..98a8333 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -98,7 +98,7 @@ func TestConsumerConfig_newCronsumerConfig(t *testing.T) { // Given cfg := ConsumerConfig{ RetryConfiguration: RetryConfiguration{ - SkipMessageByHeaderFn: func(headers []Header) bool { + SkipMessageByHeaderFn: func(_ []Header) bool { return false }, },