Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow skypilot to configure step or run full pipeline in one VM #2276

Merged
merged 9 commits into from
Jan 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ on-demand and managed spot VMs. While you can select the VM type you want to use
also includes an optimizer that automatically selects the cheapest VM/zone/region/cloud for your workloads.
Finally, the orchestrator includes an autostop feature that cleans up idle clusters, preventing unnecessary cloud costs.

{% hint style="info" %}
You can configure the SkyPilot VM Orchestrator to use a specific VM type, and
resources for each step of your pipeline can be configured individually.
Read more about how to configure step-specific resources [here](#configuring-step-specific-resources).
{% endhint %}

{% hint style="warning" %}
The SkyPilot VM Orchestrator does not currently support the ability to [schedule pipelines runs](/docs/book/user-guide/advanced-guide/pipelining-features/schedule-pipeline-runs.md)
{% endhint %}
Expand Down Expand Up @@ -378,7 +384,11 @@ One of the key features of the SkyPilot VM Orchestrator is the ability to run ea

The SkyPilot VM Orchestrator allows you to configure resources for each step individually. This means you can specify different VM types, CPU and memory requirements, and even use spot instances for certain steps while using on-demand instances for others.

To configure step-specific resources, you can pass a `SkypilotBaseOrchestratorSettings` object to the `settings` parameter of the `@step` decorator. This object allows you to define various attributes such as `instance_type`, `cpus`, `memory`, `use_spot`, `region`, and more.
By default, the orchestrator will use the resources specified in the orchestrator settings for each step and make sure that the VMs are provisioned with the appropriate resources. However, you can disable this behavior by setting the `disable_step_based_settings` parameter to `True` in the orchestrator configuration. You can do this using the following command:
safoinme marked this conversation as resolved.
Show resolved Hide resolved

```shell
zenml orchestrator update <ORCHESTRATOR_NAME> --disable_step_based_settings=True
```

Here's an example of how to configure specific resources for a step for the AWS cloud:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,16 @@ class SkypilotBaseOrchestratorSettings(BaseSettings):
class SkypilotBaseOrchestratorConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings
):
"""Skypilot orchestrator base config."""
"""Skypilot orchestrator base config.

Attributes:
disable_step_based_settings: whether to disable step-based settings.
If True, the orchestrator will run all steps with the pipeline
settings in one single VM. If False, the orchestrator will run
each step with its own settings in separate VMs if provided.
"""

disable_step_based_settings: bool = False

@property
def is_local(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import sky

from zenml.entrypoints import PipelineEntrypointConfiguration
from zenml.enums import StackComponentType
from zenml.environment import Environment
from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import (
SkypilotBaseOrchestratorConfig,
SkypilotBaseOrchestratorSettings,
)
from zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint_configuration import (
Expand Down Expand Up @@ -109,6 +111,15 @@ def get_orchestrator_run_id(self) -> str:
f"{ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID}."
)

@property
def config(self) -> SkypilotBaseOrchestratorConfig:
"""Returns the `SkypilotBaseOrchestratorConfig` config.

Returns:
The configuration.
"""
return cast(SkypilotBaseOrchestratorConfig, self._config)

@property
@abstractmethod
def cloud(self) -> sky.clouds.Cloud:
Expand Down Expand Up @@ -189,14 +200,40 @@ def prepare_or_run_pipeline(
deployment=deployment, step_name=pipeline_step_name
)

# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
if self.config.disable_step_based_settings:
# Run the entire pipeline in one VM
command = PipelineEntrypointConfiguration.get_entrypoint_command()
args = PipelineEntrypointConfiguration.get_entrypoint_arguments(
deployment_id=deployment.id
)
else:
for step_name, step in deployment.step_configurations.items():
step_settings = cast(
SkypilotBaseOrchestratorSettings,
self.get_settings(step),
)
if step_settings != settings:
logger.info(
"At least one step has different settings than the "
"pipeline. The step with different settings will be "
"run in a separate VM.\n"
"You can configure the orchestrator to disable this "
"behavior by updating the `disable_step_based_settings` "
"in your orchestrator configuration."
"By running the following command: "
safoinme marked this conversation as resolved.
Show resolved Hide resolved
"`zenml orchestrator update --disable-step-based-settings=True`"
)
break
# Run each step in a separate VM if configured.
# Build entrypoint command and args for the orchestrator VM.
# This will internally also build the command/args for all step VMs.
command = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)

entrypoint_str = " ".join(command)
args = SkypilotOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
)
arguments_str = " ".join(args)

docker_environment_str = " ".join(
Expand Down
Loading