Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource limits for Vertex #529

Merged
merged 2 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,16 @@ def _set_configuration(self, task, fondant_component_operation):
accelerator_name = fondant_component_operation.accelerator_name
node_pool_label = fondant_component_operation.node_pool_label
node_pool_name = fondant_component_operation.node_pool_name
cpu_request = fondant_component_operation.cpu_request
cpu_limit = fondant_component_operation.cpu_limit
memory_request = fondant_component_operation.memory_request
memory_limit = fondant_component_operation.memory_limit

# Assign optional specification
if cpu_request is not None:
task.set_memory_request(cpu_request)
if cpu_limit is not None:
task.set_memory_limit(cpu_limit)
if memory_request is not None:
task.set_memory_request(memory_request)
if memory_limit is not None:
Expand Down Expand Up @@ -459,10 +465,16 @@ def resolve_imports(self):
@staticmethod
def _set_configuration(task, fondant_component_operation):
# Unpack optional specifications
cpu_limit = fondant_component_operation.cpu_limit
memory_limit = fondant_component_operation.memory_limit
number_of_accelerators = fondant_component_operation.number_of_accelerators
accelerator_name = fondant_component_operation.accelerator_name

# Assign optional specification
if cpu_limit is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also add the cpu/memory request for Vertex?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the docs, it only supports limits.

task.set_cpu_limit(cpu_limit)
if memory_limit is not None:
task.set_memory_limit(memory_limit)
if number_of_accelerators is not None:
task.set_accelerator_limit(number_of_accelerators)
if accelerator_name not in valid_vertex_accelerator_types:
Expand Down
30 changes: 26 additions & 4 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ComponentOp:
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
cpu_request: the memory requested by the component. The value
should be a string which can be a number or a number followed by “m”, which means
1/1000.
cpu_limit: the maximum amount of CPU that can be used by the component. The value
should be a string which can be a number or a number followed by “m”, which means
1/1000.
memory_request: the memory requested by the component. The value can be a number or a
number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
memory_limit: the maximum memory that can be used by the component. The value can be a
Expand Down Expand Up @@ -98,8 +104,10 @@ def __init__(
preemptible: t.Optional[bool] = False,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
memory_request: t.Optional[t.Union[str, int]] = None,
memory_limit: t.Optional[t.Union[str, int]] = None,
cpu_request: t.Optional[str] = None,
cpu_limit: t.Optional[str] = None,
memory_request: t.Optional[str] = None,
memory_limit: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
Expand All @@ -119,6 +127,8 @@ def __init__(

self.arguments.setdefault("component_spec", self.component_spec.specification)

self.cpu_request = cpu_request
self.cpu_limit = cpu_limit
self.memory_request = memory_request
self.memory_limit = memory_limit
self.node_pool_label, self.node_pool_name = self._validate_node_pool_spec(
Expand Down Expand Up @@ -231,8 +241,10 @@ def from_registry(
preemptible: t.Optional[bool] = False,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
memory_request: t.Optional[t.Union[str, int]] = None,
memory_limit: t.Optional[t.Union[str, int]] = None,
cpu_request: t.Optional[str] = None,
cpu_limit: t.Optional[str] = None,
memory_request: t.Optional[str] = None,
memory_limit: t.Optional[str] = None,
) -> "ComponentOp":
"""Load a reusable component by its name.

Expand All @@ -254,6 +266,14 @@ def from_registry(
Requires the setup and assignment of a preemptible node pool. Note that preemptibles
only work when KFP is setup on GCP. More info here:
https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the dask client.
cpu_request: the memory requested by the component. The value
should be a string which can be a number or a number followed by “m”, which means
1/1000.
cpu_limit: the maximum amount of CPU that can be used by the component. The value
should be a string which can be a number or a number followed by “m”, which means
1/1000.
memory_request: the memory requested by the component. The value can be a number or a
number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
memory_limit: the maximum memory that can be used by the component. The value can be a
Expand All @@ -279,6 +299,8 @@ def from_registry(
preemptible=preemptible,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
cpu_request=cpu_request,
cpu_limit=cpu_limit,
memory_request=memory_request,
memory_limit=memory_limit,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,44 +94,46 @@ deploymentSpec:
exec-first-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --input_partition_rows
- '{{$.inputs.parameters[''input_partition_rows'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--input_partition_rows"
- "{{$.inputs.parameters['input_partition_rows']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
command:
- fondant
- execute
- main
image: example_component:latest
resources:
memoryLimit: 0.512
exec-second-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --input_partition_rows
- '{{$.inputs.parameters[''input_partition_rows'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- --input_manifest_path
- '{{$.inputs.parameters[''input_manifest_path'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--input_partition_rows"
- "{{$.inputs.parameters['input_partition_rows']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
- "--input_manifest_path"
- "{{$.inputs.parameters['input_manifest_path']}}"
command:
- fondant
- execute
Expand All @@ -140,20 +142,20 @@ deploymentSpec:
exec-third-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- --input_manifest_path
- '{{$.inputs.parameters[''input_manifest_path'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
- "--input_manifest_path"
- "{{$.inputs.parameters['input_manifest_path']}}"
command:
- fondant
- execute
Expand Down Expand Up @@ -198,15 +200,15 @@ root:
type: binary
input_partition_rows:
runtimeValue:
constant: 10.0
constant: 10
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "first_component",
"cache_key": "1"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
Expand Down Expand Up @@ -250,18 +252,18 @@ root:
type: array
input_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json"
input_partition_rows:
runtimeValue:
constant: 10.0
constant: 10
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "second_component",
"cache_key": "2"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
Expand Down Expand Up @@ -314,15 +316,15 @@ root:
type: binary
input_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json"
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "third_component",
"cache_key": "3"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
Expand Down
Loading