Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to export configurable set of topic configuration keys #97

Merged
merged 4 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/kminion/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ kminion:
# # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
# # take precedence over allowed topics.
# ignoredTopics: [ ]
# # infoMetric is a configuration object for the kminion_kafka_topic_info metric
# infoMetric:
# # ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric
# configKeys: ["cleanup.policy"]
# logDirs:
# # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
# # to version 1.0.0 as describing log dirs was not supported back then.
Expand Down
4 changes: 4 additions & 0 deletions docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ minion:
# IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
# take precedence over allowed topics.
ignoredTopics: [ ]
# infoMetric is a configuration object for the kminion_kafka_topic_info metric
infoMetric:
# ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric
configKeys: ["cleanup.policy"]
logDirs:
# Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
# to version 1.0.0 as describing log dirs was not supported back then.
Expand Down
15 changes: 14 additions & 1 deletion minion/config_topic_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package minion

import "fmt"
import (
"fmt"
)

const (
TopicGranularityTopic string = "topic"
Expand All @@ -18,6 +20,16 @@ type TopicConfig struct {
// IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
// take precedence over allowed topics.
IgnoredTopics []string `koanf:"ignoredTopics"`

// InfoMetric configures how the kafka_topic_info metric is populated
InfoMetric InfoMetricConfig `koanf:"exporter"`
amuraru marked this conversation as resolved.
Show resolved Hide resolved
}

type InfoMetricConfig struct {
// ConfigKeys configures optional topic configuration keys that should be exported
// as prometheus metric labels.
// By default "topic_name", "partition_count", "replication_factor" and "cleanup.policy" are exported
amuraru marked this conversation as resolved.
Show resolved Hide resolved
ConfigKeys []string `koanf:"infoMetric"`
amuraru marked this conversation as resolved.
Show resolved Hide resolved
}

// Validate if provided TopicConfig is valid.
Expand Down Expand Up @@ -50,4 +62,5 @@ func (c *TopicConfig) Validate() error {
func (c *TopicConfig) SetDefaults() {
c.Granularity = TopicGranularityPartition
c.AllowedTopics = []string{"/.*/"}
c.InfoMetric = InfoMetricConfig{ConfigKeys: []string{"cleanup.policy"}}
}
1 change: 0 additions & 1 deletion minion/describe_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes
resourceReq := kmsg.NewDescribeConfigsRequestResource()
resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic
resourceReq.ResourceName = topic.Topic
resourceReq.ConfigNames = []string{"cleanup.policy"}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done to ensure all configurations are returned

req.Resources = append(req.Resources, resourceReq)
}

Expand Down
27 changes: 17 additions & 10 deletions prometheus/collect_topic_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
e.logger.Warn("failed to get metadata of a specific topic",
zap.String("topic_name", topic.Topic),
zap.Error(typedErr))
return false
continue
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be lenient to individual topic failures

}
partitionCount := len(topic.Partitions)
replicationFactor := -1
Expand All @@ -64,20 +65,26 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
replicationFactor = len(topic.Partitions[0].Replicas)
}

cleanupPolicy, exists := configsByTopic[topic.Topic]["cleanup.policy"]
if !exists {
cleanupPolicy = "N/A"
var labelsValues []string
labelsValues = append(labelsValues, topic.Topic)
labelsValues = append(labelsValues, strconv.Itoa(partitionCount))
labelsValues = append(labelsValues, strconv.Itoa(replicationFactor))
for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys {
labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A"))
}

ch <- prometheus.MustNewConstMetric(
e.topicInfo,
prometheus.GaugeValue,
float64(1),
topic.Topic,
strconv.Itoa(partitionCount),
strconv.Itoa(replicationFactor),
cleanupPolicy,
labelsValues...,
)
}
return isOk
}

func getOrDefault(m map[string]string, key string, defaultValue string) string {
if value, exists := m[key]; exists {
return value
}
return defaultValue
}
8 changes: 7 additions & 1 deletion prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prometheus
import (
"context"
"os"
"strings"
"time"

"github.com/cloudhut/kminion/v2/minion"
Expand Down Expand Up @@ -98,10 +99,15 @@ func (e *Exporter) InitializeMetrics() {

// Topic / Partition metrics
// Topic info
var labels = []string{"topic_name", "partition_count", "replication_factor"}
for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys {
// prometheus does not allow . in label keys
labels = append(labels, strings.ReplaceAll(key, ".", "_"))
}
e.topicInfo = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info"),
"Info labels for a given topic",
[]string{"topic_name", "partition_count", "replication_factor", "cleanup_policy"},
labels,
nil,
)
// Partition Low Water Mark
Expand Down