Skip to content

Commit

Permalink
WIP: testing
Browse files Browse the repository at this point in the history
  • Loading branch information
r-vasquez committed Nov 29, 2024
1 parent 60ca935 commit faf1c11
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
61 changes: 32 additions & 29 deletions src/go/rpk/pkg/cli/topic/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"errors"
"fmt"
"github.com/twmb/types"
"io"
"os"
"sort"
Expand Down Expand Up @@ -60,7 +61,7 @@ For example,
`,

Args: cobra.MinimumNArgs(1),
Run: func(_ *cobra.Command, topicArg []string) {
Run: func(cmd *cobra.Command, topicArg []string) {
f := p.Formatter
if h, ok := f.Help([]describedTopic{}); ok {
out.Exit(h)
Expand All @@ -81,6 +82,8 @@ For example,
out.MaybeDie(err, "unable to filter topics by regex: %v", err)
}

// By default, if neither are specified, we opt in to
// the config section only.
if !summary && !configs && !partitions {
summary, configs = true, true
}
Expand All @@ -90,6 +93,8 @@ For example,
// - more than one topic are specified or matched.
if all || len(topicArg) > 1 {
summary, configs, partitions = true, true, true
} else if len(topicArg) == 0 {
out.Exit("did not match any topics, exiting.")
}

req := kmsg.NewPtrMetadataRequest()
Expand All @@ -98,32 +103,28 @@ For example,
reqTopic.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, reqTopic)
}
resp, err := req.RequestWith(context.Background(), cl)
resp, err := req.RequestWith(cmd.Context(), cl)
out.MaybeDie(err, "unable to request topic metadata: %v", err)

var topicDescriptions []describedTopic
for _, topic := range resp.Topics {
offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable)
offsets := listStartEndOffsets(cmd.Context(), cl, *topic.Topic, len(topic.Partitions), stable)
u := getDescribeUsed(topic.Partitions, offsets)
cfg, cfgErr := prepDescribeTopicConfig(topic, cl)
cfg, cfgErr := prepDescribeTopicConfig(cmd.Context(), topic, cl)
out.MaybeDieErr(cfgErr)
t := describedTopic{
Summary: buildDescribeTopicSummary(topic),
Configs: buildDescribeTopicConfig(cfg),
Partitions: buildDescribeTopicPartitions(topic.Partitions, offsets, u),
cfgErr: cfgErr,
u: u,
}
topicDescriptions = append(topicDescriptions, t)
}
if len(topicDescriptions) == 0 {
out.Exit("[]")
}

if printDescribedTopicsFormatter(f, topicDescriptions, os.Stdout) {
return
}
printDescribedTopics(summary, configs, partitions, topicDescriptions)
out.MaybeDie(err, "unable to request topic metadata: %v", err)
},
}

Expand Down Expand Up @@ -184,12 +185,11 @@ func printDescribedTopics(summary, configs, partitions bool, topics []describedT
if topic.Summary.Partitions > 0 {
tw.PrintColumn("REPLICAS", topic.Summary.Replicas)
}
if topic.Summary.isErr {
if topic.Summary.Error != "" {
tw.PrintColumn("ERROR", topic.Summary.Error)
}
})
sections.Add(secConfigs, func() {
out.MaybeDie(topic.cfgErr, "config response contained error: %v", topic.cfgErr)
tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
for _, c := range topic.Configs {
Expand All @@ -211,7 +211,6 @@ type describedTopic struct {
Configs []describeTopicConfig `json:"configs" yaml:"configs"`
Partitions []describeTopicPartition `json:"partitions" yaml:"partitions"`
u uses
cfgErr error
}

type describeTopicSummary struct {
Expand All @@ -220,11 +219,10 @@ type describeTopicSummary struct {
Partitions int `json:"partitions" yaml:"partitions"`
Replicas int `json:"replicas" yaml:"replicas"`
Error string `json:"error" yaml:"error"`
isErr bool
}

func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) (resp describeTopicSummary) {
resp = describeTopicSummary{
func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) describeTopicSummary {
resp := describeTopicSummary{
Name: *topic.Topic,
Internal: topic.IsInternal,
Partitions: len(topic.Partitions),
Expand All @@ -233,10 +231,9 @@ func buildDescribeTopicSummary(topic kmsg.MetadataResponseTopic) (resp describeT
resp.Replicas = len(topic.Partitions[0].Replicas)
}
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
resp.isErr = true
resp.Error = err.Error()
}
return
return resp
}

type describeTopicConfig struct {
Expand All @@ -245,24 +242,30 @@ type describeTopicConfig struct {
Source string `json:"source" yaml:"source"`
}

func prepDescribeTopicConfig(topic kmsg.MetadataResponseTopic, cl *kgo.Client) ([]kmsg.DescribeConfigsResponseResourceConfig, error) {
func prepDescribeTopicConfig(ctx context.Context, topic kmsg.MetadataResponseTopic, cl *kgo.Client) ([]kmsg.DescribeConfigsResponseResourceConfig, error) {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = *topic.Topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
resp, err := req.RequestWith(ctx, cl)
if err != nil {
return nil, fmt.Errorf("unable to request configs: %v", err)
}
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
return nil, fmt.Errorf("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
return resp.Resources[0].Configs, err
if err != nil {
return nil, fmt.Errorf("config response contained error: %v", err)
}
return resp.Resources[0].Configs, nil
}

func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) (output []describeTopicConfig) {
output = make([]describeTopicConfig, 0, len(configs))
func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConfig) []describeTopicConfig {
output := make([]describeTopicConfig, 0, len(configs))
types.Sort(configs)
for _, cfg := range configs {
d := describeTopicConfig{
Key: cfg.Name,
Expand All @@ -275,7 +278,7 @@ func buildDescribeTopicConfig(configs []kmsg.DescribeConfigsResponseResourceConf
}
output = append(output, d)
}
return
return output
}

type describeTopicPartition struct {
Expand Down Expand Up @@ -454,7 +457,7 @@ var errUnlisted = errors.New("list failed")
// always contain the one topic we asked for, and it will contain all
// partitions we asked for. The logic below will panic redpanda replies
// incorrectly.
func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset {
func listStartEndOffsets(ctx context.Context, cl *kgo.Client, topic string, numPartitions int, stable bool) []startStableEndOffset {
offsets := make([]startStableEndOffset, 0, numPartitions)

for i := 0; i < numPartitions; i++ {
Expand All @@ -480,7 +483,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable
reqTopic.Partitions = append(reqTopic.Partitions, part)
}
req.Topics = append(req.Topics, reqTopic)
shards := cl.RequestSharded(context.Background(), req)
shards := cl.RequestSharded(ctx, req)
allFailed := kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
resp := shard.Resp.(*kmsg.ListOffsetsResponse)
if len(resp.Topics) > 0 {
Expand Down Expand Up @@ -511,7 +514,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable
// transactions are in play.
if stable {
req.IsolationLevel = 1
shards = cl.RequestSharded(context.Background(), req)
shards = cl.RequestSharded(ctx, req)
allFailed = kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
resp := shard.Resp.(*kmsg.ListOffsetsResponse)
if len(resp.Topics) > 0 {
Expand All @@ -528,7 +531,7 @@ func listStartEndOffsets(cl *kgo.Client, topic string, numPartitions int, stable
}

// Finally, the HWM.
shards = cl.RequestSharded(context.Background(), req)
shards = cl.RequestSharded(ctx, req)
kafka.EachShard(req, shards, func(shard kgo.ResponseShard) {
resp := shard.Resp.(*kmsg.ListOffsetsResponse)
if len(resp.Topics) > 0 {
Expand Down
3 changes: 0 additions & 3 deletions src/go/rpk/pkg/cli/topic/describe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ func TestPrintDescribedTopicsFormatter(t *testing.T) {
Partitions: 0,
Replicas: 0,
Error: "UNKNOWN_TOPIC_OR_PARTITION",
isErr: true,
},
Configs: []describeTopicConfig{},
Partitions: []describeTopicPartition{},
Expand Down Expand Up @@ -553,7 +552,6 @@ func TestPrintDescribedTopicsFormatter(t *testing.T) {
Partitions: 0,
Replicas: 0,
Error: "UNKNOWN_TOPIC_OR_PARTITION",
isErr: true,
},
},
},
Expand Down Expand Up @@ -730,7 +728,6 @@ compression.type producer DYNAMIC_TOPIC_CONFIG
Partitions: 0,
Replicas: 0,
Error: "UNKNOWN_TOPIC_OR_PARTITION",
isErr: true,
},
},
},
Expand Down

0 comments on commit faf1c11

Please sign in to comment.