-
Notifications
You must be signed in to change notification settings - Fork 0
/
vcf_preprocessing.py
86 lines (74 loc) · 3.33 KB
/
vcf_preprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import allel
import os
import pandas as pd
# Function to read in the vcf exactly as it is into a dataframe.
def vcf_body_to_df(file_path):
with open(file_path, 'r') as vcf_text_wrapper:
vcf_lines = vcf_text_wrapper.readlines()
vcf_labels_preprocessed = [line for line in vcf_lines if line.startswith("#") and not(line.startswith("##"))]
for line in vcf_labels_preprocessed:
line_list = line.split('\n')
vcf_label_split = [l.split('\t') for l in line_list if len(l) > 0]
# flatten list...
vcf_labels = []
for label in vcf_label_split:
for item in label:
vcf_labels.append(item)
vcf_body_preprocessed = [line for line in vcf_lines if not line.startswith("#")]
vcf_body_split = []
for line in vcf_body_preprocessed:
line_list = line.split('\n')
vcf_body_split.append([l.split('\t') for l in line_list if len(l) > 0])
# flatten list...
vcf_body = []
for entry in vcf_body_split:
for item in entry:
vcf_body.append(item)
df = pd.DataFrame.from_records(vcf_body, columns=vcf_labels)
return df
def split_multigene_vcf(file_path):
file_list = os.listdir(file_path)
for vcf_name in file_list:
name_root, name_ext = os.path.splitext(vcf_name)
if name_ext == ".vcf":
patient_id = name_root.split('-')[0]
#TODO: temporary string to replace build number...
genome_build = 'b37'
path_to_vcf = file_path+vcf_name
header_lines = allel.read_vcf_headers(path_to_vcf).headers
'''
# Fix to make single gene VCF files compatible with elimu tool
header_lines_ps = header_lines[:]
header_lines_ps[len(header_lines)-1] = '##FORMAT=<ID=PS,Number=1,Type=Integer,Description="Phase set">\n'
header_lines_ps.append(header_lines[len(header_lines)-1])
'''
gi_df = allel.vcf_to_dataframe(path_to_vcf, fields='GI', alt_number=1)
unique_GI = set(gi_df['GI'])
vcf_body_complete_df = vcf_body_to_df(path_to_vcf)
vcf_info_field = vcf_body_complete_df.INFO
vcf_info_split = [el.split(';') for el in vcf_info_field]
vcf_info_gi = []
for row in vcf_info_split:
for item in row:
if item.startswith('GI'):
vcf_gi = item.split('=')[1]
vcf_info_gi.append(vcf_gi)
break
for gi in unique_GI:
unique_GI_vars_df = vcf_body_complete_df[[x == gi for x in vcf_info_gi]]
new_file_name = patient_id+'.'+genome_build+'.'+gi+name_ext
f = open(file_path+'single_gene_vcf/'+new_file_name, 'w+')
f.writelines(header_lines)
vcf_file_values = unique_GI_vars_df.values
'''
# Fix for charite vcf files to be compatible with elimu vcf2fhir tool!!
vcf_file_values[:,8] = vcf_file_values[:,8] + ':PS'
vcf_file_values[:,9] = vcf_file_values[:,9] + ':0'
'''
for item in vcf_file_values:
line = '\t'.join(item)+'\n'
f.write(line)
f.close()
if __name__ == '__main__':
file_path = "./charite_example_files/"
split_multigene_vcf(file_path)