From b7f185eacc26f04524b501de7afe22636f541263 Mon Sep 17 00:00:00 2001 From: Sunyanan Choochotkaew Date: Tue, 9 Apr 2024 17:58:50 +0900 Subject: [PATCH] add namespace watcher Signed-off-by: Sunyanan Choochotkaew --- controllers/multinicnetwork_controller.go | 36 +++++++++++--- controllers/multinicnetwork_handler.go | 36 ++++++++++++-- controllers/namespace_watcher.go | 59 ++++++++++++++++++++++ controllers/vars/vars.go | 2 + main.go | 41 +++++++++------- plugin/net_attach_def.go | 60 +++++++++++++++-------- unit-test/multinicnetwork_test.go | 20 ++++++++ 7 files changed, 203 insertions(+), 51 deletions(-) create mode 100644 controllers/namespace_watcher.go diff --git a/controllers/multinicnetwork_controller.go b/controllers/multinicnetwork_controller.go index 65a9d761..ea7eee51 100644 --- a/controllers/multinicnetwork_controller.go +++ b/controllers/multinicnetwork_controller.go @@ -132,8 +132,6 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ instance.Status.DiscoverStatus.ExistDaemon = daemonSize instance.Status.InterfaceInfoAvailable = infoAvailableSize - // Get main plugin - mainPlugin, annotations, err := r.GetMainPluginConf(instance) multinicnetworkName := instance.GetName() if err != nil { message := fmt.Sprintf("Failed to get main config %s: %v", multinicnetworkName, err) @@ -143,10 +141,7 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ } vars.NetworkLog.V(2).Info(message) } else { - mainPlugin = plugin.RemoveEmpty(instance.Spec.MainPlugin.CNIArgs, mainPlugin) - vars.NetworkLog.V(2).Info(fmt.Sprintf("main plugin: %s", mainPlugin)) - // Create net attach def - err = r.NetAttachDefHandler.CreateOrUpdate(instance, mainPlugin, annotations) + err = r.GenerateNetAttachDef(instance) if err != nil { message := fmt.Sprintf("Failed to create %s: %v", multinicnetworkName, err) err = r.CIDRHandler.MultiNicNetworkHandler.UpdateNetConfigStatus(instance, multinicv1.ConfigFailed, message) @@ -204,6 +199,18 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } +func (r *MultiNicNetworkReconciler) GenerateNetAttachDef(instance *multinicv1.MultiNicNetwork) error { + // Get main plugin + mainPlugin, annotations, err := r.GetMainPluginConf(instance) + if err == nil { + mainPlugin = plugin.RemoveEmpty(instance.Spec.MainPlugin.CNIArgs, mainPlugin) + vars.NetworkLog.V(2).Info(fmt.Sprintf("main plugin: %s", mainPlugin)) + // Create net attach def + err = r.NetAttachDefHandler.CreateOrUpdate(instance, mainPlugin, annotations) + } + return err +} + func (r *MultiNicNetworkReconciler) GetMainPluginConf(instance *multinicv1.MultiNicNetwork) (string, map[string]string, error) { spec := instance.Spec.MainPlugin if p, exist := r.PluginMap[spec.Type]; exist { @@ -291,3 +298,20 @@ func (r *MultiNicNetworkReconciler) callFinalizer(reqLogger logr.Logger, instanc r.CIDRHandler.MultiNicNetworkHandler.SafeCache.UnsetCache(instance.Name) return nil } + +// HandleNewNamespace handles new namespace +// - generate NAD +func (r *MultiNicNetworkReconciler) HandleNewNamespace(namespace string) { + multinicnetworks := r.CIDRHandler.MultiNicNetworkHandler.FilterNetworksByNamespace(namespace) + for _, net := range multinicnetworks { + // Get main plugin + mainPlugin, annotations, err := r.GetMainPluginConf(&net) + if err == nil { + mainPlugin = plugin.RemoveEmpty(net.Spec.MainPlugin.CNIArgs, mainPlugin) + err = r.NetAttachDefHandler.CreateOrUpdateOnNamespace(namespace, &net, mainPlugin, annotations) + } + if err != nil { + vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to create networkAttachementDefinition for %s on %s: %v", net.Name, namespace, err)) + } + } +} diff --git a/controllers/multinicnetwork_handler.go b/controllers/multinicnetwork_handler.go index f8a80aa9..fcb2d67c 100644 --- a/controllers/multinicnetwork_handler.go +++ b/controllers/multinicnetwork_handler.go @@ -115,6 +115,7 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo if !NetStatusUpdated(instance, netStatus) { vars.NetworkLog.V(2).Info(fmt.Sprintf("No status update %s", instance.Name)) + h.SetCache(instance.Name, *instance) return netStatus, nil } @@ -126,7 +127,7 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo if err != nil { vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err)) } else { - h.SetStatusCache(instance.Name, instance.Status) + h.SetCache(instance.Name, *instance) } return netStatus, err } @@ -171,12 +172,12 @@ func (h *MultiNicNetworkHandler) UpdateNetConfigStatus(instance *multinicv1.Mult if err != nil { vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err)) } else { - h.SetStatusCache(instance.Name, instance.Status) + h.SetCache(instance.Name, *instance) } return err } -func (h *MultiNicNetworkHandler) SetStatusCache(key string, value multinicv1.MultiNicNetworkStatus) { +func (h *MultiNicNetworkHandler) SetCache(key string, value multinicv1.MultiNicNetwork) { h.SafeCache.SetCache(key, value) } @@ -185,15 +186,40 @@ func (h *MultiNicNetworkHandler) GetStatusCache(key string) (multinicv1.MultiNic if value == nil { return multinicv1.MultiNicNetworkStatus{}, fmt.Errorf(vars.NotFoundError) } - return value.(multinicv1.MultiNicNetworkStatus), nil + return value.(multinicv1.MultiNicNetwork).Status, nil } func (h *MultiNicNetworkHandler) ListStatusCache() map[string]multinicv1.MultiNicNetworkStatus { snapshot := make(map[string]multinicv1.MultiNicNetworkStatus) h.SafeCache.Lock() for key, value := range h.cache { - snapshot[key] = value.(multinicv1.MultiNicNetworkStatus) + snapshot[key] = value.(multinicv1.MultiNicNetwork).Status } h.SafeCache.Unlock() return snapshot } + +func (h *MultiNicNetworkHandler) FilterNetworksByNamespace(target string) []multinicv1.MultiNicNetwork { + filteredNetworks := []multinicv1.MultiNicNetwork{} + h.SafeCache.Lock() + for _, value := range h.cache { + net := value.(multinicv1.MultiNicNetwork) + namespaces := net.Spec.Namespaces + if len(namespaces) == 0 { + vars.NetworkLog.V(2).Info(fmt.Sprintf("FilterNetworksByNamespace %s has no namespace set", net.Name)) + // NOTE: if namespace list is not specified, append + filteredNetworks = append(filteredNetworks, net) + } else { + vars.NetworkLog.V(2).Info(fmt.Sprintf("FilterNetworksByNamespace %s set %v", net.Name, namespaces)) + // NOTE: if namespace list is specified and target is in the list, append + for _, ns := range namespaces { + if ns == target { + filteredNetworks = append(filteredNetworks, net) + break + } + } + } + } + h.SafeCache.Unlock() + return filteredNetworks +} diff --git a/controllers/namespace_watcher.go b/controllers/namespace_watcher.go new file mode 100644 index 00000000..fc862f37 --- /dev/null +++ b/controllers/namespace_watcher.go @@ -0,0 +1,59 @@ +/* + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache2.0 + */ + +package controllers + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// NamespaceWatcher watches new namespace and generate net-attach-def +type NamespaceWatcher struct { + *kubernetes.Clientset + NamespaceQueue chan string + Quit chan struct{} + *MultiNicNetworkReconciler +} + +// NewNamespaceWatcher creates new namespace watcher +func NewNamespaceWatcher(client client.Client, config *rest.Config, multinicnetworkReconciler *MultiNicNetworkReconciler, quit chan struct{}) *NamespaceWatcher { + clientset, _ := kubernetes.NewForConfig(config) + watcher := &NamespaceWatcher{ + Clientset: clientset, + NamespaceQueue: make(chan string), + Quit: quit, + MultiNicNetworkReconciler: multinicnetworkReconciler, + } + factory := informers.NewSharedInformerFactory(clientset, 0) + nsInformer := factory.Core().V1().Namespaces() + + nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if ns, ok := obj.(*v1.Namespace); ok { + watcher.NamespaceQueue <- ns.Name + } + }, + }) + factory.Start(watcher.Quit) + + return watcher +} + +// Run executes namespace watcher routine until get quit signal +func (w *NamespaceWatcher) Run() { + defer close(w.NamespaceQueue) + wait.Until(w.ProcessNamespaceQueue, 0, w.Quit) +} + +func (w *NamespaceWatcher) ProcessNamespaceQueue() { + ns := <-w.NamespaceQueue + w.MultiNicNetworkReconciler.HandleNewNamespace(ns) +} diff --git a/controllers/vars/vars.go b/controllers/vars/vars.go index 3079a5a0..93d64f4e 100644 --- a/controllers/vars/vars.go +++ b/controllers/vars/vars.go @@ -77,6 +77,7 @@ var ( // logger options to change log level on the fly ZapOpts *zap.Options + SetupLog logr.Logger DaemonLog logr.Logger DefLog logr.Logger CIDRLog logr.Logger @@ -103,6 +104,7 @@ func SetLog() { zapp := zap.New(zap.UseFlagOptions(ZapOpts)) dlog := logf.NewDelegatingLogSink(zapp.GetSink()) ctrl.Log = logr.New(dlog) + SetupLog = ctrl.Log.WithName("setup") DaemonLog = ctrl.Log.WithName("controllers").WithName("Daemon") DefLog = ctrl.Log.WithName("controllers").WithName("NetAttachDef") CIDRLog = ctrl.Log.WithName("controllers").WithName("CIDR") diff --git a/main.go b/main.go index fcba61f2..c8f90c04 100644 --- a/main.go +++ b/main.go @@ -37,7 +37,6 @@ import ( var ( scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") MultiNicNetworkReconcilerPointer *controllers.MultiNicNetworkReconciler ) @@ -75,7 +74,7 @@ func main() { // Become the leader before proceeding err := leader.Become(context.TODO(), "5aaf67fd.fms.io") if err != nil { - setupLog.Error(err, "cannot become leader") + vars.SetupLog.Error(err, "cannot become leader") os.Exit(1) } } @@ -92,7 +91,7 @@ func main() { }) if err != nil { - setupLog.Error(err, "unable to start manager") + vars.SetupLog.Error(err, "unable to start manager") os.Exit(1) } @@ -107,13 +106,13 @@ func main() { defHandler, err := plugin.GetNetAttachDefHandler(config) if err != nil { - setupLog.Error(err, "unable to create NetworkAttachmentdefinition handler") + vars.SetupLog.Error(err, "unable to create NetworkAttachmentdefinition handler") os.Exit(1) } clientset, err := kubernetes.NewForConfig(config) if err != nil { - setupLog.Error(err, "unable to init clientset") + vars.SetupLog.Error(err, "unable to init clientset") os.Exit(1) } @@ -121,14 +120,12 @@ func main() { go cidrHandler.Run() pluginMap := controllers.GetPluginMap(config) - setupLog.V(2).Info(fmt.Sprintf("Plugin Map: %v", pluginMap)) + vars.SetupLog.V(2).Info(fmt.Sprintf("Plugin Map: %v", pluginMap)) podQueue := make(chan *v1.Pod, vars.MaxQueueSize) - setupLog.V(7).Info("New Daemon Watcher") daemonWatcher := controllers.NewDaemonWatcher(mgr.GetClient(), config, hostInterfaceHandler, daemonCacheHandler, podQueue, quit) - setupLog.V(7).Info("Run Daemon Watcher") + vars.SetupLog.V(1).Info("Run Daemon Watcher") go daemonWatcher.Run() - setupLog.V(7).Info("New Reconcilers") cidrReconciler := &controllers.CIDRReconciler{ Client: mgr.GetClient(), @@ -137,7 +134,7 @@ func main() { DaemonWatcher: daemonWatcher, } if err = (cidrReconciler).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "CIDR") + vars.SetupLog.Error(err, "unable to create controller", "controller", "CIDR") os.Exit(1) } hostInterfaceReconciler := &controllers.HostInterfaceReconciler{ @@ -148,7 +145,7 @@ func main() { CIDRHandler: cidrHandler, } if err = (hostInterfaceReconciler).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "HostInterface") + vars.SetupLog.Error(err, "unable to create controller", "controller", "HostInterface") os.Exit(1) } @@ -158,9 +155,10 @@ func main() { CIDRHandler: cidrHandler, } if err = (ippoolReconciler).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "IPPool") + vars.SetupLog.Error(err, "unable to create controller", "controller", "IPPool") os.Exit(1) } + MultiNicNetworkReconcilerPointer = &controllers.MultiNicNetworkReconciler{ Client: mgr.GetClient(), NetAttachDefHandler: defHandler, @@ -169,9 +167,14 @@ func main() { PluginMap: pluginMap, } if err = (MultiNicNetworkReconcilerPointer).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "MultiNicNetwork") + vars.SetupLog.Error(err, "unable to create controller", "controller", "MultiNicNetwork") os.Exit(1) } + + namespaceWatcher := controllers.NewNamespaceWatcher(mgr.GetClient(), config, MultiNicNetworkReconcilerPointer, quit) + vars.SetupLog.V(1).Info("Run Namespace Watcher") + go namespaceWatcher.Run() + cfgReconciler := &controllers.ConfigReconciler{ Client: mgr.GetClient(), Clientset: clientset, @@ -181,21 +184,21 @@ func main() { Scheme: mgr.GetScheme(), } if err = (cfgReconciler).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "Config") + vars.SetupLog.Error(err, "unable to create controller", "controller", "Config") os.Exit(1) } err = cfgReconciler.CreateDefaultDaemonConfig() if err != nil { - setupLog.Info(fmt.Sprintf("fail to create default config: %v", err)) + vars.SetupLog.Info(fmt.Sprintf("fail to create default config: %v", err)) } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") + vars.SetupLog.Error(err, "unable to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") + vars.SetupLog.Error(err, "unable to set up ready check") os.Exit(1) } @@ -204,9 +207,9 @@ func main() { controllers.RunPeriodicUpdate(ticker, daemonWatcher, cidrHandler, hostInterfaceReconciler, quit) - setupLog.V(7).Info("starting manager") + vars.SetupLog.V(7).Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - setupLog.Error(err, "problem running manager") + vars.SetupLog.Error(err, "problem running manager") os.Exit(1) } } diff --git a/plugin/net_attach_def.go b/plugin/net_attach_def.go index e877647e..15731d19 100644 --- a/plugin/net_attach_def.go +++ b/plugin/net_attach_def.go @@ -120,7 +120,6 @@ func CheckDefChanged(def, existingDef *NetworkAttachmentDefinition) bool { return false } -// CreateOrUpdate creates new NetworkAttachmentDefinition resource if not exists, otherwise update func (h *NetAttachDefHandler) CreateOrUpdate(net *multinicv1.MultiNicNetwork, pluginStr string, annotations map[string]string) error { defs, err := h.generate(net, pluginStr, annotations) if err != nil { @@ -128,32 +127,51 @@ func (h *NetAttachDefHandler) CreateOrUpdate(net *multinicv1.MultiNicNetwork, pl } errMsg := "" for _, def := range defs { - name := def.GetName() - namespace := def.GetNamespace() - result := &NetworkAttachmentDefinition{} - if h.IsExist(name, namespace) { - existingDef, _ := h.Get(name, namespace) - def.ObjectMeta = existingDef.ObjectMeta - if CheckDefChanged(def, existingDef) { - if namespace == "default" { - vars.NetworkLog.V(2).Info(fmt.Sprintf("Update net-attach-def %s", def.Name)) - } - err := h.DynamicHandler.Update(namespace, def, result) - if err != nil { - errMsg = fmt.Sprintf("%s\n%s: %v", errMsg, namespace, err) - } + errMsg = h.createOrUpdate(def, errMsg) + } + if errMsg != "" { + vars.NetworkLog.V(2).Info(errMsg) + } + return nil +} + +func (h *NetAttachDefHandler) CreateOrUpdateOnNamespace(ns string, net *multinicv1.MultiNicNetwork, pluginStr string, annotations map[string]string) error { + def, err := NetToDef(ns, net, pluginStr, annotations) + if err != nil { + return err + } + errMsg := h.createOrUpdate(def, "") + if errMsg != "" { + vars.NetworkLog.V(2).Info(errMsg) + return fmt.Errorf(errMsg) + } + return nil +} + +// createOrUpdate creates new NetworkAttachmentDefinition resource if not exists, otherwise update +func (h *NetAttachDefHandler) createOrUpdate(def *NetworkAttachmentDefinition, errMsg string) string { + name := def.GetName() + namespace := def.GetNamespace() + result := &NetworkAttachmentDefinition{} + if h.IsExist(name, namespace) { + existingDef, _ := h.Get(name, namespace) + def.ObjectMeta = existingDef.ObjectMeta + if CheckDefChanged(def, existingDef) { + if namespace == "default" { + vars.NetworkLog.V(2).Info(fmt.Sprintf("Update net-attach-def %s", def.Name)) } - } else { - err := h.DynamicHandler.Create(namespace, def, result) + err := h.DynamicHandler.Update(namespace, def, result) if err != nil { errMsg = fmt.Sprintf("%s\n%s: %v", errMsg, namespace, err) } } + } else { + err := h.DynamicHandler.Create(namespace, def, result) + if err != nil { + errMsg = fmt.Sprintf("%s\n%s: %v", errMsg, namespace, err) + } } - if errMsg != "" { - vars.NetworkLog.V(2).Info(errMsg) - } - return nil + return errMsg } // getNamespace returns all available namespaces if .Spec.Namespaces not specified diff --git a/unit-test/multinicnetwork_test.go b/unit-test/multinicnetwork_test.go index 9f860a3d..0937d8dc 100644 --- a/unit-test/multinicnetwork_test.go +++ b/unit-test/multinicnetwork_test.go @@ -6,6 +6,7 @@ package controllers import ( + "context" "fmt" multinicv1 "github.com/foundation-model-stack/multi-nic-cni/api/v1" @@ -13,10 +14,15 @@ import ( "github.com/foundation-model-stack/multi-nic-cni/plugin" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" //+kubebuilder:scaffold:imports ) +const ( + newNamespaceName = "new-namespace" +) + var _ = Describe("Test deploying MultiNicNetwork", func() { cniVersion := "0.3.0" cniType := "ipvlan" @@ -36,6 +42,20 @@ var _ = Describe("Test deploying MultiNicNetwork", func() { err = multinicnetworkReconciler.NetAttachDefHandler.DeleteNets(multinicnetwork) Expect(err).NotTo(HaveOccurred()) }) + It("successfully create/delete network attachment definition on new namespace", func() { + newNamespace := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: newNamespaceName, + }, + } + mainPlugin, annotations, err := multinicnetworkReconciler.GetMainPluginConf(multinicnetwork) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient.Create(context.TODO(), &newNamespace)).Should(Succeed()) + err = multinicnetworkReconciler.NetAttachDefHandler.CreateOrUpdateOnNamespace(newNamespaceName, multinicnetwork, mainPlugin, annotations) + Expect(err).NotTo(HaveOccurred()) + err = multinicnetworkReconciler.NetAttachDefHandler.Delete(multinicnetwork.Name, newNamespaceName) + Expect(err).NotTo(HaveOccurred()) + }) }) var _ = Describe("Test definition changes check", func() {