#!/usr/bin/env python3
"""
Copyright 2019 Ryan Wick (rrwick@gmail.com)
https://github.com/rrwick/Basecalling-comparison

This program is free software: you can redistribute it and/or modify it under the terms of the GNU
General Public License as published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version. This program is distributed in the hope that it
will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should
have received a copy of the GNU General Public License along with this program. If not, see
<http://www.gnu.org/licenses/>.

This script produces a table with information for each read.

Inputs:
  * Read file
  * minimap2 PAF

Output:
  * tsv file with these columns: length, identity, relative length

If less than half of a read aligns, it is deemed unaligned and given an identity of 0. If more
than half aligns, only the aligned parts are used to determine the read identity. Relative length
is included to see if the reads are systematically too short or too long.
"""
import collections
import gzip
import os
import statistics
import sys


def main():
read_filename = sys.argv[1]
paf_filename = sys.argv[2]
read_lengths = get_read_lengths(read_filename)
read_names = sorted(read_lengths.keys())
read_alignments = load_alignments(paf_filename, read_lengths)
print('\t'.join(['Name', 'Length', 'Identity', 'Relative length']))
for read_num, read_name in enumerate(read_names):
read_length = read_lengths[read_name]
alignments = read_alignments[read_name]
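        # For each base of the read, keep the identity of the best alignment covering it, so
        # overlapping alignments don't double-count and unaligned bases stay at 0.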
identity_by_base = [0.0] * read_length
total_read_length, total_ref_length = 0, 0
for read_start, read_end, ref_start, ref_end, identity in alignments:
for i in range(read_start, read_end):
if identity > identity_by_base[i]:
identity_by_base[i] = identity
total_read_length += read_end - read_start
total_ref_length += ref_end - ref_start
# If less than half the read aligned, then we call it an unaligned read.
if identity_by_base.count(0.0) > read_length / 2.0:
whole_read_identity = 0.0
# Otherwise, the read's identity is the average of the aligned parts.
else:
whole_read_identity = statistics.mean([x for x in identity_by_base if x > 0.0])
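        # Relative length = aligned read bases / corresponding reference bases, as a percentage
        # (e.g. 102 means the read ran ~2% long over its aligned parts).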
if whole_read_identity > 0.0:
relative_length = '%.5f' % (100.0 * total_read_length / total_ref_length)
else:
relative_length = ''
print('{}\t{}\t{:.5f}\t{}'.format(read_name, read_length, whole_read_identity, relative_length))
print_progress(read_num, len(read_names))
    print('\n', file=sys.stderr)


def print_progress(done_count, total_count):
print('\r{:,} / {:,} reads complete'.format(done_count+1, total_count),
end='', flush=True, file=sys.stderr)
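

# PAF columns used below (0-based indices): [0] query name, [1] query length, [2]/[3] query
# start/end, [7]/[8] target start/end, [9] number of matching bases, [10] alignment block length.
# Identity is taken as matches / block length, as a percentage.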
def load_alignments(paf_filename, read_lengths):
print('Loading alignments...', end='', flush=True, file=sys.stderr)
read_alignments = collections.defaultdict(list)
with open(paf_filename, 'rt') as paf:
for line in paf:
paf_parts = line.strip().split('\t')
if len(paf_parts) < 11:
continue
read_name = paf_parts[0]
read_length = int(paf_parts[1])
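            # Sanity check: the query length in the PAF must match the length from the read file.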
assert read_length == read_lengths[read_name]
read_start = int(paf_parts[2])
read_end = int(paf_parts[3])
ref_start = int(paf_parts[7])
ref_end = int(paf_parts[8])
identity = 100.0 * int(paf_parts[9]) / int(paf_parts[10])
read_alignments[read_name].append((read_start, read_end, ref_start, ref_end, identity))
print(' done', file=sys.stderr)
    return read_alignments


def get_compression_type(filename):
"""
Attempts to guess the compression (if any) on a file using the first few bytes.
http://stackoverflow.com/questions/13044562
"""
    magic_dict = {'gz': b'\x1f\x8b\x08',
                  'bz2': b'\x42\x5a\x68',
                  'zip': b'\x50\x4b\x03\x04'}
    max_len = max(len(x) for x in magic_dict.values())
    with open(filename, 'rb') as unknown_file:
        file_start = unknown_file.read(max_len)
compression_type = 'plain'
for filetype, magic_bytes in magic_dict.items():
if file_start.startswith(magic_bytes):
compression_type = filetype
if compression_type == 'bz2':
sys.exit('Error: cannot use bzip2 format - use gzip instead')
if compression_type == 'zip':
sys.exit('Error: cannot use zip format - use gzip instead')
    return compression_type


def get_sequence_file_type(filename):
if not os.path.isfile(filename):
sys.exit('Error: could not find ' + filename)
if get_compression_type(filename) == 'gz':
open_func = gzip.open
else: # plain text
open_func = open
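    # Peek at the first character: '>' marks FASTA, '@' marks FASTQ.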
with open_func(filename, 'rt') as seq_file:
try:
first_char = seq_file.read(1)
except UnicodeDecodeError:
first_char = ''
if first_char == '>':
return 'FASTA'
elif first_char == '@':
return 'FASTQ'
else:
        raise ValueError('File is neither FASTA nor FASTQ')


def get_read_lengths(filename):
print('Loading read lengths...', end='', flush=True, file=sys.stderr)
try:
file_type = get_sequence_file_type(filename)
if file_type == 'FASTA':
return get_fasta_lengths(filename)
else: # FASTQ
return get_fastq_lengths(filename)
except IndexError:
        sys.exit('\nError: ' + filename + ' could not be parsed - is it formatted correctly?')


def get_fasta_lengths(fasta_filename):
if get_compression_type(fasta_filename) == 'gz':
open_func = gzip.open
else: # plain text
open_func = open
read_lengths = {}
with open_func(fasta_filename, 'rt') as fasta_file:
name = ''
sequence = []
for line in fasta_file:
line = line.strip()
if not line:
continue
if line[0] == '>': # Header line = start of new contig
if name:
read_lengths[name.split()[0]] = len(''.join(sequence))
sequence = []
name = line[1:]
else:
sequence.append(line)
if name:
read_lengths[name.split()[0]] = len(''.join(sequence))
print(' done', file=sys.stderr)
    return read_lengths


def get_fastq_lengths(fastq_filename):
if get_compression_type(fastq_filename) == 'gz':
open_func = gzip.open
else: # plain text
open_func = open
read_lengths = {}
with open_func(fastq_filename, 'rt') as fastq:
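        # Each FASTQ record is four lines: header, sequence, '+' separator, qualities.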
for line in fastq:
name = line.strip()[1:].split()[0]
sequence = next(fastq).strip()
next(fastq)
next(fastq)
read_lengths[name] = len(sequence)
print(' done', file=sys.stderr)
    return read_lengths


if __name__ == '__main__':
main()