fix: only filter RayCluster events for reconciliation (ray-project#882)
ray-project#639 accidentally applied event filters to the child resources Pods and Services as well. This change stops filtering Pod- and Service-related events, so Pod updates trigger RayCluster reconciliation again. It also renames the unconditional-requeue environment variable from RAYCLUSTER_DEFAULT_RECONCILE_LOOP_S to RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, backed by a shared 300-second default constant, so the test suite can shorten the interval.
davidxia authored Jan 29, 2023
1 parent de0aaf5 commit 80c3ee5
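The mechanics behind the fix, as a minimal sketch (abridged builder chains, not the literal code from this commit): controller-runtime's WithEventFilter applies its predicates to every source the builder watches, For, Owns, and Watches alike, so a Pod status update that changes neither generation, labels, nor annotations was dropped before it could enqueue the owning RayCluster. Passing the same predicates to For() via builder.WithPredicates scopes them to the primary resource and leaves the child watches unfiltered:

	// Before (ray-project#639): the predicate filters events from every
	// watched source, including the owned Pods and Services.
	ctrl.NewControllerManagedBy(mgr).
		For(&rayiov1alpha1.RayCluster{}).
		WithEventFilter(predicate.GenerationChangedPredicate{}). // drops Pod/Service updates too
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{})

	// After (this commit): the predicate applies only to RayCluster events,
	// so Pod and Service updates enqueue reconciliation unfiltered.
	ctrl.NewControllerManagedBy(mgr).
		For(&rayiov1alpha1.RayCluster{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{})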
Showing 5 changed files with 78 additions and 21 deletions.
2 changes: 2 additions & 0 deletions ray-operator/controllers/ray/common/constant.go
@@ -83,6 +83,8 @@ const (
 	RAY_SERVE_KV_TIMEOUT_S = "RAY_SERVE_KV_TIMEOUT_S"
 	SERVE_CONTROLLER_PIN_ON_NODE = "RAY_INTERNAL_SERVE_CONTROLLER_PIN_ON_NODE"
 	RAY_USAGE_STATS_KUBERAY_IN_USE = "RAY_USAGE_STATS_KUBERAY_IN_USE"
+	RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV = "RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV"
+	RAYCLUSTER_DEFAULT_REQUEUE_SECONDS = 300
 
 	// Ray core default configurations
 	DefaultRedisPassword = "5241590000000000"
19 changes: 12 additions & 7 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	controller "sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -236,13 +237,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(request ctrl.Request, instanc
 	}
 
 	// Unconditionally requeue after the number of seconds specified in the
-	// environment variable RAYCLUSTER_DEFAULT_RECONCILE_LOOP_S. If the
-	// environment variable is not set, requeue after 5 minutes.
+	// environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
+	// environment variable is not set, requeue after the default value.
 	var requeueAfterSeconds int
-	requeueAfterSeconds, err := strconv.Atoi(os.Getenv("RAYCLUSTER_DEFAULT_RECONCILE_LOOP_S"))
+	requeueAfterSeconds, err := strconv.Atoi(os.Getenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV))
 	if err != nil {
-		r.Log.Info("RAYCLUSTER_DEFAULT_RECONCILE_LOOP_S is not set, using default value 300s", "cluster name", request.Name)
-		requeueAfterSeconds = 5 * 60
+		r.Log.Info(fmt.Sprintf("Environment variable %s is not set, using default value of %d seconds", common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS), "cluster name", request.Name)
+		requeueAfterSeconds = common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS
 	}
 	r.Log.Info("Unconditional requeue after", "cluster name", request.Name, "seconds", requeueAfterSeconds)
 	return ctrl.Result{RequeueAfter: time.Duration(requeueAfterSeconds) * time.Second}, nil
@@ -810,8 +811,12 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster,
 // SetupWithManager builds the reconciler.
 func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
 	b := ctrl.NewControllerManagedBy(mgr).
-		For(&rayiov1alpha1.RayCluster{}).Named("raycluster-controller").
-		WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{}, predicate.AnnotationChangedPredicate{})).
+		Named("raycluster-controller").
+		For(&rayiov1alpha1.RayCluster{}, builder.WithPredicates(predicate.Or(
+			predicate.GenerationChangedPredicate{},
+			predicate.LabelChangedPredicate{},
+			predicate.AnnotationChangedPredicate{},
+		))).
 		Watches(&source.Kind{Type: &corev1.Event{}}, &handler.EnqueueRequestForObject{}).
 		Owns(&corev1.Pod{}).
 		Owns(&corev1.Service{})
67 changes: 55 additions & 12 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -18,6 +18,7 @@ package ray
 import (
 	"context"
 	"fmt"
+	"log"
 	"reflect"
 	"time"
 
@@ -46,6 +47,7 @@ const (
 var _ = Context("Inside the default namespace", func() {
 	ctx := context.TODO()
 	var workerPods corev1.PodList
+	var headPods corev1.PodList
 	enableInTreeAutoscaling := true
 
 	myRayCluster := &rayiov1alpha1.RayCluster{
@@ -62,7 +64,6 @@ var _ = Context("Inside the default namespace", func() {
 				"port":                "6379",
 				"object-manager-port": "12345",
 				"node-manager-port":   "12346",
-				"object-store-memory": "100000000",
 				"num-cpus":            "1",
 			},
 			Template: corev1.PodTemplateSpec{
@@ -126,7 +127,8 @@ var _ = Context("Inside the default namespace", func() {
 		},
 	}
 
-	filterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayCluster.Name, common.RayNodeGroupLabelKey: "small-group"}
+	headFilterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayCluster.Name, common.RayNodeGroupLabelKey: "headgroup"}
+	workerFilterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayCluster.Name, common.RayNodeGroupLabelKey: "small-group"}
 
 	Describe("When creating a raycluster", func() {
 		It("should create a raycluster object", func() {
@@ -150,17 +152,15 @@
 
 		It("should create 3 workers", func() {
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
 			if len(workerPods.Items) > 0 {
 				Expect(workerPods.Items[0].Status.Phase).Should(Or(Equal(corev1.PodRunning), Equal(corev1.PodPending)))
 			}
 		})
 
 		It("should create a head pod resource", func() {
-			var headPods corev1.PodList
-			filterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayCluster.Name, common.RayNodeGroupLabelKey: "headgroup"}
-			err := k8sClient.List(ctx, &headPods, filterLabels, &client.ListOptions{Namespace: "default"}, client.InNamespace(myRayCluster.Namespace))
+			err := k8sClient.List(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: "default"}, client.InNamespace(myRayCluster.Namespace))
 			Expect(err).NotTo(HaveOccurred(), "failed list head pods")
 			Expect(len(headPods.Items)).Should(BeNumerically("==", 1), "My head pod list= %v", headPods.Items)
 
@@ -190,9 +190,42 @@ var _ = Context("Inside the default namespace", func() {
 				time.Second*15, time.Millisecond*500).Should(BeNil(), "autoscaler RoleBinding = %v", rbName)
 		})
 
+		It("should be able to update all Pods to Running", func() {
+			// We need to manually update Pod statuses otherwise they'll always be Pending.
+			// envtest doesn't create a full K8s cluster. It's only the control plane.
+			// There's no container runtime or any other K8s controllers.
+			// So Pods are created, but no controller updates them from Pending to Running.
+			// See https://book.kubebuilder.io/reference/envtest.html
+			for _, headPod := range headPods.Items {
+				headPod.Status.Phase = corev1.PodRunning
+				Expect(k8sClient.Status().Update(ctx, &headPod)).Should(BeNil())
+			}
+			err := k8sClient.List(ctx, &headPods, headFilterLabels, &client.ListOptions{Namespace: "default"})
+			Expect(err).ShouldNot(HaveOccurred(), "failed to list head Pods")
+			for _, headPod := range headPods.Items {
+				Expect(headPod.Status.Phase).Should(Equal(corev1.PodRunning))
+			}
+
+			for _, workerPod := range workerPods.Items {
+				workerPod.Status.Phase = corev1.PodRunning
+				Expect(k8sClient.Status().Update(ctx, &workerPod)).Should(BeNil())
+			}
+			err = k8sClient.List(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"})
+			Expect(err).ShouldNot(HaveOccurred(), "failed to list worker Pods")
+			for _, workerPod := range workerPods.Items {
+				Expect(workerPod.Status.Phase).Should(Equal(corev1.PodRunning))
+			}
+		})
+
+		It("cluster's .status.state should be updated to 'ready' shortly after all Pods are Running", func() {
+			Eventually(
+				getClusterState(ctx, "default", myRayCluster.Name),
+				time.Second*(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS+5), time.Millisecond*500).Should(Equal(rayiov1alpha1.Ready))
+		})
+
 		It("should re-create a deleted worker", func() {
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
 
 			pod := workerPods.Items[0]
@@ -203,7 +236,7 @@ var _ = Context("Inside the default namespace", func() {
 
 			// at least 3 pods should be in none-failed phase
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 
@@ -228,7 +261,7 @@ var _ = Context("Inside the default namespace", func() {
 		It("should have only 2 running worker", func() {
 			// retry listing pods, given that last update may not immediately happen.
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(2), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 
@@ -250,7 +283,7 @@ var _ = Context("Inside the default namespace", func() {
 		It("should have only 1 running worker", func() {
 			// retry listing pods, given that last update may not immediately happen.
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(1), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 
@@ -275,14 +308,14 @@ var _ = Context("Inside the default namespace", func() {
 		It("should scale to maxReplicas (4) workers", func() {
 			// retry listing pods, given that last update may not immediately happen.
 			Eventually(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 
 		It("should countinue to have only maxReplicas (4) workers", func() {
 			// check that pod count stays at 4 for two seconds.
 			Consistently(
-				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
+				listResourceFunc(ctx, &workerPods, workerFilterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*2, time.Millisecond*200).Should(Equal(4), fmt.Sprintf("workerGroup %v", workerPods.Items))
 		})
 	})
@@ -330,3 +363,13 @@ func retryOnOldRevision(attempts int, sleep time.Duration, f func() error) error
 	}
 	return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
 }
+
+func getClusterState(ctx context.Context, namespace string, clusterName string) func() rayiov1alpha1.ClusterState {
+	return func() rayiov1alpha1.ClusterState {
+		var cluster rayiov1alpha1.RayCluster
+		if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, &cluster); err != nil {
+			log.Fatal(err)
+		}
+		return cluster.Status.State
+	}
+}
8 changes: 6 additions & 2 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/tools/record"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
@@ -219,8 +220,11 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
 // SetupWithManager sets up the controller with the Manager.
 func (r *RayServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
-		For(&rayv1alpha1.RayService{}).
-		WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{}, predicate.AnnotationChangedPredicate{})).
+		For(&rayv1alpha1.RayService{}, builder.WithPredicates(predicate.Or(
+			predicate.GenerationChangedPredicate{},
+			predicate.LabelChangedPredicate{},
+			predicate.AnnotationChangedPredicate{},
+		))).
 		Owns(&rayv1alpha1.RayCluster{}).
 		Owns(&corev1.Service{}).
 		Owns(&networkingv1.Ingress{}).
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/suite_test.go
@@ -16,9 +16,11 @@ limitations under the License.
 package ray
 
 import (
+	"os"
 	"path/filepath"
 	"testing"
 
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 
 	"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
@@ -79,6 +81,7 @@
 	Expect(k8sClient).ToNot(BeNil())
 
 	// Suggested way to run tests
+	os.Setenv(common.RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV, "10")
 	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
 		Scheme:             scheme.Scheme,
 		MetricsBindAddress: "0",
