fix resolve behaviour
nebfield committed Sep 19, 2023
1 parent ca60663 commit d608341
Showing 2 changed files with 95 additions and 42 deletions.
7 changes: 7 additions & 0 deletions pgscatalog_utils/samplesheet/Config.py
@@ -0,0 +1,7 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class Config:
+    input_path: str
+    output_path: str
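
Context for this new file: check_samplesheet() below assigns args.FILE_IN and args.FILE_OUT onto this dataclass, so the paths are shared as class attributes instead of being threaded through every helper. A minimal standalone sketch of the pattern, with invented paths:

from dataclasses import dataclass


@dataclass
class Config:
    input_path: str
    output_path: str


# No instance is created: the annotated names are set on the class itself
# and read back by any module that imports Config.
Config.input_path = "samplesheet.csv"    # hypothetical value
Config.output_path = "samplesheet.json"  # hypothetical value

print(Config.input_path)  # samplesheet.csv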
130 changes: 88 additions & 42 deletions pgscatalog_utils/samplesheet/check.py
@@ -1,13 +1,13 @@
 import argparse
 import logging
 import math
-import os
 import pathlib
+from pathlib import Path
 
 import pandas as pd
 
-from pathlib import Path
 from pgscatalog_utils import config
+from pgscatalog_utils.samplesheet.Config import Config
 
 logger = logging.getLogger(__name__)
 
@@ -44,7 +44,8 @@ def _check_colnames(df: pd.DataFrame):
     else:
         logger.critical("Samplesheet has invalid header row")
         logger.critical(f"Column names must only include: {mandatory}")
-        [logger.critical(f"Invalid column name: {col}") for col in df if col not in mandatory]
+        [logger.critical(f"Invalid column name: {col}") for col in df if
+         col not in mandatory]
         raise Exception


@@ -96,7 +97,8 @@ def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]:
 
 def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
     seen = set()
-    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if x in seen or seen.add(x)]
+    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if
+                                        x in seen or seen.add(x)]
     if duplicate_chromosomes:
         logger.critical(f"Duplicate chromosomes detected in sampleset {sampleset}")
         logger.critical(f"Duplicate chromosomes: {duplicate_chromosomes}")
@@ -106,9 +108,12 @@ def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
 def _check_multiple_missing_chrom(sampleset: str, chrom_list: dict) -> None:
     for chrom in chrom_list:
         if chrom is None and len(chrom_list) != 1:
-            logger.critical(f"Sampleset {sampleset} has rows with multiple missing chromosomes")
-            logger.critical("If you have file with multiple chromosomes, delete the duplicate rows")
-            logger.critical("If your data are split per chromosome, then chromosomes must be set for all rows")
+            logger.critical(
+                f"Sampleset {sampleset} has rows with multiple missing chromosomes")
+            logger.critical(
+                "If you have file with multiple chromosomes, delete the duplicate rows")
+            logger.critical(
+                "If your data are split per chromosome, then chromosomes must be set for all rows")
             raise Exception


@@ -126,7 +131,8 @@ def _check_format(df: pd.DataFrame):
     for idx, row in df.iterrows():
         valid_formats: list[str] = ['vcf', 'pfile', 'bfile']
         if row['format'] not in valid_formats:
-            logger.critical(f"Invalid format: {row['format']} must be one of {valid_formats}")
+            logger.critical(
+                f"Invalid format: {row['format']} must be one of {valid_formats}")
             logger.critical(f"\n{df.iloc[[idx]]}")
             raise Exception

@@ -149,7 +155,8 @@ def _setup_paths(df: pd.DataFrame) -> pd.DataFrame:
             case _:
                 raise Exception
 
-        resolved_paths: list[str] = _resolve_paths([row['path_prefix'] + x for x in suffix], row['format'])
+        resolved_paths: list[str] = _resolve_paths(
+            [row['path_prefix'] + x for x in suffix], row['format'])
         paths.append(pd.Series(data=[resolved_paths], index=[idx]))
 
     df['path'] = pd.concat(paths)
@@ -169,48 +176,79 @@ def _resolve_compressed_variant_path(path: str) -> pathlib.Path:
         logger.info(f"Found compressed variant information file {compressed_path.name}")
         return compressed_path
     elif uncompressed_path.exists():
-        logger.info(f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
+        logger.info(
+            f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
         return uncompressed_path
     else:
         logger.critical(f"{compressed_path} doesn't exist")
         logger.critical(f"{uncompressed_path} doesn't exist")
-        logger.critical("Couldn't find variant information files, please check samplesheet path_prefix and try again")
+        logger.critical(
+            "Couldn't find variant information files, please check samplesheet path_prefix and try again")
         raise Exception
 
 
 def _resolve_paths(path_list: list[str], filetype: str) -> list[str]:
     resolved_list: list[str] = []
+
+    # always resolve the input samplesheet
+    base_dir: Path = Path(Config.input_path).resolve().parent
+    if (path := Path(Config.input_path)).is_symlink():
+        logger.info(
+            f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}")
+
     for path in path_list:
-        if not Path(path).is_absolute():
-            logger.warning("Relative path detected in samplesheet. Set absolute paths to silence this warning.")
-            logger.warning("Assuming program working directory is a nextflow work directory (e.g. work/4c/8585/...)")
-            base_dir: Path = Path(os.getcwd()).parent.parent.parent
-            logger.warning(f"Resolving paths relative to work directory parent {base_dir}")
-            path = str(base_dir.joinpath(path))
-
-        match filetype:
-            case 'pfile' | 'bfile':
-                if path.endswith('.bim') or path.endswith('.pvar'):
-                    resolved = _resolve_compressed_variant_path(path)
-                else:
-                    # bed / pgen | fam / psam
-                    resolved = pathlib.Path(path).resolve()
-            case 'vcf':
-                resolved = pathlib.Path(path).resolve()
+        match path:
+            case p if path.startswith("https://") | path.startswith("s3://"):
+                logger.info("Remote path detected, skipping resolve")
+                resolved_list.append(str(path))
+            case "http://":
+                logger.critical("HTTP download is insecure! Did you mean https:// ?")
+                raise Exception("Insecure path detected")
             case _:
-                logger.critical(f"Unsupported filetype {filetype}")
-                raise Exception
+                p: Path = Path(path)
+                if not p.is_absolute():
+                    logger.warning(
+                        "Relative path detected in samplesheet. Set absolute paths to silence this warning.")
+                    logger.warning(
+                        "Assuming input samplesheet is a symlinked file in a nextflow working directory")
+                    logger.warning(
+                        "Following symlink and attempting to resolve path relative to input file")
+                    logger.warning(
+                        f"Resolving paths relative to: {base_dir}")
+                    resolved = _resolve_filetypes(path=str(base_dir.joinpath(path)),
+                                                  filetype=filetype)
+                else:
+                    logger.info("Absolute path detected")
+                    resolved = _resolve_filetypes(filetype=filetype, path=str(p))
 
-        if resolved.exists():
-            logger.info(f"{resolved} exists")
-            resolved_list.append(str(resolved))
-        else:
-            logger.critical(f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
-            raise FileNotFoundError
+                if resolved.exists():
+                    logger.info(f"{resolved} exists")
+                    resolved_list.append(str(resolved))
+                else:
+                    logger.critical(
+                        f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
+                    raise FileNotFoundError
 
     return resolved_list
 
 
+def _resolve_filetypes(filetype: str, path: str) -> Path:
+    match filetype:
+        case 'pfile' | 'bfile':
+            if path.endswith('.bim') or path.endswith('.pvar'):
+                resolved = _resolve_compressed_variant_path(path)
+            else:
+                # bed / pgen | fam / psam
+                resolved = pathlib.Path(path).resolve()
+        case 'vcf':
+            resolved = pathlib.Path(path).resolve()
+        case _:
+            logger.critical(f"Unsupported filetype {filetype}")
+            raise Exception
+
+    return resolved
+
+
 def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
     df['vcf_import_dosage'] = False  # (dosage off by default)
     if 'vcf_genotype_field' in df.columns:
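
The hunk above is the heart of the fix: relative paths are now anchored to the resolved parent directory of the input samplesheet (via Config.input_path) rather than a guessed Nextflow work directory three levels above os.getcwd(), remote https:// and s3:// locations skip resolution entirely, and the filetype-specific suffix handling moves into the new _resolve_filetypes() helper. A stripped-down sketch of that resolution order, with hypothetical paths and without the logging:

from pathlib import Path

SAMPLESHEET = "/work/ab/12cd34/samplesheet.csv"  # stands in for Config.input_path


def resolve(path: str) -> str:
    # remote locations are passed through untouched
    if path.startswith("https://") or path.startswith("s3://"):
        return path
    p = Path(path)
    if not p.is_absolute():
        # anchor relative entries next to the (symlink-resolved) samplesheet
        p = Path(SAMPLESHEET).resolve().parent.joinpath(p)
    return str(p.resolve())


print(resolve("s3://bucket/cineca.pgen"))  # unchanged
print(resolve("cineca.pgen"))              # /work/ab/12cd34/cineca.pgen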
@@ -224,7 +262,8 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
                 missing = False
 
             if not missing:
-                logger.critical(f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
+                logger.critical(
+                    f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
                 logger.critical(f"\n {row}")
                 raise Exception

@@ -237,14 +276,17 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
 
 def _check_reserved_names(df: pd.DataFrame):
     if any(df['sampleset'] == 'reference'):
-        logger.critical("Samplesets must not be named 'reference', please rename in the sample sheet")
+        logger.critical(
+            "Samplesets must not be named 'reference', please rename in the sample sheet")
         raise Exception
 
     # Check whether reference contains reserved tokens from nextflow channels
     badnames = [x for x in df['sampleset'] if ('.' in x or '_' in x)]
     if len(badnames) > 0:
-        logger.critical("Samplesets must not contain any reserved characters ( '_' , '.'), "
-                        "please rename the following samples in the sample sheet: {}".format(badnames))
+        logger.critical(
+            "Samplesets must not contain any reserved characters ( '_' , '.'), "
+            "please rename the following samples in the sample sheet: {}".format(
+                badnames))
         raise Exception
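
Per the code comment in this hunk, '.' and '_' clash with tokens in nextflow channel names downstream, and 'reference' is reserved outright. A small illustration of both checks with invented sampleset names:

import pandas as pd

df = pd.DataFrame({"sampleset": ["cineca", "my_cohort", "hg38.test"]})

# reserved name check: the comparison yields a boolean Series
assert not any(df["sampleset"] == "reference")

# reserved character check
badnames = [x for x in df["sampleset"] if ("." in x or "_" in x)]
print(badnames)  # ['my_cohort', 'hg38.test']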


@@ -269,7 +311,11 @@ def check_samplesheet() -> None:
     """
     args = _parse_args()
     config.set_logging_level(args.verbose)
-    df = _read_samplesheet(args.FILE_IN)
+
+    Config.input_path = args.FILE_IN
+    Config.output_path = args.FILE_OUT
+
+    df = _read_samplesheet(Config.input_path)
 
     # check df for errors
     _check_one_sampleset(df)
@@ -285,8 +331,8 @@ def check_samplesheet() -> None:
 
     logger.info("Samplesheet checks complete")
     (df.drop(['path_prefix'], axis=1)
-     .to_json(args.FILE_OUT, orient='records'))
-    logger.info(f"JSON file successfully written to {args.FILE_OUT}")
+     .to_json(Config.output_path, orient='records'))
+    logger.info(f"JSON file successfully written to {Config.output_path}")
 
 
 if __name__ == "__main__":
