Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: add client quotas support to rpk #18711

Merged
merged 3 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/go/rpk/pkg/cli/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/license"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/maintenance"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/partitions"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/quotas"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/selftest"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/txn"
Expand Down Expand Up @@ -47,6 +48,7 @@ func NewCommand(fs afero.Fs, p *pkgconfig.Params) *cobra.Command {
selftest.NewSelfTestCommand(fs, p),
storage.NewCommand(fs, p),
txn.NewCommand(fs, p),
quotas.NewCommand(fs, p),
offsets,
)

Expand Down
194 changes: 194 additions & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/alter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package quotas

import (
"fmt"
"strconv"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/zap"
)

type createResponse struct {
Entity []entityData `json:"entity" yaml:"entity"`
Status string `json:"status" yaml:"status"`

entityStr string
}

func alterCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
dry bool
names []string
defaults []string
adds []string
deletes []string
)
cmd := &cobra.Command{
Use: "alter",
Args: cobra.NoArgs,
Short: "Add or delete a client quota",
Long: `Add or delete a client quota

This command allows you to add or delete a client quota.

A client quota consists of an entity (to whom the quota is applied) and a quota
type (what is being applied).

There are two entity types supported by Redpanda: client ID and client ID
prefix.

Assigning quotas to default entity types is possible using the '--default' flag.

You can perform a dry run using the '--dry' flag.
`,
Example: `
Add quota (consumer_byte_rate) to client ID 'foo':
rpk cluster quotas alter --add consumer_byte_rate=200000 \
--name client-id=foo

Add quota (consumer_byte_rate) to client ID starting with 'bar-':
rpk cluster quotas alter --add consumer_byte_rate=200000 \
--name client-id-prefix=bar-

Add quota (producer_byte_rate) to default client ID:
rpk cluster quotas alter --add producer_byte_rate=180000 \
--default client-id=foo

Remove quota (producer_byte_rate) from client ID 'foo':
rpk cluster quotas alter --delete producer_byte_rate \
--name client-id=foo
`,
Run: func(cmd *cobra.Command, _ []string) {
f := p.Formatter
if h, ok := f.Help(createResponse{}); ok {
out.Exit(h)
}
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

var (
entity []kadm.ClientQuotaEntityComponent
nameMap = make(map[string]bool)
)
for _, name := range names {
split := strings.SplitN(name, "=", 2)
if len(split) != 2 {
out.Die("name %q missing value", split[0])
}
k, v := split[0], split[1]
k = strings.ToLower(k)
if !anyValidTypes[k] {
out.Die("name type %q is invalid (allowed: client-id, client-id-prefix)", split[0])
}
nameMap[k] = true
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: k,
Name: &v,
})
}
for _, def := range defaults {
if !defaultValidTypes[def] {
out.Die("default type %q is invalid (allowed: client-id)", def)
}
if nameMap[def] {
out.Die("default type %q was previously defined in --name, you can only set it once", def)
}
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: def,
})
}

var operations []kadm.AlterClientQuotaOp
for _, add := range adds {
split := strings.SplitN(add, "=", 2)
if len(split) != 2 {
out.Die("missing value in flag --add: %q", add)
}
k, v := split[0], split[1]
f, err := strconv.ParseFloat(v, 64)
out.MaybeDie(err, "unable to parse add %q: %v", add, err)

operations = append(operations, kadm.AlterClientQuotaOp{
Key: k,
Value: f,
})
}
for _, del := range deletes {
operations = append(operations, kadm.AlterClientQuotaOp{
Key: del,
Remove: true,
})
}

request := []kadm.AlterClientQuotaEntry{{entity, operations}}
var altered kadm.AlteredClientQuotas
if dry {
zap.L().Sugar().Debug("dry run: this result will not alter the client quotas")
altered, err = adm.ValidateAlterClientQuotas(cmd.Context(), request)
} else {
altered, err = adm.AlterClientQuotas(cmd.Context(), request)
}
out.MaybeDie(err, "unable to run alter client quotas: %v", err)
err = printAlteredQuotas(f, altered)
out.MaybeDie(err, "unable to print altered quotas: %v", err)
},
}
cmd.Flags().StringSliceVar(&names, "name", nil, "Entity for exact matching. Format type=name where type is client-id or client-id-prefix (repeatable)")
cmd.Flags().StringSliceVar(&defaults, "default", nil, "Entity type for default matching, where type is client-id or client-id-prefix (repeatable)")
cmd.Flags().StringSliceVar(&adds, "add", nil, "Key=value quota to add, where the value is a float number (repeatable)")
cmd.Flags().StringSliceVar(&deletes, "delete", nil, "Key of the quota to delete (repeatable)")
cmd.Flags().BoolVar(&dry, "dry", false, "Key of the quota to delete (repeatable)")

cmd.MarkFlagsOneRequired("name", "default")
cmd.MarkFlagsOneRequired("add", "delete")
return cmd
}

func printAlteredQuotas(f config.OutFormatter, altered kadm.AlteredClientQuotas) error {
// We only alter a single entity/quota.
var resp createResponse
for _, entry := range altered {
entity, entityStr := parseEntityData(entry.Entity)
status := "OK"
if entry.Err != nil {
status = fmt.Sprintf("Error: %v", entry.ErrMessage)
}
resp = createResponse{
Entity: entity,
Status: status,
entityStr: entityStr,
}
}
if isText, _, s, err := f.Format(resp); !isText {
if err != nil {
return fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err)
}
fmt.Println(s)
return nil
}

tw := out.NewTable("entity", "status")
defer tw.Flush()
tw.Print(resp.entityStr, resp.Status)

return nil
}
174 changes: 174 additions & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package quotas

import (
"fmt"
"strconv"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
)

type quotaValues struct {
Key string `json:"key" yaml:"key"`
Value string `json:"values" yaml:"values"`
}
type describedQuota struct {
Entity []entityData `json:"entity" yaml:"entity"`
Values []quotaValues `json:"values" yaml:"values"`

entityStr string
}

type describeResponse struct {
DescribedQuotas []describedQuota `json:"quotas,omitempty" yaml:"quotas,omitempty"`
}

func describeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
names []string
defaults []string
anyFlag []string
strict bool
)
cmd := &cobra.Command{
Use: "describe",
Args: cobra.NoArgs,
Short: "Describe client quotas",
Long: `Describe client quotas.

This command describes client quotas that match the provided filtering criteria.
Running the command without filters returns all client quotas. Use the
'--strict' flag for strict matching, which means that the only quotas returned
exactly match the filters.

Filters can be provided in terms of entities. An entity consists of either a
client ID or a client ID prefix.
`,
Example: `
Describe all client quotas:
rpk cluster quotas describe

Describe all client quota with client ID foo:
rpk cluster quotas describe --name client-id=foo

Describe client quotas for a given client ID prefix 'bar.':
rpk cluster quotas describe --name client-id=bar.
`,
Run: func(cmd *cobra.Command, _ []string) {
f := p.Formatter
if h, ok := f.Help(describeResponse{}); ok {
out.Exit(h)
}
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

var reqQuotas []kadm.DescribeClientQuotaComponent
for _, name := range names {
split := strings.SplitN(name, "=", 2)
if len(split) != 2 {
out.Die("--name flag %q missing value", split[0])
}
k, v := split[0], split[1]
k = strings.ToLower(k)
if !anyValidTypes[k] {
out.Die("name type %q is invalid (allowed: client-id, client-id-prefix)", split[0])
}
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{
Type: k,
MatchName: &v,
MatchType: 0,
})
}
for _, def := range defaults {
k := strings.ToLower(def)
if !defaultValidTypes[k] {
out.Die("default type %q is invalid (allowed: client-id)", def)
}
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{
Type: k,
MatchType: 1,
})
}
for _, a := range anyFlag {
k := strings.ToLower(a)
if !anyValidTypes[k] {
out.Die("'any' type %q is invalid (allowed: client-id, client-id-prefix)", a)
}
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{
Type: k,
MatchType: 2,
})
}
quotas, err := adm.DescribeClientQuotas(cmd.Context(), strict, reqQuotas)
out.MaybeDie(err, "unable to describe client quotas: %v", err)

err = printDescribedQuotas(f, quotas)
out.MaybeDie(err, "unable to print described quotas: %v", err)
},
}

cmd.Flags().StringSliceVar(&names, "name", nil, "type=name pair for exact name matching, where type is client-id or client-id-prefix (repeatable)")
cmd.Flags().StringSliceVar(&defaults, "default", nil, "type for default matching, where type is client-id or client-id-prefix (repeatable)")
cmd.Flags().StringSliceVar(&anyFlag, "any", nil, "type for any matching (names or default), where type is client-id or client-id-prefix (repeatable)")
cmd.Flags().BoolVar(&strict, "strict", false, "whether matches are strict, if true, entities with unspecified entity types are excluded")

return cmd
}

func printDescribedQuotas(f config.OutFormatter, quotas []kadm.DescribedClientQuota) error {
var described []describedQuota
for _, q := range quotas {
entity, entityStr := parseEntityData(q.Entity)
var qv []quotaValues
for _, v := range q.Values {
qv = append(qv, quotaValues{
Key: v.Key,
Value: strconv.FormatFloat(v.Value, 'f', -1, 64),
})
}
described = append(described, describedQuota{
Entity: entity,
Values: qv,
entityStr: entityStr,
})
}
if isText, _, s, err := f.Format(describeResponse{described}); !isText {
if err != nil {
return fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err)
}
fmt.Println(s)
return nil
}
if len(described) == 0 {
fmt.Println("no quotas matched to describe")
return nil
}
for i, d := range described {
fmt.Println(d.entityStr)
for _, qv := range d.Values {
fmt.Printf("\t%v=%v\n", qv.Key, qv.Value)
}
if i < len(described)-1 {
fmt.Println()
}
}
return nil
}
Loading
Loading