Commit 60114fa
Merge pull request #56 from PGScatalog/fix_resolve
Fix path resolution in `check_samplesheet`
nebfield authored Sep 19, 2023
2 parents ca60663 + 6cbf42a commit 60114fa
Showing 2 changed files with 96 additions and 43 deletions.
7 changes: 7 additions & 0 deletions pgscatalog_utils/samplesheet/Config.py
@@ -0,0 +1,7 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class Config:
+    input_path: str
+    output_path: str
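
The new dataclass acts as a tiny module-level settings holder: `check_samplesheet` assigns the attributes once at startup, and helpers such as `_resolve_paths` read them without the paths being threaded through every call. A minimal sketch of the pattern, with illustrative values that are not from this commit:

    from dataclasses import dataclass


    @dataclass
    class Config:
        input_path: str
        output_path: str


    # Callers set class attributes directly; no instance is ever created.
    Config.input_path = "samplesheet.csv"
    Config.output_path = "out.json"

    # Any other function in the module reads the shared state.
    def where_to_write() -> str:
        return Config.output_path

    print(where_to_write())  # out.json
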
132 changes: 89 additions & 43 deletions pgscatalog_utils/samplesheet/check.py
@@ -1,13 +1,13 @@
 import argparse
 import logging
 import math
-import os
 import pathlib
+from pathlib import Path
 
 import pandas as pd
 
-from pathlib import Path
 from pgscatalog_utils import config
+from pgscatalog_utils.samplesheet.Config import Config
 
 logger = logging.getLogger(__name__)
 
@@ -44,7 +44,8 @@ def _check_colnames(df: pd.DataFrame):
     else:
         logger.critical("Samplesheet has invalid header row")
         logger.critical(f"Column names must only include: {mandatory}")
-        [logger.critical(f"Invalid column name: {col}") for col in df if col not in mandatory]
+        [logger.critical(f"Invalid column name: {col}") for col in df if
+         col not in mandatory]
         raise Exception


@@ -96,7 +97,8 @@ def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]:
 
 def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
     seen = set()
-    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if x in seen or seen.add(x)]
+    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if
+                                        x in seen or seen.add(x)]
     if duplicate_chromosomes:
         logger.critical(f"Duplicate chromosomes detected in sampleset {sampleset}")
         logger.critical(f"Duplicate chromosomes: {duplicate_chromosomes}")
@@ -106,9 +108,12 @@ def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
 def _check_multiple_missing_chrom(sampleset: str, chrom_list: dict) -> None:
     for chrom in chrom_list:
         if chrom is None and len(chrom_list) != 1:
-            logger.critical(f"Sampleset {sampleset} has rows with multiple missing chromosomes")
-            logger.critical("If you have file with multiple chromosomes, delete the duplicate rows")
-            logger.critical("If your data are split per chromosome, then chromosomes must be set for all rows")
+            logger.critical(
+                f"Sampleset {sampleset} has rows with multiple missing chromosomes")
+            logger.critical(
+                "If you have file with multiple chromosomes, delete the duplicate rows")
+            logger.critical(
+                "If your data are split per chromosome, then chromosomes must be set for all rows")
             raise Exception


@@ -126,7 +131,8 @@ def _check_format(df: pd.DataFrame):
     for idx, row in df.iterrows():
         valid_formats: list[str] = ['vcf', 'pfile', 'bfile']
         if row['format'] not in valid_formats:
-            logger.critical(f"Invalid format: {row['format']} must be one of {valid_formats}")
+            logger.critical(
+                f"Invalid format: {row['format']} must be one of {valid_formats}")
             logger.critical(f"\n{df.iloc[[idx]]}")
             raise Exception
 
@@ -149,7 +155,8 @@ def _setup_paths(df: pd.DataFrame) -> pd.DataFrame:
             case _:
                 raise Exception
 
-        resolved_paths: list[str] = _resolve_paths([row['path_prefix'] + x for x in suffix], row['format'])
+        resolved_paths: list[str] = _resolve_paths(
+            [row['path_prefix'] + x for x in suffix], row['format'])
         paths.append(pd.Series(data=[resolved_paths], index=[idx]))
 
     df['path'] = pd.concat(paths)
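
For context, `_setup_paths` builds one candidate path per companion file before handing them to `_resolve_paths`; the elided `match` above presumably maps each format to its file suffixes. A hypothetical sketch of that mapping, following plink/plink2 naming conventions rather than code shown in this diff:

    def _guess_suffixes(fmt: str) -> list[str]:
        # hypothetical helper; suffix sets follow plink/plink2 conventions
        match fmt:
            case 'pfile':
                return ['.pgen', '.psam', '.pvar']
            case 'bfile':
                return ['.bed', '.fam', '.bim']
            case 'vcf':
                return ['.vcf.gz']
            case _:
                raise Exception(f"Unsupported format {fmt}")

    print(['cineca' + x for x in _guess_suffixes('pfile')])
    # ['cineca.pgen', 'cineca.psam', 'cineca.pvar']
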
@@ -169,48 +176,79 @@ def _resolve_compressed_variant_path(path: str) -> pathlib.Path:
         logger.info(f"Found compressed variant information file {compressed_path.name}")
         return compressed_path
     elif uncompressed_path.exists():
-        logger.info(f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
+        logger.info(
+            f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
         return uncompressed_path
     else:
         logger.critical(f"{compressed_path} doesn't exist")
         logger.critical(f"{uncompressed_path} doesn't exist")
-        logger.critical("Couldn't find variant information files, please check samplesheet path_prefix and try again")
+        logger.critical(
+            "Couldn't find variant information files, please check samplesheet path_prefix and try again")
         raise Exception
 
 
 def _resolve_paths(path_list: list[str], filetype: str) -> list[str]:
     resolved_list: list[str] = []
-    for path in path_list:
-        if not Path(path).is_absolute():
-            logger.warning("Relative path detected in samplesheet. Set absolute paths to silence this warning.")
-            logger.warning("Assuming program working directory is a nextflow work directory (e.g. work/4c/8585/...)")
-            base_dir: Path = Path(os.getcwd()).parent.parent.parent
-            logger.warning(f"Resolving paths relative to work directory parent {base_dir}")
-            path = str(base_dir.joinpath(path))
-
-        match filetype:
-            case 'pfile' | 'bfile':
-                if path.endswith('.bim') or path.endswith('.pvar'):
-                    resolved = _resolve_compressed_variant_path(path)
-                else:
-                    # bed / pgen | fam / psam
-                    resolved = pathlib.Path(path).resolve()
-            case 'vcf':
-                resolved = pathlib.Path(path).resolve()
-            case _:
-                logger.critical(f"Unsupported filetype {filetype}")
-                raise Exception
-
-        if resolved.exists():
-            logger.info(f"{resolved} exists")
-            resolved_list.append(str(resolved))
+    # always resolve the input samplesheet
+    base_dir: Path = Path(Config.input_path).resolve().parent
+    if (path := Path(Config.input_path)).is_symlink():
+        logger.info(
+            f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}")
+
+    for path in path_list:
+        if path.startswith("https://") | path.startswith("s3://"):
+            logger.info("Remote path detected, skipping resolve")
+            resolved_list.append(str(path))
+            continue
+        elif path.startswith("http://"):
+            logger.critical("HTTP download is insecure! Did you mean https:// ?")
+            raise Exception("Insecure path detected")
         else:
-            logger.critical(f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
-            raise FileNotFoundError
+            p: Path = Path(path)
+            if not p.is_absolute():
+                logger.warning(
+                    "Relative path detected in samplesheet. Set absolute paths to silence this warning.")
+                logger.warning(
+                    "Assuming input samplesheet is a symlinked file in a nextflow working directory")
+                logger.warning(
+                    "Following symlink and attempting to resolve path relative to input file")
+                logger.warning(
+                    f"Resolving paths relative to: {base_dir}")
+                resolved = _resolve_filetypes(path=str(base_dir.joinpath(path)),
+                                              filetype=filetype)
+            else:
+                logger.info("Absolute path detected")
+                resolved = _resolve_filetypes(filetype=filetype, path=str(p))
+
+            if resolved.exists():
+                logger.info(f"{resolved} exists")
+                resolved_list.append(str(resolved))
+            else:
+                logger.critical(
+                    f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
+                raise FileNotFoundError
 
     return resolved_list
 
 
+def _resolve_filetypes(filetype: str, path: str) -> Path:
+    match filetype:
+        case 'pfile' | 'bfile':
+            if path.endswith('.bim') or path.endswith('.pvar'):
+                resolved = _resolve_compressed_variant_path(path)
+            else:
+                # bed / pgen | fam / psam
+                resolved = pathlib.Path(path).resolve()
+        case 'vcf':
+            resolved = pathlib.Path(path).resolve()
+        case _:
+            logger.critical(f"Unsupported filetype {filetype}")
+            raise Exception
+
+    return resolved
+
+
 def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
     df['vcf_import_dosage'] = False  # (dosage off by default)
     if 'vcf_genotype_field' in df.columns:
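
The heart of the fix: relative paths in a samplesheet are now anchored to the directory of the fully resolved samplesheet itself, instead of climbing three parents up from the process working directory, and remote https:// or s3:// locations bypass resolution entirely. A rough before/after illustration with invented paths:

    from pathlib import Path

    samplesheet = "/data/project/samplesheet.csv"  # hypothetical staged input
    relative = "genomes/cineca.pvar"

    # Old strategy: assume the cwd is a nextflow work directory
    # (work/4c/8585/...) and climb three parents to find the launch dir.
    old_base = Path.cwd().parent.parent.parent

    # New strategy: resolve() follows symlinks (nextflow stages inputs as
    # symlinks), so the samplesheet's real parent anchors relative paths.
    new_base = Path(samplesheet).resolve().parent
    print(new_base.joinpath(relative))  # /data/project/genomes/cineca.pvar
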
@@ -224,7 +262,8 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
                 missing = False
 
             if not missing:
-                logger.critical(f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
+                logger.critical(
+                    f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
                 logger.critical(f"\n {row}")
                 raise Exception
@@ -237,14 +276,17 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
 
 def _check_reserved_names(df: pd.DataFrame):
     if any(df['sampleset'] == 'reference'):
-        logger.critical("Samplesets must not be named 'reference', please rename in the sample sheet")
+        logger.critical(
+            "Samplesets must not be named 'reference', please rename in the sample sheet")
         raise Exception
 
     # Check whether reference contains reserved tokens from nextflow channels
     badnames = [x for x in df['sampleset'] if ('.' in x or '_' in x)]
     if len(badnames) > 0:
-        logger.critical("Samplesets must not contain any reserved characters ( '_' , '.'), "
-                        "please rename the following samples in the sample sheet: {}".format(badnames))
+        logger.critical(
+            "Samplesets must not contain any reserved characters ( '_' , '.'), "
+            "please rename the following samples in the sample sheet: {}".format(
+                badnames))
         raise Exception


@@ -269,7 +311,11 @@ def check_samplesheet() -> None:
     """
     args = _parse_args()
    config.set_logging_level(args.verbose)
-    df = _read_samplesheet(args.FILE_IN)
+
+    Config.input_path = args.FILE_IN
+    Config.output_path = args.FILE_OUT
+
+    df = _read_samplesheet(Config.input_path)
 
     # check df for errors
     _check_one_sampleset(df)
@@ -285,8 +331,8 @@ def check_samplesheet() -> None:
 
     logger.info("Samplesheet checks complete")
     (df.drop(['path_prefix'], axis=1)
-     .to_json(args.FILE_OUT, orient='records'))
-    logger.info(f"JSON file successfully written to {args.FILE_OUT}")
+     .to_json(Config.output_path, orient='records'))
+    logger.info(f"JSON file successfully written to {Config.output_path}")
 
 
 if __name__ == "__main__":
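
After the checks pass, `orient='records'` serialises one JSON object per samplesheet row for the downstream workflow to consume. A toy illustration (column values invented):

    import pandas as pd

    df = pd.DataFrame({
        'sampleset': ['cineca'],
        'path': [['/data/cineca.pgen', '/data/cineca.psam', '/data/cineca.pvar']],
        'format': ['pfile'],
    })
    # One object per row (pandas escapes "/" as "\/" by default), e.g.
    # [{"sampleset":"cineca","path":["\/data\/cineca.pgen",...],"format":"pfile"}]
    print(df.to_json(orient='records'))
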
