-
Notifications
You must be signed in to change notification settings - Fork 0
/
HMMViterbiLearn.py
149 lines (135 loc) · 8.77 KB
/
HMMViterbiLearn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# !/usr/bin/env python3
# HMM Set 2
# Implement Viterbi Learning
# University of California, Santa Cruz - BME 205
# Biomolecular Engineering and Bioinformatics
# Name: Zachary Mason (zmmason)
# Group Members: NONE
import sys
import numpy as np # numpy version 1.19.3
class ViterbiAlgorithm:
"""
Compute the The conditional probability Pr(x|π) that will be emitted by the HMM.
Input: A string x, followed by the alphabet Σ from which x was constructed, followed by the states States,
transition matrix Transition, and emission matrix Emission of an HMM (Σ, States, Transition, Emission).
Output: A path that maximizes the (unconditional) probability Pr(x, π) over all possible paths π.
"""
def __init__(self, emission, alphabet, states, transMatrix, emisMatrix):
"""Constructor: saves attributes from the input file."""
self.emissionIndex = [int(alphabet.index(em)) for em in emission] # list of index codes for each em in emission seq
self.alphabet = alphabet
self.states = states
with np.errstate(divide='ignore'): # handles division by zero encounters in array log
self.transMatrix = np.log(transMatrix) # takes the log of each item in the transition matrix
with np.errstate(divide='ignore'):
self.emisMatrix = np.log(emisMatrix) # takes the log of each item in the emission matrix
# initializing viterbi path with theo max lowest bound & list to hold backpointers -> keeping track of the paths
self.viterbi = np.array([[-float('inf') for j in range(len(self.emissionIndex))] for i in range(len(self.states))])
self.backPointers = [[0 for j in range(len(self.emissionIndex))] for i in range(len(self.states))]
def viterbiAlgo(self):
"""Get the path that maximizes Pr(x, π) over all possible paths π."""
for state in range(len(self.states)): # initializing start of viterbi with Pr(emisison)*(1/#states)
self.viterbi[state][0] = np.log(1/len(self.states))+self.emisMatrix[state][self.emissionIndex[0]]
self.backPointers[state][0] = -1 # sets starting nodes to -1 key
for i in range(1, len(self.emissionIndex)): # creating full viterbi graph after the initial node
for state in range(len(self.states)):
for prev in range(len(self.states)): # getting total probability for
totalProb = self.emisMatrix[state][self.emissionIndex[i]] + \
self.transMatrix[prev][state] + self.viterbi[prev][i - 1]
if totalProb > self.viterbi[state][i]: # find max-weight path to current node
self.viterbi[state][i] = totalProb
self.backPointers[state][i] = prev
score = -float('inf') # start backtrack from max-likelihood using max lowest value
for state in range(len(self.states)):
if self.viterbi[state][len(self.emissionIndex) - 1] > score:
last = state
score = self.viterbi[state][len(self.emissionIndex) - 1]
path = [last]
i = len(self.emissionIndex) - 1 # create max likelihood path backwards
while i > 0: # verification to go to next node
next = self.backPointers[last][i]
path.append(next)
last = next
i -= 1
result = ''.join(str(self.states[state]) for state in path[::-1])
return result
class ParamHMM:
"""
Estimate the Parameters of an HMM
Input: A sequence of emitted symbols x = x1 . . . xn in an alphabet ∑ and a path π = π1 . . . πn generated by a
k-state HMM with unknown transition and emission probabilities.
Output: A matrix of transition probabilities Transition and a matrix of emission probabilities Emission that
maximize Pr(x,π) over all possible matrices of transition and emission probabilities.
"""
def __init__(self, emission, alphabet, pi, states):
"""Constructor: saves attributes from the input file."""
self.emissionIndex = [alphabet.index(i) for i in emission] # list of index codes for each em in emission seq
self.states = states
self.pi = [states.index(i) for i in pi]
self.alphabet = alphabet
# initializing the matrix for emission and transition
self.transitionMatrix = np.array([[float(0) for j in range(len(states))] for i in range(len(states))])
self.emissionIndexMatrix = np.array([[float(0) for j in range(len(alphabet))] for i in range(len(states))])
def paramEst(self):
"""Compute the transition and emission matrices that maximize Pr(x,π)."""
self.emissionIndexMatrix[self.pi[0]][self.emissionIndex[0]] = 1 # initialize 1st prob
prev = self.pi[0] # initializing variable to hold previous index item
for i in range(1, len(self.emissionIndex)):
self.emissionIndexMatrix[self.pi[i]][self.emissionIndex[i]] += 1 # counts frequency of occurrences
self.transitionMatrix[prev][self.pi[i]] += 1 # counts frequency of occurrences
prev = self.pi[i] # reset new previous index
for s in range(len(self.states)):
if sum(self.emissionIndexMatrix[s]) == 0: # accounts for 0 frequency
self.emissionIndexMatrix[s] += 1
self.emissionIndexMatrix[s] = self.emissionIndexMatrix[s]/sum(self.emissionIndexMatrix[s]) # get Pr(emissions)
if sum(self.transitionMatrix[s]) == 0: # accounts for 0 frequency
self.transitionMatrix[s] += 1
self.transitionMatrix[s] = self.transitionMatrix[s]/sum(self.transitionMatrix[s]) # get Pr(transitions)
roundTrans = np.round(self.transitionMatrix, 3) # rounding array to 3rd dec place for formatting
roundEmis = np.round(self.emissionIndexMatrix, 3)
return roundEmis, roundTrans
def dataPrint(self, roundEmis, roundTrans):
"""Using the data from transition and emission matrices , print to specified formatting."""
maxMatrix = ['\t'.join([str(x) for x in self.states])] # initializing list to hold data for printing
for i in range(len(roundEmis)): # printing emission matrix info
roundTrans[i].insert(0, self.states[i]) # adding the state to the specific probabilities
maxMatrix.append('\t'.join([str(x) for x in roundTrans[i]]))
maxMatrix.append('--------')
maxMatrix.append('\t'+'\t'.join([str(x) for x in self.alphabet]))
for i in range(len(roundTrans)): # printing transition matrix info
roundEmis[i].insert(0, self.states[i]) # adding the state to the specific probabilities
maxMatrix.append('\t'.join([str(x) for x in roundEmis[i]]))
return maxMatrix
def main():
"""
Implement Viterbi Learning.
Given: A sequence of emitted symbols x = x1 ... xn in an alphabet A, generated by a k-state HMM with
unknown transition and emission probabilities, initial Transition and Emission matrices and a number
of iterations i.
Return: A matrix of transition probabilities Transition and a matrix of emission probabilities Emission that
maximizes Pr(x, π) over all possible transition and emission matrices and over all hidden paths π.
"""
contents = [] # list to hold the contents of the dataset
for line in sys.stdin: # takes STDIN only
contents.append(line.strip())
iteration = int(contents[0]) # number of iterations
statesLen = len(contents[6].split()) # counts amount of stated for to help parse input data
emission = contents[2]
alphabet = contents[4].split()
states = contents[6].split()
transMatrix = np.array([line.split()[1:] for line in contents[9: statesLen + 9]], float) # create array t-matrix
emisMatrix = np.array([line.split()[1:] for line in contents[len(contents) - statesLen:]], float) # array e-matrix
lastMatrixSet = [[transMatrix, emisMatrix]] # holds matrix data for each iteration
for i in range(1, iteration): # iterate over desired iteration count specified in file
viterbi = ViterbiAlgorithm(emission, alphabet, states, lastMatrixSet[0][0], lastMatrixSet[0][1])
newPath = viterbi.viterbiAlgo()
param = ParamHMM(emission, alphabet, newPath, states)
emis, trans = param.paramEst()
emis = emis.tolist() # turns matrix into list
trans = trans.tolist() # turns matrix into list
lastMatrixSet[0] = [trans, emis] # replaces last matrix data with iteration generated matrices
matrix = param.dataPrint(lastMatrixSet[0][1], lastMatrixSet[0][0]) # send last iteration to be formatted
for data in matrix:
print(data)
if __name__ == '__main__':
main()