#!/usr/bin/env python3
import sys
import pandas as pd
from copy import deepcopy


# READ INPUT
def reader(file):
    """reads and appends lines of a file to a list"""
    data_lines = []
    with open(file, "r") as f:
        for line in f.readlines():
            data_lines.append(line)
    return data_lines[:]


def from_ped(data_lines_ped):
    """parses the list of lines from .ped file into a list of dict"""
    parsed_ped = []
    parsed_line = {}
    for line in data_lines_ped:
        line = line.strip().split("\t")
        parsed_line["family"] = line[0]
        parsed_line["individual_ID"] = line[1]
        parsed_line["parent_A"] = line[2]
        parsed_line["parent_B"] = line[3]
        parsed_line["sex"] = line[4]
        parsed_line["phenotype"] = line[5]
        parsed_line["loci"] = line[6:]
        parsed_ped.append(parsed_line.copy())
        # copy so each appended record is independent of the reused parsed_line dict
    return parsed_ped[:]


def from_map(data_lines_map):
    """parses the list of lines from .map file into a list of dict"""
    parsed_map = []
    parsed_line = {}
    for line in data_lines_map:
        line = line.strip().split("\t")
        parsed_line["chromosome"] = line[0]
        parsed_line["variant_ID"] = line[1]
        parsed_line["position"] = line[2]
        parsed_line["coordinate"] = line[3]
        parsed_map.append(parsed_line.copy())
    return parsed_map[:]


def from_ped_map(parsed_ped, parsed_map):
    """fuses alleles (from .ped) and other locus info (from .map) into a common dictionary
    attributed to each individual and for each marker"""
    for ind in parsed_ped:
        marker_loci = []
        for locus, marker in zip(ind["loci"], parsed_map):
            marker["alleles"] = locus
            marker_loci.append(marker)
        marker_loci_copy = deepcopy(marker_loci)  # deep copy so each individual gets its own marker dicts
        ind["loci"] = marker_loci_copy
    return parsed_ped[:]  # which is now a fusion with parsed_map
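

# After from_ped_map, each individual record has roughly this shape (a sketch based
# on the parsers above; the field values shown are illustrative only):
# {
#     "family": "FAM1", "individual_ID": "IND1", "parent_A": "0", "parent_B": "0",
#     "sex": "1", "phenotype": "2",
#     "loci": [
#         {"chromosome": "1", "variant_ID": "snp1", "position": "0",
#          "coordinate": "12345", "alleles": "A B"},
#         ...
#     ],
# }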


def check_files(parsed_ped, parsed_map):
    """checks whether the .ped and .map files are likely to come from the same dataset"""
    try:
        nb_loci = len(parsed_ped[0]["loci"])  # use the first individual as reference
        nb_marker = len(parsed_map)
        assert nb_loci == nb_marker
    except AssertionError:
        sys.exit(
            f"""Mismatch in number of markers: {nb_loci} loci in the .ped file and
            {nb_marker} variants in the .map file. The files might not come from the same dataset."""
        )
    return None


# FILTERING
def check_target(target):
    """asserts that target is not a string and does not contain duplicates"""
    try:
        if isinstance(target, str):
            sys.exit("You provided a string as target instead of a list of strings")
        assert len(set(target)) - len(target) == 0
    except AssertionError:
        sys.exit("There are duplicates in your query")
    return None


def filter_individuals(parsed_ped_map, target):
    """extracts a subset of individuals and returns an object similar to parsed_ped_map"""
    try:
        check_target(target)
        found = []  # to check if everything was found
        new_ind = []
        for elmt in target:
            for ind in parsed_ped_map:
                if ind["individual_ID"] == elmt:
                    new_ind.append(ind)
                    found.append(elmt)
        diff = list(set(target) - set(found))
        if diff != []:
            print(
                f"The individual(s) {diff} from target could not be found in available data"
            )
    except TypeError:
        sys.exit("Target should be a list of strings")
    return new_ind[:]


def filter_markers(parsed_ped_map, target):
    """extracts a subset of markers and returns an object similar to parsed_ped_map"""
    try:
        check_target(target)
        copy_dict = deepcopy(parsed_ped_map)  # avoid modifying original
        found = []
        for ind in copy_dict:
            new_markers = []
            for marker in ind["loci"]:
                for elmt in target:
                    if marker["variant_ID"] == elmt:
                        new_markers.append(marker)
                        found.append(elmt)
            ind["loci"] = new_markers
        diff = list(set(target) - set(found))
        if diff != []:
            print(
                f"The marker(s) {diff} from target could not be found in available data"
            )
    except TypeError:
        sys.exit("Target should be a list of strings")
    return copy_dict[:]


# EXPORT
def from_fused_to_ped_map(parsed_ped_map):
    """parses the parsed fused object back to both map and ped parsed objects"""
    pedkey = ["alleles"]
    mapkeys = ["chromosome", "coordinate", "position", "variant_ID"]
    new_map = []
    copy_dict2 = deepcopy(parsed_ped_map)
    filterByKey = lambda keys: {x: marker[x] for x in keys}
    for marker in copy_dict2[0]["loci"]:  # new map
        map_marker = filterByKey(mapkeys)
        new_map.append(map_marker)
    for ind in copy_dict2:  # new ped
        new_ped = []
        for marker in ind["loci"]:
            ped = filterByKey(pedkey)
            new_ped.append(ped)
        gt = []
        for alleles in new_ped:
            allele = list(alleles.values())
            gt += allele
        ind["loci"] = gt
    return new_map[:], copy_dict2[:]


def to_map(new_map):
    """parses the map object back to map lines that can be further saved in a .map file"""
    map_lines_to_save = []
    for marker in new_map:
        line = list(marker.values())
        line[1], line[3] = line[3], line[1]  # swap order of coordinate and variant_ID
        line = "\t".join(line)
        line = line + "\n"
        map_lines_to_save.append(line)
    return map_lines_to_save[:]


def to_ped(copy_dict2):
    """parses the ped object back to ped lines that can be further saved in a .ped file"""
    ped_lines_to_save = []
    for ind in copy_dict2:
        line = list(ind.values())
        line += line[6]
        line.remove(line[6])  # to end with alleles
        line = "\t".join(line)
        line = line + "\n"
        ped_lines_to_save.append(line)
    return ped_lines_to_save[:]


def writer(content, name):
    """writes a file"""
    with open(name, "w") as f:
        f.writelines(content)
    return None


def export_to_ped_map(parsed_ped_map, name):
    """main function to parse back a parsed fused object and export it as .ped and .map files"""
    new_map, copy_dict2 = from_fused_to_ped_map(parsed_ped_map)
    map_lines_to_save = to_map(new_map)
    ped_lines_to_save = to_ped(copy_dict2)
    name_map = name + ".map"
    writer(map_lines_to_save, name_map)
    name_ped = name + ".ped"
    writer(ped_lines_to_save, name_ped)
    return None


def from_fused_to_df(parsed_ped_map):
    """returns a data frame as individual x marker = genotype from the parsed fused object"""
    tbl = []
    for ind in parsed_ped_map:
        line = {}
        for marker in ind["loci"]:
            line["ind"] = ind["individual_ID"]
            line["marker"] = marker["variant_ID"]
            line["gt"] = marker["alleles"]
            line_copy = deepcopy(line)
            tbl.append(line_copy)
    df = pd.DataFrame(tbl)
    return df[:]


def reshape_df(df):  # order of markers changed compared to original .map file
    """reshapes a data frame from long to wide format"""
    df2 = df.pivot_table(  # long to wide format
        index="ind",
        columns="marker",
        values="gt",  # str so need to adjust aggfunc
        aggfunc=lambda x: " ".join(
            x
        ),  # if several values for same index, will concatenate with space
    )
    return df2[:]


def writer_to_csv(df2, name):
    """writes data frame to csv file"""
    file = name + ".csv"
    df2.to_csv(file, sep=";")
    return None


def export_to_csv(parsed_ped_map, name):
    """main function to export parsed fused object to csv file"""
    df = from_fused_to_df(parsed_ped_map)
    df2 = reshape_df(df)
    writer_to_csv(df2, name)
    return None


def gt_to_alleles(gt):
    """parses plink genotypes into both alleles"""
    allele_1 = gt.split(" ")[0]
    allele_2 = gt.split(" ")[1]
    return allele_1[:], allele_2[:]


def allele_to_genepop(allele):
    """translates plink allele into genepop format"""
    if allele == "A":
        allele_gen = "001"
    elif allele == "B":
        allele_gen = "002"
    elif allele == "0":
        allele_gen = "000"
    else:
        sys.exit(f"Unexpected allele code: {allele}")
    return allele_gen[:]


def allele_to_structure(allele):
    """translates plink allele into structure format"""
    if allele == "A":
        allele_str = "1"
    elif allele == "B":
        allele_str = "2"
    elif allele == "0":
        allele_str = "9"
    else:
        sys.exit(f"Unexpected allele code: {allele}")
    return allele_str[:]


def gt_parser(df, format):
    """parses and translates plink genotypes into either genepop or structure format"""
    new_gt = []
    for gt in df["gt"]:
        allele_1, allele_2 = gt_to_alleles(gt)
        if format == "genepop":
            new_1 = allele_to_genepop(allele_1)
            new_2 = allele_to_genepop(allele_2)
        elif format == "structure":
            new_1 = allele_to_structure(allele_1)
            new_2 = allele_to_structure(allele_2)
        new_gt.append(new_1 + new_2)
    df["gt"] = new_gt
    return df[:]
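

# Example (illustrative only, derived from the translation tables above): with
# format "genepop", a genotype "A B" becomes "001002" and "0 0" becomes "000000";
# with format "structure", "A B" becomes "12" and "0 0" becomes "99".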


def parser_to_genepop(df2):
    """parses wide format data frame to genepop format"""
    df2.insert(0, "", ",")  # add column with comma
    header = []
    for col in df2.columns:
        col = col + "\n"
        header.append(col)
    header.pop(0)  # remove first element '' from the comma column
    return df2[:], header[:]


def writer_to_genepop(df3, header, name, title):
    """writes a genepop file"""
    with open(name, "w") as f:
        f.write('Title line: "' + title + '"\n')
        f.writelines(header)
        f.write("Pop\n")
        df3.to_csv(f, header=False, sep=" ")
    return None


def export_to_genepop(parsed_ped_map, name):
    """main function to export parsed fused object to genepop format"""
    df = from_fused_to_df(parsed_ped_map)
    df_allele = gt_parser(df, "genepop")
    df2 = reshape_df(df_allele)
    df3, header = parser_to_genepop(df2)
    title = input("Enter the title line for genepop file:")
    file = name + ".txt"
    writer_to_genepop(df3, header, file, title)
    return None


def reshape_to_structure(df):
    """reshapes data frame on two columns per marker for structure format"""
    header = ""  # build a header listing each marker once, instead of marker.1 / marker.2 as in df
    for col in df.columns[0:]:
        df["{}.1".format(col)], df["{}.2".format(col)] = zip(
            *df[col].apply(lambda x: list(x))
        )
        del df[col]  # remove marker on only 1 column
        header += col
        header += " "
    header = header.rstrip()  # remove trailing blank (rstrip returns a new string)
    return df[:], header[:]


def writer_to_structure(df3, header, file):
    """writes data frame to structure file"""
    with open(file, "w") as f:
        f.write(header + "\n")
        df3.to_csv(f, header=False, sep=" ")
    return None


def export_to_structure(parsed_ped_map, name):
    """main function to export parsed fused object to structure format"""
    df = from_fused_to_df(parsed_ped_map)
    df_allele = gt_parser(df, "structure")
    df2 = reshape_df(df_allele)
    df3, header = reshape_to_structure(df2)
    file = name + ".str"
    writer_to_structure(df3, header, file)
    return None


class plinkPy:
    """Class whose instance contains related .ped and .map files exported from Plink"""

    def __read(self):
        """main function to read and parse .ped and .map files"""
        try:
            ped_lines = reader(self.ped)
            map_lines = reader(self.map)
            parsed_ped = from_ped(ped_lines)
            parsed_map = from_map(map_lines)
            check_files(parsed_ped, parsed_map)
            parsed_ped_map = from_ped_map(parsed_ped, parsed_map)
        except IndexError:
            sys.exit(
                "Check whether the .ped and .map files were swapped (e.g. print your plinkPy object to verify)"
            )
        return parsed_ped_map[:]

    def __init__(self, ped, map):  # store the .ped and .map file paths on the instance
        self.ped = ped
        self.map = map
        self.__parsed = plinkPy.__read(self)
        # parsed will be the fused parsed_ped_map object
        return None

    def __repr__(self):
        return f".ped file {self.ped} - .map file {self.map}"

    @property  # method accessed as attribute
    def individuals(self):
        """prints all individuals and their number"""
        count = 0
        for ind in self.__parsed:
            count += 1
            print(ind["individual_ID"])
        print(f"There are {count} individuals")
        return None

    @property
    def markers(self):
        """prints all markers and their number"""
        count = 0
        for marker in self.__parsed[0]["loci"]:
            count += 1
            print(marker["variant_ID"])
        print(f"There are {count} markers")
        return None

    def get_individuals(self, target):
        """filters individuals on a copy of the original instance and returns a new filtered instance"""
        self = deepcopy(self)
        self.__parsed = filter_individuals(self.__parsed, target)
        return self

    def get_markers(self, target):
        """filters markers on a copy of the original instance and returns a new filtered instance"""
        self = deepcopy(self)
        self.__parsed = filter_markers(self.__parsed, target)
        return self

    def export(self, format, name):
        """general function to call the right export function according to the format specified"""
        try:
            assert isinstance(name, str)
            if format == "ped_map":
                export_to_ped_map(self.__parsed, name)
            elif format == "csv":
                export_to_csv(self.__parsed, name)
            elif format == "genepop":
                export_to_genepop(self.__parsed, name)
            elif format == "structure":
                export_to_structure(self.__parsed, name)
            else:
                sys.exit(f"Called an unsupported export format: {format}")
        except AssertionError:
            sys.exit("File name should be a string")
        except TypeError:
            sys.exit("Tried to export unsupported object")
        return None
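

# Minimal usage sketch (not part of the library itself): the file names and IDs
# below are hypothetical and only illustrate how the class is meant to be called.
if __name__ == "__main__":
    dataset = plinkPy("example.ped", "example.map")  # hypothetical input files
    print(dataset)  # shows which files were loaded
    dataset.individuals  # prints individuals and their count
    subset = dataset.get_individuals(["IND1", "IND2"])  # hypothetical individual IDs
    subset.export("csv", "subset")  # writes subset.csv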