Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ProviderConfig Controller #2792

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
59 changes: 44 additions & 15 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/instancegroups"
"k8s.io/ingress-gce/pkg/l4lb"
"k8s.io/ingress-gce/pkg/multiproject/sharedcontext"
multiprojectstart "k8s.io/ingress-gce/pkg/multiproject/start"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/psc"
"k8s.io/ingress-gce/pkg/serviceattachment"
Expand Down Expand Up @@ -234,17 +234,51 @@ func main() {
systemHealth := systemhealth.NewSystemHealth(rootLogger)
go app.RunHTTPServer(systemHealth.HealthCheck, rootLogger)

hostname, err := os.Hostname()
if err != nil {
klog.Fatalf("unable to get hostname: %v", err)
}

if flags.F.EnableMultiProjectMode {
_ = sharedcontext.NewSharedContext(
kubeClient,
svcNegClient,
kubeSystemUID,
eventRecorderKubeClient,
namer,
rootLogger,
stopCh,
)
rootLogger.Info("Multi-project mode is enabled, starting project-syncer")

runWithWg(func() {
if flags.F.LeaderElection.LeaderElect {
err := multiprojectstart.StartWithLeaderElection(
context.Background(),
leaderElectKubeClient,
hostname,
kubeConfig,
rootLogger,
kubeClient,
svcNegClient,
kubeSystemUID,
eventRecorderKubeClient,
namer,
stopCh,
)
if err != nil {
rootLogger.Error(err, "Failed to start multi-project syncer with leader election")
}
} else {
multiprojectstart.Start(
kubeConfig,
rootLogger,
kubeClient,
svcNegClient,
kubeSystemUID,
eventRecorderKubeClient,
namer,
stopCh,
)
}
}, rOption.wg)

// Wait for the multi-project syncer to finish.
waitWithTimeout(rOption.wg, rootLogger)

// Since we only want multi-project mode functionality, exit here
return
}

cloud := app.NewGCEClient(rootLogger)
Expand Down Expand Up @@ -286,11 +320,6 @@ func main() {
}
ctx := ingctx.NewControllerContext(kubeClient, backendConfigClient, frontendConfigClient, firewallCRClient, svcNegClient, svcAttachmentClient, networkClient, nodeTopologyClient, eventRecorderKubeClient, cloud, namer, kubeSystemUID, ctxConfig, rootLogger)

hostname, err := os.Hostname()
if err != nil {
klog.Fatalf("unable to get hostname: %v", err)
}

leOption := leaderElectionOption{
client: leaderElectKubeClient,
recorder: ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace),
Expand Down
142 changes: 142 additions & 0 deletions pkg/multiproject/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Package controller implements the ProviderConfig controller that starts and stops controllers for each ProviderConfig.
package controller

import (
"context"
"fmt"
"math/rand"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)

const (
	// providerConfigControllerName is used as the controller's logger name and
	// as the name of its task queue.
	providerConfigControllerName = "provider-config-controller"
	// workersNum is the number of concurrent workers draining the queue.
	workersNum = 5
)

// ProviderConfigControllerManager implements the logic for starting and stopping controllers for each ProviderConfig.
type ProviderConfigControllerManager interface {
	// StartControllersForProviderConfig starts the controllers associated with
	// the given ProviderConfig.
	// NOTE(review): the sync loop calls this on every add/update event, so
	// implementations presumably must be idempotent — confirm with implementers.
	StartControllersForProviderConfig(pc *providerconfig.ProviderConfig) error
	// StopControllersForProviderConfig stops the controllers associated with
	// the given ProviderConfig; called when its deletion is observed.
	StopControllersForProviderConfig(pc *providerconfig.ProviderConfig)
}

// ProviderConfigController is a controller that manages the ProviderConfig resource.
// It is responsible for starting and stopping controllers for each ProviderConfig.
// Currently, it only manages the NEG controller using the ProviderConfigControllerManager.
type ProviderConfigController struct {
panslava marked this conversation as resolved.
Show resolved Hide resolved
manager ProviderConfigControllerManager

providerConfigLister cache.Indexer
providerConfigQueue utils.TaskQueue
numWorkers int
logger klog.Logger
stopCh <-chan struct{}
hasSynced func() bool
}

// NewProviderConfigController creates a new instance of the ProviderConfig controller.
//
// It wires the controller to the supplied informer: add and update events
// enqueue the object onto a multi-worker task queue, which is drained by Run.
func NewProviderConfigController(manager ProviderConfigControllerManager, providerConfigInformer cache.SharedIndexInformer, stopCh <-chan struct{}, logger klog.Logger) *ProviderConfigController {
	ctrlLogger := logger.WithName(providerConfigControllerName)

	controller := &ProviderConfigController{
		manager:              manager,
		providerConfigLister: providerConfigInformer.GetIndexer(),
		hasSynced:            providerConfigInformer.HasSynced,
		numWorkers:           workersNum,
		stopCh:               stopCh,
		logger:               ctrlLogger,
	}
	controller.providerConfigQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers(providerConfigControllerName, "provider-configs", controller.numWorkers, controller.syncWrapper, ctrlLogger)

	// Deletions are intentionally not enqueued here; teardown is driven by the
	// DeletionTimestamp observed during sync.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { controller.providerConfigQueue.Enqueue(obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { controller.providerConfigQueue.Enqueue(newObj) },
	}
	providerConfigInformer.AddEventHandler(handlers)

	ctrlLogger.Info("ProviderConfig controller created")
	return controller
}

// Run starts the ProviderConfig controller: it waits for the informer cache to
// sync, starts the queue workers, and blocks until stopCh is closed. The task
// queue is shut down on exit.
func (pcc *ProviderConfigController) Run() {
	defer pcc.shutdown()

	pcc.logger.Info("Starting ProviderConfig controller")

	// Derive a context that is cancelled when stopCh closes, so the cache-sync
	// wait below is interruptible.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-pcc.stopCh
		cancel()
	}()

	err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		pcc.logger.Info("Waiting for initial cache sync before starting ProviderConfig Controller")
		return pcc.hasSynced(), nil
	})
	if err != nil {
		// The poll condition never returns an error, so err here means the
		// context was cancelled (stopCh was closed) before the cache synced.
		// Do not start the workers in that case.
		pcc.logger.Error(err, "Failed to wait for initial cache sync before starting ProviderConfig Controller")
		return
	}

	pcc.logger.Info("Started ProviderConfig Controller", "numWorkers", pcc.numWorkers)
	pcc.providerConfigQueue.Run()

	<-pcc.stopCh
}

// shutdown stops the task-queue workers; it is invoked via defer when Run exits.
func (pcc *ProviderConfigController) shutdown() {
	pcc.logger.Info("Shutting down ProviderConfig Controller")
	pcc.providerConfigQueue.Shutdown()
}

// syncWrapper adapts sync to the task-queue worker signature. It tags log
// output with the key and a random sync ID, and converts panics in the sync
// path into returned errors so the queue re-enqueues the key instead of
// treating the sync as successful.
func (pcc *ProviderConfigController) syncWrapper(key string) (err error) {
	syncID := rand.Int31()
	pcLogger := pcc.logger.WithValues("providerConfigKey", key, "syncId", syncID)

	defer func() {
		if r := recover(); r != nil {
			pcLogger.Error(fmt.Errorf("panic in ProviderConfig sync worker goroutine: %v", r), "Recovered from panic")
			// Previously a recovered panic left the zero-value (nil) error, so
			// the key was never retried. Surface it as an error instead.
			err = fmt.Errorf("panic during sync of providerConfig %q: %v", key, r)
		}
	}()

	if err = pcc.sync(key, pcLogger); err != nil {
		pcLogger.Error(err, "Error syncing providerConfig", "key", key)
	}
	return err
}

// sync reconciles a single ProviderConfig identified by key: it starts the
// managed controllers for live objects and stops them for objects carrying a
// DeletionTimestamp. Keys with no backing object are ignored.
func (pcc *ProviderConfigController) sync(key string, logger klog.Logger) error {
	syncLogger := logger.WithName("providerConfig.sync")

	obj, exists, err := pcc.providerConfigLister.GetByKey(key)
	if err != nil {
		return fmt.Errorf("failed to lookup providerConfig for key %s: %w", key, err)
	}
	if !exists || obj == nil {
		syncLogger.V(3).Info("ProviderConfig does not exist anymore")
		return nil
	}

	pc, ok := obj.(*providerconfig.ProviderConfig)
	if !ok {
		return fmt.Errorf("unexpected type for providerConfig, expected *ProviderConfig but got %T", obj)
	}

	// A non-nil DeletionTimestamp means the object is being deleted: tear down
	// its controllers rather than (re)starting them.
	if pc.DeletionTimestamp != nil {
		syncLogger.Info("ProviderConfig is being deleted, stopping controllers", "providerConfig", pc)
		pcc.manager.StopControllersForProviderConfig(pc)
		return nil
	}

	syncLogger.V(2).Info("Syncing providerConfig", "providerConfig", pc)
	if err := pcc.manager.StartControllersForProviderConfig(pc); err != nil {
		return fmt.Errorf("failed to start controllers for providerConfig %v: %w", pc, err)
	}

	syncLogger.V(2).Info("Successfully synced providerConfig", "providerConfig", pc)
	return nil
}
Loading