From 5117780de8f78e62c73c850b6da60002084a80f5 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Thu, 6 Jun 2024 11:39:48 +0300 Subject: [PATCH] [multikueue][GC] Skip disconnected clients. --- .../multikueue/jobset_adapter_test.go | 1 + .../multikueue/multikueuecluster.go | 17 ++++++++++++----- .../multikueue/multikueuecluster_test.go | 3 ++- .../admissionchecks/multikueue/workload.go | 2 +- .../admissionchecks/multikueue/workload_test.go | 5 +++-- 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index 7050d27e8b..c9f8e7a012 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -342,6 +342,7 @@ func TestWlReconcileJobset(t *testing.T) { w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "") w1remoteClient.client = worker1Client + w1remoteClient.connecting.Store(false) cRec.remoteClients["worker1"] = w1remoteClient diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 6635d3ef8a..15af85b2d6 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -81,7 +81,7 @@ type remoteClient struct { kubeconfig []byte origin string - pendingReconnect atomic.Bool + connecting atomic.Bool failedConnAttempts uint // For unit testing only. 
There is no need of creating fully functional remote clients in the unit tests @@ -98,6 +98,7 @@ func newRemoteClient(localClient client.Client, wlUpdateCh, watchEndedCh chan<- localClient: localClient, origin: origin, } + rc.connecting.Store(true) return rc } @@ -139,7 +140,7 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN // If the encountered error is not permanent the duration after which a retry should be done is returned. func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) { configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig) - if !configChanged && !rc.pendingReconnect.Load() { + if !configChanged && !rc.connecting.Load() { return nil, nil } @@ -183,7 +184,7 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) ( } } - rc.pendingReconnect.Store(false) + rc.connecting.Store(false) rc.failedConnAttempts = 0 return nil, nil } @@ -218,9 +219,9 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu log.V(2).Info("Watch ended", "ctxErr", ctx.Err()) // If the context is not yet Done, queue a reconcile to attempt reconnection if ctx.Err() == nil { - oldReconnect := rc.pendingReconnect.Swap(true) + oldConnecting := rc.connecting.Swap(true) // reconnect if this is the first watch failing. - if !oldReconnect { + if !oldConnecting { log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName) rc.queueWatchEndedEvent(ctx) } @@ -258,6 +259,12 @@ func (rc *remoteClient) queueWatchEndedEvent(ctx context.Context) { // is owned by a job, also delete the job. 
func (rc *remoteClient) runGC(ctx context.Context) { log := ctrl.LoggerFrom(ctx) + + if rc.connecting.Load() { + log.V(5).Info("Skip disconnected client") + return + } + lst := &kueue.WorkloadList{} err := rc.client.List(ctx, lst, client.MatchingLabels{kueuealpha.MultiKueueOriginLabel: rc.origin}) if err != nil { diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index fb05a27bf0..b841f24341 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -76,7 +76,7 @@ func newTestClient(config string, watchCancel func()) *remoteClient { func setReconnectState(rc *remoteClient, a uint) *remoteClient { rc.failedConnAttempts = a - rc.pendingReconnect.Store(true) + rc.connecting.Store(true) return rc } @@ -535,6 +535,7 @@ func TestRemoteClientGC(t *testing.T) { w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "") w1remoteClient.client = worker1Client + w1remoteClient.connecting.Store(false) w1remoteClient.runGC(ctx) diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index e09ce05cfc..5c7dfbc3f1 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -247,7 +247,7 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m for _, clusterName := range cfg.Spec.Clusters { if client, found := w.clusters.controllerFor(clusterName); found { // Skip the client if its reconnect is ongoing. 
- if !client.pendingReconnect.Load() { + if !client.connecting.Load() { clients[clusterName] = client } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index a99e9e7335..cfa577ecce 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -1012,6 +1012,7 @@ func TestWlReconcile(t *testing.T) { w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "") w1remoteClient.client = worker1Client + w1remoteClient.connecting.Store(false) cRec.remoteClients["worker1"] = w1remoteClient var worker2Client client.WithWatch @@ -1042,8 +1043,8 @@ func TestWlReconcile(t *testing.T) { w2remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "") w2remoteClient.client = worker2Client - if tc.worker2Reconnecting { - w2remoteClient.pendingReconnect.Store(true) + if !tc.worker2Reconnecting { + w2remoteClient.connecting.Store(false) } cRec.remoteClients["worker2"] = w2remoteClient }