Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve lag calculation and error handling in lagCmd #341

Merged
merged 2 commits into from
Jul 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 62 additions & 25 deletions cmd/kaf/topic.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package main

import (
"encoding/json"
"fmt"
"os"
"sort"
"text/tabwriter"

"strings"

"encoding/json"
"text/tabwriter"

"github.com/IBM/sarama"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -336,63 +335,101 @@ var deleteTopicCmd = &cobra.Command{
}
},
}

var lagCmd = &cobra.Command{
Use: "lag",
Short: "Display the total lags for each consumer group",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
topic := args[0]
admin := getClusterAdmin()
topicDetails, err := admin.DescribeTopics([]string{args[0]})
if err != nil {
defer admin.Close()

// Describe the topic
topicDetails, err := admin.DescribeTopics([]string{topic})
if err != nil || len(topicDetails) == 0 {
errorExit("Unable to describe topics: %v\n", err)
}

// Get the list of partitions for the topic
partitions := make([]int32, 0, len(topicDetails[0].Partitions))
for _, partition := range topicDetails[0].Partitions {
partitions = append(partitions, partition.ID)
}
highWatermarks := getHighWatermarks(args[0], partitions)
highWatermarks := getHighWatermarks(topic, partitions)

var groups []string
rst, err := admin.ListConsumerGroups()
// List all consumer groups
consumerGroups, err := admin.ListConsumerGroups()
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
errorExit("Unable to list consumer groups: %v\n", err)
}
for group, _ := range rst {

var groups []string
for group := range consumerGroups {
groups = append(groups, group)
}

// Describe all consumer groups
groupsInfo, err := admin.DescribeConsumerGroups(groups)
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
errorExit("Unable to describe consumer groups: %v\n", err)
}
var lagInfo = make(map[string]int64)
for _, v := range groupsInfo {
for _, member := range v.Members {

// Calculate lag for each group
lagInfo := make(map[string]int64)
groupStates := make(map[string]string) // To store the state of each group
for _, group := range groupsInfo {
var sum int64
show := false
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil || assignment == nil {
continue
}
if _, exist := assignment.Topics[topic]; exist {
var sum int64
resp, _ := admin.ListConsumerGroupOffsets(v.GroupId, assignment.Topics)
for _, v1 := range resp.Blocks {
for pid, v2 := range v1 {
sum += highWatermarks[pid] - v2.Offset

metadata, err := member.GetMemberMetadata()
if err != nil || metadata == nil {
continue
}

if topicPartitions, exist := assignment.Topics[topic]; exist {
show = true
resp, err := admin.ListConsumerGroupOffsets(group.GroupId, map[string][]int32{topic: topicPartitions})
if err != nil {
fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group.GroupId, err)
continue
}

if blocks, ok := resp.Blocks[topic]; ok {
for pid, block := range blocks {
if hwm, ok := highWatermarks[pid]; ok {
if block.Offset > hwm {
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, group.GroupId)
} else if block.Offset < 0 {
// Skip partitions with negative offsets
} else {
sum += hwm - block.Offset
}
}
}
}
lagInfo[v.GroupId] = sum
}
}

if show && sum >= 0 {
lagInfo[group.GroupId] = sum
groupStates[group.GroupId] = group.State // Store the state of the group
}
}

// Print the lag information along with group state
w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
if !noHeaderFlag {
fmt.Fprintf(w, "GROUP ID\tLAG\n")
fmt.Fprintf(w, "GROUP ID\tSTATE\tLAG\n")
}
for group, lag := range lagInfo {
fmt.Fprintf(w, "%v\t%v\n", group, lag)
fmt.Fprintf(w, "%v\t%v\t%v\n", group, groupStates[group], lag)
}
w.Flush()

},
}
Loading