feat(kafka): enqueue commit offset only once per batch process (#14278)
Co-authored-by: George Robinson <george.robinson@grafana.com>
cyriltovena and grobinson-grafana authored Sep 26, 2024
1 parent c874d2c commit beca6f3
Showing 5 changed files with 68 additions and 18 deletions.
69 changes: 56 additions & 13 deletions pkg/ingester/kafka_consumer.go
@@ -2,12 +2,14 @@ package ingester
 
 import (
     "context"
+    "errors"
     math "math"
     "sync"
     "time"
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/grafana/dskit/backoff"
     "github.com/grafana/dskit/user"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
@@ -39,24 +41,26 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
 
 func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory {
     metrics := newConsumerMetrics(reg)
-    return func(_ partition.Committer) (partition.Consumer, error) {
+    return func(committer partition.Committer) (partition.Consumer, error) {
         decoder, err := kafka.NewDecoder()
         if err != nil {
             return nil, err
         }
         return &kafkaConsumer{
-            pusher:  pusher,
-            logger:  logger,
-            decoder: decoder,
-            metrics: metrics,
+            pusher:    pusher,
+            logger:    logger,
+            decoder:   decoder,
+            metrics:   metrics,
+            committer: committer,
         }, nil
     }
 }
 
 type kafkaConsumer struct {
-    pusher  logproto.PusherServer
-    logger  log.Logger
-    decoder *kafka.Decoder
+    pusher    logproto.PusherServer
+    logger    log.Logger
+    decoder   *kafka.Decoder
+    committer partition.Committer
 
     metrics *consumerMetrics
 }
@@ -72,14 +76,14 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti
                 level.Info(kc.logger).Log("msg", "shutting down kafka consumer")
                 return
             case records := <-recordsChan:
-                kc.consume(records)
+                kc.consume(ctx, records)
             }
         }
     }()
     return wg.Wait
 }
 
-func (kc *kafkaConsumer) consume(records []partition.Record) {
+func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record) {
     if len(records) == 0 {
         return
     }
@@ -101,13 +105,52 @@ func (kc *kafkaConsumer) consume(records []partition.Record) {
             level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
             continue
         }
-        ctx := user.InjectOrgID(record.Ctx, record.TenantID)
-        if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{
+        recordCtx := user.InjectOrgID(record.Ctx, record.TenantID)
+        req := &logproto.PushRequest{
             Streams: []logproto.Stream{stream},
+        }
+        if err := retryWithBackoff(ctx, func(attempts int) error {
+            if _, err := kc.pusher.Push(recordCtx, req); err != nil {
+                level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts)
+                return err
+            }
+            return nil
         }); err != nil {
-            level.Error(kc.logger).Log("msg", "failed to push records", "error", err)
+            level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset)
         }
+        kc.committer.EnqueueOffset(record.Offset)
     }
     kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
     kc.metrics.currentOffset.Set(float64(maxOffset))
 }
+
+func canRetry(err error) bool {
+    return errors.Is(err, ErrReadOnly)
+}
+
+func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error {
+    err := fn(0)
+    if err == nil {
+        return nil
+    }
+    if !canRetry(err) {
+        return err
+    }
+    backoff := backoff.New(ctx, backoff.Config{
+        MinBackoff: 100 * time.Millisecond,
+        MaxBackoff: 5 * time.Second,
+        MaxRetries: 0, // Retry infinitely
+    })
+    backoff.Wait()
+    for backoff.Ongoing() {
+        err = fn(backoff.NumRetries())
+        if err == nil {
+            return nil
+        }
+        if !canRetry(err) {
+            return err
+        }
+        backoff.Wait()
+    }
+    return backoff.Err()
+}
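
For readers who want to try the new retry helper in isolation, here is a minimal, self-contained sketch (not part of this commit) of the same pattern, built on the dskit backoff calls used above. errRetryable stands in for the ingester's ErrReadOnly, and main is purely illustrative.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/grafana/dskit/backoff"
)

// errRetryable stands in for ErrReadOnly in the commit above: it marks
// failures that are worth retrying.
var errRetryable = errors.New("retryable")

// retryWithBackoff mirrors the helper added in kafka_consumer.go: try once,
// then keep retrying with exponential backoff while the error is retryable
// and the context has not been cancelled.
func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error {
    err := fn(0)
    if err == nil {
        return nil
    }
    if !errors.Is(err, errRetryable) {
        return err
    }
    b := backoff.New(ctx, backoff.Config{
        MinBackoff: 100 * time.Millisecond,
        MaxBackoff: 5 * time.Second,
        MaxRetries: 0, // retry until the context is done
    })
    b.Wait()
    for b.Ongoing() {
        if err = fn(b.NumRetries()); err == nil {
            return nil
        }
        if !errors.Is(err, errRetryable) {
            return err
        }
        b.Wait()
    }
    return b.Err()
}

func main() {
    calls := 0
    err := retryWithBackoff(context.Background(), func(attempts int) error {
        calls++
        if calls < 3 {
            return errRetryable // fail the first two attempts, succeed on the third
        }
        return nil
    })
    fmt.Println("calls:", calls, "err:", err)
}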
2 changes: 2 additions & 0 deletions pkg/ingester/kafka_consumer_test.go
@@ -74,6 +74,8 @@ func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logpr
 
 type noopCommitter struct{}
 
+func (nc *noopCommitter) EnqueueOffset(_ int64) {}
+
 func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil }
 
 func TestConsumer(t *testing.T) {
11 changes: 8 additions & 3 deletions pkg/kafka/ingester/consumer_test.go
@@ -33,6 +33,11 @@ func (m *mockCommitter) Commit(_ context.Context, offset int64) error {
     return nil
 }
 
+func (m *mockCommitter) EnqueueOffset(offset int64) {
+    // For testing purposes, we'll just set the committed offset directly
+    m.committed = offset
+}
+
 func TestConsumer_PeriodicFlush(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
@@ -46,7 +51,7 @@ func TestConsumer_PeriodicFlush(t *testing.T) {
     flushInterval := 100 * time.Millisecond
     maxFlushSize := int64(1000)
 
-    committer := &mockCommitter{}
+    committer := newMockCommitter()
     consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
     consumer, err := consumerFactory(committer)
     require.NoError(t, err)
@@ -99,7 +104,7 @@ func TestConsumer_ShutdownFlush(t *testing.T) {
     flushInterval := 1 * time.Hour
     maxFlushSize := int64(1000)
 
-    committer := &mockCommitter{}
+    committer := newMockCommitter()
     consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
     consumer, err := consumerFactory(committer)
     require.NoError(t, err)
@@ -153,7 +158,7 @@ func TestConsumer_MaxFlushSize(t *testing.T) {
     flushInterval := 1 * time.Hour
     maxFlushSize := int64(10)
 
-    committer := &mockCommitter{}
+    committer := newMockCommitter()
     consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
     consumer, err := consumerFactory(committer)
     require.NoError(t, err)
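
The three tests above now construct the committer with newMockCommitter() instead of &mockCommitter{}. The constructor itself sits outside the displayed hunks, so the following is only a hypothetical reconstruction, consistent with how the tests use it (a committed field that assertions can read); treat the exact body as an assumption, not the repository's code.

// Hypothetical reconstruction -- the real definition lives outside the hunks shown above.
type mockCommitter struct {
    committed int64
}

func newMockCommitter() *mockCommitter {
    return &mockCommitter{}
}

Together with the Commit and EnqueueOffset methods shown in the hunks above, this gives the tests a committer whose last stored offset can be inspected.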
3 changes: 2 additions & 1 deletion pkg/kafka/partition/committer.go
@@ -19,6 +19,7 @@ import (
 // Committer defines an interface for committing offsets
 type Committer interface {
     Commit(ctx context.Context, offset int64) error
+    EnqueueOffset(offset int64)
 }
 
 // partitionCommitter is responsible for committing offsets for a specific Kafka partition
@@ -113,7 +114,7 @@ func (r *partitionCommitter) autoCommitLoop(ctx context.Context) {
     }
 }
 
-func (r *partitionCommitter) enqueueOffset(o int64) {
+func (r *partitionCommitter) EnqueueOffset(o int64) {
     if r.kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 {
         r.toCommit.Store(o)
     }
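
To make the division of labour concrete: EnqueueOffset only records the latest processed offset (and only when a commit interval is configured), while a background loop owned by the committer periodically flushes whatever was last stored. The following is a simplified, self-contained sketch of that pattern under assumed names (sketchCommitter, commitInterval); it is not Loki's partitionCommitter implementation.

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"
)

type sketchCommitter struct {
    toCommit       atomic.Int64  // latest enqueued offset, shared with the commit loop
    commitInterval time.Duration // 0 disables enqueueing, mirroring the config guard above
}

// EnqueueOffset is cheap: it only stores the newest offset.
func (c *sketchCommitter) EnqueueOffset(o int64) {
    if c.commitInterval > 0 {
        c.toCommit.Store(o)
    }
}

// Commit stands in for the real Kafka offset commit.
func (c *sketchCommitter) Commit(_ context.Context, offset int64) error {
    fmt.Println("committing offset", offset)
    return nil
}

// autoCommitLoop periodically commits the most recently stored offset,
// skipping ticks where nothing new was enqueued.
func (c *sketchCommitter) autoCommitLoop(ctx context.Context) {
    ticker := time.NewTicker(c.commitInterval)
    defer ticker.Stop()
    last := int64(-1)
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if cur := c.toCommit.Load(); cur != last {
                if err := c.Commit(ctx, cur); err == nil {
                    last = cur
                }
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()
    c := &sketchCommitter{commitInterval: 100 * time.Millisecond}
    go c.autoCommitLoop(ctx)
    for offset := int64(0); offset < 5; offset++ {
        c.EnqueueOffset(offset) // the consumer enqueues once per processed record
        time.Sleep(60 * time.Millisecond)
    }
    <-ctx.Done()
}

In this sketch, many enqueues between ticks collapse into a single offset commit, since only the most recently stored value is committed on each tick.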
1 change: 0 additions & 1 deletion pkg/kafka/partition/reader.go
@@ -123,7 +123,6 @@ func (p *Reader) startFetchLoop(ctx context.Context) <-chan []Record {
                 return
             default:
                 records <- p.poll(ctx)
-                p.committer.enqueueOffset(p.lastProcessedOffset)
             }
         }
     }()
