diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index bb360f66..56a8c2b8 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/controller" apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" commonutil "github.com/kubeflow/common/pkg/util" @@ -73,7 +72,7 @@ func (jc *JobController) AddPod(obj interface{}) { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { logger.Infof("Failed to get the jobkey: %v", err) return @@ -116,7 +115,7 @@ func (jc *JobController) UpdatePod(old, cur interface{}) { // The ControllerRef was changed. Sync the old controller, if any. if job := jc.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { logger.Infof("pod ControllerRef updated: %v, %v", curPod, oldPod) - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } @@ -132,7 +131,7 @@ func (jc *JobController) UpdatePod(old, cur interface{}) { return } logger.Debugf("pod has a ControllerRef: %v, %v", curPod, oldPod) - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } @@ -175,7 +174,7 @@ func (jc *JobController) DeletePod(obj interface{}) { if job == nil { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } diff --git a/pkg/controller.v1/common/service.go b/pkg/controller.v1/common/service.go index 2d49c2bd..969cee4e 100644 --- a/pkg/controller.v1/common/service.go +++ b/pkg/controller.v1/common/service.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/kubernetes/pkg/controller" ) // When a service is created, enqueue the controller that manages it and update its expectations. @@ -49,7 +48,7 @@ func (jc *JobController) AddService(obj interface{}) { return } - jobKey, err := controller.KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { return } diff --git a/pkg/controller.v1/control/controller_ref_manager.go b/pkg/controller.v1/control/controller_ref_manager.go new file mode 100644 index 00000000..1af581ad --- /dev/null +++ b/pkg/controller.v1/control/controller_ref_manager.go @@ -0,0 +1,377 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package control + +import ( + "fmt" + commonutil "github.com/kubeflow/common/pkg/util" + log "github.com/sirupsen/logrus" + "sync" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" +) + +type BaseControllerRefManager struct { + Controller metav1.Object + Selector labels.Selector + + canAdoptErr error + canAdoptOnce sync.Once + CanAdoptFunc func() error +} + +func (m *BaseControllerRefManager) CanAdopt() error { + m.canAdoptOnce.Do(func() { + if m.CanAdoptFunc != nil { + m.canAdoptErr = m.CanAdoptFunc() + } + }) + return m.canAdoptErr +} + +// ClaimObject tries to take ownership of an object for this controller. +// +// It will reconcile the following: +// * Adopt orphans if the match function returns true. +// * Release owned objects if the match function returns false. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The returned boolean indicates whether you now +// own the object. +// +// No reconciliation will be attempted if the controller is being deleted. +func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { + controllerRef := metav1.GetControllerOf(obj) + if controllerRef != nil { + if controllerRef.UID != m.Controller.GetUID() { + // Owned by someone else. Ignore. + return false, nil + } + if match(obj) { + // We already own it and the selector matches. + // Return true (successfully claimed) before checking deletion timestamp. + // We're still allowed to claim things we already own while being deleted + // because doing so requires taking no actions. + return true, nil + } + // Owned by us but selector doesn't match. + // Try to release, unless we're being deleted. + if m.Controller.GetDeletionTimestamp() != nil { + return false, nil + } + if err := release(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else released it, or there was a transient error. + // The controller should requeue and try again if it's still stale. + return false, err + } + // Successfully released. + return false, nil + } + + // It's an orphan. + if m.Controller.GetDeletionTimestamp() != nil || !match(obj) { + // Ignore if we're being deleted or selector doesn't match. + return false, nil + } + if obj.GetDeletionTimestamp() != nil { + // Ignore if the object is being deleted + return false, nil + } + // Selector matches. Try to adopt. + if err := adopt(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else claimed it first, or there was a transient error. + // The controller should requeue and try again if it's still orphaned. + return false, err + } + // Successfully adopted. + return true, nil +} + +type PodControllerRefManager struct { + BaseControllerRefManager + controllerKind schema.GroupVersionKind + podControl PodControlInterface +} + +// NewPodControllerRefManager returns a PodControllerRefManager that exposes +// methods to manage the controllerRef of pods. +// +// The CanAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If CanAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once CanAdopt() is called, it will not be called again by the same +// PodControllerRefManager instance. Create a new instance if it makes +// sense to check CanAdopt() again (e.g. in a different sync pass). +func NewPodControllerRefManager( + podControl PodControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *PodControllerRefManager { + return &PodControllerRefManager{ + BaseControllerRefManager: BaseControllerRefManager{ + Controller: controller, + Selector: selector, + CanAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + podControl: podControl, + } +} + +// ClaimPods tries to take ownership of a list of Pods. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// Optional: If one or more filters are specified, a Pod will only be claimed if +// all filters return true. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of Pods that you now own is returned. +func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { + var claimed []*v1.Pod + var errlist []error + + match := func(obj metav1.Object) bool { + pod := obj.(*v1.Pod) + // Check selector first so filters only run on potentially matching Pods. + if !m.Selector.Matches(labels.Set(pod.Labels)) { + return false + } + for _, filter := range filters { + if !filter(pod) { + return false + } + } + return true + } + adopt := func(obj metav1.Object) error { + return m.AdoptPod(obj.(*v1.Pod)) + } + release := func(obj metav1.Object) error { + return m.ReleasePod(obj.(*v1.Pod)) + } + + for _, pod := range pods { + ok, err := m.ClaimObject(pod, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, pod) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptPod sends a patch to take control of the pod. It returns the error if +// the patching fails. +func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { + if err := m.CanAdopt(); err != nil { + return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.Controller.GetName(), m.Controller.GetUID(), pod.UID) + return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch)) +} + +// ReleasePod sends a patch to free the pod from the control of the controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. +func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { + log.Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", + pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), pod.UID) + err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the pod no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the pod + // has no owner reference, 2. the uid of the pod doesn't + // match, which means the pod is deleted and then recreated. + // In both cases, the error can be ignored. + + // TODO: If the pod has owner references, but none of them + // has the owner.UID, server will silently ignore the patch. + // Investigate why. + return nil + } + } + return err +} + +type ServiceControllerRefManager struct { + BaseControllerRefManager + + controllerKind schema.GroupVersionKind + serviceControl ServiceControlInterface +} + +// NewServiceControllerRefManager returns a ServiceControllerRefManager that exposes +// methods to manage the controllerRef of services. +// +// The canAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If canAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once canAdopt() is called, it will not be called again by the same +// ServiceControllerRefManager instance. Create a new instance if it makes +// sense to check canAdopt() again (e.g. in a different sync pass). +func NewServiceControllerRefManager( + serviceControl ServiceControlInterface, + ctr metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *ServiceControllerRefManager { + return &ServiceControllerRefManager{ + BaseControllerRefManager: BaseControllerRefManager{ + Controller: ctr, + Selector: selector, + CanAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + serviceControl: serviceControl, + } +} + +// ClaimServices tries to take ownership of a list of Services. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// Optional: If one or more filters are specified, a Service will only be claimed if +// all filters return true. +// +// A non-nil error is returned if some form of reconciliation was attempted and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of Services that you now own is returned. +func (m *ServiceControllerRefManager) ClaimServices(services []*v1.Service, filters ...func(*v1.Service) bool) ([]*v1.Service, error) { + var claimed []*v1.Service + var errlist []error + + match := func(obj metav1.Object) bool { + service := obj.(*v1.Service) + // Check selector first so filters only run on potentially matching Services. + if !m.Selector.Matches(labels.Set(service.Labels)) { + return false + } + for _, filter := range filters { + if !filter(service) { + return false + } + } + return true + } + adopt := func(obj metav1.Object) error { + return m.AdoptService(obj.(*v1.Service)) + } + release := func(obj metav1.Object) error { + return m.ReleaseService(obj.(*v1.Service)) + } + + for _, service := range services { + ok, err := m.ClaimObject(service, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, service) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptService sends a patch to take control of the service. It returns the error if +// the patching fails. +func (m *ServiceControllerRefManager) AdoptService(service *v1.Service) error { + if err := m.CanAdopt(); err != nil { + return fmt.Errorf("can't adopt Service %v/%v (%v): %v", service.Namespace, service.Name, service.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.Controller.GetName(), m.Controller.GetUID(), service.UID) + return m.serviceControl.PatchService(service.Namespace, service.Name, []byte(addControllerPatch)) +} + +// ReleaseService sends a patch to free the service from the control of the controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. +func (m *ServiceControllerRefManager) ReleaseService(service *v1.Service) error { + logger := commonutil.LoggerForService(service, m.controllerKind.Kind) + logger.Infof("patching service %s_%s to remove its controllerRef to %s/%s:%s", + service.Namespace, service.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), service.UID) + err := m.serviceControl.PatchService(service.Namespace, service.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the service no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the service + // has no owner reference, 2. the uid of the service doesn't + // match, which means the service is deleted and then recreated. + // In both cases, the error can be ignored. + + // TODO: If the service has owner references, but none of them + // has the owner.UID, server will silently ignore the patch. + // Investigate why. + return nil + } + } + return err +} diff --git a/pkg/controller.v1/control/service_ref_manager_test.go b/pkg/controller.v1/control/controller_ref_manager_test.go similarity index 54% rename from pkg/controller.v1/control/service_ref_manager_test.go rename to pkg/controller.v1/control/controller_ref_manager_test.go index 8256ba05..41b63fe0 100644 --- a/pkg/controller.v1/control/service_ref_manager_test.go +++ b/pkg/controller.v1/control/controller_ref_manager_test.go @@ -26,6 +26,179 @@ import ( testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1" ) +func TestClaimPods(t *testing.T) { + controllerUID := "123" + + type test struct { + name string + manager *PodControllerRefManager + pods []*v1.Pod + claimed []*v1.Pod + } + var tests = []test{ + func() test { + testJob := testutilv1.NewTestJob(1) + testJobLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(testJob.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + testPod := testutilv1.NewBasePod("pod2", testJob, nil) + testPod.Labels[testutilv1.LabelGroupName] = "testing" + + return test{ + name: "Claim pods with correct label", + manager: NewPodControllerRefManager(&FakePodControl{}, + testJob, + testJobLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", testJob, t), testPod}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", testJob, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + now := metav1.Now() + controller.DeletionTimestamp = &now + testPod1 := testutilv1.NewBasePod("pod1", controller, t) + testPod1.SetOwnerReferences([]metav1.OwnerReference{}) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.SetOwnerReferences([]metav1.OwnerReference{}) + return test{ + name: "Controller marked for deletion can not claim pods", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testPod1, testPod2}, + claimed: nil, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + now := metav1.Now() + controller.DeletionTimestamp = &now + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.SetOwnerReferences([]metav1.OwnerReference{}) + return test{ + name: "Controller marked for deletion can not claim new pods", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testPod2}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller2 := testutilv1.NewTestJob(1) + controller.UID = types.UID(controllerUID) + controller2.UID = types.UID("AAAAA") + return test{ + name: "Controller can not claim pods owned by another controller", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testutilv1.NewBasePod("pod2", controller2, t)}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.Labels[testutilv1.LabelGroupName] = "testing" + return test{ + name: "Controller releases claimed pods when selector doesn't match", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t), testPod2}, + claimed: []*v1.Pod{testutilv1.NewBasePod("pod1", controller, t)}, + } + }(), + func() test { + controller := testutilv1.NewTestJob(1) + controllerLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: testutilv1.GenLabels(controller.Name), + }) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + controller.UID = types.UID(controllerUID) + testPod1 := testutilv1.NewBasePod("pod1", controller, t) + testPod2 := testutilv1.NewBasePod("pod2", controller, t) + testPod2.Labels[testutilv1.LabelGroupName] = "testing" + now := metav1.Now() + testPod1.DeletionTimestamp = &now + testPod2.DeletionTimestamp = &now + + return test{ + name: "Controller does not claim orphaned pods marked for deletion", + manager: NewPodControllerRefManager(&FakePodControl{}, + controller, + controllerLabelSelector, + testjobv1.SchemeGroupVersionKind, + func() error { return nil }), + pods: []*v1.Pod{testPod1, testPod2}, + claimed: []*v1.Pod{testPod1}, + } + }(), + } + for _, test := range tests { + claimed, err := test.manager.ClaimPods(test.pods) + if err != nil { + t.Errorf("Test case `%s`, unexpected error: %v", test.name, err) + } else if !reflect.DeepEqual(test.claimed, claimed) { + t.Errorf("Test case `%s`, claimed wrong pods. Expected %v, got %v", test.name, podToStringSlice(test.claimed), podToStringSlice(claimed)) + } + + } +} + +func podToStringSlice(pods []*v1.Pod) []string { + var names []string + for _, pod := range pods { + names = append(names, pod.Name) + } + return names +} + func TestClaimServices(t *testing.T) { controllerUID := "123" diff --git a/pkg/controller.v1/control/pod_control.go b/pkg/controller.v1/control/pod_control.go index cb4b98bf..926889e7 100644 --- a/pkg/controller.v1/control/pod_control.go +++ b/pkg/controller.v1/control/pod_control.go @@ -18,6 +18,7 @@ import ( "fmt" commonutil "github.com/kubeflow/common/pkg/util" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -25,32 +26,48 @@ import ( "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/controller" + "sync" ) // Reasons for pod events const ( - // FailedCreatePodReason is added in an event and in a replica set condition + // FailedCreatePodReason is added in an event and in a job condition // when a pod for a replica set is failed to be created. FailedCreatePodReason = "FailedCreatePod" - // SuccessfulCreatePodReason is added in an event when a pod for a replica set + // SuccessfulCreatePodReason is added in an event when a pod for a job // is successfully created. SuccessfulCreatePodReason = "SuccessfulCreatePod" - // FailedDeletePodReason is added in an event and in a replica set condition + // FailedDeletePodReason is added in an event and in a job condition // when a pod for a replica set is failed to be deleted. FailedDeletePodReason = "FailedDeletePod" - // SuccessfulDeletePodReason is added in an event when a pod for a replica set + // SuccessfulDeletePodReason is added in an event when a pod for a job // is successfully deleted. SuccessfulDeletePodReason = "SuccessfulDeletePod" ) +// PodControlInterface is an interface that knows how to add or delete pods +// created as an interface to allow testing. +type PodControlInterface interface { + // CreatePods creates new pods according to the spec. + CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod according to the spec on the specified node, + // and sets the ControllerRef. + CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. + CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // DeletePod deletes the pod identified by podID. + DeletePod(namespace string, podID string, object runtime.Object) error + // PatchPod patches the pod. + PatchPod(namespace, name string, data []byte) error +} + // RealPodControl is the default implementation of PodControlInterface. type RealPodControl struct { KubeClient clientset.Interface Recorder record.EventRecorder } -var _ controller.PodControlInterface = &RealPodControl{} +var _ PodControlInterface = &RealPodControl{} func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set { desiredLabels := make(labels.Set) @@ -150,6 +167,17 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime return fmt.Errorf("object does not have ObjectMeta, %v", err) } logger := commonutil.LoggerForJob(accessor) + pod, err := r.KubeClient.CoreV1().Pods(namespace).Get(podID, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if pod.DeletionTimestamp != nil { + logger.Infof("pod %s/%s is terminating, skip deleting", pod.Namespace, pod.Name) + return nil + } logger.Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID) if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) @@ -159,3 +187,91 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime } return nil } + +type FakePodControl struct { + sync.Mutex + Templates []v1.PodTemplateSpec + ControllerRefs []metav1.OwnerReference + DeletePodName []string + Patches [][]byte + Err error + CreateLimit int + CreateCallCount int +} + +var _ PodControlInterface = &FakePodControl{} + +func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { + f.Lock() + defer f.Unlock() + f.Patches = append(f.Patches, data) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *spec) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *spec) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *template) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.DeletePodName = append(f.DeletePodName, podID) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) Clear() { + f.Lock() + defer f.Unlock() + f.DeletePodName = []string{} + f.Templates = []v1.PodTemplateSpec{} + f.ControllerRefs = []metav1.OwnerReference{} + f.Patches = [][]byte{} + f.CreateLimit = 0 + f.CreateCallCount = 0 +} \ No newline at end of file diff --git a/pkg/controller.v1/control/service_control.go b/pkg/controller.v1/control/service_control.go index 757aa397..94672c41 100644 --- a/pkg/controller.v1/control/service_control.go +++ b/pkg/controller.v1/control/service_control.go @@ -20,6 +20,7 @@ import ( log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -30,9 +31,17 @@ import ( ) const ( + // FailedCreateServiceReason is added in an event and in a job controller condition + // when a service for a job is failed to be created. FailedCreateServiceReason = "FailedCreateService" + // SuccessfulCreateServiceReason is added in an event when a service for a job + // is successfully created. SuccessfulCreateServiceReason = "SuccessfulCreateService" + // FailedDeleteServiceReason is added in an event and in a job condition + // when a service for a job is failed to be deleted. FailedDeleteServiceReason = "FailedDeleteService" + // SuccessfulDeleteServiceReason is added in an event when a service for a job + // is successfully deleted. SuccessfulDeleteServiceReason = "SuccessfulDeleteService" ) @@ -104,6 +113,17 @@ func (r RealServiceControl) DeleteService(namespace, serviceID string, object ru if err != nil { return fmt.Errorf("object does not have ObjectMeta, %v", err) } + service, err := r.KubeClient.CoreV1().Services(namespace).Get(serviceID, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + if service.DeletionTimestamp != nil { + log.Infof("service %s/%s is terminating, skip deleting", service.Namespace, service.Name) + return nil + } log.Infof("Controller %v deleting service %v/%v", accessor.GetName(), namespace, serviceID) if err := r.KubeClient.CoreV1().Services(namespace).Delete(serviceID, nil); err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeleteServiceReason, "Error deleting: %v", err) diff --git a/pkg/controller.v1/control/service_ref_manager.go b/pkg/controller.v1/control/service_ref_manager.go deleted file mode 100644 index cafc7aa2..00000000 --- a/pkg/controller.v1/control/service_ref_manager.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2019 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package control - -import ( - "fmt" - commonutil "github.com/kubeflow/common/pkg/util" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/kubernetes/pkg/controller" -) - -type ServiceControllerRefManager struct { - controller.BaseControllerRefManager - - controllerKind schema.GroupVersionKind - serviceControl ServiceControlInterface -} - -// NewServiceControllerRefManager returns a ServiceControllerRefManager that exposes -// methods to manage the controllerRef of services. -// -// The canAdopt() function can be used to perform a potentially expensive check -// (such as a live GET from the API server) prior to the first adoption. -// It will only be called (at most once) if an adoption is actually attempted. -// If canAdopt() returns a non-nil error, all adoptions will fail. -// -// NOTE: Once canAdopt() is called, it will not be called again by the same -// ServiceControllerRefManager instance. Create a new instance if it makes -// sense to check canAdopt() again (e.g. in a different sync pass). -func NewServiceControllerRefManager( - serviceControl ServiceControlInterface, - ctr metav1.Object, - selector labels.Selector, - controllerKind schema.GroupVersionKind, - canAdopt func() error, -) *ServiceControllerRefManager { - return &ServiceControllerRefManager{ - BaseControllerRefManager: controller.BaseControllerRefManager{ - Controller: ctr, - Selector: selector, - CanAdoptFunc: canAdopt, - }, - controllerKind: controllerKind, - serviceControl: serviceControl, - } -} - -// ClaimServices tries to take ownership of a list of Services. -// -// It will reconcile the following: -// * Adopt orphans if the selector matches. -// * Release owned objects if the selector no longer matches. -// -// Optional: If one or more filters are specified, a Service will only be claimed if -// all filters return true. -// -// A non-nil error is returned if some form of reconciliation was attempted and -// failed. Usually, controllers should try again later in case reconciliation -// is still needed. -// -// If the error is nil, either the reconciliation succeeded, or no -// reconciliation was necessary. The list of Services that you now own is returned. -func (m *ServiceControllerRefManager) ClaimServices(services []*v1.Service, filters ...func(*v1.Service) bool) ([]*v1.Service, error) { - var claimed []*v1.Service - var errlist []error - - match := func(obj metav1.Object) bool { - service := obj.(*v1.Service) - // Check selector first so filters only run on potentially matching Services. - if !m.Selector.Matches(labels.Set(service.Labels)) { - return false - } - for _, filter := range filters { - if !filter(service) { - return false - } - } - return true - } - adopt := func(obj metav1.Object) error { - return m.AdoptService(obj.(*v1.Service)) - } - release := func(obj metav1.Object) error { - return m.ReleaseService(obj.(*v1.Service)) - } - - for _, service := range services { - ok, err := m.ClaimObject(service, match, adopt, release) - if err != nil { - errlist = append(errlist, err) - continue - } - if ok { - claimed = append(claimed, service) - } - } - return claimed, utilerrors.NewAggregate(errlist) -} - -// AdoptService sends a patch to take control of the service. It returns the error if -// the patching fails. -func (m *ServiceControllerRefManager) AdoptService(service *v1.Service) error { - if err := m.CanAdopt(); err != nil { - return fmt.Errorf("can't adopt Service %v/%v (%v): %v", service.Namespace, service.Name, service.UID, err) - } - // Note that ValidateOwnerReferences() will reject this patch if another - // OwnerReference exists with controller=true. - addControllerPatch := fmt.Sprintf( - `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, - m.controllerKind.GroupVersion(), m.controllerKind.Kind, - m.Controller.GetName(), m.Controller.GetUID(), service.UID) - return m.serviceControl.PatchService(service.Namespace, service.Name, []byte(addControllerPatch)) -} - -// ReleaseService sends a patch to free the service from the control of the controller. -// It returns the error if the patching fails. 404 and 422 errors are ignored. -func (m *ServiceControllerRefManager) ReleaseService(service *v1.Service) error { - logger := commonutil.LoggerForService(service, m.controllerKind.Kind) - logger.Infof("patching service %s_%s to remove its controllerRef to %s/%s:%s", - service.Namespace, service.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) - deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), service.UID) - err := m.serviceControl.PatchService(service.Namespace, service.Name, []byte(deleteOwnerRefPatch)) - if err != nil { - if errors.IsNotFound(err) { - // If the service no longer exists, ignore it. - return nil - } - if errors.IsInvalid(err) { - // Invalid error will be returned in two cases: 1. the service - // has no owner reference, 2. the uid of the service doesn't - // match, which means the service is deleted and then recreated. - // In both cases, the error can be ignored. - - // TODO: If the service has owner references, but none of them - // has the owner.UID, server will silently ignore the patch. - // Investigate why. - return nil - } - } - return err -}