From 089333274ea42570b399f95e43372e44f7fc59dc Mon Sep 17 00:00:00 2001 From: Mengqi Yu Date: Tue, 14 May 2019 15:13:16 -0700 Subject: [PATCH 1/2] :running: use right markers --- .../crd/pkg/{doc.go => groupversion_info.go} | 17 ++++++++++++++++- examples/crd/pkg/resource.go | 18 ++---------------- 2 files changed, 18 insertions(+), 17 deletions(-) rename examples/crd/pkg/{doc.go => groupversion_info.go} (51%) diff --git a/examples/crd/pkg/doc.go b/examples/crd/pkg/groupversion_info.go similarity index 51% rename from examples/crd/pkg/doc.go rename to examples/crd/pkg/groupversion_info.go index ac12c3fb7b..eccef4121e 100644 --- a/examples/crd/pkg/doc.go +++ b/examples/crd/pkg/groupversion_info.go @@ -14,10 +14,25 @@ See the License for the specific language governing permissions and limitations under the License. */ +// +kubebuilder:object:generate=true +// +groupName=chaosapps.metamagical.io package pkg import ( + "k8s.io/apimachinery/pkg/runtime/schema" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + "sigs.k8s.io/controller-runtime/pkg/scheme" ) -var log = logf.Log.WithName("chaospod-resource") +var ( + log = logf.Log.WithName("chaospod-resource") + + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + + // AddToScheme is required by pkg/client/... + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/examples/crd/pkg/resource.go b/examples/crd/pkg/resource.go index 5e7c398039..844490dfea 100644 --- a/examples/crd/pkg/resource.go +++ b/examples/crd/pkg/resource.go @@ -23,8 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/scheme" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -41,12 +39,11 @@ type ChaosPodStatus struct { LastRun metav1.Time `json:"lastRun,omitempty"` } -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:object:root=true // ChaosPod is the Schema for the randomjobs API // +kubebuilder:printcolumn:name="next stop",type="string",JSONPath=".spec.nextStop",format="date" // +kubebuilder:printcolumn:name="last run",type="string",JSONPath=".status.lastRun",format="date" -// +k8s:openapi-gen=true type ChaosPod struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -55,7 +52,7 @@ type ChaosPod struct { Status ChaosPodStatus `json:"status,omitempty"` } -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:object:root=true // ChaosPodList contains a list of ChaosPod type ChaosPodList struct { @@ -106,14 +103,3 @@ func (c *ChaosPod) Default() { func init() { SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{}) } - -var ( - // SchemeGroupVersion is group version used to register these objects - SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"} - - // SchemeBuilder is used to add go types to the GroupVersionKind scheme - SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} - - // AddToScheme is required by pkg/client/... - AddToScheme = SchemeBuilder.AddToScheme -) From ea5a354e5c380cc57ea8f97880b3a023f38e0933 Mon Sep 17 00:00:00 2001 From: Mengqi Yu Date: Thu, 16 May 2019 12:41:15 -0700 Subject: [PATCH 2/2] :sparkles: support non-leaderelection Runnanles in controller manager webhook server can be run in the non-leaderelection mode --- pkg/manager/internal.go | 49 +++++++++++++++++++++++++++++-------- pkg/manager/manager.go | 13 ++++++++-- pkg/manager/manager_test.go | 4 +++ 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 192354a32f..9852d0cf40 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -58,8 +58,12 @@ type controllerManager struct { // to scheme.scheme. scheme *runtime.Scheme - // runnables is the set of Controllers that the controllerManager injects deps into and Starts. - runnables []Runnable + // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. + // These Runnables are managed by lead election. + leaderElectionRunnables []Runnable + // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. + // These Runnables will not be blocked by lead election. + nonLeaderElectionRunnables []Runnable cache cache.Cache @@ -121,7 +125,7 @@ type controllerManager struct { retryPeriod time.Duration } -// Add sets dependencies on i, and adds it to the list of runnables to start. +// Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() @@ -131,8 +135,13 @@ func (cm *controllerManager) Add(r Runnable) error { return err } - // Add the runnable to the list - cm.runnables = append(cm.runnables, r) + // Add the runnable to the leader election or the non-leaderelection list + if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) + } else { + cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) + } + if cm.started { // If already started, start the controller go func() { @@ -254,13 +263,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { go cm.serveMetrics(cm.internalStop) } + go cm.startNonLeaderElectionRunnables() + if cm.resourceLock != nil { err := cm.startLeaderElection() if err != nil { return err } } else { - go cm.start() + go cm.startLeaderElectionRunnables() } select { @@ -273,7 +284,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { } } -func (cm *controllerManager) start() { +func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() @@ -291,8 +302,26 @@ func (cm *controllerManager) start() { // TODO(community): Check the return value and write a test cm.cache.WaitForCacheSync(cm.internalStop) - // Start the runnables after the cache has synced - for _, c := range cm.runnables { + // Start the non-leaderelection Runnables after the cache has synced + for _, c := range cm.nonLeaderElectionRunnables { + // Controllers block, but we want to return an error if any have an error starting. + // Write any Start errors to a channel so we can return them + ctrl := c + go func() { + cm.errChan <- ctrl.Start(cm.internalStop) + }() + } + + cm.started = true +} + +func (cm *controllerManager) startLeaderElectionRunnables() { + // Wait for the caches to sync. + // TODO(community): Check the return value and write a test + cm.cache.WaitForCacheSync(cm.internalStop) + + // Start the leader election Runnables after the cache has synced + for _, c := range cm.leaderElectionRunnables { // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them ctrl := c @@ -312,7 +341,7 @@ func (cm *controllerManager) startLeaderElection() (err error) { RetryPeriod: cm.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { - cm.start() + cm.startLeaderElectionRunnables() }, OnStoppedLeading: func() { // Most implementations of leader election log.Fatal() here. diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 54df99fb57..670df79d08 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -42,9 +42,11 @@ import ( // Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables. // A Manager is required to create Controllers. type Manager interface { - // Add will set reqeusted dependencies on the component, and cause the component to be + // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. Add will inject any dependencies for which the argument - // implements the inject interface - e.g. inject.Client + // implements the inject interface - e.g. inject.Client. + // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either + // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). Add(Runnable) error // SetFields will set any dependencies on an object for which the object has implemented the inject @@ -183,6 +185,13 @@ func (r RunnableFunc) Start(s <-chan struct{}) error { return r(s) } +// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. +type LeaderElectionRunnable interface { + // NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode. + // e.g. controllers need to be run in leader election mode, while webhook server doesn't. + NeedLeaderElection() bool +} + // New returns a new Manager for creating Controllers. func New(config *rest.Config, options Options) (Manager, error) { // Initialize a rest.config if none was specified diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 74e686ff0d..4141e56593 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -284,6 +284,10 @@ var _ = Describe("manger.Manager", func() { <-c2 <-c3 }) + + It("should return an error if any non-leaderelection Components fail to Start", func() { + // TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429 + }) } Context("with defaults", func() {