Skip to content

Commit

Permalink
Promtail Kafka target (#4568)
Browse files Browse the repository at this point in the history
* Adds a kafka target manager in promtail.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add validations.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Working on tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Moar test for the fanout client.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Finishing off tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Final adjustment.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Adding topics discovery.
Still needs to finish tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Finishing off testing it.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Wip

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Revert config changes.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add group id as discovered label

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* linter

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add tools for running kafka and testing locally.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* got linted shell

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update sarama to compile in ARM.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation for kafka target.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve code comment.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* add a few s's

* Better cancellation support.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* adds `__meta_kafka_` prefix to discovered labels.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
cyriltovena and slim-bean authored Nov 4, 2021
1 parent 07ea1ed commit c8f3df3
Show file tree
Hide file tree
Showing 396 changed files with 56,433 additions and 1,029 deletions.
32 changes: 32 additions & 0 deletions clients/cmd/promtail/promtail-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Example promtail configuration that consumes log lines from Kafka topics
# and pushes them to a local Loki instance.
server:
  http_listen_port: 9080
  grpc_listen_port: 0

clients:
  - url: http://localhost:3100/loki/api/v1/push

scrape_configs:
  - job_name: kafka
    kafka:
      # Use the scrape time rather than the Kafka message timestamp.
      use_incoming_timestamp: false
      brokers:
        - host.docker.internal:50705
      group_id: some_group
      # Topics may be plain names or regular expressions (e.g. ^promtail.*).
      topics:
        - foo
        - ^promtail.*
      # Static labels attached to every scraped line.
      labels:
        job: kafka
    # Promote the discovered __meta_kafka_* labels to regular labels.
    relabel_configs:
      - action: replace
        source_labels:
          - __meta_kafka_topic
        target_label: topic
      - action: replace
        source_labels:
          - __meta_kafka_partition
        target_label: partition
      - action: replace
        source_labels:
          - __meta_kafka_group_id
        target_label: group
26 changes: 25 additions & 1 deletion clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
}
Expand Down Expand Up @@ -221,6 +222,30 @@ type WindowsEventsTargetConfig struct {
Labels model.LabelSet `yaml:"labels"`
}

// KafkaTargetConfig describes a scrape config to consume log lines from
// Kafka topics.
type KafkaTargetConfig struct {
	// Labels optionally holds labels to associate with each log line.
	Labels model.LabelSet `yaml:"labels"`

	// UseIncomingTimestamp sets the timestamp to the incoming kafka messages
	// timestamp if it's set.
	UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`

	// Brokers is the list of brokers to connect to kafka (Required).
	Brokers []string `yaml:"brokers"`

	// GroupID is the consumer group id (Required).
	GroupID string `yaml:"group_id"`

	// Topics is the list of Kafka topics to consume (Required).
	Topics []string `yaml:"topics"`

	// Version is the Kafka version. Defaults to 2.2.1.
	Version string `yaml:"version"`

	// Assignor is the rebalancing strategy to use (e.g. sticky, roundrobin or range).
	Assignor string `yaml:"assignor"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
Expand Down Expand Up @@ -263,7 +288,6 @@ func (c *Config) HasServiceDiscoveryConfig() bool {

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {

*c = DefaultScrapeConfig

type plain Config
Expand Down
150 changes: 150 additions & 0 deletions clients/pkg/promtail/targets/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package kafka

import (
"context"
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
)

// defaultBackOff bounds retries of the consume loop: exponential backoff
// between 1s and 60s, giving up after 20 failed attempts.
var defaultBackOff = backoff.Config{
	MinBackoff: 1 * time.Second,
	MaxBackoff: 60 * time.Second,
	MaxRetries: 20,
}

// RunnableTarget is a promtail target that can actively consume messages.
// run blocks until the target is done consuming.
type RunnableTarget interface {
	target.Target
	run()
}

// TargetDiscoverer builds a RunnableTarget for a claim received during a
// consumer group session.
type TargetDiscoverer interface {
	NewTarget(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)
}

// consumer handles a group consumer instance.
// It creates a new target for every consumer claim using the `TargetDiscoverer`,
// and implements sarama's ConsumerGroupHandler via Setup/Cleanup/ConsumeClaim.
type consumer struct {
	sarama.ConsumerGroup
	discoverer TargetDiscoverer
	logger     log.Logger

	// ctx/cancel scope the consume loop started by start; wg tracks the loop
	// goroutine and every in-flight ConsumeClaim.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	mutex          sync.Mutex // used during rebalancing setup and tear down
	activeTargets  []target.Target
	droppedTargets []target.Target
}

// start starts the consumer for a given list of topics.
// It first waits for any previous consume loop to finish, then spawns a
// goroutine that calls Consume in a loop (rebalancing ends a Consume call,
// so it must be re-invoked), retrying transient errors with backoff.
func (c *consumer) start(ctx context.Context, topics []string) {
	c.wg.Wait()
	c.wg.Add(1)

	c.ctx, c.cancel = context.WithCancel(ctx)
	level.Info(c.logger).Log("msg", "starting consumer", "topics", fmt.Sprintf("%+v", topics))

	go func() {
		defer c.wg.Done()
		// Named bo to avoid shadowing the imported backoff package.
		bo := backoff.New(c.ctx, defaultBackOff)
		for {
			// Calling Consume in an infinite loop in case rebalancing is kicking in.
			// In which case all claims will be renewed.
			err := c.ConsumerGroup.Consume(c.ctx, topics, c)
			if err != nil && err != context.Canceled {
				level.Error(c.logger).Log("msg", "error from the consumer, retrying...", "err", err)
				// backoff before re-trying.
				bo.Wait()
				if bo.Ongoing() {
					continue
				}
				level.Error(c.logger).Log("msg", "maximum errors from the consumer reached", "last_err", err)
				return
			}
			if c.ctx.Err() != nil || err == context.Canceled {
				level.Info(c.logger).Log("msg", "stopping consumer", "topics", fmt.Sprintf("%+v", topics))
				return
			}
			bo.Reset()
		}
	}()
}

// ConsumeClaim builds a target for the received claim and consumes messages
// from it until the claim ends. Targets without labels are recorded as
// dropped but still drained, so the claim's offsets keep advancing.
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	c.wg.Add(1)
	defer c.wg.Done()

	tgt, err := c.discoverer.NewTarget(session, claim)
	if err != nil {
		return err
	}

	if len(tgt.Labels()) == 0 {
		c.addDroppedTarget(tgt)
	} else {
		c.addTarget(tgt)
		level.Info(c.logger).Log("msg", "consuming topic", "details", tgt.Details())
	}
	tgt.run()
	return nil
}

// Setup runs at the beginning of a new session, before any ConsumeClaim call;
// it clears targets left over from the previous session.
func (c *consumer) Setup(_ sarama.ConsumerGroupSession) error {
	c.resetTargets()
	return nil
}

// Cleanup runs at the end of a session, once every ConsumeClaim goroutine
// has exited; it drops all targets recorded during the session.
func (c *consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	c.resetTargets()
	return nil
}

// stop stops the consumer.
// Order matters: cancel unblocks the consume loop, Wait ensures the loop and
// every ConsumeClaim goroutine have exited, and only then are targets cleared.
func (c *consumer) stop() {
	c.cancel()
	c.wg.Wait()
	c.resetTargets()
}

// resetTargets drops every recorded target, active and dropped alike.
func (c *consumer) resetTargets() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.activeTargets, c.droppedTargets = nil, nil
}

// getActiveTargets returns the targets currently consuming claims.
func (c *consumer) getActiveTargets() []target.Target {
	c.mutex.Lock()
	targets := c.activeTargets
	c.mutex.Unlock()
	return targets
}

// getDroppedTargets returns the targets that were discovered but had no
// labels after relabeling.
func (c *consumer) getDroppedTargets() []target.Target {
	c.mutex.Lock()
	targets := c.droppedTargets
	c.mutex.Unlock()
	return targets
}

// addTarget records a target that is actively consuming a claim.
func (c *consumer) addTarget(t target.Target) {
	c.mutex.Lock()
	c.activeTargets = append(c.activeTargets, t)
	c.mutex.Unlock()
}

// addDroppedTarget records a target whose labels were all dropped.
func (c *consumer) addDroppedTarget(t target.Target) {
	c.mutex.Lock()
	c.droppedTargets = append(c.droppedTargets, t)
	c.mutex.Unlock()
}
102 changes: 102 additions & 0 deletions clients/pkg/promtail/targets/kafka/consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kafka

import (
"context"
"errors"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

// DiscovererFn adapts a plain function to the TargetDiscoverer interface.
type DiscovererFn func(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) (RunnableTarget, error)

// NewTarget implements TargetDiscoverer by delegating to the wrapped function.
func (d DiscovererFn) NewTarget(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
	return d(session, claim)
}

// fakeTarget is a RunnableTarget stub whose run method blocks until the
// provided context is cancelled. An empty lbs set marks it as a target that
// the consumer should record as dropped.
type fakeTarget struct {
	ctx context.Context
	lbs model.LabelSet
}

func (f *fakeTarget) run()                             { <-f.ctx.Done() }
func (f *fakeTarget) Type() target.TargetType          { return "" }
func (f *fakeTarget) DiscoveredLabels() model.LabelSet { return nil }
func (f *fakeTarget) Labels() model.LabelSet           { return f.lbs }
func (f *fakeTarget) Ready() bool                      { return true }
func (f *fakeTarget) Details() interface{}             { return nil }

// Test_ComsumerConsume checks that claims with labels become active targets,
// claims without labels become dropped targets, and Cleanup/stop tear both
// sets down. NOTE(review): "Comsumer" is a typo for "Consumer".
func Test_ComsumerConsume(t *testing.T) {
	var (
		group       = &testConsumerGroupHandler{}
		session     = &testSession{}
		ctx, cancel = context.WithCancel(context.Background())
		c           = &consumer{
			logger:        log.NewNopLogger(),
			ctx:           context.Background(),
			cancel:        func() {},
			ConsumerGroup: group,
			// The discoverer drops only the "dropped" topic by giving it no labels.
			discoverer: DiscovererFn(func(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (RunnableTarget, error) {
				if claim.Topic() != "dropped" {
					return &fakeTarget{
						ctx: ctx,
						lbs: model.LabelSet{"topic": model.LabelValue(claim.Topic())},
					}, nil
				}
				return &fakeTarget{
					ctx: ctx,
				}, nil
			}),
		}
	)

	c.start(ctx, []string{"foo"})
	// Wait for the consume loop to be running before driving the handler.
	require.Eventually(t, group.consuming.Load, 5*time.Second, 100*time.Microsecond)
	require.NoError(t, group.handler.Setup(session))
	// Each ConsumeClaim blocks in fakeTarget.run until ctx is cancelled,
	// so the three claims are driven from separate goroutines.
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("foo", 1, 2))
		require.NoError(t, err)
	}()
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("bar", 1, 2))
		require.NoError(t, err)
	}()
	go func() {
		err := group.handler.ConsumeClaim(session, newTestClaim("dropped", 1, 2))
		require.NoError(t, err)
	}()
	require.Eventually(t, func() bool {
		return len(c.getActiveTargets()) == 2
	}, 2*time.Second, 100*time.Millisecond)
	require.Eventually(t, func() bool {
		return len(c.getDroppedTargets()) == 1
	}, 2*time.Second, 100*time.Millisecond)
	err := group.handler.Cleanup(session)
	require.NoError(t, err)
	// cancel unblocks the fake targets so stop's WaitGroup can drain.
	cancel()
	c.stop()
}

// Test_ComsumerRetry checks that the consume loop keeps retrying (with
// backoff) when the consumer group returns a persistent error, and that
// stop still terminates cleanly mid-retry.
// NOTE(review): "Comsumer" is a typo for "Consumer"; the fixed 2s sleep
// presumably gives the loop time to fail at least once — consider an
// eventually-style wait instead.
func Test_ComsumerRetry(t *testing.T) {
	var (
		group = &testConsumerGroupHandler{
			returnErr: errors.New("foo"),
		}
		ctx, cancel = context.WithCancel(context.Background())
		c           = &consumer{
			logger:        log.NewNopLogger(),
			ctx:           context.Background(),
			cancel:        func() {},
			ConsumerGroup: group,
		}
	)
	defer cancel()
	c.start(ctx, []string{"foo"})
	<-time.After(2 * time.Second)
	c.stop()
}
Loading

0 comments on commit c8f3df3

Please sign in to comment.