diff --git a/src/go/rpk/pkg/cli/topic/describe.go b/src/go/rpk/pkg/cli/topic/describe.go index cdbaeb517184d..33ed6350f0623 100644 --- a/src/go/rpk/pkg/cli/topic/describe.go +++ b/src/go/rpk/pkg/cli/topic/describe.go @@ -13,6 +13,7 @@ import ( "context" "errors" "fmt" + "github.com/twmb/types" "io" "os" "sort" @@ -60,7 +61,7 @@ For example, `, Args: cobra.MinimumNArgs(1), - Run: func(_ *cobra.Command, topicArg []string) { + Run: func(cmd *cobra.Command, topicArg []string) { f := p.Formatter if h, ok := f.Help([]describedTopic{}); ok { out.Exit(h) @@ -81,6 +82,8 @@ For example, out.MaybeDie(err, "unable to filter topics by regex: %v", err) } + // By default, if neither are specified, we opt in to + // the config section only. if !summary && !configs && !partitions { summary, configs = true, true } @@ -90,6 +93,8 @@ For example, // - more than one topic are specified or matched. if all || len(topicArg) > 1 { summary, configs, partitions = true, true, true + } else if len(topicArg) == 0 { + out.Exit("did not match any topics, exiting.") } req := kmsg.NewPtrMetadataRequest() @@ -98,32 +103,28 @@ For example, reqTopic.Topic = kmsg.StringPtr(topic) req.Topics = append(req.Topics, reqTopic) } - resp, err := req.RequestWith(context.Background(), cl) + resp, err := req.RequestWith(cmd.Context(), cl) out.MaybeDie(err, "unable to request topic metadata: %v", err) var topicDescriptions []describedTopic for _, topic := range resp.Topics { - offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable) + offsets := listStartEndOffsets(cmd.Context(), cl, *topic.Topic, len(topic.Partitions), stable) u := getDescribeUsed(topic.Partitions, offsets) - cfg, cfgErr := prepDescribeTopicConfig(topic, cl) + cfg, cfgErr := prepDescribeTopicConfig(cmd.Context(), topic, cl) + out.MaybeDieErr(cfgErr) t := describedTopic{ Summary: buildDescribeTopicSummary(topic), Configs: buildDescribeTopicConfig(cfg), Partitions: buildDescribeTopicPartitions(topic.Partitions, offsets, u), - cfgErr: cfgErr, u: u, } topicDescriptions = append(topicDescriptions, t) } - if len(topicDescriptions) == 0 { - out.Exit("[]") - } if printDescribedTopicsFormatter(f, topicDescriptions, os.Stdout) { return } printDescribedTopics(summary, configs, partitions, topicDescriptions) - out.MaybeDie(err, "unable to request topic metadata: %v", err) }, } @@ -184,12 +185,11 @@ func printDescribedTopics(summary, configs, partitions bool, topics []describedT if topic.Summary.Partitions > 0 { tw.PrintColumn("REPLICAS", topic.Summary.Replicas) } - if topic.Summary.isErr { + if topic.Summary.Error != "" { tw.PrintColumn("ERROR", topic.Summary.Error) } }) sections.Add(secConfigs, func() { - out.MaybeDie(topic.cfgErr, "config response contained error: %v", topic.cfgErr) tw := out.NewTable("KEY", "VALUE", "SOURCE") defer tw.Flush() for _, c := range topic.Configs { @@ -211,7 +211,6 @@ type describedTopic struct { Configs []describeTopicConfig `json:"configs" yaml:"configs"` Partitions []describeTopicPartition `json:"partitions" yaml:"partitions"` u uses - cfgErr error } type describeTopicSummary struct { @@ -220,11 +219,10 @@ type describeTopicSummary struct { Partitions int `json:"partitions" yaml:"partitions"` Replicas int `json:"replicas" yaml:"replicas"` Error string `json:"error" yaml:"error"` - isErr bool } -func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) (resp describeTopicSummary) { - resp = describeTopicSummary{ +func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) describeTopicSummary { + resp := describeTopicSummary{ Name: *topic.Topic, Internal: topic.IsInternal, Partitions: len(topic.Partitions), @@ -233,10 +231,9 @@ func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) (resp describeT resp.Replicas = len(topic.Partitions[0].Replicas) } if err := kerr.ErrorForCode(topic.ErrorCode); err != nil { - resp.isErr = true resp.Error = err.Error() } - return + return resp } type describeTopicConfig struct { @@ -245,24 +242,30 @@ type describeTopicConfig struct { Source string `json:"source" yaml:"source"` } -func prepDescribeTopicConfig(topic kmsg.MetadataResponseTopic, cl *kgo.Client) ([]kmsg.DescribeConfigsResponseResourceConfig, error) { +func prepDescribeTopicConfig(ctx context.Context, topic kmsg.MetadataResponseTopic, cl *kgo.Client) ([]kmsg.DescribeConfigsResponseResourceConfig, error) { req := kmsg.NewPtrDescribeConfigsRequest() reqResource := kmsg.NewDescribeConfigsRequestResource() reqResource.ResourceType = kmsg.ConfigResourceTypeTopic reqResource.ResourceName = *topic.Topic req.Resources = append(req.Resources, reqResource) - resp, err := req.RequestWith(context.Background(), cl) - out.MaybeDie(err, "unable to request configs: %v", err) + resp, err := req.RequestWith(ctx, cl) + if err != nil { + return nil, fmt.Errorf("unable to request configs: %v", err) + } if len(resp.Resources) != 1 { - out.Die("config response returned %d resources when we asked for 1", len(resp.Resources)) + return nil, fmt.Errorf("config response returned %d resources when we asked for 1", len(resp.Resources)) } err = kerr.ErrorForCode(resp.Resources[0].ErrorCode) - return resp.Resources[0].Configs, err + if err != nil { + return nil, fmt.Errorf("config response contained error: %v", err) + } + return resp.Resources[0].Configs, nil } -func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) (output []describeTopicConfig) { - output = make([]describeTopicConfig, 0, len(configs)) +func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) []describeTopicConfig { + output := make([]describeTopicConfig, 0, len(configs)) + types.Sort(configs) for _, cfg := range configs { d := describeTopicConfig{ Key: cfg.Name, @@ -275,7 +278,7 @@ func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConf } output = append(output, d) } - return + return output } type describeTopicPartition struct { @@ -454,7 +457,7 @@ var errUnlisted = errors.New("list failed") // always contain the one topic we asked for, and it will contain all // partitions we asked for. The logic below will panic redpanda replies // incorrectly. -func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset { +func listStartEndOffsets(ctx context.Context, cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset { offsets := make([]startStableEndOffset, 0, numPartitions) for i := 0; i < numPartitions; i++ { @@ -480,7 +483,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable reqTopic.Partitions = append(reqTopic.Partitions, part) } req.Topics = append(req.Topics, reqTopic) - shards := cl.RequestSharded(context.Background(), req) + shards := cl.RequestSharded(ctx, req) allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { @@ -511,7 +514,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable // transactions are in play. if stable { req.IsolationLevel = 1 - shards = cl.RequestSharded(context.Background(), req) + shards = cl.RequestSharded(ctx, req) allFailed = kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { @@ -528,7 +531,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable } // Finally, the HWM. - shards = cl.RequestSharded(context.Background(), req) + shards = cl.RequestSharded(ctx, req) kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { diff --git a/src/go/rpk/pkg/cli/topic/describe_test.go b/src/go/rpk/pkg/cli/topic/describe_test.go index 671ff0f6bb529..68917fa27be5a 100644 --- a/src/go/rpk/pkg/cli/topic/describe_test.go +++ b/src/go/rpk/pkg/cli/topic/describe_test.go @@ -368,7 +368,6 @@ func TestPrintDescribedTopicsFormatter(t *testing.T) { Partitions: 0, Replicas: 0, Error: "UNKNOWN_TOPIC_OR_PARTITION", - isErr: true, }, Configs: []describeTopicConfig{}, Partitions: []describeTopicPartition{}, @@ -553,7 +552,6 @@ func TestPrintDescribedTopicsFormatter(t *testing.T) { Partitions: 0, Replicas: 0, Error: "UNKNOWN_TOPIC_OR_PARTITION", - isErr: true, }, }, }, @@ -730,7 +728,6 @@ compression.type producer DYNAMIC_TOPIC_CONFIG Partitions: 0, Replicas: 0, Error: "UNKNOWN_TOPIC_OR_PARTITION", - isErr: true, }, }, },