From 71590b565f73b3678d6186c6b5eb6c98435525ac Mon Sep 17 00:00:00 2001 From: Siyu Wang Date: Tue, 5 Jul 2022 19:38:24 +0800 Subject: [PATCH] Reduce kruise-manager memory allocation (#1015) --- main.go | 2 +- .../advancedcronjob_controller.go | 4 +- .../broadcastjob/broadcastjob_controller.go | 4 +- .../cloneset/cloneset_controller.go | 3 +- .../cloneset/cloneset_controller_test.go | 3 +- .../container_launch_priority_controller.go | 4 +- .../crr_controller.go | 3 +- .../daemonset/daemonset_controller.go | 3 +- .../ephemeraljob/ephemeraljob_controller.go | 3 +- .../imagepulljob/imagepulljob_controller.go | 3 +- .../nodeimage/nodeimage_controller.go | 3 +- .../persistent_pod_state_controller.go | 3 +- .../podreadiness/pod_readiness_controller.go | 4 +- .../resourcedistribution_controller.go | 14 +- .../sidecarset/sidecarset_controller.go | 16 +- .../statefulset/statefulset_controller.go | 8 +- .../uniteddeployment_controller.go | 4 +- ...ddeployment_controller_statefulset_test.go | 4 +- .../uniteddeployment_controller_test.go | 4 +- .../workloadspread_controller.go | 3 +- pkg/util/client/cache.go | 77 ++++++++ pkg/util/{ => client}/client.go | 16 +- pkg/util/client/delegating_client.go | 167 ------------------ pkg/util/imagejob/imagejob_reader_test.go | 4 +- pkg/util/workloadspread/workloadspread.go | 12 +- .../workloadspread/workloadspread_test.go | 2 +- ...ontainer_launch_priority_initialization.go | 14 +- .../pod/mutating/persistent_pod_state.go | 44 ++--- .../pod/mutating/persistent_pod_state_test.go | 2 +- .../pod/mutating/pod_create_update_handler.go | 35 ++-- pkg/webhook/pod/mutating/pod_readiness.go | 7 +- .../pod/mutating/pod_unavailable_budget.go | 10 +- pkg/webhook/pod/mutating/sidecarset.go | 22 +-- .../mutating/sidecarset_hotupgrade_test.go | 2 +- pkg/webhook/pod/mutating/sidecarset_test.go | 18 +- pkg/webhook/pod/mutating/workloadspread.go | 11 +- 36 files changed, 229 insertions(+), 309 deletions(-) create mode 100644 pkg/util/client/cache.go rename pkg/util/{ => client}/client.go (73%) delete mode 100644 pkg/util/client/delegating_client.go diff --git a/main.go b/main.go index 549e611719..c538e8261d 100644 --- a/main.go +++ b/main.go @@ -144,7 +144,7 @@ func main() { LeaderElectionResourceLock: resourcelock.ConfigMapsResourceLock, Namespace: namespace, SyncPeriod: syncPeriod, - NewClient: utilclient.NewClient, + NewCache: utilclient.NewCache, }) if err != nil { setupLog.Error(err, "unable to start manager") diff --git a/pkg/controller/advancedcronjob/advancedcronjob_controller.go b/pkg/controller/advancedcronjob/advancedcronjob_controller.go index dfa0fe8f64..28ed1978e2 100644 --- a/pkg/controller/advancedcronjob/advancedcronjob_controller.go +++ b/pkg/controller/advancedcronjob/advancedcronjob_controller.go @@ -23,7 +23,7 @@ import ( "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -64,7 +64,7 @@ func Add(mgr manager.Manager) error { func newReconciler(mgr manager.Manager) reconcile.Reconciler { recorder := mgr.GetEventRecorderFor("advancedcronjob-controller") return &ReconcileAdvancedCronJob{ - Client: util.NewClientFromManager(mgr, "advancedcronjob-controller"), + Client: utilclient.NewClientFromManager(mgr, "advancedcronjob-controller"), scheme: mgr.GetScheme(), recorder: recorder, Clock: realClock{}, diff --git a/pkg/controller/broadcastjob/broadcastjob_controller.go b/pkg/controller/broadcastjob/broadcastjob_controller.go index 162eea4094..6710dd8436 100644 --- a/pkg/controller/broadcastjob/broadcastjob_controller.go +++ b/pkg/controller/broadcastjob/broadcastjob_controller.go @@ -25,7 +25,7 @@ import ( "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/expectations" "github.com/openkruise/kruise/pkg/util/ratelimiter" @@ -89,7 +89,7 @@ func Add(mgr manager.Manager) error { func newReconciler(mgr manager.Manager) reconcile.Reconciler { recorder := mgr.GetEventRecorderFor("broadcastjob-controller") return &ReconcileBroadcastJob{ - Client: util.NewClientFromManager(mgr, "broadcastjob-controller"), + Client: utilclient.NewClientFromManager(mgr, "broadcastjob-controller"), scheme: mgr.GetScheme(), recorder: recorder, } diff --git a/pkg/controller/cloneset/cloneset_controller.go b/pkg/controller/cloneset/cloneset_controller.go index f84f2af28b..b1362c5109 100644 --- a/pkg/controller/cloneset/cloneset_controller.go +++ b/pkg/controller/cloneset/cloneset_controller.go @@ -29,7 +29,6 @@ import ( synccontrol "github.com/openkruise/kruise/pkg/controller/cloneset/sync" clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/features" - "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/expectations" @@ -95,7 +94,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cli.KubeClient.CoreV1().Events("")}) recorder = eventBroadcaster.NewRecorder(mgr.GetScheme(), v1.EventSource{Component: "cloneset-controller"}) } - cli := util.NewClientFromManager(mgr, "cloneset-controller") + cli := utilclient.NewClientFromManager(mgr, "cloneset-controller") reconciler := &ReconcileCloneSet{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/controller/cloneset/cloneset_controller_test.go b/pkg/controller/cloneset/cloneset_controller_test.go index b7abe84cbd..24aaf6f59f 100644 --- a/pkg/controller/cloneset/cloneset_controller_test.go +++ b/pkg/controller/cloneset/cloneset_controller_test.go @@ -29,6 +29,7 @@ import ( clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/openkruise/kruise/pkg/util/fieldindex" "golang.org/x/net/context" @@ -91,7 +92,7 @@ func TestReconcile(t *testing.T) { mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"}) _ = fieldindex.RegisterFieldIndexes(mgr.GetCache()) g.Expect(err).NotTo(gomega.HaveOccurred()) - c = util.NewClientFromManager(mgr, "test-cloneset-controller") + c = utilclient.NewClientFromManager(mgr, "test-cloneset-controller") //recFn, requests := SetupTestReconcile(newReconciler(mgr)) g.Expect(add(mgr, newReconciler(mgr))).NotTo(gomega.HaveOccurred()) diff --git a/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go b/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go index ca23137430..6045447b3d 100644 --- a/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go +++ b/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go @@ -22,7 +22,7 @@ import ( "strconv" "time" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -52,7 +52,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcileContainerLaunchPriority { return &ReconcileContainerLaunchPriority{ - Client: util.NewClientFromManager(mgr, "container-launch-priority-controller"), + Client: utilclient.NewClientFromManager(mgr, "container-launch-priority-controller"), recorder: mgr.GetEventRecorderFor("container-launch-priority-controller"), } } diff --git a/pkg/controller/containerrecreaterequest/crr_controller.go b/pkg/controller/containerrecreaterequest/crr_controller.go index f99fa11414..99a36fa64f 100644 --- a/pkg/controller/containerrecreaterequest/crr_controller.go +++ b/pkg/controller/containerrecreaterequest/crr_controller.go @@ -26,6 +26,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" utilfeature "github.com/openkruise/kruise/pkg/util/feature" utilpodreadiness "github.com/openkruise/kruise/pkg/util/podreadiness" @@ -70,7 +71,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcileContainerRecreateRequest { - cli := util.NewClientFromManager(mgr, "containerrecreaterequest-controller") + cli := utilclient.NewClientFromManager(mgr, "containerrecreaterequest-controller") return &ReconcileContainerRecreateRequest{ Client: cli, clock: clock.RealClock{}, diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index 694bfed76f..f59d65e56b 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -66,6 +66,7 @@ import ( "github.com/openkruise/kruise/pkg/client/clientset/versioned/scheme" kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" kruiseutil "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations" "github.com/openkruise/kruise/pkg/util/inplaceupdate" @@ -168,7 +169,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { failedPodsBackoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute) revisionAdapter := revisionadapter.NewDefaultImpl() - cli := kruiseutil.NewClientFromManager(mgr, "daemonset-controller") + cli := utilclient.NewClientFromManager(mgr, "daemonset-controller") dsc := &ReconcileDaemonSet{ kubeClient: genericClient.KubeClient, kruiseClient: genericClient.KruiseClient, diff --git a/pkg/controller/ephemeraljob/ephemeraljob_controller.go b/pkg/controller/ephemeraljob/ephemeraljob_controller.go index b514698c3d..28fdaefe76 100644 --- a/pkg/controller/ephemeraljob/ephemeraljob_controller.go +++ b/pkg/controller/ephemeraljob/ephemeraljob_controller.go @@ -26,6 +26,7 @@ import ( clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/controller/ephemeraljob/econtainer" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/expectations" v1 "k8s.io/api/core/v1" @@ -64,7 +65,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcileEphemeralJob { return &ReconcileEphemeralJob{ - Client: util.NewClientFromManager(mgr, "ephemeraljob-controller"), + Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"), scheme: mgr.GetScheme(), } } diff --git a/pkg/controller/imagepulljob/imagepulljob_controller.go b/pkg/controller/imagepulljob/imagepulljob_controller.go index 8911881b8c..3c1f3d6149 100644 --- a/pkg/controller/imagepulljob/imagepulljob_controller.go +++ b/pkg/controller/imagepulljob/imagepulljob_controller.go @@ -27,6 +27,7 @@ import ( daemonutil "github.com/openkruise/kruise/pkg/daemon/util" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/expectations" utilfeature "github.com/openkruise/kruise/pkg/util/feature" @@ -74,7 +75,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcileImagePullJob { return &ReconcileImagePullJob{ - Client: util.NewClientFromManager(mgr, "imagepulljob-controller"), + Client: utilclient.NewClientFromManager(mgr, "imagepulljob-controller"), scheme: mgr.GetScheme(), clock: clock.RealClock{}, } diff --git a/pkg/controller/nodeimage/nodeimage_controller.go b/pkg/controller/nodeimage/nodeimage_controller.go index 70135c0d71..9f8b74997d 100644 --- a/pkg/controller/nodeimage/nodeimage_controller.go +++ b/pkg/controller/nodeimage/nodeimage_controller.go @@ -28,6 +28,7 @@ import ( kruiseclient "github.com/openkruise/kruise/pkg/client" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" utilfeature "github.com/openkruise/kruise/pkg/util/feature" utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob" @@ -92,7 +93,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { recorder = eventBroadcaster.NewRecorder(mgr.GetScheme(), v1.EventSource{Component: controllerName}) } return &ReconcileNodeImage{ - Client: util.NewClientFromManager(mgr, controllerName), + Client: utilclient.NewClientFromManager(mgr, controllerName), scheme: mgr.GetScheme(), clock: clock.RealClock{}, eventRecorder: recorder, diff --git a/pkg/controller/persistentpodstate/persistent_pod_state_controller.go b/pkg/controller/persistentpodstate/persistent_pod_state_controller.go index ab698870fd..a350544abe 100644 --- a/pkg/controller/persistentpodstate/persistent_pod_state_controller.go +++ b/pkg/controller/persistentpodstate/persistent_pod_state_controller.go @@ -26,6 +26,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/controllerfinder" "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/ratelimiter" @@ -80,7 +81,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - cli := util.NewClientFromManager(mgr, "persistentpodstate-controller") + cli := utilclient.NewClientFromManager(mgr, "persistentpodstate-controller") return &ReconcilePersistentPodState{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/controller/podreadiness/pod_readiness_controller.go b/pkg/controller/podreadiness/pod_readiness_controller.go index c771236998..fd155497bb 100644 --- a/pkg/controller/podreadiness/pod_readiness_controller.go +++ b/pkg/controller/podreadiness/pod_readiness_controller.go @@ -21,7 +21,7 @@ import ( "time" appspub "github.com/openkruise/kruise/apis/apps/pub" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utilpodreadiness "github.com/openkruise/kruise/pkg/util/podreadiness" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -49,7 +49,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcilePodReadiness { return &ReconcilePodReadiness{ - Client: util.NewClientFromManager(mgr, "pod-readiness-controller"), + Client: utilclient.NewClientFromManager(mgr, "pod-readiness-controller"), } } diff --git a/pkg/controller/resourcedistribution/resourcedistribution_controller.go b/pkg/controller/resourcedistribution/resourcedistribution_controller.go index 49d3d6247e..b2c3817a9f 100644 --- a/pkg/controller/resourcedistribution/resourcedistribution_controller.go +++ b/pkg/controller/resourcedistribution/resourcedistribution_controller.go @@ -22,12 +22,6 @@ import ( "fmt" "reflect" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" - utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" - "github.com/openkruise/kruise/pkg/util/ratelimiter" - utils "github.com/openkruise/kruise/pkg/webhook/resourcedistribution/validating" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -45,6 +39,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + utilclient "github.com/openkruise/kruise/pkg/util/client" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + "github.com/openkruise/kruise/pkg/util/ratelimiter" + utils "github.com/openkruise/kruise/pkg/webhook/resourcedistribution/validating" ) func init() { @@ -67,7 +67,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - cli := util.NewClientFromManager(mgr, "resourcedistribution-controller") + cli := utilclient.NewClientFromManager(mgr, "resourcedistribution-controller") return &ReconcileResourceDistribution{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/controller/sidecarset/sidecarset_controller.go b/pkg/controller/sidecarset/sidecarset_controller.go index a84407a3fa..8c662541a1 100644 --- a/pkg/controller/sidecarset/sidecarset_controller.go +++ b/pkg/controller/sidecarset/sidecarset_controller.go @@ -20,13 +20,6 @@ import ( "context" "flag" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/control/sidecarcontrol" - "github.com/openkruise/kruise/pkg/util" - utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" - "github.com/openkruise/kruise/pkg/util/expectations" - "github.com/openkruise/kruise/pkg/util/ratelimiter" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -39,6 +32,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/control/sidecarcontrol" + utilclient "github.com/openkruise/kruise/pkg/util/client" + utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" + "github.com/openkruise/kruise/pkg/util/expectations" + "github.com/openkruise/kruise/pkg/util/ratelimiter" ) func init() { @@ -68,7 +68,7 @@ func Add(mgr manager.Manager) error { func newReconciler(mgr manager.Manager) reconcile.Reconciler { expectations := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) recorder := mgr.GetEventRecorderFor("sidecarset-controller") - cli := util.NewClientFromManager(mgr, "sidecarset-controller") + cli := utilclient.NewClientFromManager(mgr, "sidecarset-controller") return &ReconcileSidecarSet{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/controller/statefulset/statefulset_controller.go b/pkg/controller/statefulset/statefulset_controller.go index d31feb279f..32474ec870 100644 --- a/pkg/controller/statefulset/statefulset_controller.go +++ b/pkg/controller/statefulset/statefulset_controller.go @@ -53,7 +53,7 @@ import ( kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1beta1" "github.com/openkruise/kruise/pkg/features" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/expectations" utilfeature "github.com/openkruise/kruise/pkg/util/feature" @@ -137,7 +137,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"}) // new a client - sigsruntimeClient = util.NewClientFromManager(mgr, "statefulset-controller") + sigsruntimeClient = utilclient.NewClientFromManager(mgr, "statefulset-controller") return &ReconcileStatefulSet{ kruiseClient: genericClient.KruiseClient, @@ -148,8 +148,8 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { podLister, pvcLister, recorder), - inplaceupdate.New(util.NewClientFromManager(mgr, "statefulset-controller"), revisionadapter.NewDefaultImpl()), - lifecycle.New(util.NewClientFromManager(mgr, "statefulset-controller")), + inplaceupdate.New(utilclient.NewClientFromManager(mgr, "statefulset-controller"), revisionadapter.NewDefaultImpl()), + lifecycle.New(utilclient.NewClientFromManager(mgr, "statefulset-controller")), NewRealStatefulSetStatusUpdater(genericClient.KruiseClient, statefulSetLister), history.NewHistory(genericClient.KubeClient, appslisters.NewControllerRevisionLister(revInformer.(toolscache.SharedIndexInformer).GetIndexer())), recorder, diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index df028440ac..4e4df3eb10 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -25,7 +25,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/ratelimiter" appsv1 "k8s.io/api/apps/v1" @@ -83,7 +83,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - cli := util.NewClientFromManager(mgr, "uniteddeployment-controller") + cli := utilclient.NewClientFromManager(mgr, "uniteddeployment-controller") return &ReconcileUnitedDeployment{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_statefulset_test.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_statefulset_test.go index 1f70fa7613..6307872897 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_statefulset_test.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_statefulset_test.go @@ -37,7 +37,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" kruisectlutil "github.com/openkruise/kruise/pkg/controller/util" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" ) var c client.Client @@ -1610,7 +1610,7 @@ func setUp(t *testing.T) (*gomega.GomegaWithT, chan reconcile.Request, context.C // channel when it is finished. mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"}) g.Expect(err).NotTo(gomega.HaveOccurred()) - c = util.NewClientFromManager(mgr, "test-uniteddeployment-controller") + c = utilclient.NewClientFromManager(mgr, "test-uniteddeployment-controller") recFn, requests := SetupTestReconcile(newReconciler(mgr)) g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go index a45d1b2331..6d52c50d65 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go @@ -21,7 +21,7 @@ import ( "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" "golang.org/x/net/context" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -99,7 +99,7 @@ func TestReconcile(t *testing.T) { // channel when it is finished. mgr, err := manager.New(cfg, manager.Options{}) g.Expect(err).NotTo(gomega.HaveOccurred()) - c = util.NewClientFromManager(mgr, "test-uniteddeployment-controller") + c = utilclient.NewClientFromManager(mgr, "test-uniteddeployment-controller") recFn, requests := SetupTestReconcile(newReconciler(mgr)) g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred()) diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go index 92236b433b..b544b90a24 100644 --- a/pkg/controller/workloadspread/workloadspread_controller.go +++ b/pkg/controller/workloadspread/workloadspread_controller.go @@ -45,6 +45,7 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/controllerfinder" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/fieldindex" @@ -149,7 +150,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - cli := util.NewClientFromManager(mgr, controllerName) + cli := utilclient.NewClientFromManager(mgr, controllerName) return &ReconcileWorkloadSpread{ Client: cli, scheme: mgr.GetScheme(), diff --git a/pkg/util/client/cache.go b/pkg/util/client/cache.go new file mode 100644 index 0000000000..3665e85b79 --- /dev/null +++ b/pkg/util/client/cache.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "flag" + + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + disableNoDeepCopy bool +) + +func init() { + flag.BoolVar(&disableNoDeepCopy, "disable-no-deepcopy", false, "If you are going to disable NoDeepCopy List in some controllers and webhooks.") +} + +type internalCache struct { + cache.Cache + noDeepCopyLister *noDeepCopyLister +} + +func NewCache(config *rest.Config, opts cache.Options) (cache.Cache, error) { + if opts.Scheme == nil { + opts.Scheme = clientgoscheme.Scheme + } + c, err := cache.New(config, opts) + if err != nil { + return nil, err + } + return &internalCache{ + Cache: c, + noDeepCopyLister: &noDeepCopyLister{cache: c, scheme: opts.Scheme}, + }, nil +} + +func (ic *internalCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if !disableNoDeepCopy && isDisableDeepCopy(opts) { + return ic.noDeepCopyLister.List(ctx, list, opts...) + } + return ic.Cache.List(ctx, list, opts...) +} + +var DisableDeepCopy = disableDeepCopy{} + +type disableDeepCopy struct{} + +func (disableDeepCopy) ApplyToList(*client.ListOptions) { +} + +func isDisableDeepCopy(opts []client.ListOption) bool { + for _, opt := range opts { + if opt == DisableDeepCopy { + return true + } + } + return false +} diff --git a/pkg/util/client.go b/pkg/util/client/client.go similarity index 73% rename from pkg/util/client.go rename to pkg/util/client/client.go index 4ec15d9f3e..ce17ff645e 100644 --- a/pkg/util/client.go +++ b/pkg/util/client/client.go @@ -14,27 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package client import ( "fmt" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/manager" ) func NewClientFromManager(mgr manager.Manager, name string) client.Client { - cfg := *mgr.GetConfig() + cfg := rest.CopyConfig(mgr.GetConfig()) cfg.UserAgent = fmt.Sprintf("kruise-manager/%s", name) - c, err := client.New(&cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()}) - if err != nil { - panic(err) - } - - delegatingClient, _ := client.NewDelegatingClient(client.NewDelegatingClientInput{ - CacheReader: mgr.GetCache(), - Client: c, - }) + delegatingClient, _ := cluster.DefaultNewClient(mgr.GetCache(), cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()}) return delegatingClient } diff --git a/pkg/util/client/delegating_client.go b/pkg/util/client/delegating_client.go deleted file mode 100644 index 29022fbf61..0000000000 --- a/pkg/util/client/delegating_client.go +++ /dev/null @@ -1,167 +0,0 @@ -/* -Copyright 2022 The Kruise Authors. -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package client - -import ( - "context" - "flag" - "strings" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" -) - -var ( - disableNoDeepCopy bool -) - -func init() { - flag.BoolVar(&disableNoDeepCopy, "disable-no-deepcopy", false, "If you are going to disable NoDeepCopy List in some controllers and webhooks.") -} - -// NewClient creates the default caching client with disable deepcopy list from cache. -func NewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) { - c, err := client.New(config, options) - if err != nil { - return nil, err - } - - uncachedGVKs := map[schema.GroupVersionKind]struct{}{} - for _, obj := range uncachedObjects { - gvk, err := apiutil.GVKForObject(obj, c.Scheme()) - if err != nil { - return nil, err - } - uncachedGVKs[gvk] = struct{}{} - } - - return &delegatingClient{ - scheme: c.Scheme(), - mapper: c.RESTMapper(), - Reader: &delegatingReader{ - CacheReader: cache, - ClientReader: c, - noDeepCopyLister: &noDeepCopyLister{cache: cache, scheme: c.Scheme()}, - scheme: c.Scheme(), - uncachedGVKs: uncachedGVKs, - }, - Writer: c, - StatusClient: c, - }, nil -} - -type delegatingClient struct { - client.Reader - client.Writer - client.StatusClient - - scheme *runtime.Scheme - mapper meta.RESTMapper -} - -// Scheme returns the scheme this client is using. -func (d *delegatingClient) Scheme() *runtime.Scheme { - return d.scheme -} - -// RESTMapper returns the rest mapper this client is using. -func (d *delegatingClient) RESTMapper() meta.RESTMapper { - return d.mapper -} - -// delegatingReader forms a Reader that will cause Get and List requests for -// unstructured types to use the ClientReader while requests for any other type -// of object with use the CacheReader. This avoids accidentally caching the -// entire cluster in the common case of loading arbitrary unstructured objects -// (e.g. from OwnerReferences). -type delegatingReader struct { - CacheReader client.Reader - ClientReader client.Reader - - noDeepCopyLister *noDeepCopyLister - - uncachedGVKs map[schema.GroupVersionKind]struct{} - scheme *runtime.Scheme - cacheUnstructured bool -} - -func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) { - gvk, err := apiutil.GVKForObject(obj, d.scheme) - if err != nil { - return false, err - } - // TODO: this is producing unsafe guesses that don't actually work, - // but it matches ~99% of the cases out there. - if meta.IsListType(obj) { - gvk.Kind = strings.TrimSuffix(gvk.Kind, "List") - } - if _, isUncached := d.uncachedGVKs[gvk]; isUncached { - return true, nil - } - if !d.cacheUnstructured { - _, isUnstructured := obj.(*unstructured.Unstructured) - _, isUnstructuredList := obj.(*unstructured.UnstructuredList) - return isUnstructured || isUnstructuredList, nil - } - return false, nil -} - -// Get retrieves an obj for a given object key from the Kubernetes Cluster. -func (d *delegatingReader) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { - if isUncached, err := d.shouldBypassCache(obj); err != nil { - return err - } else if isUncached { - return d.ClientReader.Get(ctx, key, obj) - } - return d.CacheReader.Get(ctx, key, obj) -} - -// List retrieves list of objects for a given namespace and list options. -func (d *delegatingReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - if isUncached, err := d.shouldBypassCache(list); err != nil { - return err - } else if isUncached { - return d.ClientReader.List(ctx, list, opts...) - } - if !disableNoDeepCopy && isDisableDeepCopy(opts) { - return d.noDeepCopyLister.List(ctx, list, opts...) - } - return d.CacheReader.List(ctx, list, opts...) -} - -var DisableDeepCopy = disableDeepCopy{} - -type disableDeepCopy struct{} - -func (_ disableDeepCopy) ApplyToList(_ *client.ListOptions) { -} - -func isDisableDeepCopy(opts []client.ListOption) bool { - for _, opt := range opts { - if opt == DisableDeepCopy { - return true - } - } - return false -} diff --git a/pkg/util/imagejob/imagejob_reader_test.go b/pkg/util/imagejob/imagejob_reader_test.go index 4a75622ece..1625b854eb 100644 --- a/pkg/util/imagejob/imagejob_reader_test.go +++ b/pkg/util/imagejob/imagejob_reader_test.go @@ -27,7 +27,7 @@ import ( "github.com/onsi/gomega" "github.com/openkruise/kruise/apis" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/util" + utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/fieldindex" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -164,7 +164,7 @@ func TestAll(t *testing.T) { mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"}) g.Expect(err).NotTo(gomega.HaveOccurred()) _ = fieldindex.RegisterFieldIndexes(mgr.GetCache()) - c = util.NewClientFromManager(mgr, "test-nodeimage-utils") + c = utilclient.NewClientFromManager(mgr, "test-nodeimage-utils") for _, o := range initialNodeImages { if err = c.Create(context.TODO(), o); err != nil { diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go index 04bc22c291..418cdc6408 100644 --- a/pkg/util/workloadspread/workloadspread.go +++ b/pkg/util/workloadspread/workloadspread.go @@ -152,7 +152,7 @@ func matchReference(ref *metav1.OwnerReference) (bool, error) { // TODO consider pod/status update operation -func (h *Handler) HandlePodCreation(pod *corev1.Pod) error { +func (h *Handler) HandlePodCreation(pod *corev1.Pod) (skip bool, err error) { start := time.Now() // filter out pods, include the following: @@ -161,18 +161,18 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) error { // 3. Pod.OwnerReference is nil // 4. Pod.OwnerReference is not one of workloads, such as CloneSet, Deployment, ReplicaSet. if !kubecontroller.IsPodActive(pod) { - return nil + return true, nil } ref := metav1.GetControllerOf(pod) matched, err := matchReference(ref) if err != nil || !matched { - return nil + return true, nil } var matchedWS *appsv1alpha1.WorkloadSpread workloadSpreadList := &appsv1alpha1.WorkloadSpreadList{} if err = h.Client.List(context.TODO(), workloadSpreadList, &client.ListOptions{Namespace: pod.Namespace}); err != nil { - return err + return false, err } for _, ws := range workloadSpreadList.Items { if ws.Spec.TargetReference == nil || !ws.DeletionTimestamp.IsZero() { @@ -187,7 +187,7 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) error { } // not found matched workloadSpread if matchedWS == nil { - return nil + return true, nil } defer func() { @@ -195,7 +195,7 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) error { matchedWS.Namespace, matchedWS.Name, time.Since(start)) }() - return h.mutatingPod(matchedWS, pod, nil, CreateOperation) + return false, h.mutatingPod(matchedWS, pod, nil, CreateOperation) } func (h *Handler) HandlePodDeletion(pod *corev1.Pod, operation Operation) error { diff --git a/pkg/util/workloadspread/workloadspread_test.go b/pkg/util/workloadspread/workloadspread_test.go index 00390808ae..47d21dc37b 100644 --- a/pkg/util/workloadspread/workloadspread_test.go +++ b/pkg/util/workloadspread/workloadspread_test.go @@ -751,7 +751,7 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { var err error switch cs.getOperation() { case CreateOperation: - err = handler.HandlePodCreation(podIn) + _, err = handler.HandlePodCreation(podIn) case DeleteOperation: err = handler.HandlePodDeletion(podIn, DeleteOperation) case EvictionOperation: diff --git a/pkg/webhook/pod/mutating/container_launch_priority_initialization.go b/pkg/webhook/pod/mutating/container_launch_priority_initialization.go index d1174f818a..8c6ad450b9 100644 --- a/pkg/webhook/pod/mutating/container_launch_priority_initialization.go +++ b/pkg/webhook/pod/mutating/container_launch_priority_initialization.go @@ -12,15 +12,15 @@ import ( ) // start containers based on priority order -func (h *PodCreateHandler) containerLaunchPriorityInitialization(ctx context.Context, req admission.Request, pod *corev1.Pod) error { +func (h *PodCreateHandler) containerLaunchPriorityInitialization(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { if len(req.AdmissionRequest.SubResource) > 0 || req.AdmissionRequest.Operation != admissionv1.Create || req.AdmissionRequest.Resource.Resource != "pods" { - return nil + return true, nil } if len(pod.Spec.Containers) == 1 { - return nil + return true, nil } // if ordered flag has been set, then just process ordered logic and skip check for priority @@ -30,20 +30,20 @@ func (h *PodCreateHandler) containerLaunchPriorityInitialization(ctx context.Con priority[i] = 0 - i } h.setPodEnv(priority, pod) - return nil + return false, nil } // check whether containers have KRUISE_CONTAINER_PRIORITY key value pairs priority, priorityFlag, err := h.getPriority(pod) if err != nil { - return err + return false, err } if !priorityFlag { - return nil + return true, nil } h.setPodEnv(priority, pod) - return nil + return false, nil } // the return []int is prioirty for each container in the pod, ordered as container diff --git a/pkg/webhook/pod/mutating/persistent_pod_state.go b/pkg/webhook/pod/mutating/persistent_pod_state.go index e2c359219a..e03fcc609c 100644 --- a/pkg/webhook/pod/mutating/persistent_pod_state.go +++ b/pkg/webhook/pod/mutating/persistent_pod_state.go @@ -35,15 +35,15 @@ const ( ) // mutate pod based on static ip -func (h *PodCreateHandler) persistentPodStateMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) error { +func (h *PodCreateHandler) persistentPodStateMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { // only handler Create Pod Object Request if len(req.AdmissionRequest.SubResource) > 0 || req.AdmissionRequest.Operation != admissionv1.Create || req.AdmissionRequest.Resource.Resource != "pods" { - return nil + return true, nil } ref := metav1.GetControllerOf(pod) if ref == nil || ref.Kind != "StatefulSet" { - return nil + return true, nil } // selector persistentPodState persistentPodState := SelectorPersistentPodState(h.Client, appsv1alpha1.TargetReference{ @@ -52,23 +52,30 @@ func (h *PodCreateHandler) persistentPodStateMutatingPod(ctx context.Context, re Name: ref.Name, }, pod.Namespace) if persistentPodState == nil || len(persistentPodState.Status.PodStates) == 0 { - return nil + return true, nil } // when data is NotFound, indicates that the pod is created for the first time and the scenario does not require persistent pod state podState, ok := persistentPodState.Status.PodStates[pod.Name] if !ok || len(podState.NodeTopologyLabels) == 0 { - return nil + return true, nil } // inject PersistentPodState node affinity in pod nodeSelector, preference := createNodeAffinity(persistentPodState.Spec, podState) if len(nodeSelector) == 0 && len(preference) == 0 { - return nil + return true, nil } klog.V(3).Infof("inject node affinity(required: %s, preferred: %s) in pod(%s/%s) for PersistentPodState", util.DumpJSON(nodeSelector), util.DumpJSON(preference), pod.Namespace, pod.Name) + + // inject persistentPodState annotation in pod + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + pod.Annotations[InjectedPersistentPodStateKey] = persistentPodState.Name + // nodeSelector if len(nodeSelector) != 0 { if pod.Spec.NodeSelector == nil { @@ -81,22 +88,17 @@ func (h *PodCreateHandler) persistentPodStateMutatingPod(ctx context.Context, re } // preferences - if len(preference) == 0 { - return nil - } - if pod.Spec.Affinity == nil { - pod.Spec.Affinity = &corev1.Affinity{} - } - if pod.Spec.Affinity.NodeAffinity == nil { - pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} - } - pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, preference...) - // inject persistentPodState annotation in pod - if pod.Annotations == nil { - pod.Annotations = map[string]string{} + if len(preference) > 0 { + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &corev1.Affinity{} + } + if pod.Spec.Affinity.NodeAffinity == nil { + pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} + } + pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, preference...) } - pod.Annotations[InjectedPersistentPodStateKey] = persistentPodState.Name - return nil + + return false, nil } // return two parameters: diff --git a/pkg/webhook/pod/mutating/persistent_pod_state_test.go b/pkg/webhook/pod/mutating/persistent_pod_state_test.go index 39c2808c02..7964589de8 100644 --- a/pkg/webhook/pod/mutating/persistent_pod_state_test.go +++ b/pkg/webhook/pod/mutating/persistent_pod_state_test.go @@ -225,7 +225,7 @@ func TestPersistentPodStateMutatingPod(t *testing.T) { podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.persistentPodStateMutatingPod(context.Background(), req, podOut) + _, err := podHandler.persistentPodStateMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } diff --git a/pkg/webhook/pod/mutating/pod_create_update_handler.go b/pkg/webhook/pod/mutating/pod_create_update_handler.go index 1f332ba58a..9a8641ae76 100644 --- a/pkg/webhook/pod/mutating/pod_create_update_handler.go +++ b/pkg/webhook/pod/mutating/pod_create_update_handler.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "net/http" - "reflect" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util/controllerfinder" @@ -55,48 +54,56 @@ func (h *PodCreateHandler) Handle(ctx context.Context, req admission.Request) ad if err != nil { return admission.Errored(http.StatusBadRequest, err) } - clone := obj.DeepCopy() // when pod.namespace is empty, using req.namespace if obj.Namespace == "" { obj.Namespace = req.Namespace } - injectPodReadinessGate(req, obj) + var changed bool + + if skip := injectPodReadinessGate(req, obj); !skip { + changed = true + } if utilfeature.DefaultFeatureGate.Enabled(features.WorkloadSpread) { - err = h.workloadSpreadMutatingPod(ctx, req, obj) - if err != nil { + if skip, err := h.workloadSpreadMutatingPod(ctx, req, obj); err != nil { return admission.Errored(http.StatusInternalServerError, err) + } else if !skip { + changed = true } } - err = h.sidecarsetMutatingPod(ctx, req, obj) - if err != nil { + if skip, err := h.sidecarsetMutatingPod(ctx, req, obj); err != nil { return admission.Errored(http.StatusInternalServerError, err) + } else if !skip { + changed = true } // "the order matters and sidecarsetMutatingPod must precede containerLaunchPriorityInitialization" - err = h.containerLaunchPriorityInitialization(ctx, req, obj) - if err != nil { + if skip, err := h.containerLaunchPriorityInitialization(ctx, req, obj); err != nil { return admission.Errored(http.StatusInternalServerError, err) + } else if !skip { + changed = true } // patch related-pub annotation in pod if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) || utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetDeleteGate) { - err = h.pubMutatingPod(ctx, req, obj) - if err != nil { + if skip, err := h.pubMutatingPod(ctx, req, obj); err != nil { return admission.Errored(http.StatusInternalServerError, err) + } else if !skip { + changed = true } } // persistent pod state - err = h.persistentPodStateMutatingPod(ctx, req, obj) - if err != nil { + if skip, err := h.persistentPodStateMutatingPod(ctx, req, obj); err != nil { return admission.Errored(http.StatusInternalServerError, err) + } else if !skip { + changed = true } - if reflect.DeepEqual(obj, clone) { + if !changed { return admission.Allowed("") } marshalled, err := json.Marshal(obj) diff --git a/pkg/webhook/pod/mutating/pod_readiness.go b/pkg/webhook/pod/mutating/pod_readiness.go index 24fae2d58b..624fccc98b 100644 --- a/pkg/webhook/pod/mutating/pod_readiness.go +++ b/pkg/webhook/pod/mutating/pod_readiness.go @@ -26,12 +26,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) -func injectPodReadinessGate(req admission.Request, pod *v1.Pod) { +func injectPodReadinessGate(req admission.Request, pod *v1.Pod) (skip bool) { if req.Operation != admissionv1.Create { - return + return true } if !util.IsPodOwnedByKruise(pod) && !utilfeature.DefaultFeatureGate.Enabled(features.KruisePodReadinessGate) { - return + return true } util.InjectReadinessGateToPod(pod, appspub.KruisePodReadyConditionType) + return false } diff --git a/pkg/webhook/pod/mutating/pod_unavailable_budget.go b/pkg/webhook/pod/mutating/pod_unavailable_budget.go index 1dae0deb1d..86b5f7197a 100644 --- a/pkg/webhook/pod/mutating/pod_unavailable_budget.go +++ b/pkg/webhook/pod/mutating/pod_unavailable_budget.go @@ -28,21 +28,21 @@ import ( ) // mutating relate-pub annotation in pod -func (h *PodCreateHandler) pubMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) error { +func (h *PodCreateHandler) pubMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { if len(req.AdmissionRequest.SubResource) > 0 || req.AdmissionRequest.Operation != admissionv1.Create || req.AdmissionRequest.Resource.Resource != "pods" { - return nil + return true, nil } pub, err := podunavailablebudget.GetPubForPod(h.Client, pod) if err != nil { - return err + return false, err } else if pub == nil { - return nil + return true, nil } if pod.Annotations == nil { pod.Annotations = map[string]string{} } pod.Annotations[pubcontrol.PodRelatedPubAnnotation] = pub.Name klog.V(3).Infof("mutating add pod(%s/%s) annotation[%s]=%s", pod.Namespace, pod.Name, pubcontrol.PodRelatedPubAnnotation, pub.Name) - return nil + return false, nil } diff --git a/pkg/webhook/pod/mutating/sidecarset.go b/pkg/webhook/pod/mutating/sidecarset.go index 78e092dd75..b4bb2e8f95 100644 --- a/pkg/webhook/pod/mutating/sidecarset.go +++ b/pkg/webhook/pod/mutating/sidecarset.go @@ -37,17 +37,17 @@ import ( ) // mutate pod based on SidecarSet Object -func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) error { +func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { if len(req.AdmissionRequest.SubResource) > 0 || (req.AdmissionRequest.Operation != admissionv1.Create && req.AdmissionRequest.Operation != admissionv1.Update) || req.AdmissionRequest.Resource.Resource != "pods" { - return nil + return true, nil } // filter out pods that don't require inject, include the following: // 1. Deletion pod // 2. ignore namespace: "kube-system", "kube-public" if !sidecarcontrol.IsActivePod(pod) { - return nil + return true, nil } var oldPod *corev1.Pod @@ -59,14 +59,14 @@ func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admiss if err := h.Decoder.Decode( admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}}, oldPod); err != nil { - return err + return false, err } } // DisableDeepCopy:true, indicates must be deep copy before update sidecarSet objection sidecarsetList := &appsv1alpha1.SidecarSetList{} if err := h.Client.List(ctx, sidecarsetList, utilclient.DisableDeepCopy); err != nil { - return err + return false, err } matchedSidecarSets := make([]sidecarcontrol.SidecarControl, 0) @@ -75,7 +75,7 @@ func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admiss continue } if matched, err := sidecarcontrol.PodMatchedSidecarSet(pod, sidecarSet); err != nil { - return err + return false, err } else if !matched { continue } @@ -88,14 +88,14 @@ func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admiss matchedSidecarSets = append(matchedSidecarSets, control) } if len(matchedSidecarSets) == 0 { - return nil + return true, nil } // check pod if isUpdated { if !matchedSidecarSets[0].IsPodAvailabilityChanged(pod, oldPod) { klog.V(3).Infof("pod(%s/%s) availability unchanged for sidecarSet, and ignore", pod.Namespace, pod.Name) - return nil + return true, nil } } @@ -104,10 +104,10 @@ func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admiss //build sidecar containers, sidecar initContainers, sidecar volumes, annotations to inject into pod object sidecarContainers, sidecarInitContainers, sidecarSecrets, volumesInSidecar, injectedAnnotations, err := buildSidecars(isUpdated, pod, oldPod, matchedSidecarSets) if err != nil { - return err + return false, err } else if len(sidecarContainers) == 0 && len(sidecarInitContainers) == 0 { klog.V(3).Infof("[sidecar inject] pod(%s/%s) don't have injected containers", pod.Namespace, pod.Name) - return nil + return true, nil } klog.V(3).Infof("[sidecar inject] begin inject sidecarContainers(%v) sidecarInitContainers(%v) sidecarSecrets(%v), volumes(%s)"+ @@ -136,7 +136,7 @@ func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admiss pod.Annotations[k] = v } klog.V(4).Infof("[sidecar inject] after mutating: %v", util.DumpJSON(pod)) - return nil + return false, nil } func mergeSidecarSecrets(secretsInPod, secretsInSidecar []corev1.LocalObjectReference) (allSecrets []corev1.LocalObjectReference) { diff --git a/pkg/webhook/pod/mutating/sidecarset_hotupgrade_test.go b/pkg/webhook/pod/mutating/sidecarset_hotupgrade_test.go index ecf28e72f3..fc01d62bff 100644 --- a/pkg/webhook/pod/mutating/sidecarset_hotupgrade_test.go +++ b/pkg/webhook/pod/mutating/sidecarset_hotupgrade_test.go @@ -45,7 +45,7 @@ func testInjectHotUpgradeSidecar(t *testing.T, sidecarSetIn *appsv1alpha1.Sideca podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } diff --git a/pkg/webhook/pod/mutating/sidecarset_test.go b/pkg/webhook/pod/mutating/sidecarset_test.go index 6b1110ebd4..077740a32f 100644 --- a/pkg/webhook/pod/mutating/sidecarset_test.go +++ b/pkg/webhook/pod/mutating/sidecarset_test.go @@ -393,7 +393,7 @@ func testPodHasNoMatchedSidecarSet(t *testing.T, sidecarSetIn *appsv1alpha1.Side client := fake.NewClientBuilder().WithObjects(sidecarSetIn).Build() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if len(podOut.Spec.Containers) != len(podIn.Spec.Containers) { t.Fatalf("expect %v containers but got %v", len(podIn.Spec.Containers), len(podOut.Spec.Containers)) @@ -435,7 +435,7 @@ func doMergeSidecarSecretsTest(t *testing.T, sidecarSetIn *appsv1alpha1.SidecarS client := fake.NewClientBuilder().WithObjects(sidecarSetIn).Build() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if len(podOut.Spec.ImagePullSecrets) != len(podIn.Spec.ImagePullSecrets)+len(sidecarSetIn.Spec.ImagePullSecrets)-repeat { t.Fatalf("expect %v secrets but got %v", len(podIn.Spec.ImagePullSecrets)+len(sidecarSetIn.Spec.ImagePullSecrets)-repeat, len(podOut.Spec.ImagePullSecrets)) @@ -456,7 +456,7 @@ func testInjectionStrategyPaused(t *testing.T, sidecarIn *appsv1alpha1.SidecarSe client := fake.NewClientBuilder().WithObjects(sidecarPaused).Build() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, _ = podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if len(podOut.Spec.Containers) != len(podIn.Spec.Containers) { t.Fatalf("expect %v containers but got %v", len(podIn.Spec.Containers), len(podOut.Spec.Containers)) @@ -475,7 +475,7 @@ func testSidecarSetPodInjectPolicy(t *testing.T, sidecarSetIn *appsv1alpha1.Side podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } @@ -552,7 +552,7 @@ func testSidecarVolumesAppend(t *testing.T, sidecarSetIn *appsv1alpha1.SidecarSe podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } @@ -700,7 +700,7 @@ func testPodVolumeMountsAppend(t *testing.T, sidecarSetIn *appsv1alpha1.SidecarS podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } @@ -732,7 +732,7 @@ func testSidecarSetTransferEnv(t *testing.T, sidecarSetIn *appsv1alpha1.SidecarS podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } @@ -766,7 +766,7 @@ func testSidecarSetHashInject(t *testing.T, sidecarSetIn1 *appsv1alpha1.SidecarS podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } @@ -797,7 +797,7 @@ func testSidecarSetNameInject(t *testing.T, sidecarSetIn1, sidecarSetIn3 *appsv1 podOut := podIn.DeepCopy() podHandler := &PodCreateHandler{Decoder: decoder, Client: client} req := newAdmission(admissionv1.Create, runtime.RawExtension{}, runtime.RawExtension{}, "") - err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) + _, err := podHandler.sidecarsetMutatingPod(context.Background(), req, podOut) if err != nil { t.Fatalf("inject sidecar into pod failed, err: %v", err) } diff --git a/pkg/webhook/pod/mutating/workloadspread.go b/pkg/webhook/pod/mutating/workloadspread.go index d87895ee68..7fc9da0f99 100644 --- a/pkg/webhook/pod/mutating/workloadspread.go +++ b/pkg/webhook/pod/mutating/workloadspread.go @@ -28,11 +28,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) -func (h *PodCreateHandler) workloadSpreadMutatingPod(ctx context.Context, req admission.Request, - pod *corev1.Pod) error { +func (h *PodCreateHandler) workloadSpreadMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) (skip bool, err error) { if len(req.AdmissionRequest.SubResource) > 0 || req.AdmissionRequest.Resource.Resource != "pods" { - return nil + return true, nil } workloadSpreadHandler := wsutil.NewWorkloadSpreadHandler(h.Client) @@ -43,16 +42,16 @@ func (h *PodCreateHandler) workloadSpreadMutatingPod(ctx context.Context, req ad options := &metav1.CreateOptions{} err := h.Decoder.DecodeRaw(req.Options, options) if err != nil { - return err + return false, err } // check dry run dryRun = dryrun.IsDryRun(options.DryRun) if dryRun { klog.V(5).Infof("Operation[%s] Pod (%s/%s) is a dry run, then admit", req.AdmissionRequest.Operation, pod.Namespace, pod.Name) - return nil + return true, nil } return workloadSpreadHandler.HandlePodCreation(pod) default: - return nil + return true, nil } }