Skip to content

Commit

Permalink
Add cluster role and log formatting changes
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan O'Leary <ryanaoleary@google.com>
  • Loading branch information
ryanaoleary committed Jul 4, 2024
1 parent 872f08c commit aed7649
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
26 changes: 24 additions & 2 deletions ray-on-gke/tpu/kuberay-tpu-webhook/deployments/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,28 @@ kind: Namespace
metadata:
name: ray-system
---
# ClusterRole "pod-reader": read-only ("get" and "list") access to Pods
# across all namespaces.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-reader
rules:
  - apiGroups:
      - ""  # the empty string selects the core API group
    resources:
      - pods
    verbs:
      - get
      - list
---
# ClusterRoleBinding "pod-reader": grants the pod-reader ClusterRole to the
# "default" ServiceAccount in the ray-system namespace.
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pod-reader
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pod-reader
subjects:
  - kind: ServiceAccount
    name: default
    namespace: ray-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand All @@ -21,11 +43,11 @@ spec:
app: kuberay-tpu-webhook
spec:
containers:
- image: us-docker.pkg.dev/ai-on-gke/kuberay-tpu-webhook/kuberay-tpu-webhook:v1.1
- image: us-docker.pkg.dev/ai-on-gke/kuberay-tpu-webhook/kuberay-tpu-webhook:v1.2
imagePullPolicy: Always
name: kuberay-tpu-webhook
args:
- --v=0 # change this value to 1 for verbose logging
- --v=0 # change this value to 1 for verbose logging
ports:
- name: https
containerPort: 443
Expand Down
14 changes: 6 additions & 8 deletions ray-on-gke/tpu/kuberay-tpu-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,11 @@ func updateSliceToWorkerIDs(pod *corev1.Pod, clusterName string, groupName strin
klog.ErrorS(err, "updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName)
return
}

if podsInNamespace != nil {
klog.V(1).InfoS("updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "# Pods", len(podsInNamespace.Items))
for _, existingPod := range podsInNamespace.Items {
if existingPod.Status.Phase == "Pending" || existingPod.Status.Phase == "Running" {
existingClusterName := existingPod.Labels["ray.io/cluster"]
existingGroupName := existingPod.Labels["ray.io/group"]
klog.V(1).InfoS("updateSliceToWorkerIDs", "RayCluster", namespace+"/"+existingClusterName, "Worker Group", existingGroupName)
// we only care about workers in the same RayCluster and worker group when assigning IDs
if clusterName == existingClusterName && groupName == existingGroupName {
if containerRequestingTPUs(existingPod.Spec.Containers...) {
Expand All @@ -395,16 +392,18 @@ func updateSliceToWorkerIDs(pod *corev1.Pod, clusterName string, groupName strin
existingWorkerID := -1
for _, container := range existingPod.Spec.Containers {
if containerRequestingTPUs(container) {
tempVar, err := strconv.Atoi(getEnvironmentVariable("TPU_WORKER_ID", container))
if err == nil {
tpuWorkerIDEnvVar := getEnvironmentVariable("TPU_WORKER_ID", container)
tempVar, err := strconv.Atoi(tpuWorkerIDEnvVar)
if err != nil {
klog.ErrorS(err, "updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerIDEnvVar)
existingWorkerID = tempVar
}
break
}
}
if existingPod.Status.Phase == "Running" && existingWorkerID == -1 {
klog.ErrorS(errors.New("existing TPU worker missing TPU_WORKER_ID"), "updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName)
return
continue
}
if existingWorkerID != -1 {
// Pod has been intercepted by the webhook
Expand All @@ -414,8 +413,8 @@ func updateSliceToWorkerIDs(pod *corev1.Pod, clusterName string, groupName strin
} else {
sliceToWorkerIDs[podSlice] = append(sliceToWorkerIDs[podSlice], existingWorkerID)
}
klog.V(1).InfoS("updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "TPU_WORKER_ID", existingWorkerID)
}
klog.V(1).InfoS("updateSliceToWorkerIDs", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "TPU_WORKER_ID", existingWorkerID)
}
}
}
Expand Down Expand Up @@ -479,7 +478,6 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
// query k8s client to populate sliceToWorkerIDs to then calculate the next TPU_WORKER_ID and replicaIndex
sliceToWorkerIDs = make(map[slice][]int)
updateSliceToWorkerIDs(pod, clusterName, groupName, namespace, numOfHosts)
printSliceToWorkerIds()
replicaIndex := getReplicaIndex(clusterName, groupName, namespace)
podSlice := slice{clusterName, groupName, namespace, replicaIndex, numOfHosts}
tpuWorkerID := getNextWorkerID(podSlice, namespace, replicaIndex) // defaults to 0 for single-host
Expand Down

0 comments on commit aed7649

Please sign in to comment.