Skip to content

Commit

Permalink
feat: add dynamic generated names for notebooks controller resoruces (#7
Browse files Browse the repository at this point in the history
)

Signed-off-by: mishraprafful <mishraprafful@gmail.com>
  • Loading branch information
mishraprafful authored Nov 17, 2024
1 parent 26f7c81 commit b0af1d6
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 55 deletions.
146 changes: 97 additions & 49 deletions components/notebook-controller/controllers/notebook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package controllers

import (
"context"

"encoding/json"
"fmt"
"os"
Expand All @@ -34,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
Expand All @@ -53,7 +55,8 @@ const AnnotationRewriteURI = "notebooks.kubeflow.org/http-rewrite-uri"
const AnnotationHeadersRequestSet = "notebooks.kubeflow.org/http-headers-request-set"

const PrefixEnvVar = "NB_PREFIX"
const namePrefix = "notebook-"
const namePrefix = "nb"
const generatedSuffixLength = 5
const maxNameLength = 63

// The default fsGroup of PodSecurityContext.
Expand Down Expand Up @@ -113,9 +116,9 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

// Make sure the prefix doesn't cause the derived resource names to get too long
if len(nbName)+len(involvedNotebookKey.Namespace)+len(namePrefix) > maxNameLength {
if len(nbName)+generatedSuffixLength+len(namePrefix) > maxNameLength {
return reconcile.Result{},
fmt.Errorf("notebook name must not be longer than %d characters as notebook resources are prefixed with %s%s", maxNameLength-len(namePrefix), namePrefix, involvedNotebookKey.Namespace)
fmt.Errorf("notebook name must not be longer than %d characters as notebook resources are prefixed with %s%s", maxNameLength-(len(namePrefix)+generatedSuffixLength), namePrefix, involvedNotebookKey.Namespace)
}

// re-emit the event in the Notebook CR
Expand Down Expand Up @@ -150,9 +153,23 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
// Check if the StatefulSet already exists
foundStateful := &appsv1.StatefulSet{}
namespacedStatefulSets := &appsv1.StatefulSetList{}
justCreated := false
err := r.Get(ctx, types.NamespacedName{Name: ss.Name, Namespace: ss.Namespace}, foundStateful)
if err != nil && apierrs.IsNotFound(err) {

err := r.List(ctx, namespacedStatefulSets, client.InNamespace(ss.Namespace))
if err != nil {
log.Error(err, "error listing StatefulSets")
return ctrl.Result{}, err
}

for _, sts := range namespacedStatefulSets.Items {
if metav1.IsControlledBy(&sts, instance) {
foundStateful = &sts
break
}
}

if foundStateful.Name == "" && foundStateful.Namespace == "" {
log.Info("Creating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
r.Metrics.NotebookCreation.WithLabelValues(ss.Namespace).Inc()
err = r.Create(ctx, ss)
Expand All @@ -162,10 +179,8 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
r.Metrics.NotebookFailCreation.WithLabelValues(ss.Namespace).Inc()
return ctrl.Result{}, err
}
} else if err != nil {
log.Error(err, "error getting Statefulset")
return ctrl.Result{}, err
}

// Update the foundStateful object and write the result back if there are any changes
if !justCreated && reconcilehelper.CopyStatefulSetFields(ss, foundStateful) {
log.Info("Updating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
Expand All @@ -184,19 +199,29 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

// Check if the Service already exists
foundService := &corev1.Service{}
namespacedServices := &corev1.ServiceList{}
justCreated = false
err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
if err != nil && apierrs.IsNotFound(err) {
err = r.List(ctx, namespacedServices, client.InNamespace(ss.Namespace))
if err != nil {
log.Error(err, "error listing Services")
return ctrl.Result{}, err
}

for _, ser := range namespacedServices.Items {
if metav1.IsControlledBy(&ser, instance) {
foundService = &ser
break
}
}

if foundService.Name == "" && foundService.Namespace == "" {
log.Info("Creating Service", "namespace", service.Namespace, "name", service.Name)
err = r.Create(ctx, service)
justCreated = true
if err != nil {
log.Error(err, "unable to create Service")
return ctrl.Result{}, err
}
} else if err != nil {
log.Error(err, "error getting Service")
return ctrl.Result{}, err
}
// Update the foundService object and write the result back if there are any changes
if !justCreated && reconcilehelper.CopyServiceFields(service, foundService) {
Expand All @@ -207,7 +232,7 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}
}

err = deleteObsoleteService(ctx, r, instance)
if err != nil {
log.Error(err, "unable to delete obsolete Service")
Expand All @@ -216,7 +241,7 @@ func (r *NotebookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

// Reconcile virtual service if we use ISTIO.
if os.Getenv("USE_ISTIO") == "true" {
err = r.reconcileVirtualService(instance)
err = r.reconcileVirtualService(instance, foundService.Name)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -381,8 +406,8 @@ func generateStatefulSet(instance *v1beta1.Notebook) *appsv1.StatefulSet {

ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: instance.Name,
Namespace: instance.Namespace,
GenerateName: fmt.Sprintf("%s-%s-", namePrefix, instance.Name),
Namespace: instance.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Expand Down Expand Up @@ -454,13 +479,14 @@ func generateService(instance *v1beta1.Notebook) *corev1.Service {
// Define the desired Service object
port := DefaultContainerPort
containerPorts := instance.Spec.Template.Spec.Containers[0].Ports

if containerPorts != nil {
port = int(containerPorts[0].ContainerPort)
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s%s-%s", namePrefix, instance.Namespace, instance.Name),
Namespace: instance.Namespace,
Namespace: instance.Namespace,
GenerateName: fmt.Sprintf("%s-%s-", namePrefix, instance.Name),
},
Spec: corev1.ServiceSpec{
Type: "ClusterIP",
Expand All @@ -479,11 +505,7 @@ func generateService(instance *v1beta1.Notebook) *corev1.Service {
return svc
}

func virtualServiceName(kfName string, namespace string) string {
return fmt.Sprintf("%s%s-%s", namePrefix, namespace, kfName)
}

func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructured, error) {
func generateVirtualService(instance *v1beta1.Notebook, serviceName string) (*unstructured.Unstructured, error) {
name := instance.Name
namespace := instance.Namespace
clusterDomain := "cluster.local"
Expand All @@ -504,12 +526,12 @@ func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructu
if clusterDomainFromEnv, ok := os.LookupEnv("CLUSTER_DOMAIN"); ok {
clusterDomain = clusterDomainFromEnv
}
service := fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain)
service := fmt.Sprintf("%s.%s.svc.%s", serviceName, namespace, clusterDomain)

vsvc := &unstructured.Unstructured{}
vsvc.SetAPIVersion("networking.istio.io/v1alpha3")
vsvc.SetKind("VirtualService")
vsvc.SetName(virtualServiceName(name, namespace))
vsvc.SetGenerateName(fmt.Sprintf("%s-%s-", namePrefix, instance.Name))
vsvc.SetNamespace(namespace)

istioHost := os.Getenv("ISTIO_HOST")
Expand Down Expand Up @@ -585,9 +607,9 @@ func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructu

}

func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook) error {
func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook, serviceName string) error {
log := r.Log.WithValues("notebook", instance.Namespace)
virtualService, err := generateVirtualService(instance)
virtualService, err := generateVirtualService(instance, serviceName)
if err != nil {
log.Info("Unable to generate VirtualService...", err)
return err
Expand All @@ -597,26 +619,52 @@ func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook)
}
// Check if the virtual service already exists.
foundVirtual := &unstructured.Unstructured{}
justCreated := false
foundVirtual.SetAPIVersion("networking.istio.io/v1alpha3")
foundVirtual.SetKind("VirtualService")
err = r.Get(context.TODO(), types.NamespacedName{Name: virtualServiceName(instance.Name,
instance.Namespace), Namespace: instance.Namespace}, foundVirtual)
if err != nil && apierrs.IsNotFound(err) {
log.Info("Creating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
namespacedVirtualServices := &unstructured.UnstructuredList{}
namespacedVirtualServices.SetGroupVersionKind(
schema.GroupVersionKind{
Group: "networking.istio.io",
Version: "v1alpha3",
Kind: "VirtualService",
},
)
justCreated := false

// List the VirtualServices in the given namespace
err = r.List(
context.TODO(),
namespacedVirtualServices,
client.InNamespace(instance.Namespace),
)
if err != nil {
log.Error(err, "error listing Virtual Services")
return err
}

// Manually filter the resources by kind and apiVersion
for _, virSer := range namespacedVirtualServices.Items {
// Check if the resource's kind and apiVersion match
if virSer.GetKind() == "VirtualService" && virSer.GetAPIVersion() == "networking.istio.io/v1alpha3" {
// If the resource is controlled by the instance, assign it to foundVirtual
if metav1.IsControlledBy(&virSer, instance) {
foundVirtual = &virSer
break
}
}
}

if foundVirtual.GetName() == "" && foundVirtual.GetNamespace() == "" {
log.Info("Creating virtual service", "namespace", instance.Namespace, "notebook-name", instance.Name)
err = r.Create(context.TODO(), virtualService)
justCreated = true
if err != nil {
return err
}
} else if err != nil {
return err
}

if !justCreated && reconcilehelper.CopyVirtualService(virtualService, foundVirtual) {
log.Info("Updating virtual service", "namespace", instance.Namespace, "name",
virtualServiceName(instance.Name, instance.Namespace))
log.Info("Updating virtual service", "namespace", instance.Namespace, "notebook-name", instance.Name)
err = r.Update(context.TODO(), foundVirtual)
if err != nil {
return err
Expand All @@ -630,36 +678,36 @@ func deleteObsoleteService(ctx context.Context, r *NotebookReconciler, instance
log := r.Log.WithValues("notebook", instance.Namespace)
obsoleteServiceName := instance.Name
obsoleteService := &corev1.Service{}

err := r.Get(ctx, client.ObjectKey{Name: obsoleteServiceName, Namespace: instance.Namespace}, obsoleteService)
if apierrs.IsNotFound(err) {
log.Info("Obsolete Service not found; nothing to delete", "namespace", instance.Namespace, "name", obsoleteServiceName)
return nil
log.Info("Obsolete Service not found; nothing to delete", "namespace", instance.Namespace, "name", obsoleteServiceName)
return nil
} else if err != nil {
log.Error(err, "error getting obsolete service", "namespace", instance.Namespace, "name", obsoleteServiceName)
return err
log.Error(err, "error getting obsolete service", "namespace", instance.Namespace, "name", obsoleteServiceName)
return err
}

log.Info("Found obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)

// Remove owner references
obsoleteService.OwnerReferences = []metav1.OwnerReference{}
err = r.Update(ctx, obsoleteService)
if err != nil {
log.Error(err, "unable to update owner reference for obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
return err
log.Error(err, "unable to update owner reference for obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
return err
}

// Delete the obsolete service
err = r.Delete(ctx, obsoleteService)
if err != nil {
log.Error(err, "unable to delete obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
return err
log.Error(err, "unable to delete obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
return err
}

log.Info("Deleted obsolete Service", "namespace", obsoleteService.Namespace, "name", obsoleteService.Name)
return nil

}

func isStsOrPodEvent(event *corev1.Event) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

nbv1beta1 "github.com/kubeflow/kubeflow/components/notebook-controller/api/v1beta1"
)
Expand Down Expand Up @@ -74,15 +75,19 @@ var _ = Describe("Notebook controller", func() {
*/
By("By checking that the Notebook has statefulset")
Eventually(func() (bool, error) {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{
Name: Name,
Namespace: Namespace,
}}
err := k8sClient.Get(ctx, notebookLookupKey, sts)
namespacedStatefulSets := &appsv1.StatefulSetList{}

err := k8sClient.List(ctx, namespacedStatefulSets, client.InNamespace(Namespace))
if err != nil {
return false, err
}
return true, nil

for _, sts := range namespacedStatefulSets.Items {
if metav1.IsControlledBy(&sts, createdNotebook) {
return true, nil
}
}
return false, nil
}, timeout, interval).Should(BeTrue())
})
})
Expand Down

0 comments on commit b0af1d6

Please sign in to comment.