From f48b28b4266c9e1fbe0734792d041f515f0643cf Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Fri, 3 Dec 2021 13:53:49 +0900 Subject: [PATCH 1/2] Move pipeline creation to new targetsyncer func * registry.MustRegister is called every newPipeline so ensure it only at once --- .../promtail/targets/kafka/target_syncer.go | 30 +++++++++---------- .../targets/kafka/target_syncer_test.go | 10 +++++-- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/clients/pkg/promtail/targets/kafka/target_syncer.go b/clients/pkg/promtail/targets/kafka/target_syncer.go index ac16ab18e892..fbb7523b3b36 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer.go @@ -10,13 +10,14 @@ import ( "github.com/Shopify/sarama" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/clients/pkg/logentry/stages" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/target" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/util" ) @@ -28,10 +29,11 @@ type TopicManager interface { } type TargetSyncer struct { - logger log.Logger - cfg scrapeconfig.Config - reg prometheus.Registerer - client api.EntryHandler + logger log.Logger + cfg scrapeconfig.Config + pipeline *stages.Pipeline + reg prometheus.Registerer + client api.EntryHandler topicManager TopicManager consumer @@ -86,8 +88,11 @@ func NewSyncer( if err != nil { return nil, fmt.Errorf("error creating topic manager: %w", err) } + pipeline, err := stages.NewPipeline(log.With(logger, "component", "kafka_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) + if err != nil { + return nil, fmt.Errorf("error creating pipeline: %w", err) + } ctx, cancel := context.WithCancel(context.Background()) - t := &TargetSyncer{ logger: logger, ctx: ctx, @@ -96,6 +101,7 @@ func NewSyncer( cfg: cfg, reg: reg, client: pushClient, + pipeline: pipeline, close: func() error { if err := group.Close(); err != nil { level.Warn(logger).Log("msg", "error while closing consumer group", "err", err) @@ -280,19 +286,13 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar }, }, nil } - - pipeline, err := stages.NewPipeline(log.With(ts.logger, "component", "kafka_pipeline"), ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg) - if err != nil { - return nil, err - } - t := NewTarget( session, claim, discoveredLabels, labelOut, ts.cfg.RelabelConfigs, - pipeline.Wrap(ts.client), + ts.pipeline.Wrap(ts.client), ts.cfg.KafkaConfig.UseIncomingTimestamp, ) diff --git a/clients/pkg/promtail/targets/kafka/target_syncer_test.go b/clients/pkg/promtail/targets/kafka/target_syncer_test.go index 5415976e5763..c7afe92c6e11 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer_test.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer_test.go @@ -6,18 +6,21 @@ import ( "testing" "time" + "github.com/grafana/loki/clients/pkg/logentry/stages" + "github.com/grafana/dskit/flagext" "github.com/prometheus/common/config" "github.com/Shopify/sarama" "github.com/go-kit/log" - "github.com/grafana/loki/clients/pkg/promtail/client/fake" - "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" ) func Test_TopicDiscovery(t *testing.T) { @@ -102,6 +105,9 @@ func Test_NewTarget(t *testing.T) { }, }, } + pipeline, err := stages.NewPipeline(ts.logger, ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg) + require.NoError(t, err) + ts.pipeline = pipeline tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1)) require.NoError(t, err) From 72b9350d6d37c515cbac5c38ba0bbc048717c15e Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Fri, 3 Dec 2021 13:56:15 +0900 Subject: [PATCH 2/2] add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c4a7283beba..0fb551c7b961 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [4865](https://github.com/grafana/loki/pull/4865) **taisho6339**: Fix duplicate registry.MustRegister call in Promtail Kafka * [4845](https://github.com/grafana/loki/pull/4845) **chaudum** Return error responses consistently as JSON * [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests. * [4828](https://github.com/grafana/loki/pull/4282) **chaudum**: Set correct `Content-Type` header in query response