Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

Commit

Permalink
SharedInformers: WaitForCacheSync before starting and don't call Run
Browse files Browse the repository at this point in the history
  • Loading branch information
wongma7 committed Aug 10, 2018
1 parent 4db2567 commit 1a624c9
Showing 1 changed file with 17 additions and 27 deletions.
44 changes: 17 additions & 27 deletions lib/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ type ProvisionController struct {
// * 1.6: storage classes enter GA
kubeVersion *utilversion.Version

// TODO remove this
claimSource cache.ListerWatcher

claimInformer cache.SharedInformer
claims cache.Store
claimController cache.Controller
Expand Down Expand Up @@ -142,10 +139,8 @@ type ProvisionController struct {
// The path of metrics endpoint path.
metricsPath string

// Parameters of leaderelection.LeaderElectionConfig. Leader election is for
// when multiple controllers are running: they race to lock (lead) every PVC
// so that only one calls Provision for it (saving API calls, CPU cycles...)
leaseDuration, renewDeadline, retryPeriod, termLimit time.Duration
// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration

hasRun bool
hasRunLock *sync.Mutex
Expand All @@ -172,8 +167,6 @@ const (
DefaultRenewDeadline = 10 * time.Second
// DefaultRetryPeriod is used when option function RetryPeriod is omitted
DefaultRetryPeriod = 2 * time.Second
// DefaultTermLimit is used when option function TermLimit is omitted
DefaultTermLimit = 30 * time.Second
// DefaultMetricsPort is used when option function MetricsPort is omitted
DefaultMetricsPort = 0
// DefaultMetricsAddress is used when option function MetricsAddress is omitted
Expand Down Expand Up @@ -307,19 +300,6 @@ func RetryPeriod(retryPeriod time.Duration) func(*ProvisionController) error {
}
}

// TermLimit is the maximum duration that a leader may remain the leader
// to complete the task before it must give up its leadership. 0 for forever
// or indefinite. Defaults to 30 seconds.
func TermLimit(termLimit time.Duration) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.termLimit = termLimit
return nil
}
}

// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using a private (non-shared) informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
Expand Down Expand Up @@ -438,7 +418,6 @@ func NewProvisionController(
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
termLimit: DefaultTermLimit,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
Expand Down Expand Up @@ -474,7 +453,6 @@ func NewProvisionController(
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
},
}
controller.claimSource = claimSource

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
Expand Down Expand Up @@ -643,9 +621,21 @@ func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
}, 5*time.Second)
}

go ctrl.claimController.Run(stopCh)
go ctrl.volumeController.Run(stopCh)
go ctrl.classController.Run(stopCh)
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
go ctrl.classController.Run(stopCh)
}

if !cache.WaitForCacheSync(stopCh, ctrl.claimController.HasSynced, ctrl.volumeController.HasSynced, ctrl.classController.HasSynced) {
return
}

for i := 0; i < ctrl.threadiness; i++ {
go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
Expand Down

0 comments on commit 1a624c9

Please sign in to comment.