Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayCluster] Make headpod name deterministic #3028

Merged
6 changes: 3 additions & 3 deletions kubectl-plugin/test/e2e/kubectl_ray_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving all ray cluster logs", func() {
expectedDirPath := "./raycluster-kuberay"
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`

cmd := exec.Command("kubectl", "ray", "log", "--namespace", namespace, "raycluster-kuberay", "--node-type", "all")
output, err := cmd.CombinedOutput()
Expand Down Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving ray cluster head logs", func() {
expectedDirPath := "./raycluster-kuberay"
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve only head node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+`
expectedOutputStringFormat := `No output directory specified, creating dir under current directory using resource name\.\nCommand set to retrieve only head node logs\.\nDownloading log for Ray Node raycluster-kuberay-head`

cmd := exec.Command("kubectl", "ray", "log", "--namespace", namespace, "raycluster-kuberay", "--node-type", "head")
output, err := cmd.CombinedOutput()
Expand Down Expand Up @@ -191,7 +191,7 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {

It("succeed in retrieving ray cluster logs within designated directory", func() {
expectedDirPath := "./temporary-directory"
expectedOutputStringFormat := `Command set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head-\w+\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`
expectedOutputStringFormat := `Command set to retrieve both head and worker node logs\.\nDownloading log for Ray Node raycluster-kuberay-head\nDownloading log for Ray Node raycluster-kuberay-workergroup-worker-\w+`

err := os.MkdirAll(expectedDirPath, 0o755)
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
// headPort is passed into setMissingRayStartParams but unused there for the head pod.
// To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic.
podTemplate := headSpec.Template
podTemplate.GenerateName = podName
podTemplate.Name = podName
// Pods created by RayCluster should be restricted to the namespace of the RayCluster.
// This ensures privilege of KubeRay users are contained within the namespace of the RayCluster.
podTemplate.ObjectMeta.Namespace = instance.Namespace
Expand Down
32 changes: 13 additions & 19 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,24 +733,18 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return errstd.Join(utils.ErrFailedCreateHeadPod, err)
}
common.SuccessfulClustersCounterInc(instance.Namespace)
} else if len(headPods.Items) > 1 {
owenowenisme marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("reconcilePods: Found more than one head Pods; deleting extra head Pods.", "nHeadPods", len(headPods.Items))
// TODO (kevin85421): In-place update may not be a good idea.
itemLength := len(headPods.Items)
for index := 0; index < itemLength; index++ {
if headPods.Items[index].Status.Phase == corev1.PodRunning || headPods.Items[index].Status.Phase == corev1.PodPending {
headPods.Items[index] = headPods.Items[len(headPods.Items)-1] // Replace healthy pod at index i with the last element from the list of pods to delete.
headPods.Items = headPods.Items[:len(headPods.Items)-1] // Truncate slice.
itemLength--
}
}
// delete all the extra head pod pods
for _, extraHeadPodToDelete := range headPods.Items {
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
}
r.rayClusterScaleExpectation.ExpectScalePod(extraHeadPodToDelete.Namespace, instance.Name, expectations.HeadGroup, extraHeadPodToDelete.Name, expectations.Delete)
} else if len(headPods.Items) > 1 { // This should never happen. This protects against the case that users manually create headpod.
correctHeadPodName := instance.Name + "-head"
headPodNames := make([]string, len(headPods.Items))
for i, pod := range headPods.Items {
headPodNames[i] = pod.Name
}

logger.Info("Multiple head pods found, it should only exist one head pod. Please delete extra head pods.",
"found pods", headPodNames,
"should only leave", correctHeadPodName,
)
return fmt.Errorf("%d head pods found %v. Please delete extra head pods and leave only the head pod with name %s", len(headPods.Items), headPodNames, correctHeadPodName)
}

// Reconcile worker pods now
Expand Down Expand Up @@ -1092,7 +1086,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
// Build head instance pod(s).
func (r *RayClusterReconciler) buildHeadPod(ctx context.Context, instance rayv1.RayCluster) corev1.Pod {
logger := ctrl.LoggerFrom(ctx)
podName := utils.PodGenerateName(instance.Name, rayv1.HeadNode)
podName := utils.PodName(instance.Name, rayv1.HeadNode, false)
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name
// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
Expand All @@ -1119,7 +1113,7 @@ func getCreatorCRDType(instance rayv1.RayCluster) utils.CRDType {
// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv1.RayCluster, worker rayv1.WorkerGroupSpec) corev1.Pod {
logger := ctrl.LoggerFrom(ctx)
podName := utils.PodGenerateName(fmt.Sprintf("%s-%s", instance.Name, worker.GroupName), rayv1.WorkerNode)
podName := utils.PodName(fmt.Sprintf("%s-%s", instance.Name, worker.GroupName), rayv1.WorkerNode, true)
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) // Fully Qualified Domain Name

// The Ray head port used by workers to connect to the cluster (GCS server port for Ray >= 1.11.0, Redis port for older Ray.)
Expand Down
12 changes: 8 additions & 4 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func CheckRouteName(ctx context.Context, s string, n string) string {
return CheckName(s)
}

// PodGenerateName returns the value that should be used for a Pod's generateName
// PodName returns the value that should be used for a Pod's Name or GenerateName
// based on the RayCluster name and node type (head or worker).
func PodGenerateName(prefix string, nodeType rayv1.RayNodeType) string {
maxPrefixLength := 50 // 63 - (max(8,6) + 5 ) // 6 to 8 char are consumed at the end with "-head-" or -worker- + 5 generated.
func PodName(prefix string, nodeType rayv1.RayNodeType, isGenerateName bool) string {
maxPrefixLength := 50 // 63 - ( 8 + 5 ) // 8 char are consumed at the end with "-worker-" + 5 generated.

var podPrefix string
if len(prefix) <= maxPrefixLength {
Expand All @@ -177,7 +177,11 @@ func PodGenerateName(prefix string, nodeType rayv1.RayNodeType) string {
podPrefix = prefix[:maxPrefixLength]
}

return strings.ToLower(podPrefix + DashSymbol + string(nodeType) + DashSymbol)
result := strings.ToLower(podPrefix + DashSymbol + string(nodeType))
if isGenerateName {
result += DashSymbol
}
return result
}

// CheckName makes sure the name does not start with a numeric value and the total length is < 63 char
Expand Down
11 changes: 6 additions & 5 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCheckAllPodsRunning(t *testing.T) {
}
}

func TestPodGenerateName(t *testing.T) {
func TestPodName(t *testing.T) {
tests := []struct {
name string
prefix string
Expand All @@ -114,7 +114,7 @@ func TestPodGenerateName(t *testing.T) {
name: "short cluster name, head pod",
prefix: "ray-cluster-01",
nodeType: rayv1.HeadNode,
expected: "ray-cluster-01-head-",
expected: "ray-cluster-01-head",
},
{
name: "short cluster name, worker pod",
Expand All @@ -126,7 +126,7 @@ func TestPodGenerateName(t *testing.T) {
name: "long cluster name, head pod",
prefix: "ray-cluster-0000000000000000000000011111111122222233333333333333",
nodeType: rayv1.HeadNode,
expected: "ray-cluster-00000000000000000000000111111111222222-head-",
expected: "ray-cluster-00000000000000000000000111111111222222-head",
},
{
name: "long cluster name, worker pod",
Expand All @@ -138,11 +138,12 @@ func TestPodGenerateName(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
str := PodGenerateName(test.prefix, test.nodeType)
isPodNameGenerated := test.nodeType == rayv1.WorkerNode // HeadPod name is now fixed
str := PodName(test.prefix, test.nodeType, isPodNameGenerated)
if str != test.expected {
t.Logf("expected: %q", test.expected)
t.Logf("actual: %q", str)
t.Error("PodGenerateName returned an unexpected string")
t.Error("PodName returned an unexpected string")
}

// 63 (max pod name length) - 5 random hexadecimal characters from generateName
Expand Down
8 changes: 5 additions & 3 deletions ray-operator/test/e2e/raycluster_gcs_ft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ func TestRayClusterGCSFaultTolerence(t *testing.T) {
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())

testPodNameChanged := func(p *corev1.Pod) bool { return p.Name != headPod.Name }
owenowenisme marked this conversation as resolved.
Show resolved Hide resolved
PodUID := func(p *corev1.Pod) string { return string(p.UID) }
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
Should(WithTransform(testPodNameChanged, Equal(true)))
ShouldNot(WithTransform(PodUID, Equal(string(headPod.UID)))) // Use UID to check if the new head pod is created.

g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium).
Should(WithTransform(PodState, Equal("Running")))

headPod, _ = GetHeadPod(test, rayCluster)
headPod, err = GetHeadPod(test, rayCluster) // Replace the old head pod
g.Expect(err).NotTo(HaveOccurred())

expectedOutput = "4"

ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "samples/test_detached_actor_2.py", rayNamespace, expectedOutput})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRedeployRayServe(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(endpoints.Subsets).To(HaveLen(1))
g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(1))
g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.Name).NotTo(Equal(oldHeadPod.Name))
g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.UID).NotTo(Equal(oldHeadPod.UID))
}, TestTimeoutMedium).Should(Succeed())

test.T().Logf("Waiting for RayService %s/%s to running", rayService.Namespace, rayService.Name)
Expand Down
Loading