Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 2.2.3 #713

Merged
merged 6 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import logging
from typing import Optional
from urllib.parse import urlparse

from libumccr.aws import libssm, libsqs

Expand Down Expand Up @@ -99,10 +100,8 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict:
# WGS Tumor/Normal BAMs output from DRAGEN alignment in ICA/GDS
tumor_wgs_sample_id = wgs_input['tumor_wgs_sample_id']
tumor_wgs_library_id = wgs_input['tumor_wgs_library_id']
tumor_wgs_bam = wgs_input['tumor_wgs_bam']
normal_wgs_sample_id = wgs_input['normal_wgs_sample_id']
normal_wgs_library_id = wgs_input['normal_wgs_library_id']
normal_wgs_bam = wgs_input['normal_wgs_bam']

# WTS BAM output from STAR aligner
tumor_wts_sample_id = wts_input['tumor_wts_sample_id']
Expand All @@ -113,6 +112,10 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict:
existing_wgs_dir = get_existing_wgs_dir(wgs_wf)
existing_wts_dir = get_existing_wts_dir(wts_wf)

# Select existing WGS MarkDups BAMs; DRAGEN BAMs from original WGS input are ignored
tumor_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, tumor_wgs_sample_id)
normal_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, normal_wgs_sample_id)

payload = {
"subject_id": subject_id,
"tumor_wgs_sample_id": tumor_wgs_sample_id,
Expand Down Expand Up @@ -203,6 +206,32 @@ def get_existing_wgs_dir(this_workflow: Workflow) -> str:
raise ValueError("Found none or many output directory")


def get_existing_wgs_markdups_bam(existing_wgs_dir: str, sample_id: str):
    """
    Find the single MarkDups BAM for a sample under an existing WGS output directory.

    :param str existing_wgs_dir: S3 path to previously generated WGS output
    :param str sample_id: identifier of the WGS sample (tumor or normal) whose BAM is wanted
    :return: the one match whose string form ends with ``.bam``, exactly as returned by
        ``s3object_srv.get_s3_files_for_path_tokens`` (presumably the BAM path -- confirm
        against that service)
    :raises ValueError: if no match, or more than one match, is found
    """
    # Only the URL path component is used as a search token; for an s3://bucket/key style
    # location that leaves the key prefix without its leading slash.
    dir_token = urlparse(existing_wgs_dir).path.lstrip('/')
    bam_token = f"alignments/dna/{sample_id}.markdups.bam"

    candidates = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[dir_token, bam_token])
    bams = [candidate for candidate in candidates if str(candidate).endswith(".bam")]

    # Exactly one hit is the only acceptable outcome; anything else is ambiguous or missing.
    if len(bams) == 1:
        return bams[0]

    problem = "Multiple MarkDups BAMs" if bams else "No MarkDups BAM"
    raise ValueError(f"{problem} found in existing WGS output: {existing_wgs_dir}")


def get_existing_wts_dir(this_workflow: Workflow) -> str:
"""
this_workflow is succeeded oncoanalyser_wts type
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
from datetime import timedelta, datetime

from django.utils.timezone import make_aware
from django.utils.timezone import now, make_aware

from data_portal.models import S3Object
from data_portal.models import Workflow
from data_portal.models.labmetadata import LabMetadata
from data_portal.models.libraryrun import LibraryRun
Expand All @@ -16,6 +17,37 @@
from data_processors.pipeline.tests.case import PipelineUnitTestCase, logger


def generate_mock_markdups_bam(include_bad_duplicate=False):
    """
    Create mock S3Object record(s) for the test sample's MarkDups BAM.

    One record is always created under the oncoanalyser WGS output key built from the
    TestConstant values. When ``include_bad_duplicate`` is truthy, a second record is created
    whose key differs only by a doubled slash in front of ``alignments/``.

    :param bool include_bad_duplicate: also create the doubled-slash duplicate record
    """
    sample_id = TestConstant.sample_id.value
    subject_id = TestConstant.subject_id.value

    # Everything up to (but excluding) the separator that precedes "alignments/"
    wgs_output_key = (
        f"analysis_data/{subject_id}/oncoanalyser/"
        f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/"
        f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/"
        f"{subject_id}__{sample_id}"
    )
    bam_relative_key = f"alignments/dna/{sample_id}.markdups.bam"

    # "/" yields the regular key; "//" yields the deliberately malformed duplicate
    separators = ["/", "//"] if include_bad_duplicate else ["/"]

    for separator in separators:
        S3Object.objects.create(
            bucket="bucket1",
            key=f"{wgs_output_key}{separator}{bam_relative_key}",
            size=1000,
            last_modified_date=now(),
            e_tag="abcdefghi123456",
        )


class OncoanalyserWgtsExistingBothStepUnitTests(PipelineUnitTestCase):

def test_perform(self):
Expand Down Expand Up @@ -47,6 +79,8 @@ def test_perform(self):
workflow=mock_wts_wfl,
)

_ = generate_mock_markdups_bam()

result = oncoanalyser_wgts_existing_both_step.perform(this_workflow=mock_wgs_wfl)

self.assertIsNotNone(result)
Expand Down Expand Up @@ -288,3 +322,43 @@ def test_get_existing_wts_dir_raises_error(self):

logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}")
self.assertIn("Found none", str(e))

def test_find_markdup_bam_more_than_1(self):
    """
    Looking up the MarkDups BAM raises ValueError when more than one record matches.

    python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_more_than_1
    """
    self.verify_local()

    wgs_workflow = OncoanalyserWgsWorkflowFactory()
    OncoanalyserWgsS3ObjectOutputFactory()
    wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(wgs_workflow)

    # Two records now match the same sample, so the lookup has to refuse to pick one
    generate_mock_markdups_bam(include_bad_duplicate=True)

    with self.assertRaises(ValueError) as raised:
        oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(
            wgs_dir,
            TestConstant.sample_id.value,
        )
    error_message = str(raised.exception)

    logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{error_message}")
    self.assertIn("Multiple MarkDups BAMs found", error_message)

def test_find_markdup_bam_less_than_1(self):
    """
    Looking up the MarkDups BAM raises ValueError when no record matches.

    python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_less_than_1
    """
    self.verify_local()

    wgs_workflow = OncoanalyserWgsWorkflowFactory()
    OncoanalyserWgsS3ObjectOutputFactory()
    wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(wgs_workflow)

    # No MarkDups BAM record is created in this test, so the lookup should find nothing
    with self.assertRaises(ValueError) as raised:
        oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(
            wgs_dir,
            TestConstant.sample_id.value,
        )
    error_message = str(raised.exception)

    logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{error_message}")
    self.assertIn("No MarkDups BAM found", error_message)