From f413eeb26fbb72b46220ba9b485149f551d85e35 Mon Sep 17 00:00:00 2001
From: Slavik Panasovets
Date: Tue, 4 Feb 2025 19:06:20 +0000
Subject: [PATCH] Properly start (by calling Run) all multiproject informers

---
 pkg/multiproject/neg/neg.go     | 205 +++++++++++++++++++++-----------
 pkg/multiproject/start/start.go |   1 +
 2 files changed, 136 insertions(+), 70 deletions(-)

diff --git a/pkg/multiproject/neg/neg.go b/pkg/multiproject/neg/neg.go
index ac2437be94..ea3a33f5f3 100644
--- a/pkg/multiproject/neg/neg.go
+++ b/pkg/multiproject/neg/neg.go
@@ -28,6 +28,9 @@ import (
 	"k8s.io/klog/v2"
 )
 
+// StartNEGController creates and runs a NEG controller for the specified ProviderConfig.
+// The returned channel is closed by StopControllersForProviderConfig to signal a shutdown
+// specific to this ProviderConfig's controller.
 func StartNEGController(
 	informersFactory informers.SharedInformerFactory,
 	kubeClient kubernetes.Interface,
@@ -51,7 +54,86 @@ func StartNEGController(
 
 	providerConfigName := providerConfig.Name
 
-	// Using informer factory, create required namespaced informers for the NEG controller.
+	// The ProviderConfig-specific stop channel. We close this in StopControllersForProviderConfig.
+	providerConfigStopCh := make(chan struct{})
+
+	// joinedStopCh will close when either the globalStopCh or providerConfigStopCh is closed.
+	joinedStopCh := make(chan struct{})
+	go func() {
+		defer func() {
+			close(joinedStopCh)
+			logger.V(2).Info("NEG controller stop channel closed")
+		}()
+		select {
+		case <-globalStopCh:
+			logger.V(2).Info("Global stop channel triggered NEG controller shutdown")
+		case <-providerConfigStopCh:
+			logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown")
+		}
+	}()
+
+	informers, err := initializeInformers(informersFactory, svcNegClient, networkClient, nodeTopologyClient, providerConfigName, logger, joinedStopCh)
+	if err != nil {
+		return nil, err
+	}
+	hasSynced := createHasSyncedFunc(informers)
+
+	zoneGetter := zonegetter.NewZoneGetter(
+		informers.nodeInformer,
+		informers.providerConfigFilteredNodeTopologyInformer,
+		cloud.SubnetworkURL(),
+	)
+
+	negController := createNEGController(
+		kubeClient,
+		svcNegClient,
+		eventRecorderClient,
+		kubeSystemUID,
+		informers.ingressInformer,
+		informers.serviceInformer,
+		informers.podInformer,
+		informers.nodeInformer,
+		informers.endpointSliceInformer,
+		informers.providerConfigFilteredSvcNegInformer,
+		informers.providerConfigFilteredNetworkInformer,
+		informers.providerConfigFilteredGkeNetworkParamsInformer,
+		hasSynced,
+		cloud,
+		zoneGetter,
+		clusterNamer,
+		l4Namer,
+		lpConfig,
+		joinedStopCh,
+		logger,
+	)
+
+	go negController.Run()
+	return providerConfigStopCh, nil
+}
+
+type negInformers struct {
+	ingressInformer                                cache.SharedIndexInformer
+	serviceInformer                                cache.SharedIndexInformer
+	podInformer                                    cache.SharedIndexInformer
+	nodeInformer                                   cache.SharedIndexInformer
+	endpointSliceInformer                          cache.SharedIndexInformer
+	providerConfigFilteredSvcNegInformer           cache.SharedIndexInformer
+	providerConfigFilteredNetworkInformer          cache.SharedIndexInformer
+	providerConfigFilteredGkeNetworkParamsInformer cache.SharedIndexInformer
+	providerConfigFilteredNodeTopologyInformer     cache.SharedIndexInformer
+}
+
+// initializeInformers wraps the base SharedIndexInformers in a providerConfig
+// filter and runs them.
+func initializeInformers(
+	informersFactory informers.SharedInformerFactory,
+	svcNegClient svcnegclient.Interface,
+	networkClient networkclient.Interface,
+	nodeTopologyClient nodetopologyclient.Interface,
+	providerConfigName string,
+	logger klog.Logger,
+	joinedStopCh <-chan struct{},
+) (*negInformers, error) {
 	ingressInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Networking().V1().Ingresses().Informer(), providerConfigName)
 	serviceInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Services().Informer(), providerConfigName)
 	podInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Pods().Informer(), providerConfigName)
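The core of the hunk above is the fan-in: joinedStopCh closes as soon as either the global or the per-ProviderConfig stop channel does, so the informers and the controller all observe a single shutdown signal. A minimal, self-contained sketch of that pattern (the names are illustrative, not lifted from the controller code):

package main

import "fmt"

// joinStopChannels returns a channel that is closed as soon as either
// input channel is closed, mirroring the joinedStopCh fan-in above.
func joinStopChannels(globalStopCh, providerConfigStopCh <-chan struct{}) <-chan struct{} {
	joinedStopCh := make(chan struct{})
	go func() {
		defer close(joinedStopCh)
		select {
		case <-globalStopCh:
		case <-providerConfigStopCh:
		}
	}()
	return joinedStopCh
}

func main() {
	global := make(chan struct{})
	perConfig := make(chan struct{})
	joined := joinStopChannels(global, perConfig)

	// Closing either input releases everything waiting on joined.
	close(perConfig)
	<-joined
	fmt.Println("joined stop channel closed")
}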
@@ -80,75 +162,61 @@
 		providerConfigFilteredNodeTopologyInformer = filteredinformer.NewProviderConfigFilteredInformer(nodeTopologyInformer, providerConfigName)
 	}
 
-	// Create a function to check if all the informers have synced.
-	hasSynced := func() bool {
-		synced := ingressInformer.HasSynced() &&
-			serviceInformer.HasSynced() &&
-			podInformer.HasSynced() &&
-			nodeInformer.HasSynced() &&
-			endpointSliceInformer.HasSynced()
+	// Start them with joinedStopCh so they stop when either stop channel closes.
+	go ingressInformer.Run(joinedStopCh)
+	go serviceInformer.Run(joinedStopCh)
+	go podInformer.Run(joinedStopCh)
+	go nodeInformer.Run(joinedStopCh)
+	go endpointSliceInformer.Run(joinedStopCh)
+	if providerConfigFilteredSvcNegInformer != nil {
+		go providerConfigFilteredSvcNegInformer.Run(joinedStopCh)
+	}
+	if providerConfigFilteredNetworkInformer != nil {
+		go providerConfigFilteredNetworkInformer.Run(joinedStopCh)
+	}
+	if providerConfigFilteredGkeNetworkParamsInformer != nil {
+		go providerConfigFilteredGkeNetworkParamsInformer.Run(joinedStopCh)
+	}
+	if providerConfigFilteredNodeTopologyInformer != nil {
+		go providerConfigFilteredNodeTopologyInformer.Run(joinedStopCh)
+	}
+
+	logger.V(2).Info("NEG informers initialized", "providerConfigName", providerConfigName)
+	return &negInformers{
+		ingressInformer:                      ingressInformer,
+		serviceInformer:                      serviceInformer,
+		podInformer:                          podInformer,
+		nodeInformer:                         nodeInformer,
+		endpointSliceInformer:                endpointSliceInformer,
+		providerConfigFilteredSvcNegInformer: providerConfigFilteredSvcNegInformer,
+		providerConfigFilteredNetworkInformer:          providerConfigFilteredNetworkInformer,
+		providerConfigFilteredGkeNetworkParamsInformer: providerConfigFilteredGkeNetworkParamsInformer,
+		providerConfigFilteredNodeTopologyInformer:     providerConfigFilteredNodeTopologyInformer,
+	}, nil
+}
+
+func createHasSyncedFunc(informers *negInformers) func() bool {
+	return func() bool {
+		synced := informers.ingressInformer.HasSynced() &&
+			informers.serviceInformer.HasSynced() &&
+			informers.podInformer.HasSynced() &&
+			informers.nodeInformer.HasSynced() &&
+			informers.endpointSliceInformer.HasSynced()
 
-		if providerConfigFilteredSvcNegInformer != nil {
-			synced = synced && providerConfigFilteredSvcNegInformer.HasSynced()
+		if informers.providerConfigFilteredSvcNegInformer != nil {
+			synced = synced && informers.providerConfigFilteredSvcNegInformer.HasSynced()
 		}
-		if providerConfigFilteredNetworkInformer != nil {
-			synced = synced && providerConfigFilteredNetworkInformer.HasSynced()
+		if informers.providerConfigFilteredNetworkInformer != nil {
+			synced = synced && informers.providerConfigFilteredNetworkInformer.HasSynced()
 		}
-		if providerConfigFilteredGkeNetworkParamsInformer != nil {
-			synced = synced && providerConfigFilteredGkeNetworkParamsInformer.HasSynced()
+		if informers.providerConfigFilteredGkeNetworkParamsInformer != nil {
+			synced = synced && informers.providerConfigFilteredGkeNetworkParamsInformer.HasSynced()
 		}
-		if providerConfigFilteredNodeTopologyInformer != nil {
-			synced = synced && providerConfigFilteredNodeTopologyInformer.HasSynced()
+		if informers.providerConfigFilteredNodeTopologyInformer != nil {
+			synced = synced && informers.providerConfigFilteredNodeTopologyInformer.HasSynced()
 		}
 
 		return synced
 	}
-
-	zoneGetter := zonegetter.NewZoneGetter(nodeInformer, providerConfigFilteredNodeTopologyInformer, cloud.SubnetworkURL())
-
-	// Create a channel to stop the controller for this specific provider config.
-	providerConfigStopCh := make(chan struct{})
-
-	// joinedStopCh is a channel that will be closed when the global stop channel or the provider config stop channel is closed.
-	joinedStopCh := make(chan struct{})
-	go func() {
-		defer func() {
-			close(joinedStopCh)
-			logger.V(2).Info("NEG controller stop channel closed")
-		}()
-		select {
-		case <-globalStopCh:
-			logger.V(2).Info("Global stop channel triggered NEG controller shutdown")
-		case <-providerConfigStopCh:
-			logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown")
-		}
-	}()
-
-	negController := createNEGController(
-		kubeClient,
-		svcNegClient,
-		eventRecorderClient,
-		kubeSystemUID,
-		ingressInformer,
-		serviceInformer,
-		podInformer,
-		nodeInformer,
-		endpointSliceInformer,
-		providerConfigFilteredSvcNegInformer,
-		providerConfigFilteredNetworkInformer,
-		providerConfigFilteredGkeNetworkParamsInformer,
-		hasSynced,
-		cloud,
-		zoneGetter,
-		clusterNamer,
-		l4Namer,
-		lpConfig,
-		joinedStopCh,
-		logger,
-	)
-
-	go negController.Run()
-
-	return providerConfigStopCh, nil
 }
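createHasSyncedFunc returns a plain func() bool, which composes directly with client-go's cache.WaitForCacheSync. A runnable sketch of how a caller could block until the caches fill; the time-based hasSynced here is only a stand-in for the real aggregated informer check:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

func main() {
	stopCh := make(chan struct{})

	// Stand-in for the aggregated hasSynced built by createHasSyncedFunc:
	// flips to true once the (simulated) caches have filled.
	start := time.Now()
	hasSynced := func() bool { return time.Since(start) > 100*time.Millisecond }

	// WaitForCacheSync polls until hasSynced returns true or stopCh closes,
	// which is how an aggregated sync check like this is normally consumed.
	if !cache.WaitForCacheSync(stopCh, hasSynced) {
		fmt.Println("caches did not sync before shutdown")
		return
	}
	fmt.Println("caches synced")
}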
 
 func createNEGController(
@@ -174,18 +242,15 @@
 	logger klog.Logger,
 ) *neg.Controller {
-	// The following adapter will use Network Selflink as Network Url instead of the NetworkUrl itself.
-	// Network Selflink is always composed by the network name even if the cluster was initialized with Network Id.
-	// All the components created from it will be consistent and always use the Url with network name and not the url with netowork Id
+	// The adapter uses the network SelfLink as the network URL, so components derived from it consistently reference the network by name rather than by ID.
 	adapter, err := network.NewAdapterNetworkSelfLink(cloud)
 	if err != nil {
-		logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard cloud network provider")
+		logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard provider")
 		adapter = cloud
 	}
 
-	noDefaultBackendServicePort := utils.ServicePort{} // we don't need default backend service port for standalone NEGs.
-
-	var noNodeTopologyInformer cache.SharedIndexInformer = nil
+	noDefaultBackendServicePort := utils.ServicePort{}
+	var noNodeTopologyInformer cache.SharedIndexInformer
 
 	asmServiceNEGSkipNamespaces := []string{}
 	enableASM := false
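The new doc comment on StartNEGController says StopControllersForProviderConfig closes the returned channel. That function is not part of this diff, so the following is only a hypothetical sketch of the bookkeeping it implies; the manager type and its fields are invented for illustration and are not the actual ProviderConfigControllerManager:

package main

import "sync"

// controllerManager is a hypothetical shape: it maps each ProviderConfig
// name to the stop channel returned by StartNEGController.
type controllerManager struct {
	mu      sync.Mutex
	stopChs map[string]chan struct{}
}

func (m *controllerManager) track(name string, stopCh chan struct{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.stopChs[name] = stopCh
}

// StopControllersForProviderConfig closes the per-config channel, which
// closes joinedStopCh inside StartNEGController and stops the informers
// and NEG controller for that ProviderConfig only.
func (m *controllerManager) StopControllersForProviderConfig(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if stopCh, ok := m.stopChs[name]; ok {
		close(stopCh)
		delete(m.stopChs, name)
	}
}

func main() {
	m := &controllerManager{stopChs: map[string]chan struct{}{}}
	stopCh := make(chan struct{})
	m.track("example-provider-config", stopCh)
	m.StopControllersForProviderConfig("example-provider-config")
	<-stopCh // closed: the controller for this config shuts down
}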
diff --git a/pkg/multiproject/start/start.go b/pkg/multiproject/start/start.go
index 1c81ef44de..675c46736a 100644
--- a/pkg/multiproject/start/start.go
+++ b/pkg/multiproject/start/start.go
@@ -135,6 +135,7 @@ func Start(
 	informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod)
 
 	providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Providerconfig().V1().ProviderConfigs().Informer()
+	go providerConfigInformer.Run(stopCh)
 
 	manager := manager.NewProviderConfigControllerManager(
 		kubeClient,
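One note on the start.go hunk: informers obtained from a shared informer factory can also be started with the factory's own Start method, which calls Run for every informer requested from the factory so far; the explicit go providerConfigInformer.Run(stopCh) above achieves the same for a single informer. A sketch of the factory route, using client-go's fake clientset and a pod informer as stand-ins for the real ProviderConfig clients:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// fake.NewSimpleClientset stands in for the real kubeClient.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)

	podInformer := factory.Core().V1().Pods().Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)

	// factory.Start runs every informer requested from the factory so far,
	// equivalent to `go podInformer.Run(stopCh)` for each of them.
	factory.Start(stopCh)

	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
		fmt.Println("cache failed to sync")
		return
	}
	fmt.Println("pod informer synced")
}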