Skip to content

Commit

Permalink
Enable HNC leader election in controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
joe2far committed Jan 31, 2022
1 parent 22c4260 commit 2bb9745
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 34 deletions.
6 changes: 6 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func createManager() ctrl.Manager {
// TODO: Better understand the behaviour of Burst, and consider making it equal to QPS if
// it turns out to be harmful.
cfg.Burst = int(cfg.QPS * 1.5)

// If leader election is disabled then treat this instance as the HNC leader
if !enableLeaderElection {
config.IsLeader = true
}

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Expand Down
9 changes: 9 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,12 @@ rules:
- get
- patch
- update
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- list
- update
80 changes: 75 additions & 5 deletions internal/anchor/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -56,6 +58,11 @@ type Reconciler struct {
// Reconcile sets up some basic variables and then calls the business logic. It currently
// only handles the creation of the namespaces but no deletion or state reporting yet.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// If not leader exiting, without performing any write operations
if !config.IsLeader {
return ctrl.Result{}, nil
}

log := logutils.WithRID(r.Log).WithValues("trigger", req.NamespacedName)
log.V(1).Info("Reconciling anchor")

Expand Down Expand Up @@ -415,9 +422,72 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
}},
}
}
return ctrl.NewControllerManagedBy(mgr).
For(&api.SubnamespaceAnchor{}).
Watches(&source.Channel{Source: r.Affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(nsMapFn)).
Complete(r)
opts := controller.Options{Reconciler: r}
c, err := controller.NewUnmanaged("anchor-controller", mgr, opts)
if err != nil {
return err
}
c.Watch(&source.Kind{
Type: &api.SubnamespaceAnchor{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Affected},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(&source.Kind{
Type: &corev1.Namespace{}},
handler.EnqueueRequestsFromMapFunc(nsMapFn))
if err != nil {
return err

}
return config.AddNonLeaderCtrl(mgr, c)
}

// BecomeLeader requeues anchors (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.enqueueAllObjects(context.Background(), r.Log)
}

// getAnchorNames returns a list of anchor names in the given namespace.
func (r *Reconciler) getAnchorNames(ctx context.Context, nm string) ([]string, error) {
var anms []string
// List all the anchor in the namespace.
ul := &unstructured.UnstructuredList{}
ul.SetKind(api.AnchorKind)
ul.SetAPIVersion(api.AnchorAPIVersion)
if err := r.List(ctx, ul, client.InNamespace(nm)); err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
}
return anms, nil
}
// Create a list of strings of the anchor names.
for _, inst := range ul.Items {
anms = append(anms, inst.GetName())
}
return anms, nil
}

// enqueueAllObjects enqueues all the current anchor objects in all namespaces.
func (r *Reconciler) enqueueAllObjects(ctx context.Context, log logr.Logger) error {
keys := r.Forest.GetNamespaceNames()
for _, ns := range keys {
// Get a list of subnamespace anchors in ns
anchors, err := r.getAnchorNames(ctx, ns)
if err != nil {
log.Error(err, "Error while trying to get subnamespace anchors", "namespace", ns)
//return err
}
for _, anchor := range anchors {
// Enqueue all anchors in the namespace.
r.Enqueue(r.Log, ns, anchor, "became leader")
}
}
return nil
}
18 changes: 18 additions & 0 deletions internal/config/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package config

import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type NonLeaderController struct {
controller.Controller
}

func (nlc *NonLeaderController) NeedLeaderElection() bool {
return false
}

func AddNonLeaderCtrl(mgr manager.Manager, c controller.Controller) error {
return mgr.Add(&NonLeaderController{c})
}
3 changes: 3 additions & 0 deletions internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ package config
// This value is controlled by the --unpropagated-annotation command line, which may be set multiple
// times.
var UnpropagatedAnnotations []string

// IsLeader is global which repesents whether this HNC instance is the leader
var IsLeader bool
5 changes: 3 additions & 2 deletions internal/forest/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type namedNamespaces map[string]*Namespace
// TypeSyncer syncs objects of a specific type. Reconcilers implement the interface so that they can be
// called by the HierarchyReconciler if the hierarchy changes.
type TypeSyncer interface {
// SyncNamespace syncs objects of a namespace for a specific type.
SyncNamespace(context.Context, logr.Logger, string) error
// SyncObjects syncs objects of a namespace for a specific type
// string is namespace, or "" for all namespaces
SyncObjects(context.Context, logr.Logger, string) error

// Provides the GVK that is handled by the reconciler who implements the interface.
GetGVK() schema.GroupVersionKind
Expand Down
62 changes: 54 additions & 8 deletions internal/hierarchyconfig/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log logr.Logger, nm string)
// Sync the Hierarchy singleton with the in-memory forest.
needUpdateObjects := r.syncWithForest(log, nsInst, inst, deletingCRD, anms)

// If not leader exit early, without performing any write operations
if !config.IsLeader {
return nil
}

// Write back if anything's changed. Early-exit if we just write back exactly what we had and this
// isn't the first time we're syncing.
updated, err := r.writeInstances(ctx, log, origHC, inst, origNS, nsInst)
Expand Down Expand Up @@ -755,7 +760,7 @@ func (r *Reconciler) updateObjects(ctx context.Context, log logr.Logger, ns stri
trs := r.Forest.GetTypeSyncers()
r.Forest.Unlock()
for _, tr := range trs {
if err := tr.SyncNamespace(ctx, log, ns); err != nil {
if err := tr.SyncObjects(ctx, log, ns); err != nil {
return err
}
}
Expand Down Expand Up @@ -850,12 +855,53 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, maxReconciles int) error
}
opts := controller.Options{
MaxConcurrentReconciles: maxReconciles,
Reconciler: r,
}
c, err := controller.NewUnmanaged("hierarchyconfig-controller", mgr, opts)
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &api.HierarchyConfiguration{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Affected},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &corev1.Namespace{}},
handler.EnqueueRequestsFromMapFunc(nsMapFn))
if err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&api.HierarchyConfiguration{}).
Watches(&source.Channel{Source: r.Affected}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(nsMapFn)).
Watches(&source.Kind{Type: &api.SubnamespaceAnchor{}}, handler.EnqueueRequestsFromMapFunc(anchorMapFn)).
WithOptions(opts).
Complete(r)
err = c.Watch(
&source.Kind{Type: &api.SubnamespaceAnchor{}},
handler.EnqueueRequestsFromMapFunc(anchorMapFn))
if err != nil {
return err
}
return config.AddNonLeaderCtrl(mgr, c)
}

// BecomeLeader requeues hierarchy configs (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.enqueueAllObjects(context.Background(), r.Log)
}

// enqueueAllObjects enqueues all the current objects in all namespaces.
func (r *Reconciler) enqueueAllObjects(ctx context.Context, log logr.Logger) error {
keys := r.Forest.GetNamespaceNames()
for _, ns := range keys {
// Enqueue all the current objects in the namespace.
if err := r.updateObjects(ctx, log, ns); err != nil {
log.Error(err, "Error while trying to enqueue local objects", "namespace", ns)
return err
}
}
return nil
}
52 changes: 46 additions & 6 deletions internal/hncconfig/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"

api "sigs.k8s.io/hierarchical-namespaces/api/v1alpha2"
"sigs.k8s.io/hierarchical-namespaces/internal/config"
"sigs.k8s.io/hierarchical-namespaces/internal/crd"
"sigs.k8s.io/hierarchical-namespaces/internal/forest"
"sigs.k8s.io/hierarchical-namespaces/internal/objects"
Expand Down Expand Up @@ -131,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Load all conditions
r.loadNamespaceConditions(inst)

// Exit early if not the leader
if !config.IsLeader {
return ctrl.Result{}, nil
}

// Write back to the apiserver.
if err := r.writeSingleton(ctx, inst); err != nil {
r.Log.Error(err, "Couldn't write singleton")
Expand Down Expand Up @@ -556,12 +563,30 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
}

// Register the reconciler
err := ctrl.NewControllerManagedBy(mgr).
For(&api.HNCConfiguration{}).
Watches(&source.Channel{Source: r.Trigger}, &handler.EnqueueRequestForObject{}).
Watches(&source.Kind{Type: &apiextensions.CustomResourceDefinition{}},
handler.EnqueueRequestsFromMapFunc(crdMapFn)).
Complete(r)
opts := controller.Options{Reconciler: r}
c, err := controller.NewUnmanaged("hncconfig-controller", mgr, opts)
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &api.HNCConfiguration{}},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Channel{Source: r.Trigger},
&handler.EnqueueRequestForObject{})
if err != nil {
return err
}
err = c.Watch(
&source.Kind{Type: &apiextensions.CustomResourceDefinition{}},
handler.EnqueueRequestsFromMapFunc(crdMapFn))
if err != nil {
return err
}
err = config.AddNonLeaderCtrl(mgr, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -631,3 +656,18 @@ func gvkForGR(gr schema.GroupResource, allRes []*restmapper.APIGroupResources) (
}
return schema.GroupVersionKind{}, &GVKErr{api.ReasonResourceNotFound, fmt.Sprintf("Resource %q not found", gr)}
}

// BecomeLeader calls SyncObjects on all type reconcillers (required when instance becomes leader)
func (r *Reconciler) BecomeLeader() {
r.Log.V(1).Info("Requeue all objects (hierarchy updated or new namespace found)")
// Use mutex to guard the read from the types list of the forest to prevent the ConfigReconciler
// from modifying the list at the same time.
r.Forest.Lock()
trs := r.Forest.GetTypeSyncers()
r.Forest.Unlock()
for _, tr := range trs {
if err := tr.SyncObjects(context.Background(), r.Log, ""); err != nil {
r.Log.V(1).Error(err, "Failed to SyncObjects for HNCConfig")
}
}
}
Loading

0 comments on commit 2bb9745

Please sign in to comment.