diff --git a/.github/workflows/e2e-1.16.yaml b/.github/workflows/e2e-1.16.yaml index cf61281dcd..1ffdfcb906 100644 --- a/.github/workflows/e2e-1.16.yaml +++ b/.github/workflows/e2e-1.16.yaml @@ -155,7 +155,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e - ./bin/ginkgo -timeout 60m -v --focus='\[apps\] (PullImage|ContainerRecreateRequest)' test/e2e + ./bin/ginkgo -timeout 60m -v --focus='\[apps\] (PullImage|ContainerRecreateRequest|PullImages)' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') if [ "${restartCount}" -eq "0" ];then @@ -456,7 +456,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e - ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (StatefulSet|PullImage|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e + ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') if [ "${restartCount}" -eq "0" ];then diff --git a/.github/workflows/e2e-1.24.yaml b/.github/workflows/e2e-1.24.yaml index 6d76ff51fd..3d6c432a8d 100644 --- a/.github/workflows/e2e-1.24.yaml +++ b/.github/workflows/e2e-1.24.yaml @@ -143,7 +143,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e - ./bin/ginkgo -timeout 60m -v --focus='\[apps\] (PullImage|ContainerRecreateRequest)' test/e2e + ./bin/ginkgo -timeout 60m -v --focus='\[apps\] (PullImage|ContainerRecreateRequest|PullImages)' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') if [ "${restartCount}" -eq "0" ];then @@ -502,7 +502,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e - ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (StatefulSet|PullImage|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e + ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') if [ "${restartCount}" -eq "0" ];then diff --git a/apis/apps/defaults/v1alpha1.go b/apis/apps/defaults/v1alpha1.go index 5b81d265fd..51ca5c8142 100644 --- a/apis/apps/defaults/v1alpha1.go +++ b/apis/apps/defaults/v1alpha1.go @@ -370,3 +370,19 @@ func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) { obj.Spec.PullPolicy.BackoffLimit = utilpointer.Int32Ptr(3) } } + +// SetDefaultsImageListPullJob set default values for ImageListPullJob. +func SetDefaultsImageListPullJob(obj *v1alpha1.ImageListPullJob) { + if obj.Spec.CompletionPolicy.Type == "" { + obj.Spec.CompletionPolicy.Type = v1alpha1.Always + } + if obj.Spec.PullPolicy == nil { + obj.Spec.PullPolicy = &v1alpha1.PullPolicy{} + } + if obj.Spec.PullPolicy.TimeoutSeconds == nil { + obj.Spec.PullPolicy.TimeoutSeconds = utilpointer.Int32Ptr(600) + } + if obj.Spec.PullPolicy.BackoffLimit == nil { + obj.Spec.PullPolicy.BackoffLimit = utilpointer.Int32Ptr(3) + } +} diff --git a/apis/apps/v1alpha1/imagelistpulljob_types.go b/apis/apps/v1alpha1/imagelistpulljob_types.go index 222045e37c..9e3430131f 100644 --- a/apis/apps/v1alpha1/imagelistpulljob_types.go +++ b/apis/apps/v1alpha1/imagelistpulljob_types.go @@ -57,12 +57,6 @@ type ImageListPullJobStatus struct { // +optional Succeeded int32 `json:"succeeded"` - // Phase Indicates the completion progress of the job,the format is Succeeded/Completed - // Succeeded: The number of ImagePullJobs which reached status.Succeeded==status.Desired - // Completed: The number of ImagePullJobs which are finished - // +optional - Phase string `json:"phase"` - // The status of ImagePullJob which has the failed nodes(status.Failed>0) . // +optional FailedImageStatuses []*FailedImageStatus `json:"failedImageStatuses,omitempty"` @@ -89,7 +83,7 @@ type FailedImageStatus struct { // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="TOTAL",type="integer",JSONPath=".status.desired",description="Number of image pull job" // +kubebuilder:printcolumn:name="SUCCEEDED",type="integer",JSONPath=".status.succeeded",description="Number of image pull job succeeded" -// +kubebuilder:printcolumn:name="STATUS",type="string",JSONPath=".status.phase",description="status.succeeded/status.completed" +// +kubebuilder:printcolumn:name="COMPLETED",type="integer",JSONPath=".status.completed",description="Number of ImagePullJobs which are finished" // +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC." // ImageListPullJob is the Schema for the imagelistpulljobs API diff --git a/config/crd/bases/apps.kruise.io_imagelistpulljobs.yaml b/config/crd/bases/apps.kruise.io_imagelistpulljobs.yaml index 1286e37788..87a2185f68 100644 --- a/config/crd/bases/apps.kruise.io_imagelistpulljobs.yaml +++ b/config/crd/bases/apps.kruise.io_imagelistpulljobs.yaml @@ -25,10 +25,10 @@ spec: jsonPath: .status.succeeded name: SUCCEEDED type: integer - - description: status.succeeded/status.completed - jsonPath: .status.phase - name: STATUS - type: string + - description: Number of ImagePullJobs which are finished + jsonPath: .status.completed + name: COMPLETED + type: integer - description: CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented @@ -277,12 +277,6 @@ spec: type: string type: object type: array - phase: - description: 'Phase Indicates the completion progress of the job,the - format is Succeeded/Completed Succeeded: The number of ImagePullJobs - which reached status.Succeeded==status.Desired Completed: The number - of ImagePullJobs which are finished' - type: string startTime: description: Represents time when the job was acknowledged by the job controller. It is not guaranteed to be set in happens-before diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index ec0e7fd368..9b9fd4206d 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -19,6 +19,7 @@ resources: - bases/apps.kruise.io_persistentpodstates.yaml - bases/apps.kruise.io_podprobemarkers.yaml - bases/apps.kruise.io_nodepodprobes.yaml +- bases/apps.kruise.io_imagelistpulljobs.yaml # +kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 7e6f19a620..b2c392460d 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -295,6 +295,32 @@ rules: - get - patch - update +- apiGroups: + - apps.kruise.io + resources: + - imagelistpulljobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - apps.kruise.io + resources: + - imagelistpulljobs/finalizers + verbs: + - update +- apiGroups: + - apps.kruise.io + resources: + - imagelistpulljobs/status + verbs: + - get + - patch + - update - apiGroups: - apps.kruise.io resources: diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 7cd14510e7..98fb9aa9e8 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -131,6 +131,27 @@ webhooks: resources: - daemonsets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-kruise-io-v1alpha1-imagelistpulljob + failurePolicy: Fail + name: mimagelistpulljob.kb.io + rules: + - apiGroups: + - apps.kruise.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - imagelistpulljobs + sideEffects: None - admissionReviewVersions: - v1 - v1beta1 @@ -453,6 +474,27 @@ webhooks: resources: - daemonsets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-apps-kruise-io-v1alpha1-imagelistpulljob + failurePolicy: Fail + name: vimagelistpulljob.kb.io + rules: + - apiGroups: + - apps.kruise.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - imagelistpulljobs + sideEffects: None - admissionReviewVersions: - v1 - v1beta1 diff --git a/pkg/controller/controllers.go b/pkg/controller/controllers.go index 9a14d57385..bef3d95ccd 100644 --- a/pkg/controller/controllers.go +++ b/pkg/controller/controllers.go @@ -24,6 +24,7 @@ import ( "github.com/openkruise/kruise/pkg/controller/containerrecreaterequest" "github.com/openkruise/kruise/pkg/controller/daemonset" "github.com/openkruise/kruise/pkg/controller/ephemeraljob" + "github.com/openkruise/kruise/pkg/controller/imagelistpulljob" "github.com/openkruise/kruise/pkg/controller/imagepulljob" "github.com/openkruise/kruise/pkg/controller/nodeimage" "github.com/openkruise/kruise/pkg/controller/nodepodprobe" @@ -65,6 +66,7 @@ func init() { controllerAddFuncs = append(controllerAddFuncs, sidecarterminator.Add) controllerAddFuncs = append(controllerAddFuncs, podprobemarker.Add) controllerAddFuncs = append(controllerAddFuncs, nodepodprobe.Add) + controllerAddFuncs = append(controllerAddFuncs, imagelistpulljob.Add) } func SetupWithManager(m manager.Manager) error { diff --git a/pkg/controller/imagelistpulljob/imagelistpulljob_controller.go b/pkg/controller/imagelistpulljob/imagelistpulljob_controller.go new file mode 100644 index 0000000000..17221281aa --- /dev/null +++ b/pkg/controller/imagelistpulljob/imagelistpulljob_controller.go @@ -0,0 +1,422 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imagelistpulljob + +import ( + "context" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/util/slice" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" + "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + "github.com/openkruise/kruise/pkg/util/expectations" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "github.com/openkruise/kruise/pkg/util/fieldindex" +) + +var ( + concurrentReconciles = 3 + controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("ImageListPullJob") + slowStartInitialBatchSize = 3 + controllerName = "imagelistpulljob-controller" + resourceVersionExpectations = expectations.NewResourceVersionExpectation() + scaleExpectations = expectations.NewScaleExpectations() +) + +// Add creates a new ImageListPullJob Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func Add(mgr manager.Manager) error { + if !utildiscovery.DiscoverGVK(controllerKind) || !utilfeature.DefaultFeatureGate.Enabled(features.KruiseDaemon) { + return nil + } + return add(mgr, newReconciler(mgr)) +} + +// newReconciler returns a new reconcile.Reconciler +func newReconciler(mgr manager.Manager) *ReconcileImageListPullJob { + return &ReconcileImageListPullJob{ + Client: utilclient.NewClientFromManager(mgr, controllerName), + scheme: mgr.GetScheme(), + clock: clock.RealClock{}, + recorder: mgr.GetEventRecorderFor(controllerName), + } +} + +// add a new Controller to mgr with r as the reconcile.Reconciler +func add(mgr manager.Manager, r *ReconcileImageListPullJob) error { + // Create a new controller + c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r, MaxConcurrentReconciles: concurrentReconciles}) + if err != nil { + return err + } + + // Watch for changes to ImageListPullJob + err = c.Watch(&source.Kind{Type: &appsv1alpha1.ImageListPullJob{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj := e.ObjectOld.(*appsv1alpha1.ImageListPullJob) + newObj := e.ObjectNew.(*appsv1alpha1.ImageListPullJob) + return newObj.DeletionTimestamp.IsZero() && oldObj.Generation != newObj.Generation + }, + }) + if err != nil { + return err + } + // Watch for changes to ImagePullJob + // todo the imagelistpulljob(status) will not change if the pull job status does not change significantly (ex. number of failed nodeimage changes from 1 to 2) + err = c.Watch(&source.Kind{Type: &appsv1alpha1.ImagePullJob{}}, &imagePullJobEventHandler{ + enqueueHandler: handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &appsv1alpha1.ImageListPullJob{}, + }, + }) + if err != nil { + return err + } + return nil +} + +var _ reconcile.Reconciler = &ReconcileImageListPullJob{} + +// ReconcileImageListPullJob reconciles a ImageListPullJob object +type ReconcileImageListPullJob struct { + client.Client + scheme *runtime.Scheme + clock clock.Clock + recorder record.EventRecorder +} + +// +kubebuilder:rbac:groups=apps.kruise.io,resources=imagelistpulljobs,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps.kruise.io,resources=imagelistpulljobs/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.kruise.io,resources=imagelistpulljobs/finalizers,verbs=update + +// Reconcile reads that state of the cluster for a ImageListPullJob object and makes changes based on the state read +// and what is in the ImageListPullJob.Spec +// Automatically generate RBAC rules to allow the Controller to read and write ImageListPullJob +func (r *ReconcileImageListPullJob) Reconcile(_ context.Context, request reconcile.Request) (res reconcile.Result, err error) { + klog.V(5).Infof("Starting to process ImageListPullJob %v", request.NamespacedName) + + // 1.Fetch the ImageListPullJob instance + job := &appsv1alpha1.ImageListPullJob{} + err = r.Get(context.TODO(), request.NamespacedName, job) + if err != nil { + if errors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // The Job has been finished + if job.Status.CompletionTime != nil { + var leftTime time.Duration + if job.Spec.CompletionPolicy.TTLSecondsAfterFinished != nil { + leftTime = time.Duration(*job.Spec.CompletionPolicy.TTLSecondsAfterFinished)*time.Second - time.Since(job.Status.CompletionTime.Time) + if leftTime <= 0 { + klog.Infof("Deleting ImageListPullJob %s/%s for ttlSecondsAfterFinished", job.Namespace, job.Name) + if err = r.Delete(context.TODO(), job); err != nil { + return reconcile.Result{}, fmt.Errorf("delete ImageListPullJob error: %v", err) + } + return reconcile.Result{}, nil + } + } + return reconcile.Result{RequeueAfter: leftTime}, nil + } + + if scaleSatisfied, unsatisfiedDuration, scaleDirtyImagePullJobs := scaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied { + if unsatisfiedDuration >= expectations.ExpectationTimeout { + klog.Warningf("Expectation unsatisfied overtime for ImageListPullJob %v, scaleDirtyImagePullJobs=%v, overtime=%v", request.String(), scaleDirtyImagePullJobs, unsatisfiedDuration) + return reconcile.Result{}, nil + } + klog.V(4).Infof("Not satisfied scale for ImageListPullJob %v, scaleDirtyImagePullJobs=%v", request.String(), scaleDirtyImagePullJobs) + return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil + } + + // 2. Get ImagePullJob owned by this job + imagePullJobsMap, err := r.getOwnedImagePullJob(job) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get imagePullJob: %v", err) + } + + // If resourceVersion expectations have not satisfied yet, just skip this reconcile + for _, imagePullJob := range imagePullJobsMap { + resourceVersionExpectations.Observe(imagePullJob) + if isSatisfied, unsatisfiedDuration := resourceVersionExpectations.IsSatisfied(imagePullJob); !isSatisfied { + if unsatisfiedDuration >= expectations.ExpectationTimeout { + klog.Warningf("Expectation unsatisfied overtime for %v, timeout=%v", request.String(), unsatisfiedDuration) + return reconcile.Result{}, nil + } + klog.V(4).Infof("Not satisfied resourceVersion for %v", request.String()) + return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil + } + } + + // 3. Calculate the new status for this job + newStatus := r.calculateStatus(job, imagePullJobsMap) + + // 4. Compute ImagePullJobActions + needToCreate, needToDelete := r.computeImagePullJobActions(job, imagePullJobsMap) + + // 5. Sync ImagePullJob + err = r.syncImagePullJob(job, needToCreate, needToDelete) + if err != nil { + return reconcile.Result{}, err + } + + // 6. Update status + if !util.IsJSONObjectEqual(&job.Status, newStatus) { + if err = r.updateStatus(job, newStatus); err != nil { + return reconcile.Result{}, fmt.Errorf("update ImageListPullJob status error: %v", err) + } + } + + return reconcile.Result{}, nil +} + +func (r *ReconcileImageListPullJob) updateStatus(job *appsv1alpha1.ImageListPullJob, newStatus *appsv1alpha1.ImageListPullJobStatus) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + imageListPullJob := &appsv1alpha1.ImageListPullJob{} + if err := r.Get(context.TODO(), types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, imageListPullJob); err != nil { + return err + } + imageListPullJob.Status = *newStatus + return r.Status().Update(context.TODO(), imageListPullJob) + }) +} + +func (r *ReconcileImageListPullJob) computeImagePullJobActions(job *appsv1alpha1.ImageListPullJob, imagePullJobs map[string]*appsv1alpha1.ImagePullJob) ([]*appsv1alpha1.ImagePullJob, []*appsv1alpha1.ImagePullJob) { + var needToDelete, needToCreate []*appsv1alpha1.ImagePullJob + //1. need to create + images, needToDelete := r.filterImagesAndImagePullJobs(job, imagePullJobs) + needToCreate = r.newImagePullJobs(job, images) + // some images delete from ImageListPullJob.Spec.Images + for image, imagePullJob := range imagePullJobs { + if !slice.ContainsString(job.Spec.Images, image, nil) { + needToDelete = append(needToDelete, imagePullJob) + } + } + return needToCreate, needToDelete +} + +func (r *ReconcileImageListPullJob) calculateStatus(job *appsv1alpha1.ImageListPullJob, imagePullJobs map[string]*appsv1alpha1.ImagePullJob) *appsv1alpha1.ImageListPullJobStatus { + var active, completed, succeeded int32 + // record the failed image status + var failedImageStatuses []*appsv1alpha1.FailedImageStatus + + for _, imagePullJob := range imagePullJobs { + if imagePullJob.Status.StartTime == nil { + continue + } + + if imagePullJob.Status.Active > 0 { + active = active + 1 + } + + if imagePullJob.Status.Desired == (imagePullJob.Status.Failed + imagePullJob.Status.Succeeded) { + completed = completed + 1 + // + if imagePullJob.Status.Failed > 0 { + failedImagePullJobStatus := &appsv1alpha1.FailedImageStatus{ + ImagePullJob: imagePullJob.Name, + Name: imagePullJob.Spec.Image, + Message: fmt.Sprintf("Please check for details which nodes failed by using %s.", imagePullJob.Name), + } + failedImageStatuses = append(failedImageStatuses, failedImagePullJobStatus) + } + + } + + if imagePullJob.Status.Desired == imagePullJob.Status.Succeeded { + succeeded = succeeded + 1 + } + } + + // 4. status + newStatus := &appsv1alpha1.ImageListPullJobStatus{ + Desired: int32(len(job.Spec.Images)), + Active: active, + Completed: completed, + Succeeded: succeeded, + StartTime: job.Status.StartTime, + FailedImageStatuses: failedImageStatuses, + } + + now := metav1.NewTime(r.clock.Now()) + if newStatus.StartTime == nil { + newStatus.StartTime = &now + } + + if job.Spec.CompletionPolicy.Type != appsv1alpha1.Never && newStatus.Desired == newStatus.Completed { + newStatus.CompletionTime = &now + } + return newStatus +} + +func (r *ReconcileImageListPullJob) syncImagePullJob(job *appsv1alpha1.ImageListPullJob, needToCreate, needToDelete []*appsv1alpha1.ImagePullJob) error { + + //1. manage creating + var errs []error + if len(needToCreate) > 0 { + var createdNum int + var createdErr error + + createdNum, createdErr = util.SlowStartBatch(len(needToCreate), slowStartInitialBatchSize, func(idx int) error { + imagePullJob := needToCreate[idx] + key := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}.String() + scaleExpectations.ExpectScale(key, expectations.Create, imagePullJob.Spec.Image) + err := r.Create(context.TODO(), imagePullJob) + if err != nil { + scaleExpectations.ObserveScale(key, expectations.Create, imagePullJob.Spec.Image) + if !errors.IsTimeout(err) { + return fmt.Errorf("fail to create ImagePullJob (%s/%s): %s", imagePullJob.Namespace, imagePullJob.Spec.Image, err.Error()) + } + } + + return nil + }) + if createdErr == nil { + r.recorder.Eventf(job, corev1.EventTypeNormal, "Successful create ImagePullJob", "Create %d ImagePullJob", createdNum) + } else { + errs = append(errs, createdErr) + } + } + + //2. manage deleting + if len(needToDelete) > 0 { + var deleteErrs []error + for _, imagePullJob := range needToDelete { + key := types.NamespacedName{Namespace: job.Namespace, Name: job.Name}.String() + scaleExpectations.ExpectScale(key, expectations.Delete, imagePullJob.Spec.Image) + if err := r.Delete(context.TODO(), imagePullJob); err != nil { + scaleExpectations.ObserveScale(key, expectations.Delete, imagePullJob.Spec.Image) + deleteErrs = append(deleteErrs, fmt.Errorf("fail to delete ImagePullJob (%s/%s) for : %s", imagePullJob.Namespace, imagePullJob.Name, err)) + } + } + + if len(deleteErrs) > 0 { + errs = append(errs, deleteErrs...) + } else { + r.recorder.Eventf(job, corev1.EventTypeNormal, "Successful delete ImagePullJob", "Delete %d ImagePullJob", len(needToDelete)) + } + } + + return utilerrors.NewAggregate(errs) +} + +func (r *ReconcileImageListPullJob) filterImagesAndImagePullJobs(job *appsv1alpha1.ImageListPullJob, imagePullJobs map[string]*appsv1alpha1.ImagePullJob) ([]string, []*appsv1alpha1.ImagePullJob) { + var images []string + var imagesInCurrentImagePullJob []string + var needToDelete []*appsv1alpha1.ImagePullJob + + for image := range imagePullJobs { + imagesInCurrentImagePullJob = append(imagesInCurrentImagePullJob, image) + } + + for _, image := range job.Spec.Images { + + // should create imagePullJob for new image + if len(imagePullJobs) <= 0 || !slice.ContainsString(imagesInCurrentImagePullJob, image, nil) { + images = append(images, image) + continue + } + imagePullJob, ok := imagePullJobs[image] + if !ok { + klog.Warningf("can not found imagePullJob for image name %s", image) + continue + } + // should create new imagePullJob if the template is changed. + if imagePullJobSpecTemplateChanged(imagePullJob, job.Spec.ImagePullJobTemplate) { + images = append(images, image) + // should delete old imagepulljob + needToDelete = append(needToDelete, imagePullJob) + } + } + + return images, needToDelete +} + +func (r *ReconcileImageListPullJob) newImagePullJobs(job *appsv1alpha1.ImageListPullJob, images []string) []*appsv1alpha1.ImagePullJob { + var needToCreate []*appsv1alpha1.ImagePullJob + if len(images) <= 0 { + return needToCreate + } + for _, image := range images { + imagePullJob := &appsv1alpha1.ImagePullJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + GenerateName: fmt.Sprintf("%s-", job.Name), + Labels: make(map[string]string), + Annotations: make(map[string]string), + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, controllerKind), + }, + }, + Spec: appsv1alpha1.ImagePullJobSpec{ + Image: image, + ImagePullJobTemplate: job.Spec.ImagePullJobTemplate, + }, + } + needToCreate = append(needToCreate, imagePullJob) + } + + return needToCreate +} + +func (r *ReconcileImageListPullJob) getOwnedImagePullJob(job *appsv1alpha1.ImageListPullJob) (map[string]*appsv1alpha1.ImagePullJob, error) { + + opts := &client.ListOptions{ + Namespace: job.Namespace, + FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(job.UID)}), + } + imagePullJobList := &appsv1alpha1.ImagePullJobList{} + err := r.List(context.TODO(), imagePullJobList, opts, utilclient.DisableDeepCopy) + if err != nil { + return nil, err + } + imagePullJobsMap := make(map[string]*appsv1alpha1.ImagePullJob) + for i := range imagePullJobList.Items { + imagePullJob := imagePullJobList.Items[i] + if imagePullJob.DeletionTimestamp.IsZero() { + imagePullJobsMap[imagePullJob.Spec.Image] = &imagePullJob + } + } + return imagePullJobsMap, nil +} diff --git a/pkg/controller/imagelistpulljob/imagelistpulljob_controller_test.go b/pkg/controller/imagelistpulljob/imagelistpulljob_controller_test.go new file mode 100644 index 0000000000..583f0d86a7 --- /dev/null +++ b/pkg/controller/imagelistpulljob/imagelistpulljob_controller_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imagelistpulljob + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" +) + +var testscheme *k8sruntime.Scheme + +var ( + images = []string{"nginx:1.9.1", "busybox:1.35", "httpd:2.4.38"} + jobUID = "123" +) + +func init() { + testscheme = k8sruntime.NewScheme() + _ = corev1.AddToScheme(testscheme) + _ = appsv1alpha1.AddToScheme(testscheme) +} + +func TestReconcile(t *testing.T) { + now := metav1.Now() + instance := &appsv1alpha1.ImageListPullJob{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", UID: types.UID(jobUID)}, + Spec: appsv1alpha1.ImageListPullJobSpec{ + Images: images, + }, + } + cases := []struct { + name string + expectImagePullJob []*appsv1alpha1.ImagePullJob + }{ + { + name: "test-imagelistpulljob-controller", + expectImagePullJob: []*appsv1alpha1.ImagePullJob{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "j01", + Namespace: instance.Namespace, + OwnerReferences: []metav1.OwnerReference{ + {UID: types.UID(jobUID), Name: instance.Name}, + }, + }, + Spec: appsv1alpha1.ImagePullJobSpec{ + Image: images[0], + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + CompletionPolicy: appsv1alpha1.CompletionPolicy{Type: appsv1alpha1.Always}, + }, + }, + Status: appsv1alpha1.ImagePullJobStatus{ + Active: 0, + StartTime: &now, + CompletionTime: &now, + Succeeded: 1, + Failed: 0, + Desired: 1, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "j02", + Namespace: instance.Namespace, + OwnerReferences: []metav1.OwnerReference{ + {UID: types.UID(jobUID), Name: instance.Name}, + }, + }, + Spec: appsv1alpha1.ImagePullJobSpec{ + Image: images[1], + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + CompletionPolicy: appsv1alpha1.CompletionPolicy{Type: appsv1alpha1.Always}, + }, + }, + Status: appsv1alpha1.ImagePullJobStatus{ + Desired: 1, + Active: 1, + StartTime: &now, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "j03", + Namespace: instance.Namespace, + OwnerReferences: []metav1.OwnerReference{ + {UID: types.UID(jobUID), Name: instance.Name}, + }, + }, + Spec: appsv1alpha1.ImagePullJobSpec{ + Image: images[2], + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + CompletionPolicy: appsv1alpha1.CompletionPolicy{Type: appsv1alpha1.Always}, + }, + }, + Status: appsv1alpha1.ImagePullJobStatus{ + Desired: 1, + Active: 0, + StartTime: &now, + CompletionTime: &now, + Failed: 1, + Succeeded: 0, + FailedNodes: []string{"1.1.1.1"}, + }, + }, + }, + }, + } + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + reconcileJob := createReconcileJob(testscheme, instance, cs.expectImagePullJob[0], cs.expectImagePullJob[1], cs.expectImagePullJob[2]) + + request := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: instance.Name, + Namespace: instance.Namespace, + }, + } + + _, err := reconcileJob.Reconcile(context.TODO(), request) + assert.NoError(t, err) + retrievedJob := &appsv1alpha1.ImageListPullJob{} + err = reconcileJob.Get(context.TODO(), request.NamespacedName, retrievedJob) + assert.NoError(t, err) + + imagePullJobList := &appsv1alpha1.ImagePullJobList{} + listOptions := client.InNamespace(request.Namespace) + err = reconcileJob.List(context.TODO(), imagePullJobList, listOptions) + assert.NoError(t, err) + assert.Equal(t, int32(len(images)), retrievedJob.Status.Desired) + assert.Equal(t, int32(1), retrievedJob.Status.Active) + assert.Equal(t, int32(1), retrievedJob.Status.Succeeded) + assert.Equal(t, int32(2), retrievedJob.Status.Completed) + assert.Equal(t, 1, len(retrievedJob.Status.FailedImageStatuses)) + for _, job := range imagePullJobList.Items { + assert.Contains(t, instance.Spec.Images, job.Spec.Image) + } + }) + } +} + +func createReconcileJob(scheme *k8sruntime.Scheme, initObjs ...client.Object) ReconcileImageListPullJob { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(initObjs...).Build() + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "imagelistpulljob-controller"}) + reconcileJob := ReconcileImageListPullJob{ + Client: fakeClient, + scheme: scheme, + recorder: recorder, + clock: clock.RealClock{}, + } + return reconcileJob +} diff --git a/pkg/controller/imagelistpulljob/imagelistpulljob_event_handler.go b/pkg/controller/imagelistpulljob/imagelistpulljob_event_handler.go new file mode 100644 index 0000000000..5ba293d273 --- /dev/null +++ b/pkg/controller/imagelistpulljob/imagelistpulljob_event_handler.go @@ -0,0 +1,89 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imagelistpulljob + +import ( + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util/expectations" +) + +type imagePullJobEventHandler struct { + enqueueHandler handler.EnqueueRequestForOwner +} + +func isImageListPullJobController(controllerRef *metav1.OwnerReference) bool { + refGV, err := schema.ParseGroupVersion(controllerRef.APIVersion) + if err != nil { + klog.Errorf("Could not parse OwnerReference %v APIVersion: %v", controllerRef, err) + return false + } + return controllerRef.Kind == controllerKind.Kind && refGV.Group == controllerKind.Group +} + +func (p *imagePullJobEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + job := evt.Object.(*appsv1alpha1.ImagePullJob) + if job.DeletionTimestamp != nil { + p.Delete(event.DeleteEvent{Object: evt.Object}, q) + return + } + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && isImageListPullJobController(controllerRef) { + key := types.NamespacedName{Namespace: job.Namespace, Name: controllerRef.Name}.String() + scaleExpectations.ObserveScale(key, expectations.Create, job.Spec.Image) + p.enqueueHandler.Create(evt, q) + } +} + +func (p *imagePullJobEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + job := evt.Object.(*appsv1alpha1.ImagePullJob) + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && isImageListPullJobController(controllerRef) { + key := types.NamespacedName{Namespace: job.Namespace, Name: controllerRef.Name}.String() + scaleExpectations.ObserveScale(key, expectations.Delete, job.Spec.Image) + } + p.enqueueHandler.Delete(evt, q) +} + +func (p *imagePullJobEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + newJob := evt.ObjectNew.(*appsv1alpha1.ImagePullJob) + resourceVersionExpectations.Expect(newJob) + p.enqueueHandler.Update(evt, q) +} + +func (p *imagePullJobEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +} + +var _ inject.Mapper = &imagePullJobEventHandler{} + +func (p *imagePullJobEventHandler) InjectScheme(s *runtime.Scheme) error { + return p.enqueueHandler.InjectScheme(s) +} + +var _ inject.Mapper = &imagePullJobEventHandler{} + +func (p *imagePullJobEventHandler) InjectMapper(m meta.RESTMapper) error { + return p.enqueueHandler.InjectMapper(m) +} diff --git a/pkg/controller/imagelistpulljob/imagelistpulljob_utils.go b/pkg/controller/imagelistpulljob/imagelistpulljob_utils.go new file mode 100644 index 0000000000..f42d507d3f --- /dev/null +++ b/pkg/controller/imagelistpulljob/imagelistpulljob_utils.go @@ -0,0 +1,33 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package imagelistpulljob + +import ( + "reflect" + + "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" +) + +func imagePullJobSpecTemplateChanged(oldImagePullJob *appsv1alpha1.ImagePullJob, newImagePullJobTemplate appsv1alpha1.ImagePullJobTemplate) bool { + if !reflect.DeepEqual(oldImagePullJob.Spec.ImagePullJobTemplate, newImagePullJobTemplate) { + klog.V(3).Infof("imagePullJob(%s/%s) specification changed", oldImagePullJob.Namespace, oldImagePullJob.Name) + return true + } + return false +} diff --git a/pkg/util/fieldindex/register.go b/pkg/util/fieldindex/register.go index c280b76d34..52bf7c9f84 100644 --- a/pkg/util/fieldindex/register.go +++ b/pkg/util/fieldindex/register.go @@ -62,6 +62,11 @@ func RegisterFieldIndexes(c cache.Cache) error { if err = c.IndexField(context.TODO(), &v1.PersistentVolumeClaim{}, IndexNameForOwnerRefUID, ownerIndexFunc); err != nil { return } + // ImagePullJob ownerReference + if err = c.IndexField(context.TODO(), &appsv1alpha1.ImagePullJob{}, IndexNameForOwnerRefUID, ownerIndexFunc); err != nil { + return + } + // pod name if err = indexPodNodeName(c); err != nil { return diff --git a/pkg/webhook/add_imagelistpulljob.go b/pkg/webhook/add_imagelistpulljob.go new file mode 100644 index 0000000000..a72994a105 --- /dev/null +++ b/pkg/webhook/add_imagelistpulljob.go @@ -0,0 +1,27 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "github.com/openkruise/kruise/pkg/webhook/imagelistpulljob/mutating" + "github.com/openkruise/kruise/pkg/webhook/imagelistpulljob/validating" +) + +func init() { + addHandlers(mutating.HandlerMap) + addHandlers(validating.HandlerMap) +} diff --git a/pkg/webhook/imagelistpulljob/mutating/imagelistpulljob_create_update_handler.go b/pkg/webhook/imagelistpulljob/mutating/imagelistpulljob_create_update_handler.go new file mode 100644 index 0000000000..a95837c085 --- /dev/null +++ b/pkg/webhook/imagelistpulljob/mutating/imagelistpulljob_create_update_handler.go @@ -0,0 +1,71 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutating + +import ( + "context" + "encoding/json" + "net/http" + "reflect" + + "github.com/openkruise/kruise/apis/apps/defaults" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// ImageListPullJobCreateUpdateHandler handles ImagePullJob +type ImageListPullJobCreateUpdateHandler struct { + // Decoder decodes objects + Decoder *admission.Decoder +} + +var _ admission.Handler = &ImageListPullJobCreateUpdateHandler{} + +// Handle handles admission requests. +func (h *ImageListPullJobCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response { + obj := &appsv1alpha1.ImageListPullJob{} + err := h.Decoder.Decode(req, obj) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + var copy runtime.Object = obj.DeepCopy() + defaults.SetDefaultsImageListPullJob(obj) + if reflect.DeepEqual(obj, copy) { + return admission.Allowed("") + } + marshalled, err := json.Marshal(obj) + if err != nil { + return admission.Errored(http.StatusInternalServerError, err) + } + resp := admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled) + if len(resp.Patches) > 0 { + klog.V(5).Infof("Admit ImageListPullJob %s patches: %v", obj.Name, util.DumpJSON(resp.Patches)) + } + + return resp +} + +var _ admission.DecoderInjector = &ImageListPullJobCreateUpdateHandler{} + +// InjectDecoder injects the decoder into the ImageListPullJobCreateUpdateHandler +func (h *ImageListPullJobCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error { + h.Decoder = d + return nil +} diff --git a/pkg/webhook/imagelistpulljob/mutating/webhooks.go b/pkg/webhook/imagelistpulljob/mutating/webhooks.go new file mode 100644 index 0000000000..4f6bf06dfc --- /dev/null +++ b/pkg/webhook/imagelistpulljob/mutating/webhooks.go @@ -0,0 +1,30 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutating + +import ( + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-imagelistpulljob,mutating=true,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=imagelistpulljobs,verbs=create;update,versions=v1alpha1,name=mimagelistpulljob.kb.io + +var ( + // HandlerMap contains admission webhook handlers + HandlerMap = map[string]admission.Handler{ + "mutate-apps-kruise-io-v1alpha1-imagelistpulljob": &ImageListPullJobCreateUpdateHandler{}, + } +) diff --git a/pkg/webhook/imagelistpulljob/validating/imagelistpulljob_create_update_handler.go b/pkg/webhook/imagelistpulljob/validating/imagelistpulljob_create_update_handler.go new file mode 100644 index 0000000000..9c62494352 --- /dev/null +++ b/pkg/webhook/imagelistpulljob/validating/imagelistpulljob_create_update_handler.go @@ -0,0 +1,131 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validating + +import ( + "context" + "fmt" + "net/http" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + daemonutil "github.com/openkruise/kruise/pkg/daemon/util" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" +) + +// ImageListPullJobCreateUpdateHandler handles ImagePullJob +type ImageListPullJobCreateUpdateHandler struct { + // Decoder decodes objects + Decoder *admission.Decoder +} + +var _ admission.Handler = &ImageListPullJobCreateUpdateHandler{} + +// Handle handles admission requests. +func (h *ImageListPullJobCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response { + obj := &appsv1alpha1.ImageListPullJob{} + + err := h.Decoder.Decode(req, obj) + if err != nil { + return admission.Errored(http.StatusBadRequest, err) + } + if !utilfeature.DefaultFeatureGate.Enabled(features.KruiseDaemon) { + return admission.Errored(http.StatusForbidden, fmt.Errorf("feature-gate %s is not enabled", features.KruiseDaemon)) + } + + if err := validate(obj); err != nil { + klog.Warningf("Error validate ImageListPullJob %s/%s: %v", obj.Namespace, obj.Name, err) + return admission.Errored(http.StatusBadRequest, err) + } + + return admission.ValidationResponse(true, "allowed") +} + +func validate(obj *appsv1alpha1.ImageListPullJob) error { + if obj.Spec.Selector != nil { + if obj.Spec.Selector.MatchLabels != nil || obj.Spec.Selector.MatchExpressions != nil { + if obj.Spec.Selector.Names != nil { + return fmt.Errorf("can not set both names and labelSelector in this spec.selector") + } + if _, err := metav1.LabelSelectorAsSelector(&obj.Spec.Selector.LabelSelector); err != nil { + return fmt.Errorf("invalid selector: %v", err) + } + } + if obj.Spec.Selector.Names != nil { + names := sets.NewString(obj.Spec.Selector.Names...) + if names.Len() != len(obj.Spec.Selector.Names) { + return fmt.Errorf("duplicated name in selector names") + } + } + } + if obj.Spec.PodSelector != nil { + if obj.Spec.Selector != nil { + return fmt.Errorf("can not set both selector and podSelector") + } + if _, err := metav1.LabelSelectorAsSelector(&obj.Spec.PodSelector.LabelSelector); err != nil { + return fmt.Errorf("invalid podSelector: %v", err) + } + } + + if len(obj.Spec.Images) == 0 { + return fmt.Errorf("image can not be empty") + } + + for i := 0; i < len(obj.Spec.Images); i++ { + for j := i + 1; j < len(obj.Spec.Images); j++ { + if obj.Spec.Images[i] == obj.Spec.Images[j] { + return fmt.Errorf("images cannot have duplicate values") + } + } + } + + if len(obj.Spec.Images) > 255 { + return fmt.Errorf("the maximum number of images cannot > 255") + } + + for _, image := range obj.Spec.Images { + if _, err := daemonutil.NormalizeImageRef(image); err != nil { + return fmt.Errorf("invalid image %s: %v", image, err) + } + } + + switch obj.Spec.CompletionPolicy.Type { + case appsv1alpha1.Always: + // is a no-op here.No need to do parameter dependency verification in this type. + case appsv1alpha1.Never: + if obj.Spec.CompletionPolicy.ActiveDeadlineSeconds != nil || obj.Spec.CompletionPolicy.TTLSecondsAfterFinished != nil { + return fmt.Errorf("activeDeadlineSeconds and ttlSecondsAfterFinished can only work with Always CompletionPolicyType") + } + default: + return fmt.Errorf("unknown type of completionPolicy: %s", obj.Spec.CompletionPolicy.Type) + } + + return nil +} + +var _ admission.DecoderInjector = &ImageListPullJobCreateUpdateHandler{} + +// InjectDecoder injects the decoder into the ImageListPullJobCreateUpdateHandler +func (h *ImageListPullJobCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error { + h.Decoder = d + return nil +} diff --git a/pkg/webhook/imagelistpulljob/validating/webhooks.go b/pkg/webhook/imagelistpulljob/validating/webhooks.go new file mode 100644 index 0000000000..bb04a46cc9 --- /dev/null +++ b/pkg/webhook/imagelistpulljob/validating/webhooks.go @@ -0,0 +1,30 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validating + +import ( + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// +kubebuilder:webhook:path=/validate-apps-kruise-io-v1alpha1-imagelistpulljob,mutating=false,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=imagelistpulljobs,verbs=create;update,versions=v1alpha1,name=vimagelistpulljob.kb.io + +var ( + // HandlerMap contains admission webhook handlers + HandlerMap = map[string]admission.Handler{ + "validate-apps-kruise-io-v1alpha1-imagelistpulljob": &ImageListPullJobCreateUpdateHandler{}, + } +) diff --git a/test/e2e/apps/imagelistpulljobs.go b/test/e2e/apps/imagelistpulljobs.go new file mode 100644 index 0000000000..c79d214ba0 --- /dev/null +++ b/test/e2e/apps/imagelistpulljobs.go @@ -0,0 +1,287 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apps + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + clientset "k8s.io/client-go/kubernetes" + utilpointer "k8s.io/utils/pointer" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/test/e2e/framework" +) + +var _ = SIGDescribe("PullImages", func() { + f := framework.NewDefaultFramework("imagelistpulljobs") + var ns string + var c clientset.Interface + var kc kruiseclientset.Interface + var testerForNodeImage *framework.NodeImageTester + var testerForImageListPullJob *framework.ImageListPullJobTester + var testerForImagePullJob *framework.ImagePullJobTester + var nodes []*v1.Node + var imagePullJobs *appsv1alpha1.ImagePullJobList + + f.AfterEachActions = []func(){ + func() { + // Print debug info if it fails + if ginkgo.CurrentGinkgoTestDescription().Failed { + imagePullJobList, err := testerForImagePullJob.ListJobs(ns) + if err != nil { + framework.Logf("[FAILURE_DEBUG] List ImagePullJobs in %s error: %v", ns, err) + } else { + framework.Logf("[FAILURE_DEBUG] List ImagePullJobs in %s: %v", ns, util.DumpJSON(imagePullJobList)) + } + imageListPullJobList, err := testerForImageListPullJob.ListJobs(ns) + if err != nil { + framework.Logf("[FAILURE_DEBUG] List ImageListPullJobs in %s error: %v", ns, err) + } else { + framework.Logf("[FAILURE_DEBUG] List ImageListPullJobs in %s: %v", ns, util.DumpJSON(imageListPullJobList)) + } + } + }, + } + + ginkgo.BeforeEach(func() { + c = f.ClientSet + kc = f.KruiseClientSet + ns = f.Namespace.Name + testerForNodeImage = framework.NewNodeImageTester(c, kc) + testerForImageListPullJob = framework.NewImageListPullJobTester(c, kc) + testerForImagePullJob = framework.NewImagePullJobTester(c, kc) + err := testerForNodeImage.CreateFakeNodeImageIfNotPresent() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nodes, err = testerForNodeImage.ExpectNodes() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.AfterEach(func() { + err := testerForNodeImage.DeleteFakeNodeImage() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = testerForImagePullJob.DeleteAllJobs(ns) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + framework.KruiseDescribe("ImageListPullJob pulling images functionality [ImageListPullJob]", func() { + var baseJob *appsv1alpha1.ImageListPullJob + intorstr4 := intstr.FromInt(4) + + ginkgo.BeforeEach(func() { + baseJob = &appsv1alpha1.ImageListPullJob{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "test-imagelistpulljob"}} + }) + + framework.ConformanceIt("create an always job to pull two images on all real nodes", func() { + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImageListPullJobSpec{ + Images: []string{NginxImage, BusyboxImage}, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + Selector: &appsv1alpha1.ImagePullJobNodeSelector{LabelSelector: metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: framework.FakeNodeImageLabelKey, Operator: metav1.LabelSelectorOpDoesNotExist}, + }}}, + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Always, + ActiveDeadlineSeconds: utilpointer.Int64Ptr(50), + TTLSecondsAfterFinished: utilpointer.Int32Ptr(20), + }, + }, + } + err := testerForImageListPullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By(fmt.Sprintf("Wait %d imagepulljob created", len(job.Spec.Images))) + gomega.Eventually(func() int32 { + imagePullJobs, err = testerForImagePullJob.ListJobs(job.Namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return int32(len(imagePullJobs.Items)) + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Desired should be equal to number of imagepulljobs [2]") + gomega.Eventually(func() int32 { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() bool { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.CompletionTime != nil + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(true)) + gomega.Expect(job.Status.Succeeded).To(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Wait clean in 25s") + gomega.Eventually(func() bool { + _, err = testerForImageListPullJob.GetJob(job) + return err != nil && errors.IsNotFound(err) + }, 25*time.Second, 2*time.Second).Should(gomega.Equal(true)) + + ginkgo.By("Check imagepulljob should be cleaned") + gomega.Eventually(func() bool { + imagePullJobs, err := testerForImagePullJob.ListJobs(job.Namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return len(imagePullJobs.Items) > 0 + }, 10*time.Second, time.Second).Should(gomega.Equal(false)) + }) + + framework.ConformanceIt("create an always job to pull two images on one real node", func() { + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImageListPullJobSpec{ + Images: []string{NewNginxImage, BusyboxImage}, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + Selector: &appsv1alpha1.ImagePullJobNodeSelector{Names: []string{nodes[0].Name}}, + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Always, + }, + }, + } + err := testerForImageListPullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Desired should be equal to 2") + gomega.Eventually(func() int32 { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() bool { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.CompletionTime != nil + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(true)) + gomega.Expect(job.Status.Succeeded).To(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Delete job") + err = testerForImageListPullJob.DeleteJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Check imagepulljob should be cleaned") + gomega.Eventually(func() bool { + imagePullJobs, err := testerForImagePullJob.ListJobs(job.Namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return len(imagePullJobs.Items) > 0 + }, 3*time.Second, time.Second).Should(gomega.Equal(false)) + }) + + framework.ConformanceIt("create an always job to pull an image on all nodes", func() { + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImageListPullJobSpec{ + Images: []string{WebserverImage}, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Always, + }, + }, + } + err := testerForImageListPullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By(fmt.Sprintf("Wait %d imagepulljob created", len(job.Spec.Images))) + gomega.Eventually(func() int32 { + imagePullJobs, err := testerForImagePullJob.ListJobs(job.Namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return int32(len(imagePullJobs.Items)) + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Desired should be equal to 1") + gomega.Eventually(func() int32 { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() bool { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.CompletionTime != nil + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(true)) + gomega.Expect(len(job.Status.FailedImageStatuses)).To(gomega.Equal(len(job.Spec.Images))) + + }) + + framework.ConformanceIt("create a never job to pull an image on all nodes", func() { + job := baseJob.DeepCopy() + job.Spec = appsv1alpha1.ImageListPullJobSpec{ + Images: []string{WebserverImage}, + ImagePullJobTemplate: appsv1alpha1.ImagePullJobTemplate{ + PullPolicy: &appsv1alpha1.PullPolicy{ + TimeoutSeconds: utilpointer.Int32Ptr(50), + BackoffLimit: utilpointer.Int32Ptr(2), + }, + Parallelism: &intorstr4, + CompletionPolicy: appsv1alpha1.CompletionPolicy{ + Type: appsv1alpha1.Never, + }, + }, + } + err := testerForImageListPullJob.CreateJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By(fmt.Sprintf("Wait %d imagepulljob created", len(job.Spec.Images))) + gomega.Eventually(func() int32 { + imagePullJobs, err = testerForImagePullJob.ListJobs(job.Namespace) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return int32(len(imagePullJobs.Items)) + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Desired should be equal to 1") + gomega.Eventually(func() int32 { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Desired + }, 3*time.Second, time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + + ginkgo.By("Wait completed in 180s") + gomega.Eventually(func() int32 { + job, err = testerForImageListPullJob.GetJob(job) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return job.Status.Completed + }, 180*time.Second, 3*time.Second).Should(gomega.Equal(int32(len(job.Spec.Images)))) + gomega.Expect(len(job.Status.FailedImageStatuses)).To(gomega.Equal(len(job.Spec.Images))) + + }) + }) + +}) diff --git a/test/e2e/framework/imagelistpulljob_util.go b/test/e2e/framework/imagelistpulljob_util.go new file mode 100644 index 0000000000..e9cb9e9619 --- /dev/null +++ b/test/e2e/framework/imagelistpulljob_util.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" +) + +type ImageListPullJobTester struct { + c clientset.Interface + kc kruiseclientset.Interface +} + +func NewImageListPullJobTester(c clientset.Interface, kc kruiseclientset.Interface) *ImageListPullJobTester { + return &ImageListPullJobTester{ + c: c, + kc: kc, + } +} + +func (tester *ImageListPullJobTester) CreateJob(job *appsv1alpha1.ImageListPullJob) error { + _, err := tester.kc.AppsV1alpha1().ImageListPullJobs(job.Namespace).Create(context.TODO(), job, metav1.CreateOptions{}) + return err +} + +func (tester *ImageListPullJobTester) DeleteJob(job *appsv1alpha1.ImageListPullJob) error { + return tester.kc.AppsV1alpha1().ImageListPullJobs(job.Namespace).Delete(context.TODO(), job.Name, metav1.DeleteOptions{}) +} + +func (tester *ImageListPullJobTester) DeleteAllJobs(ns string) error { + return tester.kc.AppsV1alpha1().ImageListPullJobs(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) +} + +func (tester *ImageListPullJobTester) GetJob(job *appsv1alpha1.ImageListPullJob) (*appsv1alpha1.ImageListPullJob, error) { + return tester.kc.AppsV1alpha1().ImageListPullJobs(job.Namespace).Get(context.TODO(), job.Name, metav1.GetOptions{}) +} + +func (tester *ImageListPullJobTester) ListJobs(ns string) (*appsv1alpha1.ImageListPullJobList, error) { + return tester.kc.AppsV1alpha1().ImageListPullJobs(ns).List(context.TODO(), metav1.ListOptions{}) +}