Skip to content

Commit

Permalink
Refactor RayCluster controller
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Apr 17, 2024
1 parent 28a9c28 commit 4a7cb60
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 45 deletions.
65 changes: 42 additions & 23 deletions pkg/controllers/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"crypto/sha1"
"encoding/base64"
"fmt"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

Expand Down Expand Up @@ -96,31 +97,31 @@ var (
func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)

var cluster rayv1.RayCluster
cluster := &rayv1.RayCluster{}

if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
if err := r.Get(ctx, req.NamespacedName, cluster); err != nil {
if !errors.IsNotFound(err) {
logger.Error(err, "Error getting RayCluster resource")
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) {
if !controllerutil.ContainsFinalizer(cluster, oAuthFinalizer) {
logger.Info("Add a finalizer", "finalizer", oAuthFinalizer)
controllerutil.AddFinalizer(&cluster, oAuthFinalizer)
if err := r.Update(ctx, &cluster); err != nil {
controllerutil.AddFinalizer(cluster, oAuthFinalizer)
if err := r.Update(ctx, cluster); err != nil {
// this log is info level since errors are not fatal and are expected
logger.Info("WARN: Failed to update RayCluster with finalizer", "error", err.Error(), logRequeueing, true)
return ctrl.Result{RequeueAfter: requeueTime}, err
}
}
} else if controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) {
} else if controllerutil.ContainsFinalizer(cluster, oAuthFinalizer) {
err := client.IgnoreNotFound(r.Client.Delete(
ctx,
&rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: crbNameFromCluster(&cluster),
Name: crbNameFromCluster(cluster),
},
},
&deleteOptions,
Expand All @@ -129,75 +130,75 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
logger.Error(err, "Failed to remove OAuth ClusterRoleBinding.", logRequeueing, true)
return ctrl.Result{RequeueAfter: requeueTime}, err
}
controllerutil.RemoveFinalizer(&cluster, oAuthFinalizer)
if err := r.Update(ctx, &cluster); err != nil {
controllerutil.RemoveFinalizer(cluster, oAuthFinalizer)
if err := r.Update(ctx, cluster); err != nil {
logger.Error(err, "Failed to remove finalizer from RayCluster", logRequeueing, true)
return ctrl.Result{RequeueAfter: requeueTime}, err
}
logger.Info("Successfully removed finalizer.", logRequeueing, false)
return ctrl.Result{}, nil
}

if cluster.Status.State != "suspended" && r.isRayDashboardOAuthEnabled() && r.IsOpenShift {
if cluster.Status.State != "suspended" && isRayDashboardOAuthEnabled(r.Config) && r.IsOpenShift {
logger.Info("Creating OAuth Objects")
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Route")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to create OAuth Secret")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth Service")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ServiceAccount")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update OAuth ClusterRoleBinding")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

logger.Info("Creating RayClient Route")
_, err = r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update RayClient Route")
return ctrl.Result{RequeueAfter: requeueTime}, err
}

} else if cluster.Status.State != "suspended" && !r.isRayDashboardOAuthEnabled() && !r.IsOpenShift {
} else if cluster.Status.State != "suspended" && !isRayDashboardOAuthEnabled(r.Config) && !r.IsOpenShift {
logger.Info("We detected being on Vanilla Kubernetes!")
logger.Info("Creating Dashboard Ingress")
dashboardName := dashboardNameFromCluster(&cluster)
dashboardIngressHost, err := r.getIngressHost(ctx, r.kubeClient, &cluster, dashboardName)
dashboardName := dashboardNameFromCluster(cluster)
dashboardIngressHost, err := getIngressHost(r.Config, cluster, dashboardName)
if err != nil {
return ctrl.Result{RequeueAfter: requeueTime}, err
}
_, err = r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(&cluster, dashboardIngressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(cluster, dashboardIngressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
// This log is info level since errors are not fatal and are expected
logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true)
return ctrl.Result{RequeueAfter: requeueTime}, err
}
logger.Info("Creating RayClient Ingress")
rayClientName := rayClientNameFromCluster(&cluster)
rayClientIngressHost, err := r.getIngressHost(ctx, r.kubeClient, &cluster, rayClientName)
rayClientName := rayClientNameFromCluster(cluster)
rayClientIngressHost, err := getIngressHost(r.Config, cluster, rayClientName)
if err != nil {
return ctrl.Result{RequeueAfter: requeueTime}, err
}
_, err = r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(&cluster, rayClientIngressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
_, err = r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(cluster, rayClientIngressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true})
if err != nil {
logger.Error(err, "Failed to update RayClient Ingress")
return ctrl.Result{RequeueAfter: requeueTime}, err
Expand All @@ -207,6 +208,24 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// getIngressHost generates the cluster URL string based on the cluster type, RayCluster, and ingress domain.
func getIngressHost(cfg *config.KubeRayConfiguration, cluster *rayv1.RayCluster, ingressNameFromCluster string) (string, error) {
ingressDomain := ""
if cfg != nil && cfg.IngressDomain != "" {
ingressDomain = cfg.IngressDomain
} else {
return "", fmt.Errorf("missing IngressDomain configuration in ConfigMap 'codeflare-operator-config'")
}
return fmt.Sprintf("%s-%s.%s", ingressNameFromCluster, cluster.Namespace, ingressDomain), nil
}

func isRayDashboardOAuthEnabled(cfg *config.KubeRayConfiguration) bool {
if cfg != nil && cfg.RayDashboardOAuthEnabled != nil {
return *cfg.RayDashboardOAuthEnabled
}
return true
}

func crbNameFromCluster(cluster *rayv1.RayCluster) string {
return cluster.Name + "-" + cluster.Namespace + "-auth" // NOTE: potential naming conflicts ie {name: foo, ns: bar-baz} and {name: foo-bar, ns: baz}
}
Expand Down
22 changes: 0 additions & 22 deletions pkg/controllers/support.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package controllers

import (
"context"
"fmt"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/client-go/applyconfigurations/meta/v1"
networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
"k8s.io/client-go/kubernetes"

routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1"
)
Expand Down Expand Up @@ -98,21 +94,3 @@ func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *netwo
),
)
}

// getIngressHost generates the cluster URL string based on the cluster type, RayCluster, and ingress domain.
func (r *RayClusterReconciler) getIngressHost(ctx context.Context, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster, ingressNameFromCluster string) (string, error) {
ingressDomain := ""
if r.Config != nil && r.Config.IngressDomain != "" {
ingressDomain = r.Config.IngressDomain
} else {
return "", fmt.Errorf("missing IngressDomain configuration in ConfigMap 'codeflare-operator-config'")
}
return fmt.Sprintf("%s-%s.%s", ingressNameFromCluster, cluster.Namespace, ingressDomain), nil
}

func (r *RayClusterReconciler) isRayDashboardOAuthEnabled() bool {
if r.Config != nil && r.Config.RayDashboardOAuthEnabled != nil {
return *r.Config.RayDashboardOAuthEnabled
}
return true
}

0 comments on commit 4a7cb60

Please sign in to comment.