Skip to content

Commit

Permalink
Merge pull request #597 from umccr/enhancement/wts-auto-qc-pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
alexiswl authored Aug 7, 2023
2 parents b7c85b2 + dd8ac9a commit bffcb8d
Show file tree
Hide file tree
Showing 20 changed files with 327 additions and 243 deletions.
44 changes: 44 additions & 0 deletions data_portal/tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,28 @@ class Meta:
)


class DragenWgtsQcWorkflowFactory(factory.django.DjangoModelFactory):
    """Factory building a mock ``Workflow`` row representing a running
    DRAGEN WGTS (whole-genome/transcriptome) alignment-QC workflow.

    Used by tests that need a QC workflow already linked to a sequence run.
    """

    class Meta:
        model = Workflow

    # Attach the workflow to a freshly generated mock sequence run.
    sequence_run = factory.SubFactory(SequenceRunFactory)
    wfr_id = TestConstant.wfr_id.value    # workflow run id (shared test constant)
    wfv_id = TestConstant.wfv_id.value    # workflow version id
    wfl_id = TestConstant.wfl_id.value    # workflow id
    version = TestConstant.version.value
    type_name = WorkflowType.DRAGEN_WGTS_QC.value
    # Placeholder input: the real input template is loaded from SSM at runtime.
    input = json.dumps({
        "mock": "must load template from ssm parameter store"
    })
    start = make_aware(datetime.now())
    end_status = WorkflowStatus.RUNNING.value
    notified = True

    # Run name follows the convention used elsewhere in the pipeline:
    # umccr__<type>__<sequence name>__<run id>__<timestamp>.
    # NOTE(review): utc_now_ts appears to be a module-level timestamp value
    # (not called here) — confirm it is precomputed at import time.
    wfr_name = factory.LazyAttribute(
        lambda w: f"umccr__{w.type_name}__{w.sequence_run.name}__{w.sequence_run.run_id}__{utc_now_ts}"
    )


class DragenWgsQcWorkflowFactory(factory.django.DjangoModelFactory):
class Meta:
model = Workflow
Expand All @@ -378,6 +400,28 @@ class Meta:
)


class DragenWtsQcWorkflowFactory(factory.django.DjangoModelFactory):
    """Factory building a mock ``Workflow`` row representing a running
    DRAGEN WTS (whole-transcriptome) alignment-QC workflow.

    Mirrors DragenWgtsQcWorkflowFactory except for ``type_name``.
    """

    class Meta:
        model = Workflow

    # Attach the workflow to a freshly generated mock sequence run.
    sequence_run = factory.SubFactory(SequenceRunFactory)
    wfr_id = TestConstant.wfr_id.value    # workflow run id (shared test constant)
    wfv_id = TestConstant.wfv_id.value    # workflow version id
    wfl_id = TestConstant.wfl_id.value    # workflow id
    version = TestConstant.version.value
    type_name = WorkflowType.DRAGEN_WTS_QC.value
    # Placeholder input: the real input template is loaded from SSM at runtime.
    input = json.dumps({
        "mock": "must load template from ssm parameter store"
    })
    start = make_aware(datetime.now())
    end_status = WorkflowStatus.RUNNING.value
    notified = True

    # Run name convention: umccr__<type>__<sequence name>__<run id>__<timestamp>.
    # NOTE(review): utc_now_ts appears to be a module-level timestamp value
    # (not called here) — confirm it is precomputed at import time.
    wfr_name = factory.LazyAttribute(
        lambda w: f"umccr__{w.type_name}__{w.sequence_run.name}__{w.sequence_run.run_id}__{utc_now_ts}"
    )


class DragenWtsWorkflowFactory(factory.django.DjangoModelFactory):
class Meta:
model = Workflow
Expand Down
4 changes: 2 additions & 2 deletions data_portal/viewsets/tests/test_libraryrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

from data_portal.models.libraryrun import LibraryRun
from data_portal.models.workflow import Workflow
from data_portal.tests.factories import DragenWgsQcWorkflowFactory, TestConstant
from data_portal.tests.factories import DragenWgtsQcWorkflowFactory, TestConstant
from data_portal.viewsets.tests import _logger


class LibraryRunViewSetTestCase(TestCase):

def setUp(self):
self.mock_qc_wfl: Workflow = DragenWgsQcWorkflowFactory()
self.mock_qc_wfl: Workflow = DragenWgtsQcWorkflowFactory()

self.mock_lib_run = LibraryRun.objects.create(
library_id=TestConstant.library_id_tumor.value,
Expand Down
4 changes: 2 additions & 2 deletions data_processors/pipeline/domain/tests/test_pairing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from mockito import when

from data_portal.models.workflow import Workflow
from data_portal.tests.factories import DragenWgsQcWorkflowFactory, LabMetadataFactory
from data_portal.tests.factories import DragenWgtsQcWorkflowFactory, LabMetadataFactory
from data_processors.pipeline.domain.pairing import Pairing, CollectionBasedFluentImpl, TNPairing
from data_processors.pipeline.services import sequencerun_srv, workflow_srv, metadata_srv
from data_processors.pipeline.tests.case import PipelineUnitTestCase, PipelineIntegrationTestCase, logger
Expand Down Expand Up @@ -44,7 +44,7 @@ def test_by_sequence_runs(self):
"""
python manage.py test data_processors.pipeline.domain.tests.test_pairing.PairingUnitTests.test_by_sequence_runs
"""
mock_workflow: Workflow = DragenWgsQcWorkflowFactory()
mock_workflow: Workflow = DragenWgtsQcWorkflowFactory()
when(workflow_srv).get_succeeded_by_sequence_run(...).thenReturn([mock_workflow])

mock_seq_run = mock_workflow.sequence_run
Expand Down
15 changes: 15 additions & 0 deletions data_processors/pipeline/domain/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class SampleSheetCSV(Enum):

class WorkflowType(Enum):
BCL_CONVERT = "bcl_convert"
DRAGEN_WGTS_QC = "wgts_alignment_qc" # Placeholder workflow type
DRAGEN_WGS_QC = "wgs_alignment_qc"
DRAGEN_WTS_QC = "wts_alignment_qc"
TUMOR_NORMAL = "wgs_tumor_normal"
DRAGEN_TSO_CTDNA = "tso_ctdna_tumor_only"
DRAGEN_WTS = "wts_tumor_only"
Expand All @@ -40,8 +42,12 @@ class WorkflowType(Enum):
def from_value(cls, value):
if value == cls.BCL_CONVERT.value:
return cls.BCL_CONVERT
elif value == cls.DRAGEN_WGTS_QC.value:
return cls.DRAGEN_WGTS_QC
elif value == cls.DRAGEN_WGS_QC.value:
return cls.DRAGEN_WGS_QC
elif value == cls.DRAGEN_WTS_QC.value:
return cls.DRAGEN_WTS_QC
elif value == cls.TUMOR_NORMAL.value:
return cls.TUMOR_NORMAL
elif value == cls.DRAGEN_TSO_CTDNA.value:
Expand Down Expand Up @@ -185,6 +191,8 @@ class SecondaryAnalysisHelper(WorkflowHelper):
def __init__(self, type_: WorkflowType):
allowed_workflow_types = [
WorkflowType.DRAGEN_WGS_QC,
WorkflowType.DRAGEN_WGTS_QC,
WorkflowType.DRAGEN_WTS_QC,
WorkflowType.DRAGEN_TSO_CTDNA,
WorkflowType.DRAGEN_WTS,
WorkflowType.TUMOR_NORMAL,
Expand Down Expand Up @@ -370,6 +378,13 @@ def must_be_wgs(self):
raise LabMetadataRuleError(f"'WGS' != '{self.this_metadata.type}'.")
return self

def must_be_wgts(self):
    """Assert that this library's metadata type is WGS or WTS.

    Returns:
        self, to allow fluent chaining with the other must_be_* rules.

    Raises:
        LabMetadataRuleError: if the metadata type is neither WGS nor WTS.
    """
    # Local import matches the sibling rules (must_be_wgs / must_be_wts),
    # presumably to avoid a circular import at module load time.
    from data_portal.models.labmetadata import LabMetadataType

    # Compare case-insensitively; a set gives O(1) membership and reads
    # as "one of these values". (Was a list with non-PEP8 spacing.)
    wgts_values = {LabMetadataType.WGS.value.lower(), LabMetadataType.WTS.value.lower()}
    this_type = self.this_metadata.type
    if this_type.lower() not in wgts_values:
        raise LabMetadataRuleError(f"'WGS' or 'WTS' != '{this_type}'.")
    return self

def must_be_wts(self):
from data_portal.models.labmetadata import LabMetadataType
if self.this_metadata.type.lower() != LabMetadataType.WTS.value.lower():
Expand Down
44 changes: 34 additions & 10 deletions data_processors/pipeline/lambdas/dragen_wgs_qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from data_portal.models.workflow import Workflow
from data_processors.pipeline.services import sequencerun_srv, batch_srv, workflow_srv, metadata_srv, libraryrun_srv
from data_portal.models.labmetadata import LabMetadataType
from data_processors.pipeline.domain.workflow import WorkflowType, SecondaryAnalysisHelper
from data_processors.pipeline.lambdas import wes_handler
from libumccr import libjson, libdt
Expand Down Expand Up @@ -87,7 +88,7 @@ def handler(event, context) -> dict:
:return: workflow db record id, wfr_id, sample_name in JSON string
"""

logger.info(f"Start processing {WorkflowType.DRAGEN_WGS_QC.value} event")
logger.info(f"Start processing {WorkflowType.DRAGEN_WGTS_QC.value} event")
logger.info(libjson.dumps(event))

# Extract name of sample and the fastq list rows
Expand All @@ -103,13 +104,33 @@ def handler(event, context) -> dict:

sample_name = fastq_list_rows[0]['rgsm']

# Set workflow helper
wfl_helper = SecondaryAnalysisHelper(WorkflowType.DRAGEN_WGS_QC)
# Get metadata by library id
library_lab_metadata = metadata_srv.get_metadata_by_library_id(library_id)

# Check type is not None
if library_lab_metadata is None:
logger.error(f"Expected to retrieve metadata for library '{library_id}' but no metadata was returned")
raise ValueError

# We set the RNA flag and set the workflow type based on the library lab metadata
if library_lab_metadata.type == LabMetadataType.WTS:
workflow_type = WorkflowType.DRAGEN_WTS_QC
enable_rna = True
elif library_lab_metadata.type == LabMetadataType.WGS:
workflow_type = WorkflowType.DRAGEN_WGS_QC
enable_rna = False
else:
logger.error(f"Expected metadata type for library id '{library_id}' to be one of WGS or WTS")
raise ValueError

wfl_helper = SecondaryAnalysisHelper(workflow_type)

# Set workflow helper
workflow_input: dict = wfl_helper.get_workflow_input()
workflow_input["output_file_prefix"] = f"{sample_name}"
workflow_input["output_directory"] = f"{library_id}__{lane}_dragen"
workflow_input["fastq_list_rows"] = fastq_list_rows
workflow_input["enable_rna"] = enable_rna

# read workflow id and version from parameter store
workflow_id = wfl_helper.get_workflow_id()
Expand All @@ -125,13 +146,16 @@ def handler(event, context) -> dict:

workflow_engine_parameters = wfl_helper.get_engine_parameters(target_id=subject_id, secondary_target_id=None)

wfl_run = wes_handler.launch({
'workflow_id': workflow_id,
'workflow_version': workflow_version,
'workflow_run_name': workflow_run_name,
'workflow_input': workflow_input,
'workflow_engine_parameters': workflow_engine_parameters
}, context)
wfl_run = wes_handler.launch(
{
'workflow_id': workflow_id,
'workflow_version': workflow_version,
'workflow_run_name': workflow_run_name,
'workflow_input': workflow_input,
'workflow_engine_parameters': workflow_engine_parameters
},
context
)

workflow: Workflow = workflow_srv.create_or_update_workflow(
{
Expand Down
25 changes: 7 additions & 18 deletions data_processors/pipeline/lambdas/dragen_wts.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def sqs_handler(event, context):
def handler(event, context) -> dict:
"""event payload dict
{
"subject_id": "subject_id",
"library_id": "library_id (usually rglb)",
"fastq_list_rows": [{
"rgid": "index1.index2.lane",
Expand All @@ -82,9 +83,6 @@ def handler(event, context) -> dict:
}
}],
"arriba_large_mem": true,
"seq_run_id": "sequence run id",
"seq_name": "sequence run name",
"batch_run_id": "batch run id",
}
:param event:
Expand All @@ -96,15 +94,11 @@ def handler(event, context) -> dict:
logger.info(libjson.dumps(event))

# Extract name of sample and the fastq list rows
subject_id = event['subject_id']
library_id = event['library_id']
fastq_list_rows = event['fastq_list_rows']

# Set sequence run id
seq_run_id = event.get('seq_run_id', None)
seq_name = event.get('seq_name', None)
# Set batch run id
batch_run_id = event.get('batch_run_id', None)

# Set sample name
sample_name = fastq_list_rows[0]['rgsm']

# Set workflow helper
Expand All @@ -118,14 +112,11 @@ def handler(event, context) -> dict:
workflow_id = wfl_helper.get_workflow_id()
workflow_version = wfl_helper.get_workflow_version()

sqr = sequencerun_srv.get_sequence_run_by_run_id(seq_run_id) if seq_run_id else None
batch_run = batch_srv.get_batch_run(batch_run_id=batch_run_id) if batch_run_id else None

# construct and format workflow run name convention
subject_id = metadata_srv.get_subject_id_from_library_id(library_id)
workflow_run_name = wfl_helper.construct_workflow_name(
sample_name=library_id,
subject_id=subject_id)
subject_id=subject_id
)
workflow_engine_parameters = wfl_helper.get_engine_parameters(target_id=subject_id, secondary_target_id=None)

if event.get('arriba_large_mem', False):
Expand All @@ -151,24 +142,22 @@ def handler(event, context) -> dict:
'input': workflow_input,
'start': wfl_run.get('time_started'),
'end_status': wfl_run.get('status'),
'sequence_run': sqr,
'batch_run': batch_run,
}
)

# establish link between Workflow and LibraryRun
_ = libraryrun_srv.link_library_runs_with_workflow(library_id, workflow)
_ = libraryrun_srv.link_library_runs_with_x_seq_workflow([library_id], workflow)

# notification shall trigger upon wes.run event created action in workflow_update lambda

result = {
'subject_id': subject_id,
'library_id': library_id,
'id': workflow.id,
'wfr_id': workflow.wfr_id,
'wfr_name': workflow.wfr_name,
'status': workflow.end_status,
'start': libdt.serializable_datetime(workflow.start),
'batch_run_id': workflow.batch_run.id if batch_run else None,
}

logger.info(libjson.dumps(result))
Expand Down
33 changes: 21 additions & 12 deletions data_processors/pipeline/lambdas/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def handler(event, context):
"UPDATE_STEP",
"FASTQ_UPDATE_STEP",
"GOOGLE_LIMS_UPDATE_STEP",
"DRAGEN_WGS_QC_STEP",
"DRAGEN_WGTS_QC_STEP",
"DRAGEN_TSO_CTDNA_STEP",
"DRAGEN_WTS_STEP",
"TUMOR_NORMAL_STEP",
Expand All @@ -65,15 +65,15 @@ def handler(event, context):
'220524_A01010_0998_ABCF2HDSYX': [
"FASTQ_UPDATE_STEP",
"GOOGLE_LIMS_UPDATE_STEP",
"DRAGEN_WGS_QC_STEP",
"DRAGEN_WGTS_QC_STEP",
"DRAGEN_TSO_CTDNA_STEP",
"DRAGEN_WTS_STEP",
],
'220525_A01010_0999_ABCF2HDSYX': [
"UPDATE_STEP",
"FASTQ_UPDATE_STEP",
"GOOGLE_LIMS_UPDATE_STEP",
"DRAGEN_WGS_QC_STEP",
"DRAGEN_WGTS_QC_STEP",
"DRAGEN_TSO_CTDNA_STEP",
"DRAGEN_WTS_STEP",
]
Expand Down Expand Up @@ -187,10 +187,10 @@ def next_step(this_workflow: Workflow, skip: dict, context=None):
logger.info("Updating Google LIMS")
google_lims_update_step.perform(this_workflow)

if "DRAGEN_WGS_QC_STEP" in skiplist:
logger.info("Skip performing DRAGEN_WGS_QC_STEP")
if any([step in skiplist for step in ["DRAGEN_WGTS_QC_STEP", "DRAGEN_WGS_QC_STEP", "DRAGEN_WTS_QC_STEP"]]):
logger.info("Skip performing DRAGEN_WGTS_QC_STEP")
else:
logger.info("Performing DRAGEN_WGS_QC_STEP")
logger.info("Performing DRAGEN_WGTS_QC_STEP")
results.append(dragen_wgs_qc_step.perform(this_workflow))

if "DRAGEN_TSO_CTDNA_STEP" in skiplist:
Expand All @@ -199,12 +199,6 @@ def next_step(this_workflow: Workflow, skip: dict, context=None):
logger.info("Performing DRAGEN_TSO_CTDNA_STEP")
results.append(dragen_tso_ctdna_step.perform(this_workflow))

if "DRAGEN_WTS_STEP" in skiplist:
logger.info("Skip performing DRAGEN_WTS_STEP")
else:
logger.info("Performing DRAGEN_WTS_STEP")
results.append(dragen_wts_step.perform(this_workflow))

return results

elif this_workflow.type_name.lower() == WorkflowType.DRAGEN_WTS.value.lower() and \
Expand Down Expand Up @@ -244,6 +238,21 @@ def next_step(this_workflow: Workflow, skip: dict, context=None):
results.append(tumor_normal_step.perform(this_workflow))

return results
elif this_workflow.type_name.lower() == WorkflowType.DRAGEN_WTS_QC.value.lower() and \
this_workflow.end_status.lower() == WorkflowStatus.SUCCEEDED.value.lower():
logger.info(f"Received DRAGEN_WTS_QC workflow notification")

WorkflowRule(this_workflow).must_associate_sequence_run().must_have_output()

results = list()

if "DRAGEN_WTS_STEP" in skiplist:
logger.info("Skip performing DRAGEN_WTS_STEP")
else:
logger.info("Performing DRAGEN_WTS_STEP")
results.append(dragen_wts_step.perform(this_workflow))

return results

elif this_workflow.type_name.lower() == WorkflowType.DRAGEN_TSO_CTDNA.value.lower() and \
this_workflow.end_status.lower() == WorkflowStatus.SUCCEEDED.value.lower():
Expand Down
Loading

0 comments on commit bffcb8d

Please sign in to comment.