From 2f97a50afd2e956d920ed932e7f15b1cdbe81a3a Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Thu, 23 May 2024 13:53:23 +0300 Subject: [PATCH 1/2] Fail workflow if input size is empty. Signed-off-by: Revital Sur --- .../universal/ededup/src/ededup_compute_execution_params.py | 3 +++ .../universal/fdedup/src/fdedup_compute_execution_params.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py b/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py index 99926e2c5..fb9b7bf20 100644 --- a/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py +++ b/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py @@ -63,6 +63,9 @@ def ededup_compute_execution_params( sampling = data_access.sample_input_data(n_samples=n_samples) avg_doc_size = sampling.get("average doc size KB") number_of_docs = sampling.get("estimated number of docs") + if avg_doc_size == 0 or number_of_docs == 0: + print(f"Estimated number of documents and documents size is zero. Please verify the input path.") + sys.exit(1) avg_table_size = sampling.get("average table size MB") / KB # compute number of hashes n_hashes = math.ceil(number_of_docs * 32 / GB) diff --git a/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py b/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py index 6aed9124e..db1f79d10 100644 --- a/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py +++ b/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py @@ -135,6 +135,9 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float: avg_doc_size = sampling.get("average doc size KB") number_of_docs = sampling.get("estimated number of docs") avg_table_size = sampling.get("average table size MB") / KB + if avg_doc_size == 0 or number_of_docs == 0: + print(f"Estimated number of documents and documents size is zero. Please verify the input path.") + sys.exit(1) # we are creating more buckets actors, so that we get better parallelization for bucket processing b_actors = math.ceil(num_buckets * number_of_docs * 64 * 1.1 / GB) d_actors = math.ceil(number_of_docs * 48 * 1.1 / GB) From 6a266cc98e954a13f13d6f915360fd2f09e4d2c9 Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Thu, 23 May 2024 14:57:00 +0300 Subject: [PATCH 2/2] Address review comments. Signed-off-by: Revital Sur --- .../universal/ededup/src/ededup_compute_execution_params.py | 2 +- .../universal/fdedup/src/fdedup_compute_execution_params.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py b/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py index fb9b7bf20..529a6ace3 100644 --- a/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py +++ b/kfp/transform_workflows/universal/ededup/src/ededup_compute_execution_params.py @@ -63,7 +63,7 @@ def ededup_compute_execution_params( sampling = data_access.sample_input_data(n_samples=n_samples) avg_doc_size = sampling.get("average doc size KB") number_of_docs = sampling.get("estimated number of docs") - if avg_doc_size == 0 or number_of_docs == 0: + if number_of_docs == 0: print(f"Estimated number of documents and documents size is zero. Please verify the input path.") sys.exit(1) avg_table_size = sampling.get("average table size MB") / KB diff --git a/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py b/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py index db1f79d10..a9f8b8d66 100644 --- a/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py +++ b/kfp/transform_workflows/universal/fdedup/src/fdedup_compute_execution_params.py @@ -135,7 +135,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float: avg_doc_size = sampling.get("average doc size KB") number_of_docs = sampling.get("estimated number of docs") avg_table_size = sampling.get("average table size MB") / KB - if avg_doc_size == 0 or number_of_docs == 0: + if number_of_docs == 0: print(f"Estimated number of documents and documents size is zero. Please verify the input path.") sys.exit(1) # we are creating more buckets actors, so that we get better parallelization for bucket processing