From b1c9c1309fc29473db33d0937237c1510ea9042d Mon Sep 17 00:00:00 2001 From: Abner Date: Sun, 14 Aug 2022 10:42:33 +0800 Subject: [PATCH] support predownload image for ads Signed-off-by: Abner --- .../daemonset/daemonset_controller.go | 45 ++++- .../daemonset/daemonset_predownload_image.go | 158 ++++++++++++++++++ .../statefulset_predownload_image.go | 2 +- 3 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 pkg/controller/daemonset/daemonset_predownload_image.go diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index f59d65e56b..6f6a3b5dc9 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -21,6 +21,10 @@ import ( "context" "flag" "fmt" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" "reflect" "sort" "sync" @@ -74,6 +78,7 @@ import ( "github.com/openkruise/kruise/pkg/util/ratelimiter" "github.com/openkruise/kruise/pkg/util/requeueduration" "github.com/openkruise/kruise/pkg/util/revisionadapter" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) func init() { @@ -91,6 +96,8 @@ var ( onceBackoffGC sync.Once // this is a short cut for any sub-functions to notify the reconcile how long to wait to requeue durationStore = requeueduration.DurationStore{} + + isPreDownloadDisabled bool ) const ( @@ -127,7 +134,11 @@ func Add(mgr manager.Manager) error { if !utildiscovery.DiscoverGVK(controllerKind) { return nil } - + if !utildiscovery.DiscoverGVK(appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob")) || + !utilfeature.DefaultFeatureGate.Enabled(features.KruiseDaemon) || + !utilfeature.DefaultFeatureGate.Enabled(features.PreDownloadImageForInPlaceUpdate) { + isPreDownloadDisabled = true + } r, err := newReconciler(mgr) if err != nil { return err @@ -171,6 +182,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { cli := utilclient.NewClientFromManager(mgr, "daemonset-controller") dsc := &ReconcileDaemonSet{ + Client: cli, kubeClient: genericClient.KubeClient, kruiseClient: genericClient.KruiseClient, eventRecorder: recorder, @@ -254,6 +266,7 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{} // ReconcileDaemonSet reconciles a DaemonSet object type ReconcileDaemonSet struct { + runtimeclient.Client kubeClient clientset.Interface kruiseClient kruiseclientset.Interface eventRecorder record.EventRecorder @@ -390,6 +403,36 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error { return dsc.updateDaemonSetStatus(ds, nodeList, hash, false) } + if !isPreDownloadDisabled { + if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || + cur.Name != ds.Status.DaemonSetHash { + // get ads pre-download annotation + minUpdatedReadyPodsCount := 0 + if minUpdatedReadyPods, ok := ds.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok { + minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods) + minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(ds.Status.DesiredNumberScheduled), true) + if err != nil { + klog.Errorf("Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for %s: %v", request, err) + } + } + updatedReadyReplicas := ds.Status.UpdatedNumberScheduled + if cur.Name != ds.Status.DaemonSetHash { + updatedReadyReplicas = 0 + } + if int32(minUpdatedReadyPodsCount) <= updatedReadyReplicas { + // pre-download images for new revision + if err := dsc.createImagePullJobsForInPlaceUpdate(ds, old, cur); err != nil { + klog.Errorf("Failed to create ImagePullJobs for %s: %v", request, err) + } + } + } else { + // delete ImagePullJobs if revisions have been consistent + if err := imagejobutilfunc.DeleteJobsForWorkload(dsc.Client, ds); err != nil { + klog.Errorf("Failed to delete imagepulljobs for %s: %v", request, err) + } + } + } + err = dsc.manage(ds, nodeList, hash) if err != nil { return err diff --git a/pkg/controller/daemonset/daemonset_predownload_image.go b/pkg/controller/daemonset/daemonset_predownload_image.go new file mode 100644 index 0000000000..2a3271d19f --- /dev/null +++ b/pkg/controller/daemonset/daemonset_predownload_image.go @@ -0,0 +1,158 @@ +/* +Copyright 2021 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemonset + +import ( + "context" + "fmt" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" + imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction" + "github.com/openkruise/kruise/pkg/util/inplaceupdate" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/history" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func (r *ReconcileDaemonSet) createImagePullJobsForInPlaceUpdate(ds *appsv1alpha1.DaemonSet, oldRevisions []*apps.ControllerRevision, updateRevision *apps.ControllerRevision) error { + if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadCreatedKey]; ok { + return nil + } else if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadIgnoredKey]; ok { + return nil + } + + //// ignore if replicas <= minimumReplicasToPreDownloadImage + //if *ds.Spec.Replicas <= minimumReplicasToPreDownloadImage { + // klog.V(4).Infof("CloneSet %s/%s skipped to create ImagePullJob for replicas %d <= %d", + // ds.Namespace, ds.Name, *ds.Spec.Replicas, minimumReplicasToPreDownloadImage) + // return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") + //} + + // ignore if all Pods update in one batch + var partition, maxUnavailable int + var dsPodsNumber = int(ds.Status.DesiredNumberScheduled) + if ds.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + partition = int(*ds.Spec.UpdateStrategy.RollingUpdate.Partition) + } + maxUnavailable, _ = intstrutil.GetValueFromIntOrPercent( + intstrutil.ValueOrDefault(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), dsPodsNumber, false) + if partition == 0 && maxUnavailable >= dsPodsNumber { + klog.V(4).Infof("DaemonSet %s/%s skipped to create ImagePullJob for all Pods update in one batch, replicas=%d, partition=%d, maxUnavailable=%d", + ds.Namespace, ds.Name, dsPodsNumber, partition, maxUnavailable) + return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") + } + + // start to create jobs + var pullSecrets []string + for _, s := range ds.Spec.Template.Spec.ImagePullSecrets { + pullSecrets = append(pullSecrets, s.Name) + } + + selector := ds.Spec.Selector.DeepCopy() + selector.MatchExpressions = append(selector.MatchExpressions, metav1.LabelSelectorRequirement{ + Key: apps.ControllerRevisionHashLabelKey, + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{updateRevision.Name, updateRevision.Labels[history.ControllerRevisionHashLabel]}, + }) + + // As deamonset is the job's owner, we have the convention that all resources owned by deamonset + // have to match the selector of deamonset, such as pod, pvc and controllerrevision. + // So we had better put the labels into jobs. + labelMap := make(map[string]string) + for k, v := range ds.Spec.Template.Labels { + labelMap[k] = v + } + labelMap[history.ControllerRevisionHashLabel] = updateRevision.Labels[history.ControllerRevisionHashLabel] + + containerImages := diffImagesBetweenRevisions(oldRevisions, updateRevision) + klog.V(3).Infof("DaemonSet %s/%s begin to create ImagePullJobs for revision %s: %v", + ds.Namespace, ds.Name, updateRevision.Name, containerImages) + for name, image := range containerImages { + // job name is revision name + container name, it can not be more than 255 characters + jobName := fmt.Sprintf("%s-%s", updateRevision.Name, name) + err := imagejobutilfunc.CreateJobForWorkload(r.Client, ds, clonesetutils.ControllerKind, jobName, image, labelMap, *selector, pullSecrets) + if err != nil { + if !errors.IsAlreadyExists(err) { + klog.Errorf("DaemonSet %s/%s failed to create ImagePullJob %s: %v", ds.Namespace, ds.Name, jobName, err) + r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedCreateImagePullJob", "failed to create ImagePullJob %s: %v", jobName, err) + } + continue + } + klog.V(3).Infof("DaemonSet %s/%s created ImagePullJob %s for image: %s", ds.Namespace, ds.Name, jobName, image) + r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "CreatedImagePullJob", "created ImagePullJob %s for image: %s", jobName, image) + } + + return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadCreatedKey, "true") +} + +func (r *ReconcileDaemonSet) patchControllerRevisionLabels(revision *apps.ControllerRevision, key, value string) error { + oldRevision := revision.ResourceVersion + body := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, key, value) + if err := r.Patch(context.TODO(), revision, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + return err + } + if oldRevision != revision.ResourceVersion { + clonesetutils.ResourceVersionExpectations.Expect(revision) + } + return nil +} + +func diffImagesBetweenRevisions(oldRevisions []*apps.ControllerRevision, newRevision *apps.ControllerRevision) map[string]string { + var oldTemps []*v1.PodTemplateSpec + for _, oldRevision := range oldRevisions { + oldTemp, err := inplaceupdate.GetTemplateFromRevision(oldRevision) + if err != nil { + return nil + } + oldTemps = append(oldTemps, oldTemp) + } + + newTemp, err := inplaceupdate.GetTemplateFromRevision(newRevision) + if err != nil { + return nil + } + + containerImages := make(map[string]string) + for i := range newTemp.Spec.Containers { + name := newTemp.Spec.Containers[i].Name + newImage := newTemp.Spec.Containers[i].Image + + var found bool + for _, oldTemp := range oldTemps { + for j := range oldTemp.Spec.Containers { + if oldTemp.Spec.Containers[j].Name != name { + continue + } + if oldTemp.Spec.Containers[j].Image != newImage { + containerImages[name] = newImage + } + found = true + break + } + } + if !found { + containerImages[name] = newImage + } + } + return containerImages +} diff --git a/pkg/controller/statefulset/statefulset_predownload_image.go b/pkg/controller/statefulset/statefulset_predownload_image.go index 8b98068c62..8f9c274cc6 100644 --- a/pkg/controller/statefulset/statefulset_predownload_image.go +++ b/pkg/controller/statefulset/statefulset_predownload_image.go @@ -74,7 +74,7 @@ func (dss *defaultStatefulSetControl) createImagePullJobsForInPlaceUpdate(sts *a return dss.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true") } - // opt is update option, this sectoin is to get update option + // opt is update option, this section is to get update option opts := &inplaceupdate.UpdateOptions{} if sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy != nil { opts.GracePeriodSeconds = sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy.GracePeriodSeconds