Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Reorder boot and verify informer cache sync (#2103)
Browse files Browse the repository at this point in the history
 Reorder boot and verify informer cache sync
  • Loading branch information
hiddeco authored May 29, 2019
2 parents 123e661 + 38fa0ec commit b45b6dd
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
46 changes: 30 additions & 16 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand Down Expand Up @@ -166,19 +167,16 @@ func main() {
TLSHostname: *tillerTLSHostname,
})

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// setup shared informer for HelmReleases
nsOpt := ifinformers.WithNamespace(*namespace)
ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt)
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()
go ifInformerFactory.Start(shutdown)

// setup workqueue for HelmReleases
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease")

// release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes
// release instance is needed during the sync of git chart changes
// and during the sync of HelmRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient)
chartSync := chartsync.New(
log.With(logger, "component", "chartsync"),
Expand All @@ -188,21 +186,37 @@ func main() {
chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval},
*namespace,
)
chartSync.Run(shutdown, errc, shutdownWg)

// start FluxRelease informer
// prepare operator and start FluxRelease informer
// NB: the operator needs to do its magic with the informer
// _before_ starting it or else the cache sync seems to hang at
// random
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync)
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))
go ifInformerFactory.Start(shutdown)

// wait for the caches to be synced before starting _any_ workers
mainLogger.Log("info", "waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(shutdown, fhrInformer.Informer().HasSynced); !ok {
mainLogger.Log("error", "failed to wait for caches to sync")
os.Exit(1)
}
mainLogger.Log("info", "informer caches synced")

// start operator
go opr.Run(1, shutdown, shutdownWg)

// start git sync loop
go chartSync.Run(shutdown, errc, shutdownWg)

// the status updater, to keep track of the release status for
// every HelmRelease
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

// start HTTP server
go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown)

// start operator
go func() {
if err = opr.Run(1, shutdown, shutdownWg); err != nil {
errc <- fmt.Errorf(ErrOperatorFailure, err)
}
}()
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

shutdownErr := <-errc
logger.Log("exiting...", shutdownErr)
Expand Down
19 changes: 4 additions & 15 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package operator

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -117,22 +116,14 @@ func New(
return controller
}

// Run sets up the event handlers for our Custom Resource, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
// Run starts workers handling the enqueued events. It will block until
// stopCh is closed, at which point it will shutdown the workqueue and
// wait for workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer runtime.HandleCrash()
defer c.releaseWorkqueue.ShutDown()

c.logger.Log("info", "starting operator")
// Wait for the caches to be synced before starting workers
c.logger.Log("info", "waiting for informer caches to sync")

if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
return errors.New("failed to wait for caches to sync")
}
c.logger.Log("info", "unformer caches synced")

c.logger.Log("info", "starting workers")
for i := 0; i < threadiness; i++ {
Expand All @@ -145,8 +136,6 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
wg.Done()
}
c.logger.Log("info", "stopping workers")

return nil
}

// runWorker is a long-running function calling the
Expand Down

0 comments on commit b45b6dd

Please sign in to comment.