MultiKueue skip garbage collection for disconnected clients. #2369

Merged (1 commit) on Jun 6, 2024
@@ -342,6 +342,7 @@ func TestWlReconcileJobset(t *testing.T) {

w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w1remoteClient.client = worker1Client
+ w1remoteClient.connecting.Store(false)

cRec.remoteClients["worker1"] = w1remoteClient

17 changes: 12 additions & 5 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -81,7 +81,7 @@ type remoteClient struct {
kubeconfig []byte
origin string

- pendingReconnect atomic.Bool
+ connecting       atomic.Bool
failedConnAttempts uint

// For unit testing only. There is no need to create fully functional remote clients in the unit tests
@@ -98,6 +98,7 @@ func newRemoteClient(localClient client.Client, wlUpdateCh, watchEndedCh chan<-
localClient: localClient,
origin: origin,
}
+ rc.connecting.Store(true)
Contributor
Is this needed for the skip itself, or does it fix another scenario?

@trasc (Contributor, Author) Jun 6, 2024
It is needed; it's one of the two key parts of the fix. The flaky panic would come up when the GC ran during the initial connect attempt.

Contributor
ack
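
For context, a minimal, self-contained sketch of the race @trasc describes, using simplified stand-in types rather than the actual Kueue code: a GC pass that fires before the first successful connect would dereference a nil client, so a new client starts out "connecting" and GC skips it until setConfig succeeds.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type remoteClient struct {
	connecting atomic.Bool
	client     func() // stand-in for the remote cluster client; nil until connected
}

func newRemoteClient() *remoteClient {
	rc := &remoteClient{}
	// Key part of the fix: start out "connecting" so a GC pass that fires
	// before the first successful connect is skipped instead of
	// dereferencing a nil client.
	rc.connecting.Store(true)
	return rc
}

func (rc *remoteClient) setConfig() {
	rc.client = func() { fmt.Println("listing remote workloads") }
	rc.connecting.Store(false)
}

func (rc *remoteClient) runGC() {
	if rc.connecting.Load() {
		fmt.Println("skip GC: client disconnected or still connecting")
		return
	}
	rc.client() // would panic here if client were still nil
}

func main() {
	rc := newRemoteClient()
	rc.runGC()     // before the first connect: skipped, no panic
	rc.setConfig() // connection established
	rc.runGC()     // safe now
}
```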

return rc
}

@@ -139,7 +140,7 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN
// If the encountered error is not permanent, the duration after which a retry should be done is returned.
func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) {
configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig)
- if !configChanged && !rc.pendingReconnect.Load() {
+ if !configChanged && !rc.connecting.Load() {
return nil, nil
}

@@ -183,7 +184,7 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (
}
}

- rc.pendingReconnect.Store(false)
+ rc.connecting.Store(false)
rc.failedConnAttempts = 0
return nil, nil
}
@@ -218,9 +219,9 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu
log.V(2).Info("Watch ended", "ctxErr", ctx.Err())
// If the context is not yet Done, queue a reconcile to attempt reconnection
if ctx.Err() == nil {
- oldReconnect := rc.pendingReconnect.Swap(true)
+ oldConnecting := rc.connecting.Swap(true)
// reconnect if this is the first watch failing.
- if !oldReconnect {
+ if !oldConnecting {
log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName)
rc.queueWatchEndedEvent(ctx)
}
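
The Swap above doubles as deduplication: several per-kind watchers can end at roughly the same time, but only the first goroutine to flip the flag queues a reconnect event. A small illustrative snippet of that pattern (not from the PR):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var connecting atomic.Bool
	var queued atomic.Int32
	var wg sync.WaitGroup

	// Five watchers fail at roughly the same time.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Swap returns the previous value, so exactly one goroutine
			// observes false and queues the reconnect event.
			if old := connecting.Swap(true); !old {
				queued.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("reconnect events queued:", queued.Load()) // always 1
}
```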
@@ -258,6 +259,12 @@ func (rc *remoteClient) queueWatchEndedEvent(ctx context.Context) {
// is owned by a job, also delete the job.
func (rc *remoteClient) runGC(ctx context.Context) {
log := ctrl.LoggerFrom(ctx)

+ if rc.connecting.Load() {
+     log.V(5).Info("Skip disconnected client")
+     return
+ }

lst := &kueue.WorkloadList{}
err := rc.client.List(ctx, lst, client.MatchingLabels{kueuealpha.MultiKueueOriginLabel: rc.origin})
if err != nil {
@@ -76,7 +76,7 @@ func newTestClient(config string, watchCancel func()) *remoteClient {

func setReconnectState(rc *remoteClient, a uint) *remoteClient {
rc.failedConnAttempts = a
- rc.pendingReconnect.Store(true)
+ rc.connecting.Store(true)
return rc
}

@@ -535,6 +535,7 @@ func TestRemoteClientGC(t *testing.T) {

w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w1remoteClient.client = worker1Client
+ w1remoteClient.connecting.Store(false)

w1remoteClient.runGC(ctx)

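Because newRemoteClient now starts in the connecting state, tests that inject a fake client directly (bypassing setConfig) must flip the flag themselves, as the test hunks above and below do. A hedged sketch of that setup pattern; the helper name is hypothetical and the fake builder is assumed from controller-runtime's fake package:

```go
// Sketch of a test helper in the multikueue package (names follow the diff).
func newConnectedTestClient(managerClient client.Client, origin string) *remoteClient {
	rc := newRemoteClient(managerClient, nil, nil, origin, "")
	rc.client = fake.NewClientBuilder().Build() // injected fake remote cluster
	// newRemoteClient now starts out "connecting"; tests that bypass
	// setConfig must flip the flag, or GC and workload reconciliation
	// would skip this client.
	rc.connecting.Store(false)
	return rc
}
```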
2 changes: 1 addition & 1 deletion pkg/controller/admissionchecks/multikueue/workload.go
@@ -247,7 +247,7 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
for _, clusterName := range cfg.Spec.Clusters {
if client, found := w.clusters.controllerFor(clusterName); found {
// Skip the client if its reconnect is ongoing.
- if !client.pendingReconnect.Load() {
+ if !client.connecting.Load() {
clients[clusterName] = client
}
}
5 changes: 3 additions & 2 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -1012,6 +1012,7 @@ func TestWlReconcile(t *testing.T) {

w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w1remoteClient.client = worker1Client
+ w1remoteClient.connecting.Store(false)
cRec.remoteClients["worker1"] = w1remoteClient

var worker2Client client.WithWatch
@@ -1042,8 +1043,8 @@

w2remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
w2remoteClient.client = worker2Client
- if tc.worker2Reconnecting {
-     w2remoteClient.pendingReconnect.Store(true)
+ if !tc.worker2Reconnecting {
+     w2remoteClient.connecting.Store(false)
}
cRec.remoteClients["worker2"] = w2remoteClient
}