Reduce memory and time needed for sample_concordance_plink. #362

Merged · 11 commits · Dec 11, 2024
61 changes: 36 additions & 25 deletions src/cgr_gwas_qc/parsers/plink.py
@@ -1,3 +1,4 @@
import numpy as np
import pandas as pd

from cgr_gwas_qc.typing import PathLike
@@ -73,59 +74,69 @@ def read_hwe(filename: PathLike) -> pd.DataFrame:
)


def read_genome(filename: PathLike) -> pd.DataFrame:
def read_genome(filename: PathLike, required_cols=None, chunksize=100000) -> pd.DataFrame:
"""Parse PLINK's genome file format.

Each row of the genome file is a pairwise combination of
samples/subjects. I am unsure of how plink assigns IID order. Here I sort
IDs alphanumerically which will make searching for pairs easier because
you can assume the order.

>>> ID1, ID2 = sort([IID1, IID2])
samples/subjects.

Returns:
pd.DataFrame:
A (n x 5) table with the following columns
An (n x 17) table with the following columns

.. csv-table::
:header: name, dtype, description

ID1, object, First Sample or Subject ID (alphanumerically)
ID2, object, Second Sample or Subject ID (alphanumerically)
RT, object, Relationship type inferred from .fam/.ped file {FS: Full Sib, HS: Half Sib, PO: Parent-Offspring, OT: Other}
ID1, string, First Sample or Subject ID (alphanumerically first)
ID2, string, Second Sample or Subject ID (alphanumerically second)
RT, category, Relationship type inferred from .fam/.ped file {FS: Full Sib, HS: Half Sib, PO: Parent-Offspring, OT: Other}
EZ, category, IBD sharing expected value, based on just .fam/.ped relationship
Z0, float, P(IBD=0)
Z1, float, P(IBD=1)
Z2, float, P(IBD=2)
PI_HAT, float, Proportion IBD, i.e. P(IBD=2) + 0.5*P(IBD=1)
PHE, int, Pairwise phenotypic code (1, 0, -1 = case-case, case-ctrl, and ctrl-ctrl pairs, respectively)
PHE, category, Pairwise phenotypic code (1, 0, -1 = case-case, case-ctrl, and ctrl-ctrl pairs, respectively)
DST, float, IBS distance, i.e. (IBS2 + 0.5*IBS1) / (IBS0 + IBS1 + IBS2)
PPC, float, IBS binomial test
RATIO, float, HETHET: IBS0 SNP ratio (expected value 2)
IBS0, int, Number of IBS 0 nonmissing variants
IBS1, int, Number of IBS 1 nonmissing variants
IBS2, int, Number of IBS 2 nonmissing variants
HOMHOM, float, Number of IBS 0 SNP pairs used in PPC test
HETHET, float, Number of IBS 2 het/het SNP pairs used in PPC test
HOMHOM, int, Number of IBS 0 SNP pairs used in PPC test
HETHET, int, Number of IBS 2 het/het SNP pairs used in PPC test

References:
- https://www.cog-genomics.org/plink/1.9/ibd
- https://www.cog-genomics.org/plink/1.9/formats#genome
"""

def _sort_ids(x: pd.Series):
"""Sort IDs alphanumerically."""
x["ID1"], x["ID2"] = sorted([x.IID1, x.IID2])
return x
DTYPES = {
"IID1": "string",
"IID2": "string",
"RT": "category",
"EZ": "category",
"Z0": np.float32,
"Z1": np.float32,
"Z2": np.float32,
"PI_HAT": np.float32,
"PHE": "category",
"DST": np.float32,
"PPC": np.float32,
"RATIO": np.float32,
"IBS0": np.uint32,
"IBS1": np.uint32,
"IBS2": np.uint32,
"HOMHOM": np.uint32,
"HETHET": np.uint32,
}
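
For a rough sense of what these narrower dtypes buy, here is an illustrative sketch (toy column; actual savings depend on row count and cardinality):

import numpy as np
import pandas as pd

n = 1_000_000
wide = pd.Series(np.zeros(n), dtype=np.float64)
narrow = wide.astype(np.float32)
print(wide.memory_usage(deep=True))    # ~8,000,128 bytes
print(narrow.memory_usage(deep=True))  # ~4,000,128 bytes: float32 halves it
# "category" similarly shrinks low-cardinality string columns such as RT and PHE.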

if required_cols is None:

return (
pd.read_csv(
filename,
delim_whitespace=True,
dtype={"FID1": "string", "IID1": "string", "FID2": "string", "IID2": "string"},
)
.apply(_sort_ids, axis=1)
.drop(["IID1", "IID2", "FID1", "FID2"], axis=1)
def required_cols(col_name):
    return col_name not in ["FID1", "FID2"]

return pd.read_csv(
filename, delim_whitespace=True, dtype=DTYPES, usecols=required_cols, chunksize=chunksize
)
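
For orientation, a minimal usage sketch of the updated reader (the file path and column list are illustrative; with chunksize set, pandas returns an iterator of DataFrames rather than a single frame):

from cgr_gwas_qc.parsers import plink

# Hypothetical usage: stream the .genome file in 100,000-row chunks,
# keeping only the columns the concordance step needs.
reader = plink.read_genome(
    "plink.genome",  # illustrative path
    required_cols=["IID1", "IID2", "PI_HAT", "IBS0", "IBS1", "IBS2"],
)
for chunk in reader:
    print(chunk.shape)  # each chunk holds at most chunksize rows

Combined with the float32/uint32/category dtypes above, this keeps peak memory near the size of a single chunk instead of the whole .genome file.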


72 changes: 66 additions & 6 deletions src/cgr_gwas_qc/workflow/scripts/concordance_table.py
@@ -17,8 +17,13 @@

"""

import concurrent.futures
import functools
import subprocess
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd
import typer

@@ -28,8 +33,8 @@
app = typer.Typer(add_completion=False)

DTYPES = {
"ID1": "string",
"ID2": "string",
"ID1": "category",
"ID2": "category",
"PI_HAT": "float",
"concordance": "float",
"is_ge_pi_hat": "boolean",
@@ -38,8 +43,14 @@


@app.command()
def main(filename: Path, concordance_threshold: float, pi_hat_threshold: float, outfile: Path):
build(filename, concordance_threshold, pi_hat_threshold).to_csv(outfile, index=False)
def main(
filename: Path,
concordance_threshold: float,
pi_hat_threshold: float,
outfile: Path,
threads: int = 1,
):
build(filename, concordance_threshold, pi_hat_threshold, outfile, threads)


def read(filename: PathLike) -> pd.DataFrame:
@@ -58,16 +69,65 @@ def read(filename: PathLike) -> pd.DataFrame:
return pd.read_csv(filename, dtype=DTYPES)


def build(filename: Path, concordance_threshold: float, pi_hat_threshold: float) -> pd.DataFrame:
def _sort_ids(x: pd.DataFrame):
"""Sort IDs alphanumerically."""
x.IID1, x.IID2 = np.where(x.IID1 < x.IID2, [x.IID1, x.IID2], [x.IID2, x.IID1])
x.rename(columns={"IID1": "ID1", "IID2": "ID2"}, inplace=True)
return x
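
As a quick check of the vectorized swap (toy frame; relies on standard numpy broadcasting over the two stacked Series):

import numpy as np
import pandas as pd

df = pd.DataFrame({"IID1": ["S2", "S1"], "IID2": ["S1", "S3"]})
# Put the alphanumerically smaller ID first in each row without a
# row-by-row .apply; np.where broadcasts the boolean mask column-wise.
df["IID1"], df["IID2"] = np.where(
    df.IID1 < df.IID2, [df.IID1, df.IID2], [df.IID2, df.IID1]
)
print(df.values.tolist())  # [['S1', 'S2'], ['S1', 'S3']]

This is the main speedup over the old per-row .apply(_sort_ids, axis=1).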


def prep_concordance_table(pi_hat_threshold: float, concordance_threshold: float, x: pd.DataFrame):
"""Prepares the concordance_table by appending new columns based on thresholds."""
return (
plink.read_genome(filename)
_sort_ids(x)
.assign(is_ge_pi_hat=lambda x: x.PI_HAT >= pi_hat_threshold)
.assign(concordance=lambda x: x.IBS2 / (x.IBS0 + x.IBS1 + x.IBS2))
.assign(is_ge_concordance=lambda x: x.concordance >= concordance_threshold)
.reindex(DTYPES.keys(), axis=1)
)
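
For intuition, a toy row pushed through prep_concordance_table (thresholds and IBS counts are made up; assumes the function is imported from this script):

import pandas as pd

toy = pd.DataFrame(
    {"IID1": ["S2"], "IID2": ["S1"], "PI_HAT": [0.96],
     "IBS0": [10], "IBS1": [90], "IBS2": [900]}
)
out = prep_concordance_table(pi_hat_threshold=0.2, concordance_threshold=0.95, x=toy)
# concordance = 900 / (10 + 90 + 900) = 0.90 -> is_ge_concordance is False
# PI_HAT 0.96 >= 0.2                         -> is_ge_pi_hat is True
print(out.loc[0, ["ID1", "ID2", "concordance"]].tolist())  # ['S1', 'S2', 0.9]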


def process_genome_chunk(pi_hat_threshold: float, concordance_threshold: float, chunk):
"""Processes each chunk of genome file."""
temp_file = tempfile.NamedTemporaryFile(delete=False)
prep_concordance_table(pi_hat_threshold, concordance_threshold, chunk).to_csv(
temp_file.name, index=False, header=False
)
return temp_file.name
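
The worker above is fanned out over the chunked reader in build below; a stripped-down sketch of that pattern (dummy worker and data stand in for process_genome_chunk and the genome chunks):

import concurrent.futures
import functools

def work(threshold, chunk):
    # Toy stand-in for process_genome_chunk: keep values at or above threshold.
    return [v for v in chunk if v >= threshold]

if __name__ == "__main__":  # process pools need this guard when run as a script
    chunks = [[0.1, 0.9], [0.5, 0.2], [0.8, 0.95]]
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # functools.partial freezes the threshold; map preserves chunk order.
        results = list(executor.map(functools.partial(work, 0.5), chunks))
    print(results)  # [[0.9], [0.5], [0.8, 0.95]]

Writing each processed chunk to its own temporary CSV and stitching the files together with cat afterwards keeps pandas from ever holding the full table in memory.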


def build(
filename: Path,
concordance_threshold: float,
pi_hat_threshold: float,
outfile: Path,
threads: int,
):

plink_genome_file = plink.read_genome(
filename, required_cols=["IID1", "IID2", "PI_HAT", "IBS0", "IBS1", "IBS2"]
)

temp_files = []

header = tempfile.NamedTemporaryFile(delete=False)
temp_files.append(header.name)
pd.DataFrame(columns=DTYPES.keys()).to_csv(header.name, index=False)

with concurrent.futures.ProcessPoolExecutor(max_workers=threads) as executor:
results = executor.map(
functools.partial(process_genome_chunk, pi_hat_threshold, concordance_threshold),
plink_genome_file,
)

temp_files = temp_files + list(results)
with open(outfile, "wb") as outputfile:
subprocess.run(["cat"] + temp_files, stdout=outputfile)

for f in temp_files:
Path(f).unlink()


if __name__ == "__main__":
if "snakemake" in locals():
defaults = {}
5 changes: 3 additions & 2 deletions src/cgr_gwas_qc/workflow/sub_workflows/sample_qc.smk
@@ -350,11 +350,12 @@ rule sample_concordance_plink:
params:
concordance_threshold=cfg.config.software_params.dup_concordance_cutoff,
pi_hat_threshold=cfg.config.software_params.pi_hat_threshold,
threads: 1
output:
"sample_level/concordance/plink.csv",
resources:
mem_mb=lambda wc, attempt, input: max((attempt + 1) * input.size_mb, 1024),
time_hr=lambda wildcards, attempt: BIG_TIME[attempt],
mem_mb=2000,
time_hr=4,
script:
"../scripts/concordance_table.py"
