Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support non leader election components #424

Merged
merged 2 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,25 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// +kubebuilder:object:generate=true
// +groupName=chaosapps.metamagical.io
package pkg

import (
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)

var log = logf.Log.WithName("chaospod-resource")
var (
log = logf.Log.WithName("chaospod-resource")

// SchemeGroupVersion is group version used to register these objects
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}

// AddToScheme is required by pkg/client/...
AddToScheme = SchemeBuilder.AddToScheme
)
18 changes: 2 additions & 16 deletions examples/crd/pkg/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/scheme"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

Expand All @@ -41,12 +39,11 @@ type ChaosPodStatus struct {
LastRun metav1.Time `json:"lastRun,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need an object-gen marker here, no?

// +kubebuilder:object:root=true

// ChaosPod is the Schema for the randomjobs API
// +kubebuilder:printcolumn:name="next stop",type="string",JSONPath=".spec.nextStop",format="date"
// +kubebuilder:printcolumn:name="last run",type="string",JSONPath=".status.lastRun",format="date"
// +k8s:openapi-gen=true
type ChaosPod struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand All @@ -55,7 +52,7 @@ type ChaosPod struct {
Status ChaosPodStatus `json:"status,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true

// ChaosPodList contains a list of ChaosPod
type ChaosPodList struct {
Expand Down Expand Up @@ -106,14 +103,3 @@ func (c *ChaosPod) Default() {
func init() {
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})
}

var (
// SchemeGroupVersion is group version used to register these objects
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}

// AddToScheme is required by pkg/client/...
AddToScheme = SchemeBuilder.AddToScheme
)
49 changes: 39 additions & 10 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type controllerManager struct {
// to scheme.scheme.
scheme *runtime.Scheme

// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
runnables []Runnable
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
// These Runnables are managed by lead election.
leaderElectionRunnables []Runnable
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
// These Runnables will not be blocked by lead election.
nonLeaderElectionRunnables []Runnable

cache cache.Cache

Expand Down Expand Up @@ -121,7 +125,7 @@ type controllerManager struct {
retryPeriod time.Duration
}

// Add sets dependencies on i, and adds it to the list of runnables to start.
// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand All @@ -131,8 +135,13 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

// Add the runnable to the list
cm.runnables = append(cm.runnables, r)
// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if cm.started {
// If already started, start the controller
go func() {
Expand Down Expand Up @@ -254,13 +263,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
go cm.serveMetrics(cm.internalStop)
}

go cm.startNonLeaderElectionRunnables()

if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start()
go cm.startLeaderElectionRunnables()
}

select {
Expand All @@ -273,7 +284,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start() {
func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

Expand All @@ -291,8 +302,26 @@ func (cm *controllerManager) start() {
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElectionRunnables() {
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
Expand All @@ -312,7 +341,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
cm.start()
cm.startLeaderElectionRunnables()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
Expand Down
13 changes: 11 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import (
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
// Add will set reqeusted dependencies on the component, and cause the component to be
// Add will set requested dependencies on the component, and cause the component to be
// started when Start is called. Add will inject any dependencies for which the argument
// implements the inject interface - e.g. inject.Client
// implements the inject interface - e.g. inject.Client.
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
Add(Runnable) error

// SetFields will set any dependencies on an object for which the object has implemented the inject
Expand Down Expand Up @@ -183,6 +185,13 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
return r(s)
}

// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
type LeaderElectionRunnable interface {
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
NeedLeaderElection() bool
}

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Initialize a rest.config if none was specified
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ var _ = Describe("manger.Manager", func() {
<-c2
<-c3
})

It("should return an error if any non-leaderelection Components fail to Start", func() {
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
})
}

Context("with defaults", func() {
Expand Down