-
Notifications
You must be signed in to change notification settings - Fork 592
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rpk: add client quotas support to rpk
- Loading branch information
Showing
4 changed files
with
440 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package quotas | ||
|
||
import ( | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" | ||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka" | ||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" | ||
"github.com/spf13/afero" | ||
"github.com/spf13/cobra" | ||
"github.com/twmb/franz-go/pkg/kadm" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type createResponse struct { | ||
Entity []entityData `json:"entity" yaml:"entity"` | ||
Status string `json:"status" yaml:"status"` | ||
|
||
entityStr string | ||
} | ||
|
||
func alterCommand(fs afero.Fs, p *config.Params) *cobra.Command { | ||
var ( | ||
dry bool | ||
names []string | ||
defaults []string | ||
adds []string | ||
deletes []string | ||
) | ||
cmd := &cobra.Command{ | ||
Use: "alter", | ||
Args: cobra.NoArgs, | ||
Short: "Add or delete a client quota", | ||
Long: `Add or delete a client quota | ||
This command allows you to add or delete a client quota. | ||
A client quota consists of an entity (to whom the quota is applied) and a quota | ||
type (what is being applied). | ||
There are three entity types: client ID, user, or IP. You can assign quotas to | ||
an entity consisting of a client ID and a user, or an IP. | ||
Assigning quotas to default entity types is possible using the '--default' flag, | ||
a default user/client ID matches to all users or client IDs. | ||
A dry run can be performed using the '--dry' flag. | ||
`, | ||
Example: ` | ||
Add quota (consumer_byte_rate) to client ID 'foo' and user 'bar' | ||
rpk cluster quotas alter --add consumer_byte_rate=200000 \ | ||
--name client-id=foo,user=bar | ||
Add quota (producer_byte_rate) to client ID 'foo' and default users | ||
rpk cluster quotas alter --add producer_byte_rate=180000 \ | ||
--name client-id=foo --default user | ||
Add quota (connection_creation_rate) to a given IP | ||
rpk cluster quotas alter --add connection_creation_rate=100 \ | ||
--name ip=181.81.18.81 | ||
Remove quota (request_percentage) from client ID 'foo' and default users | ||
rpk cluster quotas alter --delete request_percentage \ | ||
--name client-id=foo --default user | ||
`, | ||
Run: func(cmd *cobra.Command, _ []string) { | ||
f := p.Formatter | ||
if h, ok := f.Help(createResponse{}); ok { | ||
out.Exit(h) | ||
} | ||
p, err := p.LoadVirtualProfile(fs) | ||
out.MaybeDie(err, "rpk unable to load config: %v", err) | ||
|
||
adm, err := kafka.NewAdmin(fs, p) | ||
out.MaybeDie(err, "unable to initialize kafka client: %v", err) | ||
defer adm.Close() | ||
|
||
var ( | ||
entity []kadm.ClientQuotaEntityComponent | ||
nameMap = make(map[string]bool) | ||
) | ||
for _, name := range names { | ||
split := strings.SplitN(name, "=", 2) | ||
if len(split) != 2 { | ||
out.Die("name %q missing value", split[0]) | ||
} | ||
k, v := split[0], split[1] | ||
k = strings.ToLower(k) | ||
if !validTypes[k] { | ||
out.Die("name type %q is invalid (allowed: user, client-id, ip)", split[0]) | ||
} | ||
nameMap[k] = true | ||
entity = append(entity, kadm.ClientQuotaEntityComponent{ | ||
Type: k, | ||
Name: &v, | ||
}) | ||
} | ||
for _, def := range defaults { | ||
if !validTypes[def] { | ||
out.Die("default type %q is invalid (allowed: user, client-id, ip)", def) | ||
} | ||
if nameMap[def] { | ||
out.Die("default type %q was previously defined in --name, you can only set it once", def) | ||
} | ||
entity = append(entity, kadm.ClientQuotaEntityComponent{ | ||
Type: def, | ||
}) | ||
} | ||
|
||
var operations []kadm.AlterClientQuotaOp | ||
for _, add := range adds { | ||
split := strings.SplitN(add, "=", 2) | ||
if len(split) != 2 { | ||
out.Die("missing value in flag --add: %q", add) | ||
} | ||
k, v := split[0], split[1] | ||
f, err := strconv.ParseFloat(v, 64) | ||
out.MaybeDie(err, "unable to parse add %q: %v", add, err) | ||
|
||
operations = append(operations, kadm.AlterClientQuotaOp{ | ||
Key: k, | ||
Value: f, | ||
}) | ||
} | ||
for _, del := range deletes { | ||
operations = append(operations, kadm.AlterClientQuotaOp{ | ||
Key: del, | ||
Remove: true, | ||
}) | ||
} | ||
|
||
request := []kadm.AlterClientQuotaEntry{{entity, operations}} | ||
var altered kadm.AlteredClientQuotas | ||
if dry { | ||
zap.L().Sugar().Debug("dry run: this result will not alter the client quotas") | ||
altered, err = adm.ValidateAlterClientQuotas(cmd.Context(), request) | ||
} else { | ||
altered, err = adm.AlterClientQuotas(cmd.Context(), request) | ||
} | ||
out.MaybeDie(err, "unable to run alter client quotas: %v", err) | ||
err = printAlteredQuotas(f, altered) | ||
out.MaybeDie(err, "unable to print altered quotas: %v", err) | ||
}, | ||
} | ||
cmd.Flags().StringSliceVar(&names, "name", nil, "Entity for exact matching. Format type=name where type is user, client-id, or ip (repeatable)") | ||
cmd.Flags().StringSliceVar(&defaults, "default", nil, "Entity type for default matching, where type is user, client-id, or ip (repeatable)") | ||
cmd.Flags().StringSliceVar(&adds, "add", nil, "Key=value quota to add, where the value is a float number (repeatable)") | ||
cmd.Flags().StringSliceVar(&deletes, "delete", nil, "Key of the quota to delete (repeatable)") | ||
cmd.Flags().BoolVar(&dry, "dry", false, "Key of the quota to delete (repeatable)") | ||
|
||
cmd.MarkFlagsOneRequired("name", "default") | ||
cmd.MarkFlagsOneRequired("add", "delete") | ||
return cmd | ||
} | ||
|
||
func printAlteredQuotas(f config.OutFormatter, altered kadm.AlteredClientQuotas) error { | ||
// We only alter a single entity/quota. | ||
var resp createResponse | ||
for _, entry := range altered { | ||
entity, entityStr := parseEntityData(entry.Entity) | ||
status := "OK" | ||
if entry.Err != nil { | ||
status = fmt.Sprintf("Error: %v", entry.ErrMessage) | ||
} | ||
resp = createResponse{ | ||
Entity: entity, | ||
Status: status, | ||
entityStr: entityStr, | ||
} | ||
} | ||
if isText, _, s, err := f.Format(resp); !isText { | ||
if err != nil { | ||
return fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err) | ||
} | ||
fmt.Println(s) | ||
return nil | ||
} | ||
|
||
tw := out.NewTable("entity", "status") | ||
defer tw.Flush() | ||
tw.Print(resp.entityStr, resp.Status) | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
|
||
package quotas | ||
|
||
import ( | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" | ||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka" | ||
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" | ||
"github.com/spf13/afero" | ||
"github.com/spf13/cobra" | ||
"github.com/twmb/franz-go/pkg/kadm" | ||
) | ||
|
||
type quotaValues struct { | ||
Key string `json:"key" yaml:"key"` | ||
Value string `json:"values" yaml:"values"` | ||
} | ||
type describedQuota struct { | ||
Entity []entityData `json:"entity" yaml:"entity"` | ||
Values []quotaValues `json:"values" yaml:"values"` | ||
|
||
entityStr string | ||
} | ||
|
||
type describeResponse struct { | ||
DescribedQuotas []describedQuota `json:"quotas,omitempty" yaml:"quotas,omitempty"` | ||
} | ||
|
||
func describeCommand(fs afero.Fs, p *config.Params) *cobra.Command { | ||
var ( | ||
names []string | ||
defaults []string | ||
anyFlag []string | ||
strict bool | ||
) | ||
cmd := &cobra.Command{ | ||
Use: "describe", | ||
Args: cobra.NoArgs, | ||
Short: "Describe client quotas", | ||
Long: `Describe client quotas. | ||
This command describes client quotas that match the provided filtering criteria. | ||
Running the command without filters will return all client quotas. The | ||
'--strict' flag can be used for strict matching (only returning quotas that | ||
exactly match the filters). | ||
Filters can be provided in terms of entities. An entity consists of either a | ||
client ID and user pair or an IP address. | ||
`, | ||
Example: ` | ||
Describe all client quotas: | ||
rpk cluster quotas describe | ||
Describe all client quota with client ID foo: | ||
rpk cluster quotas describe --name client-id=foo | ||
Describe only the client quotas of the entity client ID foo, user bar: | ||
rpk cluster quotas describe --name client-id=foo,user=bar | ||
Describe client quotas for a given IP: | ||
rpk cluster quotas describe --name ip=127.12.12.4 | ||
`, | ||
Run: func(cmd *cobra.Command, _ []string) { | ||
f := p.Formatter | ||
if h, ok := f.Help(describeResponse{}); ok { | ||
out.Exit(h) | ||
} | ||
p, err := p.LoadVirtualProfile(fs) | ||
out.MaybeDie(err, "rpk unable to load config: %v", err) | ||
|
||
adm, err := kafka.NewAdmin(fs, p) | ||
out.MaybeDie(err, "unable to initialize kafka client: %v", err) | ||
defer adm.Close() | ||
|
||
var reqQuotas []kadm.DescribeClientQuotaComponent | ||
for _, name := range names { | ||
split := strings.SplitN(name, "=", 2) | ||
if len(split) != 2 { | ||
out.Die("--name flag %q missing value", split[0]) | ||
} | ||
k, v := split[0], split[1] | ||
k = strings.ToLower(k) | ||
if !validTypes[k] { | ||
out.Die("name type %q is invalid (allowed: user, client-id, ip)", split[0]) | ||
} | ||
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{ | ||
Type: k, | ||
MatchName: &v, | ||
MatchType: 0, | ||
}) | ||
} | ||
for _, def := range defaults { | ||
k := strings.ToLower(def) | ||
if !validTypes[k] { | ||
out.Die("default type %q is invalid (allowed: user, client-id, ip)", def) | ||
} | ||
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{ | ||
Type: k, | ||
MatchType: 1, | ||
}) | ||
} | ||
for _, a := range anyFlag { | ||
k := strings.ToLower(a) | ||
if !validTypes[k] { | ||
out.Die("'any' type %q is invalid (allowed: user, client-id, ip)", a) | ||
} | ||
reqQuotas = append(reqQuotas, kadm.DescribeClientQuotaComponent{ | ||
Type: k, | ||
MatchType: 2, | ||
}) | ||
} | ||
quotas, err := adm.DescribeClientQuotas(cmd.Context(), strict, reqQuotas) | ||
out.MaybeDie(err, "unable to describe client quotas: %v", err) | ||
|
||
err = printDescribedQuotas(f, quotas) | ||
out.MaybeDie(err, "unable to print described quotas: %v", err) | ||
}, | ||
} | ||
|
||
cmd.Flags().StringSliceVar(&names, "name", nil, "type=name pair for exact name matching, where type is user, client-id, or ip (repeatable)") | ||
cmd.Flags().StringSliceVar(&defaults, "default", nil, "type for default matching, where type is user, client-id, or ip (repeatable)") | ||
cmd.Flags().StringSliceVar(&anyFlag, "any", nil, "type for any matching (names or default), where type is user, client-id, or ip (repeatable)") | ||
cmd.Flags().BoolVar(&strict, "strict", false, "whether matches are strict, if true, entities with unspecified entity types are excluded") | ||
|
||
return cmd | ||
} | ||
|
||
func printDescribedQuotas(f config.OutFormatter, quotas []kadm.DescribedClientQuota) error { | ||
var described []describedQuota | ||
for _, q := range quotas { | ||
entity, entityStr := parseEntityData(q.Entity) | ||
var qv []quotaValues | ||
for _, v := range q.Values { | ||
qv = append(qv, quotaValues{ | ||
Key: v.Key, | ||
Value: strconv.FormatFloat(v.Value, 'f', -1, 64), | ||
}) | ||
} | ||
described = append(described, describedQuota{ | ||
Entity: entity, | ||
Values: qv, | ||
entityStr: entityStr, | ||
}) | ||
} | ||
if isText, _, s, err := f.Format(describeResponse{described}); !isText { | ||
if err != nil { | ||
return fmt.Errorf("unable to print in the required format %q: %v", f.Kind, err) | ||
} | ||
fmt.Println(s) | ||
return nil | ||
} | ||
for i, d := range described { | ||
fmt.Println(d.entityStr) | ||
for _, qv := range d.Values { | ||
fmt.Printf("\t%v=%v\n", qv.Key, qv.Value) | ||
} | ||
if i < len(described)-1 { | ||
fmt.Println() | ||
} | ||
} | ||
return nil | ||
} |
Oops, something went wrong.