diff --git a/minion/describe_topic_config.go b/minion/describe_topic_config.go index bb7cc77..8ffaf59 100644 --- a/minion/describe_topic_config.go +++ b/minion/describe_topic_config.go @@ -20,7 +20,6 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes resourceReq := kmsg.NewDescribeConfigsRequestResource() resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic resourceReq.ResourceName = topic.Topic - resourceReq.ConfigNames = []string{"cleanup.policy"} req.Resources = append(req.Resources, resourceReq) } diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go index ccd9277..99ccde1 100644 --- a/prometheus/collect_topic_info.go +++ b/prometheus/collect_topic_info.go @@ -2,10 +2,11 @@ package prometheus import ( "context" + "strconv" + "github.com/prometheus/client_golang/prometheus" "github.com/twmb/franz-go/pkg/kerr" "go.uber.org/zap" - "strconv" ) func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Metric) bool { @@ -55,7 +56,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me e.logger.Warn("failed to get metadata of a specific topic", zap.String("topic_name", topic.Topic), zap.Error(typedErr)) - return false + continue } partitionCount := len(topic.Partitions) replicationFactor := -1 @@ -64,11 +65,14 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me replicationFactor = len(topic.Partitions[0].Replicas) } - cleanupPolicy, exists := configsByTopic[topic.Topic]["cleanup.policy"] - if !exists { - cleanupPolicy = "N/A" - } - + cleanupPolicy := getOrDefault(configsByTopic[topic.Topic], "cleanup.policy", "N/A") + maxMessageBytes := getOrDefault(configsByTopic[topic.Topic], "max.message.bytes", "N/A") + messageFormatVersion := getOrDefault(configsByTopic[topic.Topic], "message.format.version", "N/A") + messageTimestampType := getOrDefault(configsByTopic[topic.Topic], "message.timestamp.type", "N/A") + minInSyncReplicas := getOrDefault(configsByTopic[topic.Topic], "min.insync.replicas", "N/A") + retentionBytes := getOrDefault(configsByTopic[topic.Topic], "retention.bytes", "N/A") + retentionMs := getOrDefault(configsByTopic[topic.Topic], "retention.ms", "N/A") + segmentBytes := getOrDefault(configsByTopic[topic.Topic], "segment.bytes", "N/A") ch <- prometheus.MustNewConstMetric( e.topicInfo, prometheus.GaugeValue, @@ -77,7 +81,21 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me strconv.Itoa(partitionCount), strconv.Itoa(replicationFactor), cleanupPolicy, + maxMessageBytes, + messageFormatVersion, + messageTimestampType, + minInSyncReplicas, + retentionBytes, + retentionMs, + segmentBytes, ) } return isOk } + +func getOrDefault(m map[string]string, key string, defaultValue string) string { + if value, exists := m[key]; exists { + return value + } + return defaultValue +} diff --git a/prometheus/exporter.go b/prometheus/exporter.go index b0bcf98..f0c2111 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -101,7 +101,8 @@ func (e *Exporter) InitializeMetrics() { e.topicInfo = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info"), "Info labels for a given topic", - []string{"topic_name", "partition_count", "replication_factor", "cleanup_policy"}, + []string{"topic_name", "partition_count", "replication_factor", "cleanup_policy", "max_message_bytes", "message_format_version", + "message_timestamp_type", "min_insync_replicas", "retention_bytes", "retention_ms", "segment_bytes"}, nil, ) // Partition Low Water Mark