Skip to content

Commit

Permalink
Merge pull request #574 from umccr/patch-0.2.2a
Browse files Browse the repository at this point in the history
Merge pull request #573 from umccr/prod-patch/0-2-1
  • Loading branch information
victorskl authored Sep 24, 2024
2 parents da41498 + 2aca89d commit 0bcfc7e
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,27 @@
# Standard imports
from pathlib import Path
from typing import List
from urllib.parse import urlunparse, urlparse
import boto3
from os import environ
import typing
import logging

# Wrapica imports
from wrapica.libica_models import ProjectData
from wrapica.job import get_job
from wrapica.project_data import (
convert_uri_to_project_data_obj, project_data_copy_batch_handler
convert_uri_to_project_data_obj, project_data_copy_batch_handler, delete_project_data
)

if typing.TYPE_CHECKING:
from mypy_boto3_ssm import SSMClient
from mypy_boto3_secretsmanager import SecretsManagerClient

# Set logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Globals
SUCCESS_STATES = [
"SUCCEEDED"
Expand All @@ -63,6 +70,7 @@
DEFAULT_WAIT_TIME_SECONDS = 10
DEFAULT_WAIT_TIME_SECONDS_EXT = 10

# Globals
ICAV2_BASE_URL = "https://ica.illumina.com/ica/rest"


Expand Down Expand Up @@ -222,6 +230,64 @@ def handler(event, context):

# Handle successful job
if job_status is True:
# Confirm source uris have made it to the destination successfully
# Get dest folder
dest_project_folder_data_obj: ProjectData = convert_uri_to_project_data_obj(dest_uri)

# Iterate through each source uri
has_errors = False
for source_uri in source_uris:
# Get the source project data object
source_project_data_obj: ProjectData = convert_uri_to_project_data_obj(source_uri)

# Get the dest uri file name
dest_file_uri = str(
urlunparse(
(
urlparse(dest_uri).scheme,
urlparse(dest_uri).netloc,
str(Path(dest_project_folder_data_obj.data.details.path) / source_project_data_obj.data.details.name),
None, None, None
)
)
)
# Get the dest project data object
dest_project_data_file_obj = convert_uri_to_project_data_obj(
dest_file_uri
)

# Compare the source and dest project data objects etags
if source_project_data_obj.data.details.file_size_in_bytes != dest_project_data_file_obj.data.details.file_size_in_bytes:
# Set has errors to true
has_errors = True
logger.error("Data size mismatch between source and dest project data objects")
logger.error(f"Data {source_uri} was transferred to {dest_file_uri} but the file sizes do not match")
logger.error(f"Source file size: {source_project_data_obj.data.details.file_size_in_bytes}")
logger.error(f"Dest file size: {dest_project_data_file_obj.data.details.file_size_in_bytes}")
logger.error("Purging the dest uri file and starting again")
# Purge the dest uri file and start again
delete_project_data(
project_id=dest_project_data_file_obj.project_id,
data_id=dest_project_data_file_obj.data.id
)

# If we have errors, we need to rerun the job
if has_errors:
# Add this job id to the failed job list
failed_job_list.append(job_id)
return {
"dest_uri": dest_uri,
"source_uris": source_uris,
"job_id": submit_copy_job(
dest_uri=dest_uri,
source_uris=source_uris,
),
"failed_job_list": failed_job_list, # Empty list or list of failed jobs
"job_status": "RUNNING",
"wait_time_seconds": DEFAULT_WAIT_TIME_SECONDS
}

# If we don't have errors, we can return the job as successful
return {
"dest_uri": dest_uri,
"source_uris": source_uris,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,14 @@ def handler(event, context):

# Failed workflow
# if __name__ == "__main__":
# environ['AWS_PROFILE'] = 'umccr-production'
# environ['AWS_REGION'] = 'ap-southeast-2'
# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production"
# print(
# json.dumps(
# handler(
# {
# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/202409156f4e1c52/"
# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/20240922130c78be/"
# },
# None
# ),
Expand All @@ -299,10 +301,10 @@ def handler(event, context):
#
# # {
# # "success": false,
# # "message": "Workflow failed at 'DragenCaller' step"
# # "message": "Workflow failed at 'FastqValidation' step"
# # }

# Passing workflow
# # Passing workflow
# if __name__ == "__main__":
# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production"
# print(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
],
"ResultSelector": {
"workflow_success.$": "$.Payload.success",
"error_message.$": "$.Payload.error_message"
"error_message.$": "$.Payload.message"
},
"ResultPath": "$.check_successful_analysis_step",
"Next": "Is successful analysis"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
},
"id_type": {
"S": "${__instrument_run_partition_name__}"
},
"library_set": {
"SS": []
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
},
"id_type": {
"S": "${__instrument_run_partition_name__}"
},
"library_set": {
"SS": []
}
}
},
Expand Down

0 comments on commit 0bcfc7e

Please sign in to comment.