Star Alignment Automation #616

Merged
merged 8 commits into from
Sep 6, 2023

Changes from all commits
4 changes: 4 additions & 0 deletions data_processors/pipeline/domain/config.py
@@ -17,3 +17,7 @@
SQS_DRAGEN_WTS_QUEUE_ARN = "/data_portal/backend/sqs_dragen_wts_queue_arn"
SQS_RNASUM_QUEUE_ARN = "/data_portal/backend/sqs_rnasum_queue_arn"
SQS_SOMALIER_EXTRACT_QUEUE_ARN = "/data_portal/backend/sqs_somalier_extract_queue_arn"
SQS_STAR_ALIGNMENT_QUEUE_ARN = "/data_portal/backend/sqs_star_alignment_queue_arn"

# SSM parameter names for external submission lambdas
STAR_ALIGNMENT_LAMBDA_ARN = "/nextflow_stack/star-align-nf/submission_lambda_arn"
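These parameter names are resolved at runtime via libssm. A minimal sketch of how the new submission lambda ARN is looked up, mirroring the call made in star_alignment.py further down in this PR:

from libumccr.aws import libssm

from data_processors.pipeline.domain.config import STAR_ALIGNMENT_LAMBDA_ARN

# Resolve the external submission lambda ARN from SSM Parameter Store
submission_lambda_arn = libssm.get_ssm_param(STAR_ALIGNMENT_LAMBDA_ARN)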
51 changes: 35 additions & 16 deletions data_processors/pipeline/domain/workflow.py
@@ -37,6 +37,7 @@ class WorkflowType(Enum):
DRAGEN_WTS = "wts_tumor_only"
UMCCRISE = "umccrise"
RNASUM = "rnasum"
STAR_ALIGNMENT = "star_alignment"

@classmethod
def from_value(cls, value):
@@ -58,6 +59,8 @@ def from_value(cls, value):
return cls.UMCCRISE
elif value == cls.RNASUM.value:
return cls.RNASUM
elif value == cls.STAR_ALIGNMENT.value:
return cls.STAR_ALIGNMENT
else:
raise ValueError(f"No matching type found for {value}")
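A quick sketch of the enum round trip, including the newly added member:

from data_processors.pipeline.domain.workflow import WorkflowType

assert WorkflowType.STAR_ALIGNMENT.value == "star_alignment"
assert WorkflowType.from_value("star_alignment") is WorkflowType.STAR_ALIGNMENT
# Any unknown value raises: WorkflowType.from_value("bogus") -> ValueError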

@@ -77,14 +80,23 @@ class WorkflowRunEventType(Enum):


class WorkflowHelper(ABC):
prefix = "umccr__automated"
workdir_root = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/workdir_root")
output_root = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/output_root")

def __init__(self, type_: WorkflowType):
self.type = type_
self.date_time = datetime.utcnow()
self.portal_run_id = f"{self.date_time.strftime('%Y%m%d')}{str(uuid4())[:8]}"

def get_portal_run_id(self) -> str:
return self.portal_run_id


class IcaWorkflowHelper(WorkflowHelper):
prefix = "umccr__automated"

def __init__(self, type_: WorkflowType):
super().__init__(type_)
self.workdir_root = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/workdir_root")
self.output_root = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/output_root")
self.workflow_id = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/{self.type.value}/id")
self.workflow_version = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/{self.type.value}/version")
input_template = libssm.get_ssm_param(f"{ICA_WORKFLOW_PREFIX}/{self.type.value}/input")
@@ -102,13 +114,11 @@ def _sanitize_target_id(target_id: str):
raise ValueError("target_id must not be none")
return target_id

@staticmethod
def get_workdir_root():
return WorkflowHelper.workdir_root
def get_workdir_root(self):
return self.workdir_root

@staticmethod
def get_output_root():
return WorkflowHelper.output_root
def get_output_root(self):
return self.output_root

def get_workflow_id(self) -> str:
return self.workflow_id
@@ -119,9 +129,6 @@ def get_workflow_version(self) -> str:
def get_workflow_input(self) -> dict:
return self.workflow_input

def get_portal_run_id(self) -> str:
return self.portal_run_id

def construct_workdir(self, target_id, secondary_target_id: str = None):
"""
Construct a work directory given target ID and a timestamp
@@ -158,7 +165,7 @@ def construct_workflow_name(self, **kwargs):
raise NotImplementedError


class PrimaryDataHelper(WorkflowHelper):
class PrimaryDataHelper(IcaWorkflowHelper):

def __init__(self, type_: WorkflowType):
if type_ != WorkflowType.BCL_CONVERT:
@@ -183,10 +190,10 @@ def get_engine_parameters(self, target_id: str) -> dict:

def construct_workflow_name(self, seq_name: str):
# pattern: [AUTOMATION_PREFIX]__[WORKFLOW_TYPE]__[WORKFLOW_SPECIFIC_PART]__[PORTAL_RUN_ID]
return f"{WorkflowHelper.prefix}__{self.type.value}__{seq_name}__{self.portal_run_id}"
return f"{IcaWorkflowHelper.prefix}__{self.type.value}__{seq_name}__{self.portal_run_id}"


class SecondaryAnalysisHelper(WorkflowHelper):
class SecondaryAnalysisHelper(IcaWorkflowHelper):

def __init__(self, type_: WorkflowType):
allowed_workflow_types = [
@@ -231,7 +238,19 @@ def get_engine_parameters(self, target_id: str, secondary_target_id=None) -> dict:

def construct_workflow_name(self, subject_id: str, sample_name: str):
# pattern: [AUTOMATION_PREFIX]__[WORKFLOW_TYPE]__[WORKFLOW_SPECIFIC_PART]__[PORTAL_RUN_ID]
return f"{WorkflowHelper.prefix}__{self.type.value}__{subject_id}__{sample_name}__{self.portal_run_id}"
return f"{IcaWorkflowHelper.prefix}__{self.type.value}__{subject_id}__{sample_name}__{self.portal_run_id}"


class ExternalWorkflowHelper(WorkflowHelper):

def __init__(self, type_: WorkflowType):
allowed_workflow_types = [
WorkflowType.STAR_ALIGNMENT
]
if type_ not in allowed_workflow_types:
raise ValueError(f"Unsupported WorkflowType for external analysis: {type_}")
super().__init__(type_)
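The external helper deliberately carries none of the ICA state (workflow ID, version, workdir/output roots); it only validates the workflow type and mints a portal run ID. A minimal usage sketch:

from data_processors.pipeline.domain.workflow import ExternalWorkflowHelper, WorkflowType

helper = ExternalWorkflowHelper(WorkflowType.STAR_ALIGNMENT)

# portal_run_id is UTC YYYYMMDD plus the first 8 characters of a uuid4,
# e.g. "20230906a1b2c3d4" (suffix illustrative)
portal_run_id = helper.get_portal_run_id()

# Non-external workflow types are rejected:
# ExternalWorkflowHelper(WorkflowType.RNASUM)  # raises ValueError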



class SequenceRuleError(ValueError):
2 changes: 1 addition & 1 deletion data_processors/pipeline/lambdas/bcl_convert.py
@@ -15,7 +15,7 @@
from typing import List

import pandas as pd
from data_portal.models.workflow import Workflow
from data_portal.models.labmetadata import LabMetadata
from data_processors.pipeline.services import notification_srv, sequencerun_srv, workflow_srv, metadata_srv, \
libraryrun_srv
12 changes: 10 additions & 2 deletions data_processors/pipeline/lambdas/orchestrator.py
@@ -28,7 +28,8 @@
from data_processors.pipeline.domain.config import ICA_WORKFLOW_PREFIX
from data_processors.pipeline.services import workflow_srv
from data_processors.pipeline.orchestration import dragen_wgs_qc_step, tumor_normal_step, google_lims_update_step, \
dragen_tso_ctdna_step, fastq_update_step, dragen_wts_step, umccrise_step, rnasum_step, somalier_extract_step
dragen_tso_ctdna_step, fastq_update_step, dragen_wts_step, umccrise_step, rnasum_step, somalier_extract_step, \
star_alignment_step
from data_processors.pipeline.domain.workflow import WorkflowType, WorkflowStatus, WorkflowRule
from data_processors.pipeline.lambdas import workflow_update
from libumccr import libjson
@@ -59,7 +60,8 @@ def handler(event, context):
"TUMOR_NORMAL_STEP",
"UMCCRISE_STEP",
"RNASUM_STEP",
"SOMALIER_EXTRACT_STEP"
"SOMALIER_EXTRACT_STEP",
"STAR_ALIGNMENT_STEP"
],
'by_run': {
'220524_A01010_0998_ABCF2HDSYX': [
@@ -259,6 +261,12 @@ def next_step(this_workflow: Workflow, skip: dict, context=None):
logger.info("Performing DRAGEN_WTS_STEP")
results.append(dragen_wts_step.perform(this_workflow))

if "STAR_ALIGNMENT_STEP" in skiplist:
logger.info("Skip performing STAR_ALIGNMENT_STEP")
else:
logger.info("Performing STAR_ALIGNMENT_STEP")
results.append(star_alignment_step.perform(this_workflow))

return results

elif this_workflow.type_name.lower() == WorkflowType.DRAGEN_TSO_CTDNA.value.lower() and \
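To suppress the new trigger for a given invocation, callers list the step in the skip structure shown in the handler docstring. A hedged sketch — the 'global' key is inferred from that docstring, and other event fields are elided:

skip = {
    'global': [
        "STAR_ALIGNMENT_STEP",  # next_step() then only logs the skip
    ],
}
# With this skip list in the event, star_alignment_step.perform(this_workflow)
# is never called and nothing is appended to results for this branch.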
144 changes: 144 additions & 0 deletions data_processors/pipeline/lambdas/star_alignment.py
@@ -0,0 +1,144 @@
try:
import unzip_requirements
except ImportError:
pass

import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'data_portal.settings.base')
django.setup()

# ---
import logging
import json
from datetime import datetime
from libumccr import libjson, libdt, aws
from libumccr.aws import libssm
from libumccr.aws.liblambda import LambdaInvocationType

from data_portal.models import Workflow
from data_processors.pipeline.domain.config import STAR_ALIGNMENT_LAMBDA_ARN
from data_processors.pipeline.domain.workflow import ExternalWorkflowHelper, WorkflowType
from data_processors.pipeline.services import workflow_srv, libraryrun_srv

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def sqs_handler(event, context):
"""
Unpack body from SQS wrapper.
SQS event payload dict:
{
'Records': [
{
'messageId': "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
'body': "{\"JSON\": \"Formatted Message\"}",
'messageAttributes': {},
'md5OfBody': "",
'eventSource': "aws:sqs",
'eventSourceARN': "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
},
...
]
}

For details of the event payload dict, refer to https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
The backing queue is a FIFO queue with exactly-once delivery, i.e. no duplicate messages.

:param event:
:param context:
:return:
"""
messages = event['Records']

results = []
for message in messages:
job = libjson.loads(message['body'])
results.append(handler(job, context))

return {
'results': results
}


def handler(event, context) -> dict:
"""
Star alignment event payload dict:
{
"subject_id": subject_id,
"sample_id": fastq_list_row.rgsm,
"library_id": library_id,
"fastq_fwd": fastq_list_row.read_1,
"fastq_rev": fastq_list_row.read_2,
}
"""
logger.info(f"Start processing {WorkflowType.STAR_ALIGNMENT.value} event")
logger.info(libjson.dumps(event))

# check expected information is present
library_id = event['library_id']
sample_id = event['sample_id']
subject_id = event['subject_id']
fastq_fwd = event['fastq_fwd']
fastq_rev = event['fastq_rev']
assert subject_id is not None
assert library_id is not None
assert sample_id is not None
assert fastq_fwd is not None
assert fastq_rev is not None

# see star alignment payload for preparing job JSON structure
# https://github.com/umccr/nextflow-stack/pull/29
helper = ExternalWorkflowHelper(WorkflowType.STAR_ALIGNMENT)
portal_run_id = helper.get_portal_run_id()
job = {
"portal_run_id": portal_run_id,
"subject_id": subject_id,
"sample_id": sample_id,
"library_id": library_id,
"fastq_fwd": fastq_fwd,
"fastq_rev": fastq_rev,
}

# register workflow in workflow table
workflow: Workflow = workflow_srv.create_or_update_workflow(
{
'portal_run_id': portal_run_id,
'wfr_name': f"star_alignment_{portal_run_id}",
'type': WorkflowType.STAR_ALIGNMENT,
'input': job,
'end_status': "CREATED",
}
)

# establish link between Workflow and LibraryRun
_ = libraryrun_srv.link_library_runs_with_x_seq_workflow([job['library_id']], workflow)

# submit job: call star alignment lambda
# NOTE: lambda_client and SSM parameter "should" be loaded statically on class initialisation instead of here
# (i.e. once instead of every invocation). However, that will prevent mockito from intercepting and complicate
# testing. We accept the small per-invocation overhead in exchange for easier testing.
lambda_client = aws.lambda_client()
submission_lambda = libssm.get_ssm_param(STAR_ALIGNMENT_LAMBDA_ARN)
logger.info(f"Using star alignment lambda: {submission_lambda}")
lambda_response = lambda_client.invoke(
FunctionName=submission_lambda,
InvocationType=LambdaInvocationType.EVENT.value,
Payload=json.dumps(job),
)
logger.info(f"Submission lambda response: {lambda_response}")

result = {
'subject_id': subject_id,
'library_id': library_id,
'id': workflow.id,
'wfr_id': workflow.wfr_id,
'wfr_name': workflow.wfr_name,
'status': workflow.end_status,
'start': libdt.serializable_datetime(workflow.start),
}

logger.info(libjson.dumps(result))

return result
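For completeness, a sketch of how an SQS-wrapped message reaches the handler above. Field values are illustrative; in unit tests the SSM lookup and Lambda client are mocked, as in the test module below:

import json

sqs_event = {
    'Records': [
        {
            'messageId': "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
            'eventSource': "aws:sqs",
            'body': json.dumps({
                "subject_id": "SBJ00001",                      # hypothetical IDs/paths
                "sample_id": "PRJ000001",
                "library_id": "L0000001",
                "fastq_fwd": "gds://volume/path/read_1.fastq.gz",
                "fastq_rev": "gds://volume/path/read_2.fastq.gz",
            }),
        },
    ]
}

results = sqs_handler(sqs_event, None)  # one handler() result per record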
66 changes: 66 additions & 0 deletions data_processors/pipeline/lambdas/tests/test_star_alignment.py
@@ -0,0 +1,66 @@
import json

import boto3
from libumccr import aws
from libumccr.aws import libssm
from mockito import when, spy2, mock

from data_portal.models.workflow import Workflow
from data_portal.models.libraryrun import LibraryRun
from data_portal.tests.factories import TestConstant, LibraryRunFactory, WtsTumorLabMetadataFactory, \
WtsTumorLibraryRunFactory
from data_processors.pipeline.domain.config import STAR_ALIGNMENT_LAMBDA_ARN, ICA_WORKFLOW_PREFIX
from data_processors.pipeline.lambdas import star_alignment
from data_processors.pipeline.services import workflow_srv
from data_processors.pipeline.tests.case import logger, PipelineUnitTestCase


class StarAlignmentUnitTests(PipelineUnitTestCase):

def test_handler(self):
"""
python manage.py test data_processors.pipeline.lambdas.tests.test_star_alignment.StarAlignmentUnitTests.test_handler
"""
"""
Expected payload:
{
"subject_id": subject_id,
"sample_id": rgsm,
"library_id": library_id,
"fastq_fwd": fastq_read_1,
"fastq_rev": fastq_read_2,
}
"""

payload = {
"subject_id": "subject_id",
"sample_id": "rgsm",
"library_id": TestConstant.wts_library_id_tumor.value,
"fastq_fwd": "fastq_read_1",
"fastq_rev": "fastq_read_2",
}

_ = WtsTumorLabMetadataFactory()
_ = WtsTumorLibraryRunFactory()

spy2(libssm.get_ssm_param)
when(libssm).get_ssm_param(STAR_ALIGNMENT_LAMBDA_ARN).thenReturn('FOO')
mock_client = mock(aws.lambda_client())
mock_client.invoke = mock()
when(aws).lambda_client(...).thenReturn(mock_client)

result: dict = star_alignment.handler(payload, None)

logger.info("-" * 32)
logger.info("Example star_alignment.handler lambda output:")
logger.info(json.dumps(result))

# assert star_alignment workflow launch succeeded and the workflow run was saved in db
qs = Workflow.objects.all()
self.assertEqual(1, qs.count())

# assert that we can query related LibraryRun from Workflow side
wfl = qs.get()
all_lib_runs = workflow_srv.get_all_library_runs_by_workflow(wfl)
self.assertEqual(1, len(all_lib_runs))
self.assertEqual(all_lib_runs[0].library_id, TestConstant.wts_library_id_tumor.value)