Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail Kafka target #4568

Merged
merged 22 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
32 changes: 32 additions & 0 deletions clients/cmd/promtail/promtail-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Example promtail configuration that scrapes log lines from Kafka topics
# and pushes them to a local Loki instance.
server:
  http_listen_port: 9080
  # 0 disables a fixed gRPC port (a random free port is used).
  grpc_listen_port: 0

clients:
  # Loki endpoint the scraped entries are pushed to.
  - url: http://localhost:3100/loki/api/v1/push

scrape_configs:
  - job_name: kafka
    kafka:
      # false: use the scrape time as the entry timestamp instead of the
      # timestamp carried by the Kafka message.
      use_incoming_timestamp: false
      # Brokers to bootstrap the Kafka connection (required).
      brokers:
        - host.docker.internal:50705
      # Consumer group id shared by all promtail instances (required).
      group_id: some_group
      # Topics to consume: plain names or regular expressions prefixed
      # with ^ (required).
      topics:
        - foo
        - ^promtail.*
      # Static labels attached to every log line.
      labels:
        job: kafka
    # Promote the Kafka discovery metadata to regular labels.
    relabel_configs:
      - action: replace
        source_labels:
          - __meta_kafka_topic
        target_label: topic
      - action: replace
        source_labels:
          - __meta_kafka_partition
        target_label: partition
      - action: replace
        source_labels:
          - __meta_kafka_group_id
        target_label: group
26 changes: 25 additions & 1 deletion clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
}
Expand Down Expand Up @@ -221,6 +222,30 @@ type WindowsEventsTargetConfig struct {
Labels model.LabelSet `yaml:"labels"`
}

// KafkaTargetConfig describes a scrape config to consume log lines from
// one or more Kafka topics as part of a consumer group.
type KafkaTargetConfig struct {
	// Labels optionally holds labels to associate with each log line.
	Labels model.LabelSet `yaml:"labels"`

	// UseIncomingTimestamp sets the timestamp to the incoming kafka messages
	// timestamp if it's set.
	UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`

	// The list of brokers to connect to kafka (Required).
	Brokers []string `yaml:"brokers"`

	// The consumer group id (Required).
	GroupID string `yaml:"group_id"`

	// Kafka Topics to consume (Required).
	Topics []string `yaml:"topics"`

	// Kafka version. Defaults to 2.2.1.
	Version string `yaml:"version"`

	// Rebalancing strategy to use (e.g. sticky, roundrobin or range).
	Assignor string `yaml:"assignor"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
Expand Down Expand Up @@ -263,7 +288,6 @@ func (c *Config) HasServiceDiscoveryConfig() bool {

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {

*c = DefaultScrapeConfig

type plain Config
Expand Down
150 changes: 150 additions & 0 deletions clients/pkg/promtail/targets/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package kafka

import (
"context"
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
)

// defaultBackOff bounds the retry loop in consumer.start: retries start at
// 1s, cap at 60s, and give up after 20 consecutive failures.
var defaultBackOff = backoff.Config{
	MinBackoff: 1 * time.Second,
	MaxBackoff: 60 * time.Second,
	MaxRetries: 20,
}

// RunnableTarget is a promtail target that can additionally be run by the
// consumer once a Kafka claim has been assigned to it. run is expected to
// block until the claim (or the consumer) terminates.
type RunnableTarget interface {
	target.Target
	run()
}

// TargetDiscoverer creates a new target for a given consumer group session
// and claim (a topic/partition assignment).
type TargetDiscoverer interface {
	NewTarget(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)
}

// consumer handles a group consumer instance.
// It will create a new target for every consumer claim using the `TargetDiscoverer`.
// It also implements sarama.ConsumerGroupHandler (Setup/Cleanup/ConsumeClaim)
// via its methods below.
type consumer struct {
	sarama.ConsumerGroup
	discoverer TargetDiscoverer
	logger     log.Logger

	// ctx/cancel control the Consume loop started by start; wg tracks the
	// loop goroutine plus every in-flight ConsumeClaim.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	mutex          sync.Mutex // used during rebalancing setup and tear down
	activeTargets  []target.Target
	droppedTargets []target.Target // targets with no labels after relabeling
}

// start starts the consumer for a given list of topics.
func (c *consumer) start(ctx context.Context, topics []string) {
c.wg.Wait()
c.wg.Add(1)

c.ctx, c.cancel = context.WithCancel(ctx)
level.Info(c.logger).Log("msg", "starting consumer", "topics", fmt.Sprintf("%+v", topics))

go func() {
defer c.wg.Done()
backoff := backoff.New(c.ctx, defaultBackOff)
for {
owen-d marked this conversation as resolved.
Show resolved Hide resolved
// Calling Consume in an infinite loop in case rebalancing is kicking in.
// In which case all claims will be renewed.
err := c.ConsumerGroup.Consume(c.ctx, topics, c)
if err != nil && err != context.Canceled {
level.Error(c.logger).Log("msg", "error from the consumer, retrying...", "err", err)
// backoff before re-trying.
backoff.Wait()
if backoff.Ongoing() {
continue
}
level.Error(c.logger).Log("msg", "maximun error from the consumer reached", "last_err", err)
return
}
if c.ctx.Err() != nil || err == context.Canceled {
level.Info(c.logger).Log("msg", "stopping consumer", "topics", fmt.Sprintf("%+v", topics))
return
}
backoff.Reset()
}
}()
}

// ConsumeClaim creates a target for the given received claim and start reading message from it.
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.wg.Add(1)
defer c.wg.Done()

t, err := c.discoverer.NewTarget(session, claim)
if err != nil {
return err
}
if len(t.Labels()) == 0 {
c.addDroppedTarget(t)
t.run()
return nil
}
c.addTarget(t)
level.Info(c.logger).Log("msg", "consuming topic", "details", t.Details())
t.run()

return nil
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
c.resetTargets()
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error {
c.resetTargets()
return nil
}

// stop stops the consumer.
func (c *consumer) stop() {
c.cancel()
c.wg.Wait()
c.resetTargets()
}

// resetTargets drops every recorded target, active and dropped alike.
func (c *consumer) resetTargets() {
	c.mutex.Lock()
	c.activeTargets = nil
	c.droppedTargets = nil
	c.mutex.Unlock()
}

// getActiveTargets returns the targets currently consuming with labels.
func (c *consumer) getActiveTargets() []target.Target {
	c.mutex.Lock()
	targets := c.activeTargets
	c.mutex.Unlock()
	return targets
}

// getDroppedTargets returns the targets consuming without labels.
func (c *consumer) getDroppedTargets() []target.Target {
	c.mutex.Lock()
	targets := c.droppedTargets
	c.mutex.Unlock()
	return targets
}

// addTarget records t as an active target.
func (c *consumer) addTarget(t target.Target) {
	c.mutex.Lock()
	c.activeTargets = append(c.activeTargets, t)
	c.mutex.Unlock()
}

// addDroppedTarget records t as a dropped (label-less) target.
func (c *consumer) addDroppedTarget(t target.Target) {
	c.mutex.Lock()
	c.droppedTargets = append(c.droppedTargets, t)
	c.mutex.Unlock()
}
102 changes: 102 additions & 0 deletions clients/pkg/promtail/targets/kafka/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kafka

import (
"context"
"errors"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

// DiscovererFn adapts a plain function to the TargetDiscoverer interface,
// in the spirit of http.HandlerFunc.
type DiscovererFn func(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)

// NewTarget implements TargetDiscoverer by delegating to the wrapped function.
func (d DiscovererFn) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
	return d(session, claim)
}

// fakeTarget is a RunnableTarget stub whose run blocks until ctx is canceled.
// A nil label set marks the target as "dropped" by the consumer.
type fakeTarget struct {
	ctx context.Context
	lbs model.LabelSet
}

func (f *fakeTarget) run()                             { <-f.ctx.Done() }
func (f *fakeTarget) Type() target.TargetType          { return "" }
func (f *fakeTarget) DiscoveredLabels() model.LabelSet { return nil }
func (f *fakeTarget) Labels() model.LabelSet           { return f.lbs }
func (f *fakeTarget) Ready() bool                      { return true }
func (f *fakeTarget) Details() interface{}             { return nil }

// Test_ConsumerConsume verifies that ConsumeClaim registers labeled targets
// as active and unlabeled ones as dropped, and that session cleanup resets
// both lists. (Renamed from Test_ComsumerConsume: typo; the 100µs Eventually
// tick is also normalized to 100ms, matching the other polls in this test.)
func Test_ConsumerConsume(t *testing.T) {
	var (
		group       = &testConsumerGroupHandler{}
		session     = &testSession{}
		ctx, cancel = context.WithCancel(context.Background())
		c           = &consumer{
			logger: log.NewNopLogger(),
			ctx:    context.Background(),
			cancel: func() {},
			ConsumerGroup: group,
			// The "dropped" topic yields a target without labels; every other
			// topic yields a labeled (active) target.
			discoverer: DiscovererFn(func(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
				if claim.Topic() != "dropped" {
					return &fakeTarget{
						ctx: ctx,
						lbs: model.LabelSet{"topic": model.LabelValue(claim.Topic())},
					}, nil
				}
				return &fakeTarget{
					ctx: ctx,
				}, nil
			}),
		}
	)

	c.start(ctx, []string{"foo"})
	require.Eventually(t, group.consuming.Load, 5*time.Second, 100*time.Millisecond)
	require.NoError(t, group.handler.Setup(session))
	// Drive three concurrent claims: two labeled, one dropped.
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("foo", 1, 2))
		require.NoError(t, err)
	}()
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("bar", 1, 2))
		require.NoError(t, err)
	}()
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("dropped", 1, 2))
		require.NoError(t, err)
	}()
	require.Eventually(t, func() bool {
		return len(c.getActiveTargets()) == 2
	}, 2*time.Second, 100*time.Millisecond)
	require.Eventually(t, func() bool {
		return len(c.getDroppedTargets()) == 1
	}, 2*time.Second, 100*time.Millisecond)
	err := group.handler.Cleanup(session)
	require.NoError(t, err)
	cancel()
	c.stop()
}

// Test_ConsumerRetry verifies that the Consume loop survives repeated
// consumer errors (retrying with backoff) and can still be stopped cleanly.
// (Renamed from Test_ComsumerRetry: typo.)
func Test_ConsumerRetry(t *testing.T) {
	var (
		group = &testConsumerGroupHandler{
			returnErr: errors.New("foo"),
		}
		ctx, cancel = context.WithCancel(context.Background())
		c           = &consumer{
			logger: log.NewNopLogger(),
			ctx:    context.Background(),
			cancel: func() {},
			ConsumerGroup: group,
		}
	)
	defer cancel()
	c.start(ctx, []string{"foo"})
	// Let a few error/backoff cycles happen before stopping.
	<-time.After(2 * time.Second)
	c.stop()
}
Loading