Feature/sprint 14 somascan dataset join (#550)
* Explicit support for SomaScan joins on genetic data. First combines
the adat's sample-level data and protein-level data into one dataframe
with one SampleId and Target per row.

Previously SomaScan data was supported, but joins/filtering had to be
handled by the user through lower-level functions (e.g. raw data
frames). Now we rename `join_annotation_result_to_fragpipe_dataset` to
`join_annotation_result_to_proteomic_dataset`, which can take a TMT
dataset, a SomaScan dataset, a somadata.Adat, or a raw dataframe.

Follows the melting approach listed here (Thanks SomaScan team!):
SomaLogic/Canopy#8
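
A minimal sketch of the intended call pattern with a SomaScan ADAT file (the query string, index name, node URL, and file path are placeholders, and the synchronous `get_annotation_result_from_query` is assumed to mirror the async signature used in the tests):

```python
from bystro.proteomics.annotation_interface import (
    get_annotation_result_from_query,
    join_annotation_result_to_proteomic_dataset,
)
from bystro.proteomics.somascan import SomascanDataset

# Fetch one row per (sample, gene) hit from the annotation index.
query_result_df = get_annotation_result_from_query(
    "exonic (gnomad.genomes.af:<0.1 || gnomad.exomes.af:<0.1)",
    "my_annotation_index",
    cluster_opensearch_config={
        "connection": {
            "nodes": ["http://localhost:9200"],
            "request_timeout": 1200,
            "use_ssl": False,
            "verify_certs": False,
        },
    },
)

# Load the ADAT; the join then defaults to SampleId / Target on the SomaScan side.
somascan_dataset = SomascanDataset.from_paths("example_data.adat")
joined_df = join_annotation_result_to_proteomic_dataset(query_result_df, somascan_dataset)
```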

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced support for joining annotation results to various proteomic
    datasets including Somascan datasets.
  - Added a new method to handle data manipulation for Somascan datasets.

- **Bug Fixes**
  - Updated column references and logic handling for different types of
    proteomic datasets.

- **Tests**
  - Added new tests for validating the functionality of joining annotation
    results with Somascan datasets.
  - Updated existing tests to accommodate changes in dataset processing
    functions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
akotlar committed Jul 18, 2024
1 parent 8f1df25 commit 97a22af
Showing 8 changed files with 915 additions and 193 deletions.
893 changes: 730 additions & 163 deletions python/python/bystro/examples/ProteomicsProxiedQuery.ipynb

Large diffs are not rendered by default.

79 changes: 67 additions & 12 deletions python/python/bystro/proteomics/annotation_interface.py
@@ -7,12 +7,14 @@

import asyncio

from bystro.proteomics.somascan import SomascanDataset, ADAT_GENE_NAME_COLUMN, ADAT_SAMPLE_ID_COLUMN
from msgspec import Struct
import nest_asyncio # type: ignore
import numpy as np

import pandas as pd
from opensearchpy import AsyncOpenSearch
import somadata # type: ignore

from bystro.api.auth import CachedAuth
from bystro.api.search import get_async_proxied_opensearch_client
@@ -1315,48 +1317,101 @@ def explode_rows_with_list(df, column):
return pd.DataFrame(rows)


def join_annotation_result_to_fragpipe_dataset(
def join_annotation_result_to_proteomic_dataset(
query_result_df: pd.DataFrame,
tmt_dataset: TandemMassTagDataset,
proteomic_dataset: pd.DataFrame | TandemMassTagDataset | SomascanDataset | somadata.Adat,
get_tracking_id_from_genomic_sample_id: Callable[[str], str] = (lambda x: x),
get_tracking_id_from_proteomic_sample_id: Callable[[str], str] = (lambda x: x),
genetic_join_column: str = DEFAULT_GENE_NAME_COLUMN,
fragpipe_join_column: str = FRAGPIPE_GENE_GENE_NAME_COLUMN_RENAMED,
genetic_join_column: str | None = None,
proteomic_join_column: str | None = None,
proteomic_sample_id_column: str | None = None,
) -> pd.DataFrame:
"""
Join annotation result to a proteomic dataset.
Args:
query_result_df (pd.DataFrame):
DataFrame containing result from get_annotation_result_from_query.
tmt_dataset (TandemMassTagDataset): The TandemMassTagDataset instance.
proteomic_dataset (pd.DataFrame | TandemMassTagDataset | SomascanDataset | somadata.Adat):
A dataframe representing the proteomic dataset, or else
a TandemMassTagDataset, SomascanDataset, or somadata.Adat.
get_tracking_id_from_genomic_sample_id (Callable[[str], str]):
Callable mapping genomic sample IDs to tracking IDs, defaults to identity function.
get_tracking_id_from_proteomic_sample_id (Callable[[str], str]):
Callable mapping proteomic sample IDs to tracking IDs, defaults to identity function.
genetic_join_column (str, optional):
The column to join on in the genetic dataset, defaults to "refSeq.name2".
fragpipe_join_column (str, optional)
The column to join on in the FragPipe dataset, defaults to "gene_name".
The column to join on in the genetic dataset.
Must be provided if proteomic_dataset is a DataFrame, otherwise defaults to "refSeq.name2".
proteomic_join_column (str, optional):
The column to join on in the proteomic dataset.
Must be provided if proteomic_dataset is a DataFrame,
otherwise defaults to "gene_name" for TandemMassTagDataset, and
"Target" for SomascanDataset and somadata.Adat.
proteomic_sample_id_column (str, optional):
The column name for the sample ID in the proteomic dataset.
Must be provided if proteomic_dataset is a DataFrame,
otherwise defaults to 'sample' for TandemMassTagDataset,
and 'SampleId' for SomascanDataset and somadata.Adat.
Returns:
pd.DataFrame: The joined DataFrame.
"""
query_result_df = query_result_df.copy()
proteomics_df = tmt_dataset.get_melted_abundance_df()

if isinstance(proteomic_dataset, TandemMassTagDataset):
proteomics_df = proteomic_dataset.get_melted_abundance_df()

if proteomic_join_column is None:
proteomic_join_column = FRAGPIPE_GENE_GENE_NAME_COLUMN_RENAMED
if genetic_join_column is None:
genetic_join_column = DEFAULT_GENE_NAME_COLUMN
if proteomic_sample_id_column is None:
proteomic_sample_id_column = FRAGPIPE_SAMPLE_COLUMN
elif isinstance(proteomic_dataset, (SomascanDataset, somadata.Adat)):
if isinstance(proteomic_dataset, somadata.Adat):
proteomic_dataset = SomascanDataset(proteomic_dataset)

proteomics_df = proteomic_dataset.to_melted_frame()

if proteomic_join_column is None:
proteomic_join_column = ADAT_GENE_NAME_COLUMN
if genetic_join_column is None:
genetic_join_column = DEFAULT_GENE_NAME_COLUMN
if proteomic_sample_id_column is None:
proteomic_sample_id_column = ADAT_SAMPLE_ID_COLUMN
elif isinstance(proteomic_dataset, pd.DataFrame):
proteomics_df = proteomic_dataset

if proteomic_join_column is None:
raise ValueError(
"proteomic_join_column must be provided if proteomic_dataset is a DataFrame"
)
if genetic_join_column is None:
raise ValueError("genetic_join_column must be provided if proteomic_dataset is a DataFrame")
if proteomic_sample_id_column is None:
raise ValueError(
"proteomic_sample_id_column must be provided if proteomic_dataset is a DataFrame"
)

query_result_df[SAMPLE_GENERATED_COLUMN] = query_result_df[SAMPLE_GENERATED_COLUMN].apply(
get_tracking_id_from_genomic_sample_id
)

proteomics_df[FRAGPIPE_SAMPLE_COLUMN] = proteomics_df[FRAGPIPE_SAMPLE_COLUMN].apply(
proteomics_df[proteomic_sample_id_column] = proteomics_df[proteomic_sample_id_column].apply(
get_tracking_id_from_proteomic_sample_id
)

columns_to_drop = []
if proteomic_sample_id_column != SAMPLE_GENERATED_COLUMN:
columns_to_drop.append(proteomic_sample_id_column)

if proteomic_join_column != genetic_join_column:
columns_to_drop.append(proteomic_join_column)

joined_df = query_result_df.merge(
proteomics_df,
left_on=[SAMPLE_GENERATED_COLUMN, genetic_join_column],
right_on=[FRAGPIPE_SAMPLE_COLUMN, fragpipe_join_column],
).drop(columns=[fragpipe_join_column])
right_on=[proteomic_sample_id_column, proteomic_join_column],
).drop(columns=columns_to_drop)

return joined_df
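
As a sketch of the new raw-DataFrame path (the column names and toy frames below are illustrative, not part of this change), all three column arguments must be given explicitly or the function raises ValueError:

```python
import pandas as pd

from bystro.proteomics.annotation_interface import (
    SAMPLE_GENERATED_COLUMN,
    join_annotation_result_to_proteomic_dataset,
)

# Toy stand-in for the output of get_annotation_result_from_query:
# one row per (sample, gene) hit, keyed by the package's generated sample column.
query_result_df = pd.DataFrame(
    {
        SAMPLE_GENERATED_COLUMN: ["S1", "S2"],
        "refSeq.name2": ["TP53", "BRCA1"],
    }
)

# Hypothetical pre-melted proteomic frame: one row per (sample, gene) measurement.
proteomics_df = pd.DataFrame(
    {
        "tracking_id": ["S1", "S2"],
        "gene_symbol": ["TP53", "BRCA1"],
        "abundance": [10.2, 7.9],
    }
)

# With a raw DataFrame, all three column names must be passed explicitly.
joined_df = join_annotation_result_to_proteomic_dataset(
    query_result_df,
    proteomics_df,
    genetic_join_column="refSeq.name2",
    proteomic_join_column="gene_symbol",
    proteomic_sample_id_column="tracking_id",
)
print(joined_df)
```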
python/python/bystro/proteomics/listener_annotation_interface.py
@@ -12,7 +12,7 @@
from bystro.beanstalkd.messages import BaseMessage, CompletedJobMessage, SubmittedJobMessage
from bystro.proteomics.annotation_interface import (
get_annotation_result_from_query,
join_annotation_result_to_fragpipe_dataset,
join_annotation_result_to_proteomic_dataset,
)

logger = logging.getLogger(__file__)
@@ -82,7 +82,7 @@ def handler_fn(_publisher: ProgressPublisher, job_data: ProteomicsJobData) -> st
cluster_opensearch_config=search_conf,
)

joined_df = join_annotation_result_to_fragpipe_dataset(annotation_df, gene_abundance_df)
joined_df = join_annotation_result_to_proteomic_dataset(annotation_df, gene_abundance_df)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

result_path = Path(job_data.out_dir) / f"joined.{timestamp}.feather"
11 changes: 10 additions & 1 deletion python/python/bystro/proteomics/somascan.py
@@ -4,9 +4,13 @@

from msgspec import Struct

ADAT_GENE_NAME_COLUMN = "Target"
ADAT_PROTEIN_NAME_COLUMN = "UniProt"
ADAT_SAMPLE_ID_COLUMN = "SampleId"
ADAT_RFU_COLUMN = "RFU"

class SomascanDataset(Struct):
"""Represent a Fragpipe Tandem Mass Tag dataset."""
"""Represent a SomaScan Aptamer dataset."""

adat: somadata.Adat
annotations: somadata.Annotations | None = None
@@ -40,3 +44,8 @@ def __post_init__(self) -> None:
f", not {type(self.annotations)}"
)
)

def to_melted_frame(self):
return self.adat.stack( # noqa: PD013
level=list(range(self.adat.columns.nlevels)), future_stack=True
).reset_index(name=ADAT_RFU_COLUMN)
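
The melting pattern used by `to_melted_frame` can be illustrated with plain pandas on a miniature, made-up stand-in for an Adat (a real Adat carries many more index and column levels, which is why the melted test frame below has 55 columns); this assumes a pandas version that supports `future_stack`:

```python
import pandas as pd

# Miniature stand-in for an Adat: samples on the row index, one column
# level per aptamer annotation.
columns = pd.MultiIndex.from_tuples(
    [("seq.1234.56", "GeneA"), ("seq.9876.54", "GeneB")],
    names=["SeqId", "Target"],
)
index = pd.Index(["S1", "S2"], name="SampleId")
rfu_wide = pd.DataFrame([[101.0, 202.0], [303.0, 404.0]], index=index, columns=columns)

# Same pattern as SomascanDataset.to_melted_frame: stack every column level
# so each row becomes one (SampleId, SeqId, Target, RFU) observation.
melted = rfu_wide.stack(
    level=list(range(rfu_wide.columns.nlevels)), future_stack=True
).reset_index(name="RFU")
print(melted)
# (approximate output)
#   SampleId        SeqId Target    RFU
# 0       S1  seq.1234.56  GeneA  101.0
# 1       S1  seq.9876.54  GeneB  202.0
# 2       S2  seq.1234.56  GeneA  303.0
# 3       S2  seq.9876.54  GeneB  404.0
```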
90 changes: 77 additions & 13 deletions python/python/bystro/proteomics/tests/test_annotation_interface.py
@@ -6,8 +6,9 @@
import pytest

from bystro.proteomics.annotation_interface import (
DOSAGE_GENERATED_COLUMN,
process_query_response,
join_annotation_result_to_fragpipe_dataset,
join_annotation_result_to_proteomic_dataset,
async_get_annotation_result_from_query,
SAMPLE_GENERATED_COLUMN,
ALWAYS_INCLUDED_FIELDS,
@@ -20,9 +21,11 @@
FRAGPIPE_RENAMED_COLUMNS,
FRAGPIPE_SAMPLE_COLUMN,
FRAGPIPE_GENE_GENE_NAME_COLUMN_RENAMED,
FRAGPIPE_SAMPLE_INTENSITY_COLUMN
FRAGPIPE_SAMPLE_INTENSITY_COLUMN,
)

from bystro.proteomics.somascan import SomascanDataset, ADAT_SAMPLE_ID_COLUMN

TEST_LEGACY_RESPONSE_PATH = Path(__file__).parent / "test_legacy_response.dat"

with TEST_LEGACY_RESPONSE_PATH.open("rb") as f:
@@ -167,8 +170,8 @@ async def test_get_annotation_results_from_query_with_samples(mocker):
assert (397, 12) == samples_and_genes_df.shape

assert list(samples_and_genes_df.columns) == ALWAYS_INCLUDED_FIELDS + [LINK_GENERATED_COLUMN] + [
"sample",
"dosage",
SAMPLE_GENERATED_COLUMN,
DOSAGE_GENERATED_COLUMN,
] + ["gnomad.exomes.AF", "refSeq.name2"]


@@ -273,8 +276,8 @@ def test_process_response():
melted = process_query_response(copy.deepcopy(all_hits), melt_samples=True)
assert (397, 12) == melted.shape
assert list(melted.columns) == ALWAYS_INCLUDED_FIELDS + [LINK_GENERATED_COLUMN] + [
"sample",
"dosage",
SAMPLE_GENERATED_COLUMN,
DOSAGE_GENERATED_COLUMN,
] + ["gnomad.exomes.AF", "refSeq.name2"]

assert {"1805", "1847", "4805"} == set(melted[SAMPLE_GENERATED_COLUMN].unique())
@@ -293,21 +296,36 @@ def test_process_response():
heterozygotes = [heterozygotes]

for sample in heterozygotes:
assert 1 == melted_rows[melted_rows["sample"] == str(sample)]["dosage"].to_numpy()[0]
assert (
1
== melted_rows[melted_rows[SAMPLE_GENERATED_COLUMN] == str(sample)][
DOSAGE_GENERATED_COLUMN
].to_numpy()[0]
)

if homozygotes is not None:
if not isinstance(homozygotes, list):
homozygotes = [homozygotes]

for sample in homozygotes:
assert 2 == melted_rows[melted_rows["sample"] == str(sample)]["dosage"].to_numpy()[0]
assert (
2
== melted_rows[melted_rows[SAMPLE_GENERATED_COLUMN] == str(sample)][
DOSAGE_GENERATED_COLUMN
].to_numpy()[0]
)

if missing is not None:
if not isinstance(missing, list):
missing = [missing]

for sample in missing:
assert -1 == melted_rows[melted_rows["sample"] == str(sample)]["dosage"].to_numpy()[0]
assert (
-1
== melted_rows[melted_rows[SAMPLE_GENERATED_COLUMN] == str(sample)][
DOSAGE_GENERATED_COLUMN
].to_numpy()[0]
)


@pytest.mark.asyncio
@@ -336,7 +354,7 @@ async def test_join_annotation_result_to_fragpipe_dataset(mocker):

assert (582, 12) == query_result_df.shape

sample_ids = query_result_df["sample"].unique()
sample_ids = query_result_df[SAMPLE_GENERATED_COLUMN].unique()

abundance_file = str(Path(__file__).parent / "example_abundance_gene_MD.tsv")
experiment_file = str(Path(__file__).parent / "example_experiment_annotation_file.tsv")
@@ -346,10 +364,12 @@ async def test_join_annotation_result_to_fragpipe_dataset(mocker):

# replace the sample ids with the sample names
replacements = {sample_id: sample_name for sample_id, sample_name in zip(sample_ids, sample_names)}
query_result_df["sample"] = query_result_df["sample"].replace(replacements)
query_result_df[SAMPLE_GENERATED_COLUMN] = query_result_df[SAMPLE_GENERATED_COLUMN].replace(
replacements
)

joined_df = join_annotation_result_to_fragpipe_dataset(
query_result_df, tmt_dataset, fragpipe_join_column=FRAGPIPE_GENE_GENE_NAME_COLUMN_RENAMED
joined_df = join_annotation_result_to_proteomic_dataset(
query_result_df, tmt_dataset, proteomic_join_column=FRAGPIPE_GENE_GENE_NAME_COLUMN_RENAMED
)

assert (90, 17) == joined_df.shape
Expand All @@ -362,3 +382,47 @@ async def test_join_annotation_result_to_fragpipe_dataset(mocker):

retained_fragpipe_columns.append(FRAGPIPE_SAMPLE_INTENSITY_COLUMN)
assert list(joined_df.columns) == list(query_result_df.columns) + retained_fragpipe_columns


@pytest.mark.asyncio
async def test_join_annotation_result_to_somascan_dataset(mocker):
mocker.patch(
"bystro.proteomics.annotation_interface.AsyncOpenSearch",
return_value=MockAsyncOpenSearch(TEST_RESPONSES_WITH_SAMPLES),
)

query_string = "exonic (gnomad.genomes.af:<0.1 || gnomad.exomes.af:<0.1)"
index_name = "foo"

query_result_df = await async_get_annotation_result_from_query(
query_string,
index_name,
cluster_opensearch_config={
"connection": {
"nodes": ["http://localhost:9200"],
"request_timeout": 1200,
"use_ssl": False,
"verify_certs": False,
},
},
explode_field="refSeq.name2",
)

assert (582, 12) == query_result_df.shape

sample_ids = query_result_df[SAMPLE_GENERATED_COLUMN].unique()

adat_file = str(Path(__file__).parent / "example_data_v4.1_plasma.adat")
somascan_dataset = SomascanDataset.from_paths(adat_file)

sample_names = list(somascan_dataset.adat.index.to_frame()[ADAT_SAMPLE_ID_COLUMN].values)[
0 : sample_ids.shape[0]
]
replacements = {sample_id: sample_name for sample_id, sample_name in zip(sample_ids, sample_names)}
query_result_df[SAMPLE_GENERATED_COLUMN] = query_result_df[SAMPLE_GENERATED_COLUMN].replace(
replacements
)

joined_df_soma = join_annotation_result_to_proteomic_dataset(query_result_df, somascan_dataset)

assert (131, 71) == joined_df_soma.shape
python/python/bystro/proteomics/tests/test_listener_annotation_interface.py
@@ -52,7 +52,7 @@ def test_handler_fn_happy_path(tmpdir, mocker):
return_value=pd.DataFrame(),
)
mocker.patch(
"bystro.proteomics.listener_annotation_interface.join_annotation_result_to_fragpipe_dataset",
"bystro.proteomics.listener_annotation_interface.join_annotation_result_to_proteomic_dataset",
return_value=pd.DataFrame(),
)
mocker.patch("bystro.proteomics.annotation_interface.AsyncOpenSearch", return_value=mocker.Mock())
29 changes: 28 additions & 1 deletion python/python/bystro/proteomics/tests/test_somascan.py
@@ -2,7 +2,12 @@

import pytest

from bystro.proteomics.somascan import SomascanDataset
from bystro.proteomics.somascan import (
ADAT_SAMPLE_ID_COLUMN,
ADAT_GENE_NAME_COLUMN,
ADAT_RFU_COLUMN,
SomascanDataset,
)

adat_path = Path(__file__).parent.parent / "example_data" / "example_data.adat"

Expand Down Expand Up @@ -30,3 +35,25 @@ def test_passed_bad_arguments():

with pytest.raises(FileNotFoundError):
SomascanDataset.from_paths("foo", "bar")


def test_can_melt_frame():
dataset = SomascanDataset.from_paths(str(adat_path))

unique_sample_ids = dataset.adat.index.get_level_values(ADAT_SAMPLE_ID_COLUMN).unique()
unique_targets = dataset.adat.columns.get_level_values(ADAT_GENE_NAME_COLUMN).unique()

melted = dataset.to_melted_frame()
assert melted.shape == (1014528, 55)

assert melted[ADAT_RFU_COLUMN].dtype == float
assert melted[ADAT_SAMPLE_ID_COLUMN].dtype == "string[pyarrow_numpy]"
assert melted[ADAT_GENE_NAME_COLUMN].dtype == "string[pyarrow_numpy]"
assert set(melted[ADAT_SAMPLE_ID_COLUMN].values) == set(unique_sample_ids)
assert set(melted[ADAT_GENE_NAME_COLUMN].values) == set(unique_targets)

columns = melted.columns.tolist()

assert ADAT_SAMPLE_ID_COLUMN in columns
assert ADAT_GENE_NAME_COLUMN in columns
assert ADAT_RFU_COLUMN in columns
