Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable HNC leader election in controllers #94

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func createManager() ctrl.Manager {
// TODO: Better understand the behaviour of Burst, and consider making it equal to QPS if
// it turns out to be harmful.
cfg.Burst = int(cfg.QPS * 1.5)

// If leader election is disabled then treat this instance as the HNC leader
if !enableLeaderElection {
config.IsLeader = true
}

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Expand Down
9 changes: 9 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,12 @@ rules:
- get
- patch
- update
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- list
- update
80 changes: 75 additions & 5 deletions internal/anchor/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -56,6 +58,11 @@ type Reconciler struct {
// Reconcile sets up some basic variables and then calls the business logic. It currently
// only handles the creation of the namespaces but no deletion or state reporting yet.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// If not leader exiting, without performing any write operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious that we can safely skip this entire reconciler. Maybe a better comment would be: this reconciler never updates the forest, so it's fine to skip it entirely if it's not the leader.

if !config.IsLeader {
return ctrl.Result{}, nil
}

log := logutils.WithRID(r.Log).WithValues("trigger", req.NamespacedName)
log.V(1).Info("Reconciling anchor")

Expand Down Expand Up @@ -415,9 +422,72 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
}},
}
}
return ctrl.NewControllerManagedBy(mgr).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than changing this initialization logic, can't we just add the NeedLeaderElection() method to all the reconcilers? That seems easier and less disruptive to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh never mind, I forgot about the distinction between Controller and Reconciler. Sorry.

For(&api.SubnamespaceAnchor{}).
Watches(&source.Channel{Source: r.Affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(nsMapFn)).
Complete(r)
opts := controller.Options{Reconciler: r}
c, err := controller.NewUnmanaged("anchor-controller", mgr, opts)
if err != nil {
return err
}
c.Watch(&source.Kind{
Type: &api.SubnamespaceAnchor{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Affected},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(&source.Kind{
Type: &corev1.Namespace{}},
handler.EnqueueRequestsFromMapFunc(nsMapFn))
if err != nil {
return err

}
return config.AddNonLeaderCtrl(mgr, c)
}

// BecomeLeader requeues anchors (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.enqueueAllObjects(context.Background(), r.Log)
}

// getAnchorNames returns a list of anchor names in the given namespace.
func (r *Reconciler) getAnchorNames(ctx context.Context, nm string) ([]string, error) {
var anms []string
// List all the anchor in the namespace.
ul := &unstructured.UnstructuredList{}
ul.SetKind(api.AnchorKind)
ul.SetAPIVersion(api.AnchorAPIVersion)
if err := r.List(ctx, ul, client.InNamespace(nm)); err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
}
return anms, nil
}
// Create a list of strings of the anchor names.
for _, inst := range ul.Items {
anms = append(anms, inst.GetName())
}
return anms, nil
}

// enqueueAllObjects enqueues all the current anchor objects in all namespaces.
func (r *Reconciler) enqueueAllObjects(ctx context.Context, log logr.Logger) error {
keys := r.Forest.GetNamespaceNames()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need r.Forest.Lock() and defer r.Forest.Unlock() here

for _, ns := range keys {
// Get a list of subnamespace anchors in ns
anchors, err := r.getAnchorNames(ctx, ns)
if err != nil {
log.Error(err, "Error while trying to get subnamespace anchors", "namespace", ns)
//return err
}
for _, anchor := range anchors {
// Enqueue all anchors in the namespace.
r.Enqueue(r.Log, ns, anchor, "became leader")
}
}
return nil
}
18 changes: 18 additions & 0 deletions internal/config/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package config

import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type NonLeaderController struct {
controller.Controller
}

func (nlc *NonLeaderController) NeedLeaderElection() bool {
return false
}

func AddNonLeaderCtrl(mgr manager.Manager, c controller.Controller) error {
return mgr.Add(&NonLeaderController{c})
}
3 changes: 3 additions & 0 deletions internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ package config
// This value is controlled by the --unpropagated-annotation command line, which may be set multiple
// times.
var UnpropagatedAnnotations []string

// IsLeader is global which repesents whether this HNC instance is the leader
var IsLeader bool
Comment on lines +11 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using atomic.Value()? I'd guess that adds a memory fence which might be useful if the value has just changed.

5 changes: 3 additions & 2 deletions internal/forest/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type namedNamespaces map[string]*Namespace
// TypeSyncer syncs objects of a specific type. Reconcilers implement the interface so that they can be
// called by the HierarchyReconciler if the hierarchy changes.
type TypeSyncer interface {
// SyncNamespace syncs objects of a namespace for a specific type.
SyncNamespace(context.Context, logr.Logger, string) error
// SyncObjects syncs objects of a namespace for a specific type
// string is namespace, or "" for all namespaces
SyncObjects(context.Context, logr.Logger, string) error

// Provides the GVK that is handled by the reconciler who implements the interface.
GetGVK() schema.GroupVersionKind
Expand Down
62 changes: 54 additions & 8 deletions internal/hierarchyconfig/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log logr.Logger, nm string)
// Sync the Hierarchy singleton with the in-memory forest.
needUpdateObjects := r.syncWithForest(log, nsInst, inst, deletingCRD, anms)

// If not leader exit early, without performing any write operations
if !config.IsLeader {
return nil
}

// Write back if anything's changed. Early-exit if we just write back exactly what we had and this
// isn't the first time we're syncing.
updated, err := r.writeInstances(ctx, log, origHC, inst, origNS, nsInst)
Expand Down Expand Up @@ -755,7 +760,7 @@ func (r *Reconciler) updateObjects(ctx context.Context, log logr.Logger, ns stri
trs := r.Forest.GetTypeSyncers()
r.Forest.Unlock()
for _, tr := range trs {
if err := tr.SyncNamespace(ctx, log, ns); err != nil {
if err := tr.SyncObjects(ctx, log, ns); err != nil {
return err
}
}
Expand Down Expand Up @@ -850,12 +855,53 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, maxReconciles int) error
}
opts := controller.Options{
MaxConcurrentReconciles: maxReconciles,
Reconciler: r,
}
c, err := controller.NewUnmanaged("hierarchyconfig-controller", mgr, opts)
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &api.HierarchyConfiguration{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Affected},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &corev1.Namespace{}},
handler.EnqueueRequestsFromMapFunc(nsMapFn))
if err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&api.HierarchyConfiguration{}).
Watches(&source.Channel{Source: r.Affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(nsMapFn)).
Watches(&source.Kind{Type: &api.SubnamespaceAnchor{}}, handler.EnqueueRequestsFromMapFunc(anchorMapFn)).
WithOptions(opts).
Complete(r)
err = c.Watch(
&source.Kind{Type: &api.SubnamespaceAnchor{}},
handler.EnqueueRequestsFromMapFunc(anchorMapFn))
if err != nil {
return err
}
return config.AddNonLeaderCtrl(mgr, c)
}

// BecomeLeader requeues hierarchy configs (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.enqueueAllObjects(context.Background(), r.Log)
}

// enqueueAllObjects enqueues all the current objects in all namespaces.
func (r *Reconciler) enqueueAllObjects(ctx context.Context, log logr.Logger) error {
keys := r.Forest.GetNamespaceNames()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock/unlock forest here

for _, ns := range keys {
// Enqueue all the current objects in the namespace.
if err := r.updateObjects(ctx, log, ns); err != nil {
log.Error(err, "Error while trying to enqueue local objects", "namespace", ns)
return err
}
}
return nil
}
52 changes: 46 additions & 6 deletions internal/hncconfig/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"

api "sigs.k8s.io/hierarchical-namespaces/api/v1alpha2"
"sigs.k8s.io/hierarchical-namespaces/internal/config"
"sigs.k8s.io/hierarchical-namespaces/internal/crd"
"sigs.k8s.io/hierarchical-namespaces/internal/forest"
"sigs.k8s.io/hierarchical-namespaces/internal/objects"
Expand Down Expand Up @@ -131,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Load all conditions
r.loadNamespaceConditions(inst)

// Exit early if not the leader
if !config.IsLeader {
return ctrl.Result{}, nil
}

// Write back to the apiserver.
if err := r.writeSingleton(ctx, inst); err != nil {
r.Log.Error(err, "Couldn't write singleton")
Expand Down Expand Up @@ -556,12 +563,30 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
}

// Register the reconciler
err := ctrl.NewControllerManagedBy(mgr).
For(&api.HNCConfiguration{}).
Watches(&source.Channel{Source: r.Trigger}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &apiextensions.CustomResourceDefinition{}},
handler.EnqueueRequestsFromMapFunc(crdMapFn)).
Complete(r)
opts := controller.Options{Reconciler: r}
c, err := controller.NewUnmanaged("hncconfig-controller", mgr, opts)
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &api.HNCConfiguration{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Trigger},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &apiextensions.CustomResourceDefinition{}},
handler.EnqueueRequestsFromMapFunc(crdMapFn))
if err != nil {
return err
}
err = config.AddNonLeaderCtrl(mgr, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -631,3 +656,18 @@ func gvkForGR(gr schema.GroupResource, allRes []*restmapper.APIGroupResources) (
}
return schema.GroupVersionKind{}, &GVKErr{api.ReasonResourceNotFound, fmt.Sprintf("Resource %q not found", gr)}
}

// BecomeLeader calls SyncObjects on all type reconcillers (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.Log.V(1).Info("Requeue all objects (hierarchy updated or new namespace found)")
// Use mutex to guard the read from the types list of the forest to prevent the ConfigReconciler
// from modifying the list at the same time.
r.Forest.Lock()
trs := r.Forest.GetTypeSyncers()
r.Forest.Unlock()
for _, tr := range trs {
if err := tr.SyncObjects(context.Background(), r.Log, ""); err != nil {
r.Log.V(1).Error(err, "Failed to SyncObjects for HNCConfig")
}
}
}
Loading