Skip to content

Commit

Permalink
feat: Support multiple MOFED DS (Mellanox#691)
Browse files Browse the repository at this point in the history
Precompiled MOFED driver container images
are built against a specific kernel version.

As a result, the MOFED driver DaemonSet should
include the kernel version in its node selector.

In addition, since a cluster can contain nodes running different kernel
versions, a separate DaemonSet is created for each kernel version present in the cluster.
  • Loading branch information
rollandf authored Mar 13, 2024
2 parents 8480813 + a8f1a6e commit 7193b1e
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 120 deletions.
10 changes: 9 additions & 1 deletion controllers/hostdevicenetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -43,7 +44,8 @@ import (
// HostDeviceNetworkReconciler reconciles a HostDeviceNetwork object
type HostDeviceNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -59,6 +61,12 @@ type HostDeviceNetworkReconciler struct {
//
//nolint:dupl
func (r *HostDeviceNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling HostDeviceNetwork")

Expand Down
10 changes: 9 additions & 1 deletion controllers/ipoibnetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,7 +42,8 @@ import (
// IPoIBNetworkReconciler reconciles a IPoIBNetwork object
type IPoIBNetworkReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -57,6 +59,12 @@ type IPoIBNetworkReconciler struct {
//
//nolint:dupl
func (r *IPoIBNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling IPoIBNetwork")

Expand Down
12 changes: 10 additions & 2 deletions controllers/macvlannetwork_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers //nolint:dupl

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,8 +42,9 @@ import (
// MacvlanNetworkReconciler reconciles a MacvlanNetwork object
type MacvlanNetworkReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Log logr.Logger
Scheme *runtime.Scheme
MigrationCh chan struct{}

stateManager state.Manager
}
Expand All @@ -58,6 +60,12 @@ type MacvlanNetworkReconciler struct {
//
//nolint:dupl
func (r *MacvlanNetworkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling MacvlanNetwork")

Expand Down
11 changes: 11 additions & 0 deletions controllers/nicclusterpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NicClusterPolicyReconciler struct {
Scheme *runtime.Scheme
ClusterTypeProvider clustertype.Provider
StaticConfigProvider staticconfig.Provider
MigrationCh chan struct{}

stateManager state.Manager
}
Expand Down Expand Up @@ -87,6 +88,12 @@ type NicClusterPolicyReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *NicClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling NicClusterPolicy")

Expand Down Expand Up @@ -179,6 +186,10 @@ func (r *NicClusterPolicyReconciler) handleMOFEDWaitLabels(
_ = r.Client.List(ctx, pods, client.MatchingLabels{"nvidia.com/ofed-driver": ""})
for i := range pods.Items {
pod := pods.Items[i]
if pod.Spec.NodeName == "" {
// In case that Pod is in Pending state
continue
}
labelValue := "true"
// We assume that OFED pod contains only one container to simplify the logic.
// We can revisit this logic in the future if needed
Expand Down
19 changes: 13 additions & 6 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,27 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

err = (&HostDeviceNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&IPoIBNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

err = (&MacvlanNetworkReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand All @@ -133,6 +139,7 @@ var _ = BeforeSuite(func() {
Scheme: k8sManager.GetScheme(),
ClusterTypeProvider: clusterTypeProvider,
StaticConfigProvider: staticConfigProvider,
MigrationCh: migrationCompletionChan,
}).SetupWithManager(k8sManager, testSetupLog)
Expect(err).ToNot(HaveOccurred())

Expand Down
8 changes: 8 additions & 0 deletions controllers/upgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade"
Expand Down Expand Up @@ -47,6 +48,7 @@ type UpgradeReconciler struct {
client.Client
Scheme *runtime.Scheme
StateManager upgrade.ClusterUpgradeStateManager
MigrationCh chan struct{}
}

const plannedRequeueInterval = time.Minute * 2
Expand All @@ -64,6 +66,12 @@ const UpgradeStateAnnotation = "nvidia.com/ofed-upgrade-state"
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *UpgradeReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
// Wait for migration flow to finish
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLogger := log.FromContext(ctx)
reqLogger.V(consts.LogLevelInfo).Info("Reconciling Upgrade")

Expand Down
14 changes: 10 additions & 4 deletions controllers/upgrade_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ var _ = Describe("Upgrade Controller", func() {
})
Context("When NicClusterPolicy CR is created", func() {
It("Upgrade policy is disabled", func() {
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)
upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}

req := ctrl.Request{NamespacedName: types.NamespacedName{Name: consts.NicClusterPolicyResourceName}}
Expand All @@ -76,10 +79,13 @@ var _ = Describe("Upgrade Controller", func() {
err := k8sClient.Create(goctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
}
migrationCompletionChan := make(chan struct{})
close(migrationCompletionChan)

upgradeReconciler := &UpgradeReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
MigrationCh: migrationCompletionChan,
}
// Call removeNodeUpgradeStateLabels function
err := upgradeReconciler.removeNodeUpgradeStateLabels(goctx.TODO())
Expand Down
74 changes: 47 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func setupWebhookControllers(mgr ctrl.Manager) error {
return nil
}

func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager) error {
func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager, migrationChan chan struct{}) error {
ctrLog := setupLog.WithName("controller")
clusterTypeProvider, err := clustertype.NewProvider(ctx, c)

Expand All @@ -98,27 +98,31 @@ func setupCRDControllers(ctx context.Context, c client.Client, mgr ctrl.Manager)
Scheme: mgr.GetScheme(),
ClusterTypeProvider: clusterTypeProvider, // we want to cache information about the cluster type
StaticConfigProvider: staticInfoProvider,
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("NicClusterPolicy")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NicClusterPolicy")
return err
}
if err := (&controllers.MacvlanNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("MacVlanNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MacvlanNetwork")
return err
}
if err := (&controllers.HostDeviceNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("HostDeviceNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "HostDeviceNetwork")
return err
}
if err := (&controllers.IPoIBNetworkReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr, ctrLog.WithName("IPoIBNetwork")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IPoIBNetwork")
return err
Expand Down Expand Up @@ -166,35 +170,26 @@ func main() {
os.Exit(1)
}

// run migration logic before controllers start
if err := migrate.Migrate(stopCtx, setupLog.WithName("migrate"), directClient); err != nil {
setupLog.Error(err, "failed to run migration logic")
os.Exit(1)
migrationCompletionChan := make(chan struct{})
m := migrate.Migrator{
K8sClient: directClient,
MigrationCh: migrationCompletionChan,
LeaderElection: enableLeaderElection,
Logger: ctrl.Log.WithName("Migrator"),
}

err = setupCRDControllers(stopCtx, directClient, mgr)
err = mgr.Add(&m)
if err != nil {
setupLog.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

upgrade.SetDriverName("ofed")

upgradeLogger := ctrl.Log.WithName("controllers").WithName("Upgrade")

clusterUpdateStateManager, err := upgrade.NewClusterUpgradeStateManager(
upgradeLogger.WithName("clusterUpgradeManager"), config.GetConfigOrDie(), nil)

err = setupCRDControllers(stopCtx, directClient, mgr, migrationCompletionChan)
if err != nil {
setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
os.Exit(1)
}

if err = (&controllers.UpgradeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
StateManager: clusterUpdateStateManager,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Upgrade")
err = setupUpgradeController(mgr, migrationCompletionChan)
if err != nil {
os.Exit(1)
}

Expand All @@ -221,3 +216,28 @@ func main() {
os.Exit(1)
}
}

// setupUpgradeController wires the Upgrade controller into mgr.
//
// It sets the upgrade library's driver name to "ofed", builds a
// ClusterUpgradeStateManager using a logger named
// controllers/Upgrade/clusterUpgradeManager and the config returned by
// config.GetConfigOrDie, and then registers an UpgradeReconciler with mgr.
// migrationChan is stored in the reconciler's MigrationCh field.
//
// Any failure is logged through setupLog and returned to the caller unchanged.
func setupUpgradeController(mgr ctrl.Manager, migrationChan chan struct{}) error {
	upgrade.SetDriverName("ofed")

	// The state manager gets its own sub-logger beneath the Upgrade controller's name.
	stateManagerLog := ctrl.Log.WithName("controllers").WithName("Upgrade").WithName("clusterUpgradeManager")
	stateManager, err := upgrade.NewClusterUpgradeStateManager(stateManagerLog, config.GetConfigOrDie(), nil)
	if err != nil {
		setupLog.Error(err, "unable to create new ClusterUpdateStateManager", "controller", "Upgrade")
		return err
	}

	reconciler := &controllers.UpgradeReconciler{
		Client:       mgr.GetClient(),
		Scheme:       mgr.GetScheme(),
		StateManager: stateManager,
		MigrationCh:  migrationChan,
	}
	if setupErr := reconciler.SetupWithManager(mgr); setupErr != nil {
		setupLog.Error(setupErr, "unable to create controller", "controller", "Upgrade")
		return setupErr
	}

	return nil
}
14 changes: 10 additions & 4 deletions manifests/state-ofed-driver/0050_ofed-driver-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
nvidia.com/ofed-driver: ""
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-ds
mofed-ds-format-version: "1"
name: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}-ds
namespace: {{ .RuntimeSpec.Namespace }}
spec:
updateStrategy:
type: OnDelete
selector:
matchLabels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
template:
metadata:
labels:
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}
app: mofed-{{ .RuntimeSpec.OSName }}{{ .RuntimeSpec.OSVer }}-{{ .RuntimeSpec.KernelHash }}
kernel: {{ .RuntimeSpec.Kernel }}
nvidia.com/ofed-driver: ""
spec:
priorityClassName: system-node-critical
Expand Down Expand Up @@ -250,6 +252,10 @@ spec:
feature.node.kubernetes.io/pci-15b3.present: "true"
feature.node.kubernetes.io/system-os_release.ID: {{ .RuntimeSpec.OSName }}
feature.node.kubernetes.io/system-os_release.VERSION_ID: "{{ .RuntimeSpec.OSVer }}"
feature.node.kubernetes.io/kernel-version.full: "{{ .RuntimeSpec.Kernel }}"
{{- if .RuntimeSpec.UseDtk }}
feature.node.kubernetes.io/system-os_release.OSTREE_VERSION: "{{ .RuntimeSpec.RhcosVersion }}"
{{- end }}
{{- if .NodeAffinity }}
affinity:
nodeAffinity:
Expand Down
Loading

0 comments on commit 7193b1e

Please sign in to comment.