Skip to content

Commit

Permalink
add namespace watcher
Browse files Browse the repository at this point in the history
Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>
  • Loading branch information
sunya-ch committed Apr 9, 2024
1 parent ba1ac70 commit b7f185e
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 51 deletions.
36 changes: 30 additions & 6 deletions controllers/multinicnetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ
instance.Status.DiscoverStatus.ExistDaemon = daemonSize
instance.Status.InterfaceInfoAvailable = infoAvailableSize

// Get main plugin
mainPlugin, annotations, err := r.GetMainPluginConf(instance)
multinicnetworkName := instance.GetName()
if err != nil {
message := fmt.Sprintf("Failed to get main config %s: %v", multinicnetworkName, err)
Expand All @@ -143,10 +141,7 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
vars.NetworkLog.V(2).Info(message)
} else {
mainPlugin = plugin.RemoveEmpty(instance.Spec.MainPlugin.CNIArgs, mainPlugin)
vars.NetworkLog.V(2).Info(fmt.Sprintf("main plugin: %s", mainPlugin))
// Create net attach def
err = r.NetAttachDefHandler.CreateOrUpdate(instance, mainPlugin, annotations)
err = r.GenerateNetAttachDef(instance)
if err != nil {
message := fmt.Sprintf("Failed to create %s: %v", multinicnetworkName, err)
err = r.CIDRHandler.MultiNicNetworkHandler.UpdateNetConfigStatus(instance, multinicv1.ConfigFailed, message)
Expand Down Expand Up @@ -204,6 +199,18 @@ func (r *MultiNicNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

func (r *MultiNicNetworkReconciler) GenerateNetAttachDef(instance *multinicv1.MultiNicNetwork) error {
// Get main plugin
mainPlugin, annotations, err := r.GetMainPluginConf(instance)
if err == nil {
mainPlugin = plugin.RemoveEmpty(instance.Spec.MainPlugin.CNIArgs, mainPlugin)
vars.NetworkLog.V(2).Info(fmt.Sprintf("main plugin: %s", mainPlugin))
// Create net attach def
err = r.NetAttachDefHandler.CreateOrUpdate(instance, mainPlugin, annotations)
}
return err
}

func (r *MultiNicNetworkReconciler) GetMainPluginConf(instance *multinicv1.MultiNicNetwork) (string, map[string]string, error) {
spec := instance.Spec.MainPlugin
if p, exist := r.PluginMap[spec.Type]; exist {
Expand Down Expand Up @@ -291,3 +298,20 @@ func (r *MultiNicNetworkReconciler) callFinalizer(reqLogger logr.Logger, instanc
r.CIDRHandler.MultiNicNetworkHandler.SafeCache.UnsetCache(instance.Name)
return nil
}

// HandleNewNamespace handles new namespace
// - generate NAD
func (r *MultiNicNetworkReconciler) HandleNewNamespace(namespace string) {
multinicnetworks := r.CIDRHandler.MultiNicNetworkHandler.FilterNetworksByNamespace(namespace)
for _, net := range multinicnetworks {
// Get main plugin
mainPlugin, annotations, err := r.GetMainPluginConf(&net)
if err == nil {
mainPlugin = plugin.RemoveEmpty(net.Spec.MainPlugin.CNIArgs, mainPlugin)
err = r.NetAttachDefHandler.CreateOrUpdateOnNamespace(namespace, &net, mainPlugin, annotations)
}
if err != nil {
vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to create networkAttachementDefinition for %s on %s: %v", net.Name, namespace, err))
}
}
}
36 changes: 31 additions & 5 deletions controllers/multinicnetwork_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo

if !NetStatusUpdated(instance, netStatus) {
vars.NetworkLog.V(2).Info(fmt.Sprintf("No status update %s", instance.Name))
h.SetCache(instance.Name, *instance)
return netStatus, nil
}

Expand All @@ -126,7 +127,7 @@ func (h *MultiNicNetworkHandler) updateStatus(instance *multinicv1.MultiNicNetwo
if err != nil {
vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err))
} else {
h.SetStatusCache(instance.Name, instance.Status)
h.SetCache(instance.Name, *instance)
}
return netStatus, err
}
Expand Down Expand Up @@ -171,12 +172,12 @@ func (h *MultiNicNetworkHandler) UpdateNetConfigStatus(instance *multinicv1.Mult
if err != nil {
vars.NetworkLog.V(2).Info(fmt.Sprintf("Failed to update %s status: %v", instance.Name, err))
} else {
h.SetStatusCache(instance.Name, instance.Status)
h.SetCache(instance.Name, *instance)
}
return err
}

func (h *MultiNicNetworkHandler) SetStatusCache(key string, value multinicv1.MultiNicNetworkStatus) {
func (h *MultiNicNetworkHandler) SetCache(key string, value multinicv1.MultiNicNetwork) {
h.SafeCache.SetCache(key, value)
}

Expand All @@ -185,15 +186,40 @@ func (h *MultiNicNetworkHandler) GetStatusCache(key string) (multinicv1.MultiNic
if value == nil {
return multinicv1.MultiNicNetworkStatus{}, fmt.Errorf(vars.NotFoundError)
}
return value.(multinicv1.MultiNicNetworkStatus), nil
return value.(multinicv1.MultiNicNetwork).Status, nil
}

func (h *MultiNicNetworkHandler) ListStatusCache() map[string]multinicv1.MultiNicNetworkStatus {
snapshot := make(map[string]multinicv1.MultiNicNetworkStatus)
h.SafeCache.Lock()
for key, value := range h.cache {
snapshot[key] = value.(multinicv1.MultiNicNetworkStatus)
snapshot[key] = value.(multinicv1.MultiNicNetwork).Status
}
h.SafeCache.Unlock()
return snapshot
}

func (h *MultiNicNetworkHandler) FilterNetworksByNamespace(target string) []multinicv1.MultiNicNetwork {
filteredNetworks := []multinicv1.MultiNicNetwork{}
h.SafeCache.Lock()
for _, value := range h.cache {
net := value.(multinicv1.MultiNicNetwork)
namespaces := net.Spec.Namespaces
if len(namespaces) == 0 {
vars.NetworkLog.V(2).Info(fmt.Sprintf("FilterNetworksByNamespace %s has no namespace set", net.Name))
// NOTE: if namespace list is not specified, append
filteredNetworks = append(filteredNetworks, net)
} else {
vars.NetworkLog.V(2).Info(fmt.Sprintf("FilterNetworksByNamespace %s set %v", net.Name, namespaces))
// NOTE: if namespace list is specified and target is in the list, append
for _, ns := range namespaces {
if ns == target {
filteredNetworks = append(filteredNetworks, net)
break
}
}
}
}
h.SafeCache.Unlock()
return filteredNetworks
}
59 changes: 59 additions & 0 deletions controllers/namespace_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2022- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache2.0
*/

package controllers

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NamespaceWatcher watches new namespace and generate net-attach-def
type NamespaceWatcher struct {
*kubernetes.Clientset
NamespaceQueue chan string
Quit chan struct{}
*MultiNicNetworkReconciler
}

// NewNamespaceWatcher creates new namespace watcher
func NewNamespaceWatcher(client client.Client, config *rest.Config, multinicnetworkReconciler *MultiNicNetworkReconciler, quit chan struct{}) *NamespaceWatcher {
clientset, _ := kubernetes.NewForConfig(config)
watcher := &NamespaceWatcher{
Clientset: clientset,
NamespaceQueue: make(chan string),
Quit: quit,
MultiNicNetworkReconciler: multinicnetworkReconciler,
}
factory := informers.NewSharedInformerFactory(clientset, 0)
nsInformer := factory.Core().V1().Namespaces()

nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if ns, ok := obj.(*v1.Namespace); ok {
watcher.NamespaceQueue <- ns.Name
}
},
})
factory.Start(watcher.Quit)

return watcher
}

// Run executes namespace watcher routine until get quit signal
func (w *NamespaceWatcher) Run() {
defer close(w.NamespaceQueue)
wait.Until(w.ProcessNamespaceQueue, 0, w.Quit)
}

func (w *NamespaceWatcher) ProcessNamespaceQueue() {
ns := <-w.NamespaceQueue
w.MultiNicNetworkReconciler.HandleNewNamespace(ns)
}
2 changes: 2 additions & 0 deletions controllers/vars/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var (

// logger options to change log level on the fly
ZapOpts *zap.Options
SetupLog logr.Logger
DaemonLog logr.Logger
DefLog logr.Logger
CIDRLog logr.Logger
Expand All @@ -103,6 +104,7 @@ func SetLog() {
zapp := zap.New(zap.UseFlagOptions(ZapOpts))
dlog := logf.NewDelegatingLogSink(zapp.GetSink())
ctrl.Log = logr.New(dlog)
SetupLog = ctrl.Log.WithName("setup")
DaemonLog = ctrl.Log.WithName("controllers").WithName("Daemon")
DefLog = ctrl.Log.WithName("controllers").WithName("NetAttachDef")
CIDRLog = ctrl.Log.WithName("controllers").WithName("CIDR")
Expand Down
41 changes: 22 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
MultiNicNetworkReconcilerPointer *controllers.MultiNicNetworkReconciler
)

Expand Down Expand Up @@ -75,7 +74,7 @@ func main() {
// Become the leader before proceeding
err := leader.Become(context.TODO(), "5aaf67fd.fms.io")
if err != nil {
setupLog.Error(err, "cannot become leader")
vars.SetupLog.Error(err, "cannot become leader")
os.Exit(1)
}
}
Expand All @@ -92,7 +91,7 @@ func main() {
})

if err != nil {
setupLog.Error(err, "unable to start manager")
vars.SetupLog.Error(err, "unable to start manager")
os.Exit(1)
}

Expand All @@ -107,28 +106,26 @@ func main() {

defHandler, err := plugin.GetNetAttachDefHandler(config)
if err != nil {
setupLog.Error(err, "unable to create NetworkAttachmentdefinition handler")
vars.SetupLog.Error(err, "unable to create NetworkAttachmentdefinition handler")
os.Exit(1)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
setupLog.Error(err, "unable to init clientset")
vars.SetupLog.Error(err, "unable to init clientset")
os.Exit(1)
}

cidrHandler := controllers.NewCIDRHandler(mgr.GetClient(), config, hostInterfaceHandler, daemonCacheHandler, quit)
go cidrHandler.Run()

pluginMap := controllers.GetPluginMap(config)
setupLog.V(2).Info(fmt.Sprintf("Plugin Map: %v", pluginMap))
vars.SetupLog.V(2).Info(fmt.Sprintf("Plugin Map: %v", pluginMap))

podQueue := make(chan *v1.Pod, vars.MaxQueueSize)
setupLog.V(7).Info("New Daemon Watcher")
daemonWatcher := controllers.NewDaemonWatcher(mgr.GetClient(), config, hostInterfaceHandler, daemonCacheHandler, podQueue, quit)
setupLog.V(7).Info("Run Daemon Watcher")
vars.SetupLog.V(1).Info("Run Daemon Watcher")
go daemonWatcher.Run()
setupLog.V(7).Info("New Reconcilers")

cidrReconciler := &controllers.CIDRReconciler{
Client: mgr.GetClient(),
Expand All @@ -137,7 +134,7 @@ func main() {
DaemonWatcher: daemonWatcher,
}
if err = (cidrReconciler).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CIDR")
vars.SetupLog.Error(err, "unable to create controller", "controller", "CIDR")
os.Exit(1)
}
hostInterfaceReconciler := &controllers.HostInterfaceReconciler{
Expand All @@ -148,7 +145,7 @@ func main() {
CIDRHandler: cidrHandler,
}
if err = (hostInterfaceReconciler).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HostInterface")
vars.SetupLog.Error(err, "unable to create controller", "controller", "HostInterface")
os.Exit(1)
}

Expand All @@ -158,9 +155,10 @@ func main() {
CIDRHandler: cidrHandler,
}
if err = (ippoolReconciler).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IPPool")
vars.SetupLog.Error(err, "unable to create controller", "controller", "IPPool")
os.Exit(1)
}

MultiNicNetworkReconcilerPointer = &controllers.MultiNicNetworkReconciler{
Client: mgr.GetClient(),
NetAttachDefHandler: defHandler,
Expand All @@ -169,9 +167,14 @@ func main() {
PluginMap: pluginMap,
}
if err = (MultiNicNetworkReconcilerPointer).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MultiNicNetwork")
vars.SetupLog.Error(err, "unable to create controller", "controller", "MultiNicNetwork")
os.Exit(1)
}

namespaceWatcher := controllers.NewNamespaceWatcher(mgr.GetClient(), config, MultiNicNetworkReconcilerPointer, quit)
vars.SetupLog.V(1).Info("Run Namespace Watcher")
go namespaceWatcher.Run()

cfgReconciler := &controllers.ConfigReconciler{
Client: mgr.GetClient(),
Clientset: clientset,
Expand All @@ -181,21 +184,21 @@ func main() {
Scheme: mgr.GetScheme(),
}
if err = (cfgReconciler).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Config")
vars.SetupLog.Error(err, "unable to create controller", "controller", "Config")
os.Exit(1)
}
err = cfgReconciler.CreateDefaultDaemonConfig()
if err != nil {
setupLog.Info(fmt.Sprintf("fail to create default config: %v", err))
vars.SetupLog.Info(fmt.Sprintf("fail to create default config: %v", err))
}

//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
vars.SetupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
vars.SetupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}

Expand All @@ -204,9 +207,9 @@ func main() {

controllers.RunPeriodicUpdate(ticker, daemonWatcher, cidrHandler, hostInterfaceReconciler, quit)

setupLog.V(7).Info("starting manager")
vars.SetupLog.V(7).Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
vars.SetupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
Loading

0 comments on commit b7f185e

Please sign in to comment.