Skip to content

Commit

Permalink
Merge pull request #58 from sanchezl/sigs-fix-flaky-tests
Browse files Browse the repository at this point in the history
Harden against disruptive loss of apiserver
  • Loading branch information
k8s-ci-robot committed Mar 19, 2020
2 parents 221b6ef + 8740b3a commit d0b0463
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 177 deletions.
2 changes: 1 addition & 1 deletion cmd/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
pflag.Parse()
pflag.VisitAll(func(flag *pflag.Flag) {
klog.V(4).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
klog.V(2).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
})
command := app.NewMigratorCommand()
if err := command.Execute(); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions manifests/migrator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ spec:
containers:
- name: migrator
image: REGISTRY/storage-version-migration-migrator:VERSION
command:
- /migrator
- --v=2
- --alsologtostderr
- --kube-api-qps=40
- --kube-api-burst=1000
14 changes: 1 addition & 13 deletions manifests/namespace-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,6 @@ rules:
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: storage-version-migration-migrator
rules:
- apiGroups: ["*"]
resources: ["*"]
verbs: ["get", "list", "update"]
- apiGroups: ["migration.k8s.io"]
resources: ["storageversionmigrations"]
verbs: ["watch"]
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: storage-version-migration-crd-creator
rules:
Expand All @@ -55,7 +43,7 @@ subjects:
namespace: NAMESPACE
roleRef:
kind: ClusterRole
name: storage-version-migration-migrator
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
---
kind: ClusterRoleBinding
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/kubemigrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ func (km *KubeMigrator) processOne(obj interface{}) error {
return err
}
if HasCondition(m, migrationv1alpha1.MigrationSucceeded) || HasCondition(m, migrationv1alpha1.MigrationFailed) {
klog.V(2).Infof("The migration has already completed for %#v", m)
klog.V(2).Infof("%v: migration has already completed", m.Name)
return nil
}
m, err = km.updateStatus(m, migrationv1alpha1.MigrationRunning, "")
klog.V(2).Infof("%v: migration running", m.Name)
if err != nil {
return err
}
Expand All @@ -118,11 +119,17 @@ func (km *KubeMigrator) processOne(obj interface{}) error {
err = core.Run()
utilruntime.HandleError(err)
if err == nil {
_, err = km.updateStatus(m, migrationv1alpha1.MigrationSucceeded, "")
if _, err := km.updateStatus(m, migrationv1alpha1.MigrationSucceeded, ""); err != nil {
utilruntime.HandleError(err)
}
metrics.Metrics.ObserveSucceededMigration(resource(m).String())
klog.V(2).Infof("%v: migration succeeded", m.Name)
return err
}
_, err = km.updateStatus(m, migrationv1alpha1.MigrationFailed, err.Error())
klog.Errorf("%v: migration failed: %v", m.Name, err)
if _, err := km.updateStatus(m, migrationv1alpha1.MigrationFailed, err.Error()); err != nil {
utilruntime.HandleError(err)
}
metrics.Metrics.ObserveFailedMigration(resource(m).String())
return err
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/migrator/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator/metrics"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,6 +30,9 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/klog"

"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator/metrics"
)

var metadataAccessor = meta.NewAccessor()
Expand Down Expand Up @@ -93,6 +95,10 @@ func (m *migrator) Run() error {
Continue: continueToken,
},
)
if errors.IsNotFound(listError) {
// Fail this migration, we don't want to get stuck on a migration for a resource that does not exist.
return fmt.Errorf("failed to list resources: %v", listError)
}
if listError != nil && !errors.IsResourceExpired(listError) {
if canRetry(listError) {
if seconds, delay := errors.SuggestsClientDelay(listError); delay {
Expand Down Expand Up @@ -201,8 +207,18 @@ func (m *migrator) migrateOneItem(item *unstructured.Unstructured) error {
return nil
}
if canRetry(err) {
if seconds, delay := errors.SuggestsClientDelay(err); delay {
seconds, delay := errors.SuggestsClientDelay(err)
switch {
case delay && len(namespace) > 0:
klog.Warningf("migration of %s, in the %s namespace, will be retried after a %ds delay: %v", name, namespace, seconds, err)
time.Sleep(time.Duration(seconds) * time.Second)
case delay:
klog.Warningf("migration of %s will be retried after a %ds delay: %v", name, seconds, err)
time.Sleep(time.Duration(seconds) * time.Second)
case !delay && len(namespace) > 0:
klog.Warningf("migration of %s, in the %s namespace, will be retried: %v", name, namespace, err)
default:
klog.Warningf("migration of %s will be retried: %v", name, err)
}
continue
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/trigger/discovery_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"fmt"
"reflect"

migrationv1alpha1 "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1"
"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/controller"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

migrationv1alpha1 "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1"
"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/controller"
)

func (mt *MigrationTrigger) processDiscovery() {
Expand Down Expand Up @@ -185,10 +186,11 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
utilruntime.HandleError(getErr)
return
}

stale := (getErr == nil && mt.staleStorageState(ss))
storageVersionChanged := (getErr == nil && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash)
notFound := (getErr != nil && errors.IsNotFound(getErr))
found := getErr == nil
stale := found && mt.staleStorageState(ss)
storageVersionChanged := found && ss.Status.CurrentStorageVersionHash != r.StorageVersionHash
needsMigration := found && !mt.isMigrated(ss) && !mt.hasPendingOrRunningMigration(r)
relaunchMigration := stale || !found || storageVersionChanged || needsMigration

if stale {
if err := mt.client.MigrationV1alpha1().StorageStates().Delete(storageStateName(toGroupResource(r)), nil); err != nil {
Expand All @@ -197,7 +199,7 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
}
}

if stale || storageVersionChanged || notFound {
if relaunchMigration {
// Note that this means historical migration objects are deleted.
if err := mt.relaunchMigration(r); err != nil {
utilruntime.HandleError(err)
Expand All @@ -207,3 +209,27 @@ func (mt *MigrationTrigger) processDiscoveryResource(r metav1.APIResource) {
// always update status.heartbeat, sometimes update the version hashes.
mt.updateStorageState(r.StorageVersionHash, r)
}
func (mt *MigrationTrigger) isMigrated(ss *migrationv1alpha1.StorageState) bool {
if len(ss.Status.PersistedStorageVersionHashes) != 1 {
return false
}
return ss.Status.CurrentStorageVersionHash == ss.Status.PersistedStorageVersionHashes[0]
}

func (mt *MigrationTrigger) hasPendingOrRunningMigration(r metav1.APIResource) bool {
// get the corresponding StorageVersionMigration resource
migrations, err := mt.migrationInformer.GetIndexer().ByIndex(controller.ResourceIndex, controller.ToIndex(toGroupResource(r)))
if err != nil {
utilruntime.HandleError(err)
return false
}
for _, migration := range migrations {
m := migration.(*migrationv1alpha1.StorageVersionMigration)
if controller.HasCondition(m, migrationv1alpha1.MigrationSucceeded) || controller.HasCondition(m, migrationv1alpha1.MigrationFailed) {
continue
}
// migration is running or pending
return true
}
return false
}
Loading

0 comments on commit d0b0463

Please sign in to comment.