forked from phbradley/tcr-dist
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblast.py
210 lines (162 loc) · 6.31 KB
/
blast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
### most of this will only work for Blast -- not psi-blast
def is_new_query_id_line( line ):
return line and line.split() and line.split()[0] == 'Query='
def is_new_hit_id_line( line ):
return line and line[0] == '>'
def is_new_match_line( line ):
return line and line[:3] == ' Sc'
def is_query_alignment_line( line ):
return line and line.split() and line.split()[0] == 'Query:'
class BlastMatch:
""" Reads a single blast match from a set of input lines"""
def get_line( s ):
if not s.lines: return ''
line = s.lines[0]
del s.lines[0]
return line
def __init__( s, lines, query_id, hit_id ):
s.valid = False
s.query_id = query_id
s.hit_id = hit_id
s.lines = lines[:]
line = s.get_line()
assert is_new_match_line( line )
assert line.split()[5][:6] == 'Expect' and line.split()[6] == '='
evalue = line.split()[7]
if evalue[-1] == ',': evalue = evalue[:-1]
#assert evalue[-1] == ','
if evalue[0] == 'e': evalue = '1'+evalue
s.evalue = float( evalue )
line = s.get_line()
ident = line.split()[3]
assert ident[0] == '('
if ident[-3:] == '%),':
ident = int( ident[1:-3] )
else:
assert ident[-2:] == '%)'
ident = int( ident[1:-2] ) ## pb added this line 10/15/15 (!)
s.identities = ident
is_blastx_output = False
is_blastn_output = False
s.frame = 'NA'
q_strand = 1
h_strand = 1
while line and not is_query_alignment_line( line ):
line = s.get_line()
if line.startswith(' Frame'):
#print line[:-1]
s.frame = line.split()[2]
is_blastx_output = True
if line.startswith(' Strand'):
is_blastn_output = True
#print line[:-1]
l = line.split()
assert l[3] == '/'
assert len(l) == 5
if l[2] == 'Plus':
q_strand = 1
else:
assert l[2] == 'Minus'
q_strand = -1
if l[4] == 'Plus':
h_strand = 1
else:
assert l[4] == 'Minus'
h_strand = -1
## loop through possible several lines of alignment
qstarts = []
qstops = []
qseq = ''
hstarts = []
hstops = []
hseq = ''
middleseq= ''
while line:
assert is_query_alignment_line( line )
qstarts.append( int(line.split()[1])-1) ## 0-indexed
qstops.append( int(line.split()[3])-1 )
new_qseq = line.split()[2]
qseq = qseq + new_qseq
middleseq_offset = line.index( new_qseq )
line = s.get_line()
middleseq += line[ middleseq_offset: middleseq_offset+len(new_qseq)]
line = s.get_line()
hstarts.append( int(line.split()[1])-1)
hstops.append( int(line.split()[3])-1)
hseq = hseq + line.split()[2]
## advance to next query alignment line
while line and not is_query_alignment_line( line ): line = s.get_line()
qstart = q_strand * min( [q_strand*x for x in qstarts ] )
qstop = q_strand * max( [q_strand*x for x in qstops ] )
hstart = h_strand * min( [h_strand*x for x in hstarts ] )
hstop = h_strand * max( [h_strand*x for x in hstops ] )
if not is_blastx_output:
assert q_strand*(qstop-qstart) == len( ''.join( qseq.split('-') ) ) - 1
assert h_strand*(hstop-hstart) == len( ''.join( hseq.split('-') ) ) - 1
assert len(hseq) == len(qseq)
#print hit_id
#print qseq
#print hseq
## success
s.h_start = hstart ## 0-indexed
s.h_stop = hstop
s.h_strand = h_strand
s.h_align = hseq
s.q_start = qstart
s.q_stop = qstop
s.q_strand = q_strand
s.q_align = qseq
s.middleseq = middleseq
q2hmap = {}
ia = 0
ib = 0
gaps = '.-'
for i,a in enumerate( qseq ):
b = hseq[i]
if a not in gaps:
if b in gaps: q2hmap[ qstart+ia ] = (-1,'-')
else: q2hmap[ qstart+ia ] = (hstart+ib,b)
if a not in gaps: ia += q_strand
if b not in gaps: ib += h_strand
s.q2hmap = q2hmap ## 0-indexed numbering wrt to fullseq
s.valid = True
def parse_blast_alignments( blastfile, evalue_threshold, identity_threshold ):
## THIS WILL NOT WORK FOR PSIBLAST
data = open( blastfile, 'r' )
line = data.readline()
while line and not is_new_query_id_line( line ): line = data.readline()
hits = {}
while line:
assert is_new_query_id_line( line )
query_id = line.split()[1]
hits[ query_id ] = []
## read to the start of the alignments
line = data.readline()
while line and not is_new_hit_id_line( line ) and not is_new_query_id_line( line ): line = data.readline()
if not is_new_hit_id_line( line ): continue ## no hits
while line:
assert is_new_hit_id_line( line )
hit_id = line.split()[0][1:]
## read to the first match for this hit
while line and not is_new_match_line( line ): line = data.readline()
while line:
assert is_new_match_line( line )
hitlines = [ line ]
line = data.readline()
while line and \
not is_new_match_line( line ) and \
not is_new_hit_id_line( line ) and \
not is_new_query_id_line( line ):
hitlines.append( line )
line = data.readline()
## now in a new match
m = BlastMatch( hitlines, query_id, hit_id )
if m.evalue <= evalue_threshold and m.identities >= identity_threshold:
hits[ query_id ].append( m )
if not is_new_match_line( line ): break
if not is_new_hit_id_line( line ): break
if not is_new_query_id_line( line ):
assert not line
break
data.close()
return hits