diff --git a/kfp/doc/simple_transform_pipeline.md b/kfp/doc/simple_transform_pipeline.md index e3e4e8d2b..184f9c42b 100644 --- a/kfp/doc/simple_transform_pipeline.md +++ b/kfp/doc/simple_transform_pipeline.md @@ -100,7 +100,7 @@ The input parameters section defines all the parameters required for the pipelin # noop parameters noop_sleep_sec: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ``` The parameters used here are as follows: @@ -146,8 +146,8 @@ component execution and parameters submitted to every component. ```python # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/kfp/kfp_ray_components/deleteRayClusterComponent.yaml b/kfp/kfp_ray_components/deleteRayClusterComponent.yaml index 681c44395..44e199c47 100644 --- a/kfp/kfp_ray_components/deleteRayClusterComponent.yaml +++ b/kfp/kfp_ray_components/deleteRayClusterComponent.yaml @@ -5,6 +5,7 @@ inputs: - { name: ray_name, type: String, description: "Ray cluster user name" } - { name: run_id, type: String, description: "The KFP Run ID" } - { name: server_url, type: String, default: "", description: "url of api server" } + - { name: additional_params, type: String, default: "{}", description: "additional parameters" } implementation: container: @@ -24,4 +25,6 @@ implementation: { inputValue: run_id }, --server_url, { inputValue: server_url }, + --additional_params, + { inputValue: additional_params }, ] diff --git a/kfp/kfp_ray_components/src/delete_ray_cluster.py b/kfp/kfp_ray_components/src/delete_ray_cluster.py index fde2b61fe..f74331a97 100644 --- a/kfp/kfp_ray_components/src/delete_ray_cluster.py +++ b/kfp/kfp_ray_components/src/delete_ray_cluster.py @@ -10,6 +10,7 @@ # limitations under the License. ################################################################################ import sys +import time from runtime_utils import KFPUtils, RayRemoteJobs @@ -18,6 +19,7 @@ def cleanup_ray_cluster( name: str, # name of Ray cluster server_url: str, # url of api server + additional_params: str, # additional parameters for ): # get current namespace ns = KFPUtils.get_namespace() @@ -25,6 +27,9 @@ def cleanup_ray_cluster( print(f"Failed to get namespace") sys.exit(1) + dict_params = KFPUtils.load_from_json(additional_params.replace("'", '"')) + delete_cluster_delay_minutes = dict_params.get("delete_cluster_delay_minutes", 0) + time.sleep(delete_cluster_delay_minutes * 60) # cleanup remote_jobs = RayRemoteJobs(server_url=server_url) status, error = remote_jobs.delete_ray_cluster(name=name, namespace=ns) @@ -38,6 +43,7 @@ def cleanup_ray_cluster( parser.add_argument("-rn", "--ray_name", type=str, default="") parser.add_argument("-id", "--run_id", type=str, default="") parser.add_argument("-su", "--server_url", default="", type=str) + parser.add_argument("-ap", "--additional_params", default="{}", type=str) args = parser.parse_args() cluster_name = KFPUtils.runtime_name( @@ -48,4 +54,5 @@ def cleanup_ray_cluster( cleanup_ray_cluster( name=cluster_name, server_url=args.server_url, + additional_params=args.additional_params, ) diff --git a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py index 1ada35c19..55b8626c7 100644 --- a/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py +++ b/kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py @@ -123,7 +123,7 @@ def {{ pipeline_name }}( {{ pipeline_argument.name }}: {{ pipeline_argument.type }}{% if pipeline_argument.value is defined %}{% if pipeline_argument.type == "int" %} = {{ pipeline_argument.value }}{% else %} = "{{ pipeline_argument.value }}"{% endif %}{% endif %}, {%- endfor %} # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute {{ pipeline_name }} transform @@ -162,8 +162,8 @@ def {{ pipeline_name }}( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/kfp/pipeline_generator/superpipeline/generated/sample-super-kubeflow-pipeline_wf.py b/kfp/pipeline_generator/superpipeline/generated/sample-super-kubeflow-pipeline_wf.py index 08f9c3aee..92f97763d 100644 --- a/kfp/pipeline_generator/superpipeline/generated/sample-super-kubeflow-pipeline_wf.py +++ b/kfp/pipeline_generator/superpipeline/generated/sample-super-kubeflow-pipeline_wf.py @@ -26,7 +26,7 @@ def super_pipeline( p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}', p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}', p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", - p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}', p2_pipeline_input_parent_path: str = "test/doc_id/input/", p2_pipeline_output_parent_path: str = "test/super/output/", diff --git a/kfp/pipeline_generator/superpipeline/templates/superpipeline.ptmpl b/kfp/pipeline_generator/superpipeline/templates/superpipeline.ptmpl index d7fadf87f..6f00e0974 100644 --- a/kfp/pipeline_generator/superpipeline/templates/superpipeline.ptmpl +++ b/kfp/pipeline_generator/superpipeline/templates/superpipeline.ptmpl @@ -24,7 +24,7 @@ def super_pipeline( p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}', p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}', p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", - p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}', __add_p2_parameters__ diff --git a/kfp/pipeline_generator/superpipeline/templates/template_superpipeline.py b/kfp/pipeline_generator/superpipeline/templates/template_superpipeline.py index 4ce356a0a..7d69b0143 100644 --- a/kfp/pipeline_generator/superpipeline/templates/template_superpipeline.py +++ b/kfp/pipeline_generator/superpipeline/templates/template_superpipeline.py @@ -31,7 +31,7 @@ def super_pipeline( p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}', p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}', p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", - p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}', {%- for p2_parameter in add_p2_parameters %} p2_pipeline_{{ p2_parameter.name }}: {{ p2_parameter.type }}{% if p2_parameter.value is defined %}{% if p2_parameter.type == "str" %} = "{{ p2_parameter.value }}"{% else %} = {{ p2_parameter.value }}{% endif %}{% endif %}, diff --git a/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py b/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py index 2215c6193..867c83198 100644 --- a/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py +++ b/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py @@ -49,7 +49,7 @@ def sample_code_ray_orchestrator( p2_pipeline_input_parent_path: str = "test/code2parquet/output/", p2_pipeline_output_parent_path: str = "test/super/output/", p2_pipeline_parent_path_suffix: str = "", - p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p2_pipeline_data_s3_access_secret: str = "s3-secret", p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}', p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}', diff --git a/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py b/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py index df26ca349..8243a65b5 100644 --- a/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py +++ b/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py @@ -33,7 +33,7 @@ def sample_ray_orchestrator( p2_pipeline_input_parent_path: str = "test/doc_id/input/", p2_pipeline_output_parent_path: str = "test/super/output/", p2_pipeline_parent_path_suffix: str = "", - p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p2_pipeline_data_s3_access_secret: str = "s3-secret", p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}', p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}', diff --git a/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py b/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py index 72bdf8279..240547500 100755 --- a/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py +++ b/kfp/superworkflows/ray/kfp_v2/superpipeline_noop_docId_v2_wf.py @@ -35,7 +35,7 @@ def super_pipeline( p1_pipeline_input_path: str = "test/doc_id/input/", p1_pipeline_output_path: str = "test/super/output/", p1_pipeline_intermediate_path: str = "test/super/output/tmp", - p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', p1_pipeline_data_s3_access_secret: str = "s3-secret", p1_pipeline_runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"}, p1_pipeline_runtime_actor_options: dict = {'num_cpus': 0.8}, diff --git a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py index 7cc12fd60..a2080e70a 100644 --- a/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py +++ b/transforms/code/code2parquet/kfp_ray/code2parquet_wf.py @@ -128,7 +128,7 @@ def code2parquet( code2parquet_snapshot: str = "github", code2parquet_s3_access_secret: str = "s3-secret", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ) -> None: """ Pipeline to execute NOOP transform @@ -171,8 +171,8 @@ def code2parquet( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/code/code_quality/kfp_ray/code_quality_wf.py b/transforms/code/code_quality/kfp_ray/code_quality_wf.py index afefde191..138b5d613 100644 --- a/transforms/code/code_quality/kfp_ray/code_quality_wf.py +++ b/transforms/code/code_quality/kfp_ray/code_quality_wf.py @@ -125,7 +125,7 @@ def code_quality( cq_tokenizer: str = "codeparrot/codeparrot", cq_hf_token: str = "None", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Code Quality transform @@ -164,8 +164,8 @@ def code_quality( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py index f16d90528..ba82169c3 100644 --- a/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py +++ b/transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py @@ -122,7 +122,7 @@ def header_cleanser( header_cleanser_license: bool = True, header_cleanser_copyright: bool = True, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Header Cleanser transform @@ -160,8 +160,8 @@ def header_cleanser( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/code/malware/kfp_ray/malware_wf.py b/transforms/code/malware/kfp_ray/malware_wf.py index b31fa14c4..d9ec70b37 100644 --- a/transforms/code/malware/kfp_ray/malware_wf.py +++ b/transforms/code/malware/kfp_ray/malware_wf.py @@ -115,7 +115,7 @@ def malware( malware_input_column: str = "contents", malware_output_column: str = "virus_detection", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute malware transform @@ -153,8 +153,8 @@ def malware( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py index eea17222e..209121cd4 100644 --- a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py +++ b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py @@ -118,7 +118,7 @@ def lang_select( proglang_select_language_column: str = "language", proglang_select_s3_access_secret: str = "s3-secret", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ) -> None: """ Pipeline to execute NOOP transform @@ -158,8 +158,8 @@ def lang_select( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py b/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py index 8c249c52e..256636176 100644 --- a/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py +++ b/transforms/code/repo_level_ordering/kfp_ray/repo_level_order_wf.py @@ -145,7 +145,7 @@ def repo_level_order( repo_lvl_output_by_langs: bool = False, repo_lvl_combine_rows: bool = False, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute repo_level_order transform @@ -191,8 +191,8 @@ def repo_level_order( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py index 67e8640e7..a613955c9 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_multiple_wf.py @@ -121,7 +121,7 @@ def doc_chunk( doc_chunk_output_chunk_column_name: str = "contents", doc_chunk_dl_min_chunk_len: int = 64, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute chunk documents transform @@ -161,8 +161,8 @@ def doc_chunk( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py index 0fdcdb1a3..7fb107758 100644 --- a/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py +++ b/transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py @@ -122,7 +122,7 @@ def doc_chunk( doc_chunk_output_chunk_column_name: str = "contents", doc_chunk_dl_min_chunk_len: int = 64, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute chunk documents transform @@ -162,8 +162,8 @@ def doc_chunk( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py index e9e825748..c68715b5d 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_multiple_wf.py @@ -118,7 +118,7 @@ def doc_quality( docq_doc_content_column: str = "contents", docq_bad_word_filepath: str = "/home/ray/ldnoobw/en", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Document Quality transform @@ -157,8 +157,8 @@ def doc_quality( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py index b76732a58..b42262468 100644 --- a/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py +++ b/transforms/language/doc_quality/kfp_ray/doc_quality_wf.py @@ -118,7 +118,7 @@ def doc_quality( docq_doc_content_column: str = "contents", docq_bad_word_filepath: str = "/home/ray/ldnoobw/en", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Document Quality transform @@ -157,8 +157,8 @@ def doc_quality( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py index a7416875c..ecd58b6fe 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py @@ -127,7 +127,7 @@ def lang_id( lang_id_output_lang_column_name: str = "lang", lang_id_output_score_column_name: str = "score", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Language Identification transform @@ -169,8 +169,8 @@ def lang_id( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/lang_id/kfp_ray/lang_id_wf.py b/transforms/language/lang_id/kfp_ray/lang_id_wf.py index 9f02da7af..4f581cf2c 100644 --- a/transforms/language/lang_id/kfp_ray/lang_id_wf.py +++ b/transforms/language/lang_id/kfp_ray/lang_id_wf.py @@ -128,7 +128,7 @@ def lang_id( lang_id_output_lang_column_name: str = "lang", lang_id_output_score_column_name: str = "score", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Language Identification transform @@ -170,8 +170,8 @@ def lang_id( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py index 00ec76aad..fdfbc1fe4 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_multiple_wf.py @@ -115,7 +115,7 @@ def pdf2parquet( pdf2parquet_do_table_structure: bool = True, pdf2parquet_do_ocr: bool = False, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute PDF2PARQUET transform @@ -153,8 +153,8 @@ def pdf2parquet( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py index 6d7ef2808..c3bf399fe 100644 --- a/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py +++ b/transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py @@ -119,7 +119,7 @@ def pdf2parquet( pdf2parquet_do_table_structure: bool = True, pdf2parquet_do_ocr: bool = False, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute PDF2PARQUET transform @@ -157,8 +157,8 @@ def pdf2parquet( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py b/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py index b3c83bb04..f1c4dac98 100644 --- a/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py +++ b/transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py @@ -113,7 +113,7 @@ def pii_redactor( # pii_redactor parameters pii_redactor_contents: str = "title", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute pii_redactor transform @@ -150,8 +150,8 @@ def pii_redactor( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py index bad66f21e..120c53c99 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_multiple_wf.py @@ -118,7 +118,7 @@ def text_encoder( text_encoder_content_column_name: str = "contents", text_encoder_output_embeddings_column_name: str = "embeggings", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute TextEncoder transform @@ -157,8 +157,8 @@ def text_encoder( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py index 269ce4366..d402c8832 100644 --- a/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py +++ b/transforms/language/text_encoder/kfp_ray/text_encoder_wf.py @@ -119,7 +119,7 @@ def text_encoder( text_encoder_content_column_name: str = "contents", text_encoder_output_embeddings_column_name: str = "embeggings", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute TextEncoder transform @@ -158,8 +158,8 @@ def text_encoder( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py index 179d64417..1eb96af25 100644 --- a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py +++ b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py @@ -129,7 +129,7 @@ def doc_id( doc_id_int_column: str = "int_id_column", doc_id_start_id: int = 0, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute NOOP transform @@ -169,8 +169,8 @@ def doc_id( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/ededup/kfp_ray/ededup_wf.py b/transforms/universal/ededup/kfp_ray/ededup_wf.py index 9d9bb7088..306391d6c 100644 --- a/transforms/universal/ededup/kfp_ray/ededup_wf.py +++ b/transforms/universal/ededup/kfp_ray/ededup_wf.py @@ -93,7 +93,7 @@ def ededup( # data sampling ededup_n_samples: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute EDEDUP transform @@ -134,8 +134,8 @@ def ededup( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py index da29c9fa5..c98ffafa3 100644 --- a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py +++ b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py @@ -107,7 +107,7 @@ def fdedup( # data sampling fdedup_n_samples: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute FDEDUP transform @@ -159,8 +159,8 @@ def fdedup( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/filter/kfp_ray/filter_wf.py b/transforms/universal/filter/kfp_ray/filter_wf.py index bc37440e7..b998cd7b5 100644 --- a/transforms/universal/filter/kfp_ray/filter_wf.py +++ b/transforms/universal/filter/kfp_ray/filter_wf.py @@ -119,7 +119,7 @@ def filtering( filter_logical_operator: str = "AND", filter_columns_to_drop: str = "['extra', 'cluster']", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute Filtering transform @@ -158,8 +158,8 @@ def filtering( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py index 90b585718..a1f6592a8 100644 --- a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py @@ -112,7 +112,7 @@ def noop( # noop parameters noop_sleep_sec: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute NOOP transform @@ -149,8 +149,8 @@ def noop( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/noop/kfp_ray/noop_wf.py b/transforms/universal/noop/kfp_ray/noop_wf.py index dfa044017..0e01bbe7f 100644 --- a/transforms/universal/noop/kfp_ray/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_wf.py @@ -103,7 +103,7 @@ def noop( server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access data_s3_config: str = "{'input_folder': 'test/noop/input/', 'output_folder': 'test/noop/output/'}", - data_s3_access_secret: str = "s3-minio", + data_s3_access_secret: str = "s3-secret", data_max_files: int = -1, data_num_samples: int = -1, data_checkpointing: bool = False, @@ -114,7 +114,7 @@ def noop( # noop parameters noop_sleep_sec: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute NOOP transform @@ -151,8 +151,10 @@ def noop( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op( + ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params + ) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/profiler/kfp_ray/profiler_wf.py b/transforms/universal/profiler/kfp_ray/profiler_wf.py index 362a3ebee..7f21fa3e0 100644 --- a/transforms/universal/profiler/kfp_ray/profiler_wf.py +++ b/transforms/universal/profiler/kfp_ray/profiler_wf.py @@ -91,7 +91,7 @@ def profiler( # data sampling profiler_n_samples: int = 10, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute EDEDUP transform @@ -130,8 +130,8 @@ def profiler( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/resize/kfp_ray/resize_wf.py b/transforms/universal/resize/kfp_ray/resize_wf.py index bc425e305..f9b325674 100644 --- a/transforms/universal/resize/kfp_ray/resize_wf.py +++ b/transforms/universal/resize/kfp_ray/resize_wf.py @@ -126,7 +126,7 @@ def resize( resize_max_mbytes_per_table: int = -1, resize_size_type: str = "disk", # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute NOOP transform @@ -165,8 +165,8 @@ def resize( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params diff --git a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py index bb958e69d..ba96a790a 100644 --- a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py +++ b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py @@ -131,7 +131,7 @@ def tokenization( tkn_tokenizer_args: str = "cache_dir=/tmp/hf", tkn_chunk_size: int = 0, # additional parameters - additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}', + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', ): """ Pipeline to execute tokenization transform @@ -173,8 +173,8 @@ def tokenization( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60) + clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): # compute execution params