diff --git a/src/go/rpk/pkg/cli/topic/BUILD b/src/go/rpk/pkg/cli/topic/BUILD index 3e611e0f3955b..c4aa966bcf451 100644 --- a/src/go/rpk/pkg/cli/topic/BUILD +++ b/src/go/rpk/pkg/cli/topic/BUILD @@ -55,6 +55,7 @@ go_test( deps = [ "//src/go/rpk/pkg/config", "@com_github_spf13_afero//:afero", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_twmb_franz_go//pkg/kerr", "@com_github_twmb_franz_go_pkg_kadm//:kadm", diff --git a/src/go/rpk/pkg/cli/topic/describe.go b/src/go/rpk/pkg/cli/topic/describe.go index 060ab0f6d854b..709439eef302e 100644 --- a/src/go/rpk/pkg/cli/topic/describe.go +++ b/src/go/rpk/pkg/cli/topic/describe.go @@ -13,7 +13,10 @@ import ( "context" "errors" "fmt" + "io" + "os" "sort" + "strconv" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka" @@ -45,6 +48,9 @@ This command prints detailed information about topics. The output contains up to three sections: a summary of the topic, the topic configs, and a detailed partitions section. By default, the summary and configs sections are printed. +Using the --format flag with either JSON or YAML will default to printing +all the topic information (--all). + The --regex flag (-r) parses arguments as regular expressions and describes topics that match any of the expressions. @@ -54,11 +60,13 @@ For example, describe -r '^f.*' '.*r$' # describe any topic starting with f and any topics ending in r describe -r '*' # describe all topics describe -r . # describe any one-character topics - `, - Args: cobra.MinimumNArgs(1), - Run: func(cmd *cobra.Command, topicArg []string) { + f := p.Formatter + if h, ok := f.Help([]describedTopic{}); ok { + out.Exit(h) + } p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) @@ -84,7 +92,8 @@ For example, // We show all sections if: // - "print-all" is used or // - more than one topic are specified or matched. - if all || len(topicArg) > 1 { + // - the formatter is not text (json/yaml).
+ if all || len(topicArg) > 1 || !f.IsText() { summary, configs, partitions = true, true, true } else if len(topicArg) == 0 { out.Exit("did not match any topics, exiting.") @@ -96,94 +105,41 @@ For example, reqTopic.Topic = kmsg.StringPtr(topic) req.Topics = append(req.Topics, reqTopic) } - resp, err := req.RequestWith(context.Background(), cl) + resp, err := req.RequestWith(cmd.Context(), cl) out.MaybeDie(err, "unable to request topic metadata: %v", err) - const ( - secSummary = "summary" - secConfigs = "configs" - secPart = "partitions" - ) - - for i, topic := range resp.Topics { - sections := out.NewMaybeHeaderSections( - out.ConditionalSectionHeaders(map[string]bool{ - secSummary: summary, - secConfigs: configs, - secPart: partitions, - })..., - ) - - sections.Add(secSummary, func() { - tw := out.NewTabWriter() - defer tw.Flush() - tw.PrintColumn("NAME", *topic.Topic) - if topic.IsInternal { - tw.PrintColumn("INTERNAL", topic.IsInternal) - } - tw.PrintColumn("PARTITIONS", len(topic.Partitions)) - if len(topic.Partitions) > 0 { - p0 := &topic.Partitions[0] - tw.PrintColumn("REPLICAS", len(p0.Replicas)) - } - if err := kerr.ErrorForCode(topic.ErrorCode); err != nil { - tw.PrintColumn("ERROR", err) - } - }) - - sections.Add(secConfigs, func() { - req := kmsg.NewPtrDescribeConfigsRequest() - reqResource := kmsg.NewDescribeConfigsRequestResource() - reqResource.ResourceType = kmsg.ConfigResourceTypeTopic - reqResource.ResourceName = *topic.Topic - req.Resources = append(req.Resources, reqResource) - - resp, err := req.RequestWith(context.Background(), cl) - out.MaybeDie(err, "unable to request configs: %v", err) - if len(resp.Resources) != 1 { - out.Die("config response returned %d resources when we asked for 1", len(resp.Resources)) - } - err = kerr.ErrorForCode(resp.Resources[0].ErrorCode) - out.MaybeDie(err, "config response contained error: %v", err) - - tw := out.NewTable("KEY", "VALUE", "SOURCE") - defer tw.Flush() - types.Sort(resp) - for _, config := range resp.Resources[0].Configs { - var val string - if config.IsSensitive { - val = "(sensitive)" - } else if config.Value != nil { - val = *config.Value - } - tw.Print(config.Name, val, config.Source) - } - }) - - sections.Add(secPart, func() { - offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable) - - tw := out.NewTable(describePartitionsHeaders( - topic.Partitions, - offsets, - )...) - defer tw.Flush() - for _, row := range describePartitionsRows( - topic.Partitions, - offsets, - ) { - tw.Print(row...) 
+ var topicDescriptions []describedTopic + for _, topic := range resp.Topics { + var t describedTopic + if summary { + t.Summary = buildDescribeTopicSummary(topic) + } + if configs { + cfgResp, cfgErr := prepDescribeTopicConfig(cmd.Context(), topic, cl) + out.MaybeDieErr(cfgErr) + err = kerr.ErrorForCode(cfgResp.ErrorCode) + if err != nil { + t.cfgErr = err } - }) - - i++ - if i < len(resp.Topics) { - fmt.Println() + t.Configs = buildDescribeTopicConfig(cfgResp.Configs) + } + if partitions { + offsets := listStartEndOffsets(cmd.Context(), cl, *topic.Topic, len(topic.Partitions), stable) + u := getDescribeUsed(topic.Partitions, offsets) + t.Partitions = buildDescribeTopicPartitions(topic.Partitions, offsets, u) + t.u = u } + topicDescriptions = append(topicDescriptions, t) } + + if printDescribedTopicsFormatter(f, topicDescriptions, os.Stdout) { + return + } + printDescribedTopics(summary, configs, partitions, topicDescriptions) }, } + p.InstallFormatFlag(cmd) cmd.Flags().IntVar(new(int), "page", -1, "deprecated") cmd.Flags().IntVar(new(int), "page-size", 20, "deprecated") cmd.Flags().BoolVar(new(bool), "watermarks", true, "deprecated") @@ -204,106 +160,291 @@ For example, return cmd } -// We optionally include the following columns: -// - offline-replicas, if any are offline -// - load-error, if metadata indicates load errors any partitions -// - last-stable-offset, if it is ever not equal to the high watermark (transactions) -func getDescribeUsed(partitions []kmsg.MetadataResponseTopicPartition, offsets []startStableEndOffset) (useOffline, useErr, useStable bool) { - for _, p := range partitions { - if len(p.OfflineReplicas) > 0 { - useOffline = true - } - if p.ErrorCode != 0 { - useErr = true - } +func printDescribedTopicsFormatter(f config.OutFormatter, topics []describedTopic, w io.Writer) bool { + if isText, _, t, err := f.Format(topics); !isText { + out.MaybeDie(err, "unable to print in the requested format: %v", err) + fmt.Fprintln(w, t) + return true } - for _, o := range offsets { - // The default stableErr is errUnlisted. We avoid listing - // stable offsets unless the user asks, so by default, we do - // not print the stable column. - if o.stableErr == nil && o.endErr == nil && o.stable != o.end { - useStable = true + return false +} + +func printDescribedTopics(summary, configs, partitions bool, topics []describedTopic) { + const ( + secSummary = "summary" + secConfigs = "configs" + secPart = "partitions" + ) + + for _, topic := range topics { + sections := out.NewMaybeHeaderSections( + out.ConditionalSectionHeaders(map[string]bool{ + secSummary: summary, + secConfigs: configs, + secPart: partitions, + })..., + ) + + sections.Add(secSummary, func() { + tw := out.NewTabWriter() + defer tw.Flush() + tw.PrintColumn("NAME", topic.Summary.Name) + if topic.Summary.Internal { + tw.PrintColumn("INTERNAL", topic.Summary.Internal) + } + tw.PrintColumn("PARTITIONS", topic.Summary.Partitions) + if topic.Summary.Partitions > 0 { + tw.PrintColumn("REPLICAS", topic.Summary.Replicas) + } + if topic.Summary.Error != "" { + tw.PrintColumn("ERROR", topic.Summary.Error) + } + }) + sections.Add(secConfigs, func() { + out.MaybeDie(topic.cfgErr, "config response contained error: %v", topic.cfgErr) + tw := out.NewTable("KEY", "VALUE", "SOURCE") + defer tw.Flush() + for _, c := range topic.Configs { + tw.Print(c.Key, c.Value, c.Source) + } + }) + sections.Add(secPart, func() { + tw := out.NewTable(partitionHeader(topic.u)...)
+ defer tw.Flush() + for _, row := range topic.Partitions { + tw.PrintStrings(row.Row(topic.u)...) + } + }) + } +} + +type describedTopic struct { + Summary describeTopicSummary `json:"summary" yaml:"summary"` + Configs []describeTopicConfig `json:"configs" yaml:"configs"` + Partitions []describeTopicPartition `json:"partitions" yaml:"partitions"` + u uses + cfgErr error +} + +type describeTopicSummary struct { + Name string `json:"name" yaml:"name"` + Internal bool `json:"internal" yaml:"internal"` + Partitions int `json:"partitions" yaml:"partitions"` + Replicas int `json:"replicas" yaml:"replicas"` + Error string `json:"error" yaml:"error"` +} + +func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) describeTopicSummary { + resp := describeTopicSummary{ + Name: *topic.Topic, + Internal: topic.IsInternal, + Partitions: len(topic.Partitions), + } + if len(topic.Partitions) > 0 { + resp.Replicas = len(topic.Partitions[0].Replicas) + } + if err := kerr.ErrorForCode(topic.ErrorCode); err != nil { + resp.Error = err.Error() + } + return resp +} + +type describeTopicConfig struct { + Key string `json:"key" yaml:"key"` + Value string `json:"value" yaml:"value"` + Source string `json:"source" yaml:"source"` +} + +func prepDescribeTopicConfig(ctx context.Context, topic kmsg.MetadataResponseTopic, cl *kgo.Client) (*kmsg.DescribeConfigsResponseResource, error) { + req := kmsg.NewPtrDescribeConfigsRequest() + reqResource := kmsg.NewDescribeConfigsRequestResource() + reqResource.ResourceType = kmsg.ConfigResourceTypeTopic + reqResource.ResourceName = *topic.Topic + req.Resources = append(req.Resources, reqResource) + + resp, err := req.RequestWith(ctx, cl) + if err != nil { + return nil, fmt.Errorf("unable to request configs: %v", err) + } + if len(resp.Resources) != 1 { + return nil, fmt.Errorf("config response returned %d resources when we asked for 1", len(resp.Resources)) + } + return &resp.Resources[0], nil +} + +func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) []describeTopicConfig { + output := make([]describeTopicConfig, 0, len(configs)) + types.Sort(configs) + for _, cfg := range configs { + d := describeTopicConfig{ + Key: cfg.Name, + Source: cfg.Source.String(), + } + if cfg.IsSensitive { + d.Value = "(sensitive)" + } else if cfg.Value != nil { + d.Value = *cfg.Value } + output = append(output, d) } - return + return output +} + +type describeTopicPartition struct { + Partition int32 `json:"partition" yaml:"partition"` + Leader int32 `json:"leader" yaml:"leader"` + Epoch int32 `json:"epoch" yaml:"epoch"` + Replicas []int32 `json:"replicas" yaml:"replicas"` + OfflineReplicas []int32 `json:"offline_replicas,omitempty" yaml:"offline_replicas,omitempty"` + LoadError string `json:"load_error,omitempty" yaml:"load_error,omitempty"` + LogStartOffset int64 `json:"log_start_offset" yaml:"log_start_offset"` + logStartOffsetText any + LastStableOffset int64 `json:"last_stable_offset,omitempty" yaml:"last_stable_offset,omitempty"` + lastStableOffsetText any + HighWatermark int64 `json:"high_watermark" yaml:"high_watermark"` + highWatermarkText any + Errors []string `json:"error,omitempty" yaml:"error,omitempty"` } -func describePartitionsHeaders( - partitions []kmsg.MetadataResponseTopicPartition, - offsets []startStableEndOffset, -) []string { - offline, err, stable := getDescribeUsed(partitions, offsets) - headers := []string{"partition", "leader", "epoch"} - headers = append(headers, "replicas") // TODO add isr see #1928 - if offline { +func 
partitionHeader(u uses) []string { + headers := []string{ + "partition", + "leader", + "epoch", + "replicas", + } + + if u.Offline { headers = append(headers, "offline-replicas") } - if err { + if u.LoadErr { headers = append(headers, "load-error") } headers = append(headers, "log-start-offset") - if stable { + if u.Stable { headers = append(headers, "last-stable-offset") } headers = append(headers, "high-watermark") return headers } -func describePartitionsRows( - partitions []kmsg.MetadataResponseTopicPartition, - offsets []startStableEndOffset, -) [][]interface{} { +type uses struct { + Offline bool + LoadErr bool + Stable bool +} + +func (dp describeTopicPartition) Row(u uses) []string { + row := []string{ + strconv.FormatInt(int64(dp.Partition), 10), + strconv.FormatInt(int64(dp.Leader), 10), + strconv.FormatInt(int64(dp.Epoch), 10), + fmt.Sprintf("%v", dp.Replicas), + } + + if u.Offline { + row = append(row, fmt.Sprintf("%v", dp.OfflineReplicas)) + } + + if u.LoadErr { + row = append(row, dp.LoadError) + } + row = append(row, fmt.Sprintf("%v", dp.logStartOffsetText)) + + if u.Stable { + row = append(row, fmt.Sprintf("%v", dp.lastStableOffsetText)) + } + row = append(row, fmt.Sprintf("%v", dp.highWatermarkText)) + return row +} + +func buildDescribeTopicPartitions(partitions []kmsg.MetadataResponseTopicPartition, offsets []startStableEndOffset, u uses) (resp []describeTopicPartition) { sort.Slice(partitions, func(i, j int) bool { return partitions[i].Partition < partitions[j].Partition }) - - offline, err, stable := getDescribeUsed(partitions, offsets) - var rows [][]interface{} for _, p := range partitions { - row := []interface{}{p.Partition, p.Leader, p.LeaderEpoch} - row = append(row, int32s(p.Replicas).sort()) - if offline { - row = append(row, int32s(p.OfflineReplicas).sort()) + row := describeTopicPartition{ + Partition: p.Partition, + Leader: p.Leader, + Epoch: p.LeaderEpoch, + Replicas: int32s(p.Replicas).sort(), + } + if u.Offline { + row.OfflineReplicas = int32s(p.OfflineReplicas).sort() } - if err { + if u.LoadErr { if err := kerr.ErrorForCode(p.ErrorCode); err != nil { - row = append(row, err) + row.LoadError = err.Error() } else { - row = append(row, "-") + row.LoadError = "-" } } - - // For offsets, we have three options: - // - we listed the offset successfully, we write the number - // - list offsets, we write "-" - // - the partition had a partition error, we write the kerr.Error message o := offsets[p.Partition] if o.startErr == nil { - row = append(row, o.start) + row.LogStartOffset = o.start + row.logStartOffsetText = o.start } else if errors.Is(o.startErr, errUnlisted) { - row = append(row, "-") + row.LogStartOffset = -1 + row.logStartOffsetText = "-" } else { - row = append(row, o.startErr.(*kerr.Error).Message) //nolint:errorlint // This error must be kerr.Error, and we want the message + row.LogStartOffset = -1 + err := o.startErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message + row.logStartOffsetText = err + row.Errors = append(row.Errors, err) } - if stable { + if u.Stable { if o.stableErr == nil { - row = append(row, o.stable) + row.LastStableOffset = o.stable + row.lastStableOffsetText = o.stable } else if errors.Is(o.stableErr, errUnlisted) { - row = append(row, "-") + row.LastStableOffset = -1 + row.lastStableOffsetText = "-" } else { - row = append(row, o.stableErr.(*kerr.Error).Message) //nolint:errorlint // This error must be kerr.Error, and we want the message + row.LastStableOffset = -1 + err := 
o.stableErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message + row.lastStableOffsetText = err + row.Errors = append(row.Errors, err) } } if o.endErr == nil { - row = append(row, o.end) + row.HighWatermark = o.end + row.highWatermarkText = o.end } else if errors.Is(o.endErr, errUnlisted) { - row = append(row, "-") + row.HighWatermark = -1 + row.highWatermarkText = "-" } else { - row = append(row, o.endErr.(*kerr.Error).Message) //nolint:errorlint // This error must be kerr.Error, and we want the message + row.HighWatermark = -1 + err := o.endErr.(*kerr.Error).Message //nolint:errorlint // This error must be kerr.Error, and we want the message + row.highWatermarkText = err + row.Errors = append(row.Errors, err) } - rows = append(rows, row) + resp = append(resp, row) } - return rows + return resp +} + +// We optionally include the following columns: +// - offline-replicas, if any are offline +// - load-error, if metadata indicates load errors for any partitions +// - last-stable-offset, if it is ever not equal to the high watermark (transactions) +func getDescribeUsed(partitions []kmsg.MetadataResponseTopicPartition, offsets []startStableEndOffset) (u uses) { + for _, p := range partitions { + if len(p.OfflineReplicas) > 0 { + u.Offline = true + } + if p.ErrorCode != 0 { + u.LoadErr = true + } + } + for _, o := range offsets { + // The default stableErr is errUnlisted. We avoid listing + // stable offsets unless the user asks, so by default, we do + // not print the stable column. + if o.stableErr == nil && o.endErr == nil && o.stable != o.end { + u.Stable = true + } + } + return } type startStableEndOffset struct { @@ -325,7 +466,7 @@ var errUnlisted = errors.New("list failed") // always contain the one topic we asked for, and it will contain all // partitions we asked for. The logic below will panic redpanda replies // incorrectly. -func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset { +func listStartEndOffsets(ctx context.Context, cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset { offsets := make([]startStableEndOffset, 0, numPartitions) for i := 0; i < numPartitions; i++ { @@ -351,7 +492,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable reqTopic.Partitions = append(reqTopic.Partitions, part) } req.Topics = append(req.Topics, reqTopic) - shards := cl.RequestSharded(context.Background(), req) + shards := cl.RequestSharded(ctx, req) allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { @@ -382,7 +523,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable // transactions are in play. if stable { req.IsolationLevel = 1 - shards = cl.RequestSharded(context.Background(), req) + shards = cl.RequestSharded(ctx, req) allFailed = kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { @@ -399,7 +540,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable } // Finally, the HWM.
- shards = cl.RequestSharded(context.Background(), req) + shards = cl.RequestSharded(ctx, req) kafka.EachShard(req, shards, func(shard kgo.ResponseShard) { resp := shard.Resp.(*kmsg.ListOffsetsResponse) if len(resp.Topics) > 0 { diff --git a/src/go/rpk/pkg/cli/topic/describe_test.go b/src/go/rpk/pkg/cli/topic/describe_test.go index d7a66df046535..68917fa27be5a 100644 --- a/src/go/rpk/pkg/cli/topic/describe_test.go +++ b/src/go/rpk/pkg/cli/topic/describe_test.go @@ -1,136 +1,773 @@ package topic import ( + "bytes" + "encoding/json" + "io" + "os" + "strings" "testing" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" ) -func TestDescribePartitions(t *testing.T) { - // inputs: conditionals for what columns, as well as the rows - // test: ensure which headers are returned, and args - - for _, test := range []struct { - name string - - inMeta []kmsg.MetadataResponseTopicPartition - inOffsets []startStableEndOffset - - expHeaders []string - expRows [][]interface{} +func TestBuildDescribeTopicPartitions(t *testing.T) { + testCases := []struct { + name string + partitions []kmsg.MetadataResponseTopicPartition + offsets []startStableEndOffset + uses uses + expUseErr bool + expected []describeTopicPartition }{ { - name: "all ok, no optional columns, one partition", - - inMeta: []kmsg.MetadataResponseTopicPartition{ + name: "Normal case", + partitions: []kmsg.MetadataResponseTopicPartition{ + { + Partition: 0, + Leader: 1, + LeaderEpoch: 5, + Replicas: []int32{1, 2, 3}, + }, + { + Partition: 1, + Leader: 2, + LeaderEpoch: 3, + Replicas: []int32{1, 2, 3}, + OfflineReplicas: []int32{3}, + }, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 100, end: 100, startErr: nil, stableErr: nil, endErr: nil}, + {start: 50, stable: 150, end: 200, startErr: nil, stableErr: nil, endErr: nil}, + }, + uses: uses{Offline: true, Stable: true}, + expected: []describeTopicPartition{ + { + Partition: 0, + Leader: 1, + Epoch: 5, + Replicas: []int32{1, 2, 3}, + OfflineReplicas: []int32{}, + LogStartOffset: 0, + logStartOffsetText: int64(0), + LastStableOffset: 100, + lastStableOffsetText: int64(100), + HighWatermark: 100, + highWatermarkText: int64(100), + }, + { + Partition: 1, + Leader: 2, + Epoch: 3, + Replicas: []int32{1, 2, 3}, + OfflineReplicas: []int32{3}, + LogStartOffset: 50, + logStartOffsetText: int64(50), + LastStableOffset: 150, + lastStableOffsetText: int64(150), + HighWatermark: 200, + highWatermarkText: int64(200), + }, + }, + }, + { + name: "With errors", + partitions: []kmsg.MetadataResponseTopicPartition{ { Partition: 0, - Leader: 0, - ErrorCode: 0, - LeaderEpoch: -1, - Replicas: []int32{0, 1, 2}, + Leader: 1, + LeaderEpoch: 5, + Replicas: []int32{1, 2, 3}, + ErrorCode: 9, // REPLICA_NOT_AVAILABLE error code + }, + }, + offsets: []startStableEndOffset{ + { + start: -1, stable: -1, end: -1, + startErr: kerr.ErrorForCode(9), stableErr: errUnlisted, endErr: kerr.ErrorForCode(9), + }, + }, + uses: uses{LoadErr: true}, + expUseErr: true, + expected: []describeTopicPartition{ + { + Partition: 0, + Leader: 1, + Epoch: 5, + Replicas: []int32{1, 2, 3}, + LoadError: "REPLICA_NOT_AVAILABLE: The replica is not available for the requested topic-partition.", + LogStartOffset: -1, + logStartOffsetText: "REPLICA_NOT_AVAILABLE", + HighWatermark: -1, + highWatermarkText: "REPLICA_NOT_AVAILABLE", + Errors: []string{"REPLICA_NOT_AVAILABLE", 
"REPLICA_NOT_AVAILABLE"}, + }, + }, + }, + { + name: "Recovery failure - Unknown topic or partition", + partitions: []kmsg.MetadataResponseTopicPartition{ + { + Partition: 0, + Leader: -1, // No leader due to failed recovery + ErrorCode: 3, // UNKNOWN_TOPIC_OR_PARTITION error code + }, + }, + offsets: []startStableEndOffset{ + { + start: -1, + startErr: kerr.ErrorForCode(3), // Set the error + stable: -1, + end: -1, + endErr: kerr.ErrorForCode(3), + }, + }, + uses: uses{LoadErr: true}, + expUseErr: true, + expected: []describeTopicPartition{ + { + Partition: 0, + Leader: -1, + LoadError: "UNKNOWN_TOPIC_OR_PARTITION: This server does not host this topic-partition.", + LogStartOffset: -1, + logStartOffsetText: "UNKNOWN_TOPIC_OR_PARTITION", + Replicas: []int32{}, + HighWatermark: -1, + highWatermarkText: "UNKNOWN_TOPIC_OR_PARTITION", + Errors: []string{"UNKNOWN_TOPIC_OR_PARTITION", "UNKNOWN_TOPIC_OR_PARTITION"}, }, }, - inOffsets: []startStableEndOffset{{ - start: 0, - stable: 1, - end: 1, - }}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := buildDescribeTopicPartitions(tc.partitions, tc.offsets, tc.uses) + assert.Equal(t, tc.expected, result) + }) + } +} - expHeaders: []string{ - "partition", - "leader", - "epoch", - "replicas", - "log-start-offset", - "high-watermark", +func TestPartitionHeaderAndRow(t *testing.T) { + tests := []struct { + name string + partition describeTopicPartition + uses uses + expectedHeader []string + expectedRow []string + }{ + { + name: "all fields", + partition: describeTopicPartition{ + Partition: 0, + Leader: 1, + Epoch: 5, + Replicas: []int32{1, 2, 3}, + OfflineReplicas: []int32{3}, + LoadError: "REPLICA_NOT_AVAILABLE", + LogStartOffset: 100, + logStartOffsetText: int64(100), + LastStableOffset: 200, + lastStableOffsetText: int64(200), + HighWatermark: 300, + highWatermarkText: int64(300), + Errors: []string{"Error1", "Error2"}, }, - expRows: [][]interface{}{ - {int32(0), int32(0), int32(-1), []int32{0, 1, 2}, int64(0), int64(1)}, + uses: uses{Offline: true, LoadErr: true, Stable: true}, + expectedHeader: []string{ + "partition", "leader", "epoch", "replicas", "offline-replicas", + "load-error", "log-start-offset", "last-stable-offset", "high-watermark", + }, + expectedRow: []string{ + "0", "1", "5", "[1 2 3]", "[3]", "REPLICA_NOT_AVAILABLE", + "100", "200", "300", }, }, + { + name: "minimal fields", + partition: describeTopicPartition{ + Partition: 1, + Leader: 2, + Epoch: 3, + Replicas: []int32{1, 2}, + LogStartOffset: 50, + logStartOffsetText: int64(50), + HighWatermark: 150, + highWatermarkText: int64(150), + }, + uses: uses{}, + expectedHeader: []string{ + "partition", "leader", "epoch", "replicas", "log-start-offset", "high-watermark", + }, + expectedRow: []string{ + "1", "2", "3", "[1 2]", "50", "150", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + header := partitionHeader(tt.uses) + assert.Equal(t, tt.expectedHeader, header, "Headers do not match expected") + + row := tt.partition.Row(tt.uses) + assert.Equal(t, tt.expectedRow, row, "Row does not match expected") + }) + } +} +func TestGetDescribeUsed(t *testing.T) { + testCases := []struct { + name string + partitions []kmsg.MetadataResponseTopicPartition + offsets []startStableEndOffset + expected uses + }{ + { + name: "No special cases", + partitions: []kmsg.MetadataResponseTopicPartition{ + {Partition: 0, ErrorCode: 0}, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 100, end: 100}, + }, + expected: 
uses{}, + }, + { + name: "With offline replicas", + partitions: []kmsg.MetadataResponseTopicPartition{ + {Partition: 0, OfflineReplicas: []int32{1}}, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 100, end: 100}, + }, + expected: uses{Offline: true}, + }, { - name: "all ok, all extra columns, out of order partitions, errors", + name: "With load error", + partitions: []kmsg.MetadataResponseTopicPartition{ + {Partition: 0, ErrorCode: 1}, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 100, end: 100}, + }, + expected: uses{LoadErr: true}, + }, + { + name: "With stable offset different from end", + partitions: []kmsg.MetadataResponseTopicPartition{ + {Partition: 0}, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 50, end: 100, stableErr: nil, endErr: nil}, + }, + expected: uses{Stable: true}, + }, + { + name: "All cases", + partitions: []kmsg.MetadataResponseTopicPartition{ + {Partition: 0, ErrorCode: 1, OfflineReplicas: []int32{1}}, + }, + offsets: []startStableEndOffset{ + {start: 0, stable: 50, end: 100, stableErr: nil, endErr: nil}, + }, + expected: uses{Offline: true, LoadErr: true, Stable: true}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := getDescribeUsed(tc.partitions, tc.offsets) + assert.Equal(t, tc.expected, result) + }) + } +} - inMeta: []kmsg.MetadataResponseTopicPartition{ +func TestPrintDescribedTopicsFormatter(t *testing.T) { + testCases := []struct { + name string + format string + topics []describedTopic + expectedOutput string + expectedReturn bool + }{ + { + name: "JSON format - single topic", + format: "json", + topics: []describedTopic{ { - Partition: 1, - Leader: 0, - ErrorCode: 1, - LeaderEpoch: 0, // optional, used - Replicas: []int32{0, 1}, - OfflineReplicas: []int32{2, 3}, // optional, used + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: false, + Partitions: 3, + Replicas: 2, + }, + Configs: []describeTopicConfig{ + {Key: "retention.ms", Value: "604800000", Source: "DEFAULT_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + {Partition: 0, Leader: 1, Replicas: []int32{1, 2}}, + {Partition: 1, Leader: 2, Replicas: []int32{2, 1}}, + {Partition: 2, Leader: 1, Replicas: []int32{1, 2}}, + }, }, - + }, + expectedOutput: `[{"summary":{"name":"test-topic","internal":false,"partitions":3,"replicas":2,"error":""},"configs":[{"key":"retention.ms","value":"604800000","source":"DEFAULT_CONFIG"}],"partitions":[{"partition":0,"leader":1,"epoch":0,"replicas":[1,2],"log_start_offset":0,"high_watermark":0},{"partition":1,"leader":2,"epoch":0,"replicas":[2,1],"log_start_offset":0,"high_watermark":0},{"partition":2,"leader":1,"epoch":0,"replicas":[1,2],"log_start_offset":0,"high_watermark":0}]}]`, + expectedReturn: true, + }, + { + name: "JSON format - multiple topics", + format: "json", + topics: []describedTopic{ { - Partition: 0, - Leader: 1, - LeaderEpoch: -1, - Replicas: []int32{0}, + Summary: describeTopicSummary{ + Name: "topic1", + Internal: false, + Partitions: 2, + Replicas: 2, + }, + Configs: []describeTopicConfig{ + {Key: "retention.ms", Value: "86400000", Source: "DYNAMIC_TOPIC_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + {Partition: 0, Leader: 1, Replicas: []int32{1, 2}}, + {Partition: 1, Leader: 2, Replicas: []int32{2, 1}}, + }, + }, + { + Summary: describeTopicSummary{ + Name: "topic2", + Internal: true, + Partitions: 1, + Replicas: 3, + }, + Configs: []describeTopicConfig{ + {Key: "cleanup.policy", Value: "compact", Source: 
"STATIC_BROKER_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + {Partition: 0, Leader: 3, Replicas: []int32{1, 2, 3}}, + }, }, }, - inOffsets: []startStableEndOffset{ + expectedOutput: `[{"summary":{"name":"topic1","internal":false,"partitions":2,"replicas":2,"error":""},"configs":[{"key":"retention.ms","value":"86400000","source":"DYNAMIC_TOPIC_CONFIG"}],"partitions":[{"partition":0,"leader":1,"epoch":0,"replicas":[1,2],"log_start_offset":0,"high_watermark":0},{"partition":1,"leader":2,"epoch":0,"replicas":[2,1],"log_start_offset":0,"high_watermark":0}]},{"summary":{"name":"topic2","internal":true,"partitions":1,"replicas":3,"error":""},"configs":[{"key":"cleanup.policy","value":"compact","source":"STATIC_BROKER_CONFIG"}],"partitions":[{"partition":0,"leader":3,"epoch":0,"replicas":[1,2,3],"log_start_offset":0,"high_watermark":0}]}]`, + expectedReturn: true, + }, + { + name: "JSON format - topics with errors", + format: "json", + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "error-topic-1", + Internal: false, + Partitions: 0, + Replicas: 0, + Error: "UNKNOWN_TOPIC_OR_PARTITION", + }, + Configs: []describeTopicConfig{}, + Partitions: []describeTopicPartition{}, + }, { - start: 0, - stable: 1, - end: 1, + Summary: describeTopicSummary{ + Name: "partial-error-topic", + Internal: false, + Partitions: 2, + Replicas: 3, + Error: "", + }, + Configs: []describeTopicConfig{ + {Key: "min.insync.replicas", Value: "2", Source: "DYNAMIC_TOPIC_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + { + Partition: 0, + Leader: 1, + Replicas: []int32{1, 2, 3}, + OfflineReplicas: []int32{3}, + LogStartOffset: 100, + HighWatermark: 200, + }, + { + Partition: 1, + Leader: -1, + Replicas: []int32{1, 2, 3}, + LogStartOffset: -1, + HighWatermark: -1, + LoadError: "LEADER_NOT_AVAILABLE", + Errors: []string{"LEADER_NOT_AVAILABLE"}, + }, + }, }, { - startErr: kerr.ErrorForCode(9), - stable: 1, - end: 2, + Summary: describeTopicSummary{ + Name: "normal-topic", + Internal: false, + Partitions: 1, + Replicas: 1, + }, + Configs: []describeTopicConfig{ + {Key: "retention.ms", Value: "86400000", Source: "DEFAULT_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + { + Partition: 0, + Leader: 1, + Replicas: []int32{1}, + LogStartOffset: 0, + HighWatermark: 150, + }, + }, }, }, - - expHeaders: []string{ - "partition", - "leader", - "epoch", - "replicas", - "offline-replicas", - "load-error", - "log-start-offset", - "last-stable-offset", - "high-watermark", + expectedOutput: `[ + { + "summary": { + "name": "error-topic-1", + "internal": false, + "partitions": 0, + "replicas": 0, + "error": "UNKNOWN_TOPIC_OR_PARTITION" + }, + "configs": [], + "partitions": [] + }, + { + "summary": { + "name": "partial-error-topic", + "internal": false, + "partitions": 2, + "replicas": 3, + "error": "" + }, + "configs": [ + { + "key": "min.insync.replicas", + "value": "2", + "source": "DYNAMIC_TOPIC_CONFIG" + } + ], + "partitions": [ + { + "partition": 0, + "leader": 1, + "epoch": 0, + "replicas": [1, 2, 3], + "offline_replicas": [3], + "log_start_offset": 100, + "high_watermark": 200 + }, + { + "partition": 1, + "leader": -1, + "epoch": 0, + "replicas": [1, 2, 3], + "log_start_offset": -1, + "high_watermark": -1, + "load_error": "LEADER_NOT_AVAILABLE", + "error": ["LEADER_NOT_AVAILABLE"] + } + ] + }, + { + "summary": { + "name": "normal-topic", + "internal": false, + "partitions": 1, + "replicas": 1, + "error": "" + }, + "configs": [ + { + "key": "retention.ms", + "value": "86400000", + "source": 
"DEFAULT_CONFIG" + } + ], + "partitions": [ + { + "partition": 0, + "leader": 1, + "epoch": 0, + "replicas": [1], + "log_start_offset": 0, + "high_watermark": 150 + } + ] + } + ]`, + expectedReturn: true, + }, + { + name: "YAML format - single topic", + format: "yaml", + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: false, + Partitions: 1, + Replicas: 1, + }, + Configs: []describeTopicConfig{ + {Key: "compression.type", Value: "producer", Source: "DEFAULT_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + {Partition: 0, Leader: 1, Replicas: []int32{1}}, + }, + }, }, - expRows: [][]interface{}{ - {int32(0), int32(1), int32(-1), []int32{0}, []int32{}, "-", int64(0), int64(1), int64(1)}, - {int32(1), int32(0), int32(0), []int32{0, 1}, []int32{2, 3}, kerr.ErrorForCode(1), kerr.TypedErrorForCode(9).Message, int64(1), int64(2)}, + expectedOutput: `- summary: + name: test-topic + internal: false + partitions: 1 + replicas: 1 + error: "" + configs: + - key: compression.type + value: producer + source: DEFAULT_CONFIG + partitions: + - partition: 0 + leader: 1 + epoch: 0 + replicas: + - 1 + log_start_offset: 0 + high_watermark: 0`, + expectedReturn: true, + }, + { + name: "YAML format - topic with error", + format: "yaml", + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "error-topic", + Internal: false, + Partitions: 0, + Replicas: 0, + Error: "UNKNOWN_TOPIC_OR_PARTITION", + }, + }, }, + expectedOutput: `- summary: + name: error-topic + internal: false + partitions: 0 + replicas: 0 + error: UNKNOWN_TOPIC_OR_PARTITION + configs: [] + partitions: []`, + expectedReturn: true, }, + { + name: "Text format - should return false", + format: "text", + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: false, + Partitions: 1, + Replicas: 1, + }, + }, + }, + expectedOutput: "", + expectedReturn: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + f := config.OutFormatter{Kind: tc.format} + result := printDescribedTopicsFormatter(f, tc.topics, &buf) + + assert.Equal(t, tc.expectedReturn, result) + + if tc.expectedReturn { + switch tc.format { + case "json": + var expected, actual interface{} + err := json.Unmarshal([]byte(tc.expectedOutput), &expected) + require.NoError(t, err) + err = json.Unmarshal(buf.Bytes(), &actual) + require.NoError(t, err) + assert.Equal(t, expected, actual) + case "yaml": + assert.Equal(t, strings.TrimRight(tc.expectedOutput, "\n"), strings.TrimRight(buf.String(), "\n")) + default: + assert.Equal(t, tc.expectedOutput, buf.String()) + } + } else { + assert.Empty(t, buf.String()) + } + }) + } +} + +func TestPrintDescribedTopics(t *testing.T) { + testCases := []struct { + name string + summary bool + configs bool + partitions bool + topics []describedTopic + expectedOutput string + }{ { - name: "no rows", + name: "Print all sections", + summary: true, + configs: true, + partitions: true, + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: false, + Partitions: 2, + Replicas: 3, + }, + Configs: []describeTopicConfig{ + {Key: "retention.ms", Value: "604800000", Source: "DEFAULT_CONFIG"}, + }, + Partitions: []describeTopicPartition{ + {Partition: 0, Leader: 1, Replicas: []int32{1, 2, 3}}, + {Partition: 1, Leader: 2, Replicas: []int32{2, 3, 1}}, + }, + }, + }, + expectedOutput: `SUMMARY +======= +NAME test-topic +PARTITIONS 2 +REPLICAS 3 - inMeta: 
[]kmsg.MetadataResponseTopicPartition{}, - inOffsets: []startStableEndOffset{}, - expHeaders: []string{ - "partition", - "leader", - "epoch", - "replicas", - "log-start-offset", - "high-watermark", +CONFIGS +======= +KEY VALUE SOURCE +retention.ms 604800000 DEFAULT_CONFIG + +PARTITIONS +========== +PARTITION LEADER EPOCH REPLICAS LOG-START-OFFSET HIGH-WATERMARK +0 1 0 [1 2 3] +1 2 0 [2 3 1] +`, + }, + { + name: "Print only summary", + summary: true, + configs: false, + partitions: false, + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: true, + Partitions: 1, + Replicas: 1, + }, + }, + }, + expectedOutput: `NAME test-topic +INTERNAL true +PARTITIONS 1 +REPLICAS 1 +`, + }, + { + name: "Print summary and configs", + summary: true, + configs: true, + partitions: false, + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "test-topic", + Internal: false, + Partitions: 1, + Replicas: 1, + }, + Configs: []describeTopicConfig{ + {Key: "cleanup.policy", Value: "delete", Source: "DEFAULT_CONFIG"}, + {Key: "compression.type", Value: "producer", Source: "DYNAMIC_TOPIC_CONFIG"}, + }, + }, + }, + expectedOutput: `SUMMARY +======= +NAME test-topic +PARTITIONS 1 +REPLICAS 1 + +CONFIGS +======= +KEY VALUE SOURCE +cleanup.policy delete DEFAULT_CONFIG +compression.type producer DYNAMIC_TOPIC_CONFIG +`, + }, + { + name: "Print with errors", + summary: true, + configs: true, + partitions: true, + topics: []describedTopic{ + { + Summary: describeTopicSummary{ + Name: "error-topic", + Internal: false, + Partitions: 0, + Replicas: 0, + Error: "UNKNOWN_TOPIC_OR_PARTITION", + }, + }, }, + expectedOutput: `SUMMARY +======= +NAME error-topic +PARTITIONS 0 +ERROR UNKNOWN_TOPIC_OR_PARTITION + +CONFIGS +======= +KEY VALUE SOURCE + +PARTITIONS +========== +PARTITION LEADER EPOCH REPLICAS LOG-START-OFFSET HIGH-WATERMARK +`, }, + } + + for _, tc := range testCases { + // Janky way to test the output of a function that prints to stdout. Would be preferable to just pass in a buf and check that. + t.Run(tc.name, func(t *testing.T) { + // Redirect stdout to capture output + old := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + printDescribedTopics(tc.summary, tc.configs, tc.partitions, tc.topics) + + // Restore stdout + w.Close() + os.Stdout = old - // - } { - t.Run(test.name, func(t *testing.T) { - headers := describePartitionsHeaders( - test.inMeta, - test.inOffsets, - ) - rows := describePartitionsRows( - test.inMeta, - test.inOffsets, - ) + var buf bytes.Buffer + io.Copy(&buf, r) + output := buf.String() - require.Equal(t, test.expHeaders, headers, "headers") - require.Equal(t, test.expRows, rows, "rows") + // Compare output + assert.Equal(t, tc.expectedOutput, output) }) } } diff --git a/src/go/rpk/pkg/config/format.go b/src/go/rpk/pkg/config/format.go index 8f82f558aab0d..37e03a088bad2 100644 --- a/src/go/rpk/pkg/config/format.go +++ b/src/go/rpk/pkg/config/format.go @@ -63,6 +63,10 @@ func (f *OutFormatter) Help(t any) (string, bool) { return s, true } +func (f *OutFormatter) IsText() bool { + return f.Kind != "json" && f.Kind != "yaml" +} + func formatType(t any, includeTypeName bool) (string, error) { types := make(map[reflect.Type]struct{})
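For context on the control flow this patch introduces: `f.IsText()` gates the table printers, and `f.Format(...)` renders the whole `[]describedTopic` slice in one shot for json/yaml, which is why those formats imply --all (a partial document would be structurally incomplete). Below is a minimal standalone sketch of that path, assuming the rpk module is importable; `demoTopic` is a hypothetical stand-in for the unexported `describedTopic`:

```go
package main

import (
	"fmt"
	"os"

	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
)

// demoTopic is a hypothetical stand-in for describedTopic; the json/yaml
// tags mirror the tagging pattern used in the patch.
type demoTopic struct {
	Name       string `json:"name" yaml:"name"`
	Partitions int    `json:"partitions" yaml:"partitions"`
}

func main() {
	f := config.OutFormatter{Kind: "json"} // what --format=json selects

	// IsText reports true for anything other than json/yaml; in that case
	// the command falls through to the printDescribedTopics table path.
	if f.IsText() {
		fmt.Println("text output: would print the table sections")
		return
	}

	// Format returns (isText, _, formatted, err); for json/yaml the
	// formatted string is the fully rendered document, printed in one write.
	topics := []demoTopic{{Name: "foo", Partitions: 3}}
	if isText, _, t, err := f.Format(topics); !isText && err == nil {
		fmt.Fprintln(os.Stdout, t)
	}
}
```

At the CLI level, `rpk topic describe foo --format json` exercises this branch and emits the summary, configs, and partitions sections as one JSON document.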
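One design choice worth noting in `describeTopicPartition`: each offset is carried twice, as an exported int64 for json/yaml (with -1 as the sentinel when an offset could not be listed) and as an unexported `...Text any` twin read only by the table renderer, so text output can show `-` or a kerr message while structured output stays numeric and collects messages in the `error` array. A self-contained sketch of the mechanism that makes this safe (encoding/json ignores unexported fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the pattern in describeTopicPartition: the exported field is what
// encoding/json marshals; the unexported twin exists only for the text table.
type partition struct {
	HighWatermark     int64 `json:"high_watermark"`
	highWatermarkText any   // skipped by encoding/json because it is unexported
}

func main() {
	p := partition{HighWatermark: -1, highWatermarkText: "NOT_LEADER_FOR_PARTITION"}
	b, err := json.Marshal(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"high_watermark":-1}
}
```

The same holds for YAML marshaling, so the text-only twins never leak into --format output.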