diff --git a/pgscatalog_utils/ancestry/read.py b/pgscatalog_utils/ancestry/read.py
index 8763e07..ecbaf15 100644
--- a/pgscatalog_utils/ancestry/read.py
+++ b/pgscatalog_utils/ancestry/read.py
@@ -1,12 +1,10 @@
 import logging
 import pandas as pd
-import numpy as np
-import os
 
 logger = logging.getLogger(__name__)
 
 
-def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None):
+def read_pcs(loc_pcs: list[str], dataset: str, loc_related_ids=None, nPCs=None):
     """
     Read the .pc file outputs of the fraposa_pgsc projection
     :param loc_pcs: list of locations for .pcs files
@@ -18,20 +16,20 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None):
 
     for i, path in enumerate(loc_pcs):
         logger.debug("Reading PCA projection: {}".format(path))
-        df = pd.read_csv(path, sep='\t', converters={"IID": str}, header=0)
-        df['sampleset'] = dataset
-        df.set_index(['sampleset', 'IID'], inplace=True)
+        df = pd.read_csv(path, sep="\t", converters={"IID": str}, header=0)
+        df["sampleset"] = dataset
+        df.set_index(["sampleset", "IID"], inplace=True)
 
         if i == 0:
-            logger.debug('Initialising combined DF')
+            logger.debug("Initialising combined DF")
             proj = df.copy()
         else:
-            logger.debug('Appending to combined DF')
+            logger.debug("Appending to combined DF")
             proj = pd.concat([proj, df])
 
     # Drop PCs
     if nPCs:
-        logger.debug('Filtering to relevant PCs')
+        logger.debug("Filtering to relevant PCs")
         dropcols = []
         for x in proj.columns:
             if int(x[2:]) > nPCs:
@@ -41,32 +39,36 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None):
     # Read/process IDs for unrelated samples (usually reference dataset)
     if loc_related_ids:
         logger.debug("Flagging related samples with: {}".format(loc_related_ids))
-        proj['Unrelated'] = True
-        with open(loc_related_ids, 'r') as infile:
+        proj["Unrelated"] = True
+        with open(loc_related_ids, "r") as infile:
             IDs_related = [x.strip() for x in infile.readlines()]
-        proj.loc[proj.index.get_level_values(level=1).isin(IDs_related), 'Unrelated'] = False
+        proj.loc[
+            proj.index.get_level_values(level=1).isin(IDs_related), "Unrelated"
+        ] = False
     else:
         # if unrelated is all nan -> dtype is float64
         # if unrelated is only true / false -> dtype is bool
         # if unrelated contains None, dtype stays bool, and pd.concat warning disappears
-        proj['Unrelated'] = None
+        proj["Unrelated"] = None
 
     return proj
 
 
-def extract_ref_psam_cols(loc_psam, dataset: str, df_target, keepcols=['SuperPop', 'Population']):
-    psam = pd.read_csv(loc_psam, sep='\t', header=0)
+def extract_ref_psam_cols(
+    loc_psam, dataset: str, df_target, keepcols=["SuperPop", "Population"]
+):
+    psam = pd.read_csv(loc_psam, sep="\t", header=0)
 
-    match (psam.columns[0]):
+    match psam.columns[0]:
         # handle case of #IID -> IID (happens when #FID is present)
-        case '#IID':
-            psam.rename({'#IID': 'IID'}, axis=1, inplace=True)
-        case '#FID':
-            psam.drop(['#FID'], axis=1, inplace=True)
+        case "#IID":
+            psam.rename({"#IID": "IID"}, axis=1, inplace=True)
+        case "#FID":
+            psam.drop(["#FID"], axis=1, inplace=True)
         case _:
             assert False, "Invalid columns"
-    psam['sampleset'] = dataset
-    psam.set_index(['sampleset', 'IID'], inplace=True)
+    psam["sampleset"] = dataset
+    psam.set_index(["sampleset", "IID"], inplace=True)
 
     return pd.merge(df_target, psam[keepcols], left_index=True, right_index=True)
 
@@ -78,10 +80,19 @@ def read_pgs(loc_aggscore, onlySUM: bool):
     :param onlySUM: whether to return only _SUM columns (e.g. not _AVG)
     :return:
     """
-    logger.debug('Reading aggregated score data: {}'.format(loc_aggscore))
-    df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID'], converters={"IID": str}, header=0)
+    logger.debug("Reading aggregated score data: {}".format(loc_aggscore))
+    df = pd.read_csv(
+        loc_aggscore,
+        sep="\t",
+        index_col=["sampleset", "IID"],
+        converters={"IID": str},
+        header=0,
+    ).pivot(columns=["PGS"], values=["SUM", "AVG"])
+    # join column levels ({PGS}_{VALUE})
+    df.columns = [f"{j}_{i}" for i, j in df.columns]
+
     if onlySUM:
-        df = df[[x for x in df.columns if x.endswith('_SUM')]]
-        rn = [x.rstrip('_SUM') for x in df.columns]
+        df = df[[x for x in df.columns if x.endswith("_SUM")]]
+        rn = [x.rstrip("_SUM") for x in df.columns]
         df.columns = rn
-    return df
\ No newline at end of file
+    return df
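
The behavioural change above is in read_pgs(): the aggregated score table is now pivoted from long format (one row per sample per PGS, with SUM/AVG value columns) into a wide table with one {PGS}_{SUM|AVG} column per score. Below is a minimal sketch of that pivot step, not part of the diff, using a small hypothetical DataFrame in place of the real aggregated score file passed as loc_aggscore; it assumes a pandas version that accepts list arguments to DataFrame.pivot, as the diff itself does.

# Sketch of the pivot used in read_pgs(), with hypothetical example data.
import pandas as pd

# long format: one row per (sampleset, IID, PGS) with SUM and AVG values
df = pd.DataFrame(
    {
        "sampleset": ["test", "test"],
        "IID": ["sample1", "sample1"],
        "PGS": ["PGS000018", "PGS000027"],
        "SUM": [1.23, -0.45],
        "AVG": [0.0012, -0.0004],
    }
).set_index(["sampleset", "IID"])

# pivot to wide format, then flatten the (value, PGS) column MultiIndex
wide = df.pivot(columns=["PGS"], values=["SUM", "AVG"])
wide.columns = [f"{j}_{i}" for i, j in wide.columns]

print(sorted(wide.columns))
# ['PGS000018_AVG', 'PGS000018_SUM', 'PGS000027_AVG', 'PGS000027_SUM']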