Commit
make Broker api call once per broker
daisukebe committed Nov 11, 2023
1 parent 4a4ee81 commit 57e450e
Showing 1 changed file with 80 additions and 22 deletions.
102 changes: 80 additions & 22 deletions src/go/rpk/pkg/cli/cluster/partitions/move.go
@@ -10,7 +10,6 @@
package partitions

import (
"context"
"fmt"
"math/rand"
"regexp"
@@ -30,9 +29,15 @@ import (
func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var (
ns string
partitions []string
topic string
partitions []string
)
type newAssignment struct {
Namespace string
Topic string
Partition int
NewReplicas []adminapi.Replica
}
cmd := &cobra.Command{
Use: "move",
Short: "Move partition replicas across nodes / cores",
@@ -45,6 +50,10 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
cl, err := adminapi.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

var newAssignmentList []newAssignment

// Concurrently parse the requested partitions and
// find current replica assignments
g, egCtx := errgroup.WithContext(cmd.Context())
for _, partition := range partitions {
partition := partition
@@ -65,11 +74,15 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
current, err := cl.GetPartition(egCtx, ns, topic, part)
out.MaybeDie(err, "unable to get partition: %s\n", err)

newReplicas, err := configureReplicas(egCtx, cl, partition, current.Replicas)
newReplicas, err := configureReplicas(partition, current.Replicas)
out.MaybeDie(err, "unable to configure new replicas: %v\n", err)

err = cl.MoveReplicas(egCtx, ns, topic, part, newReplicas)
out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
newAssignmentList = append(newAssignmentList, newAssignment{
Namespace: ns,
Topic: topic,
Partition: part,
NewReplicas: newReplicas,
})
}
} else { // -p foo/0:1,2,3
ns, topic, part, err := extractNTP("", partition)
@@ -78,11 +91,15 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
current, err := cl.GetPartition(egCtx, ns, topic, part)
out.MaybeDie(err, "unable to get partition: %s\n", err)

newReplicas, err := configureReplicas(egCtx, cl, partition, current.Replicas)
newReplicas, err := configureReplicas(partition, current.Replicas)
out.MaybeDie(err, "unable to configure new replicas: %v\n", err)

err = cl.MoveReplicas(egCtx, ns, topic, part, newReplicas)
out.MaybeDie(err, "unable to move partition replicas: %v\n", err)
newAssignmentList = append(newAssignmentList, newAssignment{
Namespace: ns,
Topic: topic,
Partition: part,
NewReplicas: newReplicas,
})
}
return nil
})
@@ -92,14 +109,61 @@ func newMovePartitionReplicasCommand(fs afero.Fs, p *config.Params) *cobra.Comma
fmt.Printf("failed to begin movements: %v\n", err)
}

fmt.Println("Successfully began partition movements.\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.")
knownNodeCore := make(map[int]int)
var reqCount int

// We call MoveReplicas serially so that each iteration
// refers to the updated 'knownNodeCore' map.
for i, newa := range newAssignmentList {
for j, nr := range newa.NewReplicas {

if nr.Core == -1 {
numCore, exists := knownNodeCore[nr.NodeID]
if exists {
src := rand.NewSource(time.Now().UnixNano())
r := rand.New(src)
newAssignmentList[i].NewReplicas[j].Core = r.Intn(numCore)
} else {
broker, err := cl.Broker(cmd.Context(), nr.NodeID)
out.MaybeDie(err, "unable to find the current core assignments", err)
src := rand.NewSource(time.Now().UnixNano())
r := rand.New(src)
newAssignmentList[i].NewReplicas[j].Core = r.Intn(broker.NumCores)
knownNodeCore[nr.NodeID] = broker.NumCores
}
}
}
fmt.Printf("Requested to move %s/%s/%d to %s\n", newa.Namespace, newa.Topic, newa.Partition, formatNodeCore(newa.NewReplicas))
reqCount += 1
err = cl.MoveReplicas(cmd.Context(), newa.Namespace, newa.Topic, newa.Partition, newa.NewReplicas)
if err != nil {
fmt.Printf("unable to move replicas for partition %s/%s/%d: %v\n", newa.Namespace, newa.Topic, newa.Partition, err)
reqCount -= 1
}
}

fmt.Println()
fmt.Printf("Successfully began %d partition movements.\n\nCheck the movement status with 'rpk cluster partitions move-status' or see new assignments with 'rpk topic describe -p TOPIC'.\n", reqCount)
},
}
cmd.Flags().StringArrayVarP(&partitions, "partition", "p", nil, "Topic-partitions to move and new replica locations (repeatable)")
cmd.MarkFlagRequired("partition")
return cmd
}

func formatNodeCore(replicas []adminapi.Replica) string {
var formatted string
for i, r := range replicas {
if i == 0 {
formatted = "["
}
formatted = formatted + fmt.Sprintf("Node: %d, Core: %d", r.NodeID, r.Core)
if i != len(replicas)-1 {
formatted = formatted + ", "
}
}
return formatted + "]"
}

var (
ntpRe *regexp.Regexp
ntpReOnce sync.Once
@@ -148,7 +212,7 @@ var (

// configureReplicas parses the partition flag with the format foo/0:1-0,2-1,3-2 or 0:1,2,3.
// It extracts the values after the colon and returns them as a []adminapi.Replica.
func configureReplicas(cmd context.Context, cl *adminapi.AdminAPI, partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
func configureReplicas(partition string, currentReplicas []adminapi.Replica) ([]adminapi.Replica, error) {
replicaReOnce.Do(func() {
replicaRe = regexp.MustCompile(`^[^:]+:(\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$`)
})
@@ -159,8 +223,8 @@ func configureReplicas(cmd context.Context, cl *adminapi.AdminAPI, partition str
var newReplicas []adminapi.Replica
for _, nodeCore := range strings.Split(m[1], ",") {
if split := strings.Split(nodeCore, "-"); len(split) == 1 {
node, _ := strconv.Atoi(nodeCore)
core := findCore(cmd, cl, node, currentReplicas)
node, _ := strconv.Atoi(split[0])
core := findCore(node, currentReplicas)
newReplicas = append(newReplicas, adminapi.Replica{
NodeID: node,
Core: core,
@@ -177,21 +241,15 @@ func configureReplicas(cmd context.Context, cl *adminapi.AdminAPI, partition str
return newReplicas, nil
}

// findCore finds a shard (CPU core) where a replica should be assigned on.
// If 'core' is specified, use it.
// If 'core' is not specified for a new node, find a shard in a random-fashion.
// If 'core' is not specified for an existing node, use the current shard as is.
func findCore(cmd context.Context, cl *adminapi.AdminAPI, nodeID int, currentReplicas []adminapi.Replica) int {
// findCore finds the shard (CPU core) that an existing replica is
// assigned to. It returns -1 for a new node.
func findCore(nodeID int, currentReplicas []adminapi.Replica) int {
for _, i := range currentReplicas {
if nodeID == i.NodeID {
return i.Core
}
}
broker, err := cl.Broker(cmd, nodeID)
out.MaybeDie(err, "unable to find the current core assignments", err)
src := rand.NewSource(time.Now().UnixNano())
r := rand.New(src)
return r.Intn(broker.NumCores)
return -1
}

const helpAlterAssignments = `Move partition replicas across nodes / cores.
Expand Down
