Skip to content

Commit

Permalink
Fix: migration-controller depending on cluster-manager condition.
Browse files Browse the repository at this point in the history
Signed-off-by: xuezhaojun <zxue@redhat.com>
  • Loading branch information
xuezhaojun committed Dec 7, 2023
1 parent 5884bc5 commit dfa9384
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 201 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package migrationcontroller
import (
"context"
"fmt"
"time"

"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/controller/factory"
Expand Down Expand Up @@ -56,6 +57,7 @@ const (
MigrationSucceeded = "MigrationSucceeded"

migrationRequestCRDName = "storageversionmigrations.migration.k8s.io"
reSyncTime = time.Second * 5
)

type crdMigrationController struct {
Expand All @@ -65,6 +67,7 @@ type crdMigrationController struct {
clusterManagerLister operatorlister.ClusterManagerLister
recorder events.Recorder
generateHubClusterClients func(hubConfig *rest.Config) (apiextensionsclient.Interface, migrationv1alpha1client.StorageVersionMigrationsGetter, error)
parseMigrations func() ([]*migrationv1alpha1.StorageVersionMigration, error)
}

// NewCRDMigrationController construct crd migration controller
Expand All @@ -81,6 +84,7 @@ func NewCRDMigrationController(
*operatorapiv1.ClusterManager, operatorapiv1.ClusterManagerSpec, operatorapiv1.ClusterManagerStatus](
clusterManagerClient),
clusterManagerLister: clusterManagerInformer.Lister(),
parseMigrations: parseStorageVersionMigrationFiles,
recorder: recorder,
generateHubClusterClients: generateHubClients,
}
Expand All @@ -94,7 +98,12 @@ func (c *crdMigrationController) sync(ctx context.Context, controllerContext fac
clusterManagerName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ClusterManager %q", clusterManagerName)

if len(migrationRequestFiles) == 0 {
// if no migration files exist, do nothing and exit the reconcile
migrations, err := c.parseMigrations()
if err != nil {
return err
}
if len(migrations) == 0 {
return nil
}

Expand All @@ -117,19 +126,13 @@ func (c *crdMigrationController) sync(ctx context.Context, controllerContext fac
return err
}

// ClusterManager is deleting, we remove its related resources on hub
if !clusterManager.DeletionTimestamp.IsZero() {
return removeStorageVersionMigrations(ctx, migrationClient)
}

// apply storage version migrations if it is supported
// find whether the storageversionmigration CRD is supported
supported, err := supportStorageVersionMigration(ctx, apiExtensionClient)
if err != nil {
return err
}

newClusterManager := clusterManager.DeepCopy()
if !supported {
newClusterManager := clusterManager.DeepCopy()
meta.SetStatusCondition(&newClusterManager.Status.Conditions, metav1.Condition{
Type: MigrationSucceeded,
Status: metav1.ConditionFalse,
Expand All @@ -140,24 +143,38 @@ func (c *crdMigrationController) sync(ctx context.Context, controllerContext fac
return err
}

// if the ClusterManager is deleting, we remove its related resources on hub
if !clusterManager.DeletionTimestamp.IsZero() {
return removeStorageVersionMigrations(ctx, migrations, migrationClient)
}

// do not apply storage version migrations until other resources are applied
if applied := meta.IsStatusConditionTrue(clusterManager.Status.Conditions, clusterManagerApplied); !applied {
controllerContext.Queue().AddRateLimited(clusterManagerName)
return nil
}

err = applyStorageVersionMigrations(ctx, migrationClient, c.recorder)
err = checkCRDStorageVersion(ctx, migrations, apiExtensionClient)
if err != nil {
klog.Errorf("Failed to check CRD current storage version. %v", err)
controllerContext.Queue().AddAfter(clusterManagerName, reSyncTime)
c.recorder.Warningf("StorageVersionMigrationFailed", "Failed to check CRD current storage version. %v", err)
return nil
}

err = createStorageVersionMigrations(ctx, migrations, migrationClient, c.recorder)
if err != nil {
klog.Errorf("Failed to apply StorageVersionMigrations. %v", err)
return err
}

migrationCond, err := syncStorageVersionMigrationsCondition(ctx, migrationClient)
migrationCond, err := syncStorageVersionMigrationsCondition(ctx, migrations, migrationClient)
if err != nil {
klog.Errorf("Failed to sync StorageVersionMigrations condition. %v", err)
return err
}

newClusterManager := clusterManager.DeepCopy()
meta.SetStatusCondition(&newClusterManager.Status.Conditions, migrationCond)

_, err = c.patcher.PatchStatus(ctx, newClusterManager, newClusterManager.Status, clusterManager.Status)
Expand Down Expand Up @@ -188,75 +205,78 @@ func supportStorageVersionMigration(ctx context.Context, apiExtensionClient apie

func removeStorageVersionMigrations(
ctx context.Context,
toRemoveMigrations []*migrationv1alpha1.StorageVersionMigration,
migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter) error {
// Remove storage version migrations
for _, file := range migrationRequestFiles {
err := removeStorageVersionMigration(
ctx,
migrationClient,
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil
},
file,
)
for _, migration := range toRemoveMigrations {
err := migrationClient.StorageVersionMigrations().Delete(ctx, migration.Name, metav1.DeleteOptions{})
if errors.IsNotFound(err) {
continue
}
if err != nil {
return err
return nil
}
}
return nil
}

func applyStorageVersionMigrations(ctx context.Context,
migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter, recorder events.Recorder) error {
// 1. The CRD must exist before the migration CR is created.
// 2. The current storage version in the CRD should not be the version in the migration CR; otherwise, during the migration, the
// objects will still be stored as this version.
func checkCRDStorageVersion(ctx context.Context, toCreateMigrations []*migrationv1alpha1.StorageVersionMigration,
apiExtensionClient apiextensionsclient.Interface) error {
errs := []error{}
for _, file := range migrationRequestFiles {
required, err := parseStorageVersionMigrationFile(
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil
},
file)
for _, migration := range toCreateMigrations {
crd, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx,
resourceToCRDName(migration.Spec.Resource.Resource, migration.Spec.Resource.Group), metav1.GetOptions{})
if err != nil {
errs = append(errs, err)
continue
}
var storageVersion string
for _, version := range crd.Spec.Versions {
if version.Name == migration.Spec.Resource.Version && version.Storage {
storageVersion = version.Name // find the current storage version of the CRD
break
}
}
if storageVersion == migration.Spec.Resource.Version {
errs = append(errs, fmt.Errorf("the current storage version of %v is %v, which is the same as the version in the migration CR %v",
resourceToCRDName(migration.Spec.Resource.Resource, migration.Spec.Resource.Group),
storageVersion, migration.Name))
continue
}
}
return operatorhelpers.NewMultiLineAggregate(errs)
}

_, _, err = applyStorageVersionMigration(ctx, migrationClient, required, recorder)
// StorageVersionMigration is a create-only, job-style CR; once it's done, updating the spec won't trigger a new migration.
// See code details in:
// https://github.com/kubernetes-sigs/kube-storage-version-migrator/blob/5c8923c5ff96ceb4435f66b986b5aec2dd0cbc22/pkg/controller/kubemigrator.go#L105-L108
func createStorageVersionMigrations(ctx context.Context,
toCreateMigrations []*migrationv1alpha1.StorageVersionMigration,
migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter, recorder events.Recorder) error {
errs := []error{}
for _, migration := range toCreateMigrations {
err := createStorageVersionMigration(ctx, migrationClient, migration, recorder)
if err != nil {
errs = append(errs, err)
continue
}
}

return operatorhelpers.NewMultiLineAggregate(errs)
}

func resourceToCRDName(resource, group string) string {
return fmt.Sprintf("%s.%s", resource, group)
}

// syncStorageVersionMigrationsCondition sync the migration condition based on all the StorageVersionMigrations status
// 1. migrationSucceeded is true only when all the StorageVersionMigrations resources succeed.
// 2. migrationSucceeded is false when any of the StorageVersionMigrations resources failed or running
func syncStorageVersionMigrationsCondition(ctx context.Context,
func syncStorageVersionMigrationsCondition(ctx context.Context, toSyncMigrations []*migrationv1alpha1.StorageVersionMigration,
migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter) (metav1.Condition, error) {
for _, file := range migrationRequestFiles {
required, err := parseStorageVersionMigrationFile(
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil
},
file)
if err != nil {
return metav1.Condition{}, err
}
existing, err := migrationClient.StorageVersionMigrations().Get(ctx, required.Name, metav1.GetOptions{})
for _, migration := range toSyncMigrations {
existing, err := migrationClient.StorageVersionMigrations().Get(ctx, migration.Name, metav1.GetOptions{})
if err != nil {
return metav1.Condition{}, err
}
Expand Down Expand Up @@ -296,20 +316,29 @@ func syncStorageVersionMigrationsCondition(ctx context.Context,
}, nil
}

func removeStorageVersionMigration(
ctx context.Context,
migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter,
manifests resourceapply.AssetFunc,
file string) error {
required, err := parseStorageVersionMigrationFile(manifests, file)
if err != nil {
return err
func parseStorageVersionMigrationFiles() ([]*migrationv1alpha1.StorageVersionMigration, error) {
var errs []error
var migrations []*migrationv1alpha1.StorageVersionMigration
for _, file := range migrationRequestFiles {
migration, err := parseStorageVersionMigrationFile(
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil
},
file)
if err != nil {
errs = append(errs, err)
continue
}
migrations = append(migrations, migration)
}
err = migrationClient.StorageVersionMigrations().Delete(ctx, required.Name, metav1.DeleteOptions{})
if errors.IsNotFound(err) {
return nil
if len(errs) > 0 {
return nil, operatorhelpers.NewMultiLineAggregate(errs)
}
return err
return migrations, nil
}

func parseStorageVersionMigrationFile(
Expand All @@ -333,48 +362,42 @@ func parseStorageVersionMigrationFile(
return svm, nil
}

func applyStorageVersionMigration(
func createStorageVersionMigration(
ctx context.Context,
client migrationv1alpha1client.StorageVersionMigrationsGetter,
required *migrationv1alpha1.StorageVersionMigration,
migration *migrationv1alpha1.StorageVersionMigration,
recorder events.Recorder,
) (*migrationv1alpha1.StorageVersionMigration, bool, error) {
if required == nil {
return nil, false, fmt.Errorf("required StorageVersionMigration is nil")
) error {
if migration == nil {
return fmt.Errorf("required StorageVersionMigration is nil")
}
existing, err := client.StorageVersionMigrations().Get(ctx, required.Name, metav1.GetOptions{})
existing, err := client.StorageVersionMigrations().Get(ctx, migration.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
actual, err := client.StorageVersionMigrations().Create(context.TODO(), required, metav1.CreateOptions{})
actual, err := client.StorageVersionMigrations().Create(context.TODO(), migration, metav1.CreateOptions{})
if err != nil {
recorder.Warningf("StorageVersionMigrationCreateFailed", "Failed to create %s: %v", resourcehelper.FormatResourceForCLIWithNamespace(required), err)
return actual, true, err
recorder.Warningf("StorageVersionMigrationCreateFailed", "Failed to create %s: %v", resourcehelper.FormatResourceForCLIWithNamespace(migration), err)
return err
}

recorder.Eventf("StorageVersionMigrationCreated", "Created %s because it was missing", resourcehelper.FormatResourceForCLIWithNamespace(actual))
return actual, true, err
return err
}
if err != nil {
return nil, false, err
return err
}

modified := resourcemerge.BoolPtr(false)
existingCopy := existing.DeepCopy()
resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, required.ObjectMeta)
if !equality.Semantic.DeepEqual(existingCopy.Spec, required.Spec) {
resourcemerge.EnsureObjectMeta(modified, &existingCopy.ObjectMeta, migration.ObjectMeta)
if !equality.Semantic.DeepEqual(existingCopy.Spec, migration.Spec) {
*modified = true
existing.Spec = required.Spec
}
if !*modified {
return existing, false, nil
return nil // nothing change in the spec
}

actual, err := client.StorageVersionMigrations().Update(ctx, existingCopy, metav1.UpdateOptions{})
if err != nil {
recorder.Warningf("StorageVersionMigrationUpdateFailed", "Failed to update %s: %v", resourcehelper.FormatResourceForCLIWithNamespace(existingCopy), err)
return actual, true, err
}
recorder.Eventf("StorageVersionMigrationUpdated", "Updated %s because it changed", resourcehelper.FormatResourceForCLIWithNamespace(actual))
return actual, true, nil
recorder.Eventf("StorageVersionMigrationUpdated", "Updated %s because it changed", resourcehelper.FormatResourceForCLIWithNamespace(existing))
return fmt.Errorf("StorageVersionMigrationConflict: Trying to set %s with different spec", migration.Name)
}

func getStorageVersionMigrationStatusCondition(svmcr *migrationv1alpha1.StorageVersionMigration) *migrationv1alpha1.MigrationCondition {
Expand Down
Loading

0 comments on commit dfa9384

Please sign in to comment.