🐛 fix the rebootstrap issue
Signed-off-by: Yang Le <yangle@redhat.com>
elgnay committed Aug 8, 2023
1 parent 3167826 commit 72aa4b4
Showing 7 changed files with 281 additions and 116 deletions.
2 changes: 2 additions & 0 deletions pkg/operator/helpers/helpers.go
@@ -46,6 +46,8 @@ const (
FeatureGatesTypeValid = "ValidFeatureGates"
FeatureGatesReasonAllValid = "FeatureGatesAllValid"
FeatureGatesReasonInvalidExisting = "InvalidFeatureGatesExisting"

KlusterletRebootstrapProgressing = "RebootstrapProgressing"
)

var (
@@ -22,9 +22,12 @@ import (
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"

operatorv1client "open-cluster-management.io/api/client/operator/clientset/versioned/typed/operator/v1"
operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1"
operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1"
operatorapiv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/operator/helpers"
)

@@ -40,19 +43,25 @@ var BootstrapControllerSyncInterval = 5 * time.Minute
type bootstrapController struct {
kubeClient kubernetes.Interface
klusterletLister operatorlister.KlusterletLister
klusterletClient operatorv1client.KlusterletInterface
secretInformers map[string]coreinformer.SecretInformer
patcher patcher.Patcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus]
}

// NewBootstrapController returns a bootstrapController
func NewBootstrapController(
kubeClient kubernetes.Interface,
klusterletClient operatorv1client.KlusterletInterface,
klusterletInformer operatorinformer.KlusterletInformer,
secretInformers map[string]coreinformer.SecretInformer,
recorder events.Recorder) factory.Controller {
controller := &bootstrapController{
kubeClient: kubeClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
secretInformers: secretInformers,
patcher: patcher.NewPatcher[
*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient),
}
return factory.New().WithSync(controller.sync).
WithInformersQueueKeysFunc(bootstrapSecretQueueKeyFunc(controller.klusterletLister),
@@ -93,6 +102,18 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor
return nil
}

// handle rebootstrap if the klusterlet is in rebootstrapping state
klusterlet, err := k.klusterletLister.Get(klusterletName)
if err != nil {
return err
}
requeueFunc := func(duration time.Duration) {
controllerContext.Queue().AddAfter(queueKey, duration)
}
if meta.IsStatusConditionTrue(klusterlet.Status.Conditions, helpers.KlusterletRebootstrapProgressing) {
return k.processRebootstrap(ctx, agentNamespace, klusterlet, controllerContext.Recorder(), requeueFunc)
}

bootstrapHubKubeconfigSecret, err := k.secretInformers[helpers.BootstrapHubKubeConfig].Lister().Secrets(agentNamespace).Get(helpers.BootstrapHubKubeConfig)
switch {
case errors.IsNotFound(err):
@@ -136,7 +157,7 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor
!bytes.Equal(bootstrapKubeconfig.CertificateAuthorityData, hubKubeconfig.CertificateAuthorityData) {
// the bootstrap kubeconfig secret is changed, reload the klusterlet agents
reloadReason := fmt.Sprintf("the bootstrap secret %s/%s is changed", agentNamespace, helpers.BootstrapHubKubeConfig)
return k.reloadAgents(ctx, controllerContext, agentNamespace, klusterletName, reloadReason)
return k.startRebootstrap(ctx, klusterlet, reloadReason, controllerContext.Recorder(), requeueFunc)
}

expired, err := isHubKubeconfigSecretExpired(hubKubeconfigSecret)
@@ -154,33 +175,69 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor

// the hub kubeconfig secret cert is expired, reload klusterlet to restart bootstrap
reloadReason := fmt.Sprintf("the hub kubeconfig secret %s/%s is expired", agentNamespace, helpers.HubKubeConfig)
return k.reloadAgents(ctx, controllerContext, agentNamespace, klusterletName, reloadReason)
return k.startRebootstrap(ctx, klusterlet, reloadReason, controllerContext.Recorder(), requeueFunc)
}

// reloadAgents reload klusterlet agents by
// 1. make the registration agent re-bootstrap by deleting the current hub kubeconfig secret to
// 2. restart the registration and work agents to reload the new hub ca by deleting the agent deployments
func (k *bootstrapController) reloadAgents(ctx context.Context, ctrlContext factory.SyncContext, namespace, klusterletName, reason string) error {
if err := k.kubeClient.CoreV1().Secrets(namespace).Delete(ctx, helpers.HubKubeConfig, metav1.DeleteOptions{}); err != nil {
func (k *bootstrapController) processRebootstrap(ctx context.Context, agentNamespace string, klusterlet *operatorapiv1.Klusterlet, recorder events.Recorder, requeueFunc func(time.Duration)) error {
deploymentName := fmt.Sprintf("%s-registration-agent", klusterlet.Name)
deployment, err := k.kubeClient.AppsV1().Deployments(agentNamespace).Get(ctx, deploymentName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return k.completeRebootstrap(ctx, agentNamespace, klusterlet, recorder)
}
if err != nil {
return err
}
ctrlContext.Recorder().Eventf("HubKubeconfigSecretDeleted", fmt.Sprintf("the hub kubeconfig secret %s/%s is deleted due to %s",
namespace, helpers.HubKubeConfig, reason))

registrationName := fmt.Sprintf("%s-registration-agent", klusterletName)
if err := k.kubeClient.AppsV1().Deployments(namespace).Delete(ctx, registrationName, metav1.DeleteOptions{}); err != nil {
return err
if deployment.Status.AvailableReplicas == 0 {
return k.completeRebootstrap(ctx, agentNamespace, klusterlet, recorder)
}
ctrlContext.Recorder().Eventf("KlusterletAgentDeploymentDeleted", fmt.Sprintf("the deployment %s/%s is deleted due to %s",
namespace, registrationName, reason))

workName := fmt.Sprintf("%s-work-agent", klusterletName)
if err := k.kubeClient.AppsV1().Deployments(namespace).Delete(ctx, workName, metav1.DeleteOptions{}); err != nil {
// there is still a registration agent pod running. Resync in 5 seconds
requeueFunc(5 * time.Second)
return nil
}

func (k *bootstrapController) startRebootstrap(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, message string, recorder events.Recorder, requeueFunc func(duration time.Duration)) error {
klusterletCopy := klusterlet.DeepCopy()
meta.SetStatusCondition(&klusterletCopy.Status.Conditions, metav1.Condition{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionTrue,
Reason: "RebootstrapStarted",
Message: message,
})
_, err := k.patcher.PatchStatus(ctx, klusterlet, klusterletCopy.Status, klusterlet.Status)
if err != nil {
return err
}
ctrlContext.Recorder().Eventf("KlusterletAgentDeploymentDeleted", fmt.Sprintf("the deployment %s/%s is deleted due to %s",
namespace, workName, reason))
recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("The klusterlet %q starts rebootstrapping due to %s",
klusterlet.Name, message))

// requeue and check the rebootstrap progress in 5 seconds
requeueFunc(5 * time.Second)
return nil
}

func (k *bootstrapController) completeRebootstrap(ctx context.Context, agentNamespace string, klusterlet *operatorapiv1.Klusterlet, recorder events.Recorder) error {
// delete the existing hub kubeconfig
if err := k.kubeClient.CoreV1().Secrets(agentNamespace).Delete(ctx, helpers.HubKubeConfig, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return err
}
recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("Secret %s/%s is deleted", agentNamespace, helpers.HubKubeConfig))

// update the condition of klusterlet
klusterletCopy := klusterlet.DeepCopy()
meta.SetStatusCondition(&klusterletCopy.Status.Conditions, metav1.Condition{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionFalse,
Reason: "RebootstrapCompleted",
Message: fmt.Sprintf("Secret %s/%s is deleted and bootstrap is triggered", agentNamespace, helpers.HubKubeConfig),
})

_, err := k.patcher.PatchStatus(ctx, klusterlet, klusterletCopy.Status, klusterlet.Status)
if err != nil {
return err
}
recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("Rebootstrap of the klusterlet %q is completed", klusterlet.Name))
return nil
}
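
Taken together, startRebootstrap, processRebootstrap, and completeRebootstrap drive a small state machine on the Klusterlet's RebootstrapProgressing condition. Below is a condensed, hypothetical sketch of that lifecycle using only the standard condition helpers from k8s.io/apimachinery; the constant value and messages mirror the code above, but the snippet itself is illustrative and not part of this commit.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Mirrors helpers.KlusterletRebootstrapProgressing introduced by this commit.
const rebootstrapProgressing = "RebootstrapProgressing"

func main() {
	var conditions []metav1.Condition

	// startRebootstrap: mark the klusterlet as rebootstrapping; the klusterlet
	// controller is then expected to scale the agent deployments down to zero.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    rebootstrapProgressing,
		Status:  metav1.ConditionTrue,
		Reason:  "RebootstrapStarted",
		Message: "the bootstrap secret test/bootstrap-hub-kubeconfig is changed",
	})

	// processRebootstrap: while the registration agent still has available
	// replicas, the controller requeues and checks again after 5 seconds.
	fmt.Println(meta.IsStatusConditionTrue(conditions, rebootstrapProgressing)) // true

	// completeRebootstrap: the hub kubeconfig secret has been deleted, so the
	// condition flips back to false and the normal bootstrap flow resumes.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    rebootstrapProgressing,
		Status:  metav1.ConditionFalse,
		Reason:  "RebootstrapCompleted",
		Message: "Secret test/hub-kubeconfig-secret is deleted and bootstrap is triggered",
	})
	fmt.Println(meta.IsStatusConditionTrue(conditions, rebootstrapProgressing)) // false
}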

@@ -14,6 +14,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
@@ -29,16 +30,19 @@ import (
operatorinformers "open-cluster-management.io/api/client/operator/informers/externalversions"
operatorapiv1 "open-cluster-management.io/api/operator/v1"

"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/operator/helpers"
)

func TestSync(t *testing.T) {
cases := []struct {
name string
queueKey string
objects []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
name string
queueKey string
initRebootstrapping bool
objects []runtime.Object
expectedRebootstrapping bool
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "the changed secret is not bootstrap secret",
@@ -50,18 +54,17 @@ func TestSync(t *testing.T) {
},
},
{
name: "checking the hub kubeconfig secret",
name: "client certificate expired",
queueKey: "test/test",
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.47:6443")),
newHubKubeConfigSecret("test", time.Now().Add(-60*time.Second).UTC()),
newDeployment("test-registration-agent", "test"),
newDeployment("test-work-agent", "test"),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertDelete(t, actions[0], "secrets", "test", "hub-kubeconfig-secret")
testingcommon.AssertDelete(t, actions[1], "deployments", "test", "test-registration-agent")
testingcommon.AssertDelete(t, actions[2], "deployments", "test", "test-work-agent")
if len(actions) != 0 {
t.Errorf("expected no actions happens, but got %#v", actions)
}
},
},
{
@@ -90,28 +93,62 @@ func TestSync(t *testing.T) {
{
name: "the bootstrap secret is changed",
queueKey: "test/test",
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("expected no actions happens, but got %#v", actions)
}
},
},
{
name: "wait for scaling down",
queueKey: "test/test",
initRebootstrapping: true,
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
newDeploymentWithAvailableReplicas("test-registration-agent", "test", 1),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertGet(t, actions[0], "apps", "v1", "deployments")
},
},
{
name: "rebootstrap is completed",
queueKey: "test/test",
initRebootstrapping: true,
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
newDeployment("test-registration-agent", "test"),
newDeployment("test-work-agent", "test"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertDelete(t, actions[0], "secrets", "test", "hub-kubeconfig-secret")
testingcommon.AssertDelete(t, actions[1], "deployments", "test", "test-registration-agent")
testingcommon.AssertDelete(t, actions[2], "deployments", "test", "test-work-agent")
testingcommon.AssertDelete(t, actions[1], "secrets", "test", "hub-kubeconfig-secret")
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeKubeClient := fakekube.NewSimpleClientset(c.objects...)

fakeOperatorClient := fakeoperatorclient.NewSimpleClientset()
klusterlet := newKlusterlet("test", "test")
if c.initRebootstrapping {
klusterlet.Status.Conditions = []metav1.Condition{
{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionTrue,
},
}
}
fakeOperatorClient := fakeoperatorclient.NewSimpleClientset(klusterlet)
operatorInformers := operatorinformers.NewSharedInformerFactory(fakeOperatorClient, 5*time.Minute)
operatorStore := operatorInformers.Operator().V1().Klusterlets().Informer().GetStore()
if err := operatorStore.Add(newKlusterlet("test", "test")); err != nil {
if err := operatorStore.Add(klusterlet); err != nil {
t.Fatal(err)
}

@@ -151,10 +188,14 @@ func TestSync(t *testing.T) {
}
}

klusterletClient := fakeOperatorClient.OperatorV1().Klusterlets()
klusterletPatcher := patcher.NewPatcher[*operatorapiv1.Klusterlet, operatorapiv1.KlusterletSpec, operatorapiv1.KlusterletStatus](klusterletClient)
controller := &bootstrapController{
kubeClient: fakeKubeClient,
klusterletClient: klusterletClient,
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
secretInformers: secretInformers,
patcher: klusterletPatcher,
}

syncContext := testingcommon.NewFakeSyncContext(t, c.queueKey)
Expand All @@ -163,6 +204,15 @@ func TestSync(t *testing.T) {
}

c.validateActions(t, fakeKubeClient.Actions())

klusterlet, err := fakeOperatorClient.OperatorV1().Klusterlets().Get(context.Background(), klusterlet.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Expected no errors, but got %v", err)
}
rebootstrapping := meta.IsStatusConditionTrue(klusterlet.Status.Conditions, helpers.KlusterletRebootstrapProgressing)
if c.expectedRebootstrapping != rebootstrapping {
t.Errorf("Expected rebootstrapping is %v, but got %v", c.expectedRebootstrapping, rebootstrapping)
}
})
}
}
@@ -313,3 +363,9 @@ func newDeployment(name, namespace string) *appsv1.Deployment {
Spec: appsv1.DeploymentSpec{},
}
}

func newDeploymentWithAvailableReplicas(name, namespace string, availableReplicas int32) *appsv1.Deployment {
deploy := newDeployment(name, namespace)
deploy.Status.AvailableReplicas = availableReplicas
return deploy
}
@@ -829,8 +829,32 @@ func TestReplica(t *testing.T) {
t.Errorf("Expected non error when sync, %v", err)
}

// should have 3 replicas for clusters with multiple nodes
assertRegistrationDeployment(t, controller.kubeClient.Actions(), "update", "", "cluster1", 3)
assertWorkDeployment(t, controller.kubeClient.Actions(), "update", "cluster1", operatorapiv1.InstallModeDefault, 3)

klusterlet = newKlusterlet("klusterlet", "testns", "cluster1")
klusterlet.Status.Conditions = []metav1.Condition{
{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionTrue,
},
}
if err := controller.operatorStore.Update(klusterlet); err != nil {
t.Fatal(err)
}

controller.kubeClient.ClearActions()
controller.operatorClient.ClearActions()

err = controller.controller.sync(context.TODO(), syncContext)
if err != nil {
t.Errorf("Expected non error when sync, %v", err)
}

// should have 0 replicas for klusterlet in rebootstrapping state
assertRegistrationDeployment(t, controller.kubeClient.Actions(), "update", "", "cluster1", 0)
assertWorkDeployment(t, controller.kubeClient.Actions(), "update", "cluster1", operatorapiv1.InstallModeDefault, 0)
}

func TestClusterNameChange(t *testing.T) {
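
The new TestReplica assertions expect the klusterlet deployment controller (modified in one of the other changed files, not shown in this excerpt) to scale the registration and work agents down to zero while the rebootstrap condition is true, and back to the normal count afterwards. A minimal sketch of that gating, assuming a hypothetical desiredReplicas helper (the name is illustrative, not taken from the commit):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// desiredReplicas returns 0 while a rebootstrap is in progress so the agents
// stop using the stale hub kubeconfig; otherwise it keeps the normal count
// (3 for clusters with multiple nodes in the test above).
func desiredReplicas(conditions []metav1.Condition, normal int32) int32 {
	if meta.IsStatusConditionTrue(conditions, "RebootstrapProgressing") {
		return 0
	}
	return normal
}

func main() {
	rebootstrapping := []metav1.Condition{{
		Type:   "RebootstrapProgressing",
		Status: metav1.ConditionTrue,
		Reason: "RebootstrapStarted",
	}}
	fmt.Println(desiredReplicas(nil, 3))             // 3: normal operation
	fmt.Println(desiredReplicas(rebootstrapping, 3)) // 0: rebootstrap in progress
}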