From 7574f6c3c5b5112d1a7088254c0ee35948e2889b Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 14:36:11 -0700 Subject: [PATCH 1/7] Add PodControlInterface and FakePodControl --- pkg/controller.v1/control/pod_control.go | 116 +++++++++++++++++++++-- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/pkg/controller.v1/control/pod_control.go b/pkg/controller.v1/control/pod_control.go index cb4b98bf..6c59172f 100644 --- a/pkg/controller.v1/control/pod_control.go +++ b/pkg/controller.v1/control/pod_control.go @@ -25,32 +25,48 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/controller" + "sync" ) // Reasons for pod events const ( - // FailedCreatePodReason is added in an event and in a replica set condition + // FailedCreatePodReason is added in an event and in a job condition // when a pod for a replica set is failed to be created. FailedCreatePodReason = "FailedCreatePod" - // SuccessfulCreatePodReason is added in an event when a pod for a replica set + // SuccessfulCreatePodReason is added in an event when a pod for a job // is successfully created. SuccessfulCreatePodReason = "SuccessfulCreatePod" - // FailedDeletePodReason is added in an event and in a replica set condition + // FailedDeletePodReason is added in an event and in a job condition // when a pod for a replica set is failed to be deleted. FailedDeletePodReason = "FailedDeletePod" - // SuccessfulDeletePodReason is added in an event when a pod for a replica set + // SuccessfulDeletePodReason is added in an event when a pod for a job // is successfully deleted. SuccessfulDeletePodReason = "SuccessfulDeletePod" ) +// PodControlInterface is an interface that knows how to add or delete pods +// created as an interface to allow testing. +type PodControlInterface interface { + // CreatePods creates new pods according to the spec. 
+ CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod according to the spec on the specified node, + // and sets the ControllerRef. + CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. + CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // DeletePod deletes the pod identified by podID. + DeletePod(namespace string, podID string, object runtime.Object) error + // PatchPod patches the pod. + PatchPod(namespace, name string, data []byte) error +} + // RealPodControl is the default implementation of PodControlInterface. type RealPodControl struct { KubeClient clientset.Interface Recorder record.EventRecorder } -var _ controller.PodControlInterface = &RealPodControl{} +var _ PodControlInterface = &RealPodControl{} func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set { desiredLabels := make(labels.Set) @@ -159,3 +175,91 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime } return nil } + +type FakePodControl struct { + sync.Mutex + Templates []v1.PodTemplateSpec + ControllerRefs []metav1.OwnerReference + DeletePodName []string + Patches [][]byte + Err error + CreateLimit int + CreateCallCount int +} + +var _ PodControlInterface = &FakePodControl{} + +func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { + f.Lock() + defer f.Unlock() + f.Patches = append(f.Patches, data) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > 
f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *spec) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *spec) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *template) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.DeletePodName = append(f.DeletePodName, podID) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) Clear() { + f.Lock() + defer f.Unlock() + f.DeletePodName = []string{} + f.Templates = []v1.PodTemplateSpec{} + f.ControllerRefs = []metav1.OwnerReference{} + f.Patches = [][]byte{} + f.CreateLimit = 0 + f.CreateCallCount = 0 +} \ No newline at end of file From 088f1255efa040660e2813cb183174bd83c833f1 Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 
Apr 2020 14:36:34 -0700 Subject: [PATCH 2/7] Add comments for service event reasons --- pkg/controller.v1/control/service_control.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/controller.v1/control/service_control.go b/pkg/controller.v1/control/service_control.go index 757aa397..6737d1b1 100644 --- a/pkg/controller.v1/control/service_control.go +++ b/pkg/controller.v1/control/service_control.go @@ -30,9 +30,17 @@ import ( ) const ( + // FailedCreateServiceReason is added in an event and in a job condition + // when a service for a job fails to be created. FailedCreateServiceReason = "FailedCreateService" + // SuccessfulCreateServiceReason is added in an event when a service for a job + // is successfully created. SuccessfulCreateServiceReason = "SuccessfulCreateService" + // FailedDeleteServiceReason is added in an event and in a job condition + // when a service for a job fails to be deleted. FailedDeleteServiceReason = "FailedDeleteService" + // SuccessfulDeleteServiceReason is added in an event when a service for a job + // is successfully deleted. SuccessfulDeleteServiceReason = "SuccessfulDeleteService" ) From e518b2e8cc2cacc4fa35bc5c42b535f7b77a7f5d Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 14:46:49 -0700 Subject: [PATCH 3/7] Skip deleting pods/services already in termination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kubernetes pkg/controller/controller_utils.go doesn’t have this logic. This is from kubeflow/tf-operator/pull/998. I think it’s safe to keep the logic here. 
--- pkg/controller.v1/control/pod_control.go | 12 ++++++++++++ pkg/controller.v1/control/service_control.go | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/pkg/controller.v1/control/pod_control.go b/pkg/controller.v1/control/pod_control.go index 6c59172f..926889e7 100644 --- a/pkg/controller.v1/control/pod_control.go +++ b/pkg/controller.v1/control/pod_control.go @@ -18,6 +18,7 @@ import ( "fmt" commonutil "github.com/kubeflow/common/pkg/util" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -166,6 +167,17 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime return fmt.Errorf("object does not have ObjectMeta, %v", err) } logger := commonutil.LoggerForJob(accessor) + pod, err := r.KubeClient.CoreV1().Pods(namespace).Get(podID, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if pod.DeletionTimestamp != nil { + logger.Infof("pod %s/%s is terminating, skip deleting", pod.Namespace, pod.Name) + return nil + } logger.Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID) if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) diff --git a/pkg/controller.v1/control/service_control.go b/pkg/controller.v1/control/service_control.go index 6737d1b1..362a187f 100644 --- a/pkg/controller.v1/control/service_control.go +++ b/pkg/controller.v1/control/service_control.go @@ -16,6 +16,7 @@ package control import ( "fmt" + "k8s.io/apimachinery/pkg/api/errors" "sync" log "github.com/sirupsen/logrus" @@ -112,6 +113,17 @@ func (r RealServiceControl) DeleteService(namespace, serviceID string, object ru if err != nil { return fmt.Errorf("object does not have ObjectMeta, %v", err) } + service, err := 
r.KubeClient.CoreV1().Services(namespace).Get(serviceID, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if service.DeletionTimestamp != nil { + log.Infof("service %s/%s is terminating, skip deleting", service.Namespace, service.Name) + return nil + } log.Infof("Controller %v deleting service %v/%v", accessor.GetName(), namespace, serviceID) if err := r.KubeClient.CoreV1().Services(namespace).Delete(serviceID, nil); err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeleteServiceReason, "Error deleting: %v", err) From 3966c840944ba9364c88c0279204e54290f38d9e Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 15:35:11 -0700 Subject: [PATCH 4/7] Remove Kubernetes controller dependency Use `KeyFunc` instead of k8s.io/kubernetes/pkg/controller.KeyFunc --- pkg/controller.v1/common/pod.go | 9 ++++----- pkg/controller.v1/common/service.go | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index bb360f66..56a8c2b8 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/controller" apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" commonutil "github.com/kubeflow/common/pkg/util" @@ -73,7 +72,7 @@ func (jc *JobController) AddPod(obj interface{}) { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { logger.Infof("Failed to get the jobkey: %v", err) return @@ -116,7 +115,7 @@ func (jc *JobController) UpdatePod(old, cur interface{}) { // The ControllerRef was changed. Sync the old controller, if any. 
if job := jc.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { logger.Infof("pod ControllerRef updated: %v, %v", curPod, oldPod) - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } @@ -132,7 +131,7 @@ func (jc *JobController) UpdatePod(old, cur interface{}) { return } logger.Debugf("pod has a ControllerRef: %v, %v", curPod, oldPod) - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } @@ -175,7 +174,7 @@ func (jc *JobController) DeletePod(obj interface{}) { if job == nil { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } diff --git a/pkg/controller.v1/common/service.go b/pkg/controller.v1/common/service.go index 2d49c2bd..969cee4e 100644 --- a/pkg/controller.v1/common/service.go +++ b/pkg/controller.v1/common/service.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/kubernetes/pkg/controller" ) // When a service is created, enqueue the controller that manages it and update its expectations. @@ -49,7 +48,7 @@ func (jc *JobController) AddService(obj interface{}) { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } From 497f48822091a4f4dce7abf1730841493f43089b Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 15:40:08 -0700 Subject: [PATCH 5/7] Add Base and PodControllerRefManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current implementation uses PodControllerRefManager from `k8s.io/kubernetes/pkg/controller` and a self-implemented `ServiceControllerRefManager`, which brings some problems. 1. The Pod impl may change along with k8s upgrades; the service impl doesn’t. 2. It’s not helping us remove the direct Kubernetes dependency. 3. It’s not that straightforward for maintainers and contributors. 
This change makes sure we fork everything we need and remove the ref_manager dependency. --- .../control/service_ref_manager.go | 224 +++++++++++++++++- .../control/service_ref_manager_test.go | 173 ++++++++++++++ 2 files changed, 394 insertions(+), 3 deletions(-) diff --git a/pkg/controller.v1/control/service_ref_manager.go b/pkg/controller.v1/control/service_ref_manager.go index cafc7aa2..1af581ad 100644 --- a/pkg/controller.v1/control/service_ref_manager.go +++ b/pkg/controller.v1/control/service_ref_manager.go @@ -17,6 +17,8 @@ package control import ( "fmt" commonutil "github.com/kubeflow/common/pkg/util" + log "github.com/sirupsen/logrus" + "sync" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -24,11 +26,227 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/kubernetes/pkg/controller" ) +type BaseControllerRefManager struct { + Controller metav1.Object + Selector labels.Selector + + canAdoptErr error + canAdoptOnce sync.Once + CanAdoptFunc func() error +} + +func (m *BaseControllerRefManager) CanAdopt() error { + m.canAdoptOnce.Do(func() { + if m.CanAdoptFunc != nil { + m.canAdoptErr = m.CanAdoptFunc() + } + }) + return m.canAdoptErr +} + +// ClaimObject tries to take ownership of an object for this controller. +// +// It will reconcile the following: +// * Adopt orphans if the match function returns true. +// * Release owned objects if the match function returns false. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The returned boolean indicates whether you now +// own the object. +// +// No reconciliation will be attempted if the controller is being deleted. 
+func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { + controllerRef := metav1.GetControllerOf(obj) + if controllerRef != nil { + if controllerRef.UID != m.Controller.GetUID() { + // Owned by someone else. Ignore. + return false, nil + } + if match(obj) { + // We already own it and the selector matches. + // Return true (successfully claimed) before checking deletion timestamp. + // We're still allowed to claim things we already own while being deleted + // because doing so requires taking no actions. + return true, nil + } + // Owned by us but selector doesn't match. + // Try to release, unless we're being deleted. + if m.Controller.GetDeletionTimestamp() != nil { + return false, nil + } + if err := release(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else released it, or there was a transient error. + // The controller should requeue and try again if it's still stale. + return false, err + } + // Successfully released. + return false, nil + } + + // It's an orphan. + if m.Controller.GetDeletionTimestamp() != nil || !match(obj) { + // Ignore if we're being deleted or selector doesn't match. + return false, nil + } + if obj.GetDeletionTimestamp() != nil { + // Ignore if the object is being deleted + return false, nil + } + // Selector matches. Try to adopt. + if err := adopt(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else claimed it first, or there was a transient error. + // The controller should requeue and try again if it's still orphaned. + return false, err + } + // Successfully adopted. 
+ return true, nil +} + +type PodControllerRefManager struct { + BaseControllerRefManager + controllerKind schema.GroupVersionKind + podControl PodControlInterface +} + +// NewPodControllerRefManager returns a PodControllerRefManager that exposes +// methods to manage the controllerRef of pods. +// +// The CanAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If CanAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once CanAdopt() is called, it will not be called again by the same +// PodControllerRefManager instance. Create a new instance if it makes +// sense to check CanAdopt() again (e.g. in a different sync pass). +func NewPodControllerRefManager( + podControl PodControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *PodControllerRefManager { + return &PodControllerRefManager{ + BaseControllerRefManager: BaseControllerRefManager{ + Controller: controller, + Selector: selector, + CanAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + podControl: podControl, + } +} + +// ClaimPods tries to take ownership of a list of Pods. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// Optional: If one or more filters are specified, a Pod will only be claimed if +// all filters return true. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of Pods that you now own is returned. 
+func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { + var claimed []*v1.Pod + var errlist []error + + match := func(obj metav1.Object) bool { + pod := obj.(*v1.Pod) + // Check selector first so filters only run on potentially matching Pods. + if !m.Selector.Matches(labels.Set(pod.Labels)) { + return false + } + for _, filter := range filters { + if !filter(pod) { + return false + } + } + return true + } + adopt := func(obj metav1.Object) error { + return m.AdoptPod(obj.(*v1.Pod)) + } + release := func(obj metav1.Object) error { + return m.ReleasePod(obj.(*v1.Pod)) + } + + for _, pod := range pods { + ok, err := m.ClaimObject(pod, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, pod) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptPod sends a patch to take control of the pod. It returns the error if +// the patching fails. +func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { + if err := m.CanAdopt(); err != nil { + return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.Controller.GetName(), m.Controller.GetUID(), pod.UID) + return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch)) +} + +// ReleasePod sends a patch to free the pod from the control of the controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. 
+func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { + log.Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", + pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), pod.UID) + err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the pod no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the pod + // has no owner reference, 2. the uid of the pod doesn't + // match, which means the pod is deleted and then recreated. + // In both cases, the error can be ignored. + + // TODO: If the pod has owner references, but none of them + // has the owner.UID, server will silently ignore the patch. + // Investigate why. + return nil + } + } + return err +} + type ServiceControllerRefManager struct { - controller.BaseControllerRefManager + BaseControllerRefManager controllerKind schema.GroupVersionKind serviceControl ServiceControlInterface @@ -53,7 +271,7 @@ func NewServiceControllerRefManager( canAdopt func() error, ) *ServiceControllerRefManager { return &ServiceControllerRefManager{ - BaseControllerRefManager: controller.BaseControllerRefManager{ + BaseControllerRefManager: BaseControllerRefManager{ Controller: ctr, Selector: selector, CanAdoptFunc: canAdopt, diff --git a/pkg/controller.v1/control/service_ref_manager_test.go b/pkg/controller.v1/control/service_ref_manager_test.go index 8256ba05..41b63fe0 100644 --- a/pkg/controller.v1/control/service_ref_manager_test.go +++ b/pkg/controller.v1/control/service_ref_manager_test.go @@ -26,6 +26,179 @@ import ( testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1" ) +func TestClaimPods(t *testing.T) { + controllerUID := "123" + + type test 
struct { + name string + manager *PodControllerRefManager + pods []*v1.Pod + claimed []*v1.Pod + } + var tests = []test{ + func() test { + testJob := testutilv1.NewTestJob(1) + testJobLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(testJob.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + testPod := testutilv1.NewBasePod("pod2", testJob, nil) + testPod.Labels[testutilv1.LabelGroupName] = "testing" + + return test{ + name: "Claim pods with correct label", + manager: NewPodControllerRefManager(&FakePodControl{}, + testJob, + testJobLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", testJob, t), testPod}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", testJob, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + now := metav1.Now() + controller.DeletionTimestamp = &now + testPod1 := testutilv1.NewBasePod("pod1", controller, t) + testPod1.SetOwnerReferences([]metav1.OwnerReference{}) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.SetOwnerReferences([]metav1.OwnerReference{}) + return test{ + name: "Controller marked for deletion can not claim pods", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testPod1, testPod2}, + claimed: nil, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + 
t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + now := metav1.Now() + controller.DeletionTimestamp = &now + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.SetOwnerReferences([]metav1.OwnerReference{}) + return test{ + name: "Controller marked for deletion can not claim new pods", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testPod2}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller2 := testutilv1.NewTestJob(1) + controller.UID = types.UID(controllerUID) + controller2.UID = types.UID("AAAAA") + return test{ + name: "Controller can not claim pods owned by another controller", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testutilv1.NewBasePod("pod2", controller2, t)}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.Labels[testutilv1.LabelGroupName] = "testing" + return test{ + name: "Controller releases claimed pods when selector doesn't 
match", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testPod2}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + testPod1 := testutilv1.NewBasePod("pod1", controller, t) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.Labels[testutilv1.LabelGroupName] = "testing" + now := metav1.Now() + testPod1.DeletionTimestamp = &now + testPod2.DeletionTimestamp = &now + + return test{ + name: "Controller does not claim orphaned pods marked for deletion", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testPod1, testPod2}, + claimed: []*v1.Pod{testPod1}, + } + }(), + } + for _, test := range tests { + claimed, err := test.manager.ClaimPods(test.pods) + if err != nil { + t.Errorf("Test case `%s`, unexpected error: %v", test.name, err) + } else if !reflect.DeepEqual(test.claimed, claimed) { + t.Errorf("Test case `%s`, claimed wrong pods. 
Expected %v, got %v", test.name, podToStringSlice(test.claimed), podToStringSlice(claimed)) + } + + } +} + +func podToStringSlice(pods []*v1.Pod) []string { + var names []string + for _, pod := range pods { + names = append(names, pod.Name) + } + return names +} + func TestClaimServices(t *testing.T) { controllerUID := "123" From bbcb9bee7b6088f0be7d6e31eb0875d6c4ff1943 Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 15:42:19 -0700 Subject: [PATCH 6/7] Rename service_ref_manager to controller_ref_manager Since this file contains BaseControllerRefManager, PodControllerRefManager and ServiceControllerRefManager, it makes sense to rename it to controller_ref_manager.go. --- .../control/{service_ref_manager.go => controller_ref_manager.go} | 0 ...service_ref_manager_test.go => controller_ref_manager_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/controller.v1/control/{service_ref_manager.go => controller_ref_manager.go} (100%) rename pkg/controller.v1/control/{service_ref_manager_test.go => controller_ref_manager_test.go} (100%) diff --git a/pkg/controller.v1/control/service_ref_manager.go b/pkg/controller.v1/control/controller_ref_manager.go similarity index 100% rename from pkg/controller.v1/control/service_ref_manager.go rename to pkg/controller.v1/control/controller_ref_manager.go diff --git a/pkg/controller.v1/control/service_ref_manager_test.go b/pkg/controller.v1/control/controller_ref_manager_test.go similarity index 100% rename from pkg/controller.v1/control/service_ref_manager_test.go rename to pkg/controller.v1/control/controller_ref_manager_test.go From 96b4b115f51fb1bc5b043e63ddbdfa627081642b Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Wed, 15 Apr 2020 20:38:58 -0700 Subject: [PATCH 7/7] Address code review feedback --- pkg/controller.v1/control/service_control.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller.v1/control/service_control.go 
b/pkg/controller.v1/control/service_control.go index 362a187f..94672c41 100644 --- a/pkg/controller.v1/control/service_control.go +++ b/pkg/controller.v1/control/service_control.go @@ -16,11 +16,11 @@ package control import ( "fmt" - "k8s.io/apimachinery/pkg/api/errors" "sync" log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels"