From 57e450ece740e0bd6fd946c549a513038443daca Mon Sep 17 00:00:00 2001
From: Daisuke Kobayashi
Date: Sun, 12 Nov 2023 00:15:18 +0900
Subject: [PATCH] make the Broker API call once per broker

---
 src/go/rpk/pkg/cli/cluster/partitions/move.go | 101 ++++++++++++++----
 1 file changed, 79 insertions(+), 22 deletions(-)

diff --git a/src/go/rpk/pkg/cli/cluster/partitions/move.go b/src/go/rpk/pkg/cli/cluster/partitions/move.go
index 1b13c22372358..a41bd7f5ea913 100644
--- a/src/go/rpk/pkg/cli/cluster/partitions/move.go
+++ b/src/go/rpk/pkg/cli/cluster/partitions/move.go
@@ -10,7 +10,6 @@ package partitions
 
 import (
-	"context"
 	"fmt"
 	"math/rand"
 	"regexp"
@@ -30,9 +29,15 @@ import (
 func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
 	var (
 		ns         string
-		partitions []string
 		topic      string
+		partitions []string
 	)
+	type newAssignment struct {
+		Namespace   string
+		Topic       string
+		Partition   int
+		NewReplicas []adminapi.Replica
+	}
 	cmd := &cobra.Command{
 		Use:   "move",
 		Short: "Move partition replicas across nodes / cores",
@@ -45,6 +50,11 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 			cl, err := adminapi.NewClient(fs, p)
 			out.MaybeDie(err, "unable to initialize admin client: %v", err)
 
+			var newAssignmentList []newAssignment
+			var mu sync.Mutex // guards newAssignmentList; the errgroup goroutines below append concurrently
+
+			// Concurrently parse the requested partitions and
+			// find the current replica assignments.
 			g, egCtx := errgroup.WithContext(cmd.Context())
 			for _, partition := range partitions {
 				partition := partition
@@ -65,11 +75,17 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 					current, err := cl.GetPartition(egCtx, ns, topic, part)
 					out.MaybeDie(err, "unable to get partition: %s\n", err)
 
-					newReplicas, err := configureReplicas(egCtx, cl, partition, current.Replicas)
+					newReplicas, err := configureReplicas(partition, current.Replicas)
 					out.MaybeDie(err, "unable to configure new replicas: %v\n", err)
 
-					err = cl.MoveReplicas(egCtx, ns, topic, part, newReplicas)
-					out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
+					mu.Lock()
+					newAssignmentList = append(newAssignmentList, newAssignment{
+						Namespace:   ns,
+						Topic:       topic,
+						Partition:   part,
+						NewReplicas: newReplicas,
+					})
+					mu.Unlock()
 				} else { // -p foo/0:1,2,3
 					ns, topic, part, err := extractNTP("", partition)
@@ -78,11 +94,17 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 					current, err := cl.GetPartition(egCtx, ns, topic, part)
 					out.MaybeDie(err, "unable to get partition: %s\n", err)
 
-					newReplicas, err := configureReplicas(egCtx, cl, partition, current.Replicas)
+					newReplicas, err := configureReplicas(partition, current.Replicas)
 					out.MaybeDie(err, "unable to configure new replicas: %v\n", err)
 
-					err = cl.MoveReplicas(egCtx, ns, topic, part, newReplicas)
-					out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
+					mu.Lock()
+					newAssignmentList = append(newAssignmentList, newAssignment{
+						Namespace:   ns,
+						Topic:       topic,
+						Partition:   part,
+						NewReplicas: newReplicas,
+					})
+					mu.Unlock()
 				}
 				return nil
 			})
@@ -92,7 +114,40 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 			if err := g.Wait(); err != nil {
 				fmt.Printf("failed to begin movements: %v\n", err)
 			}
 
-			fmt.Println("Successfully began partition movements.\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.")
+			knownNodeCore := make(map[int]int)
+			var reqCount int
+
+			// Call MoveReplicas serially so that each iteration can reuse
+			// the core counts already cached in 'knownNodeCore'.
+			for i, newa := range newAssignmentList {
+				for j, nr := range newa.NewReplicas {
+					if nr.Core == -1 {
+						numCore, exists := knownNodeCore[nr.NodeID]
+						if exists {
+							src := rand.NewSource(time.Now().UnixNano())
+							r := rand.New(src)
+							newAssignmentList[i].NewReplicas[j].Core = r.Intn(numCore)
+						} else {
+							broker, err := cl.Broker(cmd.Context(), nr.NodeID)
+							out.MaybeDie(err, "unable to find the current core assignments: %v", err)
+							src := rand.NewSource(time.Now().UnixNano())
+							r := rand.New(src)
+							newAssignmentList[i].NewReplicas[j].Core = r.Intn(broker.NumCores)
+							knownNodeCore[nr.NodeID] = broker.NumCores
+						}
+					}
+				}
+				fmt.Printf("Requested to move %s/%s/%d to %s\n", newa.Namespace, newa.Topic, newa.Partition, formatNodeCore(newa.NewReplicas))
+				reqCount++
+				err = cl.MoveReplicas(cmd.Context(), newa.Namespace, newa.Topic, newa.Partition, newa.NewReplicas)
+				if err != nil {
+					fmt.Printf("unable to move replicas for partition %s/%s/%d: %v\n", newa.Namespace, newa.Topic, newa.Partition, err)
+					reqCount--
+				}
+			}
+
+			fmt.Println()
+			fmt.Printf("Successfully began %d partition movements.\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", reqCount)
 		},
 	}
 	cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
@@ -100,6 +155,14 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
 	return cmd
 }
 
+func formatNodeCore(replicas []adminapi.Replica) string {
+	parts := make([]string, 0, len(replicas))
+	for _, r := range replicas {
+		parts = append(parts, fmt.Sprintf("Node: %d, Core: %d", r.NodeID, r.Core))
+	}
+	return "[" + strings.Join(parts, ", ") + "]"
+}
+
 var (
 	ntpRe     *regexp.Regexp
 	ntpReOnce sync.Once
@@ -148,7 +211,7 @@ var (
 
 // configureReplicas parses the partition flag, whose format is foo/0:1-0,2-1,3-2 or 0:1,2,3.
 // It extracts the node-core pairs after the colon and returns them as a []adminapi.Replica.
-func configureReplicas(cmd context.Context, cl *adminapi.AdminAPI, partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
+func configureReplicas(partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
 	replicaReOnce.Do(func() {
 		replicaRe = regexp.MustCompile(`^[^:]+:(\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$`)
 	})
@@ -159,8 +222,8 @@
 	var newReplicas []adminapi.Replica
 	for _, nodeCore := range strings.Split(m[1], ",") {
 		if split := strings.Split(nodeCore, "-"); len(split) == 1 {
-			node, _ := strconv.Atoi(nodeCore)
-			core := findCore(cmd, cl, node, currentReplicas)
+			node, _ := strconv.Atoi(split[0])
+			core := findCore(node, currentReplicas)
 			newReplicas = append(newReplicas, adminapi.Replica{
 				NodeID: node,
 				Core:   core,
@@ -177,21 +240,15 @@
 	return newReplicas, nil
 }
 
-// findCore finds a shard (CPU core) where a replica should be assigned on.
-// If 'core' is specified, use it.
-// If 'core' is not specified for a new node, find a shard in a random-fashion.
-// If 'core' is not specified for an existing node, use the current shard as is.
-func findCore(cmd context.Context, cl *adminapi.AdminAPI, nodeID int, currentReplicas []adminapi.Replica) int {
+// findCore returns the shard (CPU core) that an existing replica on
+// 'nodeID' is assigned to, or -1 if the node hosts no current replica.
+func findCore(nodeID int, currentReplicas []adminapi.Replica) int {
 	for _, i := range currentReplicas {
 		if nodeID == i.NodeID {
 			return i.Core
 		}
 	}
-	broker, err := cl.Broker(cmd, nodeID)
-	out.MaybeDie(err, "unable to find the current core assignments", err)
-	src := rand.NewSource(time.Now().UnixNano())
-	r := rand.New(src)
-	return r.Intn(broker.NumCores)
+	return -1
 }
 
 const helpAlterAssignments = `Move partition replicas across nodes / cores.
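
Illustration (not part of the patch): the movement loop above caches each broker's core count in 'knownNodeCore' so that 'cl.Broker' is called at most once per node, while any replica that did not specify a core still receives a random shard. Below is a minimal, self-contained Go sketch of that caching pattern; 'numCoresFunc' and 'pickCores' are hypothetical names standing in for the admin API call and the selection loop.

package main

import (
	"context"
	"fmt"
	"math/rand"
)

// numCoresFunc stands in for (*adminapi.AdminAPI).Broker followed by reading
// broker.NumCores; it is injected so the sketch runs without a cluster.
type numCoresFunc func(ctx context.Context, nodeID int) (int, error)

// pickCores chooses a random core for each requested node ID, querying the
// core count at most once per distinct node (the 'knownNodeCore' idea above).
func pickCores(ctx context.Context, nodeIDs []int, numCores numCoresFunc) ([]int, error) {
	known := make(map[int]int)       // nodeID -> core count, filled once per broker
	r := rand.New(rand.NewSource(1)) // fixed seed so the demo is reproducible
	cores := make([]int, len(nodeIDs))
	for i, id := range nodeIDs {
		n, ok := known[id]
		if !ok {
			var err error
			if n, err = numCores(ctx, id); err != nil { // the one call per broker
				return nil, err
			}
			known[id] = n
		}
		cores[i] = r.Intn(n)
	}
	return cores, nil
}

func main() {
	calls := 0
	fake := func(ctx context.Context, nodeID int) (int, error) {
		calls++ // count the simulated admin API round trips
		return 8, nil
	}
	cores, err := pickCores(context.Background(), []int{1, 2, 1, 1, 2}, fake)
	if err != nil {
		panic(err)
	}
	fmt.Println(cores, "API calls:", calls) // five replicas, but only two calls
}

Running the demo with 'go run .' shows two simulated API calls for five replicas spread over two nodes, which is exactly the reduction the patch aims for.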