-
Notifications
You must be signed in to change notification settings - Fork 15
/
outlier_removal.py
executable file
·175 lines (144 loc) · 6.11 KB
/
outlier_removal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
import argparse
import sys
def argument_parser(args):
"""
Parses the list of arguments as implemented in argparse.
"""
parser = argparse.ArgumentParser(
description="A script to filter .VCF and .loci files based on loci list"
" files.",
formatter_class=argparse.RawTextHelpFormatter)
# Create subparsers for each main operation
subparsers = parser.add_subparsers(
help="Select which command you wish to execute.",
dest="main_op")
neutral_vcf = subparsers.add_parser("neutral", help="Outputs a VCF with "
"only neutral loci.")
selection_vcf = subparsers.add_parser("selection", help="Outputs a VCF "
"with only loci under selection.")
selection_fasta = subparsers.add_parser("fasta", help="Outputs a FASTA "
"file with the sequences of "
"loci under selection.")
# ###################### Neutral VCF ARGUMENTS #############################
# Group definition
io_opts = neutral_vcf.add_argument_group("Input/Output options")
io_opts.add_argument("-v", dest="vcf_filename", type=str, required=True,
metavar="vcf",
help="Location of the VCF file to filter.")
io_opts.add_argument("-o", dest="outlier_loci", type=str, nargs="+",
metavar="list of files", default=None,
help="Path to outlier loci lists. Can provide more "
"than one file path.")
io_opts.add_argument("-a", dest="assoc_loci", type=str, default=None,
metavar="path to associations summary",
help="Path to assocaitions summary file.")
# ###################### Selection VCF ARGUMENTS ###########################
# Group definition
io_opts = selection_vcf.add_argument_group("Input/Output options")
io_opts.add_argument("-v", dest="vcf_filename", type=str, required=True,
metavar="vcf",
help="Location of the VCF file to filter.")
io_opts.add_argument("-o", dest="outlier_loci", type=str, nargs="+",
metavar="list of files", default=None,
help="Path to outlier loci lists. Can provide more "
" than one file path.")
io_opts.add_argument("-a", dest="assoc_loci", type=str, default=None,
metavar="path to associations summary",
help="Path to assocaitions summary file.")
# ################### Selection FASTA ARGUMENTS ############################
# Group definition
io_opts = selection_fasta.add_argument_group("Input/Output options")
io_opts.add_argument("-v", dest="vcf_filename", type=str, required=True,
metavar="vcf",
help="Location of the VCF file to filter.")
io_opts.add_argument("-o", dest="outlier_loci", type=str, nargs="+",
metavar="list of files", default=None,
help="Path to outlier loci lists. Can provide more "
" than one file path.")
io_opts.add_argument("-a", dest="assoc_loci", type=str, default=None,
metavar="path to associations summary",
help="Path to assocaitions summary file.")
# ################### END OF SPECIFIC CODE ###############################
arguments = parser.parse_args(args)
return arguments
def parse_outliers(outlier_filename):
"""
Parses an outlier summary file and returns a list with the outlier SNPs.
"""
infile = open(outlier_filename, 'r')
loci = set()
for lines in infile:
lines = lines.strip()
if lines == "":
break
else:
lines = lines.strip().split()[1:]
[loci.add(int(x)) for x in lines]
infile.close()
return loci
def parse_associations(association_summary_filename):
"""
Parses an association summary file and return a list with the reported
loci.
"""
infile = open(association_summary_filename, 'r')
loci = set()
for lines in infile:
lines = lines.split()
loci.add(int(lines[1]))
infile.close()
return loci
def parse_vcf(vcf_filename, outliers, goal):
"""
Parses a vcf file and prints only the requested lines (either only neutral
or only under selection)
"""
infile = open(vcf_filename, 'r')
counter = 1
for lines in infile:
lines = lines.strip()
if lines.startswith("#") and goal != "fasta":
print(lines)
elif lines.startswith("#") and goal == "fasta":
continue
else:
if counter not in outliers and goal == "neutral":
print(lines)
elif counter in outliers and goal == "selection":
print(lines)
elif counter in outliers and goal == "fasta":
print("{0}\t{1}".format(lines.split()[0], counter))
if not lines.startswith("#"):
counter += 1
infile.close()
def runner(arg):
"""
Manages what gets run and runs it.
"""
loci_set = None
if arg.outlier_loci is not None:
for loci_list in arg.outlier_loci:
outliers = parse_outliers(loci_list)
if loci_set is None:
loci_set = outliers
else:
loci_set = loci_set.intersection(outliers)
if arg.assoc_loci is not None:
assocs = parse_associations(arg.assoc_loci)
if loci_set is None:
loci_set = assocs
else:
loci_set = loci_set.union(assocs)
parse_vcf(arg.vcf_filename, loci_set, arg.main_op)
def main():
"""
Main function. Calls the argparser and the runner function.
"""
# Make sure we provide an help message instead of an error
if len(sys.argv) == 1:
sys.argv += ["-h"]
arg = argument_parser(sys.argv[1:])
runner(arg)
if __name__ == "__main__":
main()