Merge pull request #184 from adrienjt/recreate-delegate-only-if-cluster-connection-lost

Recreate delegate only if cluster connection lost
adrienjt committed Jul 22, 2023
2 parents 01688ea + 860a0b9 commit 7116f41
Showing 10 changed files with 115 additions and 107 deletions.
5 changes: 5 additions & 0 deletions charts/multicluster-scheduler/templates/deploy.yaml
@@ -39,6 +39,11 @@ spec:
ports:
- containerPort: 9443
- containerPort: 10250
- containerPort: 8080
readinessProbe:
httpGet:
port: 8080
path: /readyz
volumeMounts:
- mountPath: /tmp/k8s-webhook-server/serving-certs
name: cert
9 changes: 6 additions & 3 deletions cmd/agent/main.go
@@ -286,15 +286,18 @@ func addClusterScopedFactoriesAndControllers(

func startWebhook(ctx context.Context, cfg *rest.Config, agentCfg agentconfig.Config) {
webhookMgr, err := manager.New(cfg, manager.Options{
Port: 9443,
CertDir: "/tmp/k8s-webhook-server/serving-certs",
MetricsBindAddress: "0",
Port: 9443,
CertDir: "/tmp/k8s-webhook-server/serving-certs",
MetricsBindAddress: "0",
HealthProbeBindAddress: ":8080",
})
utilruntime.Must(err)

hookServer := webhookMgr.GetWebhookServer()
hookServer.Register("/mutate-v1-pod", &webhook.Admission{Handler: proxypod.NewHandler(agentCfg.GetKnownFinalizersByNamespace())})

utilruntime.Must(webhookMgr.AddReadyzCheck("webhook-ready", hookServer.StartedChecker()))

go func() {
utilruntime.Must(webhookMgr.Start(ctx))
}()
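For context, the new HealthProbeBindAddress and AddReadyzCheck wiring is what backs the readinessProbe added to deploy.yaml above. Below is a minimal, hypothetical Go sketch of checking that endpoint by hand; it assumes the controller-manager pod has been port-forwarded to localhost:8080 (the deployment name is taken from the e2e scripts later in this diff).

package main

import (
	"fmt"
	"net/http"
	"time"
)

// Hypothetical manual check of the agent's new readiness endpoint, mirroring
// the readinessProbe added to deploy.yaml. Assumes a local port-forward, e.g.:
//   kubectl -n admiralty port-forward deploy/multicluster-scheduler-controller-manager 8080
func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://localhost:8080/readyz")
	if err != nil {
		fmt.Println("not ready:", err)
		return
	}
	defer resp.Body.Close()
	// controller-runtime's healthz handler returns 200 once the registered
	// checks pass, i.e. once the webhook server reports started (AddReadyzCheck above).
	fmt.Println("status:", resp.Status)
}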
2 changes: 2 additions & 0 deletions pkg/common/constants.go
@@ -51,6 +51,8 @@ var (
AnnotationKeyIsReserved = KeyPrefix + "is-reserved"
AnnotationKeyIsAllowed = KeyPrefix + "is-allowed"

AnnotationKeyPodMissingSince = KeyPrefix + "pod-missing-since"

// annotations on following services and ingresses (for cloud controller manager to configure DNS)

AnnotationKeyGlobal = KeyPrefix + "global"
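The new annotation stores a timestamp as a plain string. A small illustrative sketch of the round trip the chaperon controller performs with it, assuming KeyPrefix resolves to multicluster.admiralty.io/ (as the multicluster.admiralty.io/* labels elsewhere in this diff suggest):

package main

import (
	"fmt"
	"time"
)

// Assumption: KeyPrefix is "multicluster.admiralty.io/".
const annotationKeyPodMissingSince = "multicluster.admiralty.io/pod-missing-since"

func main() {
	// The chaperon controller records the current time in RFC3339 when the
	// delegate pod first goes missing...
	annotations := map[string]string{
		annotationKeyPodMissingSince: time.Now().Format(time.RFC3339),
	}

	// ...and parses it back on later reconciliations to measure how long the
	// pod has been gone.
	since, err := time.Parse(time.RFC3339, annotations[annotationKeyPodMissingSince])
	if err != nil {
		panic(err)
	}
	fmt.Println("pod missing for", time.Since(since))
}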
123 changes: 49 additions & 74 deletions pkg/controllers/chaperon/controller.go
@@ -1,5 +1,5 @@
/*
* Copyright 2020 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,47 +22,29 @@ import (
"reflect"
"time"

multiclusterv1alpha1 "admiralty.io/multicluster-scheduler/pkg/apis/multicluster/v1alpha1"
"admiralty.io/multicluster-scheduler/pkg/common"
"admiralty.io/multicluster-scheduler/pkg/controller"
clientset "admiralty.io/multicluster-scheduler/pkg/generated/clientset/versioned"
customscheme "admiralty.io/multicluster-scheduler/pkg/generated/clientset/versioned/scheme"
informers "admiralty.io/multicluster-scheduler/pkg/generated/informers/externalversions/multicluster/v1alpha1"
listers "admiralty.io/multicluster-scheduler/pkg/generated/listers/multicluster/v1alpha1"
"github.com/go-test/deep"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

multiclusterv1alpha1 "admiralty.io/multicluster-scheduler/pkg/apis/multicluster/v1alpha1"
"admiralty.io/multicluster-scheduler/pkg/common"
"admiralty.io/multicluster-scheduler/pkg/controller"
clientset "admiralty.io/multicluster-scheduler/pkg/generated/clientset/versioned"
customscheme "admiralty.io/multicluster-scheduler/pkg/generated/clientset/versioned/scheme"
informers "admiralty.io/multicluster-scheduler/pkg/generated/informers/externalversions/multicluster/v1alpha1"
listers "admiralty.io/multicluster-scheduler/pkg/generated/listers/multicluster/v1alpha1"
)

// this file is modified from k8s.io/sample-controller

const controllerAgentName = "admiralty"

const (
// SuccessSynced is used as part of the Event 'reason' when a PodChaperon is synced
SuccessSynced = "Synced"
// ErrResourceExists is used as part of the Event 'reason' when a PodChaperon fails
// to sync due to a Pod of the same name already existing.
ErrResourceExists = "ErrResourceExists"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a Pod already existing
MessageResourceExists = "Resource %q already exists and is not managed by PodChaperon"
// MessageResourceSynced is the message used for an Event fired when a PodChaperon
// is synced successfully
MessageResourceSynced = "PodChaperon synced successfully"
)
// TODO: configurable
var PodRecreatedIfPodChaperonNotDeletedAfter = time.Minute

type reconciler struct {
kubeclientset kubernetes.Interface
@@ -74,26 +56,19 @@ type reconciler struct {
recorder record.EventRecorder
}

// NewController returns a new chaperon controller
func NewController(
kubeclientset kubernetes.Interface,
customclientset clientset.Interface,
podInformer coreinformers.PodInformer,
podChaperonInformer informers.PodChaperonInformer) *controller.Controller {

utilruntime.Must(customscheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

r := &reconciler{
kubeclientset: kubeclientset,
customclientset: customclientset,
podsLister: podInformer.Lister(),
podChaperonsLister: podChaperonInformer.Lister(),
recorder: recorder,
}

getPodChaperon := func(namespace, name string) (metav1.Object, error) {
@@ -108,53 +83,65 @@ func NewController(
return c
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the PodChaperon resource
// with the current status of the resource.
func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err error) {
ctx := context.Background()

// Convert the namespace/name string into a distinct namespace and name
key := obj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
utilruntime.Must(err)

// Get the PodChaperon resource with this namespace/name
podChaperon, err := c.podChaperonsLister.PodChaperons(namespace).Get(name)
if err != nil {
// The PodChaperon resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("podChaperon '%s' in work queue no longer exists", key))
return nil, nil
}

return nil, err
}

didSomething := false

// Get the pod with the name specified in PodChaperon.spec
pod, err := c.podsLister.Pods(podChaperon.Namespace).Get(podChaperon.Name)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
pod, err = c.kubeclientset.CoreV1().Pods(podChaperon.Namespace).Create(ctx, newPod(podChaperon), metav1.CreateOptions{})
didSomething = true
var podMissingSince time.Time
podMissingSinceStr, podMissing := podChaperon.Annotations[common.AnnotationKeyPodMissingSince]
if podMissing {
var err error
podMissingSince, err = time.Parse(time.RFC3339, podMissingSinceStr)
if err != nil {
return nil, fmt.Errorf("cannot parse %s annotation value: %v", common.AnnotationKeyPodMissingSince, err)
}
}

// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
pod, err := c.podsLister.Pods(podChaperon.Namespace).Get(podChaperon.Name)
if err != nil {
return nil, err
if !errors.IsNotFound(err) {
return nil, err
}

podNeverExisted := podChaperon.Status.Phase == ""
if podNeverExisted || podMissing && time.Since(podMissingSince) > PodRecreatedIfPodChaperonNotDeletedAfter && podChaperon.Spec.RestartPolicy == corev1.RestartPolicyAlways {
var err error
pod, err = c.kubeclientset.CoreV1().Pods(podChaperon.Namespace).Create(ctx, newPod(podChaperon), metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("cannot create pod for pod chaperon %v", err)
}
} else if !podMissing {
patch := []byte(`{"metadata":{"annotations":{"` + common.AnnotationKeyPodMissingSince + `":"` + time.Now().Format(time.RFC3339) + `"}}}`)
if _, err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
return nil, fmt.Errorf("cannot patch pod chaperon: %v", err)
}
return nil, nil
} else {
return nil, nil
}
}

// If the Pod is not controlled by this PodChaperon resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(pod, podChaperon) {
msg := fmt.Sprintf(MessageResourceExists, pod.Name)
c.recorder.Event(podChaperon, corev1.EventTypeWarning, ErrResourceExists, msg)
return nil, fmt.Errorf(msg)
return nil, fmt.Errorf("resource %q already exists and is not managed by PodChaperon", pod.Name)
}

if podMissing {
patch := []byte(`{"metadata":{"annotations":{"` + common.AnnotationKeyPodMissingSince + `":null}}}`)
_, err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("cannot patch pod chaperon: %v", err)
}
}

// TODO: support allowed pod spec updates: containers[*].image, initContainers[*].image, activeDeadlineSeconds, tolerations (only additions to tolerations)
@@ -168,10 +155,6 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
_, otherPodAnnotations := common.SplitLabelsOrAnnotations(pod.Annotations)
needUpdate := !reflect.DeepEqual(otherPodChaperonAnnotations, otherPodAnnotations)

// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance

if needStatusUpdate {
podChaperonCopy := podChaperon.DeepCopy()
pod.Status.DeepCopyInto(&podChaperonCopy.Status)
@@ -185,7 +168,6 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
}
return nil, err
}
didSomething = true
}

if needUpdate {
@@ -204,18 +186,11 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
}
return nil, err
}
didSomething = true
}

if didSomething {
c.recorder.Event(podChaperon, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
}
return nil, nil
}

// newPod creates a new Pod for a PodChaperon resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the PodChaperon resource that 'owns' it.
func newPod(podChaperon *multiclusterv1alpha1.PodChaperon) *corev1.Pod {
annotations := make(map[string]string)
for k, v := range podChaperon.Annotations {
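Since the new logic is spread over several hunks above, here is a condensed, hedged sketch of the decision the chaperon controller now makes when the delegate pod is not found. The types and constants are simplified stand-ins, not the real API structs, and the grace period stands in for PodRecreatedIfPodChaperonNotDeletedAfter.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the real API objects; the field names here are
// illustrative assumptions, not the actual PodChaperon/Pod structs.
type chaperon struct {
	Annotations   map[string]string
	StatusPhase   string // empty means the delegate pod never existed
	RestartPolicy string
}

const annotationPodMissingSince = "multicluster.admiralty.io/pod-missing-since"

type action int

const (
	recreatePod action = iota // create a new delegate pod
	markMissing               // patch pod-missing-since = now and requeue
	wait                      // annotation set, grace period not yet elapsed
)

// decide mirrors the branch added in pkg/controllers/chaperon/controller.go
// for the case where the delegate pod is not found.
func decide(c chaperon, now time.Time, gracePeriod time.Duration) (action, error) {
	missingSinceStr, podMissing := c.Annotations[annotationPodMissingSince]

	var missingSince time.Time
	if podMissing {
		var err error
		missingSince, err = time.Parse(time.RFC3339, missingSinceStr)
		if err != nil {
			return wait, fmt.Errorf("cannot parse %s annotation value: %v", annotationPodMissingSince, err)
		}
	}

	podNeverExisted := c.StatusPhase == ""
	missingTooLong := podMissing && now.Sub(missingSince) > gracePeriod

	switch {
	case podNeverExisted || (missingTooLong && c.RestartPolicy == "Always"):
		return recreatePod, nil
	case !podMissing:
		return markMissing, nil
	default:
		return wait, nil // give the feedback controller time to delete the chaperon instead
	}
}

func main() {
	c := chaperon{Annotations: map[string]string{}, StatusPhase: "Running", RestartPolicy: "Always"}
	a, _ := decide(c, time.Now(), time.Minute) // grace period = PodRecreatedIfPodChaperonNotDeletedAfter
	fmt.Println(a)                             // prints 1 (markMissing): annotate first, recreate only later
}

In other words, the delegate is recreated only when the annotated chaperon survives past the grace period, which should only happen when the feedback controller in the source cluster cannot delete it, i.e. when the cluster connection is lost; otherwise the chaperon (and with it the proxy pod) is deleted, preserving the invariant that pods do not resuscitate.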
8 changes: 8 additions & 0 deletions pkg/controllers/feedback/controller.go
@@ -167,6 +167,14 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
}
}

if candidate != nil {
if _, podMissing := candidate.Annotations[common.AnnotationKeyPodMissingSince]; podMissing {
if err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Delete(ctx, candidate.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return nil, fmt.Errorf("cannot delete pod chaperon")
}
}
}

if virtualNodeName == c.target.VirtualNodeName {
if candidate != nil {
delegate := candidate
8 changes: 5 additions & 3 deletions test/e2e/cleanup/test.sh
@@ -25,15 +25,17 @@ cleanup_test() {

k $i apply -f test/e2e/cleanup/test.yaml
k $i rollout status deploy cleanup
target="$(k $i get pod -l app=cleanup -o json | jq -er '.items[0].metadata.finalizers[0] | split("-") | .[1]')"
k $i delete target $target
nodeName="$(k $i get pod -l app=cleanup -o json | jq -er '.items[0].spec.nodeName')"
j="${nodeName: -1}"
k $i delete target c$j

export -f cleanup_test_iteration
timeout --foreground 180s bash -c "until cleanup_test_iteration $i; do sleep 1; done"
# use --foreground to catch ctrl-c
# https://unix.stackexchange.com/a/233685

admiralty_connect $i "${target: -1}"
admiralty_connect $i $j
k $i wait --for condition=available --timeout=120s deployment multicluster-scheduler-controller-manager -n admiralty
k $i delete -f test/e2e/cleanup/test.yaml
}

4 changes: 2 additions & 2 deletions test/e2e/delete-chaperon/test.sh
@@ -24,8 +24,8 @@ delete-chaperon_test() {

k $i apply -f test/e2e/delete-chaperon/test.yaml
k $i wait pod test-delete-chaperon --for=condition=PodScheduled
target="$(k $i get pod test-delete-chaperon -o json | jq -er '.spec.nodeName')"
j="${target: -1}"
nodeName="$(k $i get pod test-delete-chaperon -o json | jq -er '.spec.nodeName')"
j="${nodeName: -1}"
uid="$(k $i get pod test-delete-chaperon -o json | jq -er '.metadata.uid')"
k $j delete podchaperon -l multicluster.admiralty.io/parent-uid="$uid" --wait --timeout=30s
k $i wait pod test-delete-chaperon --for=delete
33 changes: 25 additions & 8 deletions test/e2e/delete-delegate/test.sh
@@ -22,29 +22,46 @@ source test/e2e/admiralty.sh

delete-delegate_test() {
i=$1
j=$2

k $j label node --all a=b --overwrite
k $i apply -f test/e2e/delete-delegate/test.yaml
k $i rollout status deploy delete-delegate
target="$(k $i get pod -l app=delete-delegate -o json | jq -er '.items[0].metadata.finalizers[0] | split("-") | .[1]')"
j="${target: -1}"
k $j delete pod -l multicluster.admiralty.io/app=delete-delegate --wait --timeout=30s
k $i wait pod test-delete-delegate --for=condition=PodScheduled

# when the cluster connection is interrupted for more than a minute,
# the delegate pod (with restart policy always) should be recreated

k $i scale deploy -n admiralty multicluster-scheduler-controller-manager --replicas=0
uid="$(k $j get pod -l multicluster.admiralty.io/app=delete-delegate -o json | jq -er '.items[0].metadata.uid')"
echo $uid
k $j delete pod -l multicluster.admiralty.io/app=delete-delegate

export -f delete-delegate_test_iteration
timeout --foreground 30s bash -c "until delete-delegate_test_iteration $j; do sleep 1; done"
timeout --foreground 120s bash -c "until delete-delegate_test_iteration $j $uid; do sleep 1; done"
# use --foreground to catch ctrl-c
# https://unix.stackexchange.com/a/233685

k $j wait pod -l multicluster.admiralty.io/app=delete-delegate --for=condition=PodScheduled
k $i delete -f test/e2e/delete-delegate/test.yaml

# when the cluster connection is working, the proxy pod should be deleted
# to respect the invariant that pods can't resuscitate

k $i scale deploy -n admiralty multicluster-scheduler-controller-manager --replicas=2
k $j delete pod -l multicluster.admiralty.io/app=delete-delegate --wait --timeout=30s

k $i wait pod test-delete-delegate --for=delete

k $j label node --all a-
}

delete-delegate_test_iteration() {
j=$1
old_uid=$2

set -euo pipefail
source test/e2e/aliases.sh

[ "$(k "$j" get pod -l multicluster.admiralty.io/app=delete-delegate | wc -l)" -eq 2 ] || return 1 # including header
new_uid="$(k "$j" get pod -l multicluster.admiralty.io/app=delete-delegate -o json | jq -er '.items[0].metadata.uid')" || return 1
[ "$new_uid" != "$old_uid" ] || return 1
}

if [[ "${BASH_SOURCE[0]:-}" == "${0}" ]]; then
28 changes: 12 additions & 16 deletions test/e2e/delete-delegate/test.yaml
@@ -1,18 +1,14 @@
apiVersion: apps/v1
kind: Deployment
apiVersion: v1
kind: Pod
metadata:
name: delete-delegate
name: test-delete-delegate
labels:
app: delete-delegate
annotations:
multicluster.admiralty.io/elect: ""
spec:
selector:
matchLabels:
app: delete-delegate
template:
metadata:
labels:
app: delete-delegate
annotations:
multicluster.admiralty.io/elect: ""
spec:
containers:
- name: pause
image: gcr.io/google_containers/pause
nodeSelector:
a: b
containers:
- name: pause
image: gcr.io/google_containers/pause