Skip to content

Commit

Permalink
Merge pull request #18646 from vbotbuildovich/backport-pr-18221-v23.3…
Browse files Browse the repository at this point in the history
….x-929

[v23.3.x] rpk: topic describe supports --regex flag
  • Loading branch information
r-vasquez authored May 24, 2024
2 parents 3e554cd + 5211881 commit 9599755
Showing 1 changed file with 113 additions and 85 deletions.
198 changes: 113 additions & 85 deletions src/go/rpk/pkg/cli/topic/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package topic
import (
"context"
"errors"
"fmt"
"sort"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand All @@ -31,20 +32,32 @@ func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
summary bool
configs bool
partitions bool
re bool
stable bool
)
cmd := &cobra.Command{
Use: "describe [TOPIC]",
Use: "describe [TOPICS]",
Aliases: []string{"info"},
Short: "Describe a topic",
Long: `Describe a topic.
Short: "Describe topics",
Long: `Describe topics.
This command prints detailed information about a topic. There are three
potential sections: a summary of the topic, the topic configs, and a detailed
This command prints detailed information about topics. The output contains
up to three sections: a summary of the topic, the topic configs, and a detailed
partitions section. By default, the summary and configs sections are printed.
The --regex flag (-r) parses arguments as regular expressions
and describes topics that match any of the expressions.
For example,
describe foo bar # describe topics foo and bar
describe -r '^f.*' '.*r$' # describe any topic starting with f and any topics ending in r
describe -r '*' # describe all topics
describe -r . # describe any one-character topics
`,

Args: cobra.ExactArgs(1),
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, topicArg []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
Expand All @@ -53,107 +66,121 @@ partitions section. By default, the summary and configs sections are printed.
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer cl.Close()

topic := topicArg[0]
adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

if re {
topicArg, err = regexTopics(adm, topicArg)
out.MaybeDie(err, "unable to filter topics by regex: %v", err)
}

// By default, if neither are specified, we opt in to
// the config section only.
if !summary && !configs && !partitions {
summary, configs = true, true
}
if all {

// We show all sections if:
// - "print-all" is used or
// - more than one topic are specified or matched.
if all || len(topicArg) > 1 {
summary, configs, partitions = true, true, true
} else if len(topicArg) == 0 {
out.Exit("did not match any topics, exiting.")
}

var t kmsg.MetadataResponseTopic
{
req := kmsg.NewPtrMetadataRequest()
req := kmsg.NewPtrMetadataRequest()
for _, topic := range topicArg {
reqTopic := kmsg.NewMetadataRequestTopic()
reqTopic.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, reqTopic)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request topic metadata: %v", err)
if len(resp.Topics) != 1 {
out.Die("metadata response returned %d topics when we asked for 1", len(resp.Topics))
}
t = resp.Topics[0]
}
resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request topic metadata: %v", err)

const (
secSummary = "summary"
secConfigs = "configs"
secPart = "partitions"
)

sections := out.NewMaybeHeaderSections(
out.ConditionalSectionHeaders(map[string]bool{
secSummary: summary,
secConfigs: configs,
secPart: partitions,
})...,
)

sections.Add(secSummary, func() {
tw := out.NewTabWriter()
defer tw.Flush()
tw.PrintColumn("NAME", *t.Topic)
if t.IsInternal {
tw.PrintColumn("INTERNAL", t.IsInternal)
}
tw.PrintColumn("PARTITIONS", len(t.Partitions))
if len(t.Partitions) > 0 {
p0 := &t.Partitions[0]
tw.PrintColumn("REPLICAS", len(p0.Replicas))
}
if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
tw.PrintColumn("ERROR", err)
}
})

sections.Add(secConfigs, func() {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)

tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
types.Sort(resp)
for _, config := range resp.Resources[0].Configs {
var val string
if config.IsSensitive {
val = "(sensitive)"
} else if config.Value != nil {
val = *config.Value
for i, topic := range resp.Topics {
sections := out.NewMaybeHeaderSections(
out.ConditionalSectionHeaders(map[string]bool{
secSummary: summary,
secConfigs: configs,
secPart: partitions,
})...,
)

sections.Add(secSummary, func() {
tw := out.NewTabWriter()
defer tw.Flush()
tw.PrintColumn("NAME", *topic.Topic)
if topic.IsInternal {
tw.PrintColumn("INTERNAL", topic.IsInternal)
}
tw.Print(config.Name, val, config.Source)
}
})

sections.Add(secPart, func() {
offsets := listStartEndOffsets(cl, topic, len(t.Partitions), stable)

tw := out.NewTable(describePartitionsHeaders(
t.Partitions,
offsets,
)...)
defer tw.Flush()
for _, row := range describePartitionsRows(
t.Partitions,
offsets,
) {
tw.Print(row...)
tw.PrintColumn("PARTITIONS", len(topic.Partitions))
if len(topic.Partitions) > 0 {
p0 := &topic.Partitions[0]
tw.PrintColumn("REPLICAS", len(p0.Replicas))
}
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
tw.PrintColumn("ERROR", err)
}
})

sections.Add(secConfigs, func() {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = *topic.Topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)

tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
types.Sort(resp)
for _, config := range resp.Resources[0].Configs {
var val string
if config.IsSensitive {
val = "(sensitive)"
} else if config.Value != nil {
val = *config.Value
}
tw.Print(config.Name, val, config.Source)
}
})

sections.Add(secPart, func() {
offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable)

tw := out.NewTable(describePartitionsHeaders(
topic.Partitions,
offsets,
)...)
defer tw.Flush()
for _, row := range describePartitionsRows(
topic.Partitions,
offsets,
) {
tw.Print(row...)
}
})

i++
if i < len(resp.Topics) {
fmt.Println()
}
})
}
},
}

Expand All @@ -170,6 +197,7 @@ partitions section. By default, the summary and configs sections are printed.
cmd.Flags().BoolVarP(&configs, "print-configs", "c", false, "Print the config section")
cmd.Flags().BoolVarP(&partitions, "print-partitions", "p", false, "Print the detailed partitions section")
cmd.Flags().BoolVarP(&all, "print-all", "a", false, "Print all sections")
cmd.Flags().BoolVarP(&re, "regex", "r", false, "Parse arguments as regex; describe any topic that matches any input topic expression")

cmd.Flags().BoolVar(&stable, "stable", false, "Include the stable offsets column in the partitions section; only relevant if you produce to this topic transactionally")

Expand Down

0 comments on commit 9599755

Please sign in to comment.