diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index 67a8f97844..f36ebae99f 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" @@ -51,6 +52,7 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon/util" daemonsetutil "k8s.io/kubernetes/pkg/controller/daemon/util" "k8s.io/utils/integer" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -65,10 +67,13 @@ import ( kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" "github.com/openkruise/kruise/pkg/client/clientset/versioned/scheme" kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" kruiseutil "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction" "github.com/openkruise/kruise/pkg/util/inplaceupdate" "github.com/openkruise/kruise/pkg/util/lifecycle" "github.com/openkruise/kruise/pkg/util/ratelimiter" @@ -91,6 +96,8 @@ var ( onceBackoffGC sync.Once // this is a short cut for any sub-functions to notify the reconcile how long to wait to requeue durationStore = requeueduration.DurationStore{} + + isPreDownloadDisabled bool ) const ( @@ -127,7 +134,11 @@ func Add(mgr manager.Manager) error { if !utildiscovery.DiscoverGVK(controllerKind) { return nil } - + if !utildiscovery.DiscoverGVK(appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob")) || + !utilfeature.DefaultFeatureGate.Enabled(features.KruiseDaemon) || + !utilfeature.DefaultFeatureGate.Enabled(features.PreDownloadImageForInPlaceUpdate) { + isPreDownloadDisabled = true + } r, err := newReconciler(mgr) if err != nil { return err @@ -171,6 +182,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { cli := utilclient.NewClientFromManager(mgr, "daemonset-controller") dsc := &ReconcileDaemonSet{ + Client: cli, kubeClient: genericClient.KubeClient, kruiseClient: genericClient.KruiseClient, eventRecorder: recorder, @@ -254,6 +266,7 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{} // ReconcileDaemonSet reconciles a DaemonSet object type ReconcileDaemonSet struct { + runtimeclient.Client kubeClient clientset.Interface kruiseClient kruiseclientset.Interface eventRecorder record.EventRecorder @@ -390,6 +403,32 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error { return dsc.updateDaemonSetStatus(ds, nodeList, hash, false) } + if !isPreDownloadDisabled && dsc.Client != nil { + if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || + hash != ds.Status.DaemonSetHash { + // get ads pre-download annotation + minUpdatedReadyPodsCount := 0 + if minUpdatedReadyPods, ok := ds.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok { + minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods) + minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(ds.Status.DesiredNumberScheduled), true) + if err != nil { + klog.Errorf("Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for %s: %v", request, err) + } + } + // todo: check whether the updatedReadyPodsCount greater than minUpdatedReadyPodsCount + _ = minUpdatedReadyPodsCount + // pre-download images for new revision + if err := dsc.createImagePullJobsForInPlaceUpdate(ds, old, cur); err != nil { + klog.Errorf("Failed to create ImagePullJobs for %s: %v", request, err) + } + } else { + // delete ImagePullJobs if revisions have been consistent + if err := imagejobutilfunc.DeleteJobsForWorkload(dsc.Client, ds); err != nil { + klog.Errorf("Failed to delete imagepulljobs for %s: %v", request, err) + } + } + } + err = dsc.manage(ds, nodeList, hash) if err != nil { return err diff --git a/pkg/controller/daemonset/daemonset_predownload_image.go b/pkg/controller/daemonset/daemonset_predownload_image.go new file mode 100644 index 0000000000..efea8a39f5 --- /dev/null +++ b/pkg/controller/daemonset/daemonset_predownload_image.go @@ -0,0 +1,160 @@ +/* +Copyright 2021 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemonset + +import ( + "context" + "fmt" + + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/history" + "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" + imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction" + "github.com/openkruise/kruise/pkg/util/inplaceupdate" +) + +func (r *ReconcileDaemonSet) createImagePullJobsForInPlaceUpdate(ds *appsv1alpha1.DaemonSet, oldRevisions []*apps.ControllerRevision, updateRevision *apps.ControllerRevision) error { + if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadCreatedKey]; ok { + return nil + } else if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadIgnoredKey]; ok { + return nil + } + + //// ignore if replicas <= minimumReplicasToPreDownloadImage + //if *ds.Spec.Replicas <= minimumReplicasToPreDownloadImage { + // klog.V(4).Infof("CloneSet %s/%s skipped to create ImagePullJob for replicas %d <= %d", + // ds.Namespace, ds.Name, *ds.Spec.Replicas, minimumReplicasToPreDownloadImage) + // return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") + //} + + // ignore if all Pods update in one batch + var partition, maxUnavailable int + var dsPodsNumber = int(ds.Status.DesiredNumberScheduled) + if ds.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + partition = int(*ds.Spec.UpdateStrategy.RollingUpdate.Partition) + } + maxUnavailable, _ = intstrutil.GetValueFromIntOrPercent( + intstrutil.ValueOrDefault(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), dsPodsNumber, false) + if partition == 0 && maxUnavailable >= dsPodsNumber { + klog.V(4).Infof("DaemonSet %s/%s skipped to create ImagePullJob for all Pods update in one batch, replicas=%d, partition=%d, maxUnavailable=%d", + ds.Namespace, ds.Name, dsPodsNumber, partition, maxUnavailable) + return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") + } + + // start to create jobs + var pullSecrets []string + for _, s := range ds.Spec.Template.Spec.ImagePullSecrets { + pullSecrets = append(pullSecrets, s.Name) + } + + selector := ds.Spec.Selector.DeepCopy() + selector.MatchExpressions = append(selector.MatchExpressions, metav1.LabelSelectorRequirement{ + Key: apps.ControllerRevisionHashLabelKey, + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{updateRevision.Name, updateRevision.Labels[history.ControllerRevisionHashLabel]}, + }) + + // As deamonset is the job's owner, we have the convention that all resources owned by deamonset + // have to match the selector of deamonset, such as pod, pvc and controllerrevision. + // So we had better put the labels into jobs. + labelMap := make(map[string]string) + for k, v := range ds.Spec.Template.Labels { + labelMap[k] = v + } + labelMap[history.ControllerRevisionHashLabel] = updateRevision.Labels[history.ControllerRevisionHashLabel] + + containerImages := diffImagesBetweenRevisions(oldRevisions, updateRevision) + klog.V(3).Infof("DaemonSet %s/%s begin to create ImagePullJobs for revision %s: %v", + ds.Namespace, ds.Name, updateRevision.Name, containerImages) + for name, image := range containerImages { + // job name is revision name + container name, it can not be more than 255 characters + jobName := fmt.Sprintf("%s-%s", updateRevision.Name, name) + err := imagejobutilfunc.CreateJobForWorkload(r.Client, ds, controllerKind, jobName, image, labelMap, *selector, pullSecrets) + if err != nil { + if !errors.IsAlreadyExists(err) { + klog.Errorf("DaemonSet %s/%s failed to create ImagePullJob %s: %v", ds.Namespace, ds.Name, jobName, err) + r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedCreateImagePullJob", "failed to create ImagePullJob %s: %v", jobName, err) + } + continue + } + klog.V(3).Infof("DaemonSet %s/%s created ImagePullJob %s for image: %s", ds.Namespace, ds.Name, jobName, image) + r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "CreatedImagePullJob", "created ImagePullJob %s for image: %s", jobName, image) + } + + return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadCreatedKey, "true") +} + +func (r *ReconcileDaemonSet) patchControllerRevisionLabels(revision *apps.ControllerRevision, key, value string) error { + oldRevision := revision.ResourceVersion + body := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, key, value) + if err := r.Patch(context.TODO(), revision, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + return err + } + if oldRevision != revision.ResourceVersion { + clonesetutils.ResourceVersionExpectations.Expect(revision) + } + return nil +} + +func diffImagesBetweenRevisions(oldRevisions []*apps.ControllerRevision, newRevision *apps.ControllerRevision) map[string]string { + var oldTemps []*v1.PodTemplateSpec + for _, oldRevision := range oldRevisions { + oldTemp, err := inplaceupdate.GetTemplateFromRevision(oldRevision) + if err != nil { + return nil + } + oldTemps = append(oldTemps, oldTemp) + } + + newTemp, err := inplaceupdate.GetTemplateFromRevision(newRevision) + if err != nil { + return nil + } + + containerImages := make(map[string]string) + for i := range newTemp.Spec.Containers { + name := newTemp.Spec.Containers[i].Name + newImage := newTemp.Spec.Containers[i].Image + + var found bool + for _, oldTemp := range oldTemps { + for j := range oldTemp.Spec.Containers { + if oldTemp.Spec.Containers[j].Name != name { + continue + } + if oldTemp.Spec.Containers[j].Image != newImage { + containerImages[name] = newImage + } + found = true + break + } + } + if !found { + containerImages[name] = newImage + } + } + return containerImages +} diff --git a/pkg/controller/statefulset/statefulset_predownload_image.go b/pkg/controller/statefulset/statefulset_predownload_image.go index 8b98068c62..8f9c274cc6 100644 --- a/pkg/controller/statefulset/statefulset_predownload_image.go +++ b/pkg/controller/statefulset/statefulset_predownload_image.go @@ -74,7 +74,7 @@ func (dss *defaultStatefulSetControl) createImagePullJobsForInPlaceUpdate(sts *a return dss.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") } - // opt is update option, this sectoin is to get update option + // opt is update option, this section is to get update option opts := &inplaceupdate.UpdateOptions{} if sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy != nil { opts.GracePeriodSeconds = sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy.GracePeriodSeconds