From f739341f70e9125f7ab295ebb3c90f67f347a958 Mon Sep 17 00:00:00 2001 From: Stephen Watts Date: Thu, 11 Jul 2024 13:00:08 +1000 Subject: [PATCH 1/4] Use existing MarkDups BAMs in oncoanalyser jobs --- .../oncoanalyser_wgts_existing_both_step.py | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py index 3a14167c..04e60e99 100644 --- a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py +++ b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py @@ -99,10 +99,8 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict: # WGS Tumor/Normal BAMs output from DRAGEN alignment in ICA/GDS tumor_wgs_sample_id = wgs_input['tumor_wgs_sample_id'] tumor_wgs_library_id = wgs_input['tumor_wgs_library_id'] - tumor_wgs_bam = wgs_input['tumor_wgs_bam'] normal_wgs_sample_id = wgs_input['normal_wgs_sample_id'] normal_wgs_library_id = wgs_input['normal_wgs_library_id'] - normal_wgs_bam = wgs_input['normal_wgs_bam'] # WTS BAM output from STAR aligner tumor_wts_sample_id = wts_input['tumor_wts_sample_id'] @@ -113,6 +111,10 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict: existing_wgs_dir = get_existing_wgs_dir(wgs_wf) existing_wts_dir = get_existing_wts_dir(wts_wf) + # Select existing WGS MarkDups BAMs; DRAGEN BAMs from original WGS input are ignored + tumor_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, tumor_wgs_sample_id) + normal_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, normal_wgs_sample_id) + payload = { "subject_id": subject_id, "tumor_wgs_sample_id": tumor_wgs_sample_id, @@ -203,6 +205,31 @@ def get_existing_wgs_dir(this_workflow: Workflow) -> str: raise ValueError("Found none or many output directory") +def get_existing_wgs_markdups_bam(existing_wgs_dir: str, sample_id: str): + """ + Locate existing WGS MarkDups BAMs and return corresponding path. + + :param str existing_wgs_dir: S3 path to previously generated WGS output + :param str sample_id: identifier for the WGS tumor and normal sample + :return: MarkDups BAM path + :rtype: str + :raises ValueError: if exactly one MarkDups BAM isn't found + """ + + results = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[ + existing_wgs_dir, + f"alignments/dna/{sample_id}.markdups.bam", + ]) + + filtered_list = list(filter(lambda x: str(x).endswith(".bam"), results)) + + if len(filtered_list) != 1: + message_component = "No MarkDups BAM" if len(filtered_list) == 0 else "Multiple MarkDups BAMs" + raise ValueError(f"{message_component} found in existing WGS output: {existing_wgs_dir}") + + return filtered_list[0] + + def get_existing_wts_dir(this_workflow: Workflow) -> str: """ this_workflow is succeeded oncoanalyser_wts type From 5e04263a16257bad719d2fb6bc038dc53030aec5 Mon Sep 17 00:00:00 2001 From: Stephen Watts Date: Thu, 11 Jul 2024 14:25:32 +1000 Subject: [PATCH 2/4] Use existing WGS dir prefix to locate MarkDups BAM --- .../orchestration/oncoanalyser_wgts_existing_both_step.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py index 04e60e99..c183369e 100644 --- a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py +++ b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py @@ -7,6 +7,7 @@ import json import logging from typing import Optional +from urllib.parse import urlparse from libumccr.aws import libssm, libsqs @@ -216,13 +217,18 @@ def get_existing_wgs_markdups_bam(existing_wgs_dir: str, sample_id: str): :raises ValueError: if exactly one MarkDups BAM isn't found """ + existing_wgs_dir_prefix = urlparse(existing_wgs_dir).path.lstrip('/') results = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[ - existing_wgs_dir, + existing_wgs_dir_prefix, f"alignments/dna/{sample_id}.markdups.bam", ]) filtered_list = list(filter(lambda x: str(x).endswith(".bam"), results)) + results_all = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[ + existing_wgs_dir_prefix, + ]) + if len(filtered_list) != 1: message_component = "No MarkDups BAM" if len(filtered_list) == 0 else "Multiple MarkDups BAMs" raise ValueError(f"{message_component} found in existing WGS output: {existing_wgs_dir}") From 1ddb6eb1ca02c4a03c35914fd09f9a558c0ea78b Mon Sep 17 00:00:00 2001 From: Stephen Watts Date: Thu, 11 Jul 2024 14:26:31 +1000 Subject: [PATCH 3/4] Implement tests for new MarkDups BAM locator code --- ...st_oncoanalyser_wgts_existing_both_step.py | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/data_processors/pipeline/orchestration/tests/test_oncoanalyser_wgts_existing_both_step.py b/data_processors/pipeline/orchestration/tests/test_oncoanalyser_wgts_existing_both_step.py index 6e886b9c..1ecacb60 100644 --- a/data_processors/pipeline/orchestration/tests/test_oncoanalyser_wgts_existing_both_step.py +++ b/data_processors/pipeline/orchestration/tests/test_oncoanalyser_wgts_existing_both_step.py @@ -1,8 +1,9 @@ import json from datetime import timedelta, datetime -from django.utils.timezone import make_aware +from django.utils.timezone import now, make_aware +from data_portal.models import S3Object from data_portal.models import Workflow from data_portal.models.labmetadata import LabMetadata from data_portal.models.libraryrun import LibraryRun @@ -16,6 +17,37 @@ from data_processors.pipeline.tests.case import PipelineUnitTestCase, logger +def generate_mock_markdups_bam(include_bad_duplicate=False): + _ = S3Object.objects.create( + bucket="bucket1", + key=( + f"analysis_data/{TestConstant.subject_id.value}/oncoanalyser/" + f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/" + f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/" + f"{TestConstant.subject_id.value}__{TestConstant.sample_id.value}/" + f"alignments/dna/{TestConstant.sample_id.value}.markdups.bam" + ), + size=1000, + last_modified_date=now(), + e_tag="abcdefghi123456" + ) + + if include_bad_duplicate: + _ = S3Object.objects.create( + bucket="bucket1", + key=( + f"analysis_data/{TestConstant.subject_id.value}/oncoanalyser/" + f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/" + f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/" + f"{TestConstant.subject_id.value}__{TestConstant.sample_id.value}//" + f"alignments/dna/{TestConstant.sample_id.value}.markdups.bam" + ), + size=1000, + last_modified_date=now(), + e_tag="abcdefghi123456" + ) + + class OncoanalyserWgtsExistingBothStepUnitTests(PipelineUnitTestCase): def test_perform(self): @@ -47,6 +79,8 @@ def test_perform(self): workflow=mock_wts_wfl, ) + _ = generate_mock_markdups_bam() + result = oncoanalyser_wgts_existing_both_step.perform(this_workflow=mock_wgs_wfl) self.assertIsNotNone(result) @@ -288,3 +322,43 @@ def test_get_existing_wts_dir_raises_error(self): logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}") self.assertIn("Found none", str(e)) + + def test_find_markdup_bam_more_than_1(self): + """ + python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_more_than_1 + """ + self.verify_local() + + mock_wgs_wfl = OncoanalyserWgsWorkflowFactory() + + _ = OncoanalyserWgsS3ObjectOutputFactory() + + mock_existing_wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(mock_wgs_wfl) + + _ = generate_mock_markdups_bam(include_bad_duplicate=True) + + with self.assertRaises(ValueError) as cm: + _ = oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(mock_existing_wgs_dir, TestConstant.sample_id.value) + e = cm.exception + + logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}") + self.assertIn("Multiple MarkDups BAMs found", str(e)) + + def test_find_markdup_bam_less_than_1(self): + """ + python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_less_than_1 + """ + self.verify_local() + + mock_wgs_wfl = OncoanalyserWgsWorkflowFactory() + + _ = OncoanalyserWgsS3ObjectOutputFactory() + + mock_existing_wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(mock_wgs_wfl) + + with self.assertRaises(ValueError) as cm: + _ = oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(mock_existing_wgs_dir, TestConstant.sample_id.value) + e = cm.exception + + logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}") + self.assertIn("No MarkDups BAM found", str(e)) From e7c6472f6cc6df90e7b4360f394f9edc5cc5836a Mon Sep 17 00:00:00 2001 From: Stephen Watts Date: Fri, 12 Jul 2024 11:01:32 +1000 Subject: [PATCH 4/4] Remove unused variable --- .../orchestration/oncoanalyser_wgts_existing_both_step.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py index c183369e..836a20bd 100644 --- a/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py +++ b/data_processors/pipeline/orchestration/oncoanalyser_wgts_existing_both_step.py @@ -225,10 +225,6 @@ def get_existing_wgs_markdups_bam(existing_wgs_dir: str, sample_id: str): filtered_list = list(filter(lambda x: str(x).endswith(".bam"), results)) - results_all = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[ - existing_wgs_dir_prefix, - ]) - if len(filtered_list) != 1: message_component = "No MarkDups BAM" if len(filtered_list) == 0 else "Multiple MarkDups BAMs" raise ValueError(f"{message_component} found in existing WGS output: {existing_wgs_dir}")