diff --git a/cmd/migrator/migrator.go b/cmd/migrator/migrator.go
index 523f9cb34..553d09804 100644
--- a/cmd/migrator/migrator.go
+++ b/cmd/migrator/migrator.go
@@ -15,7 +15,7 @@ func main() {
 	pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
 	pflag.Parse()
 	pflag.VisitAll(func(flag *pflag.Flag) {
-		klog.V(4).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
+		klog.V(2).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
 	})
 	command := app.NewMigratorCommand()
 	if err := command.Execute(); err != nil {
diff --git a/manifests/migrator.yaml b/manifests/migrator.yaml
index 9cb531b0c..9e92711ff 100644
--- a/manifests/migrator.yaml
+++ b/manifests/migrator.yaml
@@ -18,3 +18,9 @@ spec:
       containers:
       - name: migrator
         image: REGISTRY/storage-version-migration-migrator:VERSION
+        command:
+        - /migrator
+        - --v=2
+        - --alsologtostderr
+        - --kube-api-qps=40
+        - --kube-api-burst=1000
diff --git a/manifests/namespace-rbac.yaml b/manifests/namespace-rbac.yaml
index 3f5f8158f..b3a5589ce 100644
--- a/manifests/namespace-rbac.yaml
+++ b/manifests/namespace-rbac.yaml
@@ -17,18 +17,6 @@ rules:
 ---
 kind: ClusterRole
 apiVersion: rbac.authorization.k8s.io/v1
-metadata:
-  name: storage-version-migration-migrator
-rules:
-- apiGroups: ["*"]
-  resources: ["*"]
-  verbs: ["get", "list", "update"]
-- apiGroups: ["migration.k8s.io"]
-  resources: ["storageversionmigrations"]
-  verbs: ["watch"]
----
-kind: ClusterRole
-apiVersion: rbac.authorization.k8s.io/v1
 metadata:
   name: storage-version-migration-crd-creator
 rules:
@@ -55,7 +43,7 @@ subjects:
   namespace: NAMESPACE
 roleRef:
   kind: ClusterRole
-  name: storage-version-migration-migrator
+  name: cluster-admin
   apiGroup: rbac.authorization.k8s.io
 ---
 kind: ClusterRoleBinding
diff --git a/pkg/controller/kubemigrator.go b/pkg/controller/kubemigrator.go
index 2ae5fe2ca..5e1cf855c 100644
--- a/pkg/controller/kubemigrator.go
+++ b/pkg/controller/kubemigrator.go
@@ -102,10 +102,11 @@ func (km *KubeMigrator) processOne(obj interface{}) error {
 		return err
 	}
 	if HasCondition(m, migrationv1alpha1.MigrationSucceeded) || HasCondition(m, migrationv1alpha1.MigrationFailed) {
-		klog.V(2).Infof("The migration has already completed for %#v", m)
+		klog.V(2).Infof("%v: migration has already completed", m.Name)
 		return nil
 	}
 	m, err = km.updateStatus(m, migrationv1alpha1.MigrationRunning, "")
+	klog.V(2).Infof("%v: migration running", m.Name)
 	if err != nil {
 		return err
 	}
@@ -118,11 +119,17 @@ func (km *KubeMigrator) processOne(obj interface{}) error {
 	err = core.Run()
 	utilruntime.HandleError(err)
 	if err == nil {
-		_, err = km.updateStatus(m, migrationv1alpha1.MigrationSucceeded, "")
+		if _, err := km.updateStatus(m, migrationv1alpha1.MigrationSucceeded, ""); err != nil {
+			utilruntime.HandleError(err)
+		}
 		metrics.Metrics.ObserveSucceededMigration(resource(m).String())
+		klog.V(2).Infof("%v: migration succeeded", m.Name)
 		return err
 	}
-	_, err = km.updateStatus(m, migrationv1alpha1.MigrationFailed, err.Error())
+	klog.Errorf("%v: migration failed: %v", m.Name, err)
+	if _, err := km.updateStatus(m, migrationv1alpha1.MigrationFailed, err.Error()); err != nil {
+		utilruntime.HandleError(err)
+	}
 	metrics.Metrics.ObserveFailedMigration(resource(m).String())
 	return err
 }
diff --git a/pkg/migrator/core.go b/pkg/migrator/core.go
index e92464efc..1eeaa01d2 100644
--- a/pkg/migrator/core.go
+++ b/pkg/migrator/core.go
@@ -22,7 +22,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator/metrics"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,6 +30,9 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/dynamic" + "k8s.io/klog" + + "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator/metrics" ) var metadataAccessor = meta.NewAccessor() @@ -93,6 +95,10 @@ func (m *migrator) Run() error { Continue: continueToken, }, ) + if errors.IsNotFound(listError) { + // Fail this migration, we don't want to get stuck on a migration for a resource that does not exist. + return fmt.Errorf("failed to list resources: %v", listError) + } if listError != nil && !errors.IsResourceExpired(listError) { if canRetry(listError) { if seconds, delay := errors.SuggestsClientDelay(listError); delay { @@ -201,8 +207,18 @@ func (m *migrator) migrateOneItem(item *unstructured.Unstructured) error { return nil } if canRetry(err) { - if seconds, delay := errors.SuggestsClientDelay(err); delay { + seconds, delay := errors.SuggestsClientDelay(err) + switch { + case delay && len(namespace) > 0: + klog.Warningf("migration of %s, in the %s namespace, will be retried after a %ds delay: %v", name, namespace, seconds, err) + time.Sleep(time.Duration(seconds) * time.Second) + case delay: + klog.Warningf("migration of %s will be retried after a %ds delay: %v", name, seconds, err) time.Sleep(time.Duration(seconds) * time.Second) + case !delay && len(namespace) > 0: + klog.Warningf("migration of %s, in the %s namespace, will be retried: %v", name, namespace, err) + default: + klog.Warningf("migration of %s will be retried: %v", name, err) } continue } diff --git a/pkg/trigger/discovery_handler.go b/pkg/trigger/discovery_handler.go index 5033ddd89..25e9e4bb0 100644 --- a/pkg/trigger/discovery_handler.go +++ b/pkg/trigger/discovery_handler.go @@ -20,14 +20,15 @@ import ( "fmt" "reflect" - migrationv1alpha1 "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1" - "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/controller" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" + + migrationv1alpha1 "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1" + "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/controller" ) func (mt *MigrationTrigger) processDiscovery() { @@ -185,10 +186,11 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) { utilruntime.HandleError(getErr) return } - - stale := (getErr == nil && mt.staleStorageState(ss)) - storageVersionChanged := (getErr == nil && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash) - notFound := (getErr != nil && errors.IsNotFound(getErr)) + found := getErr == nil + stale := found && mt.staleStorageState(ss) + storageVersionChanged := found && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash + needsMigration := found && !mt.isMigrated(ss) && !mt.hasPendingOrRunningMigration(r) + relaunchMigration := stale || !found || storageVersionChanged || needsMigration if stale { if err := mt.client.MigrationV1alpha1().StorageStates().Delete(storageStateName(toGroupResource(r)), nil); err != nil { @@ -197,7 +199,7 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) { } } - if stale || storageVersionChanged || notFound { + if relaunchMigration { // Note that this means historical 
 		if err := mt.relaunchMigration(r); err != nil {
 			utilruntime.HandleError(err)
@@ -207,3 +209,27 @@
 	// always update status.heartbeat, sometimes update the version hashes.
 	mt.updateStorageState(r.StorageVersionHash, r)
 }
+
+func (mt *MigrationTrigger) isMigrated(ss *migrationv1alpha1.StorageState) bool {
+	if len(ss.Status.PersistedStorageVersionHashes) != 1 {
+		return false
+	}
+	return ss.Status.CurrentStorageVersionHash == ss.Status.PersistedStorageVersionHashes[0]
+}
+
+func (mt *MigrationTrigger) hasPendingOrRunningMigration(r metav1.APIResource) bool {
+	// get the corresponding StorageVersionMigration resource
+	migrations, err := mt.migrationInformer.GetIndexer().ByIndex(controller.ResourceIndex, controller.ToIndex(toGroupResource(r)))
+	if err != nil {
+		utilruntime.HandleError(err)
+		return false
+	}
+	for _, migration := range migrations {
+		m := migration.(*migrationv1alpha1.StorageVersionMigration)
+		if controller.HasCondition(m, migrationv1alpha1.MigrationSucceeded) || controller.HasCondition(m, migrationv1alpha1.MigrationFailed) {
+			continue
+		}
+		// migration is running or pending
+		return true
+	}
+	return false
+}
diff --git a/pkg/trigger/discovery_handler_test.go b/pkg/trigger/discovery_handler_test.go
index 0f3953d86..446beeb76 100644
--- a/pkg/trigger/discovery_handler_test.go
+++ b/pkg/trigger/discovery_handler_test.go
@@ -22,155 +22,17 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1"
-	"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/clients/clientset/fake"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
-)
-
-func staleStorageState() *v1alpha1.StorageState {
-	return &v1alpha1.StorageState{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: storageStateName(v1alpha1.GroupVersionResource{Resource: "pods"}),
-		},
-		Spec: v1alpha1.StorageStateSpec{
-			Resource: v1alpha1.GroupResource{Resource: "pods"},
-		},
-		Status: v1alpha1.StorageStateStatus{
-			LastHeartbeatTime: metav1.Time{metav1.Now().Add(-3 * discoveryPeriod)},
-		},
-	}
-}
-
-func freshStorageState() *v1alpha1.StorageState {
-	return &v1alpha1.StorageState{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: storageStateName(v1alpha1.GroupVersionResource{Resource: "pods"}),
-		},
-		Spec: v1alpha1.StorageStateSpec{
-			Resource: v1alpha1.GroupResource{Resource: "pods"},
-		},
-		Status: v1alpha1.StorageStateStatus{
-			CurrentStorageVersionHash:     "newhash",
-			PersistedStorageVersionHashes: []string{"newhash"},
-			LastHeartbeatTime:             metav1.Time{metav1.Now().Add(-1 * discoveryPeriod)},
-		},
-	}
-}
-func freshStorageStateWithOldHash() *v1alpha1.StorageState {
-	return &v1alpha1.StorageState{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: storageStateName(v1alpha1.GroupVersionResource{Resource: "pods"}),
-		},
-		Spec: v1alpha1.StorageStateSpec{
-			Resource: v1alpha1.GroupResource{Resource: "pods"},
-		},
-		Status: v1alpha1.StorageStateStatus{
-			CurrentStorageVersionHash:     "oldhash",
-			PersistedStorageVersionHashes: []string{"oldhash"},
-			LastHeartbeatTime:             metav1.Time{metav1.Now().Add(-1 * discoveryPeriod)},
-		},
-	}
-}
-
-func newMigrationList() *v1alpha1.StorageVersionMigrationList {
-	var migrations []v1alpha1.StorageVersionMigration
-	for i := 0; i < 3; i++ {
-		migration := newMigration(fmt.Sprintf("migration%d", i), v1alpha1.GroupVersionResource{Version: "v1", Resource: "pods"})
-		migrations = append(migrations, migration)
-	}
-	for i := 3; i < 6; i++ {
-		migration := newMigration(fmt.Sprintf("migration%d", i), v1alpha1.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"})
-		migrations = append(migrations, migration)
-	}
-	return &v1alpha1.StorageVersionMigrationList{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "StorageVersionMigrationList",
-			APIVersion: "migraiton.k8s.io/v1alpha1",
-		},
-		Items: migrations,
-	}
-}
-
-func newMigration(name string, r v1alpha1.GroupVersionResource) v1alpha1.StorageVersionMigration {
-	return v1alpha1.StorageVersionMigration{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: name,
-		},
-		Spec: v1alpha1.StorageVersionMigrationSpec{
-			Resource: r,
-		},
-	}
-}
-
-func newAPIResource() metav1.APIResource {
-	return metav1.APIResource{
-		Group:              "",
-		Name:               "pods",
-		Version:            "v1",
-		StorageVersionHash: "newhash",
-	}
-}
-
-func verifyCleanupAndLaunch(t *testing.T, actions []core.Action) {
-	if len(actions) != 4 {
-		t.Fatalf("expected 4 actions")
-
-	}
-	for i := 0; i < 3; i++ {
-		a := actions[i]
-		d, ok := a.(core.DeleteAction)
-		if !ok {
-			t.Fatalf("expected delete action")
-		}
-		r := schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storageversionmigrations"}
-		if d.GetResource() != r {
-			t.Fatalf("unexpected resource %v", d.GetResource())
-		}
-		if !strings.Contains(d.GetName(), "migration") {
-			t.Fatalf("unexpected name %s", d.GetName())
-		}
-	}
-	c, ok := actions[3].(core.CreateAction)
-	if !ok {
-		t.Fatalf("expected create action")
-	}
-	r := schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storageversionmigrations"}
-	if c.GetResource() != r {
-		t.Fatalf("unexpected resource %v", c.GetResource())
-	}
-}
-
-func verifyStorageStateUpdate(t *testing.T, a core.Action, expectedHeartbeat metav1.Time, expectedCurrentHash string, expectedPersistedHashes []string) {
-	u, ok := a.(core.UpdateAction)
-	if !ok {
-		t.Fatalf("expected update action")
-	}
-	r := schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storagestates"}
-	if u.GetResource() != r {
-		t.Fatalf("unexpected resource %v", u.GetResource())
-	}
-	if u.GetSubresource() != "status" {
-		t.Fatalf("unexpected subresource %v", u.GetSubresource())
-	}
-	ss, ok := u.GetObject().(*v1alpha1.StorageState)
-	if !ok {
-		t.Fatalf("expected storage state, got %v", ss)
-	}
-	if a, e := ss.Status.LastHeartbeatTime, expectedHeartbeat; a != e {
-		t.Fatalf("expected to update heartbeat, got %v", e)
-	}
-	if a, e := ss.Status.CurrentStorageVersionHash, expectedCurrentHash; a != e {
-		t.Fatalf("expected to has hash %v, got %v", e, a)
-	}
-	if a, e := ss.Status.PersistedStorageVersionHashes, expectedPersistedHashes; !reflect.DeepEqual(a, e) {
-		t.Fatalf("expected to has hashes %v, got %v", e, a)
-	}
-}
+
+	"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1"
+	"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/clients/clientset/fake"
+)
 
 func TestProcessDiscoveryResource(t *testing.T) {
 	// TODO: we probably don't need a list
@@ -202,7 +64,7 @@
 }
 
 func TestProcessDiscoveryResourceStaleState(t *testing.T) {
-	client := fake.NewSimpleClientset(newMigrationList(), staleStorageState())
+	client := fake.NewSimpleClientset(newMigrationList(), storageState(withStaleHeartbeat()))
 	trigger := NewMigrationTrigger(client)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
@@ -224,7 +86,7 @@ func TestProcessDiscoveryResourceStaleState(t *testing.T) {
 	if d.GetResource() != r {
 		t.Fatalf("unexpected resource %v", d.GetResource())
 	}
-	if !strings.Contains(d.GetName(), staleStorageState().Name) {
+	if !strings.Contains(d.GetName(), "pods") {
 		t.Fatalf("unexpected name %s", d.GetName())
 	}
 
@@ -243,7 +105,14 @@
 }
 
 func TestProcessDiscoveryResourceStorageVersionChanged(t *testing.T) {
-	client := fake.NewSimpleClientset(newMigrationList(), freshStorageStateWithOldHash())
+	client := fake.NewSimpleClientset(
+		newMigrationList(),
+		storageState(
+			withFreshHeartbeat(),
+			withCurrentVersion("oldhash"),
+			withPersistedVersions("oldhash"),
+		),
+	)
 	trigger := NewMigrationTrigger(client)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
@@ -262,7 +131,14 @@
 }
 
 func TestProcessDiscoveryResourceNoChange(t *testing.T) {
-	client := fake.NewSimpleClientset(newMigrationList(), freshStorageState())
+	client := fake.NewSimpleClientset(
+		newMigrationList(),
+		storageState(
+			withFreshHeartbeat(),
+			withCurrentVersion("newhash"),
+			withPersistedVersions("newhash"),
+		),
+	)
 	trigger := NewMigrationTrigger(client)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
@@ -278,3 +154,220 @@
 	actions := client.Actions()
 	verifyStorageStateUpdate(t, actions[4], trigger.heartbeat, discoveredResource.StorageVersionHash, []string{"newhash"})
 }
+
+func TestProcessDiscoveryResourceStorageMigrationMissing(t *testing.T) {
+	client := fake.NewSimpleClientset(
+		storageState(
+			withFreshHeartbeat(),
+			withCurrentVersion("newhash"),
+			withPersistedVersions(v1alpha1.Unknown),
+		),
+	)
+	trigger := NewMigrationTrigger(client)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go trigger.migrationInformer.Run(stopCh)
+	if !cache.WaitForCacheSync(stopCh, trigger.migrationInformer.HasSynced) {
+		utilruntime.HandleError(fmt.Errorf("Unable to sync caches"))
+		return
+	}
+	trigger.heartbeat = metav1.Now()
+	discoveredResource := newAPIResource()
+	trigger.processDiscoveryResource(discoveredResource)
+
+	actions := client.Actions()
+	expectCreateStorageVersionMigrationAction(t, actions[3])
+	verifyStorageStateUpdate(t, actions[len(actions)-1], trigger.heartbeat, discoveredResource.StorageVersionHash, []string{v1alpha1.Unknown})
+}
+
+func TestProcessDiscoveryResourceStorageMigrationFailed(t *testing.T) {
+	client := fake.NewSimpleClientset(
+		storageMigration(withFailedCondition()),
+		storageState(
+			withFreshHeartbeat(),
+			withCurrentVersion("newhash"),
+			withPersistedVersions(v1alpha1.Unknown),
+		),
+	)
+	trigger := NewMigrationTrigger(client)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go trigger.migrationInformer.Run(stopCh)
+	if !cache.WaitForCacheSync(stopCh, trigger.migrationInformer.HasSynced) {
+		utilruntime.HandleError(fmt.Errorf("Unable to sync caches"))
+		return
+	}
+	trigger.heartbeat = metav1.Now()
+	discoveredResource := newAPIResource()
+	trigger.processDiscoveryResource(discoveredResource)
+	actions := client.Actions()
+	expectCreateStorageVersionMigrationAction(t, actions[4])
+	verifyStorageStateUpdate(t, actions[len(actions)-1], trigger.heartbeat, discoveredResource.StorageVersionHash, []string{v1alpha1.Unknown})
+}
+
+func storageState(options ...func(*v1alpha1.StorageState)) *v1alpha1.StorageState {
+	ss := &v1alpha1.StorageState{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: storageStateName(v1alpha1.GroupVersionResource{Resource: "pods"}),
+		},
+		Spec: v1alpha1.StorageStateSpec{
+			Resource: v1alpha1.GroupResource{Resource: "pods"},
+		},
+	}
+	for _, fn := range options {
+		fn(ss)
+	}
+	return ss
+}
+
+func withFreshHeartbeat() func(*v1alpha1.StorageState) {
+	return func(ss *v1alpha1.StorageState) {
+		ss.Status.LastHeartbeatTime = metav1.NewTime(metav1.Now().Add(-1 * discoveryPeriod))
+	}
+}
+
+func withStaleHeartbeat() func(*v1alpha1.StorageState) {
+	return func(ss *v1alpha1.StorageState) {
+		ss.Status.LastHeartbeatTime = metav1.NewTime(metav1.Now().Add(-3 * discoveryPeriod))
+	}
+}
+
+func withCurrentVersion(version string) func(*v1alpha1.StorageState) {
+	return func(ss *v1alpha1.StorageState) {
+		ss.Status.CurrentStorageVersionHash = version
+	}
+}
+
+func withPersistedVersions(versions ...string) func(*v1alpha1.StorageState) {
+	return func(ss *v1alpha1.StorageState) {
+		ss.Status.PersistedStorageVersionHashes = append(ss.Status.PersistedStorageVersionHashes, versions...)
+	}
+}
+
+func newMigrationList() *v1alpha1.StorageVersionMigrationList {
+	var migrations []v1alpha1.StorageVersionMigration
+	for i := 0; i < 3; i++ {
+		migration := storageMigration(withName(fmt.Sprintf("migration%d", i)))
+		migrations = append(migrations, *migration)
+	}
+	for i := 3; i < 6; i++ {
+		migration := storageMigration(withName(fmt.Sprintf("migration%d", i)), withResource(v1alpha1.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}))
+		migrations = append(migrations, *migration)
+	}
+	return &v1alpha1.StorageVersionMigrationList{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "StorageVersionMigrationList",
+			APIVersion: "migraiton.k8s.io/v1alpha1",
+		},
+		Items: migrations,
+	}
+}
+
+func storageMigration(options ...func(*v1alpha1.StorageVersionMigration)) *v1alpha1.StorageVersionMigration {
+	m := &v1alpha1.StorageVersionMigration{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pods",
+		},
+		Spec: v1alpha1.StorageVersionMigrationSpec{
+			Resource: v1alpha1.GroupVersionResource{Version: "v1", Resource: "pods"},
+		},
+	}
+	for _, option := range options {
+		option(m)
+	}
+	return m
+}
+
+func withName(name string) func(*v1alpha1.StorageVersionMigration) {
+	return func(migration *v1alpha1.StorageVersionMigration) {
+		migration.Name = name
+	}
+}
+
+func withResource(resource v1alpha1.GroupVersionResource) func(*v1alpha1.StorageVersionMigration) {
+	return func(migration *v1alpha1.StorageVersionMigration) {
+		migration.Spec.Resource = resource
+	}
+}
+
+func withFailedCondition() func(*v1alpha1.StorageVersionMigration) {
+	return func(migration *v1alpha1.StorageVersionMigration) {
+		migration.Status.Conditions = append(migration.Status.Conditions, v1alpha1.MigrationCondition{
+			Type:   v1alpha1.MigrationFailed,
+			Status: v1.ConditionTrue,
+		})
+	}
+}
+
+func newAPIResource() metav1.APIResource {
+	return metav1.APIResource{
+		Group:              "",
+		Name:               "pods",
+		Version:            "v1",
+		StorageVersionHash: "newhash",
+	}
+}
+
+func verifyCleanupAndLaunch(t *testing.T, actions []core.Action) {
+	if len(actions) != 4 {
+		t.Fatalf("expected 4 actions")
+
+	}
+	for i := 0; i < 3; i++ {
+		a := actions[i]
+		d, ok := a.(core.DeleteAction)
+		if !ok {
+			t.Fatalf("expected delete action")
+		}
+		r := schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storageversionmigrations"}
+		if d.GetResource() != r {
+			t.Fatalf("unexpected resource %v", d.GetResource())
+		}
+		if !strings.Contains(d.GetName(), "migration") {
+			t.Fatalf("unexpected name %s", d.GetName())
+		}
+	}
+	expectCreateStorageVersionMigrationAction(t, actions[3])
+}
+
+func expectCreateStorageVersionMigrationAction(t *testing.T, action core.Action) *v1alpha1.StorageVersionMigration {
+	return expectCreateAction(t, action, schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storageversionmigrations"}).(*v1alpha1.StorageVersionMigration)
+}
+
+func expectCreateAction(t *testing.T, action core.Action, gvr schema.GroupVersionResource) runtime.Object {
+	c, ok := action.(core.CreateAction)
+	if !ok {
+		t.Fatalf("expected create action")
+	}
+	if c.GetResource() != gvr {
+		t.Fatalf("unexpected resource %v", c.GetResource())
+	}
+	return c.GetObject()
+}
+
+func verifyStorageStateUpdate(t *testing.T, a core.Action, expectedHeartbeat metav1.Time, expectedCurrentHash string, expectedPersistedHashes []string) {
+	u, ok := a.(core.UpdateAction)
+	if !ok {
+		t.Fatalf("expected update action")
+	}
+	r := schema.GroupVersionResource{Group: "migration.k8s.io", Version: "v1alpha1", Resource: "storagestates"}
+	if u.GetResource() != r {
+		t.Fatalf("unexpected resource %v", u.GetResource())
+	}
+	if u.GetSubresource() != "status" {
+		t.Fatalf("unexpected subresource %v", u.GetSubresource())
+	}
+	ss, ok := u.GetObject().(*v1alpha1.StorageState)
+	if !ok {
+		t.Fatalf("expected storage state, got %v", ss)
+	}
+	if a, e := ss.Status.LastHeartbeatTime, expectedHeartbeat; a != e {
+		t.Fatalf("expected to update heartbeat, got %v", e)
+	}
+	if a, e := ss.Status.CurrentStorageVersionHash, expectedCurrentHash; a != e {
+		t.Fatalf("expected hash %v, got %v", e, a)
+	}
+	if a, e := ss.Status.PersistedStorageVersionHashes, expectedPersistedHashes; !reflect.DeepEqual(a, e) {
+		t.Fatalf("expected hashes %v, got %v", e, a)
+	}
+}
diff --git a/test/e2e/test-cmd.sh b/test/e2e/test-cmd.sh
index 409b19876..6793e5cc1 100755
--- a/test/e2e/test-cmd.sh
+++ b/test/e2e/test-cmd.sh
@@ -35,6 +35,12 @@ if [[ $rc -ne 0 ]]; then
   TLS_ARGS=""
 fi
 
+function capture-logs() {
+  echo "initializer logs:"
+  kubectl logs --namespace=kube-system job/initializer || true
+  echo "migrator logs:"
+  kubectl logs --namespace=kube-system deployment/migrator || true
+}
 
 function wait-for-migration()
 {
@@ -63,6 +69,7 @@ function wait-for-migration()
     # Note that number=1 when pendings="".
     number=$(echo "${pendings}" | wc -l)
     if [ -z "${pendings}" ]; then
+      capture-logs
       return 0
     else
      echo "${number} migrations haven't succeeded yet"
@@ -75,10 +82,7 @@ function wait-for-migration()
   done
 
   echo "Timed out waiting for migration to complete."
-  echo "initializer logs:"
-  kubectl logs --namespace=kube-system -l job-name=initializer || true
-  echo "migrator logs:"
-  kubectl logs --namespace=kube-system -l app=migrator || true
+  capture-logs
   return 1
 }
diff --git a/test/e2e/test-fully-automated.sh b/test/e2e/test-fully-automated.sh
index 6db0f1f3c..d18bbe404 100755
--- a/test/e2e/test-fully-automated.sh
+++ b/test/e2e/test-fully-automated.sh
@@ -36,6 +36,14 @@ cleanup() {
     return
   fi
   pushd "${MIGRATOR_ROOT}"
+  echo "===== migrator logs"
+  kubectl logs --namespace=kube-system deployment/migrator || true
+  echo "===== trigger logs"
+  kubectl logs --namespace=kube-system deployment/trigger || true
+  echo "===== storageversionmigrations"
+  kubectl get storageversionmigrations.migration.k8s.io -o yaml
+  echo "===== storagestates"
+  kubectl get storagestates.migration.k8s.io -o yaml
   echo "Deleting images"
   make delete-all-images
   echo "Deleting images successfully"