Add Webhook Unit Test Suite (#578)
* Test suite initial commit

* Remove unused annotations from certs/

* remove duplicate sample

* Added more unit tests

* Change logging level and fix tpu v5e chips per host calculation

* Add remaining unit tests

* Calculate TPU chips per VM host using google.com/tpu Resource value

* Remove headlessServiceName var (dynamically generate it with current cluster name instead)

* Resolve merge issues

* Fix headlessServiceName and finish tests

* Change RayCluster name back to default value

* Final test changes

* Fix bug for determining replicaIndex with workergroup name with dashes

* Add troubleshooting guide

* Add test for nil workerGroupSpecs in validateRayCluster

* Update troubleshooting guide

* Add example Pod env
ryanaoleary authored Apr 22, 2024
1 parent 2d3e898 commit f0ee9e6
Showing 7 changed files with 1,316 additions and 83 deletions.
62 changes: 62 additions & 0 deletions applications/ray/kuberay-tpu-webhook/Troubleshooting.md
@@ -0,0 +1,62 @@
# Ray TPU Webhook Troubleshooting / FAQ Guide
Common issues and their solutions when deploying Ray TPU worker groups with the webhook.
Solutions will be added as new issues are encountered.

## `TPU_WORKER_HOSTNAMES` aren't injected into the Pod environment

### Symptoms
This may be the issue if multi-host JAX initialization fails with the error `RuntimeError: Unable to initialize backend 'tpu': UNKNOWN: TPU initialization failed`. Verify whether `TPU_WORKER_HOSTNAMES` is missing from the Pod environment with `kubectl describe`. The environment output by `kubectl describe pod ${POD_NAME}` should look similar to:
```
Containers:
ray-worker:
...
Environment:
...
TPU_WORKER_HOSTNAMES: list of NumOfHosts DNS hostnames
TPU_WORKER_ID: unique integer between 0 and NumOfHosts-1 identifying the worker within the Pod slice
TPU_NAME: worker group name followed by the replica index (e.g. workergroup-0)
```

### Solution #1
`TPU_WORKER_HOSTNAMES` are only injected for multi-host worker groups. If you're expecting `TPU_WORKER_HOSTNAMES` to be injected, check that the `NumOfHosts` field in your Ray worker group spec is set to a value greater than 1.
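As a quick check, a multi-host worker group spec should contain something like the following (a minimal sketch; the group name is a placeholder, and `numOfHosts` is assumed to be the YAML spelling of the KubeRay `NumOfHosts` field):
```
workerGroupSpecs:
  - groupName: tpu-worker-group   # placeholder name
    replicas: 1
    # must be greater than 1 for TPU_WORKER_HOSTNAMES to be injected
    numOfHosts: 4
```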

### Solution #2
The mutating webhook only intercepts Pods with the label `app.kubernetes.io/name: kuberay`. If environment variables aren't being injected by the webhook, it's possible the Pods are missing this label; add it manually if so (KubeRay adds this label automatically to Pods it creates).
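For reference, the Pod (or worker group Pod template) metadata should include the label shown in this minimal sketch; KubeRay normally sets it for you:
```
metadata:
  labels:
    app.kubernetes.io/name: kuberay
```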

## Internal error occurred: failed calling webhook no endpoints available for service "kuberay-tpu-webhook"

### Solution #1
If you're installing the webhook on a cluster where a previous version (e.g. v1.0) had been installed, `make uninstall` may fail to delete outdated Deployments, ValidatingWebhookConfigurations, or MutatingWebhookConfigurations, causing this error. To fix it, use `kubectl get` to check for outdated resources and `kubectl delete` to remove them. `make deploy deploy-cert` should then succeed.
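A rough cleanup sequence might look like this (a sketch only; the exact resource names may differ in your cluster, so use whatever `kubectl get` actually reports, and the `ray-system` namespace is assumed from `certs/cert.yaml`):
```
# look for leftover webhook resources from the previous install
kubectl get deployments -n ray-system
kubectl get validatingwebhookconfigurations,mutatingwebhookconfigurations

# delete anything stale, then redeploy
kubectl delete deployment kuberay-tpu-webhook -n ray-system
kubectl delete mutatingwebhookconfiguration kuberay-tpu-webhook
make deploy deploy-cert
```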

## Internal error occurred: failed calling webhook no endpoints available for service "cert-manager-webhook"

### Solution #1
This error occurs when attempting to run `make deploy-cert` before the cert-manager certificate has become ready. After installing cert-manager in the cluster with `make install-cert-manager` it's usually necessary to wait around 2 minutes before running `make deploy deploy-cert`.
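Instead of waiting a fixed amount of time, you can wait for the cert-manager deployments to report ready before deploying (a sketch that assumes cert-manager was installed into its default `cert-manager` namespace with its default deployment names):
```
kubectl wait --for=condition=Available --timeout=180s \
  deployment/cert-manager deployment/cert-manager-webhook deployment/cert-manager-cainjector \
  -n cert-manager
make deploy deploy-cert
```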

## Admission webhook denied the request: Number of workers in worker group not equal to specified topology

### Solution #1
Check that the `NumOfHosts` field in each Ray TPU worker group is equal to the number of TPU VM hosts expected by a given `gke-tpu-topology` and `gke-tpu-accelerator`. The expected number of hosts is calculated by dividing the total number of TPU chips per slice (e.g. for a 2x2x4 TPU podslice there are 16 chips) by the `google.com/tpu` resource request per worker. Each Ray worker is scheduled on a single node and corresponds to 1 TPU VM host. For more information about choosing a topology, see [TPU configurations](https://cloud.google.com/kubernetes-engine/docs/concepts/tpus#configuration).
### Example:
For a TPU v5e podslice with `gke-tpu-accelerator: tpu-v5-lite-podslice` and `gke-tpu-topology: 2x4`, there may be 1 or 2 TPU VM hosts if the machine type is `ct5lp-hightpu-8t` or `ct5lp-hightpu-4t` respectively. Determine the best configuration for your workload, and set the `google.com/tpu` resource request and limits values to either 8 or 4 based on your chosen machine type. You can then set the `NumOfHosts` field accordingly, and the webhook should admit the RayCluster and inject the desired values into each TPU Pod.
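Putting the single-host `ct5lp-hightpu-8t` case together, the relevant parts of the worker group might look roughly like this (a sketch; the group and container names are placeholders, and `numOfHosts` is assumed to be the YAML spelling of the KubeRay `NumOfHosts` field):
```
workerGroupSpecs:
  - groupName: tpu-group          # placeholder
    replicas: 1
    numOfHosts: 1                 # 8 chips per 2x4 slice / 8 chips per host = 1 host
    template:
      spec:
        nodeSelector:
          cloud.google.com/gke-tpu-accelerator: tpu-v5-lite-podslice
          cloud.google.com/gke-tpu-topology: 2x4
        containers:
          - name: ray-worker      # placeholder
            resources:
              limits:
                google.com/tpu: "8"
              requests:
                google.com/tpu: "8"
```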

## Webhook calculates the number of TPU VM hosts incorrectly

### Solution #1
Check the `google.com/tpu` resource request and limits values (these should be equal) in the Ray worker group spec. This value indicates the number of TPU chips requested for each Ray worker (i.e. the number of TPU chips per VM host). For a `2x2x4` tpu-v4-podslice with 4 TPU chips per VM host, `google.com/tpu` for the Ray worker should be 4. For a `2x4` tpu-v5-lite-podslice with 8 TPU chips per VM host, `google.com/tpu` should be 8. Information about selecting the correct topology, acceleratorType, and corresponding `google.com/tpu` chips per worker can be found in the [TPU versions](https://cloud.google.com/tpu/docs/v5e) docs.
```
workerGroupSpecs:
  - template:
      spec:
        containers:
          - resources:
              limits:
                google.com/tpu: "{$TPU_CHIPS_PER_WORKER}"
              requests:
                google.com/tpu: "{$TPU_CHIPS_PER_WORKER}"
```

## `TPU_WORKER_ID`s incorrectly assigned when re-creating a RayCluster of the same name

### Solution #1
A limitation of the webhook is that it relies on RayCluster and Pod creation/deletion requests being intercepted in order, which isn't guaranteed when a RayCluster is deleted and re-created too quickly. The second creation request can be intercepted before the deletion request, causing the webhook to assign incorrect `TPU_WORKER_ID`s to the new Pods. To avoid this, wait a few seconds before re-creating a RayCluster with the same name, or re-create it under a different name.
2 changes: 1 addition & 1 deletion applications/ray/kuberay-tpu-webhook/certs/cert.yaml
@@ -17,4 +17,4 @@ spec:
- kuberay-tpu-webhook.ray-system.svc
- kuberay-tpu-webhook.ray-system.svc.cluster.local
issuerRef:
name: selfsigned-issuer
name: selfsigned-issuer
2 changes: 1 addition & 1 deletion applications/ray/kuberay-tpu-webhook/go.mod
@@ -8,6 +8,7 @@ require (
k8s.io/api v0.29.1
k8s.io/apimachinery v0.29.1
k8s.io/klog/v2 v2.120.1
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
)

require (
@@ -63,7 +64,6 @@ require (
k8s.io/client-go v0.29.0 // indirect
k8s.io/component-base v0.29.0 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect
sigs.k8s.io/controller-runtime v0.17.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
109 changes: 54 additions & 55 deletions applications/ray/kuberay-tpu-webhook/main.go
@@ -46,7 +46,6 @@ var (

// headless svc will be of the form: {kuberay-cluster-name}-headless-worker-svc
headlessServiceSuffix = "headless-worker-svc"
headlessServiceName string

// map of pod slices to workers in the slice
sliceToWorkers map[slice][]worker
@@ -78,7 +77,30 @@ func containerRequestingTPUs(containers ...corev1.Container) bool {
return false
}

func getNumTPUHostsFromTopology(clusterName string, groupName string, namespace string, topology string, acceleratorType string) (int32, error) {
// returns the `google.com/tpu` resource request value for the container
// this indicates the number of TPU chips for the container to use
func getNumTPUChipsRequested(containers ...corev1.Container) int64 {
tpuLimit := int64(0)
tpuRequest := int64(0)
for _, container := range containers {
if l := container.Resources.Limits; l != nil {
if resource := l[tpuResourceName]; !resource.IsZero() {
tpuLimit = resource.Value()
}
}
if r := container.Resources.Requests; r != nil {
if resource := r[tpuResourceName]; !resource.IsZero() {
tpuRequest = resource.Value()
}
} else {
// default to limit if request is omitted
tpuRequest = tpuLimit
}
}
return min(tpuLimit, tpuRequest)
}

func getNumTPUHostsFromTopology(clusterName string, groupName string, namespace string, topology string, chipsPerHost int64) (int32, error) {
if topology == "" {
return 0, errors.New("TPU topology not specified")
}
@@ -87,39 +109,17 @@ func getNumTPUHostsFromTopology(clusterName string, groupName string, namespace
for i := 0; i < len(topologyVals); i++ {
dim, err := strconv.Atoi(topologyVals[i])
if err != nil {
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "gke-tpu-topology", topology)
klog.ErrorS(err, "getNumTPUHostsFromTopology", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "gke-tpu-topology", topology)
return 0, err
}
chips *= dim
}
// calculate the # of VMs using # of chips per host
acceleratorTypeValues := strings.Split(acceleratorType, "-")
chipsPerHost := 4 // default to 4 chips per VM
if acceleratorTypeValues[0] == "v5litepod" {
// v5e TPU VMs can have 1, 4 or 8 chips
chipsPerHost, err := strconv.Atoi(acceleratorTypeValues[1])
if err != nil {
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "gke-tpu-accelerator", acceleratorType)
return 0, err
}
chipsPerHost = min(chipsPerHost, 8) // max of 8 chips per host
}
hosts := int32(max(chips/chipsPerHost, 1))
klog.V(1).InfoS("getNumTPUHostsFromTopology", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "hosts", hosts)
hosts := max(int32(chips)/int32(chipsPerHost), 1)
klog.V(1).InfoS("getNumTPUHostsFromTopology", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "topology", topology, "chips", chips, "hosts", hosts)
return hosts, nil
}

// check if request is for TPU multi-host
func isTPUMultiHost(clusterName string, groupName string, namespace string, topology string, acceleratorType string) (bool, error) {
vms, err := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType)
if err != nil {
return false, err
}
isMultiHost := vms > 1
klog.V(0).InfoS("isTPUMultiHost", "RayCluster", namespace+"/"+clusterName, "topology", topology, "TPU VMs", vms)
return isMultiHost, nil
}

// unmarshal raycluster from admission request
func extractRayCluster(admissionReview *admissionv1.AdmissionReview) (*ray.RayCluster, error) {
if admissionReview.Request.Kind.Kind != "RayCluster" {
@@ -143,22 +143,22 @@ func genDNSHostnames(workerGroupSpec ray.WorkerGroupSpec, clusterName string, na
hostNames := make([]string, numHosts)
// Host names will be of the form {WORKER_GROUP_NAME}-{REPLICA_INDEX}-{HOST_INDEX}.headless-worker-svc
for j := 0; j < int(numHosts); j++ {
hostNames[j] = fmt.Sprintf("%s-%d-%d.%s", workerGroupName, replicaIndex, j, headlessServiceName)
hostNames[j] = fmt.Sprintf("%s-%d-%d.%s-%s", workerGroupName, replicaIndex, j, clusterName, headlessServiceSuffix)
}
klog.V(1).InfoS("genDNSHostnames", "RayCluster", namespace+"/"+clusterName, "NumOfHosts", numHosts, "Replica Index", replicaIndex)
return strings.Join(hostNames, ","), nil
}

// inject subdomain and TPU_WORKER_HOSTNAMES into pods for TPU multi-host initialization
func injectHostnames(hostNames string, envPath string, container corev1.Container, patches *[]patch) {
func injectHostnames(clusterName string, hostNames string, envPath string, container corev1.Container, patches *[]patch) {
subdomainPatch, hostNamesPatch := patch{"op": "add"}, patch{"op": "add"}
subdomainPath := "/spec/subdomain"
tpuWorkerHostNames := corev1.EnvVar{
Name: "TPU_WORKER_HOSTNAMES",
Value: hostNames,
}
subdomainPatch["path"] = subdomainPath
subdomainPatch["value"] = headlessServiceName
subdomainPatch["value"] = fmt.Sprintf("%s-%s", clusterName, headlessServiceSuffix)
// create new EnvVar array if container.Env is empty, and append hostnames if not
if len(container.Env) == 0 {
hostNamesPatch["path"] = envPath
@@ -227,15 +227,19 @@ func checkWorkersMatchTopology(clusterName string, namespace string, workerGroup
}
if containerRequestingTPUs(containers...) {
topology := workerGroupSpec.Template.Spec.NodeSelector["cloud.google.com/gke-tpu-topology"]
acceleratorType := workerGroupSpec.Template.Spec.NodeSelector["cloud.google.com/gke-tpu-accelerator"]
klog.V(1).InfoS("checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "topology", topology, "AcceleratorType", acceleratorType, "NumOfHosts", numHosts)
klog.V(1).InfoS("checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "topology", topology, "NumOfHosts", numHosts)
if topology == "" {
klog.ErrorS(errors.New("TPU topology not specified"), "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
err := errors.New("TPU topology not specified")
klog.ErrorS(err, "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
return false, err
}
if acceleratorType == "" {
klog.ErrorS(errors.New("TPU accelerator not specified"), "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
chipsPerHost := getNumTPUChipsRequested(containers...)
if chipsPerHost == 0 {
err := errors.New("Container does not set TPU limits")
klog.ErrorS(err, "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
return false, err
}
expectedHosts, err := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType)
expectedHosts, err := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, chipsPerHost)
if err != nil {
return false, err
}
@@ -258,13 +262,12 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
message := ""
clusterName := raycluster.Name
namespace := raycluster.Namespace
klog.V(0).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName)
headlessServiceName = fmt.Sprintf("%s-%s", clusterName, headlessServiceSuffix)
klog.V(1).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName)
workerGroupSpecs := raycluster.Spec.WorkerGroupSpecs
for i := 0; i < len(workerGroupSpecs); i++ {
workerGroupSpec := workerGroupSpecs[i]
if containerRequestingTPUs(workerGroupSpec.Template.Spec.Containers...) {
klog.V(0).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName, "Worker Group", workerGroupSpec.GroupName, "Requests TPUs", true)
klog.V(1).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName, "Worker Group", workerGroupSpec.GroupName, "Requests TPUs", true)
// create mapping for pod slices -> TPU_WORKER_HOSTNAMES in cluster
replicas := int(*workerGroupSpec.Replicas)
numOfHosts := workerGroupSpec.NumOfHosts
@@ -285,7 +288,7 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
}
} else {
// RayCluster worker group does not request TPUs
klog.V(0).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName, "Worker Group", workerGroupSpec.GroupName, "Requests TPUs", false)
klog.V(1).InfoS("validateRayCluster", "RayCluster", namespace+"/"+clusterName, "Worker Group", workerGroupSpec.GroupName, "Requests TPUs", false)
}
// validate NumOfHosts for worker group matches topology nodeSelector
workersMatchTopology, err := checkWorkersMatchTopology(clusterName, namespace, workerGroupSpec)
@@ -363,7 +366,7 @@ func getReplicaIndex(clusterName string, groupName string, namespace string) int
if nextLowestId == math.MaxInt32 {
nextLowestId = numReplicas
}
klog.V(0).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "Replica Index", nextLowestId)
klog.V(1).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "Replica Index", nextLowestId)
return nextLowestId
}

@@ -399,7 +402,7 @@ func getNextWorkerID(podSlice slice, namespace string, replicaIndex int) int {
}
tpuWorkerID = nextLowestID
}
klog.V(0).InfoS("getNextWorkerID", "RayCluster", namespace+"/"+podSlice.clusterName, "Worker Group", podSlice.groupName, "TPU_WORKER_ID", tpuWorkerID)
klog.V(1).InfoS("getNextWorkerID", "RayCluster", namespace+"/"+podSlice.clusterName, "Worker Group", podSlice.groupName, "TPU_WORKER_ID", tpuWorkerID)
return tpuWorkerID
}

@@ -445,24 +448,20 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
namespace := pod.Namespace
groupName := pod.Labels["ray.io/group"]
topology := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-topology"]
acceleratorType := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-accelerator"]
if topology == "" {
klog.ErrorS(errors.New("TPU topology not specified"), "mutatePod", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
}
if acceleratorType == "" {
klog.ErrorS(errors.New("TPU accelerator not specified"), "mutatePod", "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
}
// assign worker to the next unique ID in the pod slice and update map
numOfHosts, _ := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
chipsPerHost := getNumTPUChipsRequested(containers...)
numOfHosts, _ := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, chipsPerHost) // ignore error here because topology may not be set yet
replicaIndex := getReplicaIndex(clusterName, groupName, namespace)
podSlice := slice{clusterName, groupName, namespace, replicaIndex, numOfHosts}
tpuWorkerID := getNextWorkerID(podSlice, namespace, replicaIndex) // defaults to 0 for single-host

// inject replica index label
injectReplicaLabel(clusterName, namespace, replicaIndex, groupName, &patches)

isMultiHost, _ := isTPUMultiHost(clusterName, groupName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
if isMultiHost {
if numOfHosts > 1 {
// inject hostname into pod spec for DNS records
hostname := fmt.Sprintf(groupName+"-%d-%d", replicaIndex, tpuWorkerID)
klog.V(1).InfoS("mutatePod", "RayCluster", namespace+"/"+clusterName, "hostname", hostname)
@@ -480,11 +479,11 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
container := containers[i]
if containerRequestingTPUs(container) {
path := fmt.Sprintf("/spec/containers/%d/env", i)
if isMultiHost {
if numOfHosts > 1 {
// inject TPU_WORKER_HOSTNAMES set during RayCluster interception
klog.V(1).InfoS("mutatePod", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_HOSTNAMES", sliceToHostnames[podSlice])
klog.V(1).InfoS("mutatePod", "RayCluster", namespace+"/"+clusterName, "subdomain", headlessServiceName)
injectHostnames(sliceToHostnames[podSlice], path, container, &patches)
klog.V(1).InfoS("mutatePod", "RayCluster", namespace+"/"+clusterName, "subdomain", clusterName+"-"+headlessServiceSuffix)
injectHostnames(clusterName, sliceToHostnames[podSlice], path, container, &patches)
}
// inject TPU_WORKER_ID
if getEnvironmentVariable("TPU_WORKER_ID", container) == "" {
@@ -564,7 +563,7 @@ func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis

if replicaIndexLabel != "" {
replicaIndexLabelValues := strings.Split(replicaIndexLabel, "-")
replicaIndex, _ := strconv.Atoi(replicaIndexLabelValues[1]) // ignore error here since must be set
replicaIndex, _ := strconv.Atoi(replicaIndexLabelValues[len(replicaIndexLabelValues)-1]) // ignore error here since must be set

containers := pod.Spec.Containers
if containers == nil {
@@ -590,7 +589,7 @@ func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
for index, worker := range sliceToWorkers[slice] {
if worker.workerIndex == tpuWorkerID {
sliceToWorkers[slice][index].isCreated = false
klog.V(0).InfoS("deletePod", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerID, "Replica Index", replicaIndex)
klog.V(1).InfoS("deletePod", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerID, "Replica Index", replicaIndex)
break
}
}
@@ -649,7 +648,7 @@ func main() {
}

if admissionReview.Request.Kind.Kind == "Pod" {
klog.V(0).Info("Received review for Pod creation")
klog.V(1).Info("Received review for Pod creation")
response, err := mutatePod(admissionReview)
if err != nil {
klog.Errorf("Failed to mutate pod: %s", err)