From c5aead6f329a81d14faf1f8b1a4fd64e1e2fae88 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sat, 21 Sep 2024 14:07:02 +0100 Subject: [PATCH 01/15] Add Skypilot Kubernetes integration for remote orchestration This commit adds the Skypilot Kubernetes integration to enable remote orchestration of ZenML pipelines on VMs. The integration includes the necessary initialization, configuration, and flavor classes. It also adds the Skypilot Kubernetes orchestrator and its settings. This integration provides an alternative to the local orchestrator for running pipelines on Kubernetes. --- .../skypilot_kubernetes/__init__.py | 52 ++++++++ .../skypilot_kubernetes/flavors/__init__.py | 26 ++++ ...pilot_orchestrator_kubernetes_vm_flavor.py | 124 ++++++++++++++++++ .../orchestrators/__init__.py | 25 ++++ .../skypilot_kubernetes_vm_orchestrator.py | 89 +++++++++++++ 5 files changed, 316 insertions(+) create mode 100644 src/zenml/integrations/skypilot_kubernetes/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py create mode 100644 src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py new file mode 100644 index 00000000000..62ab626f3f8 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -0,0 +1,52 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Skypilot Kubernetes integration for ZenML. + +The Skypilot integration sub-module powers an alternative to the local +orchestrator for a remote orchestration of ZenML pipelines on VMs. +""" +from typing import List, Type + +from zenml.integrations.constants import ( + SKYPILOT_Kubernetes, +) +from zenml.integrations.integration import Integration +from zenml.stack import Flavor + +SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR = "vm_kubernetes" + + +class SkypilotKubernetesIntegration(Integration): + """Definition of Skypilot Kubernetes Integration for ZenML.""" + + NAME = SKYPILOT_Kubernetes + # all 0.6.x versions of skypilot[kubernetes] are compatible + REQUIREMENTS = ["skypilot[kubernetes]~=0.6.0"] + APT_PACKAGES = ["openssh-client", "rsync"] + + @classmethod + def flavors(cls) -> List[Type[Flavor]]: + """Declare the stack component flavors for the Skypilot Kubernetes integration. + + Returns: + List of stack component flavors for this integration. + """ + from zenml.integrations.skypilot_kubernetes.flavors import ( + SkypilotKubernetesOrchestratorFlavor, + ) + + return [SkypilotKubernetesOrchestratorFlavor] + + +SkypilotKubernetesIntegration.check_installation() \ No newline at end of file diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py b/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py new file mode 100644 index 00000000000..8b2e352d4a4 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/__init__.py @@ -0,0 +1,26 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Skypilot integration flavor for Skypilot Kubernetes orchestrator.""" + +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import ( + SkypilotKubernetesOrchestratorConfig, + SkypilotKubernetesOrchestratorFlavor, + SkypilotKubernetesOrchestratorSettings, +) + +__all__ = [ + "SkypilotKubernetesOrchestratorConfig", + "SkypilotKubernetesOrchestratorFlavor", + "SkypilotKubernetesOrchestratorSettings", +] diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py new file mode 100644 index 00000000000..0480755a744 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py @@ -0,0 +1,124 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. 
See the License for the specific language governing +# permissions and limitations under the License. +"""Skypilot orchestrator Kubernetes flavor.""" + +from typing import TYPE_CHECKING, Optional, Type + +from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( + SkypilotBaseOrchestratorConfig, + SkypilotBaseOrchestratorSettings, +) +from zenml.integrations.skypilot_kubernetes import ( + SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR, +) +from zenml.logger import get_logger +from zenml.models import ServiceConnectorRequirements +from zenml.orchestrators import BaseOrchestratorConfig, BaseOrchestratorFlavor + +if TYPE_CHECKING: + from zenml.integrations.skypilot_kubernetes.orchestrators import ( + SkypilotKubernetesOrchestrator, + ) + + +logger = get_logger(__name__) + + +class SkypilotKubernetesOrchestratorSettings(SkypilotBaseOrchestratorSettings): + """Skypilot orchestrator settings.""" + + +class SkypilotKubernetesOrchestratorConfig( + SkypilotBaseOrchestratorConfig, SkypilotKubernetesOrchestratorSettings +): + """Skypilot orchestrator config.""" + + +class SkypilotKubernetesOrchestratorFlavor(BaseOrchestratorFlavor): + """Flavor for the Skypilot Kubernetes orchestrator.""" + + @property + def name(self) -> str: + """Name of the orchestrator flavor. + + Returns: + Name of the orchestrator flavor. + """ + return SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR + + @property + def service_connector_requirements( + self, + ) -> Optional[ServiceConnectorRequirements]: + """Service connector resource requirements for service connectors. + + Specifies resource requirements that are used to filter the available + service connector types that are compatible with this flavor. + + Returns: + Requirements for compatible service connectors, if a service + connector is required for this flavor. 
+ """ + return ServiceConnectorRequirements( + resource_type="kubernetes-generic", + ) + + @property + def docs_url(self) -> Optional[str]: + """A url to point at docs explaining this flavor. + + Returns: + A flavor docs url. + """ + return self.generate_default_docs_url() + + @property + def sdk_docs_url(self) -> Optional[str]: + """A url to point at SDK docs explaining this flavor. + + Returns: + A flavor SDK docs url. + """ + return self.generate_default_sdk_docs_url() + + @property + def logo_url(self) -> str: + """A url to represent the flavor in the dashboard. + + Returns: + The flavor logo. + """ + return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/kubernetes-skypilot.png" + + @property + def config_class(self) -> Type[BaseOrchestratorConfig]: + """Config class for the base orchestrator flavor. + + Returns: + The config class. + """ + return SkypilotKubernetesOrchestratorConfig + + @property + def implementation_class(self) -> Type["SkypilotKubernetesOrchestrator"]: + """Implementation class for this flavor. + + Returns: + Implementation class for this flavor. + """ + from zenml.integrations.skypilot_kubernetes.orchestrators import ( + SkypilotKubernetesOrchestrator, + ) + + return SkypilotKubernetesOrchestrator diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py new file mode 100644 index 00000000000..c0ab27f1b4d --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/__init__.py @@ -0,0 +1,25 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Initialization of the Skypilot Kubernetes ZenML orchestrator.""" + +from zenml.integrations.skypilot.orchestrators.skypilot_base_vm_orchestrator import ( # noqa + SkypilotBaseOrchestrator, +) +from zenml.integrations.skypilot_kubernetes.orchestrators.skypilot_kubernetes_vm_orchestrator import ( # noqa + SkypilotKubernetesOrchestrator, +) +__all__ = [ + "SkypilotBaseOrchestrator", + "SkypilotKubernetesOrchestrator", +] diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py new file mode 100644 index 00000000000..b0105c110a6 --- /dev/null +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -0,0 +1,89 @@ +# Copyright (c) ZenML GmbH 2023. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Implementation of the a Skypilot based Kubernetes VM orchestrator.""" + +from typing import TYPE_CHECKING, Optional, Type, cast + +import sky + +from zenml.integrations.skypilot.orchestrators.skypilot_base_vm_orchestrator import ( + SkypilotBaseOrchestrator, +) +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import ( + SkypilotKubernetesOrchestratorConfig, + SkypilotKubernetesOrchestratorSettings, +) +from zenml.logger import get_logger + +if TYPE_CHECKING: + from zenml.config.base_settings import BaseSettings + +logger = get_logger(__name__) + + +class SkypilotKubernetesOrchestrator(SkypilotBaseOrchestrator): + """Orchestrator responsible for running pipelines remotely in a VM on Kubernetes. + + This orchestrator does not support running on a schedule. + """ + + @property + def cloud(self) -> sky.clouds.Cloud: + """The type of sky cloud to use. + + Returns: + A `sky.clouds.Cloud` instance. + """ + return sky.clouds.Kubernetes() + + @property + def config(self) -> SkypilotKubernetesOrchestratorConfig: + """Returns the `SkypilotKubernetesOrchestratorConfig` config. + + Returns: + The configuration. + """ + return cast(SkypilotKubernetesOrchestratorConfig, self._config) + + @property + def settings_class(self) -> Optional[Type["BaseSettings"]]: + """Settings class for the Skypilot orchestrator. + + Returns: + The settings class. + """ + return SkypilotKubernetesOrchestratorSettings + + # def prepare_environment_variable(self, set: bool = True) -> None: + # """Set up Environment variables that are required for the orchestrator. + + # Args: + # set: Whether to set the environment variables or not. + + # Raises: + # ValueError: If no service connector is found. + # """ + # connector = self.get_connector() + # if connector is None: + # raise ValueError( + # "No service connector found. Please make sure to set up a connector " + # "that is compatible with this orchestrator." 
+ # ) + # if set: + # # The Kubernetes connector creates a local configuration profile with the name computed from + # # the first 8 digits of its UUID. + # kubernetes_profile = f"zenml-{str(connector.id)[:8]}" + # os.environ[ENV_Kubernetes_PROFILE] = kubernetes_profile + # else: + # os.environ.pop(ENV_Kubernetes_PROFILE, None) From 342fa34fc7cb8f7858b4b6a22fa96a3517a028b0 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sat, 21 Sep 2024 14:28:41 +0100 Subject: [PATCH 02/15] Add Skypilot Kubernetes integration for remote orchestration --- src/zenml/integrations/__init__.py | 1 + src/zenml/integrations/constants.py | 1 + src/zenml/integrations/skypilot_kubernetes/__init__.py | 6 +++--- .../flavors/skypilot_orchestrator_kubernetes_vm_flavor.py | 4 ++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/zenml/integrations/__init__.py b/src/zenml/integrations/__init__.py index f6273ba2928..da08157e416 100644 --- a/src/zenml/integrations/__init__.py +++ b/src/zenml/integrations/__init__.py @@ -69,6 +69,7 @@ from zenml.integrations.skypilot_gcp import SkypilotGCPIntegration # noqa from zenml.integrations.skypilot_azure import SkypilotAzureIntegration # noqa from zenml.integrations.skypilot_lambda import SkypilotLambdaIntegration # noqa +from zenml.integrations.skypilot_kubernetes import SkypilotKubernetesIntegration # noqa from zenml.integrations.slack import SlackIntegration # noqa from zenml.integrations.spark import SparkIntegration # noqa from zenml.integrations.tekton import TektonIntegration # noqa diff --git a/src/zenml/integrations/constants.py b/src/zenml/integrations/constants.py index 5c780712237..3f06dbb452e 100644 --- a/src/zenml/integrations/constants.py +++ b/src/zenml/integrations/constants.py @@ -64,6 +64,7 @@ SKYPILOT_GCP = "skypilot_gcp" SKYPILOT_AZURE = "skypilot_azure" SKYPILOT_LAMBDA = "skypilot_lambda" +SKYPILOT_KUBERNETES = "skypilot_kubernetes" SLACK = "slack" SPARK = "spark" TEKTON = "tekton" diff --git 
a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index 62ab626f3f8..0913cc00fed 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -19,18 +19,18 @@ from typing import List, Type from zenml.integrations.constants import ( - SKYPILOT_Kubernetes, + SKYPILOT_KUBERNETES, ) from zenml.integrations.integration import Integration from zenml.stack import Flavor -SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR = "vm_kubernetes" +SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR = "vm_kubernetes" class SkypilotKubernetesIntegration(Integration): """Definition of Skypilot Kubernetes Integration for ZenML.""" - NAME = SKYPILOT_Kubernetes + NAME = SKYPILOT_KUBERNETES # all 0.6.x versions of skypilot[kubernetes] are compatible REQUIREMENTS = ["skypilot[kubernetes]~=0.6.0"] APT_PACKAGES = ["openssh-client", "rsync"] diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py index 0480755a744..e0b0f88115c 100644 --- a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py @@ -20,7 +20,7 @@ SkypilotBaseOrchestratorSettings, ) from zenml.integrations.skypilot_kubernetes import ( - SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR, + SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR, ) from zenml.logger import get_logger from zenml.models import ServiceConnectorRequirements @@ -55,7 +55,7 @@ def name(self) -> str: Returns: Name of the orchestrator flavor. 
""" - return SKYPILOT_Kubernetes_ORCHESTRATOR_FLAVOR + return SKYPILOT_KUBERNETES_ORCHESTRATOR_FLAVOR @property def service_connector_requirements( From 759058484a6d772e9e731e65f5d7f8b7608703c2 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sat, 21 Sep 2024 15:24:14 +0100 Subject: [PATCH 03/15] Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method --- .../skypilot_kubernetes_vm_orchestrator.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index b0105c110a6..830ad85e741 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -65,15 +65,16 @@ def settings_class(self) -> Optional[Type["BaseSettings"]]: """ return SkypilotKubernetesOrchestratorSettings - # def prepare_environment_variable(self, set: bool = True) -> None: - # """Set up Environment variables that are required for the orchestrator. + def prepare_environment_variable(self, set: bool = True) -> None: + """Set up Environment variables that are required for the orchestrator. - # Args: - # set: Whether to set the environment variables or not. + Args: + set: Whether to set the environment variables or not. + + Raises: + ValueError: If no service connector is found. + """ - # Raises: - # ValueError: If no service connector is found. 
- # """ # connector = self.get_connector() # if connector is None: # raise ValueError( From 03951d87d1c00fcd24cd9213c235eb752123b537 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sat, 21 Sep 2024 15:24:20 +0100 Subject: [PATCH 04/15] Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method --- .../orchestrators/skypilot_kubernetes_vm_orchestrator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index 830ad85e741..b39f81aacd5 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -74,7 +74,6 @@ def prepare_environment_variable(self, set: bool = True) -> None: Raises: ValueError: If no service connector is found. 
""" - # connector = self.get_connector() # if connector is None: # raise ValueError( @@ -87,4 +86,4 @@ def prepare_environment_variable(self, set: bool = True) -> None: # kubernetes_profile = f"zenml-{str(connector.id)[:8]}" # os.environ[ENV_Kubernetes_PROFILE] = kubernetes_profile # else: - # os.environ.pop(ENV_Kubernetes_PROFILE, None) + # os.environ.pop(ENV_Kubernetes_PROFILE, None) \ No newline at end of file From 89718f01b9d1f83b5820ac7cf2dc62d1408a139e Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sun, 22 Sep 2024 02:15:31 +0100 Subject: [PATCH 05/15] Refactor SkypilotKubernetesOrchestrator prepare_environment_variable method --- .../skypilot_base_vm_orchestrator.py | 67 +++++++++++-------- .../skypilot_kubernetes/__init__.py | 2 +- ...pilot_orchestrator_kubernetes_vm_flavor.py | 3 +- .../skypilot_kubernetes_vm_orchestrator.py | 15 +++-- 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index e6765c4c85a..6cc4f1fe3c6 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -133,8 +133,8 @@ def cloud(self) -> sky.clouds.Cloud: def setup_credentials(self) -> None: """Set up credentials for the orchestrator.""" connector = self.get_connector() - assert connector is not None - connector.configure_local_client() + #$assert connector is not None + #connector.configure_local_client() @abstractmethod def prepare_environment_variable(self, set: bool = True) -> None: @@ -250,6 +250,7 @@ def prepare_or_run_pipeline( entrypoint_str = " ".join(command) arguments_str = " ".join(args) + task_envs = environment docker_environment_str = " ".join( f"-e {k}={v}" for k, v in environment.items() ) @@ -271,13 +272,10 @@ def prepare_or_run_pipeline( f"sudo docker login --username 
$DOCKER_USERNAME --password " f"$DOCKER_PASSWORD {stack.container_registry.config.uri}" ) - task_envs = { - "DOCKER_USERNAME": docker_username, - "DOCKER_PASSWORD": docker_password, - } + task_envs["DOCKER_USERNAME"] = (docker_username,) + task_envs["DOCKER_PASSWORD"] = (docker_password,) else: setup = None - task_envs = None # Run the entire pipeline @@ -285,15 +283,25 @@ def prepare_or_run_pipeline( self.prepare_environment_variable(set=True) try: + if isinstance(self.cloud, sky.clouds.Kubernetes): + image = image + run_command = f"/opt/venv/bin/{entrypoint_str} {arguments_str}" + setup = None + down = False + idle_minutes_to_autostop = None + else: + image = settings.image_id + run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" + down = settings.down + idle_minutes_to_autostop = settings.idle_minutes_to_autostop task = sky.Task( - run=f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}", + run=run_command, setup=setup, envs=task_envs, ) - logger.debug( - f"Running run: sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" - ) + logger.debug(run_command) logger.debug(f"Running run: {setup}") + task = task.set_resources( sky.Resources( cloud=self.cloud, @@ -306,15 +314,22 @@ def prepare_or_run_pipeline( job_recovery=settings.job_recovery, region=settings.region, zone=settings.zone, - image_id=settings.image_id, + image_id=image, disk_size=settings.disk_size, disk_tier=settings.disk_tier, ) ) - # Set the cluster name - cluster_name = settings.cluster_name - if cluster_name is None: + if settings.cluster_name: + sky.exec( + task, + settings.cluster_name, + down=down, + stream_logs=settings.stream_logs, + backend=None, + detach_run=True, + ) + else: # Find existing cluster for i in sky.status(refresh=True): if isinstance( @@ -324,21 +339,19 @@ def prepare_or_run_pipeline( logger.info( f"Found 
existing cluster {cluster_name}. Reusing..." ) - if cluster_name is None: cluster_name = self.sanitize_cluster_name( f"{orchestrator_run_name}" ) - - # Launch the cluster - sky.launch( - task, - cluster_name, - retry_until_up=settings.retry_until_up, - idle_minutes_to_autostop=settings.idle_minutes_to_autostop, - down=settings.down, - stream_logs=settings.stream_logs, - detach_setup=True, - ) + # Launch the cluster + sky.launch( + task, + cluster_name, + retry_until_up=settings.retry_until_up, + idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, + stream_logs=settings.stream_logs, + detach_setup=True, + ) except Exception as e: logger.error(f"Pipeline run failed: {e}") diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index 0913cc00fed..0ba82327688 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -32,7 +32,7 @@ class SkypilotKubernetesIntegration(Integration): NAME = SKYPILOT_KUBERNETES # all 0.6.x versions of skypilot[kubernetes] are compatible - REQUIREMENTS = ["skypilot[kubernetes]~=0.6.0"] + REQUIREMENTS = ["skypilot[kubernetes]~=0.6.1"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py index e0b0f88115c..7334750f6fd 100644 --- a/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py +++ b/src/zenml/integrations/skypilot_kubernetes/flavors/skypilot_orchestrator_kubernetes_vm_flavor.py @@ -15,6 +15,7 @@ from typing import TYPE_CHECKING, Optional, Type +from zenml.constants import KUBERNETES_CLUSTER_RESOURCE_TYPE from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( SkypilotBaseOrchestratorConfig, 
SkypilotBaseOrchestratorSettings, @@ -71,7 +72,7 @@ def service_connector_requirements( connector is required for this flavor. """ return ServiceConnectorRequirements( - resource_type="kubernetes-generic", + resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE, ) @property diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index b39f81aacd5..b28b0e3426d 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -74,16 +74,17 @@ def prepare_environment_variable(self, set: bool = True) -> None: Raises: ValueError: If no service connector is found. """ - # connector = self.get_connector() - # if connector is None: - # raise ValueError( - # "No service connector found. Please make sure to set up a connector " - # "that is compatible with this orchestrator." - # ) + # connector = self.get_connector() + # if connector is None: + # raise ValueError( + # "No service connector found. Please make sure to set up a connector " + # "that is compatible with this orchestrator." + # ) + # if set: # # The Kubernetes connector creates a local configuration profile with the name computed from # # the first 8 digits of its UUID. 
# kubernetes_profile = f"zenml-{str(connector.id)[:8]}" # os.environ[ENV_Kubernetes_PROFILE] = kubernetes_profile # else: - # os.environ.pop(ENV_Kubernetes_PROFILE, None) \ No newline at end of file + # os.environ.pop(ENV_Kubernetes_PROFILE, None) From 775253f9c39cc87c904692633a387c4ae04bbb1c Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sun, 22 Sep 2024 02:20:48 +0100 Subject: [PATCH 06/15] Refactor SkypilotBaseOrchestrator setup_credentials method --- .../skypilot/orchestrators/skypilot_base_vm_orchestrator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 6cc4f1fe3c6..c9e6b0fcbc3 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -132,9 +132,9 @@ def cloud(self) -> sky.clouds.Cloud: def setup_credentials(self) -> None: """Set up credentials for the orchestrator.""" - connector = self.get_connector() - #$assert connector is not None - #connector.configure_local_client() + # connector = self.get_connector() + # $assert connector is not None + # connector.configure_local_client() @abstractmethod def prepare_environment_variable(self, set: bool = True) -> None: From 4b429407f358df69e29a32f6bf748e271aa27d98 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sun, 22 Sep 2024 02:27:55 +0100 Subject: [PATCH 07/15] Refactor SkypilotBaseOrchestrator setup_credentials method and SkypilotKubernetesOrchestrator prepare_environment_variable method --- .../orchestrators/skypilot_base_vm_orchestrator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 
c9e6b0fcbc3..048dba66ebe 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -272,8 +272,8 @@ def prepare_or_run_pipeline( f"sudo docker login --username $DOCKER_USERNAME --password " f"$DOCKER_PASSWORD {stack.container_registry.config.uri}" ) - task_envs["DOCKER_USERNAME"] = (docker_username,) - task_envs["DOCKER_PASSWORD"] = (docker_password,) + task_envs["DOCKER_USERNAME"] = docker_username + task_envs["DOCKER_PASSWORD"] = docker_password else: setup = None @@ -284,13 +284,11 @@ def prepare_or_run_pipeline( try: if isinstance(self.cloud, sky.clouds.Kubernetes): - image = image run_command = f"/opt/venv/bin/{entrypoint_str} {arguments_str}" setup = None down = False idle_minutes_to_autostop = None else: - image = settings.image_id run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" down = settings.down idle_minutes_to_autostop = settings.idle_minutes_to_autostop @@ -314,7 +312,9 @@ def prepare_or_run_pipeline( job_recovery=settings.job_recovery, region=settings.region, zone=settings.zone, - image_id=image, + image_id=image + if isinstance(self.cloud, sky.clouds.Kubernetes) + else settings.image_id, disk_size=settings.disk_size, disk_tier=settings.disk_tier, ) From 305322e924dbe3be3196e27c107f4bbd04dfe3b2 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sun, 22 Sep 2024 14:58:18 +0100 Subject: [PATCH 08/15] Refactor SkypilotBaseOrchestrator setup_credentials method and SkypilotKubernetesOrchestrator prepare_environment_variable method --- .../orchestrators/skypilot-vm.md | 77 +++++++++++++++++++ .../skypilot_base_vm_orchestrator.py | 9 +-- 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index 96d05c64fd4..df7e9340b0c 100644 --- 
a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -242,6 +242,55 @@ The Lambda Labs orchestrator does not support some of the features like `job_rec While testing the orchestrator, we noticed that the Lambda Labs orchestrator does not support the `down` flag. This means the orchestrator will not automatically tear down the cluster after all jobs finish. We recommend manually tearing down the cluster after all jobs finish to avoid unnecessary costs. {% endhint %} {% endtab %} + +{% tab title="Kubernetes" %} +We first need to install the SkyPilot integration for Kubernetes, using the following command: + +```shell + zenml integration install skypilot_kubernetes +``` + +To provision SkyPilot on a Kubernetes cluster, your orchestrator stack component needs to be configured to authenticate with a +[Service Connector](../../how-to/auth-management/service-connectors-guide.md). To configure the Service Connector, you need to register a new service connector configured with the appropriate credentials and permissions to access the K8s cluster. You can then use the service connector to configure your registered Orchestrator stack component using the following commands: + +First, check that the Kubernetes service connector type is available using the following command: + +```shell +zenml service-connector list-types --type kubernetes +``` +```shell +┏━━━━━━━━━━━━┯━━━━━━━━━━━━┯━━━━━━━━━━━━┯━━━━━━━━━━━┯━━━━━━━┯━━━━━━━━┓ +┃ │ │ RESOURCE │ AUTH │ │ ┃ +┃ NAME │ TYPE │ TYPES │ METHODS │ LOCAL │ REMOTE ┃ +┠────────────┼────────────┼────────────┼───────────┼───────┼────────┨ +┃ Kubernetes │ 🌀 │ 🌀 │ password │ ✅ │ ✅ ┃ +┃ Service │ kubernetes │ kubernetes │ token │ │ ┃ +┃ Connector │ │ -cluster │ │ │ ┃ +┗━━━━━━━━━━━━┷━━━━━━━━━━━━┷━━━━━━━━━━━━┷━━━━━━━━━━━┷━━━━━━━┷━━━━━━━━┛ +``` + +Next, configure a service connector using the CLI or the dashboard with the Kubernetes credentials.
For example, the following command registers a Kubernetes service connector interactively: + +```shell +zenml service-connector register kubernetes-skypilot --type kubernetes -i +``` + +This will configure the service connector with the appropriate credentials and permissions to access the Kubernetes cluster. You can then use the service connector to configure your registered Orchestrator stack component using the following commands: + +```shell +# Register the orchestrator +zenml orchestrator register <ORCHESTRATOR_NAME> --flavor sky_kubernetes +# Connect the orchestrator to the service connector +zenml orchestrator connect <ORCHESTRATOR_NAME> --connector kubernetes-skypilot + +# Register and activate a stack with the new orchestrator +zenml stack register <STACK_NAME> -o <ORCHESTRATOR_NAME> ... --set +``` + +{% hint style="warning" %} +Some of the features like `job_recovery`, `disk_tier`, `image_id`, `zone`, `idle_minutes_to_autostop`, `disk_size`, `use_spot` are not supported by the Kubernetes orchestrator. It is recommended not to use these features with the Kubernetes orchestrator and not to use [step-specific settings](skypilot-vm.md#configuring-step-specific-resources).
+{% endhint %} +{% endtab %} {% endtabs %} #### Additional Configuration @@ -392,6 +441,34 @@ skypilot_settings = SkypilotLambdaOrchestratorSettings( ) +@pipeline( + settings={ + "orchestrator": skypilot_settings + } +) +``` +{% endtab %} + +{% tab title="Kubernetes" %} + +**Code Example:** + +```python +from zenml.integrations.skypilot_kubernetes.flavors.skypilot_orchestrator_kubernetes_vm_flavor import SkypilotKubernetesOrchestratorSettings + +skypilot_settings = SkypilotKubernetesOrchestratorSettings( + cpus="2", + memory="16", + accelerators="V100:2", + image_id="ami-1234567890abcdef0", + disk_size=100, + cluster_name="my_cluster", + retry_until_up=True, + stream_logs=True, + docker_run_args=["--gpus=all"] +) + + @pipeline( settings={ "orchestrator": skypilot_settings diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 048dba66ebe..4a7814db2fb 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -132,9 +132,9 @@ def cloud(self) -> sky.clouds.Cloud: def setup_credentials(self) -> None: """Set up credentials for the orchestrator.""" - # connector = self.get_connector() - # $assert connector is not None - # connector.configure_local_client() + connector = self.get_connector() + assert connector is not None + connector.configure_local_client() @abstractmethod def prepare_environment_variable(self, set: bool = True) -> None: @@ -297,8 +297,7 @@ def prepare_or_run_pipeline( setup=setup, envs=task_envs, ) - logger.debug(run_command) - logger.debug(f"Running run: {setup}") + logger.debug(f"Running run: {run_command}") task = task.set_resources( sky.Resources( From edae0e85b24f9108c3530a0b6f294595b8aac0a5 Mon Sep 17 00:00:00 2001 From: Safoine El Khabich <34200873+safoinme@users.noreply.github.com> Date: Mon, 23 Sep 2024
16:13:15 +0100 Subject: [PATCH 09/15] Update src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py Co-authored-by: Stefan Nica --- .../skypilot/orchestrators/skypilot_base_vm_orchestrator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 6cc4f1fe3c6..f1d60b23d60 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -284,7 +284,6 @@ def prepare_or_run_pipeline( try: if isinstance(self.cloud, sky.clouds.Kubernetes): - image = image run_command = f"/opt/venv/bin/{entrypoint_str} {arguments_str}" setup = None down = False From 85e7f639c3cce19bd73afc4139a6ee739c4e3f9d Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Mon, 23 Sep 2024 16:29:30 +0100 Subject: [PATCH 10/15] Refactor SkypilotKubernetesOrchestrator to handle missing service connector --- .../skypilot_kubernetes_vm_orchestrator.py | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index b28b0e3426d..40a5733e122 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -74,17 +74,9 @@ def prepare_environment_variable(self, set: bool = True) -> None: Raises: ValueError: If no service connector is found. """ - # connector = self.get_connector() - # if connector is None: - # raise ValueError( - # "No service connector found. Please make sure to set up a connector " - # "that is compatible with this orchestrator." 
- # ) - - # if set: - # # The Kubernetes connector creates a local configuration profile with the name computed from - # # the first 8 digits of its UUID. - # kubernetes_profile = f"zenml-{str(connector.id)[:8]}" - # os.environ[ENV_Kubernetes_PROFILE] = kubernetes_profile - # else: - # os.environ.pop(ENV_Kubernetes_PROFILE, None) + connector = self.get_connector() + if connector is None: + raise ValueError( + "No service connector found. Please make sure to set up a connector " + "that is compatible with this orchestrator." + ) From f4639f6bb811c76e26d6fc22b20b39e58d3ddaef Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 24 Sep 2024 13:44:56 +0100 Subject: [PATCH 11/15] Refactor SkypilotBaseOrchestrator to use the correct virtual environment path for Kubernetes --- .../skypilot/orchestrators/skypilot_base_vm_orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 4a7814db2fb..92fcd619a4f 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -284,7 +284,7 @@ def prepare_or_run_pipeline( try: if isinstance(self.cloud, sky.clouds.Kubernetes): - run_command = f"/opt/venv/bin/{entrypoint_str} {arguments_str}" + run_command = f"$VIRTUAL_ENV{entrypoint_str} {arguments_str}" setup = None down = False idle_minutes_to_autostop = None From 46194cc08524e493551bd2492727e64d05d499a5 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 24 Sep 2024 13:47:09 +0100 Subject: [PATCH 12/15] specll check --- docs/book/component-guide/orchestrators/skypilot-vm.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index 
df7e9340b0c..61d489223a5 100644 --- a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -250,7 +250,7 @@ We need first to install the SkyPilot integration for Kubernetes, using the foll zenml integration install skypilot_kubernetes ``` -To provision skypilot on kubernetes cluster, your orchestrator stack componenet needs to be configured to authenticate with a +To provision skypilot on kubernetes cluster, your orchestrator stack components needs to be configured to authenticate with a [Service Connector](../../how-to/auth-management/service-connectors-guide.md). To configure the Service Connector, you need to register a new service connector configured with the appropriate credentials and permissions to access the K8s cluster. You can then use the service connector to configure your registered the Orchestrator stack component using the following command: First, check that the Kubernetes service connector type is available using the following command: From b3a45202075443e2a84acbf82e6b5c660506a18a Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 24 Sep 2024 16:54:55 +0100 Subject: [PATCH 13/15] fix run command --- .../skypilot/orchestrators/skypilot_base_vm_orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 92fcd619a4f..3f643f2ae66 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -284,7 +284,7 @@ def prepare_or_run_pipeline( try: if isinstance(self.cloud, sky.clouds.Kubernetes): - run_command = f"$VIRTUAL_ENV{entrypoint_str} {arguments_str}" + run_command = f"${{VIRTUAL_ENV:+$VIRTUAL_ENV/bin/}}{entrypoint_str} {arguments_str}" setup = None down = False 
idle_minutes_to_autostop = None From 615c083b61056e9e7833430a2a8e792f0e985492 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 24 Sep 2024 16:59:15 +0100 Subject: [PATCH 14/15] Refactor SkypilotKubernetesVMOrchestrator to remove unnecessary code --- .../orchestrators/skypilot_kubernetes_vm_orchestrator.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index 40a5733e122..a27feef4da7 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -74,9 +74,4 @@ def prepare_environment_variable(self, set: bool = True) -> None: Raises: ValueError: If no service connector is found. """ - connector = self.get_connector() - if connector is None: - raise ValueError( - "No service connector found. Please make sure to set up a connector " - "that is compatible with this orchestrator." 
- ) + pass From 98973a6b7082b2d716455533fc5ef28bb7c8afed Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 24 Sep 2024 17:29:42 +0100 Subject: [PATCH 15/15] Refactor SkypilotKubernetesVMOrchestrator to remove unnecessary code --- .../orchestrators/skypilot_kubernetes_vm_orchestrator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py index a27feef4da7..e1ce34cd6de 100644 --- a/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot_kubernetes/orchestrators/skypilot_kubernetes_vm_orchestrator.py @@ -70,8 +70,5 @@ def prepare_environment_variable(self, set: bool = True) -> None: Args: set: Whether to set the environment variables or not. - - Raises: - ValueError: If no service connector is found. """ pass