[multikueue] Cluster connection monitoring and reconnect. (kubernetes-sigs#1806)

* [multikueue] Cluster connection monitoring and reconnect.

* Review Remarks
trasc authored and vsoch committed Apr 18, 2024
1 parent fdf4684 commit 898e39c
Showing 5 changed files with 303 additions and 54 deletions.
@@ -331,7 +331,7 @@ func TestWlReconcileJobset(t *testing.T) {
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &jobset.JobSetList{Items: tc.worker1JobSets})
worker1Client := worker1Builder.Build()

w1remoteClient := newRemoteClient(managerClient, nil, defaultOrigin)
w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w1remoteClient.client = worker1Client

cRec.remoteClients["worker1"] = w1remoteClient
141 changes: 105 additions & 36 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -23,6 +23,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

corev1 "k8s.io/api/core/v1"
@@ -36,39 +37,64 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

const (
eventChBufferSize = 10

// these values provide a waiting time between 0 and 5m20s
retryIncrement = 5 * time.Second
retryMaxSteps = 7
)

// retryAfter returns an exponentially increasing interval between
// 0 and 2^(retryMaxSteps-1) * retryIncrement
func retryAfter(failedAttempts uint) time.Duration {
if failedAttempts == 0 {
return 0
}
return (1 << (min(failedAttempts, retryMaxSteps) - 1)) * retryIncrement
}

type clientWithWatchBuilder func(config []byte, options client.Options) (client.WithWatch, error)

type remoteClient struct {
localClient client.Client
client client.WithWatch
wlUpdateCh chan<- event.GenericEvent
watchCancel map[string]func()
kubeconfig []byte
origin string
clusterName string
localClient client.Client
client client.WithWatch
wlUpdateCh chan<- event.GenericEvent
watchEndedCh chan<- event.GenericEvent
watchCancel func()
kubeconfig []byte
origin string

forceReconnect atomic.Bool
failedConnAttempts uint

// For unit testing only. There is no need to create fully functional remote clients in the unit tests
// and creating valid kubeconfig content is not trivial.
// The full client creation and usage is validated in the integration and e2e tests.
builderOverride clientWithWatchBuilder
}

func newRemoteClient(localClient client.Client, wlUpdateCh chan<- event.GenericEvent, origin string) *remoteClient {
func newRemoteClient(localClient client.Client, wlUpdateCh, watchEndedCh chan<- event.GenericEvent, origin, clusterName string) *remoteClient {
rc := &remoteClient{
wlUpdateCh: wlUpdateCh,
localClient: localClient,
origin: origin,
watchCancel: map[string]func(){},
clusterName: clusterName,
wlUpdateCh: wlUpdateCh,
watchEndedCh: watchEndedCh,
localClient: localClient,
origin: origin,
}
return rc
}
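
For reference, a standalone sketch of the backoff schedule produced by the retryAfter helper above (illustrative only, not part of this commit; assumes Go 1.21+ for the built-in min): with retryIncrement = 5s and retryMaxSteps = 7, the delays grow 0, 5s, 10s, 20s, 40s, 1m20s, 2m40s and then stay capped at 5m20s.

package main

import (
	"fmt"
	"time"
)

const (
	retryIncrement = 5 * time.Second
	retryMaxSteps  = 7
)

// same formula as retryAfter above: 2^(attempts-1) * retryIncrement, capped at 2^(retryMaxSteps-1)
func retryAfter(failedAttempts uint) time.Duration {
	if failedAttempts == 0 {
		return 0
	}
	return (1 << (min(failedAttempts, retryMaxSteps) - 1)) * retryIncrement
}

func main() {
	for attempts := uint(0); attempts <= 8; attempts++ {
		// attempt 0 -> 0s, 1 -> 5s, 2 -> 10s, ..., 7 -> 5m20s, 8 -> 5m20s (capped)
		fmt.Printf("attempt %d -> wait %s\n", attempts, retryAfter(attempts))
	}
}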
@@ -107,27 +133,36 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN
}

// setConfig - will try to recreate the k8s client and restart watching if the new config is different than
// the one currently used.
func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) error {
if equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig) {
return nil
// the one currently used or a reconnect was requested.
// If the encountered error is not permanent, the duration after which a retry should be attempted is returned.
func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) {
configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig)
if !configChanged && !rc.forceReconnect.Load() {
return nil, nil
}

rc.StopWatchers()
if configChanged {
rc.kubeconfig = kubeconfig
rc.failedConnAttempts = 0
}

builder := newClientWithWatch
if rc.builderOverride != nil {
builder = rc.builderOverride
}
remoteClient, err := builder(kubeconfig, client.Options{Scheme: rc.localClient.Scheme()})
if err != nil {
return err
return nil, err
}

rc.client = remoteClient

watchCtx, rc.watchCancel = context.WithCancel(watchCtx)
err = rc.startWatcher(watchCtx, kueue.GroupVersion.WithKind("Workload").GroupKind().String(), &workloadKueueWatcher{})
if err != nil {
return err
rc.failedConnAttempts++
return ptr.To(retryAfter(rc.failedConnAttempts)), err
}

// add a watch for all the adapters implementing multiKueueWatcher
@@ -141,12 +176,14 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) e
// not being able to set up a watcher is not ideal, but we can function with only the wl watcher.
ctrl.LoggerFrom(watchCtx).V(2).Error(err, "Unable to start the watcher", "kind", kind)
// however let's not accept this for now.
return err
rc.failedConnAttempts++
return ptr.To(retryAfter(rc.failedConnAttempts)), err
}
}

rc.kubeconfig = kubeconfig
return nil
rc.forceReconnect.Store(false)
rc.failedConnAttempts = 0
return nil, nil
}
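
To make the new setConfig contract concrete, here is a minimal standalone sketch (the connect function is a hypothetical stand-in, not Kueue code): a nil duration with a nil error means connected or unchanged, a non-nil duration with an error asks the caller to retry after that delay, and a nil duration with an error indicates a failure with no scheduled retry.

package main

import (
	"errors"
	"fmt"
	"time"
)

// connect is a hypothetical stand-in for remoteClient.setConfig.
func connect(mode string) (*time.Duration, error) {
	switch mode {
	case "ok":
		return nil, nil
	case "transient":
		d := 40 * time.Second
		return &d, errors.New("watcher could not be started")
	default:
		return nil, errors.New("cannot build client from kubeconfig")
	}
}

func main() {
	for _, mode := range []string{"ok", "transient", "permanent"} {
		retryAfter, err := connect(mode)
		switch {
		case err == nil:
			fmt.Println(mode, "-> connected, nothing to requeue")
		case retryAfter != nil:
			fmt.Println(mode, "-> retry after", *retryAfter)
		default:
			fmt.Println(mode, "-> failure, no retry scheduled:", err)
		}
	}
}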

func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKueueWatcher) error {
@@ -158,7 +195,6 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu

go func() {
log.V(2).Info("Starting watch")
defer log.V(2).Info("Watch ended")
for r := range newWatcher.ResultChan() {
wlKey, err := w.GetWorkloadKey(r.Object)
if err != nil {
Expand All @@ -167,16 +203,23 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu
rc.queueWorkloadEvent(ctx, wlKey)
}
}
log.V(2).Info("Watch ended", "ctxErr", ctx.Err())
// If the context is not yet Done, queue a reconcile to attempt reconnection
if ctx.Err() == nil {
oldReconnect := rc.forceReconnect.Swap(true)
// reconnect if this is the first watch failing.
if !oldReconnect {
log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName)
rc.queueWatchEndedEvent(ctx)
}
}
}()

rc.watchCancel[kind] = newWatcher.Stop
return nil
}

func (rc *remoteClient) StopWatchers() {
for kind, stop := range rc.watchCancel {
stop()
delete(rc.watchCancel, kind)
if rc.watchCancel != nil {
rc.watchCancel()
}
}
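
The reconnect trigger above relies on atomic.Bool.Swap so that, when several watch goroutines end at roughly the same time, only the first one to flip forceReconnect queues a reconcile. A minimal standalone sketch of that pattern (names and the string channel below are illustrative, not the controller's types):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var forceReconnect atomic.Bool
	requests := make(chan string, 10) // stands in for the watchEndedCh event channel

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Swap returns the previous value, so exactly one goroutine observes false.
			if !forceReconnect.Swap(true) {
				requests <- fmt.Sprintf("reconnect requested by watcher %d", i)
			}
		}(i)
	}
	wg.Wait()
	close(requests)
	for msg := range requests {
		fmt.Println(msg) // exactly one line is printed
	}
}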

@@ -191,6 +234,15 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, wlKey types.Name
}
}

func (rc *remoteClient) queueWatchEndedEvent(ctx context.Context) {
cluster := &kueuealpha.MultiKueueCluster{}
if err := rc.localClient.Get(ctx, types.NamespacedName{Name: rc.clusterName}, cluster); err == nil {
rc.watchEndedCh <- event.GenericEvent{Object: cluster}
} else {
ctrl.LoggerFrom(ctx).Error(err, "sending watch ended event")
}
}

// runGC - lists all the remote workloads having the same multikueue-origin and removes those that
// no longer have a local correspondent (missing or awaiting deletion). If the remote workload
// is owned by a job, also delete the job.
@@ -264,6 +316,10 @@ type clustersReconciler struct {
// and creating valid kubeconfig content is not trivial.
// The full client creation and usage is validated in the integration and e2e tests.
builderOverride clientWithWatchBuilder

// watchEndedCh - an event chan used to request the reconciliation of the clusters for which the watch loop
// has ended (connection lost).
watchEndedCh chan event.GenericEvent
}

var _ manager.Runnable = (*clustersReconciler)(nil)
@@ -284,13 +340,13 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) {
}
}

func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte, origin string) error {
func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte, origin string) (*time.Duration, error) {
c.lock.Lock()
defer c.lock.Unlock()

client, found := c.remoteClients[clusterName]
if !found {
client = newRemoteClient(c.localClient, c.wlUpdateCh, origin)
client = newRemoteClient(c.localClient, c.wlUpdateCh, c.watchEndedCh, origin, clusterName)
if c.builderOverride != nil {
client.builderOverride = c.builderOverride
}
@@ -300,12 +356,11 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN
clientLog := ctrl.LoggerFrom(c.rootContext).WithValues("clusterName", clusterName)
clientCtx := ctrl.LoggerInto(c.rootContext, clientLog)

if err := client.setConfig(clientCtx, kubeconfig); err != nil {
if retryAfter, err := client.setConfig(clientCtx, kubeconfig); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to set kubeConfig in the remote client")
delete(c.remoteClients, clusterName)
return err
return retryAfter, err
}
return nil
return nil, nil
}

func (a *clustersReconciler) controllerFor(acName string) (*remoteClient, bool) {
@@ -343,9 +398,13 @@ func (c *clustersReconciler) Reconcile(ctx context.Context, req reconcile.Reques
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "BadConfig", err.Error())
}

if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, c.origin); err != nil {
log.Error(err, "setting kubeconfig")
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error())
if retryAfter, err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, c.origin); err != nil {
log.Error(err, "setting kubeconfig", "retryAfter", retryAfter)
if err := c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error()); err != nil {
return reconcile.Result{}, err
} else {
return reconcile.Result{RequeueAfter: ptr.Deref(retryAfter, 0)}, nil
}
}
return reconcile.Result{}, c.updateStatus(ctx, cluster, true, "Active", "Connected")
}
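
As a small aside on the requeue logic above, a sketch of how ptr.Deref resolves the optional delay (assumes k8s.io/utils/ptr and controller-runtime on the module path; the durations are illustrative): a nil retryAfter dereferences to 0, so a ClientConnectionFailed error without a retry hint is reported with no delayed requeue, while a transient one requeues after the backoff interval.

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func main() {
	transient := ptr.To(40 * time.Second) // e.g. the backoff after a few failed attempts
	var permanent *time.Duration          // nil: no retry scheduled

	// prints something like {false 40s} and {false 0s}
	fmt.Println(reconcile.Result{RequeueAfter: ptr.Deref(transient, 0)})
	fmt.Println(reconcile.Result{RequeueAfter: ptr.Deref(permanent, 0)})
}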
@@ -433,9 +492,10 @@ func newClustersReconciler(c client.Client, namespace string, gcInterval time.Du
localClient: c,
configNamespace: namespace,
remoteClients: make(map[string]*remoteClient),
wlUpdateCh: make(chan event.GenericEvent, 10),
wlUpdateCh: make(chan event.GenericEvent, eventChBufferSize),
gcInterval: gcInterval,
origin: origin,
watchEndedCh: make(chan event.GenericEvent, eventChBufferSize),
}
}

@@ -445,9 +505,18 @@ func (c *clustersReconciler) setupWithManager(mgr ctrl.Manager) error {
return err
}

syncHndl := handler.Funcs{
GenericFunc: func(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
}})
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&kueuealpha.MultiKueueCluster{}).
Watches(&corev1.Secret{}, &secretHandler{client: c.localClient}).
WatchesRawSource(&source.Channel{Source: c.watchEndedCh}, syncHndl).
Complete(c)
}
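
To illustrate how an event sent by queueWatchEndedEvent becomes a reconcile request, here is a standalone sketch of the GenericFunc mapping used in syncHndl (a sketch only, assuming client-go and controller-runtime versions contemporary with this commit; the ConfigMap is a stand-in for the MultiKueueCluster object):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Same mapping as syncHndl above: turn the object carried by the generic
	// event into a reconcile.Request keyed by the object's name.
	syncHndl := handler.Funcs{
		GenericFunc: func(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
			q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: e.Object.GetName()}})
		},
	}

	// Stand-in object; in the controller this is the MultiKueueCluster whose watch ended.
	obj := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "worker1"}}
	syncHndl.Generic(context.Background(), event.GenericEvent{Object: obj}, q)

	item, _ := q.Get()
	fmt.Println(item) // a reconcile.Request for "worker1"
	q.Done(item)
}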
