diff --git a/pkg/common/util/reconciler.go b/pkg/common/util/reconciler.go index 78db6a9d70..cae8b19ec0 100644 --- a/pkg/common/util/reconciler.go +++ b/pkg/common/util/reconciler.go @@ -18,14 +18,11 @@ import ( "fmt" "reflect" - "github.com/sirupsen/logrus" - - commonutil "github.com/kubeflow/common/pkg/util" - - "github.com/kubeflow/common/pkg/controller.v1/common" - commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" + "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/expectation" + commonutil "github.com/kubeflow/common/pkg/util" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/event" diff --git a/pkg/common/util/reconciler_generic.go b/pkg/common/util/reconciler_generic.go new file mode 100644 index 0000000000..b32f0b7e24 --- /dev/null +++ b/pkg/common/util/reconciler_generic.go @@ -0,0 +1,133 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+
+	"github.com/kubeflow/common/pkg/controller.v1/common"
+	log "github.com/sirupsen/logrus"
+
+	commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
+	"github.com/kubeflow/common/pkg/controller.v1/expectation"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+// GenExpectationGenericKey builds the expectation key "<jobKey>/<lower-cased replicaType>/<pl>"; pl is the plural resource name supplied by the caller.
+func GenExpectationGenericKey(jobKey string, replicaType string, pl string) string {
+	return strings.Join([]string{jobKey, strings.ToLower(replicaType), pl}, "/")
+}
+
+// LoggerForGenericKind returns a log entry with the fields "job", "uid" and one field keyed by kind that holds "<namespace>.<name>" of obj.
+func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry {
+	// owner stays empty unless obj has a controller ref whose Kind equals kind.
+	var owner string
+	if ref := metav1.GetControllerOf(obj); ref != nil && ref.Kind == kind {
+		owner = obj.GetNamespace() + "." + ref.Name
+	}
+	fields := log.Fields{
+		// We use job to match the key used in controller.go
+		// In controller.go we log the key used with the workqueue.
+		"job": owner,
+		kind:  obj.GetNamespace() + "." + obj.GetName(),
+		"uid": obj.GetUID(),
+	}
+	return log.WithFields(fields)
+}
+
+// OnDependentCreateFuncGeneric returns a create-event predicate that records the observed creation of a labeled dependent object in the given expectations.
+func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
+	return func(e event.CreateEvent) bool {
+		// Objects without a replica-type label are filtered out.
+		rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
+		if len(rtype) == 0 {
+			return false
+		}
+
+		if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
+			jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
+			// NOTE(review): the plural is the lower-cased Kind plus "s"; an empty Kind yields just "s" - confirm watched objects carry their Kind.
+			pl := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
+			exp.CreationObserved(GenExpectationGenericKey(jobKey, rtype, pl))
+		}
+
+		// A labeled object passes the filter whether or not it has a controller ref.
+		return true
+	}
+}
+
+// OnDependentUpdateFuncGeneric returns an update-event predicate: it drops events whose ResourceVersion is unchanged and passes only updates whose old or new controller ref resolves via resolveControllerRef. It does not touch expectations.
+func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
+	return func(e event.UpdateEvent) bool {
+		newObj := e.ObjectNew
+		oldObj := e.ObjectOld
+		if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
+			// Periodic resync will send update events for all known objects.
+			// Two different versions of the same object will always have different RVs.
+			return false
+		}
+
+		kind := jc.Controller.GetAPIGroupVersionKind().Kind
+		logger := LoggerForGenericKind(newObj, kind)
+
+		newControllerRef := metav1.GetControllerOf(newObj)
+		oldControllerRef := metav1.GetControllerOf(oldObj)
+		controllerRefChanged := !reflect.DeepEqual(newControllerRef, oldControllerRef)
+
+		if controllerRefChanged && oldControllerRef != nil {
+			// The ControllerRef was changed: resolve the old owner in the object's namespace (the object's name is not a namespace).
+			if job := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); job != nil {
+				logger.Infof("%s controller ref updated: %v, %v", kind, newObj, oldObj)
+				return true
+			}
+		}
+
+		// If it has a controller ref, that's all that matters.
+		if newControllerRef != nil {
+			job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef)
+			if job == nil {
+				return false
+			}
+			logger.Debugf("%s has a controller ref: %v, %v", kind, newObj, oldObj)
+			return true
+		}
+		return false
+	}
+}
+
+// OnDependentDeleteFuncGeneric returns a delete-event predicate that records the observed deletion of a labeled dependent object in the given expectations.
+func OnDependentDeleteFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
+	return func(e event.DeleteEvent) bool {
+		// Objects without a replica-type label are filtered out.
+		rtype := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
+		if len(rtype) == 0 {
+			return false
+		}
+
+		if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
+			// Expectation key layout: <namespace>/<owner name>/<lower-cased replica type>/<plural kind>.
+			jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
+			// NOTE(review): the plural is the lower-cased Kind plus "s"; an empty Kind yields just "s" - confirm watched objects carry their Kind.
+			pl := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
+			exp.DeletionObserved(GenExpectationGenericKey(jobKey, rtype, pl))
+		}
+
+		// A labeled object passes the filter whether or not it has a controller ref.
+		return true
+	}
+}
diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index f55c47874b..507c206cd3 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -40,7 +40,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" @@ -130,15 +129,12 @@ func (r *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err = validation.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil { logger.Info(err.Error(), "MPIJob failed validation", req.NamespacedName.String()) + return ctrl.Result{}, err } - // Check if reconciliation is needed - jobKey, err := common.KeyFunc(mpijob) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get jobKey for job object %#v: %v", mpijob,
err)) - } - replicaTypes := util.GetReplicaTypes(mpijob.Spec.MPIReplicaSpecs) - needReconcile := util.SatisfiedExpectations(r.Expectations, jobKey, replicaTypes) + // In the new reconcile mode, we will always proceed the reconciling and let each `createOrUpdate` function + // to determine if updating/creating is needed + needReconcile := true if !needReconcile || mpijob.GetDeletionTimestamp() != nil { return ctrl.Result{}, nil @@ -187,14 +183,50 @@ func (r *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error { return err } - // inject watching for job related service - if err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ + // inject watching for job related ConfigMap + if err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &mpiv1.MPIJob{}, }, predicate.Funcs{ - CreateFunc: util.OnDependentCreateFunc(r.Expectations), - UpdateFunc: util.OnDependentUpdateFunc(&r.JobController), - DeleteFunc: util.OnDependentDeleteFunc(r.Expectations), + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + + // inject watching for job related Role + if err = c.Watch(&source.Kind{Type: &rbacv1.Role{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &mpiv1.MPIJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + + // inject watching for job related RoleBinding + if err = c.Watch(&source.Kind{Type: &rbacv1.RoleBinding{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &mpiv1.MPIJob{}, + }, predicate.Funcs{ + CreateFunc: 
util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + + // inject watching for job related ServiceAccount + if err = c.Watch(&source.Kind{Type: &corev1.ServiceAccount{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &mpiv1.MPIJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), }); err != nil { return err } @@ -202,7 +234,7 @@ func (r *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error { return nil } -//mpijob not need delete services +// DeletePodsAndServices is overridden because mpi-reconciler.v1 does not need to delete services func (r *MPIJobReconciler) DeletePodsAndServices(runPolicy *commonv1.RunPolicy, job interface{}, pods []*corev1.Pod) error { if len(pods) == 0 { return nil @@ -227,9 +259,7 @@ func (r *MPIJobReconciler) DeletePodsAndServices(runPolicy *commonv1.RunPolicy, return nil } -// reconcileServices checks and updates services for each given ReplicaSpec. -// It will requeue the job in case of an error while creating/deleting services. -// mpijob not need services +// ReconcileServices is overridden because mpi-reconciler.v1 does not need to reconcile services func (jc *MPIJobReconciler) ReconcileServices( job metav1.Object, services []*corev1.Service, @@ -254,7 +284,7 @@ func (r *MPIJobReconciler) GetGroupNameLabelValue() string { return mpiv1.GroupVersion.Group } -// Same as Func (tc *TFController) SetClusterSpec(...) in pod.go +// SetClusterSpec is overridden because no cluster spec is needed for MPIJob func (r *MPIJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error { return nil }