✨ Make timeout to wait for blocked move global and configurable #9741

Closed
45 changes: 25 additions & 20 deletions cmd/clusterctl/client/cluster/mover.go
@@ -51,7 +51,7 @@ type ResourceMutatorFunc func(u *unstructured.Unstructured) error
// ObjectMover defines methods for moving Cluster API objects to another management cluster.
type ObjectMover interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(ctx context.Context, namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error
Move(ctx context.Context, namespace string, toCluster Client, dryRun bool, waitForUnblockTimeout time.Duration, mutators ...ResourceMutatorFunc) error

// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
ToDirectory(ctx context.Context, namespace string, directory string) error
@@ -70,7 +70,7 @@ type objectMover struct {
// ensure objectMover implements the ObjectMover interface.
var _ ObjectMover = &objectMover{}

func (o *objectMover) Move(ctx context.Context, namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error {
func (o *objectMover) Move(ctx context.Context, namespace string, toCluster Client, dryRun bool, waitForUnblockTimeout time.Duration, mutators ...ResourceMutatorFunc) error {
log := logf.Log
log.Info("Performing move...")
o.dryRun = dryRun
@@ -98,7 +98,7 @@ func (o *objectMover) Move(ctx context.Context, namespace string, toCluster Clie
proxy = toCluster.Proxy()
}

return o.move(ctx, objectGraph, proxy, mutators...)
return o.move(ctx, objectGraph, proxy, waitForUnblockTimeout, mutators...)
}

func (o *objectMover) ToDirectory(ctx context.Context, namespace string, directory string) error {
@@ -315,7 +315,7 @@ func getMachineObj(ctx context.Context, proxy Proxy, machine *node, machineObj *
}

// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
func (o *objectMover) move(ctx context.Context, graph *objectGraph, toProxy Proxy, mutators ...ResourceMutatorFunc) error {
func (o *objectMover) move(ctx context.Context, graph *objectGraph, toProxy Proxy, waitForUnblockTimeout time.Duration, mutators ...ResourceMutatorFunc) error {
log := logf.Log

clusters := graph.getClusters()
@@ -336,15 +336,19 @@ func (o *objectMover) move(ctx context.Context, graph *objectGraph, toProxy Prox
}

log.Info("Waiting for all resources to be ready to move")
// exponential backoff configuration which returns durations for a total time of ~2m.
// Example: 0, 5s, 8s, 11s, 17s, 26s, 38s, 57s, 86s, 128s
waitForMoveUnblockedBackoff := wait.Backoff{
Duration: 5 * time.Second,
Factor: 1.5,
Steps: 10,
// backoff to wait for a successful GET to check for the annotation
getResourceBackoff := newReadBackoff()
// backoff to re-check if an individual resource is blocking move.
// In total, this is excessively long (>2 days) to try to make sure it's always larger than the global
// timeout. The global timeout will supersede this if it is reached first.
waitForResourceMoveUnblockedBackoff := wait.Backoff{
Duration: 3 * time.Second,
Steps: 100,
Factor: 1.1,
Jitter: 0.1,
Cap: 1 * time.Hour,
}
if err := waitReadyForMove(ctx, o.fromProxy, graph.getMoveNodes(), o.dryRun, waitForMoveUnblockedBackoff); err != nil {
if err := waitReadyForMove(ctx, o.fromProxy, graph.getMoveNodes(), o.dryRun, waitForUnblockTimeout, getResourceBackoff, waitForResourceMoveUnblockedBackoff); err != nil {
return errors.Wrap(err, "error waiting for resources to be ready to move")
}

@@ -610,7 +614,7 @@ func setClusterClassPause(ctx context.Context, proxy Proxy, clusterclasses []*no
return nil
}

func waitReadyForMove(ctx context.Context, proxy Proxy, nodes []*node, dryRun bool, backoff wait.Backoff) error {
func waitReadyForMove(ctx context.Context, proxy Proxy, nodes []*node, dryRun bool, globalTimeout time.Duration, getResourceBackoff, waitForResourceMoveUnblockedBackoff wait.Backoff) error {
if dryRun {
return nil
}
@@ -622,6 +626,9 @@ func waitReadyForMove(ctx context.Context, proxy Proxy, nodes []*node, dryRun bo
return errors.Wrap(err, "error creating client")
}

ctx, cancel := context.WithTimeout(ctx, globalTimeout)
defer cancel()

for _, n := range nodes {
log := log.WithValues(
"apiVersion", n.identity.GroupVersionKind(),
@@ -647,18 +654,16 @@ func waitReadyForMove(ctx context.Context, proxy Proxy, nodes []*node, dryRun bo
}
key := client.ObjectKeyFromObject(obj)

blockLogged := false
if err := retryWithExponentialBackoff(ctx, backoff, func(ctx context.Context) error {
if err := c.Get(ctx, key, obj); err != nil {
log.Info(fmt.Sprintf("Move blocked by %s annotation, waiting for it to be removed", clusterctlv1.BlockMoveAnnotation))
Member:

I'm not super familiar with the clusterctl code base; how do you know this annotation is blocking here without checking for its existence?

Contributor (Author):

n.blockingMove, which is checked above, is populated earlier in the flow by checking the annotation. This was added by #9246:

_, n.blockingMove = obj.GetAnnotations()[clusterctlv1.BlockMoveAnnotation]

if err := retryWithExponentialBackoff(ctx, waitForResourceMoveUnblockedBackoff, func(ctx context.Context) error {
if err := retryWithExponentialBackoff(ctx, getResourceBackoff, func(ctx context.Context) error {
Contributor:

Is getResourceBackoff basically an interval to retry under the long waitForResourceMoveUnblockedBackoff? I think a comment to explain this may help; I'm just a little confused about the nested retryWithExponentialBackoff.
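As structured in this diff, the two backoffs do different jobs: getResourceBackoff retries only the individual c.Get call against transient API errors, while waitForResourceMoveUnblockedBackoff re-runs the whole annotation check until the resource stops blocking; both are cut short once the global timeout cancels the context. A minimal sketch of the nesting, mirroring the calls in the diff (comments are illustrative, not part of the change):

// Outer loop: keep re-checking until the resource no longer blocks the move.
if err := retryWithExponentialBackoff(ctx, waitForResourceMoveUnblockedBackoff, func(ctx context.Context) error {
    // Inner loop: retry transient failures of a single GET.
    if err := retryWithExponentialBackoff(ctx, getResourceBackoff, func(ctx context.Context) error {
        return c.Get(ctx, key, obj)
    }); err != nil {
        return err // the GET kept failing; surface the error to the outer loop
    }
    if _, exists := obj.GetAnnotations()[clusterctlv1.BlockMoveAnnotation]; exists {
        return errors.New("Resource is blocking move") // still blocked; outer loop retries
    }
    return nil // annotation removed; resource is ready to move
}); err != nil {
    return err
}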

return c.Get(ctx, key, obj)
}); err != nil {
return errors.Wrapf(err, "error getting %s/%s", obj.GroupVersionKind(), key)
}

if _, exists := obj.GetAnnotations()[clusterctlv1.BlockMoveAnnotation]; exists {
if !blockLogged {
Contributor:

Why do we no longer need this check? Is this just extra code that was supposed to be removed in #9246?

log.Info(fmt.Sprintf("Move blocked by %s annotation, waiting for it to be removed", clusterctlv1.BlockMoveAnnotation))
blockLogged = true
}
return errors.Errorf("resource is not ready to move: %s/%s", obj.GroupVersionKind(), key)
return errors.New("Resource is blocking move")
}
log.V(5).Info("Resource is ready to move")
return nil
18 changes: 11 additions & 7 deletions cmd/clusterctl/client/cluster/mover_test.go
@@ -1202,7 +1202,7 @@ func Test_objectMover_move_dryRun(t *testing.T) {
dryRun: true,
}

err := mover.move(ctx, graph, toProxy, nil)
err := mover.move(ctx, graph, toProxy, 0, nil)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
@@ -1275,7 +1275,7 @@ func Test_objectMover_move(t *testing.T) {
mover := objectMover{
fromProxy: graph.proxy,
}
err := mover.move(ctx, graph, toProxy)
err := mover.move(ctx, graph, toProxy, 0)

if tt.wantErr {
g.Expect(err).To(HaveOccurred())
@@ -1388,7 +1388,7 @@ func Test_objectMover_move_with_Mutator(t *testing.T) {
fromProxy: graph.proxy,
}

err := mover.move(ctx, graph, toProxy, namespaceMutator)
err := mover.move(ctx, graph, toProxy, 0, namespaceMutator)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
@@ -2390,16 +2390,20 @@ func TestWaitReadyForMove(t *testing.T) {
// trigger discovery the content of the source cluster
g.Expect(graph.Discovery(ctx, "")).To(Succeed())

backoff := wait.Backoff{
timeout := 100 * time.Millisecond
getResourceBackoff := wait.Backoff{
Steps: 1,
}
waitForResourceMoveUnblockedBackoff := wait.Backoff{
Steps: 1,
}
if tt.doUnblock {
backoff = wait.Backoff{
waitForResourceMoveUnblockedBackoff = wait.Backoff{
Duration: 20 * time.Millisecond,
Steps: 10,
Steps: 1000,
}
}
err := waitReadyForMove(ctx, graph.proxy, graph.getMoveNodes(), false, backoff)
err := waitReadyForMove(ctx, graph.proxy, graph.getMoveNodes(), false, timeout, getResourceBackoff, waitForResourceMoveUnblockedBackoff)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
} else {
14 changes: 13 additions & 1 deletion cmd/clusterctl/client/move.go
@@ -19,12 +19,17 @@ package client
import (
"context"
"os"
"time"

"github.com/pkg/errors"

"sigs.k8s.io/cluster-api/cmd/clusterctl/client/cluster"
)

// DefaultWaitForUnblockTimeout is the default value for the time to wait for all resources not to be blocking
// a move operation.
const DefaultWaitForUnblockTimeout = 5 * time.Minute

// MoveOptions carries the options supported by move.
type MoveOptions struct {
// FromKubeconfig defines the kubeconfig to use for accessing the source management cluster. If empty,
@@ -51,6 +56,9 @@ type MoveOptions struct {

// DryRun means the move action is a dry run, no real action will be performed.
DryRun bool

// WaitForUnblockTimeout specifies how long to wait for all resources not to be blocking the move.
WaitForUnblockTimeout time.Duration
}

func (c *clusterctlClient) Move(ctx context.Context, options MoveOptions) error {
@@ -66,6 +74,10 @@ func (c *clusterctlClient) Move(ctx context.Context, options MoveOptions) error
return errors.Errorf("at least one of FromDirectory, ToDirectory and ToKubeconfig must be set")
}

if options.WaitForUnblockTimeout == 0 {
options.WaitForUnblockTimeout = DefaultWaitForUnblockTimeout
}

if options.ToDirectory != "" {
return c.toDirectory(ctx, options)
} else if options.FromDirectory != "" {
@@ -99,7 +111,7 @@ func (c *clusterctlClient) move(ctx context.Context, options MoveOptions) error
}
}

return fromCluster.ObjectMover().Move(ctx, options.Namespace, toCluster, options.DryRun, options.ExperimentalResourceMutators...)
return fromCluster.ObjectMover().Move(ctx, options.Namespace, toCluster, options.DryRun, options.WaitForUnblockTimeout, options.ExperimentalResourceMutators...)
}

func (c *clusterctlClient) fromDirectory(ctx context.Context, options MoveOptions) error {
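For library consumers, the new WaitForUnblockTimeout field is optional; the zero value falls back to DefaultWaitForUnblockTimeout, as shown in the defaulting block above. A minimal usage sketch (the kubeconfig paths are illustrative):

err := c.Move(ctx, client.MoveOptions{
    FromKubeconfig:        client.Kubeconfig{Path: "src.kubeconfig"},
    ToKubeconfig:          client.Kubeconfig{Path: "dst.kubeconfig"},
    WaitForUnblockTimeout: 10 * time.Minute, // raise when providers block moves for longer
})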
3 changes: 2 additions & 1 deletion cmd/clusterctl/client/move_test.go
@@ -20,6 +20,7 @@ import (
"context"
"os"
"testing"
"time"

. "github.com/onsi/gomega"

@@ -307,7 +308,7 @@ type fakeObjectMover struct {
fromDirectoryErr error
}

func (f *fakeObjectMover) Move(_ context.Context, _ string, _ cluster.Client, _ bool, _ ...cluster.ResourceMutatorFunc) error {
func (f *fakeObjectMover) Move(_ context.Context, _ string, _ cluster.Client, _ bool, _ time.Duration, _ ...cluster.ResourceMutatorFunc) error {
return f.moveErr
}

17 changes: 11 additions & 6 deletions cmd/clusterctl/cmd/move.go
@@ -18,6 +18,7 @@ package cmd

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
@@ -34,6 +35,7 @@ type moveOptions struct {
fromDirectory string
toDirectory string
dryRun bool
waitForUnblockTimeout time.Duration
}

var mo = &moveOptions{}
@@ -80,6 +82,8 @@ func init() {
"Write Cluster API objects and all dependencies from a management cluster to directory.")
moveCmd.Flags().StringVar(&mo.fromDirectory, "from-directory", "",
"Read Cluster API objects and all dependencies from a directory into a management cluster.")
moveCmd.Flags().DurationVar(&mo.waitForUnblockTimeout, "wait-for-unblock-timeout", client.DefaultWaitForUnblockTimeout,
"Timeout specifying how long to wait for all resources not to be blocking the move.")

moveCmd.MarkFlagsMutuallyExclusive("to-directory", "to-kubeconfig")
moveCmd.MarkFlagsMutuallyExclusive("from-directory", "to-directory")
@@ -104,11 +108,12 @@ func runMove() error {
}

return c.Move(ctx, client.MoveOptions{
FromKubeconfig: client.Kubeconfig{Path: mo.fromKubeconfig, Context: mo.fromKubeconfigContext},
ToKubeconfig: client.Kubeconfig{Path: mo.toKubeconfig, Context: mo.toKubeconfigContext},
FromDirectory: mo.fromDirectory,
ToDirectory: mo.toDirectory,
Namespace: mo.namespace,
DryRun: mo.dryRun,
FromKubeconfig: client.Kubeconfig{Path: mo.fromKubeconfig, Context: mo.fromKubeconfigContext},
ToKubeconfig: client.Kubeconfig{Path: mo.toKubeconfig, Context: mo.toKubeconfigContext},
FromDirectory: mo.fromDirectory,
ToDirectory: mo.toDirectory,
Namespace: mo.namespace,
DryRun: mo.dryRun,
WaitForUnblockTimeout: mo.waitForUnblockTimeout,
})
}
22 changes: 13 additions & 9 deletions test/framework/clusterctl/client.go
@@ -24,6 +24,7 @@ import (
"path"
"path/filepath"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -344,11 +345,12 @@ func ConfigClusterWithBinary(_ context.Context, clusterctlBinaryPath string, inp

// MoveInput is the input for ClusterctlMove.
type MoveInput struct {
LogFolder string
ClusterctlConfigPath string
FromKubeconfigPath string
ToKubeconfigPath string
Namespace string
LogFolder string
ClusterctlConfigPath string
FromKubeconfigPath string
ToKubeconfigPath string
Namespace string
WaitForUnblockTimeout time.Duration
}

// Move moves workload clusters.
@@ -361,18 +363,20 @@ func Move(ctx context.Context, input MoveInput) {
Expect(os.MkdirAll(logDir, 0750)).To(Succeed(), "Invalid argument. input.LogFolder can't be created for Move")

By("Moving workload clusters")
log.Logf("clusterctl move --from-kubeconfig %s --to-kubeconfig %s --namespace %s",
log.Logf("clusterctl move --from-kubeconfig %s --to-kubeconfig %s --namespace %s --wait-for-unblock-timeout %s",
input.FromKubeconfigPath,
input.ToKubeconfigPath,
input.Namespace,
input.WaitForUnblockTimeout,
)

clusterctlClient, log := getClusterctlClientWithLogger(ctx, input.ClusterctlConfigPath, "clusterctl-move.log", logDir)
defer log.Close()
options := clusterctlclient.MoveOptions{
FromKubeconfig: clusterctlclient.Kubeconfig{Path: input.FromKubeconfigPath, Context: ""},
ToKubeconfig: clusterctlclient.Kubeconfig{Path: input.ToKubeconfigPath, Context: ""},
Namespace: input.Namespace,
FromKubeconfig: clusterctlclient.Kubeconfig{Path: input.FromKubeconfigPath, Context: ""},
ToKubeconfig: clusterctlclient.Kubeconfig{Path: input.ToKubeconfigPath, Context: ""},
Namespace: input.Namespace,
WaitForUnblockTimeout: input.WaitForUnblockTimeout,
}

Expect(clusterctlClient.Move(ctx, options)).To(Succeed(), "Failed to run clusterctl move")
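For e2e tests calling the framework helper, the new field rides along on MoveInput and is forwarded to MoveOptions as shown above; a minimal sketch of a caller (the variable names are illustrative):

clusterctl.Move(ctx, clusterctl.MoveInput{
    ClusterctlConfigPath:  clusterctlConfigPath,
    FromKubeconfigPath:    sourceKubeconfigPath,
    ToKubeconfigPath:      targetKubeconfigPath,
    Namespace:             namespace,
    LogFolder:             logFolder,
    WaitForUnblockTimeout: 10 * time.Minute, // 0 falls back to the 5m default
})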