Skip to content

Commit

Permalink
Move pipeline creation to new targetsyncer func
Browse files Browse the repository at this point in the history
* registry.MustRegister is called every newPipeline so ensure it only at once
  • Loading branch information
taisho6339 committed Dec 3, 2021
1 parent e573a4d commit f48b28b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
30 changes: 15 additions & 15 deletions clients/pkg/promtail/targets/kafka/target_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/util"
)
Expand All @@ -28,10 +29,11 @@ type TopicManager interface {
}

type TargetSyncer struct {
logger log.Logger
cfg scrapeconfig.Config
reg prometheus.Registerer
client api.EntryHandler
logger log.Logger
cfg scrapeconfig.Config
pipeline *stages.Pipeline
reg prometheus.Registerer
client api.EntryHandler

topicManager TopicManager
consumer
Expand Down Expand Up @@ -86,8 +88,11 @@ func NewSyncer(
if err != nil {
return nil, fmt.Errorf("error creating topic manager: %w", err)
}
pipeline, err := stages.NewPipeline(log.With(logger, "component", "kafka_pipeline"), cfg.PipelineStages, &cfg.JobName, reg)
if err != nil {
return nil, fmt.Errorf("error creating pipeline: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())

t := &TargetSyncer{
logger: logger,
ctx: ctx,
Expand All @@ -96,6 +101,7 @@ func NewSyncer(
cfg: cfg,
reg: reg,
client: pushClient,
pipeline: pipeline,
close: func() error {
if err := group.Close(); err != nil {
level.Warn(logger).Log("msg", "error while closing consumer group", "err", err)
Expand Down Expand Up @@ -280,19 +286,13 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar
},
}, nil
}

pipeline, err := stages.NewPipeline(log.With(ts.logger, "component", "kafka_pipeline"), ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg)
if err != nil {
return nil, err
}

t := NewTarget(
session,
claim,
discoveredLabels,
labelOut,
ts.cfg.RelabelConfigs,
pipeline.Wrap(ts.client),
ts.pipeline.Wrap(ts.client),
ts.cfg.KafkaConfig.UseIncomingTimestamp,
)

Expand Down
10 changes: 8 additions & 2 deletions clients/pkg/promtail/targets/kafka/target_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
"testing"
"time"

"github.com/grafana/loki/clients/pkg/logentry/stages"

"github.com/grafana/dskit/flagext"
"github.com/prometheus/common/config"

"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)

func Test_TopicDiscovery(t *testing.T) {
Expand Down Expand Up @@ -102,6 +105,9 @@ func Test_NewTarget(t *testing.T) {
},
},
}
pipeline, err := stages.NewPipeline(ts.logger, ts.cfg.PipelineStages, &ts.cfg.JobName, ts.reg)
require.NoError(t, err)
ts.pipeline = pipeline
tg, err := ts.NewTarget(&testSession{}, newTestClaim("foo", 10, 1))

require.NoError(t, err)
Expand Down

0 comments on commit f48b28b

Please sign in to comment.