Skip to content

Commit

Permalink
Properly start (by calling Run) all multiproject informers
Browse files Browse the repository at this point in the history
  • Loading branch information
panslava committed Feb 4, 2025
1 parent d2ddae9 commit f413eeb
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 70 deletions.
205 changes: 135 additions & 70 deletions pkg/multiproject/neg/neg.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"k8s.io/klog/v2"
)

// StartNEGController creates and runs a NEG controller for the specified ProviderConfig.
// The returned channel is closed by StopControllersForProviderConfig to signal a shutdown
// specific to this ProviderConfig's controller.
func StartNEGController(
informersFactory informers.SharedInformerFactory,
kubeClient kubernetes.Interface,
Expand All @@ -51,7 +54,86 @@ func StartNEGController(

providerConfigName := providerConfig.Name

// Using informer factory, create required namespaced informers for the NEG controller.
// The ProviderConfig-specific stop channel. We close this in StopControllersForProviderConfig.
providerConfigStopCh := make(chan struct{})

// joinedStopCh will close when either the globalStopCh or providerConfigStopCh is closed.
joinedStopCh := make(chan struct{})
go func() {
defer func() {
close(joinedStopCh)
logger.V(2).Info("NEG controller stop channel closed")
}()
select {
case <-globalStopCh:
logger.V(2).Info("Global stop channel triggered NEG controller shutdown")
case <-providerConfigStopCh:
logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown")
}
}()

informers, err := initializeInformers(informersFactory, svcNegClient, networkClient, nodeTopologyClient, providerConfigName, joinedStopCh, logger)
if err != nil {
return nil, err
}
hasSynced := createHasSyncedFunc(informers)

zoneGetter := zonegetter.NewZoneGetter(
informers.nodeInformer,
informers.providerConfigFilteredNodeTopologyInformer,
cloud.SubnetworkURL(),
)

negController := createNEGController(
kubeClient,
svcNegClient,
eventRecorderClient,
kubeSystemUID,
informers.ingressInformer,
informers.serviceInformer,
informers.podInformer,
informers.nodeInformer,
informers.endpointSliceInformer,
informers.providerConfigFilteredSvcNegInformer,
informers.providerConfigFilteredNetworkInformer,
informers.providerConfigFilteredGkeNetworkParamsInformer,
hasSynced,
cloud,
zoneGetter,
clusterNamer,
l4Namer,
lpConfig,
joinedStopCh,
logger,
)

go negController.Run()
return providerConfigStopCh, nil
}

type negInformers struct {
ingressInformer cache.SharedIndexInformer
serviceInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
nodeInformer cache.SharedIndexInformer
endpointSliceInformer cache.SharedIndexInformer
providerConfigFilteredSvcNegInformer cache.SharedIndexInformer
providerConfigFilteredNetworkInformer cache.SharedIndexInformer
providerConfigFilteredGkeNetworkParamsInformer cache.SharedIndexInformer
providerConfigFilteredNodeTopologyInformer cache.SharedIndexInformer
}

// initializeInformers wraps the base SharedIndexInformers in a providerConfig filter
// and runs them.
func initializeInformers(
informersFactory informers.SharedInformerFactory,
svcNegClient svcnegclient.Interface,
networkClient networkclient.Interface,
nodeTopologyClient nodetopologyclient.Interface,
providerConfigName string,
logger klog.Logger,
joinedStopCh <-chan struct{},
) (*negInformers, error) {
ingressInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Networking().V1().Ingresses().Informer(), providerConfigName)
serviceInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Services().Informer(), providerConfigName)
podInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Pods().Informer(), providerConfigName)
Expand Down Expand Up @@ -80,75 +162,61 @@ func StartNEGController(
providerConfigFilteredNodeTopologyInformer = filteredinformer.NewProviderConfigFilteredInformer(nodeTopologyInformer, providerConfigName)
}

// Create a function to check if all the informers have synced.
hasSynced := func() bool {
synced := ingressInformer.HasSynced() &&
serviceInformer.HasSynced() &&
podInformer.HasSynced() &&
nodeInformer.HasSynced() &&
endpointSliceInformer.HasSynced()
// Start them with the joinedStopCh so they properly stop
go ingressInformer.Run(joinedStopCh)
go serviceInformer.Run(joinedStopCh)
go podInformer.Run(joinedStopCh)
go nodeInformer.Run(joinedStopCh)
go endpointSliceInformer.Run(joinedStopCh)
if providerConfigFilteredSvcNegInformer != nil {
go providerConfigFilteredSvcNegInformer.Run(joinedStopCh)
}
if providerConfigFilteredNetworkInformer != nil {
go providerConfigFilteredNetworkInformer.Run(joinedStopCh)
}
if providerConfigFilteredGkeNetworkParamsInformer != nil {
go providerConfigFilteredGkeNetworkParamsInformer.Run(joinedStopCh)
}
if providerConfigFilteredNodeTopologyInformer != nil {
go providerConfigFilteredNodeTopologyInformer.Run(joinedStopCh)
}

logger.V(2).Info("NEG informers initialized", "providerConfigName", providerConfigName)
return &negInformers{
ingressInformer: ingressInformer,
serviceInformer: serviceInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
endpointSliceInformer: endpointSliceInformer,
providerConfigFilteredSvcNegInformer: providerConfigFilteredSvcNegInformer,
providerConfigFilteredNetworkInformer: providerConfigFilteredNetworkInformer,
providerConfigFilteredGkeNetworkParamsInformer: providerConfigFilteredGkeNetworkParamsInformer,
providerConfigFilteredNodeTopologyInformer: providerConfigFilteredNodeTopologyInformer,
}, nil
}

func createHasSyncedFunc(informers *negInformers) func() bool {
return func() bool {
synced := informers.ingressInformer.HasSynced() &&
informers.serviceInformer.HasSynced() &&
informers.podInformer.HasSynced() &&
informers.nodeInformer.HasSynced() &&
informers.endpointSliceInformer.HasSynced()

if providerConfigFilteredSvcNegInformer != nil {
synced = synced && providerConfigFilteredSvcNegInformer.HasSynced()
if informers.providerConfigFilteredSvcNegInformer != nil {
synced = synced && informers.providerConfigFilteredSvcNegInformer.HasSynced()
}
if providerConfigFilteredNetworkInformer != nil {
synced = synced && providerConfigFilteredNetworkInformer.HasSynced()
if informers.providerConfigFilteredNetworkInformer != nil {
synced = synced && informers.providerConfigFilteredNetworkInformer.HasSynced()
}
if providerConfigFilteredGkeNetworkParamsInformer != nil {
synced = synced && providerConfigFilteredGkeNetworkParamsInformer.HasSynced()
if informers.providerConfigFilteredGkeNetworkParamsInformer != nil {
synced = synced && informers.providerConfigFilteredGkeNetworkParamsInformer.HasSynced()
}
if providerConfigFilteredNodeTopologyInformer != nil {
synced = synced && providerConfigFilteredNodeTopologyInformer.HasSynced()
if informers.providerConfigFilteredNodeTopologyInformer != nil {
synced = synced && informers.providerConfigFilteredNodeTopologyInformer.HasSynced()
}
return synced
}

zoneGetter := zonegetter.NewZoneGetter(nodeInformer, providerConfigFilteredNodeTopologyInformer, cloud.SubnetworkURL())

// Create a channel to stop the controller for this specific provider config.
providerConfigStopCh := make(chan struct{})

// joinedStopCh is a channel that will be closed when the global stop channel or the provider config stop channel is closed.
joinedStopCh := make(chan struct{})
go func() {
defer func() {
close(joinedStopCh)
logger.V(2).Info("NEG controller stop channel closed")
}()
select {
case <-globalStopCh:
logger.V(2).Info("Global stop channel triggered NEG controller shutdown")
case <-providerConfigStopCh:
logger.V(2).Info("Provider config stop channel triggered NEG controller shutdown")
}
}()

negController := createNEGController(
kubeClient,
svcNegClient,
eventRecorderClient,
kubeSystemUID,
ingressInformer,
serviceInformer,
podInformer,
nodeInformer,
endpointSliceInformer,
providerConfigFilteredSvcNegInformer,
providerConfigFilteredNetworkInformer,
providerConfigFilteredGkeNetworkParamsInformer,
hasSynced,
cloud,
zoneGetter,
clusterNamer,
l4Namer,
lpConfig,
joinedStopCh,
logger,
)

go negController.Run()

return providerConfigStopCh, nil
}

func createNEGController(
Expand All @@ -174,18 +242,15 @@ func createNEGController(
logger klog.Logger,
) *neg.Controller {

// The following adapter will use Network Selflink as Network Url instead of the NetworkUrl itself.
// Network Selflink is always composed by the network name even if the cluster was initialized with Network Id.
// All the components created from it will be consistent and always use the Url with network name and not the url with netowork Id
// The adapter uses Network SelfLink
adapter, err := network.NewAdapterNetworkSelfLink(cloud)
if err != nil {
logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard cloud network provider")
logger.Error(err, "Failed to create network adapter with SelfLink, falling back to standard provider")
adapter = cloud
}

noDefaultBackendServicePort := utils.ServicePort{} // we don't need default backend service port for standalone NEGs.

var noNodeTopologyInformer cache.SharedIndexInformer = nil
noDefaultBackendServicePort := utils.ServicePort{}
var noNodeTopologyInformer cache.SharedIndexInformer

asmServiceNEGSkipNamespaces := []string{}
enableASM := false
Expand Down
1 change: 1 addition & 0 deletions pkg/multiproject/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func Start(
informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod)

providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Providerconfig().V1().ProviderConfigs().Informer()
go providerConfigInformer.Run(stopCh)

manager := manager.NewProviderConfigControllerManager(
kubeClient,
Expand Down

0 comments on commit f413eeb

Please sign in to comment.