[multikueue] Cluster connection monitoring and reconnect. #1806

Merged
merged 2 commits on Mar 6, 2024
Changes from 1 commit
@@ -331,7 +331,7 @@ func TestWlReconcileJobset(t *testing.T) {
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.worker1Workloads}, &jobset.JobSetList{Items: tc.worker1JobSets})
worker1Client := worker1Builder.Build()

w1remoteClient := newRemoteClient(managerClient, nil, defaultOrigin)
w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w1remoteClient.client = worker1Client

cRec.remoteClients["worker1"] = w1remoteClient
140 changes: 104 additions & 36 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -23,6 +23,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

corev1 "k8s.io/api/core/v1"
@@ -36,39 +37,61 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

const (
eventChBufferSize = 10

// this set will provide waiting times between 5s and 5min
retryIncrement = 5 * time.Second
retryMaxSteps = 6
)

// retryAfter returns an exponentially increasing interval between
// retryIncrement and 2^retryMaxSteps * retryIncrement
func retryAfter(failedAttempts uint) time.Duration {
Contributor:

Ideally, I would leave the backoff calculations to the built-in mechanisms, if feasible.

Contributor Author:

Not being able to connect should not be seen as a reconcile error in my opinion, as it is not related to k8s state.
Also with this we maintain the control over the retry timing.
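(Editorial aside, not part of the review thread: a minimal sketch of the two requeue styles being weighed here. The helper names are hypothetical; only `reconcile.Result` comes from controller-runtime.)

```go
package sketch

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// asReconcileError reports the connection failure as a reconcile error, so the
// workqueue's default rate limiter decides when the next attempt happens.
func asReconcileError(connErr error) (reconcile.Result, error) {
	return reconcile.Result{}, connErr
}

// asTimedRequeue swallows the error and schedules an explicit retry instead,
// keeping the timing under the controller's own control (the approach this PR
// takes for connection failures).
func asTimedRequeue(delay time.Duration) (reconcile.Result, error) {
	return reconcile.Result{RequeueAfter: delay}, nil
}
```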

mimowo (Contributor), Mar 6, 2024:

> Not being able to connect should not be seen as a reconcile error in my opinion, as it is not related to k8s state.

In most cases, when Kueue sends a request from a node to the kube API server and the API server drops the request, we handle the failure as a reconcile error.

However, this is an "internal" (within-cluster) connect error; maybe for external connect errors a longer baseDelay is indeed preferred.

> Also with this we maintain the control over the retry timing.

I see, I just have a preference for the KISS principle; we could introduce our own timing mechanism later when it proves to be needed.

However, I'm on the fence here, because maybe for communication with an external cluster a higher baseDelay is indeed preferred. WDYT @alculquicondor?

In case we want to control the timings, is it much of a complication to use the standard rate-limiting queue, like for example here? Then we could pass the baseDelay and maxDelay. However, if this is a big complication, I'm fine as is.
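(Editorial aside, not from the thread: a rough sketch of what wiring in the standard per-item exponential rate limiter could look like, assuming controller-runtime's `controller.Options.RateLimiter` field and client-go's workqueue helpers; the 5s/5m values mirror `retryIncrement` and the intended cap. Note the limiter only applies when the reconciler returns an error or `Requeue`, not to `RequeueAfter`.)

```go
package sketch

import (
	"time"

	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
)

// setupWithCustomBackoff registers the reconciler with a per-item exponential
// rate limiter (5s base, 5min cap), so the connection-failure backoff could be
// delegated to the workqueue instead of being computed by hand.
func setupWithCustomBackoff(mgr ctrl.Manager, r reconcile.Reconciler) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kueuealpha.MultiKueueCluster{}).
		WithOptions(controller.Options{
			RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 5*time.Minute),
		}).
		Complete(r)
}
```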

Contributor:

You could also use the Backoff class from k8s.io/apimachinery/pkg/util/wait

But on the nit side.
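(Editorial aside: a small sketch of this suggestion, assuming the `wait.Backoff` type and its `Step()` method from k8s.io/apimachinery/pkg/util/wait; the values mirror the 5s-to-roughly-5min range used by `retryAfter`.)

```go
package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// newConnectBackoff builds a Backoff roughly matching retryAfter's 5s..~5min
// progression: the delay doubles on every failed attempt and is capped.
func newConnectBackoff() wait.Backoff {
	return wait.Backoff{
		Duration: 5 * time.Second, // initial delay
		Factor:   2,               // double after each step
		Steps:    7,               // number of times the delay is allowed to grow
		Cap:      5 * time.Minute, // upper bound on the delay
	}
}

// nextRetry returns the delay to wait before the next attempt; Step() also
// mutates the Backoff, so each cluster would need to hold its own copy
// (similar to the per-client failedConnAttempts counter in this PR).
func nextRetry(b *wait.Backoff) time.Duration {
	return b.Step()
}
```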

Contributor:

@trasc if you prefer to keep the custom timings, I'm fine; just do a quick review of whether we can simplify the code by using the rate limiter or the package suggested by Aldo, so that we avoid reinventing the wheel. If you find this is the simplest approach, I'm OK, but please review the options.

Contributor Author:

I did look at Backoff in k8s.io/apimachinery/pkg/util/wait, but it's a bit overkill for what we are doing here.
Another thing I was thinking of was to just double the time since the cluster was declared inactive: if it failed 5 min ago, we try now and, if it fails again, retry in 5 min. The plus side of this is that we don't need to keep internal state, but the behavior is harder to predict.

return (1 << min(failedAttempts, retryMaxSteps)) * retryIncrement
}
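(Editorial aside, not part of the file: the delays the retryAfter above produces for successive failed attempts, given retryIncrement = 5s and retryMaxSteps = 6; the `min` builtin requires Go 1.21+.)

```go
package main

import (
	"fmt"
	"time"
)

const (
	retryIncrement = 5 * time.Second
	retryMaxSteps  = 6
)

func retryAfter(failedAttempts uint) time.Duration {
	return (1 << min(failedAttempts, retryMaxSteps)) * retryIncrement
}

func main() {
	for attempts := uint(0); attempts <= 7; attempts++ {
		// Prints: 0 5s, 1 10s, 2 20s, 3 40s, 4 1m20s, 5 2m40s, 6 5m20s, 7 5m20s (capped)
		fmt.Println(attempts, retryAfter(attempts))
	}
}
```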

type clientWithWatchBuilder func(config []byte, options client.Options) (client.WithWatch, error)

type remoteClient struct {
localClient client.Client
client client.WithWatch
wlUpdateCh chan<- event.GenericEvent
watchCancel map[string]func()
kubeconfig []byte
origin string
clusterName string
localClient client.Client
client client.WithWatch
wlUpdateCh chan<- event.GenericEvent
watchEndedCh chan<- event.GenericEvent
watchCancel func()
kubeconfig []byte
origin string

forceReconnect atomic.Bool
failedConnAttempts uint

// For unit testing only. There is no need to create fully functional remote clients in the unit tests
// and creating valid kubeconfig content is not trivial.
// The full client creation and usage is validated in the integration and e2e tests.
builderOverride clientWithWatchBuilder
}

func newRemoteClient(localClient client.Client, wlUpdateCh chan<- event.GenericEvent, origin string) *remoteClient {
func newRemoteClient(localClient client.Client, wlUpdateCh, watchEndedCh chan<- event.GenericEvent, origin, clusterName string) *remoteClient {
rc := &remoteClient{
wlUpdateCh: wlUpdateCh,
localClient: localClient,
origin: origin,
watchCancel: map[string]func(){},
clusterName: clusterName,
wlUpdateCh: wlUpdateCh,
watchEndedCh: watchEndedCh,
localClient: localClient,
origin: origin,
}
return rc
}
@@ -107,27 +130,37 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN
}

// setConfig - will try to recreate the k8s client and restart watching if the new config is different than
// the one currently used.
func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) error {
if equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig) {
return nil
// the one currently used or a reconnect was requested.
// If the encountered error is not permanent, the duration after which a retry should be attempted is returned.
func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) {
configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig)
if !configChanged && !rc.forceReconnect.Load() {
return nil, nil
}

rc.StopWatchers()
if configChanged {
rc.kubeconfig = kubeconfig
rc.failedConnAttempts = 0
}

builder := newClientWithWatch
if rc.builderOverride != nil {
builder = rc.builderOverride
}
remoteClient, err := builder(kubeconfig, client.Options{Scheme: rc.localClient.Scheme()})
if err != nil {
return err
return nil, err
}

rc.client = remoteClient

watchCtx, rc.watchCancel = context.WithCancel(watchCtx)
err = rc.startWatcher(watchCtx, kueue.GroupVersion.WithKind("Workload").GroupKind().String(), &workloadKueueWatcher{})
if err != nil {
return err
retryIn := retryAfter(rc.failedConnAttempts)
rc.failedConnAttempts++
return &retryIn, err
}

// add a watch for all the adapters implementing multiKueueWatcher
@@ -141,12 +174,15 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) e
// not being able to set up a watcher is not ideal, but we can function with only the wl watcher.
ctrl.LoggerFrom(watchCtx).V(2).Error(err, "Unable to start the watcher", "kind", kind)
// however let's not accept this for now.
return err
retryIn := retryAfter(rc.failedConnAttempts)
rc.failedConnAttempts++
return &retryIn, err
}
}

rc.kubeconfig = kubeconfig
return nil
rc.forceReconnect.Store(false)
rc.failedConnAttempts = 0
return nil, nil
}

func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKueueWatcher) error {
@@ -158,7 +194,6 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu

go func() {
log.V(2).Info("Starting watch")
defer log.V(2).Info("Watch ended")
for r := range newWatcher.ResultChan() {
wlKey, err := w.GetWorkloadKey(r.Object)
if err != nil {
@@ -167,16 +202,23 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu
rc.queueWorkloadEvent(ctx, wlKey)
}
}
log.V(2).Info("Watch ended", "ctxErr", ctx.Err())
// If the context is not yet Done, queue a reconcile to attempt reconnection
if ctx.Err() == nil {
oldReconnect := rc.forceReconnect.Swap(true)
// reconnect if this is the first watch failing.
if !oldReconnect {
log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName)
rc.queueWatchEndedEvent(ctx)
}
}
}()

rc.watchCancel[kind] = newWatcher.Stop
return nil
}

func (rc *remoteClient) StopWatchers() {
for kind, stop := range rc.watchCancel {
stop()
delete(rc.watchCancel, kind)
if rc.watchCancel != nil {
rc.watchCancel()
}
}

@@ -191,6 +233,15 @@ func (rc *remoteClient) queueWorkloadEvent(ctx context.Context, wlKey types.Name
}
}

func (rc *remoteClient) queueWatchEndedEvent(ctx context.Context) {
cluster := &kueuealpha.MultiKueueCluster{}
if err := rc.localClient.Get(ctx, types.NamespacedName{Name: rc.clusterName}, cluster); err == nil {
rc.watchEndedCh <- event.GenericEvent{Object: cluster}
} else {
ctrl.LoggerFrom(ctx).Error(err, "sending watch ended event")
}
}

// runGC - lists all the remote workloads having the same multikueue-origin and remove those who
// no longer have a local correspondent (missing or awaiting deletion). If the remote workload
// is owned by a job, also delete the job.
@@ -264,6 +315,10 @@ type clustersReconciler struct {
// and creating valid kubeconfig content is not trivial.
// The full client creation and usage is validated in the integration and e2e tests.
builderOverride clientWithWatchBuilder

// watchEndedCh - an event chan used to request the reconciliation of the clusters for which the watch loop
// has ended (connection lost).
watchEndedCh chan event.GenericEvent
}

var _ manager.Runnable = (*clustersReconciler)(nil)
@@ -284,13 +339,13 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) {
}
}

func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte, origin string) error {
func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, kubeconfig []byte, origin string) (*time.Duration, error) {
c.lock.Lock()
defer c.lock.Unlock()

client, found := c.remoteClients[clusterName]
if !found {
client = newRemoteClient(c.localClient, c.wlUpdateCh, origin)
client = newRemoteClient(c.localClient, c.wlUpdateCh, c.watchEndedCh, origin, clusterName)
if c.builderOverride != nil {
client.builderOverride = c.builderOverride
}
@@ -300,12 +355,11 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN
clientLog := ctrl.LoggerFrom(c.rootContext).WithValues("clusterName", clusterName)
clientCtx := ctrl.LoggerInto(c.rootContext, clientLog)

if err := client.setConfig(clientCtx, kubeconfig); err != nil {
if retryAfter, err := client.setConfig(clientCtx, kubeconfig); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to set kubeConfig in the remote client")
delete(c.remoteClients, clusterName)
return err
return retryAfter, err
}
return nil
return nil, nil
}

func (a *clustersReconciler) controllerFor(acName string) (*remoteClient, bool) {
@@ -343,9 +397,13 @@ func (c *clustersReconciler) Reconcile(ctx context.Context, req reconcile.Reques
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "BadConfig", err.Error())
}

if err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, c.origin); err != nil {
log.Error(err, "setting kubeconfig")
return reconcile.Result{}, c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error())
if retryAfter, err := c.setRemoteClientConfig(ctx, cluster.Name, kubeConfig, c.origin); err != nil {
log.Error(err, "setting kubeconfig", "retryAfter", retryAfter)
if err := c.updateStatus(ctx, cluster, false, "ClientConnectionFailed", err.Error()); err != nil {
return reconcile.Result{}, err
} else {
return reconcile.Result{RequeueAfter: ptr.Deref(retryAfter, 0)}, nil
}
}
return reconcile.Result{}, c.updateStatus(ctx, cluster, true, "Active", "Connected")
}
@@ -433,9 +491,10 @@ func newClustersReconciler(c client.Client, namespace string, gcInterval time.Du
localClient: c,
configNamespace: namespace,
remoteClients: make(map[string]*remoteClient),
wlUpdateCh: make(chan event.GenericEvent, 10),
wlUpdateCh: make(chan event.GenericEvent, eventChBufferSize),
gcInterval: gcInterval,
origin: origin,
watchEndedCh: make(chan event.GenericEvent, eventChBufferSize),
}
}

@@ -445,9 +504,18 @@ func (c *clustersReconciler) setupWithManager(mgr ctrl.Manager) error {
return err
}

syncHndl := handler.Funcs{
GenericFunc: func(_ context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: e.Object.GetName(),
}})
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&kueuealpha.MultiKueueCluster{}).
Watches(&corev1.Secret{}, &secretHandler{client: c.localClient}).
WatchesRawSource(&source.Channel{Source: c.watchEndedCh}, syncHndl).
Complete(c)
}
