Skip to content

Commit

Permalink
Merge pull request #180 from adrienjt/fix-gc
Browse files Browse the repository at this point in the history
Cross-cluster GC after delete
  • Loading branch information
adrienjt committed Jul 11, 2023
2 parents 14954be + 3c5a4fc commit a2e557e
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 138 deletions.
5 changes: 5 additions & 0 deletions charts/multicluster-scheduler/templates/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ spec:
- name: controller-manager
args: ["--leader-elect"]
env:
- name: CLUSTER_NAME
value: {{ .Values.clusterName }}
# POD_NAME for leader election
- name: POD_NAME
valueFrom:
Expand Down Expand Up @@ -104,6 +106,9 @@ spec:
- name: proxy-scheduler
image: {{ .Values.scheduler.image.repository }}:{{ default .Chart.AppVersion .Values.scheduler.image.tag }}
args: ["--config", "/etc/admiralty/proxy-scheduler-config"]
env:
- name: CLUSTER_NAME
value: {{ .Values.clusterName }}
volumeMounts:
- name: config
mountPath: /etc/admiralty
Expand Down
2 changes: 2 additions & 0 deletions charts/multicluster-scheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ fullnameOverride: ""

#imagePullSecretName: ""

clusterName: ""

sourceController:
enabled: true

Expand Down
10 changes: 9 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import (
"context"
"flag"
"fmt"
"os"
"time"

"admiralty.io/multicluster-scheduler/pkg/controllers/cleanup"
Expand Down Expand Up @@ -125,6 +126,8 @@ func startOldStyleControllers(
var factories []startable
var controllers []runnable

clusterName := os.Getenv("CLUSTER_NAME")

for _, target := range agentCfg.Targets {
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k, time.Second*30, kubeinformers.WithNamespace(target.Namespace))
factories = append(factories, kubeInformerFactory)
Expand Down Expand Up @@ -156,6 +159,7 @@ func startOldStyleControllers(
controllers = append(
controllers,
follow.NewConfigMapController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -164,6 +168,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().ConfigMaps(),
),
service.NewController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -173,6 +178,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().Services(),
),
follow.NewSecretController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -181,6 +187,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().Secrets(),
),
ingress.NewIngressController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -193,6 +200,7 @@ func startOldStyleControllers(
controllers = append(
controllers,
feedback.NewController(
clusterName,
target,
k,
targetCustomClient,
Expand Down
10 changes: 6 additions & 4 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,9 +63,11 @@ var (

LabelKeyHasFinalizer = KeyPrefix + "has-finalizer"

LabelKeyParentUID = KeyPrefix + "parent-uid"
LabelKeyParentName = KeyPrefix + "parent-name"
LabelKeyParentNamespace = KeyPrefix + "parent-namespace"
LabelKeyParentClusterName = KeyPrefix + "parent-cluster-name"
// used to get pod chaperon (whose name is generated) given proxy pod ("list one" hack), without indexer
LabelKeyParentUID = KeyPrefix + "parent-uid"
AnnotationKeyParentName = KeyPrefix + "parent-name"
AnnotationKeyParentNamespace = KeyPrefix + "parent-namespace"

AnnotationKeyCiliumGlobalService = "io.cilium/global-service"

Expand Down
80 changes: 48 additions & 32 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -164,51 +164,67 @@ func (c *Controller) EnqueueController(ownerKind string, getOwner GetOwner) func
}
}

func (c *Controller) EnqueueRemoteController(ownerKind string, getOwner GetOwner) func(obj interface{}) {
func (c *Controller) EnqueueRemoteController(parentClusterName string) func(obj interface{}) {
return func(obj interface{}) {
object := obj.(metav1.Object)
a := object.GetAnnotations()
parentUID, ok := a[common.LabelKeyParentUID]
if !ok {
// for backward compatibility use labels instead,
// even though didn't work for parent names longer than 63 characters
a = object.GetLabels()
parentUID, ok = a[common.LabelKeyParentUID]
if IsRemoteControlled(object, parentClusterName) {
c.workqueue.Add(ParentKey(object))
return
}
if ok {
parentNamespace := a[common.LabelKeyParentNamespace]
if parentNamespace == "" {
parentNamespace = object.GetNamespace()
}
parentName := a[common.LabelKeyParentName]
if parentName == "" {
parentName = object.GetName()
}
owner, err := getOwner(parentNamespace, parentName)
if err != nil {
return
}
}
}

if string(owner.GetUID()) != parentUID {
// TODO handle unlikely yet possible cross-cluster UID conflict with signing
return
}
func IsRemoteControlled(object metav1.Object, parentClusterName string) bool {
v, ok := object.GetLabels()[common.LabelKeyParentClusterName]
// support empty parent cluster name
// check that label is present to filter out regular objects
return ok && v == parentClusterName
}

c.EnqueueObject(owner)
return
func ParentKey(child metav1.Object) string {
a := child.GetAnnotations()
parentNamespace := a[common.AnnotationKeyParentNamespace]
if parentNamespace == "" {
parentNamespace = child.GetNamespace()
}
parentName := a[common.AnnotationKeyParentName]
if parentName == "" {
parentName = child.GetName()
}
if parentNamespace != "" {
return parentNamespace + "/" + parentName
}
return parentName
}

func IndexByRemoteController(parentClusterName string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
meta, ok := obj.(metav1.Object)
if !ok {
return nil, nil
}
if !IsRemoteControlled(meta, parentClusterName) {
return nil, nil
}
return []string{ParentKey(meta)}, nil
}
}

func AddRemoteControllerReference(child metav1.Object, parent metav1.Object) {
func AddRemoteControllerReference(child metav1.Object, parent metav1.Object, parentClusterName string) {
l := child.GetLabels()
if l == nil {
l = map[string]string{}
child.SetLabels(l)
}
l[common.LabelKeyParentUID] = string(parent.GetUID())
l[common.LabelKeyParentClusterName] = parentClusterName
a := child.GetAnnotations()
if a == nil {
a = map[string]string{}
child.SetAnnotations(a)
}
a[common.LabelKeyParentUID] = string(parent.GetUID())
a[common.LabelKeyParentNamespace] = parent.GetNamespace()
a[common.LabelKeyParentName] = parent.GetName()
a[common.AnnotationKeyParentNamespace] = parent.GetNamespace()
a[common.AnnotationKeyParentName] = parent.GetName()
}

func ParentControlsChild(child metav1.Object, parent metav1.Object) bool {
Expand Down
67 changes: 37 additions & 30 deletions pkg/controllers/feedback/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,30 +50,24 @@ import (

// this file is modified from k8s.io/sample-controller

const controllerAgentName = "admiralty"

const (
// SuccessSynced is used as part of the Event 'reason' when a proxy pod is synced
SuccessSynced = "Synced"
// MessageResourceSynced is the message used for an Event fired when a proxy pod
// is synced successfully
MessageResourceSynced = "proxy pod synced successfully"
)
const podChaperonByPodNamespacedName = "podChaperonByPodNamespacedName"

type reconciler struct {
target agent.Target
clusterName string
target agent.Target

kubeclientset kubernetes.Interface
customclientset clientset.Interface

podsLister corelisters.PodLister
podChaperonsLister listers.PodChaperonLister

recorder record.EventRecorder
podChaperonIndex cache.Indexer
}

// NewController returns a new feedback controller
func NewController(
clusterName string,
target agent.Target,
kubeclientset kubernetes.Interface,
customclientset clientset.Interface,
Expand All @@ -85,21 +79,20 @@ func NewController(
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

r := &reconciler{
target: target,
clusterName: clusterName,
target: target,

kubeclientset: kubeclientset,
customclientset: customclientset,

podsLister: podInformer.Lister(),
podChaperonsLister: podChaperonInformer.Lister(),

recorder: recorder,
podChaperonIndex: podChaperonInformer.Informer().GetIndexer(),
}

getPod := func(namespace, name string) (metav1.Object, error) { return r.podsLister.Pods(namespace).Get(name) }
c := controller.New("feedback", r, podInformer.Informer().HasSynced, podChaperonInformer.Informer().HasSynced)

enqueueProxyPod := func(obj interface{}) {
Expand All @@ -110,7 +103,11 @@ func NewController(
}

podInformer.Informer().AddEventHandler(controller.HandleAddUpdateWith(enqueueProxyPod))
podChaperonInformer.Informer().AddEventHandler(controller.HandleAllWith(c.EnqueueRemoteController("Pod", getPod)))
podChaperonInformer.Informer().AddEventHandler(controller.HandleAllWith(c.EnqueueRemoteController(clusterName)))

utilruntime.Must(podChaperonInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
podChaperonByPodNamespacedName: controller.IndexByRemoteController(clusterName),
}))

return c
}
Expand All @@ -125,17 +122,26 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
proxyPod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("proxy pod '%s' in work queue no longer exists", key))
objs, err := c.podChaperonIndex.ByIndex(podChaperonByPodNamespacedName, key)
utilruntime.Must(err)
for _, obj := range objs {
candidate := obj.(*multiclusterv1alpha1.PodChaperon)
if err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Delete(ctx, candidate.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return nil, fmt.Errorf("cannot delete orphaned pod chaperon: %v", err)
}
}
return nil, nil
} else {
return nil, fmt.Errorf("cannot get proxy pod: %v", err)
}

return nil, err
}

proxyPodTerminating := proxyPod.DeletionTimestamp != nil

proxyPodHasFinalizer, j := controller.HasFinalizer(proxyPod.Finalizers, c.target.Finalizer)

// get pod chaperon by parent UID (when parent still exists) rather than using index
// for backward compatibility with existing pod chaperons
var candidate *multiclusterv1alpha1.PodChaperon
l, err := c.podChaperonsLister.PodChaperons(namespace).List(labels.SelectorFromSet(map[string]string{common.LabelKeyParentUID: string(proxyPod.UID)}))
if err != nil {
Expand All @@ -148,20 +154,16 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
candidate = l[0]
}

didSomething := false

virtualNodeName := proxypod.GetScheduledClusterName(proxyPod)
if proxyPodTerminating || virtualNodeName != "" && virtualNodeName != c.target.VirtualNodeName {
if candidate != nil {
if err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Delete(ctx, candidate.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return nil, err
}
didSomething = true
} else if proxyPodHasFinalizer {
if proxyPod, err = c.removeFinalizer(ctx, proxyPod, j); err != nil {
return nil, err
}
didSomething = true
}
}

Expand All @@ -183,7 +185,6 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
if proxyPod, err = c.kubeclientset.CoreV1().Pods(namespace).Update(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
return nil, err
}
didSomething = true
}

// we can't group annotation and status updates into an update,
Expand All @@ -198,17 +199,23 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
if proxyPod, err = c.kubeclientset.CoreV1().Pods(namespace).UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
return nil, err
}
didSomething = true
}
}

if didSomething {
c.recorder.Event(proxyPod, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
needRemoteUpdate := delegate.Labels[common.LabelKeyParentClusterName] != c.clusterName
if needRemoteUpdate {
delegateCopy := delegate.DeepCopy()
delegateCopy.Labels[common.LabelKeyParentClusterName] = c.clusterName
var err error
if delegate, err = c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Update(ctx, delegateCopy, metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("cannot update candidate pod chaperon")
}
}
}

return nil, nil
}

func (c reconciler) removeFinalizer(ctx context.Context, pod *corev1.Pod, j int) (*corev1.Pod, error) {
func (c *reconciler) removeFinalizer(ctx context.Context, pod *corev1.Pod, j int) (*corev1.Pod, error) {
podCopy := pod.DeepCopy()
podCopy.Finalizers = append(podCopy.Finalizers[:j], podCopy.Finalizers[j+1:]...)
return c.kubeclientset.CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
Expand Down
Loading

0 comments on commit a2e557e

Please sign in to comment.