Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for plugable reconcilers #112

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"github.com/telekom/das-schiff-network-operator/pkg/monitoring"
"github.com/telekom/das-schiff-network-operator/pkg/nl"
"github.com/telekom/das-schiff-network-operator/pkg/notrack"
"github.com/telekom/das-schiff-network-operator/pkg/reconciler"

Check failure on line 43 in cmd/manager/main.go

View workflow job for this annotation

GitHub Actions / lint

could not import github.com/telekom/das-schiff-network-operator/pkg/reconciler (-: # github.com/telekom/das-schiff-network-operator/pkg/reconciler

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) //nolint:gci
// to ensure that exec-entrypoint and run can make use of them.
Expand Down Expand Up @@ -92,6 +92,7 @@
var onlyBPFMode bool
var configFile string
var interfacePrefix string
var useNetconf bool
flag.StringVar(&configFile, "config", "",
"The controller will load its initial configuration from this file. "+
"Omit this flag to use the default configuration values. "+
Expand All @@ -100,6 +101,8 @@
"Only attach BPF to specified interfaces in config. This will not start any reconciliation. Perfect for masters.")
flag.StringVar(&interfacePrefix, "macvlan-interface-prefix", "",
"Interface prefix for bridge devices for MACVlan sync")
flag.BoolVar(&useNetconf, "use-netconf", false,
"Use NETCONF interface to configure hosts instead of Netlink and FRR.")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -146,7 +149,7 @@
os.Exit(1)
}

if err := initComponents(mgr, anycastTracker, cfg, clientConfig, onlyBPFMode); err != nil {
if err := initComponents(mgr, anycastTracker, cfg, clientConfig, onlyBPFMode, useNetconf); err != nil {
setupLog.Error(err, "unable to initialize components")
os.Exit(1)
}
Expand All @@ -163,10 +166,22 @@
}
}

func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config, clientConfig *rest.Config, onlyBPFMode bool) error {
func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config,
clientConfig *rest.Config, onlyBPFMode bool, useNetconf bool) error {
var adapter reconciler.Adapter
var err error
if useNetconf {
adapter, err = reconciler.NewNetconfReconciler()
} else {
adapter, err = reconciler.NewLegacyReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger())
}

if err != nil {
return fmt.Errorf("unable to create reconciler: %w", err)
}
// Start VRFRouteConfigurationReconciler when we are not running in only BPF mode.
if !onlyBPFMode {
if err := setupReconcilers(mgr, anycastTracker); err != nil {
if err := setupReconcilers(mgr, anycastTracker, adapter); err != nil {
return fmt.Errorf("unable to setup reconcilers: %w", err)
}
}
Expand All @@ -179,18 +194,10 @@
return fmt.Errorf("unable to set up ready check: %w", err)
}

setupLog.Info("load bpf program into Kernel")
if err := bpf.InitBPFRouter(); err != nil {
return fmt.Errorf("unable to init BPF router: %w", err)
}
setupLog.Info("attach bpf to interfaces specified in config")
if err := bpf.AttachInterfaces(cfg.BPFInterfaces); err != nil {
return fmt.Errorf("unable to attach bpf to interfaces: %w", err)
if err := setupBPF(cfg); err != nil {
return fmt.Errorf("uneable to set up BPF: %w", err)
}

setupLog.Info("start bpf interface check")
bpf.RunInterfaceCheck()

setupLog.Info("start anycast sync")
anycastTracker.RunAnycastSync()

Expand All @@ -200,33 +207,57 @@
}

if onlyBPFMode {
clusterClient, err := client.New(clientConfig, client.Options{})
if err != nil {
return fmt.Errorf("error creating controller-runtime client: %w", err)
if err := runBPFOnlyMode(clientConfig); err != nil {
return fmt.Errorf("error running BPF only mode: %w", err)
}
}

nc, err := healthcheck.LoadConfig(healthcheck.NetHealthcheckFile)
if err != nil {
return fmt.Errorf("error loading network healthcheck config: %w", err)
}
return nil
}

tcpDialer := healthcheck.NewTCPDialer(nc.Timeout)
hc, err := healthcheck.NewHealthChecker(clusterClient,
healthcheck.NewDefaultHealthcheckToolkit(nil, tcpDialer),
nc)
if err != nil {
return fmt.Errorf("error initializing healthchecker: %w", err)
}
if err = performNetworkingHealthcheck(hc); err != nil {
return fmt.Errorf("error performing healthcheck: %w", err)
}
func runBPFOnlyMode(clientConfig *rest.Config) error {
clusterClient, err := client.New(clientConfig, client.Options{})
if err != nil {
return fmt.Errorf("error creating controller-runtime client: %w", err)
}

nc, err := healthcheck.LoadConfig(healthcheck.NetHealthcheckFile)
if err != nil {
return fmt.Errorf("error loading network healthcheck config: %w", err)
}

tcpDialer := healthcheck.NewTCPDialer(nc.Timeout)
hc, err := healthcheck.NewHealthChecker(clusterClient,
healthcheck.NewDefaultHealthcheckToolkit(nil, tcpDialer),
nc)
if err != nil {
return fmt.Errorf("error initializing healthchecker: %w", err)
}
if err = performNetworkingHealthcheck(hc); err != nil {
return fmt.Errorf("error performing healthcheck: %w", err)
}

return nil
}

func setupBPF(cfg *config.Config) error {
setupLog.Info("load bpf program into Kernel")
if err := bpf.InitBPFRouter(); err != nil {
return fmt.Errorf("unable to init BPF router: %w", err)
}
setupLog.Info("attach bpf to interfaces specified in config")
if err := bpf.AttachInterfaces(cfg.BPFInterfaces); err != nil {
return fmt.Errorf("unable to attach bpf to interfaces: %w", err)
}

setupLog.Info("start bpf interface check")
bpf.RunInterfaceCheck()

return nil
}

func setupReconcilers(mgr manager.Manager, anycastTracker *anycast.Tracker) error {
r, err := reconciler.NewReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger())
func setupReconcilers(mgr manager.Manager, anycastTracker *anycast.Tracker, adapter reconciler.Adapter) error {
r, err := reconciler.NewReconciler(mgr.GetClient(), anycastTracker, mgr.GetLogger(), adapter)
if err != nil {
return fmt.Errorf("unable to create debounced reconciler: %w", err)
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/reconciler/layer2.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reconciler

Check failure on line 1 in pkg/reconciler/layer2.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/telekom/das-schiff-network-operator/pkg/reconciler

import (
"context"
Expand All @@ -16,26 +16,26 @@
"k8s.io/apimachinery/pkg/types"
)

func (r *reconcile) fetchLayer2(ctx context.Context) ([]networkv1alpha1.Layer2NetworkConfiguration, error) {
func (r *Reconciler) fetchLayer2(ctx context.Context) ([]networkv1alpha1.Layer2NetworkConfiguration, error) {
layer2List := &networkv1alpha1.Layer2NetworkConfigurationList{}
err := r.client.List(ctx, layer2List)
if err != nil {
r.Logger.Error(err, "error getting list of Layer2s from Kubernetes")
r.logger.Error(err, "error getting list of Layer2s from Kubernetes")
return nil, fmt.Errorf("error getting list of Layer2s from Kubernetes: %w", err)
}

nodeName := os.Getenv(healthcheck.NodenameEnv)
node := &corev1.Node{}
err = r.client.Get(ctx, types.NamespacedName{Name: nodeName}, node)
if err != nil {
r.Logger.Error(err, "error getting local node name")
r.logger.Error(err, "error getting local node name")
return nil, fmt.Errorf("error getting local node name: %w", err)
}

l2vnis := []networkv1alpha1.Layer2NetworkConfiguration{}
for i := range layer2List.Items {
item := &layer2List.Items[i]
logger := r.Logger.WithValues("name", item.ObjectMeta.Name, "namespace", item.ObjectMeta.Namespace, "vlan", item.Spec.ID, "vni", item.Spec.VNI)
logger := r.logger.WithValues("name", item.ObjectMeta.Name, "namespace", item.ObjectMeta.Namespace, "vlan", item.Spec.ID, "vni", item.Spec.VNI)
if item.Spec.NodeSelector != nil {
selector := labels.NewSelector()
var reqs labels.Requirements
Expand Down Expand Up @@ -76,7 +76,7 @@
return l2vnis, nil
}

func (r *reconcile) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) error {
func (r *LegacyReconciler) reconcileLayer2(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) error {
desired, err := r.getDesired(l2vnis)
if err != nil {
return err
Expand Down Expand Up @@ -111,10 +111,10 @@
}

for i := range toDelete {
r.Logger.Info("Deleting Layer2 because it is no longer configured", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
r.logger.Info("Deleting Layer2 because it is no longer configured", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
errs := r.netlinkManager.CleanupL2(&toDelete[i])
for _, err := range errs {
r.Logger.Error(err, "Error deleting Layer2", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
r.logger.Error(err, "Error deleting Layer2", "vlan", toDelete[i].VlanID, "vni", toDelete[i].VNI)
}
}

Expand All @@ -129,8 +129,8 @@
return nil
}

func (r *reconcile) createL2(info *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.Logger.Info("Creating Layer2", "vlan", info.VlanID, "vni", info.VNI)
func (r *LegacyReconciler) createL2(info *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.logger.Info("Creating Layer2", "vlan", info.VlanID, "vni", info.VNI)
err := r.netlinkManager.CreateL2(info)
if err != nil {
return fmt.Errorf("error creating layer2 vlan %d vni %d: %w", info.VlanID, info.VNI, err)
Expand All @@ -145,7 +145,7 @@
return nil
}

func (r *reconcile) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) ([]nl.Layer2Information, error) {
func (r *LegacyReconciler) getDesired(l2vnis []networkv1alpha1.Layer2NetworkConfiguration) ([]nl.Layer2Information, error) {
availableVrfs, err := r.netlinkManager.ListL3()
if err != nil {
return nil, fmt.Errorf("error loading available VRFs: %w", err)
Expand All @@ -162,7 +162,7 @@

anycastGateways, err := r.netlinkManager.ParseIPAddresses(spec.AnycastGateways)
if err != nil {
r.Logger.Error(err, "error parsing anycast gateways", "layer", l2vnis[i].ObjectMeta.Name, "gw", spec.AnycastGateways)
r.logger.Error(err, "error parsing anycast gateways", "layer", l2vnis[i].ObjectMeta.Name, "gw", spec.AnycastGateways)
return nil, fmt.Errorf("error parsing anycast gateways: %w", err)
}

Expand All @@ -175,7 +175,7 @@
}
}
if !vrfAvailable {
r.Logger.Error(err, "VRF of Layer2 not found on node", "layer", l2vnis[i].ObjectMeta.Name, "vrf", spec.VRF)
r.logger.Error(err, "VRF of Layer2 not found on node", "layer", l2vnis[i].ObjectMeta.Name, "vrf", spec.VRF)
continue
}
}
Expand Down Expand Up @@ -213,8 +213,8 @@
return toDelete
}

func (r *reconcile) reconcileExistingLayer(desired, currentConfig *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.Logger.Info("Reconciling existing Layer2", "vlan", desired.VlanID, "vni", desired.VNI)
func (r *LegacyReconciler) reconcileExistingLayer(desired, currentConfig *nl.Layer2Information, anycastTrackerInterfaces *[]int) error {
r.logger.Info("Reconciling existing Layer2", "vlan", desired.VlanID, "vni", desired.VNI)
err := r.netlinkManager.ReconcileL2(currentConfig, desired)
if err != nil {
return fmt.Errorf("error reconciling layer2 vlan %d vni %d: %w", desired.VlanID, desired.VNI, err)
Expand All @@ -229,7 +229,7 @@
return nil
}

func (*reconcile) checkL2Duplicates(configs []networkv1alpha1.Layer2NetworkConfiguration) error {
func (*Reconciler) checkL2Duplicates(configs []networkv1alpha1.Layer2NetworkConfiguration) error {
for i := range configs {
for j := i + 1; j < len(configs); j++ {
if configs[i].Spec.ID == configs[j].Spec.ID {
Expand Down
Loading
Loading