Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multi step configuration for skypilot #2166

Merged
merged 41 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
27fc323
allow multi step configuration for skypilot
safoinme Dec 19, 2023
6f50333
fixes
safoinme Dec 20, 2023
f109795
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
strickvl Dec 20, 2023
11a18b9
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 4, 2024
db2c007
Merge branch 'feature/OSS-2680-multi-image-multi-vm-skypilot' of gith…
safoinme Jan 4, 2024
4471d48
Auto-update of Starter template
actions-user Jan 4, 2024
57ed6df
update skypilot to allow running using an orchestrating VM
safoinme Jan 4, 2024
0e02dee
Merge branch 'feature/OSS-2680-multi-image-multi-vm-skypilot' of gith…
safoinme Jan 4, 2024
7452667
add ability to create an aws config when configuring local client
safoinme Jan 4, 2024
32de4aa
Add advanced AWS configurations to config.yaml
safoinme Jan 5, 2024
0eb1f97
Fix cluster name concatenation in skypilot orchestrator entrypoint
safoinme Jan 5, 2024
4270edd
Fix AWS service connector and Skypilot orchestrator issues
safoinme Jan 5, 2024
dfdedf9
Add APT packages to Skypilot integrations and sanitize cluster name
safoinme Jan 6, 2024
b6c9f4c
Remove advanced AWS configurations from config.yaml
safoinme Jan 6, 2024
301bfa6
Update Skypilot orchestrator code
safoinme Jan 6, 2024
f9d51d8
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 6, 2024
559b710
Auto-update of Starter template
actions-user Jan 6, 2024
7476f85
Refactor cluster name sanitization in SkypilotBaseOrchestrator
safoinme Jan 6, 2024
09c368d
merge changes
safoinme Jan 6, 2024
4077eda
Refactor SkypilotBaseOrchestrator and SkypilotOrchestratorEntrypoint
safoinme Jan 7, 2024
5057bae
Remove commented out code and update Skypilot orchestrator entrypoint
safoinme Jan 7, 2024
b20b975
Fix GCP service connector support for local gcloud CLI login
stefannica Jan 8, 2024
8699b5a
Fix Skypilot orchestrator cluster name generation
safoinme Jan 9, 2024
9898aef
Merge branch 'feature/OSS-2680-multi-image-multi-vm-skypilot' of gith…
safoinme Jan 9, 2024
ec7d09b
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 9, 2024
21af350
Refactor cluster creation logic in SkypilotBaseOrchestrator
safoinme Jan 10, 2024
69b5924
Allow custom docker run args when using the Skypilot orchestrator
schustmi Jan 10, 2024
9964220
Handle exception and log error message in SkypilotBaseOrchestrator
safoinme Jan 10, 2024
f65e61d
Refactor AWS service connector to improve file handling
safoinme Jan 10, 2024
b33d4bb
Merge branch 'feature/OSS-2680-multi-image-multi-vm-skypilot' of gith…
safoinme Jan 10, 2024
daaaefc
Refactor Skypilot integrations for ZenML
safoinme Jan 10, 2024
dddd6fe
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 10, 2024
a3271a6
Add encoding parameter to subprocess call
safoinme Jan 10, 2024
794cba4
Update Skypilot integration for AWS, GCP, and Azure
safoinme Jan 10, 2024
f40796f
Add note about configuring pipeline resources for specific orchestrat…
safoinme Jan 10, 2024
46170b6
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
strickvl Jan 11, 2024
89903b8
Update src/zenml/integrations/gcp/service_connectors/gcp_service_conn…
safoinme Jan 11, 2024
44ea432
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 11, 2024
df16014
Add error handling for running code in a notebook and update Skypilot…
safoinme Jan 11, 2024
69a7ce4
Merge branch 'feature/OSS-2680-multi-image-multi-vm-skypilot' of gith…
safoinme Jan 11, 2024
7ef6179
Merge branch 'develop' into feature/OSS-2680-multi-image-multi-vm-sky…
safoinme Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ To use the SkyPilot VM Orchestrator, you need:
* One of the SkyPilot integrations installed. You can install the SkyPilot integration for your cloud provider of choice using the following command:
```shell
# For AWS
pip install "zenml[connectors-gcp]"
zenml integration install aws vm_aws
pip install "zenml[connectors-aws]"
zenml integration install aws skypilot_aws

# for GCP
pip install "zenml[connectors-gcp]"
zenml integration install gcp vm_gcp # for GCP
zenml integration install gcp skypilot_gcp # for GCP

# for Azure
pip install "zenml[connectors-azure]"
zenml integration install azure vm_azure # for Azure
zenml integration install azure skypilot_azure # for Azure
```
* [Docker](https://www.docker.com) installed and running.
* A [remote artifact store](../artifact-stores/artifact-stores.md) as part of your stack.
Expand Down Expand Up @@ -371,6 +371,38 @@ skypilot_settings = SkypilotAzureOrchestratorSettings(
{% endtab %}
{% endtabs %}


One of the key features of the SkyPilot VM Orchestrator is the ability to run each step of a pipeline on a separate VM with its own specific settings. This allows for fine-grained control over the resources allocated to each step, ensuring that each part of your pipeline has the necessary compute power while optimizing for cost and efficiency.

## Configuring Step-Specific Resources

The SkyPilot VM Orchestrator allows you to configure resources for each step individually. This means you can specify different VM types, CPU and memory requirements, and even use spot instances for certain steps while using on-demand instances for others.

To configure step-specific resources, you can pass a `SkypilotBaseOrchestratorSettings` object to the `settings` parameter of the `@step` decorator. This object allows you to define various attributes such as `instance_type`, `cpus`, `memory`, `use_spot`, `region`, and more.

Here's an example of how to configure specific resources for a step for the AWS cloud:

```python
from zenml.integrations.skypilot.flavors.skypilot_orchestrator_aws_vm_flavor import SkypilotAWSOrchestratorSettings

# Settings for a specific step that requires more resources
high_resource_settings = SkypilotAWSOrchestratorSettings(
instance_type='t2.2xlarge',
cpus=8,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt we take this from ResourceSettings actually now that I think about it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the GPU

Copy link
Contributor Author

@safoinme safoinme Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to distinguish between both of them, because ResourceSettings can be defined for things like pods which would allow random resource given, while for Skypilot most of these parameters must match if supplied together because a VM type with this specific resource must exist

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the cpu_count and memory in ResourceSettings should be mapped here... what is the point of ResourceSettings otherwise? @schustmi would love your feedback

memory=32,
use_spot=False,
region='us-east-1',
# ... other settings
)

@step(settings={"orchestrator.vm_aws": high_resource_settings})
safoinme marked this conversation as resolved.
Show resolved Hide resolved
def my_resource_intensive_step():
# Step implementation
pass
```

By using the `settings` parameter, you can tailor the resources for each step according to its specific needs. This flexibility allows you to optimize your pipeline execution for both performance and cost.

Check out
the [SDK docs](https://sdkdocs.zenml.io/latest/integration\_code\_docs/integrations-skypilot/#zenml.integrations.skypilot.flavors.skypilot\_orchestrator\_base\_vm\_flavor.SkypilotBaseOrchestratorSettings)
for a full list of available attributes and [this docs page](/docs/book/user-guide/advanced-guide/pipelining-features/configure-steps-pipelines.md) for more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import base64
import datetime
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple, cast

Expand Down Expand Up @@ -1371,6 +1372,22 @@ def _configure_local_client(
"aws_session_token"
] = credentials.token

aws_credentials_path = os.path.join(
users_home, ".aws", "credentials"
)

# Ensure the .aws directory exists
os.makedirs(os.path.dirname(aws_credentials_path), exist_ok=True)

# Ensure the credentials file exists
if not os.path.isfile(aws_credentials_path):
with open(aws_credentials_path, "a"):
pass # The file is automatically closed when exiting the with block

# Set the appropriate permissions for the .aws directory and credentials file
os.chmod(os.path.dirname(aws_credentials_path), 0o700)
os.chmod(aws_credentials_path, 0o600)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this works on every OS?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@safoinme FYI, this is the piece of code in the awcli package that does this same thing:

    def _create_file(self, config_filename):
        # Create the file as well as the parent dir if needed.
        dirname = os.path.split(config_filename)[0]
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        with os.fdopen(os.open(config_filename,
                               os.O_WRONLY | os.O_CREAT, 0o600), 'w'):
            pass

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@safoinme can you use the code I suggested here ?


common.rewrite_credentials_file(all_profiles, users_home)

logger.info(
Expand Down
3 changes: 3 additions & 0 deletions src/zenml/integrations/skypilot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SkypilotAWSIntegration(Integration):

NAME = SKYPILOT_AWS
REQUIREMENTS = ["skypilot[aws]"]
APT_PACKAGES = ["openssh-client","rsync"]

@classmethod
def flavors(cls) -> List[Type[Flavor]]:
Expand All @@ -56,6 +57,7 @@ class SkypilotGCPIntegration(Integration):

NAME = SKYPILOT_GCP
REQUIREMENTS = ["skypilot[gcp]"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a minimum version pin might help here and the other places

APT_PACKAGES = ["openssh-client","rsync"]
safoinme marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def flavors(cls) -> List[Type[Flavor]]:
Expand All @@ -76,6 +78,7 @@ class SkypilotAzureIntegration(Integration):

NAME = SKYPILOT_AZURE
REQUIREMENTS = ["skypilot[azure]"]
APT_PACKAGES = ["openssh-client","rsync"]

@classmethod
def flavors(cls) -> List[Type[Flavor]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
"""Implementation of the Skypilot base VM orchestrator."""

import os
import time
import re
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast
from uuid import uuid4

import sky

from zenml.entrypoints import PipelineEntrypointConfiguration
from zenml.enums import StackComponentType
from zenml.environment import Environment
from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import (
SkypilotBaseOrchestratorSettings,
)
from zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint_configuration import (
SkypilotOrchestratorEntrypointConfiguration,
)
from zenml.logger import get_logger
from zenml.orchestrators import (
ContainerizedOrchestrator,
)
from zenml.orchestrators.utils import get_orchestrator_run_name
from zenml.stack import StackValidator
from zenml.utils import string_utils

if TYPE_CHECKING:
from zenml.models import PipelineDeploymentResponse
Expand Down Expand Up @@ -136,7 +138,7 @@ def prepare_or_run_pipeline(
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs all pipeline steps in Skypilot containers.
"""Runs each pipeline step in a separate Skypilot container.

Args:
deployment: The pipeline deployment to prepare or run.
Expand All @@ -147,39 +149,59 @@ def prepare_or_run_pipeline(
Raises:
Exception: If the pipeline run fails.
"""
# First check whether the code is running in a notebook.
if Environment.in_notebook():
raise RuntimeError(
"The Skypilot orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
if deployment.schedule:
logger.warning(
"Skypilot Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)

# Set up some variables for configuration
orchestrator_run_id = str(uuid4())
environment[
ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID
] = orchestrator_run_id

settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(deployment),
)

entrypoint = PipelineEntrypointConfiguration.get_entrypoint_command()
entrypoint_str = " ".join(entrypoint)
arguments = PipelineEntrypointConfiguration.get_entrypoint_arguments(
deployment_id=deployment.id
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)

assert stack.container_registry
safoinme marked this conversation as resolved.
Show resolved Hide resolved

# Get Docker image for the orchestrator pod
try:
image = self.get_image(deployment=deployment)
except KeyError:
# If no generic pipeline image exists (which means all steps have
# custom builds) we use a random step image as all of them include
# dependencies for the active stack
pipeline_step_name = next(iter(deployment.step_configurations))
image = self.get_image(
deployment=deployment, step_name=pipeline_step_name
)

# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
entrypoint_str = " ".join(command)
args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)
arguments_str = " ".join(arguments)
arguments_str = " ".join(args)

# Set up docker run command
image = self.get_image(deployment=deployment)
docker_environment_str = " ".join(
f"-e {k}={v}" for k, v in environment.items()
)

start_time = time.time()

instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE

# Set up credentials
safoinme marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -188,8 +210,7 @@ def prepare_or_run_pipeline(
# Guaranteed by stack validation
assert stack is not None and stack.container_registry is not None

docker_creds = stack.container_registry.credentials
if docker_creds:
if docker_creds := stack.container_registry.credentials:
docker_username, docker_password = docker_creds
setup = (
f"docker login --username $DOCKER_USERNAME --password "
Expand Down Expand Up @@ -232,6 +253,7 @@ def prepare_or_run_pipeline(
)
)

# Set the cluster name
cluster_name = settings.cluster_name
if cluster_name is None:
# Find existing cluster
Expand All @@ -247,7 +269,10 @@ def prepare_or_run_pipeline(
# Launch the cluster
sky.launch(
task,
cluster_name,
cluster_name
or self.sanitize_cluster_name(
f"{pipeline_name}-{orchestrator_run_name}"
),
retry_until_up=settings.retry_until_up,
idle_minutes_to_autostop=settings.idle_minutes_to_autostop,
down=settings.down,
Expand All @@ -261,8 +286,18 @@ def prepare_or_run_pipeline(
# Unset the service connector AWS profile ENV variable
self.prepare_environment_variable(set=False)

run_duration = time.time() - start_time
logger.info(
"Pipeline run has finished in `%s`.",
string_utils.get_human_readable_time(run_duration),
)
def sanitize_cluster_name(self, name: str) -> str:
"""Sanitize the value to be used in a cluster name.

Args:
name: Arbitrary input cluster name.

Returns:
Sanitized cluster name.
"""
name = re.sub(
r"[^a-z0-9-]", "-", name.lower()
) # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen
name = re.sub(r"^[-]+", "", name) # trim leading hyphens
name = re.sub(r"[-]+$", "", name) # trim trailing hyphens
return name
Loading
Loading