diff --git a/src/go/rpk/pkg/adminapi/api_partition.go b/src/go/rpk/pkg/adminapi/api_partition.go
index 8392544798882..f3c13d75adb39 100644
--- a/src/go/rpk/pkg/adminapi/api_partition.go
+++ b/src/go/rpk/pkg/adminapi/api_partition.go
@@ -76,3 +76,13 @@ func (a *AdminAPI) Reconfigurations(ctx context.Context) ([]ReconfigurationsResp
 	var rr []ReconfigurationsResponse
 	return rr, a.sendAny(ctx, http.MethodGet, "/v1/partitions/reconfigurations", nil, &rr)
 }
+
+// MoveReplicas POSTs r as the request body to
+// /v1/partitions/{ns}/{topic}/{part}/replicas through sendToLeader.
+func (a *AdminAPI) MoveReplicas(ctx context.Context, ns string, topic string, part int, r []Replica) error {
+	return a.sendToLeader(ctx,
+		http.MethodPost,
+		fmt.Sprintf("/v1/partitions/%s/%s/%d/replicas", ns, topic, part),
+		r,
+		nil)
+}
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/move.go b/src/go/rpk/pkg/cli/cluster/partitions/move.go
new file mode 100644
index 0000000000000..273f33dc44b71
--- /dev/null
+++ b/src/go/rpk/pkg/cli/cluster/partitions/move.go
@@ -0,0 +1,323 @@
+// Copyright 2023 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+package partitions
+
+import (
+	"fmt"
+	"math/rand"
+	"regexp"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
+	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
+	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
+	"github.com/spf13/afero"
+	"github.com/spf13/cobra"
+	"golang.org/x/sync/errgroup"
+)
+
+// newMovePartitionReplicasCommand returns the "move" command: it parses the
+// --partition flags, picks a core for every requested replica, and submits
+// each new assignment through the admin API.
+func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
+	var partitions []string
+	type newAssignment struct {
+		Namespace   string
+		Topic       string
+		Partition   int
+		NewReplicas []adminapi.Replica
+	}
+	cmd := &cobra.Command{
+		Use:   "move",
+		Short: "Move partition replicas across nodes / cores",
+		Long:  helpAlterAssignments,
+		Run: func(cmd *cobra.Command, topics []string) {
+			p, err := p.LoadVirtualProfile(fs)
+			out.MaybeDie(err, "unable to load config: %v", err)
+			out.CheckExitCloudAdmin(p)
+
+			cl, err := adminapi.NewClient(fs, p)
+			out.MaybeDie(err, "unable to initialize admin client: %v", err)
+
+			var (
+				mu                sync.Mutex
+				newAssignmentList []newAssignment
+				reqCount          int
+				moveErrs          []error
+				wg                sync.WaitGroup
+			)
+			brokerReqs := make(map[int]struct{})
+			knownNodeCore := make(map[int]int)
+
+			// Concurrently parse the requested partitions and
+			// find current replica assignments
+			g, egCtx := errgroup.WithContext(cmd.Context())
+
+			// resolve fetches the current replicas of ns/topic/part and
+			// records the assignment requested by the raw flag value. It
+			// only shares the mutex-guarded list, so goroutines may call
+			// it concurrently.
+			resolve := func(ns, topic string, part int, partition string) error {
+				current, err := cl.GetPartition(egCtx, ns, topic, part)
+				if err != nil {
+					return fmt.Errorf("unable to get partition: %w", err)
+				}
+				newReplicas, err := configureReplicas(partition, current.Replicas)
+				if err != nil {
+					return fmt.Errorf("unable to configure new replicas: %w", err)
+				}
+				mu.Lock()
+				defer mu.Unlock()
+				newAssignmentList = append(newAssignmentList, newAssignment{
+					Namespace:   ns,
+					Topic:       topic,
+					Partition:   part,
+					NewReplicas: newReplicas,
+				})
+				return nil
+			}
+			for _, partition := range partitions {
+				partition := partition
+				g.Go(func() error {
+					if len(topics) == 0 { // -p foo/0:1,2,3
+						ns, topic, part, err := extractNTP("", partition)
+						if err != nil {
+							return fmt.Errorf("failed to extract topic/partition: %w", err)
+						}
+						return resolve(ns, topic, part, partition)
+					}
+					for _, t := range topics { // foo -p 0:1,2,3
+						_, _, part, err := extractNTP(t, partition)
+						if err != nil {
+							return fmt.Errorf("failed to extract topic/partition: %w", err)
+						}
+						// Keep ns/topic local to this goroutine; sharing
+						// them across goroutines is a data race.
+						ns, topic := "kafka", t
+						if nt := strings.Split(t, "/"); len(nt) > 1 {
+							ns, topic = nt[0], nt[1]
+						}
+						if err := resolve(ns, topic, part, partition); err != nil {
+							return err
+						}
+					}
+					return nil
+				})
+			}
+
+			if err := g.Wait(); err != nil {
+				out.Die("failed to parse the arguments: %v\n", err)
+			}
+
+			for _, newa := range newAssignmentList {
+				for _, nr := range newa.NewReplicas {
+					if nr.Core == -1 {
+						brokerReqs[nr.NodeID] = struct{}{}
+					}
+				}
+			}
+			// Use a fresh group: g is finished once Wait has returned.
+			bg, bgCtx := errgroup.WithContext(cmd.Context())
+			for node := range brokerReqs {
+				node := node
+				bg.Go(func() error {
+					broker, err := cl.Broker(bgCtx, node)
+					if err != nil {
+						return fmt.Errorf("broker %d: %w", node, err)
+					}
+					// rand.Intn panics on n <= 0, so reject it here.
+					if broker.NumCores <= 0 {
+						return fmt.Errorf("broker %d reported %d cores", node, broker.NumCores)
+					}
+					mu.Lock()
+					defer mu.Unlock()
+					knownNodeCore[node] = broker.NumCores
+					return nil
+				})
+			}
+
+			if err := bg.Wait(); err != nil {
+				out.Die("unable to find core counts: %v", err)
+			}
+
+			rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+			for i := range newAssignmentList {
+				newa := newAssignmentList[i]
+				for j, nr := range newa.NewReplicas {
+					if nr.Core == -1 {
+						newa.NewReplicas[j].Core = rng.Intn(knownNodeCore[nr.NodeID])
+					}
+				}
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					dst := formatNodeCore(newa.NewReplicas)
+					fmt.Printf("Requested to move %s/%s/%d to %s\n", newa.Namespace, newa.Topic, newa.Partition, dst)
+					err := cl.MoveReplicas(cmd.Context(), newa.Namespace, newa.Topic, newa.Partition, newa.NewReplicas)
+					mu.Lock()
+					defer mu.Unlock()
+					if err != nil {
+						moveErrs = append(moveErrs, fmt.Errorf("failed to move %s/%s/%d to %s; %v", newa.Namespace, newa.Topic, newa.Partition, dst, err))
+						return
+					}
+					reqCount++
+				}()
+			}
+			wg.Wait()
+			if len(moveErrs) > 0 {
+				fmt.Println()
+				for _, err := range moveErrs {
+					fmt.Println(err)
+				}
+			}
+
+			fmt.Println()
+			fmt.Printf("Successfully began %d partition movement(s).\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", reqCount)
+		},
+	}
+	cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
+	cmd.MarkFlagRequired("partition")
+	return cmd
+}
+
+// formatNodeCore renders replicas as "[Node: N, Core: C, ...]"; an empty
+// slice renders as "[]".
+func formatNodeCore(replicas []adminapi.Replica) string {
+	var sb strings.Builder
+	sb.WriteString("[")
+	for i, r := range replicas {
+		if i > 0 {
+			sb.WriteString(", ")
+		}
+		fmt.Fprintf(&sb, "Node: %d, Core: %d", r.NodeID, r.Core)
+	}
+	sb.WriteString("]")
+	return sb.String()
+}
+
+var (
+	ntpRe     *regexp.Regexp
+	ntpReOnce sync.Once
+)
+
+// extractNTP parses the text before the colon of a partition flag such as
+// "foo/0:1,2,3", "ns/foo/0:1,2,3" or "0:1,2,3".
+//
+// With a non-empty topic the flag may carry only a partition number, and ns
+// and t are returned empty. Otherwise the flag must hold "topic/partition"
+// (ns is then "kafka") or "namespace/topic/partition". Every error path
+// returns "", "", -1.
+func extractNTP(topic string, ntp string) (ns string, t string, p int, err error) {
+	ntpReOnce.Do(func() {
+		ntpRe = regexp.MustCompile(`^((?:[^:]+/)?\d+):.*$`)
+	})
+	m := ntpRe.FindStringSubmatch(ntp)
+	if len(m) == 0 {
+		return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
+	}
+	rawPart := m[1]
+	if topic == "" {
+		switch n := strings.Split(m[1], "/"); len(n) {
+		case 3:
+			ns, t, rawPart = n[0], n[1], n[2]
+		case 2:
+			ns, t, rawPart = "kafka", n[0], n[1]
+		default:
+			return "", "", -1, fmt.Errorf("invalid format for %s", ntp)
+		}
+	}
+	p, err = strconv.Atoi(rawPart)
+	if err != nil {
+		return "", "", -1, fmt.Errorf("invalid partition in %s: %w", ntp, err)
+	}
+	return ns, t, p, nil
+}
+
+var (
+	replicaRe     *regexp.Regexp
+	replicaReOnce sync.Once
+)
+
+// configureReplicas parses the text after the colon of a partition flag such
+// as "foo/0:1-0,2-1,3-2" or "0:1,2,3" into a slice of adminapi.Replica.
+//
+// Each entry is "node" or "node-core". A missing core is resolved with
+// findCore against currentReplicas.
+func configureReplicas(partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
+	replicaReOnce.Do(func() {
+		replicaRe = regexp.MustCompile(`^[^:]+:(\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$`)
+	})
+	m := replicaRe.FindStringSubmatch(partition)
+	if len(m) == 0 {
+		return nil, fmt.Errorf("invalid format for %s", partition)
+	}
+	entries := strings.Split(m[1], ",")
+	newReplicas := make([]adminapi.Replica, 0, len(entries))
+	for _, nodeCore := range entries {
+		split := strings.Split(nodeCore, "-")
+		// The regexp admits digits only, so Atoi fails just on overflow.
+		node, err := strconv.Atoi(split[0])
+		if err != nil {
+			return nil, fmt.Errorf("invalid node ID in %s: %w", partition, err)
+		}
+		core := findCore(node, currentReplicas)
+		if len(split) > 1 {
+			if core, err = strconv.Atoi(split[1]); err != nil {
+				return nil, fmt.Errorf("invalid core in %s: %w", partition, err)
+			}
+		}
+		newReplicas = append(newReplicas, adminapi.Replica{NodeID: node, Core: core})
+	}
+	return newReplicas, nil
+}
+
+// findCore returns the shard (CPU core) recorded for nodeID in
+// currentReplicas, or -1 when nodeID holds no replica there.
+func findCore(nodeID int, currentReplicas []adminapi.Replica) int {
+	for _, r := range currentReplicas {
+		if r.NodeID == nodeID {
+			return r.Core
+		}
+	}
+	return -1
+}
+
+const helpAlterAssignments = `Move partition replicas across nodes / cores.
+
+This command changes replica assignments for given partitions. By default, it
+assumes the "kafka" namespace, but you can specify an internal namespace using
+the "{namespace}/" prefix.
+
+To move replicas, use the following syntax:
+
+    rpk cluster partitions move foo --partition 0:1,2,3 -p 1:2,3,4
+
+Here, the command assigns new replicas for partition 0 to brokers [1, 2, 3] and
+for partition 1 to brokers [2, 3, 4] for the topic "foo".
+
+You can also specify the core id with "-{core_id}" where the new replicas
+should be placed:
+
+    rpk cluster partitions move foo -p 0:1-0,2-0,3-0
+
+Here all new replicas [1, 2, 3] will be assigned on core 0 on the nodes.
+
+The command does not change a "core" assignment unless it is explicitly
+specified. When a core is not specified for a new node, the command randomly
+picks a core and assigns a replica on the core.
+
+Topic arguments are optional. For more control, you can specify the topic name
+in the "--partition" flag:
+
+    rpk cluster partitions move -p foo/0:1,2,3 -p kafka_internal/tx/0:1-0,2-0,3-0
+`
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/move_test.go b/src/go/rpk/pkg/cli/cluster/partitions/move_test.go
new file mode 100644
index 0000000000000..b2935fcf5fec3
--- /dev/null
+++ b/src/go/rpk/pkg/cli/cluster/partitions/move_test.go
@@ -0,0 +1,103 @@
+package partitions
+
+import (
+	"testing"
+)
+
+func Test_extractNTP(t *testing.T) {
+	tests := []struct {
+		name      string
+		topic     string
+		partition string
+		want0     string
+		want1     string
+		want2     int
+		expErr    bool
+	}{
+		{
+			name:      "t/p:r,r,r",
+			topic:     "",
+			partition: "foo/0:1,2,3",
+			want0:     "kafka",
+			want1:     "foo",
+			want2:     0,
+		},
+		{
+			name:      "t/p:r-c,r-c,r-c",
+			topic:     "",
+			partition: "foo/0:1-0,2-1,3-2",
+			want0:     "kafka",
+			want1:     "foo",
+			want2:     0,
+		},
+		{
+			name:      "p:r,r,r",
+			topic:     "foo",
+			partition: "0:1,2,3",
+			want0:     "",
+			want1:     "",
+			want2:     0,
+		},
+		{
+			name:      "ns/t/p:r,r,r",
+			topic:     "",
+			partition: "redpanda_internal/tx/0:1,2,3",
+			want0:     "redpanda_internal",
+			want1:     "tx",
+			want2:     0,
+		},
+		{
+			name:      "t/p",
+			topic:     "",
+			partition: "foo/0",
+			want2:     -1,
+			expErr:    true,
+		},
+		{
+			name:      "t/t/t:r,r,r",
+			topic:     "",
+			partition: "foo/bar/foo:1,2,3",
+			want2:     -1,
+			expErr:    true,
+		},
+		{
+			name:      "topic t/p:r,r,r",
+			topic:     "foo",
+			partition: "bar/0:1,2,3",
+			want2:     -1,
+			expErr:    true,
+		},
+		{
+			name:      "topic non-digit:r,r,r",
+			topic:     "foo",
+			partition: "one:1,2,3",
+			want2:     -1,
+			expErr:    true,
+		},
+		{
+			name:      "t/non-digit:r,r,r",
+			topic:     "",
+			partition: "foo/one:1,2,3",
+			want2:     -1,
+			expErr:    true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got0, got1, got2, err := extractNTP(tt.topic, tt.partition)
+			gotErr := err != nil
+			if gotErr != tt.expErr {
+				t.Errorf("got err? %v (%v), exp err? %v", gotErr, err, tt.expErr)
+			}
+			if got0 != tt.want0 {
+				t.Errorf("extractNTP() got = %v, want %v", got0, tt.want0)
+			}
+			if got1 != tt.want1 {
+				t.Errorf("extractNTP() got1 = %v, want %v", got1, tt.want1)
+			}
+			if got2 != tt.want2 {
+				t.Errorf("extractNTP() got2 = %v, want %v", got2, tt.want2)
+			}
+		})
+	}
+}
diff --git a/src/go/rpk/pkg/cli/cluster/partitions/partitions.go b/src/go/rpk/pkg/cli/cluster/partitions/partitions.go
index 89958efc0e5ba..770bc808cc103 100644
--- a/src/go/rpk/pkg/cli/cluster/partitions/partitions.go
+++ b/src/go/rpk/pkg/cli/cluster/partitions/partitions.go
@@ -15,6 +15,7 @@ func NewPartitionsCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 	p.InstallAdminFlags(cmd)
 	p.InstallSASLFlags(cmd)
 	cmd.AddCommand(
+		newMovePartitionReplicasCommand(fs, p),
 		newBalancerStatusCommand(fs, p),
 		newMovementCancelCommand(fs, p),
 		newMovementCancelCommandHidden(fs, p),