Skip to content

Commit

Permalink
fix(inputs.prometheus): Avoid race when creating informer factory (in…
Browse files Browse the repository at this point in the history
  • Loading branch information
redbaron committed May 8, 2023
1 parent 1bcc279 commit 5cb928c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
17 changes: 8 additions & 9 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
}
}

if !p.isNodeScrapeScope {
err = p.watchPod(ctx, client)
if err != nil {
p.Log.Warnf("Error while attempting to watch pod: %s", err.Error())
}
}

p.wg.Add(1)
go func() {
defer p.wg.Done()
Expand All @@ -90,10 +97,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
}
} else {
err = p.watchPod(ctx, client)
if err != nil {
p.Log.Warnf("Error while attempting to watch pod: %s", err.Error())
}
<-ctx.Done()
}
}
}
Expand Down Expand Up @@ -193,8 +197,6 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients

informerfactory.Start(ctx.Done())
informerfactory.WaitForCacheSync(wait.NeverStop)

<-ctx.Done()
return err
}

Expand Down Expand Up @@ -366,9 +368,6 @@ func podReady(pod *corev1.Pod) bool {
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[PodID]URLAndAddress{}
}
targetURL, err := getScrapeURL(pod, p)
if err != nil {
p.Log.Errorf("could not parse URL: %s", err)
Expand Down
17 changes: 9 additions & 8 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func initPrometheus() *Prometheus {
prom.MonitorKubernetesPodsPort = 9102
prom.MonitorKubernetesPodsPath = "/metrics"
prom.MonitorKubernetesPodsMethod = MonitorMethodAnnotations
prom.kubernetesPods = map[PodID]URLAndAddress{}
return prom
}

Expand Down Expand Up @@ -129,7 +130,7 @@ func TestScrapeURLAnnotationsCustomPathWithFragment(t *testing.T) {
}

func TestAddPod(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand All @@ -148,7 +149,7 @@ func TestAddPodScrapeConfig(t *testing.T) {
}

func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand All @@ -161,7 +162,7 @@ func TestAddMultipleDuplicatePods(t *testing.T) {
}

func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand All @@ -173,7 +174,7 @@ func TestAddMultiplePods(t *testing.T) {
}

func TestDeletePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand All @@ -185,7 +186,7 @@ func TestDeletePods(t *testing.T) {
}

func TestKeepDefaultNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand All @@ -197,7 +198,7 @@ func TestKeepDefaultNamespaceLabelName(t *testing.T) {
}

func TestChangeNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace"}
prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[PodID]URLAndAddress{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
Expand Down Expand Up @@ -296,7 +297,7 @@ func TestAnnotationFilters(t *testing.T) {

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom.PodAnnotationInclude = tc.include
prom.PodAnnotationExclude = tc.exclude
require.NoError(t, prom.initFilters())
Expand Down Expand Up @@ -341,7 +342,7 @@ func TestLabelFilters(t *testing.T) {

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom.PodLabelInclude = tc.include
prom.PodLabelExclude = tc.exclude
require.NoError(t, prom.initFilters())
Expand Down
2 changes: 2 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ func (p *Prometheus) Init() error {
"Accept": acceptHeader,
}

p.kubernetesPods = map[PodID]URLAndAddress{}

return nil
}

Expand Down

0 comments on commit 5cb928c

Please sign in to comment.