From 459ec664ee2798ffd0ad64ea4a0af1e50de7962d Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Mon, 23 Sep 2024 13:59:28 +1000 Subject: [PATCH] Bug fixes for 0.2.1 Handle project copy batch: * After 'successful' copy, check file sizes are the same between source and destination #572 Fix cttsov2 outputs json: * Error when collecting error message, just a typo #571 Roll back library_set init * Was put in to prevent sfn failure when a sample type was not present on a run but now fails due to dynamodb not supporting empty sets, will find a better solution soon # 570 --- .../check_or_launch_job_lambda.py | 68 ++++++++++++++++++- .../lambdas/check_success_py/check_success.py | 8 ++- .../set_cttso_v2_nf_outputs.asl.json | 2 +- ...v2_instrument_run_db_sfn_template.asl.json | 3 - ...ts_instrument_run_db_sfn_template.asl.json | 3 - 5 files changed, 73 insertions(+), 11 deletions(-) diff --git a/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py b/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py index c695885fa..c5f307269 100644 --- a/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py +++ b/lib/workload/components/icav2-copy-files/check_or_launch_job_lambda_py/check_or_launch_job_lambda.py @@ -29,20 +29,27 @@ # Standard imports from pathlib import Path from typing import List +from urllib.parse import urlunparse, urlparse import boto3 from os import environ import typing +import logging # Wrapica imports +from wrapica.libica_models import ProjectData from wrapica.job import get_job from wrapica.project_data import ( - convert_uri_to_project_data_obj, project_data_copy_batch_handler + convert_uri_to_project_data_obj, project_data_copy_batch_handler, delete_project_data ) if typing.TYPE_CHECKING: from mypy_boto3_ssm import SSMClient from mypy_boto3_secretsmanager import SecretsManagerClient +# Set logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + # Globals SUCCESS_STATES = [ "SUCCEEDED" @@ -63,6 +70,7 @@ DEFAULT_WAIT_TIME_SECONDS = 10 DEFAULT_WAIT_TIME_SECONDS_EXT = 10 +# Globals ICAV2_BASE_URL = "https://ica.illumina.com/ica/rest" @@ -222,6 +230,64 @@ def handler(event, context): # Handle successful job if job_status is True: + # Confirm source uris have made it to the destination successfully + # Get dest folder + dest_project_folder_data_obj: ProjectData = convert_uri_to_project_data_obj(dest_uri) + + # Iterate through each source uri + has_errors = False + for source_uri in source_uris: + # Get the source project data object + source_project_data_obj: ProjectData = convert_uri_to_project_data_obj(source_uri) + + # Get the dest uri file name + dest_file_uri = str( + urlunparse( + ( + urlparse(dest_uri).scheme, + urlparse(dest_uri).netloc, + str(Path(dest_project_folder_data_obj.data.details.path) / source_project_data_obj.data.details.name), + None, None, None + ) + ) + ) + # Get the dest project data object + dest_project_data_file_obj = convert_uri_to_project_data_obj( + dest_file_uri + ) + + # Compare the source and dest project data objects etags + if source_project_data_obj.data.details.file_size_in_bytes != dest_project_data_file_obj.data.details.file_size_in_bytes: + # Set has errors to true + has_errors = True + logger.error("Data size mismatch between source and dest project data objects") + logger.error(f"Data {source_uri} was transferred to {dest_file_uri} but the file sizes do not match") + logger.error(f"Source file size: {source_project_data_obj.data.details.file_size_in_bytes}") + logger.error(f"Dest file size: {dest_project_data_file_obj.data.details.file_size_in_bytes}") + logger.error("Purging the dest uri file and starting again") + # Purge the dest uri file and start again + delete_project_data( + project_id=dest_project_data_file_obj.project_id, + data_id=dest_project_data_file_obj.data.id + ) + + # If we have errors, we need to rerun the job + if has_errors: + # Add this job id to the failed job list + failed_job_list.append(job_id) + return { + "dest_uri": dest_uri, + "source_uris": source_uris, + "job_id": submit_copy_job( + dest_uri=dest_uri, + source_uris=source_uris, + ), + "failed_job_list": failed_job_list, # Empty list or list of failed jobs + "job_status": "RUNNING", + "wait_time_seconds": DEFAULT_WAIT_TIME_SECONDS + } + + # If we don't have errors, we can return the job as successful return { "dest_uri": dest_uri, "source_uris": source_uris, diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py index 6229d7be7..df1104361 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py @@ -284,12 +284,14 @@ def handler(event, context): # Failed workflow # if __name__ == "__main__": +# environ['AWS_PROFILE'] = 'umccr-production' +# environ['AWS_REGION'] = 'ap-southeast-2' # environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production" # print( # json.dumps( # handler( # { -# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/202409156f4e1c52/" +# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/20240922130c78be/" # }, # None # ), @@ -299,10 +301,10 @@ def handler(event, context): # # # { # # "success": false, -# # "message": "Workflow failed at 'DragenCaller' step" +# # "message": "Workflow failed at 'FastqValidation' step" # # } -# Passing workflow +# # Passing workflow # if __name__ == "__main__": # environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production" # print( diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json index c999f0920..e5206aa09 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json @@ -81,7 +81,7 @@ ], "ResultSelector": { "workflow_success.$": "$.Payload.success", - "error_message.$": "$.Payload.error_message" + "error_message.$": "$.Payload.message" }, "ResultPath": "$.check_successful_analysis_step", "Next": "Is successful analysis" diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/jb-weld/part_1/initialise-cttsov2-instrument-dbs/step_functions_templates/initialise_cttsov2_instrument_run_db_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/jb-weld/part_1/initialise-cttsov2-instrument-dbs/step_functions_templates/initialise_cttsov2_instrument_run_db_sfn_template.asl.json index 740424419..d56d8d202 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/jb-weld/part_1/initialise-cttsov2-instrument-dbs/step_functions_templates/initialise_cttsov2_instrument_run_db_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/jb-weld/part_1/initialise-cttsov2-instrument-dbs/step_functions_templates/initialise_cttsov2_instrument_run_db_sfn_template.asl.json @@ -21,9 +21,6 @@ }, "id_type": { "S": "${__instrument_run_partition_name__}" - }, - "library_set": { - "SS": [] } } }, diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/kwik/part_1/initialise-wgts-instrument-run-db/step_functions_templates/initialise_wgts_instrument_run_db_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/kwik/part_1/initialise-wgts-instrument-run-db/step_functions_templates/initialise_wgts_instrument_run_db_sfn_template.asl.json index 740424419..d56d8d202 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/kwik/part_1/initialise-wgts-instrument-run-db/step_functions_templates/initialise_wgts_instrument_run_db_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/kwik/part_1/initialise-wgts-instrument-run-db/step_functions_templates/initialise_wgts_instrument_run_db_sfn_template.asl.json @@ -21,9 +21,6 @@ }, "id_type": { "S": "${__instrument_run_partition_name__}" - }, - "library_set": { - "SS": [] } } },