From c24a0b3a8f9b1d8193e27f976da9f3d71867506f Mon Sep 17 00:00:00 2001 From: Hysun He Date: Tue, 5 Nov 2024 11:50:56 +0800 Subject: [PATCH 1/8] [docs]: OCI key_file path clarrification (#4262) * [docs]: OCI key_file path clarrification * Update installation.rst --- docs/source/getting-started/installation.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index cf6115ee9e8..69303a582e2 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -264,6 +264,7 @@ The :code:`~/.oci/config` file should contain the following fields: fingerprint=aa:bb:cc:dd:ee:ff:gg:hh:ii:jj:kk:ll:mm:nn:oo:pp tenancy=ocid1.tenancy.oc1..aaaaaaaa region=us-sanjose-1 + # Note that we should avoid using full home path for the key_file configuration, e.g. use ~/.oci instead of /home/username/.oci key_file=~/.oci/oci_api_key.pem From 877d77f79725f831aa8657d26458698f8214b214 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 5 Nov 2024 13:21:22 -0800 Subject: [PATCH 2/8] [k8s] Parallelize setup for faster multi-node provisioning (#4240) * parallelize setup * lint * Add retries * lint * retry for get_remote_home_dir * optimize privilege check * parallelize termination * increase num threads * comments * lint --- sky/provision/kubernetes/instance.py | 122 +++++++++++++++++++++------ sky/utils/command_runner.py | 25 +++++- sky/utils/subprocess_utils.py | 14 ++- 3 files changed, 128 insertions(+), 33 deletions(-) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 26ed5f51a43..14eea45149c 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -2,7 +2,7 @@ import copy import json import time -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import uuid from sky import exceptions @@ -24,6 +24,8 @@ POLL_INTERVAL = 2 _TIMEOUT_FOR_POD_TERMINATION = 60 # 1 minutes +_MAX_RETRIES = 3 +NUM_THREADS = subprocess_utils.get_parallel_threads() * 2 logger = sky_logging.init_logger(__name__) TAG_RAY_CLUSTER_NAME = 'ray-cluster-name' @@ -304,6 +306,33 @@ def _check_init_containers(pod): time.sleep(1) +def _run_function_with_retries(func: Callable, + operation_name: str, + max_retries: int = _MAX_RETRIES, + retry_delay: int = 5) -> Any: + """Runs a function with retries on Kubernetes errors. + + Args: + func: Function to retry + operation_name: Name of the operation for logging + max_retries: Maximum number of retry attempts + retry_delay: Delay between retries in seconds + + Raises: + The last exception encountered if all retries fail. + """ + for attempt in range(max_retries + 1): + try: + return func() + except config_lib.KubernetesError: + if attempt < max_retries: + logger.warning(f'Failed to {operation_name} - ' + f'retrying in {retry_delay} seconds.') + time.sleep(retry_delay) + else: + raise + + def _set_env_vars_in_pods(namespace: str, context: Optional[str], new_pods: List): """Setting environment variables in pods. 
@@ -323,14 +352,27 @@ def _set_env_vars_in_pods(namespace: str, context: Optional[str], """ set_k8s_env_var_cmd = docker_utils.SETUP_ENV_VARS_CMD - for new_pod in new_pods: + def _set_env_vars_thread(new_pod): + pod_name = new_pod.metadata.name + logger.info(f'{"-"*20}Start: Set up env vars in pod {pod_name!r} ' + f'{"-"*20}') runner = command_runner.KubernetesCommandRunner( - ((namespace, context), new_pod.metadata.name)) - rc, stdout, _ = runner.run(set_k8s_env_var_cmd, - require_outputs=True, - stream_logs=False) - _raise_command_running_error('set env vars', set_k8s_env_var_cmd, - new_pod.metadata.name, rc, stdout) + ((namespace, context), pod_name)) + + def _run_env_vars_cmd(): + rc, stdout, _ = runner.run(set_k8s_env_var_cmd, + require_outputs=True, + stream_logs=False) + _raise_command_running_error('set env vars', set_k8s_env_var_cmd, + pod_name, rc, stdout) + + _run_function_with_retries(_run_env_vars_cmd, + f'set env vars in pod {pod_name}') + logger.info(f'{"-"*20}End: Set up env vars in pod {pod_name!r} ' + f'{"-"*20}') + + subprocess_utils.run_in_parallel(_set_env_vars_thread, new_pods, + NUM_THREADS) def _check_user_privilege(namespace: str, context: Optional[str], @@ -350,23 +392,37 @@ def _check_user_privilege(namespace: str, context: Optional[str], ' fi; ' 'fi') - for new_node in new_nodes: - runner = command_runner.KubernetesCommandRunner( - ((namespace, context), new_node.metadata.name)) + # This check needs to run on a per-image basis, so running the check on + # any one pod is sufficient. + new_node = new_nodes[0] + pod_name = new_node.metadata.name + + runner = command_runner.KubernetesCommandRunner( + ((namespace, context), pod_name)) + logger.info(f'{"-"*20}Start: Check user privilege in pod {pod_name!r} ' + f'{"-"*20}') + + def _run_privilege_check(): rc, stdout, stderr = runner.run(check_k8s_user_sudo_cmd, require_outputs=True, separate_stderr=True, stream_logs=False) _raise_command_running_error('check user privilege', - check_k8s_user_sudo_cmd, - new_node.metadata.name, rc, + check_k8s_user_sudo_cmd, pod_name, rc, stdout + stderr) - if stdout == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE): - raise config_lib.KubernetesError( - 'Insufficient system privileges detected. ' - 'Ensure the default user has root access or ' - '"sudo" is installed and the user is added to the sudoers ' - 'from the image.') + return stdout + + stdout = _run_function_with_retries( + _run_privilege_check, f'check user privilege in pod {pod_name!r}') + + if stdout == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE): + raise config_lib.KubernetesError( + 'Insufficient system privileges detected. 
' + 'Ensure the default user has root access or ' + '"sudo" is installed and the user is added to the sudoers ' + 'from the image.') + logger.info(f'{"-"*20}End: Check user privilege in pod {pod_name!r} ' + f'{"-"*20}') def _setup_ssh_in_pods(namespace: str, context: Optional[str], @@ -405,14 +461,19 @@ def _setup_ssh_thread(new_node): runner = command_runner.KubernetesCommandRunner( ((namespace, context), pod_name)) logger.info(f'{"-"*20}Start: Set up SSH in pod {pod_name!r} {"-"*20}') - rc, stdout, _ = runner.run(set_k8s_ssh_cmd, - require_outputs=True, - stream_logs=False) - _raise_command_running_error('setup ssh', set_k8s_ssh_cmd, pod_name, rc, - stdout) + + def _run_ssh_setup(): + rc, stdout, _ = runner.run(set_k8s_ssh_cmd, + require_outputs=True, + stream_logs=False) + _raise_command_running_error('setup ssh', set_k8s_ssh_cmd, pod_name, + rc, stdout) + + _run_function_with_retries(_run_ssh_setup, + f'setup ssh in pod {pod_name!r}') logger.info(f'{"-"*20}End: Set up SSH in pod {pod_name!r} {"-"*20}') - subprocess_utils.run_in_parallel(_setup_ssh_thread, new_nodes) + subprocess_utils.run_in_parallel(_setup_ssh_thread, new_nodes, NUM_THREADS) def _label_pod(namespace: str, context: Optional[str], pod_name: str, @@ -765,12 +826,17 @@ def terminate_instances( def _is_head(pod) -> bool: return pod.metadata.labels[constants.TAG_RAY_NODE_KIND] == 'head' - for pod_name, pod in pods.items(): - logger.debug(f'Terminating instance {pod_name}: {pod}') + def _terminate_pod_thread(pod_info): + pod_name, pod = pod_info if _is_head(pod) and worker_only: - continue + return + logger.debug(f'Terminating instance {pod_name}: {pod}') _terminate_node(namespace, context, pod_name) + # Run pod termination in parallel + subprocess_utils.run_in_parallel(_terminate_pod_thread, pods.items(), + NUM_THREADS) + def get_cluster_info( region: str, diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py index 7eae76040d8..25483031038 100644 --- a/sky/utils/command_runner.py +++ b/sky/utils/command_runner.py @@ -237,6 +237,23 @@ def _rsync( rsync_command.append(prefix_command) rsync_command += ['rsync', RSYNC_DISPLAY_OPTION] + def _get_remote_home_dir_with_retry(): + backoff = common_utils.Backoff(initial_backoff=1, + max_backoff_factor=5) + retries_left = max_retry + assert retries_left > 0, f'max_retry {max_retry} must be positive.' + while retries_left >= 0: + try: + return get_remote_home_dir() + except Exception: # pylint: disable=broad-except + if retries_left == 0: + raise + sleep_time = backoff.current_backoff() + logger.warning(f'Failed to get remote home dir ' + f'- retrying in {sleep_time} seconds.') + retries_left -= 1 + time.sleep(sleep_time) + # --filter # The source is a local path, so we need to resolve it. 
resolved_source = pathlib.Path(source).expanduser().resolve() @@ -261,7 +278,7 @@ def _rsync( if up: resolved_target = target if target.startswith('~'): - remote_home_dir = get_remote_home_dir() + remote_home_dir = _get_remote_home_dir_with_retry() resolved_target = target.replace('~', remote_home_dir) full_source_str = str(resolved_source) if resolved_source.is_dir(): @@ -273,7 +290,7 @@ def _rsync( else: resolved_source = source if source.startswith('~'): - remote_home_dir = get_remote_home_dir() + remote_home_dir = _get_remote_home_dir_with_retry() resolved_source = source.replace('~', remote_home_dir) rsync_command.extend([ f'{node_destination}:{resolved_source!r}', @@ -656,6 +673,8 @@ def rsync( class KubernetesCommandRunner(CommandRunner): """Runner for Kubernetes commands.""" + _MAX_RETRIES_FOR_RSYNC = 3 + def __init__( self, node: Tuple[Tuple[str, Optional[str]], str], @@ -798,7 +817,7 @@ def rsync( # Advanced options. log_path: str = os.devnull, stream_logs: bool = True, - max_retry: int = 1, + max_retry: int = _MAX_RETRIES_FOR_RSYNC, ) -> None: """Uses 'rsync' to sync 'source' to 'target'. diff --git a/sky/utils/subprocess_utils.py b/sky/utils/subprocess_utils.py index 303e3ddad99..acb8fb9f490 100644 --- a/sky/utils/subprocess_utils.py +++ b/sky/utils/subprocess_utils.py @@ -50,17 +50,27 @@ def get_parallel_threads() -> int: return max(4, cpu_count - 1) -def run_in_parallel(func: Callable, args: Iterable[Any]) -> List[Any]: +def run_in_parallel(func: Callable, + args: Iterable[Any], + num_threads: Optional[int] = None) -> List[Any]: """Run a function in parallel on a list of arguments. The function 'func' should raise a CommandError if the command fails. + Args: + func: The function to run in parallel + args: Iterable of arguments to pass to func + num_threads: Number of threads to use. If None, uses + get_parallel_threads() + Returns: A list of the return values of the function func, in the same order as the arguments. """ # Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long - with pool.ThreadPool(processes=get_parallel_threads()) as p: + processes = num_threads if num_threads is not None else get_parallel_threads( + ) + with pool.ThreadPool(processes=processes) as p: # Run the function in parallel on the arguments, keeping the order. 
return list(p.imap(func, args)) From 6a5ef1faafe933c408e435d0d7831c3f64108748 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Fri, 27 Sep 2024 15:42:01 -0700 Subject: [PATCH 3/8] K8s pod dns (#6) * enumerate worker pods, bind pod_name to headless service * patch worker numbering * allow any traffic between sky pods * add test for ssh vs hostname * fix ip-worker mapping for k8s ssh * lint --- sky/backends/cloud_vm_ray_backend.py | 11 +++++-- sky/provision/kubernetes/instance.py | 10 ++++--- sky/templates/kubernetes-ray.yml.j2 | 43 ++++++++++++++++------------ tests/test_smoke.py | 24 ++++++++++++++++ 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 0013e6cbaf9..b75cd10bb1a 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2318,9 +2318,14 @@ def is_provided_ips_valid( zip(cluster_internal_ips, cluster_feasible_ips)) # Ensure head node is the first element, then sort based on the - # external IPs for stableness - stable_internal_external_ips = [internal_external_ips[0]] + sorted( - internal_external_ips[1:], key=lambda x: x[1]) + # external IPs for stableness. Skip for k8s nodes since pods + # worker ids are already mapped. + if (cluster_info is not None and + cluster_info.provider_name == 'kubernetes'): + stable_internal_external_ips = internal_external_ips + else: + stable_internal_external_ips = [internal_external_ips[0]] + sorted( + internal_external_ips[1:], key=lambda x: x[1]) self.stable_internal_external_ips = stable_internal_external_ips @functools.lru_cache() diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 14eea45149c..fb37eaa73b1 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -634,7 +634,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str, created_pods = {} logger.debug(f'run_instances: calling create_namespaced_pod ' f'(count={to_start_count}).') - for _ in range(to_start_count): + for pod_id in range(config.count): if head_pod_name is None: pod_spec['metadata']['labels'].update(constants.HEAD_NODE_TAGS) head_selector = head_service_selector(cluster_name_on_cloud) @@ -642,9 +642,11 @@ def _create_pods(region: str, cluster_name_on_cloud: str, pod_spec['metadata']['name'] = f'{cluster_name_on_cloud}-head' else: pod_spec['metadata']['labels'].update(constants.WORKER_NODE_TAGS) - pod_uuid = str(uuid.uuid4())[:4] - pod_name = f'{cluster_name_on_cloud}-{pod_uuid}' - pod_spec['metadata']['name'] = f'{pod_name}-worker' + pod_name = f'{cluster_name_on_cloud}-worker{pod_id}' + if pod_id == 0 or pod_name in running_pods: + continue + pod_spec['metadata']['name'] = pod_name + pod_spec['metadata']['labels']['component'] = pod_name # For multi-node support, we put a soft-constraint to schedule # worker pods on different nodes than the head pod. # This is not set as a hard constraint because if different nodes diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index b807fd2135b..27794ed8634 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -226,27 +226,34 @@ provider: - apiVersion: v1 kind: Service metadata: - labels: - parent: skypilot - skypilot-cluster: {{cluster_name_on_cloud}} - skypilot-user: {{ user }} - # NOTE: If you're running multiple Ray clusters with services - # on one Kubernetes cluster, they must have unique service - # names. 
- name: {{cluster_name_on_cloud}}-head + labels: + parent: skypilot + skypilot-cluster: {{cluster_name_on_cloud}} + skypilot-user: {{ user }} + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: {{cluster_name_on_cloud}}-head spec: # This selector must match the head node pod's selector below. selector: component: {{cluster_name_on_cloud}}-head - ports: - - name: client - protocol: TCP - port: 10001 - targetPort: 10001 - - name: dashboard - protocol: TCP - port: 8265 - targetPort: 8265 + clusterIP: None + # Service maps to rest of the worker nodes + {% for worker_id in range(1, num_nodes) %} + - apiVersion: v1 + kind: Service + metadata: + labels: + parent: skypilot + skypilot-cluster: {{cluster_name_on_cloud}} + skypilot-user: {{ user }} + name: {{cluster_name_on_cloud}}-worker{{ worker_id }} + spec: + selector: + component: {{cluster_name_on_cloud}}-worker{{ worker_id }} + clusterIP: None + {% endfor %} # Specify the pod type for the ray head node (as configured below). head_node_type: ray_head_default @@ -259,7 +266,7 @@ available_node_types: metadata: # name will be filled in the provisioner # head node name will be {{cluster_name_on_cloud}}-head, which will match the head node service selector above if a head node - # service is required. + # service is required. Remaining nodes are named {{cluster_name_on_cloud}}-worker{{ node_id }} labels: parent: skypilot # component will be set for the head node pod to be the same as the head node service selector above if a diff --git a/tests/test_smoke.py b/tests/test_smoke.py index cdfd9dfc7cb..77998f5ad60 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3435,6 +3435,30 @@ def test_kubernetes_custom_image(image_id): run_one_test(test) +@pytest.mark.kubernetes +def test_kubernetes_ssh_hostname(): + name = _get_cluster_name() + test = Test( + 'test-kubernetes-ssh-hostname', + [ + f'sky launch -c {name} -y --num-nodes 10 --cpus 1+', + f'ssh {name} -t "hostname" | grep head', + f'ssh {name}-worker1 -t "hostname" | grep worker1', + f'ssh {name}-worker2 -t "hostname" | grep worker2', + f'ssh {name}-worker3 -t "hostname" | grep worker3', + f'ssh {name}-worker4 -t "hostname" | grep worker4', + f'ssh {name}-worker5 -t "hostname" | grep worker5', + f'ssh {name}-worker6 -t "hostname" | grep worker6', + f'ssh {name}-worker7 -t "hostname" | grep worker7', + f'ssh {name}-worker8 -t "hostname" | grep worker8', + f'ssh {name}-worker9 -t "hostname" | grep worker9', + ], + f'sky down -y {name}', + timeout=10 * 60, + ) + run_one_test(test) + + @pytest.mark.azure def test_azure_start_stop_two_nodes(): name = _get_cluster_name() From 9ce24b93321f4d44994430a29dd0108affc7d8dd Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Fri, 27 Sep 2024 17:20:00 -0700 Subject: [PATCH 4/8] Update README.md --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 2629cc4e4c8..c3e1a66f7a4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,15 @@ +
+[Trainy Logo]
+
+
+
+This repository is a fork of the [original SkyPilot](https://github.com/skypilot-org/skypilot) and is maintained by [Trainy](https://trainy.ai/) to support running jobs on Konduktor, our managed Kubernetes platform-as-a-service ([GitHub](https://github.com/Trainy-ai/konduktor) and [Documentation](https://konduktor.readthedocs.io/en/latest/)). You can see some of our contributions to the mainline project [here](https://github.com/skypilot-org/skypilot/pulls?q=is%3Apr+author%3Aasaiacai+). If there are features in this fork that you feel make sense to contribute back upstream, please let us know and we will be happy to make a pull request. We plan to keep this fork under the same license as the original project (Apache 2.0), as we too have greatly benefited from the open nature of the project and believe that sharing our work reduces redundant effort for maintainers, contributors, and users alike.
+
+----
+
From f782a016622d7d383dbf78b1946b1afe348a7562 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Fri, 27 Sep 2024 17:20:58 -0700 Subject: [PATCH 5/8] Update pypi-nightly-build.yml point build to our pypi project --- .github/workflows/pypi-nightly-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pypi-nightly-build.yml b/.github/workflows/pypi-nightly-build.yml index c4c6bfa2409..55ac58be311 100644 --- a/.github/workflows/pypi-nightly-build.yml +++ b/.github/workflows/pypi-nightly-build.yml @@ -43,7 +43,7 @@ jobs: RELEASE_VERSION=$(date +%Y%m%d) sed -i "s/{{SKYPILOT_COMMIT_SHA}}/${{ github.sha }}/g" sky/__init__.py sed -i "s/__version__ = '.*'/__version__ = '1.0.0.dev${RELEASE_VERSION}'/g" sky/__init__.py - sed -i "s/name='skypilot',/name='skypilot-nightly',/g" sky/setup_files/setup.py + sed -i "s/name='skypilot',/name='trainy-skypilot-nightly',/g" sky/setup_files/setup.py - name: Build a binary wheel and a source tarball run: >- python -m From 0435ad46dd02b6648322ef12393be168f3cb4535 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Fri, 11 Oct 2024 02:19:10 -0700 Subject: [PATCH 6/8] patch affinity to work with dws (#7) * patch affinity to work with dws * lint --- .../torch_ddp_benchmark.yaml | 9 ++-- sky/provision/kubernetes/instance.py | 50 +++++++++---------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/examples/torch_ddp_benchmark/torch_ddp_benchmark.yaml b/examples/torch_ddp_benchmark/torch_ddp_benchmark.yaml index 36278961006..1ff09e2fea0 100644 --- a/examples/torch_ddp_benchmark/torch_ddp_benchmark.yaml +++ b/examples/torch_ddp_benchmark/torch_ddp_benchmark.yaml @@ -28,9 +28,12 @@ name: torch-ddp-bench num_nodes: 2 resources: - accelerators: A100:8 # Make sure you use 8 GPU instances - use_spot: True - cloud: gcp + accelerators: H100-MEGA-80GB:8 # Make sure you use 8 GPU instances + cloud: kubernetes + labels: + kueue.x-k8s.io/queue-name: user-queue # this is assigned by your admin + kueue.x-k8s.io/priority-class: low-priority + max-run-duration-seconds: "3000" file_mounts: ./torch_ddp_benchmark.py: ./examples/torch_ddp_benchmark/torch_ddp_benchmark.py diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index fb37eaa73b1..e5ec245d7b6 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -647,31 +647,30 @@ def _create_pods(region: str, cluster_name_on_cloud: str, continue pod_spec['metadata']['name'] = pod_name pod_spec['metadata']['labels']['component'] = pod_name - # For multi-node support, we put a soft-constraint to schedule - # worker pods on different nodes than the head pod. - # This is not set as a hard constraint because if different nodes - # are not available, we still want to be able to schedule worker - # pods on larger nodes which may be able to fit multiple SkyPilot - # "nodes". - pod_spec['spec']['affinity'] = { - 'podAntiAffinity': { - # Set as a soft constraint - 'preferredDuringSchedulingIgnoredDuringExecution': [{ - # Max weight to avoid scheduling on the - # same physical node unless necessary. - 'weight': 100, - 'podAffinityTerm': { - 'labelSelector': { - 'matchExpressions': [{ - 'key': TAG_SKYPILOT_CLUSTER_NAME, - 'operator': 'In', - 'values': [cluster_name_on_cloud] - }] - }, - 'topologyKey': 'kubernetes.io/hostname' - } - }] - } + # For multi-node support, we put a soft-constraint to schedule + # worker pods on different nodes than the head pod. 
+ # This is not set as a hard constraint because if different nodes + # are not available, we still want to be able to schedule worker + # pods on larger nodes which may be able to fit multiple SkyPilot + # "nodes". + pod_spec['spec']['affinity'] = { + 'podAntiAffinity': { + # Set as a soft constraint + 'preferredDuringSchedulingIgnoredDuringExecution': [{ + # Max weight to avoid scheduling on the + # same physical node unless necessary. + 'weight': 100, + 'podAffinityTerm': { + 'labelSelector': { + 'matchExpressions': [{ + 'key': TAG_SKYPILOT_CLUSTER_NAME, + 'operator': 'In', + 'values': [cluster_name_on_cloud] + }] + }, + 'topologyKey': 'kubernetes.io/hostname' + } + }] } pod = _create_namespaced_pod_with_retries(namespace, pod_spec, context) @@ -790,6 +789,7 @@ def _terminate_node(namespace: str, context: Optional[str], logger.warning('terminate_instances: Error occurred when analyzing ' f'SSH Jump pod: {e}') try: + kubernetes.core_api(context).delete_namespaced_service( pod_name, namespace, _request_timeout=config_lib.DELETION_TIMEOUT) kubernetes.core_api(context).delete_namespaced_service( From 2cb15606c0efc7eb1a79546582fb53efe16ee536 Mon Sep 17 00:00:00 2001 From: Andrew Aikawa Date: Fri, 11 Oct 2024 02:19:10 -0700 Subject: [PATCH 7/8] patch affinity to work with dws (#7) * patch affinity to work with dws * lint lint --- sky/provision/kubernetes/instance.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index e5ec245d7b6..4aa1c80b2e5 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -672,6 +672,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str, } }] } + } pod = _create_namespaced_pod_with_retries(namespace, pod_spec, context) created_pods[pod.metadata.name] = pod From 27826974cf3efda7f37e4fe8744212ca82cbb0a8 Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 5 Nov 2024 14:16:39 -0800 Subject: [PATCH 8/8] lint --- sky/provision/kubernetes/instance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 4aa1c80b2e5..b5e854cceac 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -3,7 +3,6 @@ import json import time from typing import Any, Callable, Dict, List, Optional -import uuid from sky import exceptions from sky import sky_logging