Skip to content

Commit

Permalink
Support predownload image in ads (#1057)
Browse files Browse the repository at this point in the history
Signed-off-by: Abner <abner199709@gmail.com>

* support predownload image for ads
  • Loading branch information
ABNER-1 authored Sep 28, 2022
1 parent f46097d commit 9270d04
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 2 deletions.
41 changes: 40 additions & 1 deletion pkg/controller/daemonset/daemonset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
Expand All @@ -51,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon/util"
daemonsetutil "k8s.io/kubernetes/pkg/controller/daemon/util"
"k8s.io/utils/integer"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -65,10 +67,13 @@ import (
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/client/clientset/versioned/scheme"
kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
kruiseutil "github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
Expand All @@ -91,6 +96,8 @@ var (
onceBackoffGC sync.Once
// this is a short cut for any sub-functions to notify the reconcile how long to wait to requeue
durationStore = requeueduration.DurationStore{}

isPreDownloadDisabled bool
)

const (
Expand Down Expand Up @@ -127,7 +134,11 @@ func Add(mgr manager.Manager) error {
if !utildiscovery.DiscoverGVK(controllerKind) {
return nil
}

if !utildiscovery.DiscoverGVK(appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob")) ||
!utilfeature.DefaultFeatureGate.Enabled(features.KruiseDaemon) ||
!utilfeature.DefaultFeatureGate.Enabled(features.PreDownloadImageForInPlaceUpdate) {
isPreDownloadDisabled = true
}
r, err := newReconciler(mgr)
if err != nil {
return err
Expand Down Expand Up @@ -171,6 +182,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {

cli := utilclient.NewClientFromManager(mgr, "daemonset-controller")
dsc := &ReconcileDaemonSet{
Client: cli,
kubeClient: genericClient.KubeClient,
kruiseClient: genericClient.KruiseClient,
eventRecorder: recorder,
Expand Down Expand Up @@ -254,6 +266,7 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{}

// ReconcileDaemonSet reconciles a DaemonSet object
type ReconcileDaemonSet struct {
runtimeclient.Client
kubeClient clientset.Interface
kruiseClient kruiseclientset.Interface
eventRecorder record.EventRecorder
Expand Down Expand Up @@ -390,6 +403,32 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}

if !isPreDownloadDisabled && dsc.Client != nil {
if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled ||
hash != ds.Status.DaemonSetHash {
// get ads pre-download annotation
minUpdatedReadyPodsCount := 0
if minUpdatedReadyPods, ok := ds.Annotations[appsv1alpha1.ImagePreDownloadMinUpdatedReadyPods]; ok {
minUpdatedReadyPodsIntStr := intstrutil.Parse(minUpdatedReadyPods)
minUpdatedReadyPodsCount, err = intstrutil.GetScaledValueFromIntOrPercent(&minUpdatedReadyPodsIntStr, int(ds.Status.DesiredNumberScheduled), true)
if err != nil {
klog.Errorf("Failed to GetScaledValueFromIntOrPercent of minUpdatedReadyPods for %s: %v", request, err)
}
}
// todo: check whether the updatedReadyPodsCount greater than minUpdatedReadyPodsCount
_ = minUpdatedReadyPodsCount
// pre-download images for new revision
if err := dsc.createImagePullJobsForInPlaceUpdate(ds, old, cur); err != nil {
klog.Errorf("Failed to create ImagePullJobs for %s: %v", request, err)
}
} else {
// delete ImagePullJobs if revisions have been consistent
if err := imagejobutilfunc.DeleteJobsForWorkload(dsc.Client, ds); err != nil {
klog.Errorf("Failed to delete imagepulljobs for %s: %v", request, err)
}
}
}

err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
Expand Down
160 changes: 160 additions & 0 deletions pkg/controller/daemonset/daemonset_predownload_image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemonset

import (
"context"
"fmt"

apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
)

func (r *ReconcileDaemonSet) createImagePullJobsForInPlaceUpdate(ds *appsv1alpha1.DaemonSet, oldRevisions []*apps.ControllerRevision, updateRevision *apps.ControllerRevision) error {
if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadCreatedKey]; ok {
return nil
} else if _, ok := updateRevision.Labels[appsv1alpha1.ImagePreDownloadIgnoredKey]; ok {
return nil
}

//// ignore if replicas <= minimumReplicasToPreDownloadImage
//if *ds.Spec.Replicas <= minimumReplicasToPreDownloadImage {
// klog.V(4).Infof("CloneSet %s/%s skipped to create ImagePullJob for replicas %d <= %d",
// ds.Namespace, ds.Name, *ds.Spec.Replicas, minimumReplicasToPreDownloadImage)
// return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true")
//}

// ignore if all Pods update in one batch
var partition, maxUnavailable int
var dsPodsNumber = int(ds.Status.DesiredNumberScheduled)
if ds.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
partition = int(*ds.Spec.UpdateStrategy.RollingUpdate.Partition)
}
maxUnavailable, _ = intstrutil.GetValueFromIntOrPercent(
intstrutil.ValueOrDefault(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, intstrutil.FromInt(1)), dsPodsNumber, false)
if partition == 0 && maxUnavailable >= dsPodsNumber {
klog.V(4).Infof("DaemonSet %s/%s skipped to create ImagePullJob for all Pods update in one batch, replicas=%d, partition=%d, maxUnavailable=%d",
ds.Namespace, ds.Name, dsPodsNumber, partition, maxUnavailable)
return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true")
}

// start to create jobs
var pullSecrets []string
for _, s := range ds.Spec.Template.Spec.ImagePullSecrets {
pullSecrets = append(pullSecrets, s.Name)
}

selector := ds.Spec.Selector.DeepCopy()
selector.MatchExpressions = append(selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: apps.ControllerRevisionHashLabelKey,
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{updateRevision.Name, updateRevision.Labels[history.ControllerRevisionHashLabel]},
})

// As deamonset is the job's owner, we have the convention that all resources owned by deamonset
// have to match the selector of deamonset, such as pod, pvc and controllerrevision.
// So we had better put the labels into jobs.
labelMap := make(map[string]string)
for k, v := range ds.Spec.Template.Labels {
labelMap[k] = v
}
labelMap[history.ControllerRevisionHashLabel] = updateRevision.Labels[history.ControllerRevisionHashLabel]

containerImages := diffImagesBetweenRevisions(oldRevisions, updateRevision)
klog.V(3).Infof("DaemonSet %s/%s begin to create ImagePullJobs for revision %s: %v",
ds.Namespace, ds.Name, updateRevision.Name, containerImages)
for name, image := range containerImages {
// job name is revision name + container name, it can not be more than 255 characters
jobName := fmt.Sprintf("%s-%s", updateRevision.Name, name)
err := imagejobutilfunc.CreateJobForWorkload(r.Client, ds, controllerKind, jobName, image, labelMap, *selector, pullSecrets)
if err != nil {
if !errors.IsAlreadyExists(err) {
klog.Errorf("DaemonSet %s/%s failed to create ImagePullJob %s: %v", ds.Namespace, ds.Name, jobName, err)
r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "FailedCreateImagePullJob", "failed to create ImagePullJob %s: %v", jobName, err)
}
continue
}
klog.V(3).Infof("DaemonSet %s/%s created ImagePullJob %s for image: %s", ds.Namespace, ds.Name, jobName, image)
r.eventRecorder.Eventf(ds, v1.EventTypeNormal, "CreatedImagePullJob", "created ImagePullJob %s for image: %s", jobName, image)
}

return r.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadCreatedKey, "true")
}

func (r *ReconcileDaemonSet) patchControllerRevisionLabels(revision *apps.ControllerRevision, key, value string) error {
oldRevision := revision.ResourceVersion
body := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, key, value)
if err := r.Patch(context.TODO(), revision, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
return err
}
if oldRevision != revision.ResourceVersion {
clonesetutils.ResourceVersionExpectations.Expect(revision)
}
return nil
}

func diffImagesBetweenRevisions(oldRevisions []*apps.ControllerRevision, newRevision *apps.ControllerRevision) map[string]string {
var oldTemps []*v1.PodTemplateSpec
for _, oldRevision := range oldRevisions {
oldTemp, err := inplaceupdate.GetTemplateFromRevision(oldRevision)
if err != nil {
return nil
}
oldTemps = append(oldTemps, oldTemp)
}

newTemp, err := inplaceupdate.GetTemplateFromRevision(newRevision)
if err != nil {
return nil
}

containerImages := make(map[string]string)
for i := range newTemp.Spec.Containers {
name := newTemp.Spec.Containers[i].Name
newImage := newTemp.Spec.Containers[i].Image

var found bool
for _, oldTemp := range oldTemps {
for j := range oldTemp.Spec.Containers {
if oldTemp.Spec.Containers[j].Name != name {
continue
}
if oldTemp.Spec.Containers[j].Image != newImage {
containerImages[name] = newImage
}
found = true
break
}
}
if !found {
containerImages[name] = newImage
}
}
return containerImages
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (dss *defaultStatefulSetControl) createImagePullJobsForInPlaceUpdate(sts *a
return dss.patchControllerRevisionLabels(updateRevision, appsv1alpha1.ImagePreDownloadIgnoredKey, "true")
}

// opt is update option, this sectoin is to get update option
// opt is update option, this section is to get update option
opts := &inplaceupdate.UpdateOptions{}
if sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy != nil {
opts.GracePeriodSeconds = sts.Spec.UpdateStrategy.RollingUpdate.InPlaceUpdateStrategy.GracePeriodSeconds
Expand Down

0 comments on commit 9270d04

Please sign in to comment.