Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delay before deleting ray cluster in kfp component. #612

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ The input parameters section defines all the parameters required for the pipelin
# noop parameters
noop_sleep_sec: int = 10,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
```

The parameters used here are as follows:
Expand Down Expand Up @@ -146,8 +146,8 @@ component execution and parameters submitted to every component.

```python
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
3 changes: 3 additions & 0 deletions kfp/kfp_ray_components/deleteRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ inputs:
- { name: ray_name, type: String, description: "Ray cluster user name" }
- { name: run_id, type: String, description: "The KFP Run ID" }
- { name: server_url, type: String, default: "", description: "url of api server" }
- { name: additional_params, type: String, default: "{}", description: "additional parameters" }

implementation:
container:
Expand All @@ -24,4 +25,6 @@ implementation:
{ inputValue: run_id },
--server_url,
{ inputValue: server_url },
--additional_params,
{ inputValue: additional_params },
]
7 changes: 7 additions & 0 deletions kfp/kfp_ray_components/src/delete_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.
################################################################################
import sys
import time

from runtime_utils import KFPUtils, RayRemoteJobs

Expand All @@ -18,13 +19,17 @@
def cleanup_ray_cluster(
name: str, # name of Ray cluster
server_url: str, # url of api server
additional_params: str, # additional parameters for
):
# get current namespace
ns = KFPUtils.get_namespace()
if ns == "":
print(f"Failed to get namespace")
sys.exit(1)

dict_params = KFPUtils.load_from_json(additional_params.replace("'", '"'))
delete_cluster_delay_minutes = dict_params.get("delete_cluster_delay_minutes", 0)
time.sleep(delete_cluster_delay_minutes * 60)
# cleanup
remote_jobs = RayRemoteJobs(server_url=server_url)
status, error = remote_jobs.delete_ray_cluster(name=name, namespace=ns)
Expand All @@ -38,6 +43,7 @@ def cleanup_ray_cluster(
parser.add_argument("-rn", "--ray_name", type=str, default="")
parser.add_argument("-id", "--run_id", type=str, default="")
parser.add_argument("-su", "--server_url", default="", type=str)
parser.add_argument("-ap", "--additional_params", default="{}", type=str)
args = parser.parse_args()

cluster_name = KFPUtils.runtime_name(
Expand All @@ -48,4 +54,5 @@ def cleanup_ray_cluster(
cleanup_ray_cluster(
name=cluster_name,
server_url=args.server_url,
additional_params=args.additional_params,
)
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def {{ pipeline_name }}(
{{ pipeline_argument.name }}: {{ pipeline_argument.type }}{% if pipeline_argument.value is defined %}{% if pipeline_argument.type == "int" %} = {{ pipeline_argument.value }}{% else %} = "{{ pipeline_argument.value }}"{% endif %}{% endif %},
{%- endfor %}
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute {{ pipeline_name }} transform
Expand Down Expand Up @@ -162,8 +162,8 @@ def {{ pipeline_name }}(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
__add_p2_parameters__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
{%- for p2_parameter in add_p2_parameters %}
p2_pipeline_{{ p2_parameter.name }}: {{ p2_parameter.type }}{% if p2_parameter.value is defined %}{% if p2_parameter.type == "str" %} = "{{ p2_parameter.value }}"{% else %} = {{ p2_parameter.value }}{% endif %}{% endif %},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def sample_code_ray_orchestrator(
p2_pipeline_input_parent_path: str = "test/code2parquet/output/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def sample_ray_orchestrator(
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def super_pipeline(
p1_pipeline_input_path: str = "test/doc_id/input/",
p1_pipeline_output_path: str = "test/super/output/",
p1_pipeline_intermediate_path: str = "test/super/output/tmp",
p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p1_pipeline_data_s3_access_secret: str = "s3-secret",
p1_pipeline_runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
p1_pipeline_runtime_actor_options: dict = {'num_cpus': 0.8},
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def code2parquet(
code2parquet_snapshot: str = "github",
code2parquet_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
) -> None:
"""
Pipeline to execute NOOP transform
Expand Down Expand Up @@ -171,8 +171,8 @@ def code2parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def code_quality(
cq_tokenizer: str = "codeparrot/codeparrot",
cq_hf_token: str = "None",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute Code Quality transform
Expand Down Expand Up @@ -164,8 +164,8 @@ def code_quality(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def header_cleanser(
header_cleanser_license: bool = True,
header_cleanser_copyright: bool = True,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute Header Cleanser transform
Expand Down Expand Up @@ -160,8 +160,8 @@ def header_cleanser(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def malware(
malware_input_column: str = "contents",
malware_output_column: str = "virus_detection",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute malware transform
Expand Down Expand Up @@ -153,8 +153,8 @@ def malware(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def lang_select(
proglang_select_language_column: str = "language",
proglang_select_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
) -> None:
"""
Pipeline to execute NOOP transform
Expand Down Expand Up @@ -158,8 +158,8 @@ def lang_select(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def repo_level_order(
repo_lvl_output_by_langs: bool = False,
repo_lvl_combine_rows: bool = False,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute repo_level_order transform
Expand Down Expand Up @@ -191,8 +191,8 @@ def repo_level_order(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def doc_chunk(
doc_chunk_output_chunk_column_name: str = "contents",
doc_chunk_dl_min_chunk_len: int = 64,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute chunk documents transform
Expand Down Expand Up @@ -161,8 +161,8 @@ def doc_chunk(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def doc_chunk(
doc_chunk_output_chunk_column_name: str = "contents",
doc_chunk_dl_min_chunk_len: int = 64,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute chunk documents transform
Expand Down Expand Up @@ -162,8 +162,8 @@ def doc_chunk(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Loading