From 5211881a7ae4a205dc9cdd67fb64f94f2d702c99 Mon Sep 17 00:00:00 2001 From: Daisuke Kobayashi Date: Fri, 3 May 2024 01:02:01 +0900 Subject: [PATCH] rpk: topic describe supports multiple topics and --regex flag (cherry picked from commit e6996a97581db56cd555b684a12f7afe64f2b08b) --- src/go/rpk/pkg/cli/topic/describe.go | 198 +++++++++++++++------------ 1 file changed, 113 insertions(+), 85 deletions(-) diff --git a/src/go/rpk/pkg/cli/topic/describe.go b/src/go/rpk/pkg/cli/topic/describe.go index 06703e7fd0517..fdf69e7a83dba 100644 --- a/src/go/rpk/pkg/cli/topic/describe.go +++ b/src/go/rpk/pkg/cli/topic/describe.go @@ -12,6 +12,7 @@ package topic import ( "context" "errors" + "fmt" "sort" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" @@ -31,20 +32,32 @@ func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command { summary bool configs bool partitions bool + re bool stable bool ) cmd := &cobra.Command{ - Use: "describe [TOPIC]", + Use: "describe [TOPICS]", Aliases: []string{"info"}, - Short: "Describe a topic", - Long: `Describe a topic. + Short: "Describe topics", + Long: `Describe topics. -This command prints detailed information about a topic. There are three -potential sections: a summary of the topic, the topic configs, and a detailed +This command prints detailed information about topics. The output contains +up to three sections: a summary of the topic, the topic configs, and a detailed partitions section. By default, the summary and configs sections are printed. + +The --regex flag (-r) parses arguments as regular expressions +and describes topics that match any of the expressions. + +For example, + + describe foo bar # describe topics foo and bar + describe -r '^f.*' '.*r$' # describe any topic starting with f and any topics ending in r + describe -r '*' # describe all topics + describe -r . # describe any one-character topics + `, - Args: cobra.ExactArgs(1), + Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, topicArg []string) { p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) @@ -53,31 +66,38 @@ partitions section. By default, the summary and configs sections are printed. out.MaybeDie(err, "unable to initialize kafka client: %v", err) defer cl.Close() - topic := topicArg[0] + adm, err := kafka.NewAdmin(fs, p) + out.MaybeDie(err, "unable to initialize kafka client: %v", err) + defer adm.Close() + + if re { + topicArg, err = regexTopics(adm, topicArg) + out.MaybeDie(err, "unable to filter topics by regex: %v", err) + } // By default, if neither are specified, we opt in to // the config section only. if !summary && !configs && !partitions { summary, configs = true, true } - if all { + + // We show all sections if: + // - "print-all" is used or + // - more than one topic are specified or matched. + if all || len(topicArg) > 1 { summary, configs, partitions = true, true, true + } else if len(topicArg) == 0 { + out.Exit("did not match any topics, exiting.") } - var t kmsg.MetadataResponseTopic - { - req := kmsg.NewPtrMetadataRequest() + req := kmsg.NewPtrMetadataRequest() + for _, topic := range topicArg { reqTopic := kmsg.NewMetadataRequestTopic() reqTopic.Topic = kmsg.StringPtr(topic) req.Topics = append(req.Topics, reqTopic) - - resp, err := req.RequestWith(context.Background(), cl) - out.MaybeDie(err, "unable to request topic metadata: %v", err) - if len(resp.Topics) != 1 { - out.Die("metadata response returned %d topics when we asked for 1", len(resp.Topics)) - } - t = resp.Topics[0] } + resp, err := req.RequestWith(context.Background(), cl) + out.MaybeDie(err, "unable to request topic metadata: %v", err) const ( secSummary = "summary" @@ -85,75 +105,82 @@ partitions section. By default, the summary and configs sections are printed. secPart = "partitions" ) - sections := out.NewMaybeHeaderSections( - out.ConditionalSectionHeaders(map[string]bool{ - secSummary: summary, - secConfigs: configs, - secPart: partitions, - })..., - ) - - sections.Add(secSummary, func() { - tw := out.NewTabWriter() - defer tw.Flush() - tw.PrintColumn("NAME", *t.Topic) - if t.IsInternal { - tw.PrintColumn("INTERNAL", t.IsInternal) - } - tw.PrintColumn("PARTITIONS", len(t.Partitions)) - if len(t.Partitions) > 0 { - p0 := &t.Partitions[0] - tw.PrintColumn("REPLICAS", len(p0.Replicas)) - } - if err := kerr.ErrorForCode(t.ErrorCode); err != nil { - tw.PrintColumn("ERROR", err) - } - }) - - sections.Add(secConfigs, func() { - req := kmsg.NewPtrDescribeConfigsRequest() - reqResource := kmsg.NewDescribeConfigsRequestResource() - reqResource.ResourceType = kmsg.ConfigResourceTypeTopic - reqResource.ResourceName = topic - req.Resources = append(req.Resources, reqResource) - - resp, err := req.RequestWith(context.Background(), cl) - out.MaybeDie(err, "unable to request configs: %v", err) - if len(resp.Resources) != 1 { - out.Die("config response returned %d resources when we asked for 1", len(resp.Resources)) - } - err = kerr.ErrorForCode(resp.Resources[0].ErrorCode) - out.MaybeDie(err, "config response contained error: %v", err) - - tw := out.NewTable("KEY", "VALUE", "SOURCE") - defer tw.Flush() - types.Sort(resp) - for _, config := range resp.Resources[0].Configs { - var val string - if config.IsSensitive { - val = "(sensitive)" - } else if config.Value != nil { - val = *config.Value + for i, topic := range resp.Topics { + sections := out.NewMaybeHeaderSections( + out.ConditionalSectionHeaders(map[string]bool{ + secSummary: summary, + secConfigs: configs, + secPart: partitions, + })..., + ) + + sections.Add(secSummary, func() { + tw := out.NewTabWriter() + defer tw.Flush() + tw.PrintColumn("NAME", *topic.Topic) + if topic.IsInternal { + tw.PrintColumn("INTERNAL", topic.IsInternal) } - tw.Print(config.Name, val, config.Source) - } - }) - - sections.Add(secPart, func() { - offsets := listStartEndOffsets(cl, topic, len(t.Partitions), stable) - - tw := out.NewTable(describePartitionsHeaders( - t.Partitions, - offsets, - )...) - defer tw.Flush() - for _, row := range describePartitionsRows( - t.Partitions, - offsets, - ) { - tw.Print(row...) + tw.PrintColumn("PARTITIONS", len(topic.Partitions)) + if len(topic.Partitions) > 0 { + p0 := &topic.Partitions[0] + tw.PrintColumn("REPLICAS", len(p0.Replicas)) + } + if err := kerr.ErrorForCode(topic.ErrorCode); err != nil { + tw.PrintColumn("ERROR", err) + } + }) + + sections.Add(secConfigs, func() { + req := kmsg.NewPtrDescribeConfigsRequest() + reqResource := kmsg.NewDescribeConfigsRequestResource() + reqResource.ResourceType = kmsg.ConfigResourceTypeTopic + reqResource.ResourceName = *topic.Topic + req.Resources = append(req.Resources, reqResource) + + resp, err := req.RequestWith(context.Background(), cl) + out.MaybeDie(err, "unable to request configs: %v", err) + if len(resp.Resources) != 1 { + out.Die("config response returned %d resources when we asked for 1", len(resp.Resources)) + } + err = kerr.ErrorForCode(resp.Resources[0].ErrorCode) + out.MaybeDie(err, "config response contained error: %v", err) + + tw := out.NewTable("KEY", "VALUE", "SOURCE") + defer tw.Flush() + types.Sort(resp) + for _, config := range resp.Resources[0].Configs { + var val string + if config.IsSensitive { + val = "(sensitive)" + } else if config.Value != nil { + val = *config.Value + } + tw.Print(config.Name, val, config.Source) + } + }) + + sections.Add(secPart, func() { + offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable) + + tw := out.NewTable(describePartitionsHeaders( + topic.Partitions, + offsets, + )...) + defer tw.Flush() + for _, row := range describePartitionsRows( + topic.Partitions, + offsets, + ) { + tw.Print(row...) + } + }) + + i++ + if i < len(resp.Topics) { + fmt.Println() } - }) + } }, } @@ -170,6 +197,7 @@ partitions section. By default, the summary and configs sections are printed. cmd.Flags().BoolVarP(&configs, "print-configs", "c", false, "Print the config section") cmd.Flags().BoolVarP(&partitions, "print-partitions", "p", false, "Print the detailed partitions section") cmd.Flags().BoolVarP(&all, "print-all", "a", false, "Print all sections") + cmd.Flags().BoolVarP(&re, "regex", "r", false, "Parse arguments as regex; describe any topic that matches any input topic expression") cmd.Flags().BoolVar(&stable, "stable", false, "Include the stable offsets column in the partitions section; only relevant if you produce to this topic transactionally")