Skip to content

Commit

Permalink
handle storage state out of sync with storage migration
Browse files Browse the repository at this point in the history
  • Loading branch information
sanchezl committed Mar 17, 2020
1 parent 43dd414 commit 55b451e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/migrator/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (m *migrator) Run() error {
},
)
if errors.IsNotFound(listError) {
// fail this migration, we don't want to get stuck on a migration for a resource that does not exist)
return fmt.Errorf("failed to list resources: %v", listError)
}
if listError != nil && !errors.IsResourceExpired(listError) {
Expand Down
35 changes: 30 additions & 5 deletions pkg/trigger/discovery_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
utilruntime.HandleError(getErr)
return
}

stale := (getErr == nil && mt.staleStorageState(ss))
storageVersionChanged := (getErr == nil && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash)
notFound := (getErr != nil && errors.IsNotFound(getErr))
found := getErr == nil || !errors.IsNotFound(getErr)
stale := found && mt.staleStorageState(ss)
storageVersionChanged := found && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash
needsMigration := found && !mt.isMigrated(ss) && !mt.hasPendingOrRunningMigration(r)
relaunchMigration := stale || !found || storageVersionChanged || needsMigration

if stale {
if err := mt.client.MigrationV1alpha1().StorageStates().Delete(storageStateName(toGroupResource(r)), nil); err != nil {
Expand All @@ -197,7 +198,7 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
}
}

if stale || storageVersionChanged || notFound {
if relaunchMigration {
// Note that this means historical migration objects are deleted.
if err := mt.relaunchMigration(r); err != nil {
utilruntime.HandleError(err)
Expand All @@ -207,3 +208,27 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
// always update status.heartbeat, sometimes update the version hashes.
mt.updateStorageState(r.StorageVersionHash, r)
}
func (mt *MigrationTrigger) isMigrated(ss *migrationv1alpha1.StorageState) bool {
if len(ss.Status.PersistedStorageVersionHashes) != 1 {
return false
}
return ss.Status.CurrentStorageVersionHash == ss.Status.PersistedStorageVersionHashes[0]
}

func (mt *MigrationTrigger) hasPendingOrRunningMigration(r metav1.APIResource) bool {
// get the corresponding StorageVersionMigration resource
migrations, err := mt.migrationInformer.GetIndexer().ByIndex(controller.ResourceIndex, controller.ToIndex(toGroupResource(r)))
if err != nil {
utilruntime.HandleError(err)
return false
}
for _, migration := range migrations {
m := migration.(*migrationv1alpha1.StorageVersionMigration)
if controller.HasCondition(m, migrationv1alpha1.MigrationSucceeded) || controller.HasCondition(m, migrationv1alpha1.MigrationFailed) {
continue
}
// migration is running or pending
return true
}
return false
}

0 comments on commit 55b451e

Please sign in to comment.