diff --git a/Makefile b/Makefile index 56efb1532e..db86ef0ff0 100644 --- a/Makefile +++ b/Makefile @@ -88,10 +88,11 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} $(KUSTOMIZE) build config/default | kubectl apply -f - echo -e "resources:\n- manager.yaml" > config/manager/kustomization.yaml + $(KUSTOMIZE) build config/daemonconfig | kubectl apply -f - undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config. $(KUSTOMIZE) build config/default | kubectl delete -f - - + $(KUSTOMIZE) build config/daemonconfig | kubectl delete -f - CONTROLLER_GEN = $(shell pwd)/bin/controller-gen controller-gen: ## Download controller-gen locally if necessary. diff --git a/apis/apps/defaults/v1alpha1.go b/apis/apps/defaults/v1alpha1.go index ba9177131e..9a15eaeb80 100644 --- a/apis/apps/defaults/v1alpha1.go +++ b/apis/apps/defaults/v1alpha1.go @@ -24,6 +24,12 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" v1 "k8s.io/kubernetes/pkg/apis/core/v1" utilpointer "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + // ProtectionFinalizer is designed to ensure the GC of resources. + ProtectionFinalizer = "apps.kruise.io/deletion-protection" ) // SetDefaults_SidecarSet set default values for SidecarSet. @@ -372,7 +378,7 @@ func SetDefaultsImageTagPullPolicy(obj *v1alpha1.ImageTagPullPolicy) { } // SetDefaults_ImagePullJob set default values for ImagePullJob. -func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) { +func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob, addProtection bool) { if obj.Spec.CompletionPolicy.Type == "" { obj.Spec.CompletionPolicy.Type = v1alpha1.Always } @@ -388,6 +394,9 @@ func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) { if obj.Spec.ImagePullPolicy == "" { obj.Spec.ImagePullPolicy = v1alpha1.PullIfNotPresent } + if addProtection { + controllerutil.AddFinalizer(obj, ProtectionFinalizer) + } } // SetDefaultsImageListPullJob set default values for ImageListPullJob. diff --git a/config/daemonconfig/config/kustomization.yaml b/config/daemonconfig/config/kustomization.yaml new file mode 100644 index 0000000000..f4bb65c0c9 --- /dev/null +++ b/config/daemonconfig/config/kustomization.yaml @@ -0,0 +1,3 @@ +resources: +- namespace.yaml +- rbac.yaml diff --git a/config/daemonconfig/config/namespace.yaml b/config/daemonconfig/config/namespace.yaml new file mode 100644 index 0000000000..27503a6a71 --- /dev/null +++ b/config/daemonconfig/config/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: kruise-daemon-config diff --git a/config/daemonconfig/config/rbac.yaml b/config/daemonconfig/config/rbac.yaml new file mode 100644 index 0000000000..26ebdb9873 --- /dev/null +++ b/config/daemonconfig/config/rbac.yaml @@ -0,0 +1,29 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + creationTimestamp: null + name: kruise-daemon-secret-role + namespace: kruise-daemon-config +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: kruise-daemon-secret-rolebinding + namespace: kruise-daemon-config +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: kruise-daemon-secret-role +subjects: + - kind: ServiceAccount + name: kruise-daemon + namespace: kruise-system \ No newline at end of file diff --git a/config/daemonconfig/kustomization.yaml b/config/daemonconfig/kustomization.yaml new file mode 100644 index 0000000000..744829430b --- /dev/null +++ b/config/daemonconfig/kustomization.yaml @@ -0,0 +1,8 @@ +namespace: kruise-daemon-config +# Value of this field is prepended to the +# names of all resources, e.g. a deployment named +# "wordpress" becomes "alices-wordpress". +# Note that it should also match with the prefix (text before '-') of the namespace +# field above. +bases: + - config \ No newline at end of file diff --git a/config/default/kruise-daemon-config.yaml b/config/default/kruise-daemon-config.yaml new file mode 100644 index 0000000000..1bdfb1e114 --- /dev/null +++ b/config/default/kruise-daemon-config.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: Namespace +metadata: + labels: + control-plane: controller-manager + name: kruise-daemon-config diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index 55cddb0722..74658315bc 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -1,6 +1,3 @@ -# Adds namespace to all resources. -namespace: kruise-system - # Value of this field is prepended to the # names of all resources, e.g. a deployment named # "wordpress" becomes "alices-wordpress". @@ -12,16 +9,19 @@ namePrefix: kruise- #commonLabels: # someName: someValue +resources: + - kruise-daemon-config.yaml + bases: - ../crd - ../rbac - ../manager -# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in +# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in # crd/kustomization.yaml - ../webhook # [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required. #- ../certmanager -# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'. +# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'. #- ../prometheus patchesStrategicMerge: @@ -30,7 +30,7 @@ patchesStrategicMerge: # endpoint w/o any authn/z, please comment the following line. # - manager_auth_proxy_patch.yaml -# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in +# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in # crd/kustomization.yaml - manager_webhook_patch.yaml diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 5c5f0b84cb..b23702d1cc 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -1,2 +1,5 @@ +# Adds namespace to all resources. +namespace: kruise-system + resources: - manager.yaml diff --git a/config/rbac/daemon_role.yaml b/config/rbac/daemon_role.yaml index e87ce2a1b7..66e5dec60d 100644 --- a/config/rbac/daemon_role.yaml +++ b/config/rbac/daemon_role.yaml @@ -53,8 +53,6 @@ rules: verbs: - get - list - - patch - - update - watch - apiGroups: - "" @@ -64,14 +62,6 @@ rules: - get - patch - update -- apiGroups: - - "" - resources: - - secrets - verbs: - - get - - list - - watch - apiGroups: - apps.kruise.io resources: diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 34802c2a8d..f64757ff5d 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -1,3 +1,6 @@ +# Adds namespace to all resources. +namespace: kruise-system + resources: - role.yaml - role_binding.yaml diff --git a/config/webhook/kustomization.yaml b/config/webhook/kustomization.yaml index 058b243beb..aea412241b 100644 --- a/config/webhook/kustomization.yaml +++ b/config/webhook/kustomization.yaml @@ -1,3 +1,6 @@ +# Adds namespace to all resources. +namespace: kruise-system + resources: - manifests.yaml - service.yaml diff --git a/pkg/controller/imagepulljob/imagepulljob_controller.go b/pkg/controller/imagepulljob/imagepulljob_controller.go index 3f9c14f0b6..e26e1064a9 100644 --- a/pkg/controller/imagepulljob/imagepulljob_controller.go +++ b/pkg/controller/imagepulljob/imagepulljob_controller.go @@ -23,6 +23,7 @@ import ( "sort" "time" + "github.com/openkruise/kruise/apis/apps/defaults" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" daemonutil "github.com/openkruise/kruise/pkg/daemon/util" "github.com/openkruise/kruise/pkg/features" @@ -38,10 +39,12 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -56,11 +59,22 @@ var ( concurrentReconciles = 3 controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob") resourceVersionExpectations = expectations.NewResourceVersionExpectation() + scaleExpectations = expectations.NewScaleExpectations() ) const ( defaultParallelism = 1 minRequeueTime = time.Second + + // SourceSecretKeyAnno is an annotations instead of label + // because the length of key may be more than 64. + SourceSecretKeyAnno = "imagepulljobs.kruise.io/source-key" + // SourceSecretUIDLabelKey is designed to select target via source secret. + SourceSecretUIDLabelKey = "imagepulljobs.kruise.io/source-uid" + // TargetOwnerReferencesAnno records the keys of imagePullJobs that refers + // the target secret. If TargetOwnerReferencesAnno is empty, means the target + // secret should be deleted. + TargetOwnerReferencesAnno = "imagepulljobs.kruise.io/references" ) // Add creates a new ImagePullJob Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller @@ -109,6 +123,12 @@ func add(mgr manager.Manager, r *ReconcileImagePullJob) error { return err } + // Watch for secret for jobs that have pullSecrets + err = c.Watch(&source.Kind{Type: &v1.Secret{}}, &secretEventHandler{Reader: mgr.GetCache()}) + if err != nil { + return err + } + return nil } @@ -154,6 +174,16 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R return reconcile.Result{}, err } + // If scale expectations have not satisfied yet, just skip this reconcile + if scaleSatisfied, unsatisfiedDuration, dirtyData := scaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied { + if unsatisfiedDuration >= expectations.ExpectationTimeout { + klog.Warningf("ImagePullJob: expectation unsatisfied overtime for %v, dirtyData=%v, overtime=%v", request.String(), dirtyData, unsatisfiedDuration) + return reconcile.Result{}, nil + } + klog.V(4).Infof("ImagePullJob: not satisfied scale for %v, dirtyData=%v", request.String(), dirtyData) + return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil + } + // If resourceVersion expectations have not satisfied yet, just skip this reconcile resourceVersionExpectations.Observe(job) if isSatisfied, unsatisfiedDuration := resourceVersionExpectations.IsSatisfied(job); !isSatisfied { @@ -166,11 +196,17 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R } if job.DeletionTimestamp != nil { - return reconcile.Result{}, nil + // ensure the GC of secrets and remove protection finalizer + return reconcile.Result{}, r.finalize(job) } // The Job has been finished if job.Status.CompletionTime != nil { + // ensure the GC of secrets and remove protection finalizer + if err = r.finalize(job); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to remove finalizer: %v", err) + } + var leftTime time.Duration if job.Spec.CompletionPolicy.TTLSecondsAfterFinished != nil { leftTime = time.Duration(*job.Spec.CompletionPolicy.TTLSecondsAfterFinished)*time.Second - time.Since(job.Status.CompletionTime.Time) @@ -185,6 +221,11 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R return reconcile.Result{RequeueAfter: leftTime}, nil } + // add protection finalizer to ensure the GC of secrets + if err = r.addProtectionFinalizer(job); err != nil { + return reconcile.Result{}, err + } + // Get all NodeImage related to this ImagePullJob nodeImages, err := utilimagejob.GetNodeImagesForJob(r.Client, job) if err != nil { @@ -204,14 +245,20 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R } } + // sync secret to kruise-daemon-config namespace before pulling + secrets, err := r.syncSecrets(job) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to sync secrets: %v", err) + } + // Calculate the new status for this job - newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages) + newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages, secrets) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to calculate status: %v", err) } // Sync image to more NodeImages - if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages); err != nil { + if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages, secrets); err != nil { return reconcile.Result{}, fmt.Errorf("failed to sync NodeImages: %v", err) } @@ -234,7 +281,28 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R return reconcile.Result{}, nil } -func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string) error { +func (r *ReconcileImagePullJob) syncSecrets(job *appsv1alpha1.ImagePullJob) ([]appsv1alpha1.ReferenceObject, error) { + if job.Namespace == util.GetKruiseDaemonConfigNamespace() { + return getSecrets(job), nil // Ignore this special case. + } + + targetMap, deleteMap, err := r.getTargetSecretMap(job) + if err != nil { + return nil, err + } + if err = r.releaseTargetSecrets(deleteMap, job); err != nil { + return nil, err + } + if job.DeletionTimestamp != nil || job.Status.CompletionTime != nil { + return nil, r.releaseTargetSecrets(targetMap, job) + } + if err = r.checkNamespaceExists(util.GetKruiseDaemonConfigNamespace()); err != nil { + return nil, fmt.Errorf("failed to check kruise-daemon-config namespace: %v", err) + } + return r.syncTargetSecrets(job, targetMap) +} + +func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string, secrets []appsv1alpha1.ReferenceObject) error { if len(notSyncedNodeImages) == 0 { return nil } @@ -254,7 +322,6 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n } ownerRef := getOwnerRef(job) - secrets := getSecrets(job) pullPolicy := getImagePullPolicy(job) now := metav1.NewTime(r.clock.Now()) imageName, imageTag, _ := daemonutil.NormalizeImageRefToNameTag(job.Spec.Image) @@ -339,7 +406,116 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n return nil } -func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage) (*appsv1alpha1.ImagePullJobStatus, []string, error) { +func (r *ReconcileImagePullJob) getTargetSecretMap(job *appsv1alpha1.ImagePullJob) (map[string]*v1.Secret, map[string]*v1.Secret, error) { + options := client.ListOptions{ + Namespace: util.GetKruiseDaemonConfigNamespace(), + } + targetLister := &v1.SecretList{} + if err := r.List(context.TODO(), targetLister, &options, utilclient.DisableDeepCopy); err != nil { + return nil, nil, err + } + + jobKey := keyFromObject(job) + sourceReferences := getSecrets(job) + deleteMap := make(map[string]*v1.Secret) + targetMap := make(map[string]*v1.Secret, len(targetLister.Items)) + for i := range targetLister.Items { + target := &targetLister.Items[i] + if target.DeletionTimestamp != nil { + continue + } + keySet := referenceSetFromTarget(target) + if !keySet.Contains(jobKey) { + continue + } + sourceNs, sourceName, err := cache.SplitMetaNamespaceKey(target.Annotations[SourceSecretKeyAnno]) + if err != nil { + klog.Warningf("Failed to parse source key from target %s annotations: %s", target.Name, err) + } + if containsObject(sourceReferences, appsv1alpha1.ReferenceObject{Namespace: sourceNs, Name: sourceName}) { + targetMap[target.Labels[SourceSecretUIDLabelKey]] = target + } else { + deleteMap[target.Labels[SourceSecretUIDLabelKey]] = target + } + } + return targetMap, deleteMap, nil +} + +func (r *ReconcileImagePullJob) releaseTargetSecrets(targetMap map[string]*v1.Secret, job *appsv1alpha1.ImagePullJob) error { + if len(targetMap) == 0 { + return nil + } + + jobKey := keyFromObject(job) + for _, secret := range targetMap { + if secret == nil { + continue + } + + keySet := referenceSetFromTarget(secret) + // Remove the reference to this job from target, we use Update instead of + // Patch to make sure we do not delete any targets that is still referred, + // because a target may be newly referred in this reconcile round. + if keySet.Contains(keyFromObject(job)) { + keySet.Delete(jobKey) + secret = secret.DeepCopy() + secret.Annotations[TargetOwnerReferencesAnno] = keySet.String() + if err := r.Update(context.TODO(), secret); err != nil { + return err + } + resourceVersionExpectations.Expect(secret) + } + + // The target is still referred by other jobs, do not delete it. + if !keySet.IsEmpty() { + return nil + } + + // Just delete it if no one refers it anymore. + if err := r.Delete(context.TODO(), secret); err != nil && !errors.IsNotFound(err) { + return err + } + } + return nil +} + +func (r *ReconcileImagePullJob) syncTargetSecrets(job *appsv1alpha1.ImagePullJob, targetMap map[string]*v1.Secret) ([]appsv1alpha1.ReferenceObject, error) { + sourceReferences := getSecrets(job) + targetReferences := make([]appsv1alpha1.ReferenceObject, 0, len(sourceReferences)) + for _, sourceRef := range sourceReferences { + source := &v1.Secret{} + if err := r.Get(context.TODO(), keyFromRef(sourceRef), source); err != nil { + if errors.IsNotFound(err) { + continue + } + return nil, err + } + + target := targetMap[string(source.UID)] + switch action := computeTargetSyncAction(source, target, job); action { + case create: + referenceKeys := makeReferenceSet(keyFromObject(job)) + target = targetFromSource(source, referenceKeys) + scaleExpectations.ExpectScale(keyFromObject(job).String(), expectations.Create, string(source.UID)) + if err := r.Create(context.TODO(), target); err != nil { + scaleExpectations.ObserveScale(keyFromObject(job).String(), expectations.Create, string(source.UID)) + return nil, err + } + + case update: + referenceKeys := referenceSetFromTarget(target).Insert(keyFromObject(job)) + target = updateTarget(target, source, referenceKeys) + if err := r.Update(context.TODO(), target); err != nil { + return nil, err + } + resourceVersionExpectations.Expect(target) + } + targetReferences = append(targetReferences, appsv1alpha1.ReferenceObject{Namespace: target.Namespace, Name: target.Name}) + } + return targetReferences, nil +} + +func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage, secrets []appsv1alpha1.ReferenceObject) (*appsv1alpha1.ImagePullJobStatus, []string, error) { newStatus := appsv1alpha1.ImagePullJobStatus{ StartTime: job.Status.StartTime, Desired: int32(len(nodeImages)), @@ -357,7 +533,20 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, var notSynced, pulling, succeeded, failed []string for _, nodeImage := range nodeImages { var tagVersion int64 = -1 + var secretSynced bool = true if imageSpec, ok := nodeImage.Spec.Images[imageName]; ok { + for _, secret := range secrets { + if !containsObject(imageSpec.PullSecrets, secret) { + secretSynced = false + break + } + } + + if !secretSynced { + notSynced = append(notSynced, nodeImage.Name) + continue + } + for _, tagSpec := range imageSpec.Tags { if tagSpec.Tag != imageTag { continue @@ -375,6 +564,7 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, tagVersion = tagSpec.Version } } + if tagVersion < 0 { notSynced = append(notSynced, nodeImage.Name) continue @@ -430,3 +620,29 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, sort.Strings(newStatus.FailedNodes) return &newStatus, notSynced, nil } + +func (r *ReconcileImagePullJob) checkNamespaceExists(nsName string) error { + namespace := v1.Namespace{} + return r.Get(context.TODO(), types.NamespacedName{Name: nsName}, &namespace) +} + +// addProtectionFinalizer ensure the GC of secrets in kruise-daemon-config ns +func (r *ReconcileImagePullJob) addProtectionFinalizer(job *appsv1alpha1.ImagePullJob) error { + if controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) { + return nil + } + job.Finalizers = append(job.Finalizers, defaults.ProtectionFinalizer) + return r.Update(context.TODO(), job) +} + +// finalize also ensure the GC of secrets in kruise-daemon-config ns +func (r *ReconcileImagePullJob) finalize(job *appsv1alpha1.ImagePullJob) error { + if !controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) { + return nil + } + if _, err := r.syncSecrets(job); err != nil { + return err + } + controllerutil.RemoveFinalizer(job, defaults.ProtectionFinalizer) + return r.Update(context.TODO(), job) +} diff --git a/pkg/controller/imagepulljob/imagepulljob_event_handler.go b/pkg/controller/imagepulljob/imagepulljob_event_handler.go index dde18cbaa1..579ce99e27 100644 --- a/pkg/controller/imagepulljob/imagepulljob_event_handler.go +++ b/pkg/controller/imagepulljob/imagepulljob_event_handler.go @@ -17,10 +17,14 @@ limitations under the License. package imagepulljob import ( + "context" "reflect" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" daemonutil "github.com/openkruise/kruise/pkg/daemon/util" + kruiseutil "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" + "github.com/openkruise/kruise/pkg/util/expectations" utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -187,6 +191,102 @@ func (e *podEventHandler) handleUpdate(pod, oldPod *v1.Pod, q workqueue.RateLimi } } +type secretEventHandler struct { + client.Reader +} + +var _ handler.EventHandler = &secretEventHandler{} + +func (e *secretEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + obj := evt.Object.(*v1.Secret) + e.handle(obj, q) +} + +func (e *secretEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + newObj := evt.ObjectNew.(*v1.Secret) + oldObj := evt.ObjectOld.(*v1.Secret) + e.handleUpdate(newObj, oldObj, q) +} + +func (e *secretEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +} + +func (e *secretEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +} + +func (e *secretEventHandler) handle(secret *v1.Secret, q workqueue.RateLimitingInterface) { + if secret != nil && secret.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() { + jobKeySet := referenceSetFromTarget(secret) + klog.V(4).Infof("Observe secret %s/%s created, uid: %s, refs: %s", secret.Namespace, secret.Name, secret.UID, jobKeySet.String()) + for key := range jobKeySet { + scaleExpectations.ObserveScale(key.String(), expectations.Create, secret.Labels[SourceSecretUIDLabelKey]) + } + return + } + + if secret == nil || secret.DeletionTimestamp != nil { + return + } + // Get jobs related to this Secret + jobKeys, err := e.getActiveJobKeysForSecret(secret) + if err != nil { + klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secret.Namespace, secret.Name, err) + } + for _, jKey := range jobKeys { + q.Add(reconcile.Request{NamespacedName: jKey}) + } +} + +func (e *secretEventHandler) handleUpdate(secretNew, secretOld *v1.Secret, q workqueue.RateLimitingInterface) { + if secretNew != nil && secretNew.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() { + jobKeySet := referenceSetFromTarget(secretNew) + for key := range jobKeySet { + scaleExpectations.ObserveScale(key.String(), expectations.Create, secretNew.Labels[SourceSecretUIDLabelKey]) + } + return + } + + if secretOld == nil || secretNew == nil || secretNew.DeletionTimestamp != nil || + (reflect.DeepEqual(secretNew.Data, secretOld.Data) && reflect.DeepEqual(secretNew.StringData, secretOld.StringData)) { + return + } + // Get jobs related to this Secret + jobKeys, err := e.getActiveJobKeysForSecret(secretNew) + if err != nil { + klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secretNew.Namespace, secretNew.Name, err) + } + for _, jKey := range jobKeys { + q.Add(reconcile.Request{NamespacedName: jKey}) + } +} + +func (e *secretEventHandler) getActiveJobKeysForSecret(secret *v1.Secret) ([]types.NamespacedName, error) { + jobLister := &appsv1alpha1.ImagePullJobList{} + if err := e.List(context.TODO(), jobLister, client.InNamespace(secret.Namespace), utilclient.DisableDeepCopy); err != nil { + return nil, err + } + var jobKeys []types.NamespacedName + for i := range jobLister.Items { + job := &jobLister.Items[i] + if job.DeletionTimestamp != nil { + continue + } + if jobContainsSecret(job, secret.Name) { + jobKeys = append(jobKeys, keyFromObject(job)) + } + } + return jobKeys, nil +} + +func jobContainsSecret(job *appsv1alpha1.ImagePullJob, secretName string) bool { + for _, s := range job.Spec.PullSecrets { + if secretName == s { + return true + } + } + return false +} + func diffJobs(newJobs, oldJobs []*appsv1alpha1.ImagePullJob) set { setNew := make(set, len(newJobs)) setOld := make(set, len(oldJobs)) diff --git a/pkg/controller/imagepulljob/imagepulljob_utils.go b/pkg/controller/imagepulljob/imagepulljob_utils.go index d423ab3cbe..83a38945b2 100644 --- a/pkg/controller/imagepulljob/imagepulljob_utils.go +++ b/pkg/controller/imagepulljob/imagepulljob_utils.go @@ -19,15 +19,28 @@ package imagepulljob import ( "fmt" "math/rand" + "reflect" + "strings" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" ) -const ( - defaultTTLSecondsForNever = int32(24 * 3600) +type syncAction string +const ( + defaultTTLSecondsForNever = int32(24 * 3600) defaultActiveDeadlineSecondsForNever = int64(1800) + + create syncAction = "create" + update syncAction = "update" + noAction syncAction = "noAction" ) func getTTLSecondsForAlways(job *appsv1alpha1.ImagePullJob) *int32 { @@ -118,3 +131,105 @@ func formatStatusMessage(status *appsv1alpha1.ImagePullJobStatus) (ret string) { } return fmt.Sprintf("job is running, progress %.1f%%", 100.0*float64(status.Succeeded+status.Failed)/float64(status.Desired)) } + +func keyFromRef(ref appsv1alpha1.ReferenceObject) types.NamespacedName { + return types.NamespacedName{ + Name: ref.Name, + Namespace: ref.Namespace, + } +} + +func keyFromObject(object client.Object) types.NamespacedName { + return types.NamespacedName{ + Name: object.GetName(), + Namespace: object.GetNamespace(), + } +} + +func targetFromSource(source *v1.Secret, keySet referenceSet) *v1.Secret { + target := source.DeepCopy() + target.ObjectMeta = metav1.ObjectMeta{ + Namespace: util.GetKruiseDaemonConfigNamespace(), + GenerateName: fmt.Sprintf("%s-", source.Name), + Labels: map[string]string{ + SourceSecretUIDLabelKey: string(source.UID), + }, + Annotations: map[string]string{ + SourceSecretKeyAnno: keyFromObject(source).String(), + TargetOwnerReferencesAnno: keySet.String(), + }, + } + return target +} + +func updateTarget(target, source *v1.Secret, keySet referenceSet) *v1.Secret { + target = target.DeepCopy() + target.Data = source.Data + target.StringData = source.StringData + target.Annotations[TargetOwnerReferencesAnno] = keySet.String() + return target +} + +func referenceSetFromTarget(target *v1.Secret) referenceSet { + refs := strings.Split(target.Annotations[TargetOwnerReferencesAnno], ",") + keys := makeReferenceSet() + for _, ref := range refs { + namespace, name, err := cache.SplitMetaNamespaceKey(ref) + if err != nil { + klog.Errorf("Failed to parse job key from target secret %s annotations: %v", target.Name, err) + continue + } + keys.Insert(types.NamespacedName{Namespace: namespace, Name: name}) + } + return keys +} + +func computeTargetSyncAction(source, target *v1.Secret, job *appsv1alpha1.ImagePullJob) syncAction { + if target == nil || len(target.UID) == 0 { + return create + } + keySet := referenceSetFromTarget(target) + if !keySet.Contains(keyFromObject(job)) || + !reflect.DeepEqual(source.Data, target.Data) || + !reflect.DeepEqual(source.StringData, target.StringData) { + return update + } + return noAction +} + +func makeReferenceSet(items ...types.NamespacedName) referenceSet { + refSet := map[types.NamespacedName]struct{}{} + for _, item := range items { + refSet[item] = struct{}{} + } + return refSet +} + +type referenceSet map[types.NamespacedName]struct{} + +func (set referenceSet) String() string { + keyList := make([]string, 0, len(set)) + for ref := range set { + keyList = append(keyList, ref.String()) + } + return strings.Join(keyList, ",") +} + +func (set referenceSet) Contains(key types.NamespacedName) bool { + _, exists := set[key] + return exists +} + +func (set referenceSet) Insert(key types.NamespacedName) referenceSet { + set[key] = struct{}{} + return set +} + +func (set referenceSet) Delete(key types.NamespacedName) referenceSet { + delete(set, key) + return set +} + +func (set referenceSet) IsEmpty() bool { + return len(set) == 0 +} diff --git a/pkg/util/meta.go b/pkg/util/meta.go index 23595bd0db..9690d81c89 100644 --- a/pkg/util/meta.go +++ b/pkg/util/meta.go @@ -1,5 +1,5 @@ /* -Copyright 2021. +Copyright 2022 The Kruise Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,3 +24,10 @@ func GetKruiseNamespace() string { } return "kruise-system" } + +func GetKruiseDaemonConfigNamespace() string { + if ns := os.Getenv("KRUISE_DAEMON_CONFIG_NS"); len(ns) > 0 { + return ns + } + return "kruise-daemon-config" +} diff --git a/pkg/util/meta_test.go b/pkg/util/meta_test.go new file mode 100644 index 0000000000..65f9cff1f0 --- /dev/null +++ b/pkg/util/meta_test.go @@ -0,0 +1,39 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "os" + "testing" +) + +func TestMetaGetNamespace(t *testing.T) { + if GetKruiseNamespace() != "kruise-system" { + t.Fatalf("expect(kruise-system), but get(%s)", GetKruiseNamespace()) + } + _ = os.Setenv("POD_NAMESPACE", "test") + if GetKruiseNamespace() != "test" { + t.Fatalf("expect(test), but get(%s)", GetKruiseNamespace()) + } + if GetKruiseDaemonConfigNamespace() != "kruise-daemon-config" { + t.Fatalf("expect(kruise-daemon-config), but get(%s)", GetKruiseDaemonConfigNamespace()) + } + _ = os.Setenv("KRUISE_DAEMON_CONFIG_NS", "test") + if GetKruiseDaemonConfigNamespace() != "test" { + t.Fatalf("expect(test), but get(%s)", GetKruiseDaemonConfigNamespace()) + } +} diff --git a/pkg/webhook/imagepulljob/mutating/imagepulljob_create_update_handler.go b/pkg/webhook/imagepulljob/mutating/imagepulljob_create_update_handler.go index 4c471cfa40..dd0749deed 100644 --- a/pkg/webhook/imagepulljob/mutating/imagepulljob_create_update_handler.go +++ b/pkg/webhook/imagepulljob/mutating/imagepulljob_create_update_handler.go @@ -25,6 +25,7 @@ import ( "github.com/openkruise/kruise/apis/apps/defaults" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/util" + admissionv1 "k8s.io/api/admission/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -46,7 +47,7 @@ func (h *ImagePullJobCreateUpdateHandler) Handle(ctx context.Context, req admiss return admission.Errored(http.StatusBadRequest, err) } var copy runtime.Object = obj.DeepCopy() - defaults.SetDefaultsImagePullJob(obj) + defaults.SetDefaultsImagePullJob(obj, req.AdmissionRequest.Operation == admissionv1.Create) if reflect.DeepEqual(obj, copy) { return admission.Allowed("") } diff --git a/test/e2e/apps/pullimages.go b/test/e2e/apps/pullimages.go index c32e212e97..4a87cc264d 100644 --- a/test/e2e/apps/pullimages.go +++ b/test/e2e/apps/pullimages.go @@ -18,22 +18,29 @@ package apps import ( "context" + "encoding/base64" "fmt" + "reflect" + "strings" "time" "github.com/onsi/ginkgo" "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/controller/imagepulljob" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/test/e2e/framework" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" utilpointer "k8s.io/utils/pointer" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" - "github.com/openkruise/kruise/pkg/util" - "github.com/openkruise/kruise/test/e2e/framework" + "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = SIGDescribe("PullImage", func() { @@ -65,6 +72,20 @@ var _ = SIGDescribe("PullImage", func() { }, } + referenceSetFromTarget := func(target *v1.Secret) map[types.NamespacedName]struct{} { + refs := strings.Split(target.Annotations[imagepulljob.TargetOwnerReferencesAnno], ",") + keys := map[types.NamespacedName]struct{}{} + for _, ref := range refs { + namespace, name, err := cache.SplitMetaNamespaceKey(ref) + if err != nil { + klog.Errorf("Failed to parse job key from target secret %s annotations: %v", target.Name, err) + continue + } + keys[types.NamespacedName{Namespace: namespace, Name: name}] = struct{}{} + } + return keys + } + ginkgo.BeforeEach(func() { c = f.ClientSet kc = f.KruiseClientSet @@ -91,6 +112,197 @@ var _ = SIGDescribe("PullImage", func() { baseJob = &appsv1alpha1.ImagePullJob{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "test-imagepulljob"}} }) + framework.ConformanceIt("pull image with secret", func() { + var err error + base64Code := "eyJhdXRocyI6eyJodHRwczovL2luZGV4LmRvY2tlci5pby92MS8iOnsidXNlcm5hbWUiOiJtaW5jaG91IiwicGFzc3dvcmQiOiJtaW5nemhvdS5zd3giLCJlbWFpbCI6InZlYy5nLnN1bkBnbWFpbC5jb20iLCJhdXRoIjoiYldsdVkyaHZkVHB0YVc1bmVtaHZkUzV6ZDNnPSJ9fX0=" + bytes, err := base64.StdEncoding.DecodeString(base64Code) + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: "pull-secret", + }, + Type: "kubernetes.io/dockerconfigjson", + Data: map[string][]byte{ + ".dockerconfigjson": bytes, + }, + } + secret, err = testerForImagePullJob.CreateSecret(secret) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImagePullJobSpec{ + Image: NginxImage, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + Selector: &appsv1alpha1.ImagePullJobNodeSelector{LabelSelector: metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: framework.FakeNodeImageLabelKey, Operator: metav1.LabelSelectorOpDoesNotExist}, + }}}, + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + PullSecrets: []string{secret.Name}, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Always, + ActiveDeadlineSeconds: utilpointer.Int64Ptr(50), + TTLSecondsAfterFinished: utilpointer.Int32Ptr(20), + }, + }, + } + err = testerForImagePullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Desired should be equal to number of nodes") + gomega.Eventually(func() int32 { + job, err = testerForImagePullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(nodes)))) + + ginkgo.By("Secret is synced") + gomega.Eventually(func() bool { + synced, _ := testerForImagePullJob.ListSyncedSecrets(secret) + if len(synced) != 1 { + return false + } + if _, exists := referenceSetFromTarget(&synced[0])[client.ObjectKeyFromObject(job)]; !exists { + return false + } + return reflect.DeepEqual(synced[0].Data, secret.Data) + }, 10*time.Second, time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() bool { + job, err = testerForImagePullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.CompletionTime != nil + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(true)) + gomega.Expect(job.Status.Succeeded).To(gomega.Equal(int32(len(nodes)))) + + ginkgo.By("Wait clean in 25s") + gomega.Eventually(func() bool { + _, err = testerForImagePullJob.GetJob(job) + return err != nil && errors.IsNotFound(err) + }, 25*time.Second, 2*time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Check image should be cleaned in NodeImage") + gomega.Eventually(func() bool { + found, err := testerForNodeImage.IsImageInSpec(job.Spec.Image, nodes[0].Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return found + }, 25*time.Second, time.Second).Should(gomega.Equal(false)) + + ginkgo.By("Check secrets should be cleaned in kruise-daemon-config") + gomega.Eventually(func() bool { + synced, _ := testerForImagePullJob.ListSyncedSecrets(secret) + return len(synced) == 0 + }, 10*time.Second, time.Second).Should(gomega.Equal(true)) + }) + + framework.ConformanceIt("never completion pull job with updated pull secrets", func() { + var err error + base64Code := "eyJhdXRocyI6eyJodHRwczovL2luZGV4LmRvY2tlci5pby92MS8iOnsidXNlcm5hbWUiOiJtaW5jaG91IiwicGFzc3dvcmQiOiJtaW5nemhvdS5zd3giLCJlbWFpbCI6InZlYy5nLnN1bkBnbWFpbC5jb20iLCJhdXRoIjoiYldsdVkyaHZkVHB0YVc1bmVtaHZkUzV6ZDNnPSJ9fX0=" + newBase64Code := "eyJhdXRocyI6eyJodHRwczovL2luZGV4LmRvY2tlci5pby92MS8iOnsidXNlcm5hbWUiOiJtaW5jaG91IiwicGFzc3dvcmQiOiJtaW5nemhvdS50ZXN0IiwiZW1haWwiOiJ2ZWMuZy5zdW5AZ21haWwuY29tIiwiYXV0aCI6ImJXbHVZMmh2ZFRwdGFXNW5lbWh2ZFM1MFpYTjAifX19" + bytes, _ := base64.StdEncoding.DecodeString(base64Code) + newBytes, _ := base64.StdEncoding.DecodeString(newBase64Code) + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: "pull-secret", + }, + Type: "kubernetes.io/dockerconfigjson", + Data: map[string][]byte{ + ".dockerconfigjson": bytes, + }, + } + secret, err = testerForImagePullJob.CreateSecret(secret) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImagePullJobSpec{ + Image: NginxImage, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + Selector: &appsv1alpha1.ImagePullJobNodeSelector{LabelSelector: metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: framework.FakeNodeImageLabelKey, Operator: metav1.LabelSelectorOpDoesNotExist}, + }}}, + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + PullSecrets: []string{secret.Name}, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Never, + }, + }, + } + err = testerForImagePullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Desired should be equal to number of nodes") + gomega.Eventually(func() int32 { + job, err = testerForImagePullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(nodes)))) + + ginkgo.By("Secret is synced") + gomega.Eventually(func() bool { + synced, _ := testerForImagePullJob.ListSyncedSecrets(secret) + if len(synced) != 1 { + return false + } + if _, exists := referenceSetFromTarget(&synced[0])[client.ObjectKeyFromObject(job)]; !exists { + return false + } + return reflect.DeepEqual(synced[0].Data, secret.Data) + }, 10*time.Second, time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Update source secret") + secret.Data[".dockerconfigjson"] = newBytes + testerForImagePullJob.UpdateSecret(secret) + + ginkgo.By("Check target updated secret in 10s") + gomega.Eventually(func() bool { + synced, _ := testerForImagePullJob.ListSyncedSecrets(secret) + if len(synced) != 1 { + return false + } + if _, exists := referenceSetFromTarget(&synced[0])[client.ObjectKeyFromObject(job)]; !exists { + return false + } + return reflect.DeepEqual(synced[0].Data, secret.Data) + }, 10*time.Second, time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() bool { + job, err = testerForImagePullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Succeeded == int32(len(nodes)) + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Delete pull job and check in 10s") + err = testerForImagePullJob.DeleteJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + _, err := testerForImagePullJob.GetJob(job) + return errors.IsNotFound(err) + }, 10*time.Second, time.Second).Should(gomega.Equal(false)) + + ginkgo.By("Check image should be cleaned in NodeImage") + gomega.Eventually(func() bool { + found, err := testerForNodeImage.IsImageInSpec(job.Spec.Image, nodes[0].Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return found + }, 25*time.Second, time.Second).Should(gomega.Equal(false)) + + ginkgo.By("Check secrets should be cleaned in kruise-daemon-config") + gomega.Eventually(func() bool { + synced, _ := testerForImagePullJob.ListSyncedSecrets(secret) + return len(synced) == 0 + }, 10*time.Second, time.Second).Should(gomega.Equal(true)) + }) + framework.ConformanceIt("create an always job to pull an image on all real nodes", func() { job := baseJob.DeepCopy() job.Spec = appsv1alpha1.ImagePullJobSpec{ @@ -309,5 +521,4 @@ var _ = SIGDescribe("PullImage", func() { }, 60*time.Second, 3*time.Second).Should(gomega.Equal(int32(1))) }) }) - }) diff --git a/test/e2e/framework/imagepulljob_util.go b/test/e2e/framework/imagepulljob_util.go index bbe8bb3faa..3ed96f1e3b 100644 --- a/test/e2e/framework/imagepulljob_util.go +++ b/test/e2e/framework/imagepulljob_util.go @@ -21,8 +21,13 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/controller/imagepulljob" + "github.com/openkruise/kruise/pkg/util" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" ) type ImagePullJobTester struct { @@ -57,3 +62,31 @@ func (tester *ImagePullJobTester) GetJob(job *appsv1alpha1.ImagePullJob) (*appsv func (tester *ImagePullJobTester) ListJobs(ns string) (*appsv1alpha1.ImagePullJobList, error) { return tester.kc.AppsV1alpha1().ImagePullJobs(ns).List(context.TODO(), metav1.ListOptions{}) } + +func (tester *ImagePullJobTester) CreateSecret(secret *v1.Secret) (*v1.Secret, error) { + return tester.c.CoreV1().Secrets(secret.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) +} + +func (tester *ImagePullJobTester) UpdateSecret(secret *v1.Secret) (*v1.Secret, error) { + namespace, name := secret.GetNamespace(), secret.GetName() + var err error + var newSecret *v1.Secret + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + newSecret, err = tester.c.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return err + } + newSecret.Data = secret.Data + newSecret, err = tester.c.CoreV1().Secrets(namespace).Update(context.TODO(), newSecret, metav1.UpdateOptions{}) + return err + }) + return newSecret, err +} + +func (tester *ImagePullJobTester) ListSyncedSecrets(source *v1.Secret) ([]v1.Secret, error) { + options := metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet(map[string]string{imagepulljob.SourceSecretUIDLabelKey: string(source.UID)}).String(), + } + lister, err := tester.c.CoreV1().Secrets(util.GetKruiseDaemonConfigNamespace()).List(context.TODO(), options) + return lister.Items, err +}