Skip to content

Commit

Permalink
Export more topic configuration parameters
Browse files Browse the repository at this point in the history
Add the following labels to topic_info metric:
- max_message_bytes
- message_format_version
- message_timestamp_type
- min_insync_replicas
- retention_bytes
- retention_ms
- segment_bytes
  • Loading branch information
amuraru committed Jun 21, 2021
1 parent 1a777bf commit 9e36bda
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
1 change: 0 additions & 1 deletion minion/describe_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes
resourceReq := kmsg.NewDescribeConfigsRequestResource()
resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic
resourceReq.ResourceName = topic.Topic
resourceReq.ConfigNames = []string{"cleanup.policy"}
req.Resources = append(req.Resources, resourceReq)
}

Expand Down
32 changes: 25 additions & 7 deletions prometheus/collect_topic_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
e.logger.Warn("failed to get metadata of a specific topic",
zap.String("topic_name", topic.Topic),
zap.Error(typedErr))
return false
continue
}
partitionCount := len(topic.Partitions)
replicationFactor := -1
Expand All @@ -64,11 +65,14 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
replicationFactor = len(topic.Partitions[0].Replicas)
}

cleanupPolicy, exists := configsByTopic[topic.Topic]["cleanup.policy"]
if !exists {
cleanupPolicy = "N/A"
}

cleanupPolicy := getOrDefault(configsByTopic[topic.Topic], "cleanup.policy", "N/A")
maxMessageBytes := getOrDefault(configsByTopic[topic.Topic], "max.message.bytes", "N/A")
messageFormatVersion := getOrDefault(configsByTopic[topic.Topic], "message.format.version", "N/A")
messageTimestampType := getOrDefault(configsByTopic[topic.Topic], "message.timestamp.type", "N/A")
minInSyncReplicas := getOrDefault(configsByTopic[topic.Topic], "min.insync.replicas", "N/A")
retentionBytes := getOrDefault(configsByTopic[topic.Topic], "retention.bytes", "N/A")
retentionMs := getOrDefault(configsByTopic[topic.Topic], "retention.ms", "N/A")
segmentBytes := getOrDefault(configsByTopic[topic.Topic], "segment.bytes", "N/A")
ch <- prometheus.MustNewConstMetric(
e.topicInfo,
prometheus.GaugeValue,
Expand All @@ -77,7 +81,21 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
strconv.Itoa(partitionCount),
strconv.Itoa(replicationFactor),
cleanupPolicy,
maxMessageBytes,
messageFormatVersion,
messageTimestampType,
minInSyncReplicas,
retentionBytes,
retentionMs,
segmentBytes,
)
}
return isOk
}

func getOrDefault(m map[string]string, key string, defaultValue string) string {
if value, exists := m[key]; exists {
return value
}
return defaultValue
}
3 changes: 2 additions & 1 deletion prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func (e *Exporter) InitializeMetrics() {
e.topicInfo = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info"),
"Info labels for a given topic",
[]string{"topic_name", "partition_count", "replication_factor", "cleanup_policy"},
[]string{"topic_name", "partition_count", "replication_factor", "cleanup_policy", "max_message_bytes", "message_format_version",
"message_timestamp_type", "min_insync_replicas", "retention_bytes", "retention_ms", "segment_bytes"},
nil,
)
// Partition Low Water Mark
Expand Down

0 comments on commit 9e36bda

Please sign in to comment.