Bug fixes for 0.2.1 #573

Merged: 1 commit, Sep 24, 2024
@@ -29,20 +29,27 @@
# Standard imports
from pathlib import Path
from typing import List
from urllib.parse import urlunparse, urlparse
import boto3
from os import environ
import typing
import logging

# Wrapica imports
from wrapica.libica_models import ProjectData
from wrapica.job import get_job
from wrapica.project_data import (
-    convert_uri_to_project_data_obj, project_data_copy_batch_handler
+    convert_uri_to_project_data_obj, project_data_copy_batch_handler, delete_project_data
)

if typing.TYPE_CHECKING:
    from mypy_boto3_ssm import SSMClient
    from mypy_boto3_secretsmanager import SecretsManagerClient

# Set logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Globals
SUCCESS_STATES = [
    "SUCCEEDED"
@@ -63,6 +70,7 @@
DEFAULT_WAIT_TIME_SECONDS = 10
DEFAULT_WAIT_TIME_SECONDS_EXT = 10

# Globals
ICAV2_BASE_URL = "https://ica.illumina.com/ica/rest"


@@ -222,6 +230,64 @@ def handler(event, context):

    # Handle successful job
    if job_status is True:
        # Confirm source uris have made it to the destination successfully
        # Get dest folder
        dest_project_folder_data_obj: ProjectData = convert_uri_to_project_data_obj(dest_uri)

        # Iterate through each source uri
        has_errors = False
        for source_uri in source_uris:
            # Get the source project data object
            source_project_data_obj: ProjectData = convert_uri_to_project_data_obj(source_uri)

            # Get the dest uri file name
            dest_file_uri = str(
                urlunparse(
                    (
                        urlparse(dest_uri).scheme,
                        urlparse(dest_uri).netloc,
                        str(Path(dest_project_folder_data_obj.data.details.path) / source_project_data_obj.data.details.name),
                        None, None, None
                    )
                )
            )
            # Get the dest project data object
            dest_project_data_file_obj = convert_uri_to_project_data_obj(
                dest_file_uri
            )

            # Compare the source and dest project data objects' file sizes
            if source_project_data_obj.data.details.file_size_in_bytes != dest_project_data_file_obj.data.details.file_size_in_bytes:
                # Set has errors to true
                has_errors = True
                logger.error("Data size mismatch between source and dest project data objects")
                logger.error(f"Data {source_uri} was transferred to {dest_file_uri} but the file sizes do not match")
                logger.error(f"Source file size: {source_project_data_obj.data.details.file_size_in_bytes}")
                logger.error(f"Dest file size: {dest_project_data_file_obj.data.details.file_size_in_bytes}")
                logger.error("Purging the dest uri file and starting again")
                # Purge the dest uri file and start again
                delete_project_data(
                    project_id=dest_project_data_file_obj.project_id,
                    data_id=dest_project_data_file_obj.data.id
                )

        # If we have errors, we need to rerun the job
        if has_errors:
            # Add this job id to the failed job list
            failed_job_list.append(job_id)
            return {
                "dest_uri": dest_uri,
                "source_uris": source_uris,
                "job_id": submit_copy_job(
                    dest_uri=dest_uri,
                    source_uris=source_uris,
                ),
                "failed_job_list": failed_job_list,  # Empty list or list of failed jobs
                "job_status": "RUNNING",
                "wait_time_seconds": DEFAULT_WAIT_TIME_SECONDS
            }

        # If we don't have errors, we can return the job as successful
        return {
            "dest_uri": dest_uri,
            "source_uris": source_uris,
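Stepping back, the new success path above only declares the copy job done after confirming that every destination file's size matches its source; mismatched copies are purged with delete_project_data and the job is resubmitted. A condensed sketch of that control flow, with hypothetical stand-ins for the wrapica lookups; only submit_copy_job is a real helper name used above:

def get_file_size(uri: str) -> int:
    # Hypothetical stand-in for the project-data lookups above
    # (file_size_in_bytes on the resolved ProjectData object).
    raise NotImplementedError

def delete_file(uri: str) -> None:
    # Hypothetical stand-in for delete_project_data above.
    raise NotImplementedError

def verify_copy_and_retry(source_uris, dest_uri, job_id, failed_job_list):
    has_errors = False
    for source_uri in source_uris:
        # Destination file = dest folder + source file name
        dest_file_uri = dest_uri.rstrip("/") + "/" + source_uri.rstrip("/").rsplit("/", 1)[-1]
        if get_file_size(source_uri) != get_file_size(dest_file_uri):
            has_errors = True
            delete_file(dest_file_uri)  # purge the bad copy before retrying
    if has_errors:
        failed_job_list.append(job_id)  # record the failed attempt
        return {
            "job_id": submit_copy_job(dest_uri=dest_uri, source_uris=source_uris),
            "job_status": "RUNNING",
        }
    return {"job_status": "SUCCEEDED"}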
@@ -284,12 +284,14 @@ def handler(event, context):

# Failed workflow
# if __name__ == "__main__":
#     environ['AWS_PROFILE'] = 'umccr-production'
#     environ['AWS_REGION'] = 'ap-southeast-2'
#     environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production"
#     print(
#         json.dumps(
#             handler(
#                 {
-#                     "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/202409156f4e1c52/"
+#                     "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/20240922130c78be/"
#                 },
#                 None
#             ),
@@ -299,10 +301,10 @@
#
# # {
# #     "success": false,
-# #     "message": "Workflow failed at 'DragenCaller' step"
+# #     "message": "Workflow failed at 'FastqValidation' step"
# # }

-# Passing workflow
+# # Passing workflow
# if __name__ == "__main__":
#     environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production"
#     print(
@@ -81,7 +81,7 @@
    ],
    "ResultSelector": {
        "workflow_success.$": "$.Payload.success",
-        "error_message.$": "$.Payload.error_message"
+        "error_message.$": "$.Payload.message"
    },
    "ResultPath": "$.check_successful_analysis_step",
    "Next": "Is successful analysis"
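This one-key fix aligns the state's ResultSelector with the payload the Lambda actually returns: the commented example output earlier in this diff shows keys success and message, not error_message. Schematically, the mapping the corrected selector performs; a sketch, not code from the PR:

# Shape of the Lambda's failure payload, taken from the commented example above
payload = {"success": False, "message": "Workflow failed at 'FastqValidation' step"}

# What the corrected ResultSelector extracts into the state output
result = {
    "workflow_success": payload["success"],  # "workflow_success.$": "$.Payload.success"
    "error_message": payload["message"],     # "error_message.$": "$.Payload.message"
}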
@@ -21,9 +21,6 @@
    },
    "id_type": {
        "S": "${__instrument_run_partition_name__}"
-    },
-    "library_set": {
-        "SS": []
    }
}
},
@@ -21,9 +21,6 @@
    },
    "id_type": {
        "S": "${__instrument_run_partition_name__}"
-    },
-    "library_set": {
-        "SS": []
    }
}
},
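The last two hunks make the same change in two state machine templates: the seeded DynamoDB item no longer carries an empty library_set string set. A plausible motivation, offered as an assumption rather than something stated in the PR: DynamoDB rejects empty sets, so seeding "SS": [] fails at write time. A boto3 sketch, with table name and key values hypothetical:

import boto3

ddb = boto3.client("dynamodb")

# DynamoDB does not allow empty sets: a PutItem like this fails with a
# ValidationException, so the seed template now omits the attribute
# entirely until the set has members.
ddb.put_item(
    TableName="instrument-run-table",  # hypothetical
    Item={
        "id": {"S": "example-instrument-run-id"},      # hypothetical
        "id_type": {"S": "instrument_run_partition"},  # as in the template above
        "library_set": {"SS": []},                     # invalid: empty string set
    },
)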