Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: add rpk cluster quotas import #21311

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/go/rpk/pkg/cli/cluster/quotas/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"github.com/twmb/franz-go/pkg/kadm"
)

type quotaValues struct {
type quotaValue struct {
Key string `json:"key" yaml:"key"`
Value string `json:"values" yaml:"values"`
Value string `json:"value" yaml:"value"`
}
type describedQuota struct {
Entity []entityData `json:"entity" yaml:"entity"`
Values []quotaValues `json:"values" yaml:"values"`
Entity []entityData `json:"entity" yaml:"entity"`
Values []quotaValue `json:"values" yaml:"values"`

entityStr string
}
Expand Down Expand Up @@ -137,9 +137,9 @@ func printDescribedQuotas(f config.OutFormatter, quotas []kadm.DescribedClientQu
var described []describedQuota
for _, q := range quotas {
entity, entityStr := parseEntityData(q.Entity)
var qv []quotaValues
var qv []quotaValue
for _, v := range q.Values {
qv = append(qv, quotaValues{
qv = append(qv, quotaValue{
Key: v.Key,
Value: strconv.FormatFloat(v.Value, 'f', -1, 64),
})
Expand Down
304 changes: 304 additions & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package quotas

import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/kafka"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/types"
"gopkg.in/yaml.v3"
)

// quotasDiff represents a delta in the quotas after importing a quota.
type quotasDiff struct {
EntityStr string `json:"entity,omitempty" yaml:"entity,omitempty"`
QuotaType string `json:"quota-type,omitempty" yaml:"quota-type,omitempty"`
OldValue string `json:"old-value,omitempty" yaml:"old-value,omitempty"`
NewValue string `json:"new-value,omitempty" yaml:"new-value,omitempty"`
}

func importCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
from string
noConfirm bool
)
cmd := &cobra.Command{
Use: "import",
Short: "Import client quotas",
Long: `Import client quotas.

Use this command to import client quotas in the format produced by
'rpk cluster quotas describe --format json/yaml'.

The schema of the import string matches the schema from
'rpk cluster quotas describe --format help':

{
quotas: []{
entity: []{
name: string
type: string
}
values: []{
key: string
values: string
}
}
}
r-vasquez marked this conversation as resolved.
Show resolved Hide resolved

Use the '--no-confirm' flag if you wish to avoid the confirmation prompt.
r-vasquez marked this conversation as resolved.
Show resolved Hide resolved
`,
Example: `
Import client quotas from a file:
rpk cluster quotas import --from /path/to/file

Import client quotas from a string:
rpk cluster quotas import --from '{"quotas":...}'
`,
Run: func(cmd *cobra.Command, args []string) {
f := p.Formatter
if h, ok := f.Help([]quotasDiff{}); ok {
out.Exit(h)
}

p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)

adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

var quotas describeResponse
var source []byte
// --from flag accepts either a file, or a string, we try first
// to read as a file as is the most expected usage.
file, err := afero.ReadFile(fs, from)
if err == nil {
source = file
} else {
if os.IsNotExist(err) || strings.Contains(err.Error(), "file name too long") {
source = []byte(from)
} else {
out.Exit("unable to read file: %v", err)
}
}
if err = json.Unmarshal(source, &quotas); err != nil {
yamlErr := yaml.Unmarshal(source, &quotas)
out.MaybeDie(yamlErr, "unable to parse quotas from %q: %v: %v", from, err, yamlErr)
}

importedQuotas, err := responseToDescribed(quotas)
out.MaybeDie(err, "unable to parse quotas: %v", err)

// Describe all quotas.
currentQuotas, err := adm.DescribeClientQuotas(cmd.Context(), false, []kadm.DescribeClientQuotaComponent{})
out.MaybeDie(err, "unable to describe client quotas: %v", err)

diff := calculateQuotasDiff(currentQuotas, importedQuotas)
if len(diff) == 0 {
out.Exit("No changes detected from import")
}
printDiff(f, diff)

if !noConfirm {
ok, err := out.Confirm("Confirm client quotas import above?")
out.MaybeDie(err, "unable to confirm deletion: %v", err)
if !ok {
out.Exit("Import canceled.")
}
}
_, err = adm.AlterClientQuotas(cmd.Context(), describedToAlterEntry(currentQuotas, importedQuotas))
out.MaybeDie(err, "unable to alter quotas: %v", err)

if f.Kind == "text" {
fmt.Println("Successfully imported the client quotas")
}
},
}

cmd.Flags().StringVar(&from, "from", "", "Either the quotas or a path to a file containing the quotas to import; check help text for more information")
cmd.Flags().BoolVar(&noConfirm, "no-confirm", false, "Disable confirmation prompt")

cmd.MarkFlagRequired("from")
return cmd
}

// responseToDescribed converts the describeResponse quota (imported source) to
// a []kadm.DescribedClientQuota.
func responseToDescribed(quotas describeResponse) ([]kadm.DescribedClientQuota, error) {
var resp []kadm.DescribedClientQuota
for _, q := range quotas.DescribedQuotas {
var (
entity []kadm.ClientQuotaEntityComponent
values []kadm.ClientQuotaValue
)
for _, e := range q.Entity {
e := e
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: e.Type,
Name: &e.Name,
})
}
for _, v := range q.Values {
floatVal, err := strconv.ParseFloat(v.Value, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse client quota value %q: %v", v.Value, err)
}
values = append(values, kadm.ClientQuotaValue{
Key: v.Key,
Value: floatVal,
})
}
resp = append(resp, kadm.DescribedClientQuota{
Entity: entity,
Values: values,
})
}
return resp, nil
}

// describedToAlterEntry creates a []kadm.AlterClientQuotaEntry based on the
// toDelete and toAdd described client quotas. The entry can be used to issue
// an alter client quota request.
func describedToAlterEntry(toDelete, toAdd []kadm.DescribedClientQuota) []kadm.AlterClientQuotaEntry {
var entries []kadm.AlterClientQuotaEntry
addEntries := func(described []kadm.DescribedClientQuota, delete bool) {
for _, d := range described {
var (
entity []kadm.ClientQuotaEntityComponent
operations []kadm.AlterClientQuotaOp
)
for _, e := range d.Entity {
entity = append(entity, kadm.ClientQuotaEntityComponent{
Type: e.Type,
Name: e.Name,
})
}
for _, o := range d.Values {
if delete {
operations = append(operations, kadm.AlterClientQuotaOp{
Key: o.Key,
Remove: true,
})
} else {
operations = append(operations, kadm.AlterClientQuotaOp{
Key: o.Key,
Value: o.Value,
})
}
}
entries = append(entries, kadm.AlterClientQuotaEntry{
Entity: entity,
Ops: operations,
})
}
}
addEntries(toDelete, true)
addEntries(toAdd, false)
return entries
}

// calculateQuotasDiff calculates the diff between 'before' and 'after', any
// value that is not present will be marked as '-' in the result string.
func calculateQuotasDiff(before, after []kadm.DescribedClientQuota) []quotasDiff {
type delta struct {
oldValue string
newValue string
}
type quotaTypeMap map[string]delta

entityMap := make(map[string]quotaTypeMap)

// Fill the map with old values.
for _, q := range before {
e := q.Entity
types.Sort(e)
_, entityStr := parseEntityData(e)
qMap := entityMap[entityStr]
if qMap == nil {
qMap = quotaTypeMap{}
entityMap[entityStr] = qMap
}
for _, v := range q.Values {
qMap[v.Key] = delta{
oldValue: strconv.FormatFloat(v.Value, 'f', -1, 64),
}
}
}

// Update map with new values and track differences.
for _, q := range after {
e := q.Entity
types.Sort(e)
_, entityStr := parseEntityData(e)
qMap := entityMap[entityStr]
if qMap == nil {
qMap = quotaTypeMap{}
entityMap[entityStr] = qMap
}
for _, v := range q.Values {
newVal := strconv.FormatFloat(v.Value, 'f', -1, 64)
if d, exists := qMap[v.Key]; exists {
// If the value changed, we add it to the map.
if newVal != d.oldValue {
qMap[v.Key] = delta{oldValue: d.oldValue, newValue: newVal}
} else {
// If not, we remove it, we do nothing if the values are
// the same.
delete(qMap, v.Key)
}
} else {
qMap[v.Key] = delta{oldValue: "-", newValue: newVal}
}
}
}

// Prepare the result
var diffResult []quotasDiff
for entityStr, qMap := range entityMap {
for key, d := range qMap {
newValue := d.newValue
if newValue == "" {
newValue = "-"
}
diffResult = append(diffResult, quotasDiff{
EntityStr: entityStr,
QuotaType: key,
OldValue: d.oldValue,
NewValue: newValue,
})
}
}
return diffResult
}

func printDiff(f config.OutFormatter, diff []quotasDiff) {
if isText, _, formatted, err := f.Format(diff); !isText {
out.MaybeDie(err, "unable to print in the required format %q: %v", f.Kind, err)
fmt.Println(formatted)
return
}
tw := out.NewTable("ENTITY", "QUOTA-TYPE", "OLD-VALUE", "NEW-VALUE")
defer tw.Flush()

for _, d := range diff {
tw.PrintStructFields(d)
}
}
1 change: 1 addition & 0 deletions src/go/rpk/pkg/cli/cluster/quotas/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
cmd.AddCommand(
alterCommand(fs, p),
describeCommand(fs, p),
importCommand(fs, p),
)
p.InstallKafkaFlags(cmd)
p.InstallFormatFlag(cmd)
Expand Down
13 changes: 13 additions & 0 deletions src/go/rpk/pkg/out/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,16 @@ func ParsePartitionString(ntp string) (ns, topic string, partitions []int, rerr
}
return ns, match[2], partitions, nil
}

// ParseFileOrStringFlag parses a flag string, if it starts with '@' it
// will treat it as a filepath and will attempt to read the file.
func ParseFileOrStringFlag(fs afero.Fs, flag string) ([]byte, error) {
if strings.HasPrefix(flag, "@") {
file, err := afero.ReadFile(fs, strings.TrimPrefix(flag, "@"))
if err != nil {
return nil, fmt.Errorf("unable to read file: %v", err)
}
return file, nil
}
return []byte(flag), nil
}
4 changes: 4 additions & 0 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,10 @@ def alter_cluster_quotas(self,
output_format=output_format,
node=node)

def import_cluster_quota(self, source, output_format="json"):
cmd = ["import", "--no-confirm", "--from", source]
return self._run_cluster_quotas(cmd, output_format=output_format)

def _run_cluster_quotas(self,
cmd,
output_format="json",
Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/tests/quota_management_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def controller_mutation_rate(value):

@classmethod
def from_dict(cls, d: dict):
return cls(key=QuotaValueType(d['key']), values=d['values'])
return cls(key=QuotaValueType(d['key']), values=d['value'])


class Quota(NamedTuple):
Expand Down
Loading
Loading