Implemented SASH automation #630

Merged: 4 commits, Oct 12, 2023
25 changes: 15 additions & 10 deletions data_portal/tests/factories.py
Expand Up @@ -22,20 +22,25 @@


class TestConstant(Enum):
portal_run_id = "20230101abcdefgh"
portal_run_id2 = "20230102abcdefgh"
portal_run_id = "20230101abcdefgh" # typically use in T/N mock
portal_run_id2 = "20230102abcdefgh" # likewise use in transcriptome mock
portal_run_id_umccrise = "20230103abcdefgh"
portal_run_id_rnasum = "20230104abcdefgh"
portal_run_id_oncoanalyser = "20230105abcdefgh"

wfr_id = f"wfr.j317paO8zB6yG25Zm6PsgSivJEoq4Ums"
wfr_id2 = f"wfr.Q5555aO8zB6yG25Zm6PsgSivGwDx_Uaa"
wfv_id = f"wfv.TKWp7hsFnVTCE8KhfXEurUfTCqSa6zVx"
wfl_id = f"wfl.Dc4GzACbjhzOf3NbqAYjSmzkE1oWKI9H"
- umccrise_portal_run_id = "20230103abcdefgh"

umccrise_wfr_id = f"wfr.umccrisezB6yG25Zm6PsgSivJEoq4Ums"
umccrise_wfv_id = f"wfv.umccrisenVTCE8KhfXEurUfTCqSa6zVx"
umccrise_wfl_id = f"wfl.umccrisejhzOf3NbqAYjSmzkE1oWKI9H"
- rnasum_portal_run_id = "20230104abcdefgh"

rnasum_wfr_id = f"wfr.rnasumzB6yG25Zm6PsgSivJEoq4Ums"
rnasum_wfv_id = f"wfv.rnasumnVTCE8KhfXEurUfTCqSa6zVx"
rnasum_wfl_id = f"wfl.rnasumjhzOf3NbqAYjSmzkE1oWKI9H"

version = "v1"
instrument_run_id = "200508_A01052_0001_BH5LY7ACGT"
instrument_run_id2 = "220101_A01052_0002_XR5LY7TGCA"
@@ -455,7 +460,7 @@ class RNAsumWorkflowFactory(factory.django.DjangoModelFactory):
class Meta:
model = Workflow

- portal_run_id = TestConstant.rnasum_portal_run_id.value
+ portal_run_id = TestConstant.portal_run_id_rnasum.value
wfr_id = TestConstant.rnasum_wfr_id.value
wfv_id = TestConstant.rnasum_wfv_id.value
wfl_id = TestConstant.rnasum_wfl_id.value
@@ -499,7 +504,7 @@ class UmccriseWorkflowFactory(factory.django.DjangoModelFactory):
class Meta:
model = Workflow

- portal_run_id = TestConstant.umccrise_portal_run_id.value
+ portal_run_id = TestConstant.portal_run_id_umccrise.value
wfr_id = TestConstant.umccrise_wfr_id.value
wfv_id = TestConstant.umccrise_wfv_id.value
wfl_id = TestConstant.umccrise_wfl_id.value
@@ -553,7 +558,7 @@ class Meta:

bucket = "bucket1"
key = (f"analysis_data/{TestConstant.subject_id.value}/oncoanalyser/"
f"{TestConstant.portal_run_id.value}/wgs/"
f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/"
f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/")
size = 1000
last_modified_date = now()
@@ -586,16 +591,16 @@ class OncoanalyserWgsWorkflowFactory(factory.django.DjangoModelFactory):
class Meta:
model = Workflow

- portal_run_id = TestConstant.portal_run_id.value
+ portal_run_id = TestConstant.portal_run_id_oncoanalyser.value
type_name = WorkflowType.ONCOANALYSER_WGS.value
input = json.dumps({
"subject_id": TestConstant.subject_id.value,
"tumor_wgs_sample_id": TestConstant.sample_id.value,
"tumor_wgs_library_id": TestConstant.library_id_tumor.value,
"tumor_wgs_bam": f"gds://vol1/{portal_run_id}/wgs_tumor_normal/tumor_wts_bam.bam",
"tumor_wgs_bam": f"gds://vol1/wgs_tumor_normal/{TestConstant.portal_run_id.value}/tumor_wts_bam.bam",
"normal_wgs_sample_id": TestConstant.sample_id.value,
"normal_wgs_library_id": TestConstant.library_id_normal.value,
"normal_wgs_bam": f"gds://vol1/{portal_run_id}/wgs_tumor_normal/normal_wgs_bam.bam",
"normal_wgs_bam": f"gds://vol1/wgs_tumor_normal/{TestConstant.portal_run_id.value}/normal_wgs_bam.bam",
})
start = make_aware(datetime.now())
end_status = WorkflowStatus.SUCCEEDED.value
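For context, these factories are factory_boy DjangoModelFactory classes, so a test can materialise mock rows directly. A minimal sketch of how the renamed constants line up with the oncoanalyser factory (assumes a Django test database is available):

```python
# Minimal sketch: exercising the renamed constants (assumes a Django test context).
from data_portal.tests.factories import OncoanalyserWgsWorkflowFactory, TestConstant

workflow = OncoanalyserWgsWorkflowFactory()  # persists a mock Workflow row
assert workflow.portal_run_id == TestConstant.portal_run_id_oncoanalyser.value
```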
2 changes: 2 additions & 0 deletions data_processors/pipeline/domain/config.py
@@ -21,10 +21,12 @@
SQS_ONCOANALYSER_WTS_QUEUE_ARN = "/data_portal/backend/sqs_oncoanalyser_wts_queue_arn"
SQS_ONCOANALYSER_WGS_QUEUE_ARN = "/data_portal/backend/sqs_oncoanalyser_wgs_queue_arn"
SQS_ONCOANALYSER_WGTS_QUEUE_ARN = "/data_portal/backend/sqs_oncoanalyser_wgts_queue_arn"
+ SQS_SASH_QUEUE_ARN = "/data_portal/backend/sqs_sash_queue_arn"

# SSM parameter names for external submission lambdas
STAR_ALIGNMENT_LAMBDA_ARN = "/nextflow_stack/star-align-nf/submission_lambda_arn"
# oncoanalyser submission lambda is the same, just with different mode in payload
ONCOANALYSER_WTS_LAMBDA_ARN = "/nextflow_stack/oncoanalyser/submission_lambda_arn"
ONCOANALYSER_WGS_LAMBDA_ARN = "/nextflow_stack/oncoanalyser/submission_lambda_arn"
ONCOANALYSER_WGTS_LAMBDA_ARN = "/nextflow_stack/oncoanalyser/submission_lambda_arn"
+ SASH_LAMBDA_ARN = "/nextflow_stack/sash/submission_lambda_arn"
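Note that these constants are SSM parameter names, not values; consumers resolve them at runtime via libssm, the same pattern the new sash lambda uses below. A minimal sketch:

```python
# Minimal sketch: resolving the new SSM parameters at runtime.
from libumccr.aws import libssm
from data_processors.pipeline.domain.config import SASH_LAMBDA_ARN, SQS_SASH_QUEUE_ARN

submission_lambda_arn = libssm.get_ssm_param(SASH_LAMBDA_ARN)  # -> a Lambda ARN string
sash_queue_arn = libssm.get_ssm_param(SQS_SASH_QUEUE_ARN)      # -> an SQS queue ARN string
```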
4 changes: 4 additions & 0 deletions data_processors/pipeline/domain/workflow.py
@@ -40,6 +40,7 @@ class WorkflowType(Enum):
ONCOANALYSER_WTS = "oncoanalyser_wts"
ONCOANALYSER_WGS = "oncoanalyser_wgs"
ONCOANALYSER_WGTS_EXISTING_BOTH = "oncoanalyser_wgts_existing_both"
SASH = "sash"

@classmethod
def from_value(cls, value):
@@ -69,6 +70,8 @@ def from_value(cls, value):
return cls.ONCOANALYSER_WGS
elif value == cls.ONCOANALYSER_WGTS_EXISTING_BOTH.value:
return cls.ONCOANALYSER_WGTS_EXISTING_BOTH
+ elif value == cls.SASH.value:
+ return cls.SASH
else:
raise ValueError(f"No matching type found for {value}")

@@ -256,6 +259,7 @@ def __init__(self, type_: WorkflowType):
WorkflowType.ONCOANALYSER_WTS,
WorkflowType.ONCOANALYSER_WGS,
WorkflowType.ONCOANALYSER_WGTS_EXISTING_BOTH,
+ WorkflowType.SASH
]
if type_ not in allowed_workflow_types:
raise ValueError(f"Unsupported WorkflowType for external analysis: {type_}")
14 changes: 12 additions & 2 deletions data_processors/pipeline/lambdas/orchestrator.py
@@ -30,7 +30,7 @@
from data_processors.pipeline.services import workflow_srv
from data_processors.pipeline.orchestration import dragen_wgs_qc_step, tumor_normal_step, google_lims_update_step, \
dragen_tso_ctdna_step, fastq_update_step, dragen_wts_step, umccrise_step, rnasum_step, somalier_extract_step, \
- star_alignment_step, oncoanalyser_wts_step, oncoanalyser_wgs_step, oncoanalyser_wgts_existing_both_step
+ star_alignment_step, oncoanalyser_wts_step, oncoanalyser_wgs_step, oncoanalyser_wgts_existing_both_step, sash_step
from data_processors.pipeline.domain.workflow import WorkflowType, WorkflowStatus, WorkflowRule
from data_processors.pipeline.lambdas import workflow_update
from libumccr import libjson
@@ -58,7 +58,8 @@ def init_skip(event):
"STAR_ALIGNMENT_STEP",
"ONCOANALYSER_WTS_STEP",
"ONCOANALYSER_WGS_STEP",
"ONCOANALYSER_WGTS_EXISTING_BOTH_STEP"
"ONCOANALYSER_WGTS_EXISTING_BOTH_STEP",
"SASH_STEP"
],
'by_run': {
'220524_A01010_0998_ABCF2HDSYX': [
@@ -386,6 +387,7 @@ def next_step(this_workflow: Workflow, skip: dict, context=None):

elif this_workflow.type_name.lower() == WorkflowType.STAR_ALIGNMENT.value.lower() and \
this_workflow.end_status.lower() == WorkflowStatus.SUCCEEDED.value.lower():
logger.info("Received STAR_ALIGNMENT workflow notification")

WorkflowRule(this_workflow).must_have_output()

@@ -401,6 +403,7 @@

elif this_workflow.type_name.lower() == WorkflowType.ONCOANALYSER_WTS.value.lower() and \
this_workflow.end_status.lower() == WorkflowStatus.SUCCEEDED.value.lower():
logger.info("Received ONCOANALYSER_WTS workflow notification")

WorkflowRule(this_workflow).must_have_output()

@@ -416,6 +419,7 @@

elif this_workflow.type_name.lower() == WorkflowType.ONCOANALYSER_WGS.value.lower() and \
this_workflow.end_status.lower() == WorkflowStatus.SUCCEEDED.value.lower():
logger.info("Received ONCOANALYSER_WGS workflow notification")

WorkflowRule(this_workflow).must_have_output()

@@ -427,4 +431,10 @@
logger.info("Performing ONCOANALYSER_WGTS_EXISTING_BOTH_STEP")
results.append(oncoanalyser_wgts_existing_both_step.perform(this_workflow))

if "SASH_STEP" in skiplist:
logger.info("Skip performing SASH_STEP")
else:
logger.info("Performing SASH_STEP")
results.append(sash_step.perform(this_workflow))

return results
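The sash_step orchestration module itself is outside this excerpt. As a rough, hypothetical sketch of its shape, modelled on the queue-dispatch pattern the sibling steps use, with payload keys taken from sash.handler below; the lookup logic and the libsqs helper are assumptions, not code from this PR:

```python
# Hypothetical sketch of sash_step.perform(); NOT the actual module from this PR.
from libumccr.aws import libssm, libsqs  # libsqs.dispatch_jobs is an assumed helper
from data_processors.pipeline.domain.config import SQS_SASH_QUEUE_ARN

def perform(this_workflow):
    # Resolve the dragen somatic/germline and oncoanalyser output locations for
    # this subject; the real step would derive these from this_workflow and the
    # portal DB via the service layer. Placeholder values shown here.
    job = {
        "subject_id": "SBJ00001",
        "tumor_sample_id": "PRJ230001",
        "tumor_library_id": "L2300001",
        "normal_sample_id": "PRJ230002",
        "normal_library_id": "L2300002",
        "dragen_somatic_dir": "gds://.../wgs_tumor_normal/.../",      # placeholder
        "dragen_germline_dir": "gds://.../wgs_tumor_normal/.../",     # placeholder
        "oncoanalyser_dir": "s3://.../oncoanalyser/.../wgs/.../",     # placeholder
    }
    queue_arn = libssm.get_ssm_param(SQS_SASH_QUEUE_ARN)
    libsqs.dispatch_jobs(queue_arn=queue_arn, job_list=[job])  # assumed helper
    return {"submitted_jobs": 1}
```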
150 changes: 150 additions & 0 deletions data_processors/pipeline/lambdas/sash.py
@@ -0,0 +1,150 @@
try:
import unzip_requirements
except ImportError:
pass

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'data_portal.settings.base')
django.setup()

# ---

import logging
import json
from libumccr import libjson, libdt, aws
from libumccr.aws import libssm
from libumccr.aws.liblambda import LambdaInvocationType

from data_portal.models import Workflow
from data_processors.pipeline.domain.config import SASH_LAMBDA_ARN
from data_processors.pipeline.domain.workflow import ExternalWorkflowHelper, WorkflowType
from data_processors.pipeline.services import workflow_srv, libraryrun_srv

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def sqs_handler(event, context):
"""
Unpack body from SQS wrapper.
SQS event payload dict:
{
'Records': [
{
'messageId': "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
'body': "{\"JSON\": \"Formatted Message\"}",
'messageAttributes': {},
'md5OfBody': "",
'eventSource': "aws:sqs",
'eventSourceARN': "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
},
...
]
}

For details of the event payload dict, refer to https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
The backing queue is a FIFO queue, with guaranteed once-only delivery and no duplication.

:param event:
:param context:
:return:
"""
messages = event['Records']

results = []
for message in messages:
job = libjson.loads(message['body'])
results.append(handler(job, context))

return {
'results': results
}


def handler(event, context) -> dict:
"""event payload dict
{
"subject_id": "SBJ00001",
"tumor_sample_id": "PRJ230001",
"tumor_library_id": "L2300001",
"normal_sample_id": "PRJ230002",
"normal_library_id": "L2300002",
"dragen_somatic_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300001_L2300002/",
"dragen_germline_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300002_dragen_germline/",
"oncoanalyser_dir": "s3://org.umccr.data.oncoanalyser/analysis_data/SBJ00001/oncoanalyser/20230518poiuytre/wgs/L2300001__L2300002/SBJ00001_PRJ230001/"
}
"""
logger.info(f"Start processing {WorkflowType.SASH.value} event")
logger.info(libjson.dumps(event))

# check expected information is present
tumor_library_id = event['tumor_library_id']
normal_library_id = event['normal_library_id']
tumor_sample_id = event['tumor_sample_id']
normal_sample_id = event['normal_sample_id']
subject_id = event['subject_id']
dragen_somatic_dir = event['dragen_somatic_dir']
dragen_germline_dir = event['dragen_germline_dir']
oncoanalyser_dir = event['oncoanalyser_dir']

# see sash payload for preparing job JSON structure
# https://github.com/umccr/nextflow-stack/blob/2ba3c88/application/pipeline-stacks/sash/lambda_functions/batch_job_submission/lambda_code.py#L20-L31
helper = ExternalWorkflowHelper(WorkflowType.SASH)
portal_run_id = helper.get_portal_run_id()

job = {
"portal_run_id": portal_run_id,
"subject_id": subject_id,
"tumor_sample_id": tumor_sample_id,
"tumor_library_id": tumor_library_id,
"normal_sample_id": normal_sample_id,
"normal_library_id": normal_library_id,
"dragen_somatic_dir": dragen_somatic_dir,
"dragen_germline_dir": dragen_germline_dir,
"oncoanalyser_dir": oncoanalyser_dir
}

# register workflow in workflow table
workflow: Workflow = workflow_srv.create_or_update_workflow(
{
'portal_run_id': portal_run_id,
'wfr_name': f"sash_{portal_run_id}",
'type': WorkflowType.SASH,
'input': job,
'end_status': "CREATED",
}
)

# establish link between Workflow and LibraryRun
libraryrun_srv.link_library_runs_with_x_seq_workflow([tumor_library_id, normal_library_id], workflow)

# submit job: call sash submission lambda
# NOTE: lambda_client and the SSM parameter "should" be loaded statically at module initialisation instead of
# here (i.e. once instead of on every invocation). However, that would prevent mockito from intercepting calls
# and complicate testing. We accept the small execution overhead in exchange for ease of testing.
lambda_client = aws.lambda_client()
submission_lambda = libssm.get_ssm_param(SASH_LAMBDA_ARN)
logger.info(f"Using sash submission lambda: {submission_lambda}")
lambda_response = lambda_client.invoke(
FunctionName=submission_lambda,
InvocationType=LambdaInvocationType.EVENT.value,
Payload=json.dumps(job),
)
logger.info(f"Submission lambda response: {lambda_response}")

result = {
'portal_run_id': workflow.portal_run_id,
'subject_id': subject_id,
"tumor_library_id": tumor_library_id,
"normal_library_id": normal_library_id,
'id': workflow.id,
'wfr_name': workflow.wfr_name,
'status': workflow.end_status,
'start': libdt.serializable_datetime(workflow.start),
}

logger.info(libjson.dumps(result))

return result
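For completeness, a hedged sketch of how a producer could enqueue a job for sqs_handler above. The queue URL is hypothetical; the payload mirrors the handler docstring, and FIFO queues additionally require a message group ID:

```python
# Sketch: enqueueing a sash job for sqs_handler (queue URL is hypothetical).
import json
import boto3

sqs = boto3.client("sqs")
job = {
    "subject_id": "SBJ00001",
    "tumor_sample_id": "PRJ230001",
    "tumor_library_id": "L2300001",
    "normal_sample_id": "PRJ230002",
    "normal_library_id": "L2300002",
    "dragen_somatic_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300001_L2300002/",
    "dragen_germline_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300002_dragen_germline/",
    "oncoanalyser_dir": "s3://org.umccr.data.oncoanalyser/analysis_data/SBJ00001/oncoanalyser/20230518poiuytre/wgs/L2300001__L2300002/SBJ00001_PRJ230001/",
}
sqs.send_message(
    QueueUrl="https://sqs.ap-southeast-2.amazonaws.com/123456789012/sash.fifo",  # hypothetical
    MessageBody=json.dumps(job),
    MessageGroupId="sash",                   # required for FIFO queues
    MessageDeduplicationId="sash-SBJ00001",  # or enable content-based deduplication
)
```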
3 changes: 1 addition & 2 deletions data_processors/pipeline/lambdas/star_alignment.py
@@ -12,7 +12,6 @@
# ---
import logging
import json
- from datetime import datetime
from libumccr import libjson, libdt, aws
from libumccr.aws import libssm
from libumccr.aws.liblambda import LambdaInvocationType
@@ -130,10 +129,10 @@ def handler(event, context) -> dict:
logger.info(f"Submission lambda response: {lambda_response}")

result = {
+ 'portal_run_id': workflow.portal_run_id,
'subject_id': subject_id,
'library_id': library_id,
'id': workflow.id,
- 'wfr_id': workflow.wfr_id,
'wfr_name': workflow.wfr_name,
'status': workflow.end_status,
'start': libdt.serializable_datetime(workflow.start),
55 changes: 55 additions & 0 deletions data_processors/pipeline/lambdas/tests/test_sash.py
@@ -0,0 +1,55 @@
import json

from libumccr import aws
from libumccr.aws import libssm
from mockito import spy2, when, mock

from data_portal.models import Workflow, LibraryRun
from data_portal.tests.factories import TumorLibraryRunFactory, LibraryRunFactory
from data_processors.pipeline.domain.config import SASH_LAMBDA_ARN
from data_processors.pipeline.lambdas import sash
from data_processors.pipeline.services import workflow_srv
from data_processors.pipeline.tests.case import PipelineUnitTestCase, logger


class SashUnitTests(PipelineUnitTestCase):

def test_handler(self):
"""
python manage.py test data_processors.pipeline.lambdas.tests.test_sash.SashUnitTests.test_handler
"""
mock_lbr_tumor: LibraryRun = TumorLibraryRunFactory()
mock_lbr_normal: LibraryRun = LibraryRunFactory()

spy2(libssm.get_ssm_param)
when(libssm).get_ssm_param(SASH_LAMBDA_ARN).thenReturn('FOO')
mock_client = mock(aws.lambda_client())
mock_client.invoke = mock()
when(aws).lambda_client(...).thenReturn(mock_client)

result = sash.handler({
"subject_id": "SBJ00001",
"tumor_sample_id": "PRJ230001",
"tumor_library_id": mock_lbr_tumor.library_id,
"normal_sample_id": "PRJ230002",
"normal_library_id": mock_lbr_normal.library_id,
"dragen_somatic_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300001_L2300002/",
"dragen_germline_dir": "gds://production/analysis_data/SBJ00001/wgs_tumor_normal/20230515zyxwvuts/L2300002_dragen_germline/",
"oncoanalyser_dir": "s3://org.umccr.data.oncoanalyser/analysis_data/SBJ00001/oncoanalyser/20230518poiuytre/wgs/L2300001__L2300002/SBJ00001_PRJ230001/"
}, None)

logger.info("-" * 32)
logger.info("Example sash.handler lambda output:")
logger.info(json.dumps(result))

# assert that the sash workflow launched successfully and a workflow run was saved in the db
qs = Workflow.objects.all()
self.assertEqual(1, qs.count())

# assert that we can query related LibraryRun from Workflow side
wfl = qs.get(type_name='sash')
all_lib_runs = workflow_srv.get_all_library_runs_by_workflow(wfl)
self.assertEqual(2, len(all_lib_runs))
self.assertIn(mock_lbr_tumor, all_lib_runs)
self.assertIn(mock_lbr_normal, all_lib_runs)
self.assertEqual(result['normal_library_id'], mock_lbr_normal.library_id)