diff --git a/clients/pkg/promtail/targets/kafka/target_syncer.go b/clients/pkg/promtail/targets/kafka/target_syncer.go index ac16ab18e892..fbb7523b3b36 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer.go @@ -10,13 +10,14 @@ import ( "github.com/Shopify/sarama" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/clients/pkg/logentry/stages" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/target" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/util" ) @@ -28,10 +29,11 @@ type TopicManager interface { } type TargetSyncer struct { - logger log.Logger - cfg scrapeconfig.Config - reg prometheus.Registerer - client api.EntryHandler + logger log.Logger + cfg scrapeconfig.Config + pipeline *stages.Pipeline + reg prometheus.Registerer + client api.EntryHandler topicManager TopicManager consumer @@ -86,8 +88,11 @@ func NewSyncer( if err != nil { return nil, fmt.Errorf("error creating topic manager: %w", err) } + pipeline, err := stages.NewPipeline(log.With(logger, "component", "kafka_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) + if err != nil { + return nil, fmt.Errorf("error creating pipeline: %w", err) + } ctx, cancel := context.WithCancel(context.Background()) - t := &TargetSyncer{ logger: logger, ctx: ctx, @@ -96,6 +101,7 @@ func NewSyncer( cfg: cfg, reg: reg, client: pushClient, + pipeline: pipeline, close: func() error { if err := group.Close(); err != nil { level.Warn(logger).Log("msg", "error while closing consumer group", "err", err) @@ -280,19 +286,13 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar }, }, nil } - - pipeline, err := stages.NewPipeline(log.With(ts.logger, "component", "kafka_pipeline"), ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg) - if err != nil { - return nil, err - } - t := NewTarget( session, claim, discoveredLabels, labelOut, ts.cfg.RelabelConfigs, - pipeline.Wrap(ts.client), + ts.pipeline.Wrap(ts.client), ts.cfg.KafkaConfig.UseIncomingTimestamp, ) diff --git a/clients/pkg/promtail/targets/kafka/target_syncer_test.go b/clients/pkg/promtail/targets/kafka/target_syncer_test.go index 5415976e5763..c7afe92c6e11 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer_test.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer_test.go @@ -6,18 +6,21 @@ import ( "testing" "time" + "github.com/grafana/loki/clients/pkg/logentry/stages" + "github.com/grafana/dskit/flagext" "github.com/prometheus/common/config" "github.com/Shopify/sarama" "github.com/go-kit/log" - "github.com/grafana/loki/clients/pkg/promtail/client/fake" - "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" ) func Test_TopicDiscovery(t *testing.T) { @@ -102,6 +105,9 @@ func Test_NewTarget(t *testing.T) { }, }, } + pipeline, err := stages.NewPipeline(ts.logger, ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg) + require.NoError(t, err) + ts.pipeline = pipeline tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1)) require.NoError(t, err)