From ecbd6965a0100d9a070110233762593b16023287 Mon Sep 17 00:00:00 2001 From: Hao Zhou Date: Mon, 24 Apr 2023 19:34:46 -0700 Subject: [PATCH] add healthz subpathes for all controllers (#201) --- controllers/apps/deployment_controller.go | 12 ++- controllers/core/configmap_controller.go | 16 ++- controllers/core/node_controller.go | 54 ++++++++-- controllers/core/pod_controller.go | 44 +++++++- controllers/custom/builder.go | 29 ++--- controllers/custom/custom_controller.go | 38 +++++++ main.go | 101 ++++++++---------- .../pkg/node/manager/mock_manager.go | 15 +++ .../pkg/provider/mock_provider.go | 15 +++ .../pkg/resource/mock_resources.go | 15 +++ pkg/aws/ec2/api/eni_cleanup.go | 10 +- pkg/healthz/healthz.go | 83 ++++++++++++++ pkg/healthz/healthz_test.go | 62 +++++++++++ pkg/node/manager/manager.go | 32 ++++-- pkg/node/manager/manager_test.go | 7 +- pkg/provider/branch/provider.go | 32 +++++- pkg/provider/ip/provider.go | 27 ++++- pkg/provider/provider.go | 3 + pkg/resource/introspect.go | 9 +- pkg/resource/manager.go | 29 ++++- pkg/resource/manager_test.go | 8 +- pkg/worker/worker.go | 6 ++ .../core/annotation_validation_webhook.go | 19 ++++ webhooks/core/node_update_webhook.go | 19 ++++ webhooks/core/pod_webhook.go | 21 ++++ webhooks/core/pod_webhook_test.go | 4 +- 26 files changed, 601 insertions(+), 109 deletions(-) create mode 100644 pkg/healthz/healthz.go create mode 100644 pkg/healthz/healthz_test.go diff --git a/controllers/apps/deployment_controller.go b/controllers/apps/deployment_controller.go index a33ad478..a560c573 100644 --- a/controllers/apps/deployment_controller.go +++ b/controllers/apps/deployment_controller.go @@ -16,15 +16,17 @@ package apps import ( "context" - "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/core" + controllers "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/core" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" "github.com/go-logr/logr" appV1 "k8s.io/api/apps/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) type DeploymentReconciler struct { @@ -63,7 +65,13 @@ func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } -func (r *DeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *DeploymentReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error { + // add health check on subpath for deployment controller + // TODO: this is a simple controller and unlikely hit blocking issue but we can revisit this after subpaths are released for a while + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-deploy-controller": rcHealthz.SimplePing("deployment controller", r.Log)}, + ) + return ctrl.NewControllerManagedBy(mgr). For(&appV1.Deployment{}). Complete(r) diff --git a/controllers/core/configmap_controller.go b/controllers/core/configmap_controller.go index 675a198c..5c1ae69f 100644 --- a/controllers/core/configmap_controller.go +++ b/controllers/core/configmap_controller.go @@ -19,6 +19,7 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" "github.com/go-logr/logr" @@ -27,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) // ConfigMapReconciler reconciles a ConfigMap object @@ -38,6 +40,7 @@ type ConfigMapReconciler struct { K8sAPI k8s.K8sWrapper Condition condition.Conditions curWinIPAMEnabledCond bool + Context context.Context } //+kubebuilder:rbac:groups=core,resources=configmaps,namespace=kube-system,resourceNames=amazon-vpc-cni,verbs=get;list;watch @@ -84,7 +87,12 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // SetupWithManager sets up the controller with the Manager. -func (r *ConfigMapReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *ConfigMapReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error { + // add health check on subpath for CM controller + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-cm-controller": r.check()}, + ) + return ctrl.NewControllerManagedBy(mgr). For(&corev1.ConfigMap{}). Complete(r) @@ -110,3 +118,9 @@ func UpdateNodesOnConfigMapChanges(k8sAPI k8s.K8sWrapper, nodeManager manager.Ma } return nil } + +func (r *ConfigMapReconciler) check() healthz.Checker { + r.Log.Info("ConfigMap controller's healthz subpath was added") + // We can revisit this to use PingWithTimeout() instead if we have concerns on this controller. + return rcHealthz.SimplePing("configmap controller", r.Log) +} diff --git a/controllers/core/node_controller.go b/controllers/core/node_controller.go index 96e17812..9e191c22 100644 --- a/controllers/core/node_controller.go +++ b/controllers/core/node_controller.go @@ -16,17 +16,22 @@ package controllers import ( "context" goErr "errors" + "net/http" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" + "github.com/google/uuid" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) // MaxNodeConcurrentReconciles is the number of go routines that can invoke @@ -43,7 +48,7 @@ type NodeReconciler struct { Scheme *runtime.Scheme Manager manager.Manager Conditions condition.Conditions - // NodeEventCache *bigcache.BigCache + Context context.Context } // +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch @@ -68,6 +73,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil { if errors.IsNotFound(err) { + r.Log.V(1).Info("the requested node couldn't be found by k8s client", "Node", req.NamespacedName) _, found := r.Manager.GetNode(req.Name) // if cachedNode != nil && cachedNode.HasInstance() { // // delete the not found node instance id from node event cache for housekeeping @@ -80,7 +86,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. logger.Error(err, "failed to delete node from manager") return ctrl.Result{}, nil } - logger.Info("deleted the node from manager") + logger.V(1).Info("deleted the node from manager") } } return ctrl.Result{}, client.IgnoreNotFound(err) @@ -98,17 +104,45 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } -// func (r *NodeReconciler) deleteNodeFromNodeEventCache(nodeId string) { -// if err := r.NodeEventCache.Delete(nodeId); err != nil { -// r.Log.V(1).Info("node controller removing node from node event cache failed", "Error", err) -// } else { -// r.Log.V(1).Info("node controller removed the node from node event cache successfully", "InstanceId", nodeId) -// } -// } +func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error { + // add health check on subpath for node controller + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-node-controller": r.Check()}, + ) -func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.Node{}). WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}). Complete(r) } + +func (r *NodeReconciler) Check() healthz.Checker { + r.Log.Info("Node controller's healthz subpath was added") + return func(req *http.Request) error { + // if the reconciler is not ready, using the simple ping to test + // this can test the referenced cached pod datastore + if !r.Conditions.GetPodDataStoreSyncStatus() { + r.Log.V(1).Info("***** node controller healthz enpoint tested Simple Ping *****") + return nil + } + + err := rcHealthz.PingWithTimeout(func(c chan<- error) { + // when the reconciler is ready, testing the reconciler with a fake node request + pingRequest := &ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: corev1.NamespaceDefault, + Name: uuid.New().String(), + }, + } + + // expecting to 'return ctrl.Result{}, client.IgnoreNotFound(err)' + // IgnoreNotFound returns nil on NotFound errors. + // this can test the pod cached datastore and node cached datastore + _, rErr := r.Reconcile(r.Context, *pingRequest) + r.Log.V(1).Info("***** node controller healthz endpoint tested Reconcile *****") + c <- rErr + }, r.Log) + + return err + } +} diff --git a/controllers/core/pod_controller.go b/controllers/core/pod_controller.go index 088923d8..5cafa6b8 100644 --- a/controllers/core/pod_controller.go +++ b/controllers/core/pod_controller.go @@ -15,20 +15,25 @@ package controllers import ( "context" + "net/http" "time" "github.com/aws/amazon-vpc-resource-controller-k8s/controllers/custom" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s/pod" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/resource" + "github.com/google/uuid" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) // +kubebuilder:rbac:groups="",resources=events,verbs=create;update;patch @@ -71,7 +76,7 @@ func (r *PodReconciler) Reconcile(request custom.Request) (ctrl.Result, error) { return ctrl.Result{}, nil } if !exists { - r.Log.Info("pod doesn't exists in the cache anymore", + r.Log.V(1).Info("pod doesn't exists in the cache anymore", "namespace name", request.NamespacedName.String()) return ctrl.Result{}, nil } @@ -174,9 +179,10 @@ func getAggregateResources(pod *v1.Pod) map[string]int64 { // list of runnable. After Manager acquire the lease the pod controller runnable // will be started and the Pod events will be sent to Reconcile function func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manager, - clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration) error { + clientSet *kubernetes.Clientset, pageLimit int, syncPeriod time.Duration, healthzHandler *rcHealthz.HealthzHandler) error { r.Log.Info("The pod controller is using MaxConcurrentReconciles", "Routines", MaxPodConcurrentReconciles) - return custom.NewControllerManagedBy(ctx, manager). + + customChecker, err := custom.NewControllerManagedBy(ctx, manager). WithLogger(r.Log.WithName("custom pod controller")). UsingDataStore(r.DataStore). WithClientSet(clientSet). @@ -188,4 +194,36 @@ func (r *PodReconciler) SetupWithManager(ctx context.Context, manager ctrl.Manag ResyncPeriod: syncPeriod, MaxConcurrentReconciles: MaxPodConcurrentReconciles, }).UsingConditions(r.Condition).Complete(r) + + // add health check on subpath for pod and pod customized controllers + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{ + "health-pod-controller": r.check(), + "health-custom-pod-controller": customChecker, + }, + ) + + return err +} + +func (r *PodReconciler) check() healthz.Checker { + r.Log.Info("Pod controller's healthz subpath was added") + // more meaningful ping + return func(req *http.Request) error { + err := rcHealthz.PingWithTimeout(func(c chan<- error) { + pingRequest := &custom.Request{ + NamespacedName: types.NamespacedName{ + Namespace: v1.NamespaceDefault, + Name: uuid.New().String(), + }, + DeletedObject: nil, + } + // calling reconcile will test pod cache + _, rErr := r.Reconcile(*pingRequest) + r.Log.V(1).Info("***** pod controller healthz endpoint tested reconcile *****") + c <- rErr + }, r.Log) + + return err + } } diff --git a/controllers/custom/builder.go b/controllers/custom/builder.go index c8e7aceb..7c2a4658 100644 --- a/controllers/custom/builder.go +++ b/controllers/custom/builder.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -89,21 +90,21 @@ func NewControllerManagedBy(ctx context.Context, mgr manager.Manager) *Builder { // Complete adds the controller to manager's Runnable. The Controller // runnable will start when the manager starts -func (b *Builder) Complete(reconciler Reconciler) error { +func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) { // Loggr is no longer an interface // The suggestion is using LogSink to do nil check now if b.log.GetSink() == nil { - return fmt.Errorf("need to set the logger") + return nil, fmt.Errorf("need to set the logger") } if b.converter == nil { - return fmt.Errorf("converter not provided, " + + return nil, fmt.Errorf("converter not provided, " + "must use high level controller if conversion not required") } if b.clientSet == nil { - return fmt.Errorf("need to set kubernetes clienset") + return nil, fmt.Errorf("need to set kubernetes clienset") } if b.dataStore == nil { - return fmt.Errorf("need datastore to start the controller") + return nil, fmt.Errorf("need datastore to start the controller") } b.SetDefaults() @@ -172,17 +173,17 @@ func (b *Builder) Complete(reconciler Reconciler) error { }, } - controller := &CustomController{ - log: b.log, - options: b.options, - config: config, - Do: reconciler, - workQueue: workQueue, - conditions: b.conditions, - } + controller := NewCustomController( + b.log, + b.options, + config, + reconciler, + workQueue, + b.conditions, + ) // Adds the controller to the manager's Runnable - return b.mgr.Add(controller) + return controller.checker, b.mgr.Add(controller) } // SetDefaults sets the default options for controller diff --git a/controllers/custom/custom_controller.go b/controllers/custom/custom_controller.go index 7ae027f1..b3bfeee5 100644 --- a/controllers/custom/custom_controller.go +++ b/controllers/custom/custom_controller.go @@ -16,6 +16,7 @@ package custom import ( "context" "fmt" + "net/http" "time" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" @@ -28,6 +29,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" ) // Converter for converting k8s object and object list used in watches and list operation @@ -93,6 +97,8 @@ type CustomController struct { // the controller options Options conditions condition.Conditions + + checker healthz.Checker } // Request for Add/Update only contains the Namespace/Name @@ -106,6 +112,25 @@ type Request struct { DeletedObject interface{} } +func NewCustomController( + log logr.Logger, + options Options, + config *cache.Config, + reconciler Reconciler, + workQueue workqueue.RateLimitingInterface, + conditions condition.Conditions) *CustomController { + cc := &CustomController{ + log: log, + options: options, + config: config, + Do: reconciler, + workQueue: workQueue, + conditions: conditions, + } + cc.checker = cc.CustomCheck() + return cc +} + // Starts the low level controller func (c *CustomController) Start(ctx context.Context) error { // This is important to allow the data store to be synced @@ -256,3 +281,16 @@ func (c *CustomController) reconcileHandler(obj interface{}) bool { // Return true, don't take a break return true } + +func (c *CustomController) CustomCheck() healthz.Checker { + return func(req *http.Request) error { + err := rcHealthz.PingWithTimeout(func(status chan<- error) { + var ping interface{} + c.workQueue.NumRequeues(ping) + c.log.V(1).Info("***** health check on custom pod controller tested workQueue NumRequeues *****") + status <- nil + }, c.log) + + return err + } +} diff --git a/main.go b/main.go index 022d9fb6..06e18ac5 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ import ( ec2API "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s/pod" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager" @@ -52,7 +53,6 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/webhook" // +kubebuilder:scaffold:imports ) @@ -96,6 +96,7 @@ func main() { var outputPath string var introspectBindAddr string var leaseOnly bool + var healthCheckTimeout int flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") @@ -123,6 +124,8 @@ func main() { flag.IntVar(&listPageLimit, "page-limit", 100, "The page size limiting the number of response for list operation to API Server") flag.StringVar(&outputPath, "log-file", "stderr", "The path to redirect controller logs") + flag.IntVar(&healthCheckTimeout, "health-check-timeout", 10, + "How long healthz check waits before failing the attempt") flag.StringVar(&introspectBindAddr, "introspect-bind-addr", ":22775", "Port for serving the introspection API") flag.BoolVar(&leaseOnly, "lease-only", false, "Controller uses lease only for leader election") @@ -235,20 +238,16 @@ func main() { os.Exit(1) } + healthzHandler := rcHealthz.NewHealthzHandler(healthCheckTimeout) + // add root health ping on manager in general + healthzHandler.AddControllerHealthChecker("health-root-manager-ping", rcHealthz.SimplePing("root manager", setupLog)) + clientSet, err := kubernetes.NewForConfig(kubeConfig) if err != nil { setupLog.Error(err, "failed to create client set") os.Exit(1) } - // Add liveness probe - err = mgr.AddHealthzCheck("health-ping", healthz.Ping) - setupLog.Info("adding health check for controller") - if err != nil { - setupLog.Error(err, "unable add a health check") - os.Exit(1) - } - ctx := ctrl.SetupSignalHandler() ec2Wrapper, err := ec2API.NewEC2Wrapper(roleARN, setupLog) @@ -276,7 +275,8 @@ func main() { } supportedResources := []string{config.ResourceNamePodENI, config.ResourceNameIPAddress} - resourceManager, err := resource.NewResourceManager(ctx, supportedResources, apiWrapper) + resourceManager, err := resource.NewResourceManager( + ctx, supportedResources, apiWrapper, ctrl.Log.WithName("managers").WithName("resource"), healthzHandler) if err != nil { ctrl.Log.Error(err, "failed to init resources", "resources", supportedResources) os.Exit(1) @@ -289,7 +289,8 @@ func main() { nodeManagerWorkers := asyncWorkers.NewDefaultWorkerPool("node async workers", 10, 1, ctrl.Log.WithName("node async workers"), ctx) nodeManager, err := manager.NewNodeManager(ctrl.Log.WithName("node manager"), resourceManager, - apiWrapper, nodeManagerWorkers, controllerConditions) + apiWrapper, nodeManagerWorkers, controllerConditions, healthzHandler) + if err != nil { ctrl.Log.Error(err, "failed to init node manager") os.Exit(1) @@ -297,85 +298,67 @@ func main() { // IMPORTANT: The Pod Reconciler must be the first controller to Run. The controller // will not allow any other controller to run till the cache has synced. - if err = (&corecontroller.PodReconciler{ + if err := (&corecontroller.PodReconciler{ Log: ctrl.Log.WithName("controllers").WithName("Pod Reconciler"), ResourceManager: resourceManager, NodeManager: nodeManager, K8sAPI: k8sApi, DataStore: dataStore, Condition: controllerConditions, - }).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod); err != nil { + }).SetupWithManager(ctx, mgr, clientSet, listPageLimit, syncPeriod, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "pod") os.Exit(1) } - if err = (&ec2API.ENICleaner{ + if err := (&ec2API.ENICleaner{ EC2Wrapper: ec2Wrapper, ClusterName: clusterName, Log: ctrl.Log.WithName("eni cleaner"), - }).SetupWithManager(ctx, mgr); err != nil { + }).SetupWithManager(ctx, mgr, healthzHandler); err != nil { setupLog.Error(err, "unable to start eni cleaner") os.Exit(1) } - // config := bigcache.Config{ - // Shards: config.InstancesCacheShards, - // LifeWindow: config.InstancesCacheTTL, - // HardMaxCacheSize: config.InstancesCacheMaxSize, - // } - // nodeEventCache, err := bigcache.New(context.Background(), config) - if err != nil { - setupLog.Error(err, "Initializing node cache failed") - } - - if err = (&corecontroller.NodeReconciler{ + if err := (&corecontroller.NodeReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("Node"), Scheme: mgr.GetScheme(), Manager: nodeManager, Conditions: controllerConditions, - // NodeEventCache: nodeEventCache, - }).SetupWithManager(mgr); err != nil { + Context: ctx, + }).SetupWithManager(mgr, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Node") os.Exit(1) } - if err = (&corecontroller.ConfigMapReconciler{ + if err := (&corecontroller.ConfigMapReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("ConfigMap"), Scheme: mgr.GetScheme(), NodeManager: nodeManager, K8sAPI: k8sApi, Condition: controllerConditions, - }).SetupWithManager(mgr); err != nil { + Context: ctx, + }).SetupWithManager(mgr, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ConfigMap") os.Exit(1) } - if err = (&apps.DeploymentReconciler{ + if err := (&apps.DeploymentReconciler{ Log: ctrl.Log.WithName("controllers").WithName("Deployment"), NodeManager: nodeManager, K8sAPI: k8sApi, Condition: controllerConditions, - }).SetupWithManager(mgr); err != nil { + }).SetupWithManager(mgr, healthzHandler); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Deployment") os.Exit(1) } - // Disable Event controller for now due to known possible performance impacts on etcd servers - // if err = (corecontroller.NewEventReconciler( - // ctrl.Log.WithName("Controllers").WithName("Event"), - // mgr.GetScheme(), - // k8sApi, nodeEventCache).SetupWithManager(mgr)); err != nil { - // setupLog.Error(err, "unable to create controller", "controller", "Event") - // os.Exit(1) - // } - - if err = (&resource.IntrospectHandler{ + if err := (&resource.IntrospectHandler{ Log: ctrl.Log.WithName("introspect"), BindAddress: introspectBindAddr, ResourceManager: resourceManager, - }).SetupWithManager(mgr); err != nil { + }).SetupWithManager(mgr, healthzHandler); err != nil { setupLog.Error(err, "unable to create introspect API") os.Exit(1) } @@ -385,25 +368,31 @@ func main() { webhookServer := mgr.GetWebhookServer() setupLog.Info("registering webhooks to the webhook server") + podMutationWebhook := webhookcore.NewPodMutationWebHook( + sgpAPI, ctrl.Log.WithName("resource mutating webhook"), controllerConditions, healthzHandler) webhookServer.Register("/mutate-v1-pod", &webhook.Admission{ - Handler: &webhookcore.PodMutationWebHook{ - SGPAPI: sgpAPI, - Log: ctrl.Log.WithName("resource mutation webhook"), - Condition: controllerConditions, - }}) + Handler: podMutationWebhook, + }) + nodeValidateWebhook := webhookcore.NewNodeUpdateWebhook( + controllerConditions, ctrl.Log.WithName("node validating webhook"), healthzHandler) webhookServer.Register("/validate-v1-node", &webhook.Admission{ - Handler: &webhookcore.NodeUpdateWebhook{ - Log: ctrl.Log.WithName("node validation webhook"), - Condition: controllerConditions, - }}) + Handler: nodeValidateWebhook}) // Validating webhook for pod. + annotationValidator := webhookcore.NewAnnotationValidator( + controllerConditions, ctrl.Log.WithName("annotation validating webhook"), healthzHandler) webhookServer.Register("/validate-v1-pod", &webhook.Admission{ - Handler: &webhookcore.AnnotationValidator{ - Log: ctrl.Log.WithName("annotation validation webhook"), - Condition: controllerConditions, - }}) + Handler: annotationValidator}) + + // Enabled each controllers' health check and aggregate them to endpoint /healthz + // curl localhost:61779/healthz?verbose can list all controllers' healthy status + err = healthzHandler.AddControllersHealthStatusChecksToManager(mgr) + setupLog.Info("adding health check for controllers") + if err != nil { + setupLog.Error(err, "unable add health check to all controllers") + os.Exit(1) + } setupLog.Info("starting manager") if err := mgr.Start(ctx); err != nil { diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go index 62ab3fa8..b9145f98 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go @@ -22,6 +22,7 @@ import ( node "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node" gomock "github.com/golang/mock/gomock" + healthz "sigs.k8s.io/controller-runtime/pkg/healthz" ) // MockManager is a mock of Manager interface. @@ -75,6 +76,20 @@ func (mr *MockManagerMockRecorder) DeleteNode(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNode", reflect.TypeOf((*MockManager)(nil).DeleteNode), arg0) } +// GetChecker mocks base method. +func (m *MockManager) GetChecker() healthz.Checker { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetChecker") + ret0, _ := ret[0].(healthz.Checker) + return ret0 +} + +// GetChecker indicates an expected call of GetChecker. +func (mr *MockManagerMockRecorder) GetChecker() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChecker", reflect.TypeOf((*MockManager)(nil).GetChecker)) +} + // GetNode mocks base method. func (m *MockManager) GetNode(arg0 string) (node.Node, bool) { m.ctrl.T.Helper() diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go index f28a4d57..fe60bb16 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go @@ -23,6 +23,7 @@ import ( ec2 "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" pool "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/pool" gomock "github.com/golang/mock/gomock" + healthz "sigs.k8s.io/controller-runtime/pkg/healthz" reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -63,6 +64,20 @@ func (mr *MockResourceProviderMockRecorder) DeInitResource(arg0 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeInitResource", reflect.TypeOf((*MockResourceProvider)(nil).DeInitResource), arg0) } +// GetHealthChecker mocks base method. +func (m *MockResourceProvider) GetHealthChecker() healthz.Checker { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHealthChecker") + ret0, _ := ret[0].(healthz.Checker) + return ret0 +} + +// GetHealthChecker indicates an expected call of GetHealthChecker. +func (mr *MockResourceProviderMockRecorder) GetHealthChecker() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHealthChecker", reflect.TypeOf((*MockResourceProvider)(nil).GetHealthChecker)) +} + // GetPool mocks base method. func (m *MockResourceProvider) GetPool(arg0 string) (pool.Pool, bool) { m.ctrl.T.Helper() diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/resource/mock_resources.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/resource/mock_resources.go index 90e94c66..e549a3bb 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/resource/mock_resources.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/resource/mock_resources.go @@ -23,6 +23,7 @@ import ( handler "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/handler" provider "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider" gomock "github.com/golang/mock/gomock" + healthz "sigs.k8s.io/controller-runtime/pkg/healthz" ) // MockResourceManager is a mock of ResourceManager interface. @@ -48,6 +49,20 @@ func (m *MockResourceManager) EXPECT() *MockResourceManagerMockRecorder { return m.recorder } +// GetCheckers mocks base method. +func (m *MockResourceManager) GetCheckers() map[string]healthz.Checker { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCheckers") + ret0, _ := ret[0].(map[string]healthz.Checker) + return ret0 +} + +// GetCheckers indicates an expected call of GetCheckers. +func (mr *MockResourceManagerMockRecorder) GetCheckers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCheckers", reflect.TypeOf((*MockResourceManager)(nil).GetCheckers)) +} + // GetResourceHandler mocks base method. func (m *MockResourceManager) GetResourceHandler(arg0 string) (handler.Handler, bool) { m.ctrl.T.Helper() diff --git a/pkg/aws/ec2/api/eni_cleanup.go b/pkg/aws/ec2/api/eni_cleanup.go index d0737a13..f51e29b2 100644 --- a/pkg/aws/ec2/api/eni_cleanup.go +++ b/pkg/aws/ec2/api/eni_cleanup.go @@ -19,11 +19,13 @@ import ( "time" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) type ENICleaner struct { @@ -37,11 +39,17 @@ type ENICleaner struct { ctx context.Context } -func (e *ENICleaner) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (e *ENICleaner) SetupWithManager(ctx context.Context, mgr ctrl.Manager, healthzHandler *rcHealthz.HealthzHandler) error { e.clusterNameTagKey = fmt.Sprintf(config.ClusterNameTagKeyFormat, e.ClusterName) e.availableENIs = make(map[string]struct{}) e.ctx = ctx + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{ + "health-interface-cleaner": rcHealthz.SimplePing("interface cleanup", e.Log), + }, + ) + return mgr.Add(e) } diff --git a/pkg/healthz/healthz.go b/pkg/healthz/healthz.go new file mode 100644 index 00000000..afee21dc --- /dev/null +++ b/pkg/healthz/healthz.go @@ -0,0 +1,83 @@ +package healthz + +import ( + "errors" + "fmt" + "net/http" + "time" + + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type HealthzHandler struct { + CheckersMap map[string]healthz.Checker +} + +var ( + HealthzTimeout time.Duration = 2 +) + +func NewHealthzHandler(timeout int) *HealthzHandler { + HealthzTimeout = time.Duration(timeout) + return &HealthzHandler{ + CheckersMap: make(map[string]healthz.Checker), + } +} + +func (hh *HealthzHandler) AddControllerHealthChecker(name string, controllerCheck healthz.Checker) { + // only add health check if the map doesn't already contain + if _, ok := hh.CheckersMap[name]; !ok { + hh.CheckersMap[name] = controllerCheck + } +} + +func (hh *HealthzHandler) AddControllersHealthCheckers(controllerCheckers map[string]healthz.Checker) { + for key, value := range controllerCheckers { + hh.AddControllerHealthChecker(key, value) + } +} + +func (hh *HealthzHandler) AddControllersHealthStatusChecksToManager(mgr manager.Manager) error { + if len(hh.CheckersMap) > 0 { + return hh.checkControllersHealthStatus(mgr, hh.CheckersMap) + } else { + return errors.New("couldn't find any controller's check to add for healthz endpoint") + } +} + +func (hh *HealthzHandler) checkControllersHealthStatus(mgr manager.Manager, checkers map[string]healthz.Checker) error { + var err error + for name, check := range checkers { + err = mgr.AddHealthzCheck(name, check) + fmt.Printf("Added Health check for %s with error %v\n", name, err) + if err != nil { + break + } + } + return err +} + +func SimplePing(controllerName string, log logr.Logger) healthz.Checker { + log.Info(fmt.Sprintf("%s's healthz subpath was added", controllerName)) + return func(req *http.Request) error { + log.V(1).Info(fmt.Sprintf("***** %s healthz endpoint was pinged *****", controllerName)) + return nil + } +} + +func PingWithTimeout(healthCheck func(c chan<- error), logger logr.Logger) error { + status := make(chan error, 1) + var err error + go healthCheck(status) + + select { + case err = <-status: + logger.V(1).Info("finished healthz check on controller before probing times out", "TimeoutInSecond", HealthzTimeout*time.Second) + case <-time.After(HealthzTimeout * time.Second): + err = errors.New("healthz check failed due to timeout") + logger.Error(err, "healthz check has a preset timeout to fail no responding probing", "TimeoutInSecond", HealthzTimeout*time.Second) + } + return err +} diff --git a/pkg/healthz/healthz_test.go b/pkg/healthz/healthz_test.go new file mode 100644 index 00000000..c779a321 --- /dev/null +++ b/pkg/healthz/healthz_test.go @@ -0,0 +1,62 @@ +package healthz + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var testTimeout = 3 + +// TestHealthzHandler tests creating a new healthz handler with timeout value passed to it +func TestHealthzHandler(t *testing.T) { + handler := NewHealthzHandler(testTimeout) + assert.True(t, handler != nil) + assert.True(t, HealthzTimeout == time.Duration(testTimeout)) +} + +// TestAddControllerHealthChecker tests adding individual healthz checker +func TestAddControllerHealthChecker(t *testing.T) { + handler := NewHealthzHandler(testTimeout) + checker := healthz.Ping + name := "test-ping" + handler.AddControllerHealthChecker(name, checker) + assert.True(t, len(handler.CheckersMap) == 1, "Should be only one healthz checker") + _, ok := handler.CheckersMap[name] + assert.True(t, ok) +} + +// TestAddControllersHealthCheckers tests adding the map of healthz checkers +func TestAddControllersHealthCheckers(t *testing.T) { + handler := NewHealthzHandler(testTimeout) + checkers := map[string]healthz.Checker{ + "test-checker-1": healthz.Ping, + "test-checker-2": SimplePing("test", zap.New()), + } + handler.AddControllersHealthCheckers(checkers) + assert.True(t, len(handler.CheckersMap) == 2, "Two checkers should be added") +} + +// TestPingWithTimeout_Success tests ping responding before timeout +func TestPingWithTimeout_Success(t *testing.T) { + err := PingWithTimeout(func(c chan<- error) { + time.Sleep(1 * time.Second) + c <- nil + }, zap.New()) + time.Sleep(5 * time.Second) + assert.NoError(t, err) +} + +// TestPingWithTimeout_Failure tests ping responding after timeout +func TestPingWithTimeout_Failure(t *testing.T) { + err := PingWithTimeout(func(c chan<- error) { + time.Sleep(4 * time.Second) + c <- nil + }, zap.New()) + time.Sleep(5 * time.Second) + assert.Error(t, err) + assert.EqualErrorf(t, err, "healthz check failed due to timeout", "Healthz check should fail due to timeout") +} diff --git a/pkg/node/manager/manager.go b/pkg/node/manager/manager.go index b6c9ad7a..75db777b 100644 --- a/pkg/node/manager/manager.go +++ b/pkg/node/manager/manager.go @@ -15,16 +15,20 @@ package manager import ( "fmt" + "net/http" "strings" "sync" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/resource" asyncWorker "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/worker" + "github.com/google/uuid" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" @@ -84,7 +88,7 @@ type AsyncOperationJob struct { // NewNodeManager returns a new node manager func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager, - wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions) (Manager, error) { + wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) { manager := &manager{ resourceManager: resourceManager, @@ -95,6 +99,11 @@ func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager conditions: conditions, } + // add health check on subpath for node manager + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-node-manager": manager.check()}, + ) + return manager, worker.StartWorkerPool(manager.performAsyncOperation) } @@ -404,14 +413,19 @@ func (m *manager) removeNodeSafe(nodeName string) { delete(m.dataStore, nodeName) } -func (m *manager) updateNodeTrunkLabel(nodeName, labelKey, labelValue string) error { - if node, err := m.wrapper.K8sAPI.GetNode(nodeName); err != nil { - return err - } else { - updated, err := m.wrapper.K8sAPI.AddLabelToManageNode(node, labelKey, labelValue) - if !updated { - m.Log.Info("failed updating the node label for trunk when operating on node", "NodeName", nodeName, "LabelKey", labelKey, "LabelValue", labelValue) - } +func (m *manager) check() healthz.Checker { + // instead of using SimplePing, testing the node cache from manager makes the test more accurate + return func(req *http.Request) error { + err := rcHealthz.PingWithTimeout(func(c chan<- error) { + randomName := uuid.New().String() + _, found := m.GetNode(randomName) + m.Log.V(1).Info("health check tested ping GetNode to check on datastore cache in node manager successfully", "TesedNodeName", randomName, "NodeFound", found) + var ping interface{} + m.worker.SubmitJob(ping) + m.Log.V(1).Info("health check tested ping SubmitJob with a nil job to check on worker queue in node manager successfully") + c <- nil + }, m.Log) + return err } } diff --git a/pkg/node/manager/manager_test.go b/pkg/node/manager/manager_test.go index 10614f46..f9ce2aad 100644 --- a/pkg/node/manager/manager_test.go +++ b/pkg/node/manager/manager_test.go @@ -26,6 +26,7 @@ import ( mock_worker "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/worker" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node" "github.com/golang/mock/gomock" @@ -71,6 +72,8 @@ var ( unManagedNode = node.NewUnManagedNode(zap.New(), nodeName, instanceID, config.OSLinux) managedNode = node.NewManagedNode(zap.New(), nodeName, instanceID, config.OSLinux) + + healthzHandler = healthz.NewHealthzHandler(5) ) type AsyncJobMatcher struct { @@ -144,7 +147,7 @@ func Test_GetNewManager(t *testing.T) { mock := NewMock(ctrl, map[string]node.Node{}) mock.MockWorker.EXPECT().StartWorkerPool(gomock.Any()).Return(nil) - manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions) + manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, healthzHandler) assert.NotNil(t, manager) assert.NoError(t, err) @@ -158,7 +161,7 @@ func Test_GetNewManager_Error(t *testing.T) { mock := NewMock(ctrl, map[string]node.Node{}) mock.MockWorker.EXPECT().StartWorkerPool(gomock.Any()).Return(mockError) - manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions) + manager, err := NewNodeManager(zap.New(), nil, api.Wrapper{}, mock.MockWorker, mock.MockConditions, healthzHandler) assert.NotNil(t, manager) assert.Error(t, err, mockError) diff --git a/pkg/provider/branch/provider.go b/pkg/provider/branch/provider.go index 4e33f5dd..27833e62 100644 --- a/pkg/provider/branch/provider.go +++ b/pkg/provider/branch/provider.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "strconv" "sync" "time" @@ -25,16 +26,19 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/pool" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/branch/trunk" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/worker" + "github.com/google/uuid" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -94,6 +98,7 @@ type branchENIProvider struct { // apiWrapper apiWrapper api.Wrapper ctx context.Context + checker healthz.Checker } // NewBranchENIProvider returns the Branch ENI Provider for all nodes across the cluster @@ -102,13 +107,15 @@ func NewBranchENIProvider(logger logr.Logger, wrapper api.Wrapper, prometheusRegister() trunk.PrometheusRegister() - return &branchENIProvider{ + provider := &branchENIProvider{ apiWrapper: wrapper, log: logger, workerPool: worker, trunkENICache: make(map[string]trunk.TrunkENI), ctx: ctx, } + provider.checker = provider.check() + return provider } // prometheusRegister registers prometheus metrics @@ -486,3 +493,26 @@ func (b *branchENIProvider) IntrospectNode(nodeName string) interface{} { } return trunkENI.Introspect() } + +func (b *branchENIProvider) check() healthz.Checker { + b.log.Info("Branch provider's healthz subpath was added") + return func(req *http.Request) error { + err := rcHealthz.PingWithTimeout(func(c chan<- error) { + var ping interface{} + // check on job queue + b.SubmitAsyncJob(ping) + // check on trunk cache map + testNodeName := "test-node" + uuid.New().String() + trunk, found := b.getTrunkFromCache(testNodeName) + b.log.V(1).Info("healthz check vulnerable site on locks around trunk map", "TestTrunk", trunk, "FoundInCache", found) + b.log.V(1).Info("***** health check on branch ENI provider tested SubmitAsyncJob *****") + c <- nil + }, b.log) + + return err + } +} + +func (b *branchENIProvider) GetHealthChecker() healthz.Checker { + return b.checker +} diff --git a/pkg/provider/ip/provider.go b/pkg/provider/ip/provider.go index 095a37d4..95351e5e 100644 --- a/pkg/provider/ip/provider.go +++ b/pkg/provider/ip/provider.go @@ -15,12 +15,14 @@ package ip import ( "fmt" + "net/http" "sync" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/pool" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/ip/eni" @@ -28,6 +30,7 @@ import ( "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) type ipv4Provider struct { @@ -43,6 +46,8 @@ type ipv4Provider struct { lock sync.RWMutex // guards the following // instanceResources stores the ENIManager and the resource pool per instance instanceProviderAndPool map[string]ResourceProviderAndPool + // healthz check subpath + checker healthz.Checker } // InstanceResource contains the instance's ENI manager and the resource pool @@ -53,13 +58,15 @@ type ResourceProviderAndPool struct { func NewIPv4Provider(log logr.Logger, apiWrapper api.Wrapper, workerPool worker.Worker, resourceConfig config.ResourceConfig) provider.ResourceProvider { - return &ipv4Provider{ + provider := &ipv4Provider{ instanceProviderAndPool: make(map[string]ResourceProviderAndPool), config: resourceConfig.WarmPoolConfig, log: log, apiWrapper: apiWrapper, workerPool: workerPool, } + provider.checker = provider.check() + return provider } func (p *ipv4Provider) InitResource(instance ec2.EC2Instance) error { @@ -344,3 +351,21 @@ func (p *ipv4Provider) IntrospectNode(nodeName string) interface{} { } return resource.resourcePool.Introspect() } + +func (p *ipv4Provider) check() healthz.Checker { + p.log.Info("IPv4 provider's healthz subpath was added") + return func(req *http.Request) error { + err := rcHealthz.PingWithTimeout(func(c chan<- error) { + var ping interface{} + p.SubmitAsyncJob(ping) + p.log.V(1).Info("***** health check on IPv4 provider tested SubmitAsyncJob *****") + c <- nil + }, p.log) + + return err + } +} + +func (p *ipv4Provider) GetHealthChecker() healthz.Checker { + return p.checker +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index b989fdb9..ab45a5c0 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -17,6 +17,7 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/pool" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) // ResourceProvider is the provider interface that each resource managed by the controller has to implement @@ -39,4 +40,6 @@ type ResourceProvider interface { Introspect() interface{} // IntrospectNode allows introspection of a node for the given resource IntrospectNode(node string) interface{} + // GetHealthChecker provider a health check subpath for pinging provider + GetHealthChecker() healthz.Checker } diff --git a/pkg/resource/introspect.go b/pkg/resource/introspect.go index ebbf2592..eb7d9d14 100644 --- a/pkg/resource/introspect.go +++ b/pkg/resource/introspect.go @@ -18,8 +18,10 @@ import ( "encoding/json" "net/http" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" ) const ( @@ -93,6 +95,11 @@ func (i *IntrospectHandler) NodeResourceHandler(w http.ResponseWriter, r *http.R w.Write(jsonData) } -func (i *IntrospectHandler) SetupWithManager(mgr ctrl.Manager) error { +func (i *IntrospectHandler) SetupWithManager(mgr ctrl.Manager, healthzHanlder *rcHealthz.HealthzHandler) error { + // add health check on subpath for introspect controller + healthzHanlder.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-introspect-controller": rcHealthz.SimplePing("Introspect controller", i.Log)}, + ) + return mgr.Add(i) } diff --git a/pkg/resource/manager.go b/pkg/resource/manager.go index d2a2250b..e087f462 100644 --- a/pkg/resource/manager.go +++ b/pkg/resource/manager.go @@ -20,16 +20,26 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/handler" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/branch" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/ip" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/worker" + "github.com/go-logr/logr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" +) + +var ( + managerHealthCheckSubpath = "health-resource-manager" + branchProviderHealthCheckSubpath = "health-branch-provider" + ipv4ProviderHealthCheckSubpath = "health-ipv4-provider" ) type Manager struct { resource map[string]Resource + log logr.Logger } type Resource struct { @@ -42,12 +52,17 @@ type ResourceManager interface { GetResourceHandler(resourceName string) (handler.Handler, bool) } -func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api.Wrapper) (ResourceManager, error) { +func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api.Wrapper, log logr.Logger, healthzHandler *rcHealthz.HealthzHandler) (ResourceManager, error) { // Load that static configuration of the resource resourceConfig := config.LoadResourceConfig() resources := make(map[string]Resource) + healthCheckers := make(map[string]healthz.Checker) + + // add manager subpath into health checker map first + healthCheckers[managerHealthCheckSubpath] = rcHealthz.SimplePing("resource manager", log) + // For each supported resource, initialize the resource provider and handler for _, resourceName := range resourceNames { @@ -56,14 +71,14 @@ func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api return nil, fmt.Errorf("failed to find resource configuration %s", resourceName) } - ctrl.Log.Info("initializing resource", "resource name", + log.Info("initializing resource", "resource name", resourceName, "resource count", resourceConfig.WorkerCount) workers := worker.NewDefaultWorkerPool( resourceConfig.Name, resourceConfig.WorkerCount, config.WorkQueueDefaultMaxRetries, - ctrl.Log.WithName(fmt.Sprintf("%s-%s", resourceName, "worker")), ctx) + log.WithName(fmt.Sprintf("%s-%s", resourceName, "worker")), ctx) var resourceHandler handler.Handler var resourceProvider provider.ResourceProvider @@ -71,11 +86,13 @@ func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api if resourceName == config.ResourceNameIPAddress { resourceProvider = ip.NewIPv4Provider(ctrl.Log.WithName("ipv4 provider"), wrapper, workers, resourceConfig) + healthCheckers[ipv4ProviderHealthCheckSubpath] = resourceProvider.GetHealthChecker() resourceHandler = handler.NewWarmResourceHandler(ctrl.Log.WithName(resourceName), wrapper, resourceName, resourceProvider, ctx) } else if resourceName == config.ResourceNamePodENI { resourceProvider = branch.NewBranchENIProvider(ctrl.Log.WithName("branch eni provider"), wrapper, workers, resourceConfig, ctx) + healthCheckers[branchProviderHealthCheckSubpath] = resourceProvider.GetHealthChecker() resourceHandler = handler.NewOnDemandHandler(ctrl.Log.WithName(resourceName), resourceName, resourceProvider) } else { @@ -92,12 +109,16 @@ func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api ResourceProvider: resourceProvider, } - ctrl.Log.Info("successfully initialized resource handler and provider", + log.Info("successfully initialized resource handler and provider", "resource name", resourceName) } + // add health check on subpath for resource manager which includes providers as well + healthzHandler.AddControllersHealthCheckers(healthCheckers) + return &Manager{ resource: resources, + log: log, }, nil } diff --git a/pkg/resource/manager_test.go b/pkg/resource/manager_test.go index 50857e97..acdaa598 100644 --- a/pkg/resource/manager_test.go +++ b/pkg/resource/manager_test.go @@ -17,11 +17,13 @@ import ( "context" "testing" - "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/handler" + mock_handler "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/handler" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/log/zap" ) type Mock struct { @@ -29,6 +31,8 @@ type Mock struct { Wrapper api.Wrapper } +var healthzHandler = healthz.NewHealthzHandler(5) + func NewMock(controller *gomock.Controller) Mock { return Mock{ Handler: mock_handler.NewMockHandler(controller), @@ -43,7 +47,7 @@ func Test_NewResourceManager(t *testing.T) { mock := NewMock(ctrl) resources := []string{config.ResourceNamePodENI, config.ResourceNameIPAddress} - manger, err := NewResourceManager(context.TODO(), resources, mock.Wrapper) + manger, err := NewResourceManager(context.TODO(), resources, mock.Wrapper, zap.New(zap.UseDevMode(true)), healthzHandler) assert.NoError(t, err) _, ok := manger.GetResourceHandler(config.ResourceNamePodENI) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index d4809d2e..66de88ca 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -126,6 +126,12 @@ func (w *worker) SetWorkerFunc(workerFunc func(interface{}) (ctrl.Result, error) // SubmitJob adds the job to the rate limited queue func (w *worker) SubmitJob(job interface{}) { + // in theory, only health check endpoint should send a nil job to test periodically + if job == nil { + queueLen := w.queue.Len() + w.Log.V(1).Info("For informational / health check purpose only to check worker queue availability", "WorkerQueueLen", queueLen) + return + } w.queue.Add(job) jobsSubmittedCount.WithLabelValues(w.resourceName).Inc() } diff --git a/webhooks/core/annotation_validation_webhook.go b/webhooks/core/annotation_validation_webhook.go index cfb27354..5e160ee3 100644 --- a/webhooks/core/annotation_validation_webhook.go +++ b/webhooks/core/annotation_validation_webhook.go @@ -20,10 +20,12 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/go-logr/logr" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) @@ -35,6 +37,23 @@ type AnnotationValidator struct { decoder *admission.Decoder Condition condition.Conditions Log logr.Logger + Checker healthz.Checker +} + +func NewAnnotationValidator(condition condition.Conditions, log logr.Logger, healthzHandler *rcHealthz.HealthzHandler) *AnnotationValidator { + annotationValidator := &AnnotationValidator{ + Condition: condition, + Log: log, + } + + // add health check on subpath for pod annotation validating webhook + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{ + "health-annotation-validating-webhook": rcHealthz.SimplePing("pod annotation validating webhook", log), + }, + ) + + return annotationValidator } // We are allowing multiple usernames to annotate the Windows/SGP Pod, eventually we will diff --git a/webhooks/core/node_update_webhook.go b/webhooks/core/node_update_webhook.go index 1f743ba5..c3e52f05 100644 --- a/webhooks/core/node_update_webhook.go +++ b/webhooks/core/node_update_webhook.go @@ -7,8 +7,10 @@ import ( "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) @@ -16,6 +18,23 @@ type NodeUpdateWebhook struct { decoder *admission.Decoder Condition condition.Conditions Log logr.Logger + Checker healthz.Checker +} + +func NewNodeUpdateWebhook(condition condition.Conditions, log logr.Logger, healthzHandler *rcHealthz.HealthzHandler) *NodeUpdateWebhook { + nodeUpdateWebhook := &NodeUpdateWebhook{ + Condition: condition, + Log: log, + } + + // add health check on subpath for node validation webhook + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{ + "health-node-validating-webhook": rcHealthz.SimplePing("node validating webhook", log), + }, + ) + + return nodeUpdateWebhook } const awsNodeUsername = "system:serviceaccount:kube-system:aws-node" diff --git a/webhooks/core/pod_webhook.go b/webhooks/core/pod_webhook.go index 6cfa0474..b1eb0cc8 100644 --- a/webhooks/core/pod_webhook.go +++ b/webhooks/core/pod_webhook.go @@ -22,10 +22,12 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" + rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils" ) @@ -45,6 +47,25 @@ type PodMutationWebHook struct { Condition condition.Conditions } +func NewPodMutationWebHook( + sgpAPI utils.SecurityGroupForPodsAPI, + log logr.Logger, + condition condition.Conditions, + healthzHandler *rcHealthz.HealthzHandler, +) *PodMutationWebHook { + podWebhook := &PodMutationWebHook{ + SGPAPI: sgpAPI, + Log: log, + Condition: condition, + } + // add health check on subpath for pod mutation webhook + healthzHandler.AddControllersHealthCheckers( + map[string]healthz.Checker{"health-pod-mutating-webhook": rcHealthz.SimplePing("pod mutating webhook", log)}, + ) + + return podWebhook +} + type PodType string var ( diff --git a/webhooks/core/pod_webhook_test.go b/webhooks/core/pod_webhook_test.go index 5549af41..ec4c4fe1 100644 --- a/webhooks/core/pod_webhook_test.go +++ b/webhooks/core/pod_webhook_test.go @@ -20,8 +20,8 @@ import ( "strings" "testing" - "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition" - "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/utils" + mock_condition "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition" + mock_utils "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/utils" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" "github.com/golang/mock/gomock"