Skip to content

Commit

Permalink
[RayService] Refactor updateRayClusterInstance (ray-project#2875)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin85421 authored Feb 3, 2025
1 parent 5b8e9c6 commit bb31661
Showing 1 changed file with 32 additions and 33 deletions.
65 changes: 32 additions & 33 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,21 +514,31 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
if shouldUpdateCluster(rayServiceInstance, activeRayCluster, true) {
// TODO(kevin85421): We should not reconstruct the cluster to update it. This will cause issues if autoscaler is enabled.
logger.Info("Updating the active RayCluster instance", "clusterName", activeRayCluster.Name)
if activeRayCluster, err = constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name, r.Scheme); err != nil {
goalCluster, err := constructRayClusterForRayService(rayServiceInstance, activeRayCluster.Name, r.Scheme)
if err != nil {
return nil, nil, err
}
err = r.updateRayClusterInstance(ctx, rayServiceInstance, activeRayCluster)
modifyRayCluster(ctx, activeRayCluster, goalCluster)
if err = r.Update(ctx, activeRayCluster); err != nil {
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update the active RayCluster %s/%s: %v", activeRayCluster.Namespace, activeRayCluster.Name, err)
}
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated the active RayCluster %s/%s", activeRayCluster.Namespace, activeRayCluster.Name)
return activeRayCluster, pendingRayCluster, err
}

if shouldUpdateCluster(rayServiceInstance, pendingRayCluster, false) {
// TODO(kevin85421): We should not reconstruct the cluster to update it. This will cause issues if autoscaler is enabled.
logger.Info("Updating the pending RayCluster instance", "clusterName", pendingRayCluster.Name)
if pendingRayCluster, err = constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name, r.Scheme); err != nil {
goalCluster, err := constructRayClusterForRayService(rayServiceInstance, pendingRayCluster.Name, r.Scheme)
if err != nil {
return nil, nil, err
}
err = r.updateRayClusterInstance(ctx, rayServiceInstance, pendingRayCluster)
return activeRayCluster, pendingRayCluster, err
modifyRayCluster(ctx, pendingRayCluster, goalCluster)
if err = r.Update(ctx, pendingRayCluster); err != nil {
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update the pending RayCluster %s/%s: %v", pendingRayCluster.Namespace, pendingRayCluster.Name, err)
}
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated the pending RayCluster %s/%s", pendingRayCluster.Namespace, pendingRayCluster.Name)
return activeRayCluster, pendingRayCluster, nil
}

return activeRayCluster, pendingRayCluster, nil
Expand Down Expand Up @@ -695,41 +705,30 @@ func shouldPrepareNewCluster(ctx context.Context, rayServiceInstance *rayv1.RayS
return isZeroDowntimeUpgradeEnabled(ctx, rayServiceInstance.Spec.UpgradeStrategy)
}

// updateRayClusterInstance updates the RayCluster instance.
func (r *RayServiceReconciler) updateRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster) error {
// modifyRayCluster updates `currentCluster` in place by overwriting its Spec, Labels, and Annotations
// with those of `goalCluster`. `currentCluster` is the current RayCluster retrieved from the informer
// cache, and `goalCluster` is the target state of the RayCluster derived from the RayService spec.
// It panics if the two clusters do not have the same name and namespace.
func modifyRayCluster(ctx context.Context, currentCluster, goalCluster *rayv1.RayCluster) {
logger := ctrl.LoggerFrom(ctx)
logger.Info("updateRayClusterInstance", "Name", rayClusterInstance.Name, "Namespace", rayClusterInstance.Namespace)

// Fetch the current state of the RayCluster
currentRayCluster, err := r.getRayClusterByNamespacedName(ctx, client.ObjectKey{
Namespace: rayClusterInstance.Namespace,
Name: rayClusterInstance.Name,
})
if err != nil {
err = fmt.Errorf("failed to get the current state of RayCluster, namespace: %s, name: %s: %w", rayClusterInstance.Namespace, rayClusterInstance.Name, err)
return err
}

if currentRayCluster == nil {
logger.Info("RayCluster not found, possibly deleted", "Namespace", rayClusterInstance.Namespace, "Name", rayClusterInstance.Name)
return nil
if currentCluster.Name != goalCluster.Name || currentCluster.Namespace != goalCluster.Namespace {
panic(fmt.Sprintf(
"currentCluster and goalCluster have different names or namespaces: "+
"%s/%s != %s/%s",
currentCluster.Namespace,
currentCluster.Name,
goalCluster.Namespace,
goalCluster.Name,
))
}
logger.Info("updateRayClusterInstance", "Name", goalCluster.Name, "Namespace", goalCluster.Namespace)

// Update the fetched RayCluster with new changes
currentRayCluster.Spec = rayClusterInstance.Spec
currentCluster.Spec = goalCluster.Spec

// Update the labels and annotations
currentRayCluster.Labels = rayClusterInstance.Labels
currentRayCluster.Annotations = rayClusterInstance.Annotations

// Update the RayCluster
err = r.Update(ctx, currentRayCluster)
if err != nil {
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update the RayCluster %s/%s: %v", currentRayCluster.Namespace, currentRayCluster.Name, err)
} else {
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated the RayCluster %s/%s", currentRayCluster.Namespace, currentRayCluster.Name)
}
return err
currentCluster.Labels = goalCluster.Labels
currentCluster.Annotations = goalCluster.Annotations
}

func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService) (*rayv1.RayCluster, error) {
Expand Down

0 comments on commit bb31661

Please sign in to comment.