# hmm.py
ddir = "/bt3102project/submission"
#%%
import json
import pandas as pd
import numpy as np
import random
import math
class HMM:
def __init__(self, possible_states, possible_emissions, seed):
"""
possible_states is a list of possible tags for the hidden states in this HMM
possible_emissions is a list of all possible emissions
"""
# We initialise by seeding the random number generator with the provided seed
random.seed(seed)
def generate_n_probabilities(n):
"""
Helper function. Generates a list of n probabilities, the total of which sums to 1
"""
r = []
for i in range(n):
new_random_number = random.random()
r.append(new_random_number)
random.seed(new_random_number) # Seed the next number with the current random number
s = sum(r)
r = [ i/s for i in r ]
return r
# Handle unseen output
possible_emissions.append("NONE")
# Generate initial random probabilities for transition, emission, and pi
transition_probs_init = { state: generate_n_probabilities(len(possible_states + ["STOP"])) for state in possible_states}
emission_probs_init = {state: generate_n_probabilities(len(possible_emissions)) for state in possible_states}
pi_init = generate_n_probabilities(len(possible_states))
# Initialise transition probs, emission probs, and the probability of transiting from START to state_i, pi
self.transition_probs = { state_i: {
state_j : transition_probs_init[state_i][index]
for (index,state_j) in enumerate(possible_states + ["STOP"]) }
for state_i in possible_states }
self.emission_probs = { state : {
emission: emission_probs_init[state][index]
for (index,emission) in enumerate(possible_emissions)}
for state in possible_states }
self.pi = {state: pi_init[index] for (index,state) in enumerate(possible_states)}
# Store the possible states and possible emissions for easy access in class methods
self.possible_states = possible_states
self.possible_emissions = possible_emissions
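# A minimal sanity-check sketch (commented out; the states and emissions are made up): after the
# initialisation above, pi and each row of transition_probs / emission_probs should sum to ~1.
# hmm = HMM(["N", "V"], ["cat", "runs"], seed=42)
# assert abs(sum(hmm.pi.values()) - 1) < 1e-9
# assert all(abs(sum(row.values()) - 1) < 1e-9 for row in hmm.transition_probs.values())
# assert all(abs(sum(row.values()) - 1) < 1e-9 for row in hmm.emission_probs.values())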
def forward_backward_algorithm(self,sequences,max_iter,thresh):
"""
Runs the forward-backward (Baum-Welch) algorithm through all sequences in the sequences variable, which is a list of lists.
This method terminates when max_iter is reached, or when the incremental improvement in the log probability of seeing the data drops below thresh
"""
def update_probs(accumulated_start_zai, accumulated_zai, accumulated_gamma):
# Update transition probabilities
for state_i in self.possible_states:
for state_j in self.possible_states + ["STOP"]:
self.transition_probs[state_i][state_j] = accumulated_zai[state_i][state_j]/ sum(accumulated_zai[state_i].values())
# Update probability for transiting from START to state_i
for state_i in self.possible_states:
self.pi[state_i] = accumulated_start_zai[state_i]/sum(accumulated_start_zai.values())
# Update emission probabilities
for state_j in self.possible_states:
for emission in self.possible_emissions:
self.emission_probs[state_j][emission] = accumulated_gamma[state_j][emission]/sum(accumulated_gamma[state_j].values())
# Set emission prob for NONE to minimum
min_prob = min(accumulated_gamma[state_j].values())
self.emission_probs[state_j]["NONE"] = min_prob/sum(accumulated_gamma[state_j].values())
# Initialise the first item in the log probability list to negative infinity; Log probabilities will be appended here after each iteration
# Algorithm will terminate if the difference between latest log probability and the previous log probability is less than thresh
iter_log_probs = [-math.inf]
for i in range(max_iter):
# Initialise iteration log probability to 0. Also initialise accumulated_start_zai, accumulated_zai, and accumulated_gamma
# Accumulated_start_zai is a corner case of zai. It is used for the updating of P ( y1 = j ) - ie the initial probabilities, pi.
iter_log_prob = 0
accumulated_start_zai = { state_i : 0 for state_i in self.possible_states }
accumulated_zai = { state_i : {state_j : 0 for state_j in self.possible_states + ["STOP"]} for state_i in self.possible_states }
accumulated_gamma = { state: { emission : 0 for emission in self.possible_emissions } for state in self.possible_states }
# Go through all the sequences in our data. The forward backward step function returns the log probability of seeing the sequence that was passed to it
# We add this log probability to the iteration log probability, since the log probability of all the sequences is the sum of the log probabilities of the individual sequences
for sequence in sequences:
iter_log_prob += self.forward_backward_step(sequence, accumulated_start_zai, accumulated_zai, accumulated_gamma)
# After going through all the sequences, we use the accumulated zai and gamma values to update the transition and output probabilities. This concludes the iteration
update_probs(accumulated_start_zai,accumulated_zai,accumulated_gamma)
# Print the log probability for this iteration. Can be removed when not needed anymore
print("Iteration {} : {}".format(i,iter_log_prob))
# Terminate the algorithm if at convergence (improvement in log probability less than threshold)
if iter_log_prob - iter_log_probs[-1] < thresh:
break
iter_log_probs.append(iter_log_prob)
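# A minimal training sketch (commented out; toy data, not from the assignment) showing how the
# forward-backward loop above is typically driven, and how the learned parameters can be inspected
# with the dataframe helpers defined further down:
# toy_sequences = [["a", "b", "a"], ["b", "b"]]
# hmm = HMM(possible_states=["S1", "S2"], possible_emissions=["a", "b"], seed=10)
# hmm.forward_backward_algorithm(toy_sequences, max_iter=20, thresh=0.01)
# learned_transitions = hmm.transition_probs_df()
# learned_emissions = hmm.emission_probs_df()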
def forward_backward_step(self, sequence, accumulated_start_zai, accumulated_zai, accumulated_gamma):
"""
Runs forward-backward for one sequence, accumulating zai and gamma into the provided dictionaries and returning the log probability of the sequence.
In most cases, this method should not be called directly. Instead, refer to the forward_backward_algorithm method
"""
def forward_probabilities(sequence):
"""
Takes in a sequence and returns a dictionary, forward[t][j],
which represents alpha[t][j] (ie. the joint probability of the observations up to time step t and of the state at time step t being j)
"""
states = self.transition_probs.keys()
forward = {index: { state : None for state in states } for index in range(1, len(sequence) + 1 )}
# Initialisation
first_observation = sequence[0]
for state in states:
forward[1][state] = self.pi[state] * self.emission_probs[state][first_observation]
# Compute forward prob at each time step
for time_step in range(2,len(sequence) + 1):
current_observation = sequence[time_step-1]
for state in states:
forward[time_step][state] = sum(
[ forward[time_step - 1][prev_state] *
self.transition_probs[prev_state][state] *
self.emission_probs[state][current_observation] for prev_state in states ]
)
return forward
def backward_probabilities(sequence):
"""
Takes in a sequence and returns a dictionary, backward[t][i],
which represents beta[t][i] (ie. the probability of the observations from time step t+1 to n, and of the final transition to STOP, given that the state at time step t is i)
"""
states = self.transition_probs.keys()
backward = {index: { state : None for state in states } for index in range(1, len(sequence) + 1 )}
# Initialisation
for state in states:
nth_time_step = len(sequence)
backward[nth_time_step][state] = self.transition_probs[state]["STOP"]
# Compute backward prob at each time step
for time_step in range(len(sequence) - 1, 0, -1):
next_observation = sequence[time_step]
for state in states:
backward[time_step][state] = sum(
[ backward[time_step + 1][next_state] *
self.transition_probs[state][next_state] *
self.emission_probs[next_state][next_observation] for next_state in states ]
)
return backward
def compute_zai(sequence, alpha, beta):
"""
Takes in a sequence and returns a dictionary, sum_zai[i][j],
which represents the summed probability of transiting from i to j across all time steps
"""
states = list(self.transition_probs.keys())
# Instead of storing zai values for each time step, we use the sum_zai dictionary to store transitions from state i to state j across all time steps,
# since that is what is ultimately required in computation. By doing so, we save the need to iterate through all time steps and sum it up later on,
# which would be computationally expensive
sum_zai = { state_i : {state_j : 0 for state_j in states + ["STOP"]} for state_i in states }
joint_prob = sum([ alpha[len(sequence)][state] * self.transition_probs[state]["STOP"] for state in self.transition_probs])
for time_step in range(1, len(sequence)):
emission_at_t_plus_one = sequence[time_step]
for state_i in states:
for state_j in states:
sum_zai[state_i][state_j] += (
alpha[time_step][state_i]*
self.transition_probs[state_i][state_j]*
self.emission_probs[state_j][emission_at_t_plus_one]*
beta[time_step+1][state_j]
)/joint_prob
# Also add zai values for the transition from the last state (at t=n) to STOP
for state in states:
sum_zai[state]["STOP"] += (
alpha[len(sequence)][state] *
self.transition_probs[state]["STOP"]
)/joint_prob
return sum_zai
def compute_gamma(sequence, alpha, beta):
"""
Takes in a sequence and returns a dictionary, gamma[j][t],
which represents the probability that the state at time step t is j, given the sequence of observations
"""
states = self.transition_probs.keys()
gamma = { state: { index : None for index in range(1, len(sequence) + 1 )} for state in states }
for time_step in range(1,len(sequence) + 1):
sum_alpha_x_beta = sum (
[ alpha[time_step][state]*beta[time_step][state] for state in states ]
)
for state in states:
alpha_x_beta = alpha[time_step][state]*beta[time_step][state]
gamma[state][time_step] = alpha_x_beta / sum_alpha_x_beta
return gamma
def log_prob_of_seeing_sequence(sequence, alpha):
"""
Computes log ( P (x1, x2, .... xn) )
"""
prob_of_seeing_sequence = sum([ alpha[len(sequence)][state] * self.transition_probs[state]["STOP"] for state in self.transition_probs])
# Take the logarithm of the probability, and return it
return math.log(prob_of_seeing_sequence)
# Compute alpha and beta
alpha = forward_probabilities(sequence)
beta = backward_probabilities(sequence)
# Use computed values of alpha and beta to compute gamma and zai
gamma = compute_gamma(sequence, alpha, beta)
zai = compute_zai(sequence, alpha, beta)
# Accumulate zai for START to state in time step 1
for state in self.possible_states:
accumulated_start_zai[state] += gamma[state][1]
# Accumulate zai
for state_i in self.possible_states:
for state_j in self.possible_states + ["STOP"]:
accumulated_zai[state_i][state_j] += zai[state_i][state_j]
# Accumulate gamma
for state in self.possible_states:
for index,item in enumerate(sequence):
accumulated_gamma[state][item] += gamma[state][index+1]
return log_prob_of_seeing_sequence(sequence,alpha)
def transition_probs_df(self):
"""
Returns transition probabilities as a dataframe
"""
i = []
j = []
prob = []
for state in self.pi:
i.append("START")
j.append(state)
prob.append(self.pi[state])
for state_i in self.transition_probs:
for state_j in self.transition_probs[state_i]:
i.append(state_i)
j.append(state_j)
prob.append(self.transition_probs[state_i][state_j])
return pd.DataFrame({"i":i, "j": j, "prob":prob})
def emission_probs_df(self):
"""
Returns emission probabilities as a dataframe
"""
i = []
emission = []
prob = []
for state in self.emission_probs:
for em in self.emission_probs[state]:
i.append(state)
emission.append(em)
prob.append(self.emission_probs[state][em])
return pd.DataFrame({"i":i, "emission": emission, "prob":prob})
# Question 2a (Naive Approach)
@classmethod
def compute_output_probs(cls, in_train_filename, output_probs_filename, delta=0.1):
"""
Generates the P(x = w | y = j) for all w and j, saving them into a file called naive_output_probs.txt
:param in_train_filename : File path to twitter_train.txt, which contains labelled POS tags for each word in tweets
:param output_probs_filename: File path of naive_output_probs.txt
:param delta: Smoothing parameter, typical values are 0.01, 0.1, 1, or 10
"""
parsed_data = HMM.parse_twitter_training_data(in_train_filename)
tag_dict = {}
for tweet in parsed_data: # Go through each tweet
for word_and_tag in tweet: # Go through each word in the tweet
tag = word_and_tag[1]
word = word_and_tag[0]
# Add the tag if it doesn't exist in the dict yet
if tag not in tag_dict:
tag_dict[tag] = {"occurences": 1, "words": {word: 1}}
else:
tag_dict[tag]["occurences"] += 1
# Add the word if it doesn't exist in the dict yet; otherwise increment its count
if word not in tag_dict[tag]["words"]:
tag_dict[tag]["words"][word] = 1
else:
tag_dict[tag]["words"][word] += 1
# Count the number of unique words across all tags
num_words = len({word for tag in tag_dict.values() for word in tag["words"]})
# Add probabilities for unseen data and apply smoothing
for tag in tag_dict:
count_y_is_j = tag_dict[tag]["occurences"]
tag_dict[tag]["words"]["NONE"] = delta / (count_y_is_j + delta * (num_words + 1))
for word in tag_dict[tag]["words"]:
count_y_is_j_and_x_is_w = tag_dict[tag]["words"][word]
# Compute the add-delta smoothed estimate of P(x = w | y = j) for this tag and word
b_j_w = (count_y_is_j_and_x_is_w + delta) / (
count_y_is_j + delta * (num_words + 1)
)
tag_dict[tag]["words"][word] = b_j_w
# Create output_dict, where tag, word and prob keys will each represent a column in the output dataframe
output_dict = {"tag": [], "word": [], "prob": []}
for tag in tag_dict:
for word in tag_dict[tag]["words"]:
b_j_w = tag_dict[tag]["words"][word]
output_dict["tag"].append(tag)
output_dict["word"].append(word)
output_dict["prob"].append(b_j_w)
# Convert output dict into a dataframe
output_probs = pd.DataFrame.from_dict(output_dict)
# Save dataframe to output prob file
output_probs.to_csv(output_probs_filename, index=False)
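# A worked example of the add-delta smoothing above, with made-up counts: if tag j occurs 200 times,
# word w occurs 3 times under tag j, there are 1000 unique words and delta = 0.1, then
#   b_j(w)    = (3 + 0.1) / (200 + 0.1 * (1000 + 1)) = 3.1 / 300.1 ~= 0.0103
#   b_j(NONE) =       0.1 / (200 + 0.1 * (1000 + 1)) = 0.1 / 300.1 ~= 0.00033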
@classmethod
def compute_output_probs_v2 (cls, in_train_filename, output_probs_filename):
parsed_data = HMM.parse_twitter_training_data(in_train_filename)
tag_dict = {}
for tweet in parsed_data: # Go through each tweet
for word_and_tag in tweet: # Go through each word in the tweet
tag = word_and_tag[1]
word = word_and_tag[0]
# Add the tag if it doesn't exist in the dict yet
if tag not in tag_dict:
tag_dict[tag] = {"occurences": 1, "words": {word: 1}}
else:
tag_dict[tag]["occurences"] += 1
# Add the word if it doesn't exist in the dict yet; otherwise increment its count
if word not in tag_dict[tag]["words"]:
tag_dict[tag]["words"][word] = 1
else:
tag_dict[tag]["words"][word] += 1
def get_number_of_words_seen_r_times_in_tag(tag,r):
words = tag_dict[tag]["words"]
number = 0
for word in words:
count = words[word]
if count == r:
number+=1
return number
# Compute r*, the Good-Turing smoothed value of a count r, where nr is the number of words seen r times
# (compute_r_star is defined for reference; only the unseen-word mass below uses Good-Turing counts)
def compute_r_star(r, number_of_words_seen_r_plus_one_times,number_of_words_seen_r_times):
return (r+1) * ( number_of_words_seen_r_plus_one_times/number_of_words_seen_r_times )
import copy
smoothed_tag_dict = copy.deepcopy(tag_dict)
for tag in tag_dict:
tag_occurences = tag_dict[tag]["occurences"]
words = tag_dict[tag]["words"]
# Total count of (word, tag) observations under all other tags
total_n_grams = 0
for tag2 in tag_dict:
if tag2 != tag:
total_n_grams += sum(tag_dict[tag2]["words"].values())
# Smooth for unseen words using good-turing smoothing
smoothed_tag_dict[tag]["words"]["NONE"] = get_number_of_words_seen_r_times_in_tag(tag,1)/total_n_grams
sum_of_katz_counts = sum([count for count in smoothed_tag_dict[tag]["words"].values() ])
smoothed_tag_dict[tag]["occurences"] = sum_of_katz_counts
# Create output_dict, where tag, word and prob keys will each represent a column in the output dataframe
output_dict = {"tag": [], "word": [], "prob": []}
for tag in smoothed_tag_dict:
count_y_is_j = smoothed_tag_dict[tag]["occurences"]
for word in smoothed_tag_dict[tag]["words"]:
count_y_is_j_and_x_is_w = smoothed_tag_dict[tag]["words"][word]
# Compute the estimate of P(x = w | y = j) from the smoothed counts for this tag and word
b_j_w = count_y_is_j_and_x_is_w / count_y_is_j
output_dict["tag"].append(tag)
output_dict["word"].append(word)
output_dict["prob"].append(b_j_w)
# Convert output dict into a dataframe
output_probs = pd.DataFrame.from_dict(output_dict)
# Save dataframe to output prob file
output_probs.to_csv(output_probs_filename, index=False)
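# A worked Good-Turing example with made-up counts: compute_r_star(1, n2, n1) with n1 = 50 words
# seen once and n2 = 20 words seen twice gives r* = (1 + 1) * 20 / 50 = 0.8. The unseen-word mass
# actually written out above is n1 / total_n_grams; e.g. with n1 = 50 and 20000 (word, tag)
# observations under the other tags, P(NONE | tag) = 50 / 20000 = 0.0025.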
@classmethod
def compute_transition_probs(cls, in_train_filename, trans_probs_filename, in_tags_filename, sigma):
"""
Generates the P(yt = j | yt-1 = i) for all i and j, saving them into a file called trans_probs.txt
:param in_train_filename : File path to twitter_train.txt, which contains labelled POS tags for each word in tweets
:param trans_probs_filename: File path of trans_probs.txt
"""
# Aggregate all occurrences (with duplicates) in a DataFrame first
parsed_data = HMM.parse_twitter_training_data(in_train_filename)
transition_df = pd.DataFrame(columns = ["Prior", "Next"])
for tweet in parsed_data: # Go through each tweet
lenTweet = len(tweet) - 1 # Index of the last word (indices start from 0)
for index in range(len(tweet)): # Go through each word in the tweet
tag = tweet[index][1]
# START
if index == 0:
tempDF = pd.DataFrame.from_dict({"Prior": ["START"], "Next": [tag]})
transition_df = transition_df.append(tempDF, ignore_index=True)
# END
elif index == lenTweet:
tempDF = pd.DataFrame.from_dict({"Prior": [tag], "Next": ["END"]})
transition_df = transition_df.append(tempDF, ignore_index=True)
# Other Cases
else:
prev_tag = tweet[index-1][1]
tempDF = pd.DataFrame.from_dict({"Prior": [prev_tag], "Next": [tag]})
transition_df = transition_df.append(tempDF, ignore_index=True)
# Sum up all occurences
# Prior + Next
transition_df = transition_df.groupby(['Prior', 'Next']).size().reset_index()
transition_df.columns = ["Prior", "Next", "Count Tags"]
prior_unique_tags = transition_df["Prior"].unique()
# Prior Only
prior_df = transition_df.drop(columns = "Next")
prior_df = transition_df.groupby(['Prior']).size().to_frame('Count Tag')
prior_df = prior_df.reset_index()
# Get count of all possible unique tags
parsed_tag_data = cls.parse_tags_data(in_tags_filename)
parsed_tag_data_Start = parsed_tag_data + ["START"]
parsed_tag_data_End = parsed_tag_data + ["END"]
allUniqueTags = len(parsed_tag_data) + 2 # For END and START
# Get probability calculated
denominator = sigma*(allUniqueTags+1)
transition_df_prob = transition_df.copy()
transition_df_prob = transition_df.groupby(["Prior", "Next"])['Count Tags'].sum().rename("prob")
transition_df_prob = (transition_df_prob + sigma) / (transition_df_prob.groupby(level=0).sum() + denominator)
transition_df_prob.columns = ["Prior", "Next", "prob"]
transition_df_prob = transition_df_prob.reset_index()
# Find out what tags are already in transition_df and what not
for i in parsed_tag_data_Start:
# If tag does not exist in training data
if i not in prior_unique_tags:
# Create corresponding prob value in tag
for j in parsed_tag_data_End:
# transition prob = 0 as tag did not appear in training data
# count of prior tag = 0 as tag did not appear in training data
tag_prob = sigma / denominator
corres_prob_df = pd.DataFrame.from_dict({"Prior": [i], "Next": [j], "prob": [tag_prob]})
transition_df_prob = transition_df_prob.append(corres_prob_df, ignore_index = True)
# Get unique Next tags in each Prior tag
next_unique_tags = transition_df[transition_df["Prior"] == i]["Next"].unique()
if i != "START":
parsed_data_tag_enu = parsed_tag_data_End
elif i == "START":
parsed_data_tag_enu = parsed_tag_data
for j in parsed_data_tag_enu:
# If tag does not exist in Next tag
if j not in next_unique_tags:
# Get number of times tag Prior occured
prior_occured = prior_df[prior_df["Prior"] == i].iloc[0]["Count Tag"]
# transition prob = 0 as tag did not appear in training data
# count of prior tag = prior_occured
tag_prob = sigma / (prior_occured + denominator)
corres_prob_df = pd.DataFrame.from_dict({"Prior": [i], "Next": [j], "prob": [tag_prob]})
transition_df_prob = transition_df_prob.append(corres_prob_df, ignore_index = True)
transition_df_prob.to_csv(trans_probs_filename, index=None)
return transition_df_prob
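# A worked example of the transition smoothing above, with made-up counts: with 25 tags the code
# uses allUniqueTags = 27 (the tags plus START and END), so with sigma = 0.01 the added denominator
# is 0.01 * 28 = 0.28. If tag i occurs 500 times and the pair (i, j) occurs 40 times, then
#   P(j | i) = (40 + 0.01) / (500 + 0.28) ~= 0.0800
# and a Next tag never seen after i gets sigma / (500 + 0.28) ~= 2.0e-05.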
@classmethod
def parse_tags_data(cls, in_tags_filename):
"""
:param in_tags_filename : File name of twitter_tags.txt, which contains all possible tags
:returns parsed_data: A list of all possible tags
"""
parsed_data = []
with open(in_tags_filename, "r") as f:
for line in f:
data = line[:-1] # Remove the newline character
parsed_data += [data]
return parsed_data
@classmethod
def parse_twitter_training_data(cls, in_train_filename):
"""
:param in_train_filename : File name of twitter_train.txt, which contains labelled POS tags for each word in tweets
:returns parsed_data: A list of lists containing each of the tweets and their respective words and tags
"""
parsed_data = []
current_tweet = []
with open(in_train_filename, "r") as f:
for line in f:
if line == "\n": # Start a new tweet if it's a blank line
parsed_data.append(current_tweet)
current_tweet = []
else: # Append data to current tweet
data = line.split("\t") #Split into word and tag
data[1] = data[1][:-1] # Remove the newline character
current_tweet.append(tuple(data))
return parsed_data
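# A hedged illustration of the expected input format, based on how the parser above splits lines
# (word and tag separated by a tab, tweets separated by a blank line); the tokens and tags here
# are made up:
#
#   Good\tA
#   morning\tN
#   (blank line)
#   lol\t!
#   (blank line)
#
# parse_twitter_training_data would then return
#   [[("Good", "A"), ("morning", "N")], [("lol", "!")]]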
@classmethod
def parse_data_no_tag(cls, data_no_tag):
"""
:param data_no_tag : File name of data without tags, with one token per line and each sequence separated by a blank line
:returns parsed_data: A list of lists containing each of the sequences read from the file
"""
parsed_data = []
current_sequence = []
with open(data_no_tag, "r") as f:
for line in f:
if line == "\n": # Start a new sequence if it's a blank line
parsed_data.append(current_sequence)
current_sequence = []
else: # Append data to current sequence
current_sequence.append(line[:-1])
return parsed_data
# Uncomment to generate naive_output_probs.txt
# in_train_filename = f"{ddir}/twitter_train.txt"
# naive_output_probs_filename = f"{ddir}/naive_output_probs.txt"
# HMM.compute_output_probs(in_train_filename, naive_output_probs_filename)
# Uncomment to generate viterbi2 output probs
# in_train_filename = f"{ddir}/twitter_train.txt"
# output_probs_filename2 = f"{ddir}/output_probs2.txt"
# HMM.compute_output_probs_v2(in_train_filename,output_probs_filename2)
#%%
class Viterbi:
@classmethod
def compute_Prob_MLE(cls, in_train_filename, output_probs_filename, in_tags_filename, sigma):
"""
For Q4 Part (a)
Reuses Question 2 output probability function
Transition Probability computed using freshly written code
"""
# Note: the sigma passed in is overridden here with a fixed value of 0.01
sigma = 0.01
# Reusing Question 2 function to compute output probability
HMM.compute_output_probs(in_train_filename, output_probs_filename, delta=0.1)
# Note: trans_probs_filename is expected to be defined at module level (see the commented-out example below)
HMM.compute_transition_probs(in_train_filename, trans_probs_filename, in_tags_filename, sigma)
@classmethod
def dataframe_to_dict(cls, df, col1_name, col2_name):
"""
Converts transition prob to a nested dict
"""
from collections import defaultdict
d = defaultdict(dict)
for i, row in df.iterrows():
d[str(row[col1_name])][str(row[col2_name])] = row.drop([col1_name, col2_name]).to_dict()
dict_df = dict(d)
return dict_df
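# A hedged illustration with made-up values: for a transition-probability DataFrame with columns
# ["Prior", "Next", "prob"], dataframe_to_dict(df, "Prior", "Next") returns a nested dict such as
#   {"START": {"N": {"prob": 0.12}, "V": {"prob": 0.05}}, "N": {"V": {"prob": 0.25}}, ...}
# so that trans_prob["START"]["N"]["prob"] looks up the START -> N transition probability.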
@classmethod
def run_viterbi(cls,states,trans_prob,output_prob,sequence):
"""
Given a sequence, the possible states, trans probs, and output probs, predicts tags for the sequence
"""
def init_zeroes(states, trans_prob, output_prob, sequence):
"""
Initiates Pi and Backpointer
"""
# get START probabilities
start_prob = trans_prob["START"]
# Define first word
firstWord = sequence[0]
# Define statistics for pi and backpointer
nLength = len(sequence)
nState = len(states)
# Define pi and backpointer, with one row per word in the sequence and one column per state
pi = np.zeros(shape=(nLength, nState))
backpointer = np.zeros(shape=(nLength, nState))
# Iterate through states
for i, state in enumerate(states):
# get START -> state probability
state_prob = start_prob[state]
ao_v = state_prob["prob"]
# get state -> output probability given word
## Check if word exists in output probability
if firstWord in output_prob:
result_dict = output_prob[firstWord]
else:
result_dict = output_prob["NONE"]
if state in result_dict:
bv_x1 = result_dict[state]['prob']
else:
result_dict = output_prob["NONE"]
bv_x1 = result_dict[state]['prob']
# Calculate Prob
prob = ao_v*bv_x1
# Store in pi
pi[0][i] = prob
return [pi, backpointer]
def compute_viterbi(states, trans_prob, output_prob, sequence, pi, backpointer):
"""
Does the actual viterbi algorithm
"""
def find_max(trans_prob, state, states, index, stateIndex, bv_xk, pi):
"""
Finds arg max and max for each individual aij and pi[k]
"""
# retrieve pi values
pi_kminus1 = pi[index - 1]
# set temp holder for results
argMax = -1
maxVal = -1
# enumerate for u
for priorIndex, prior in enumerate(states):
# get prior probabilities
prior_prob = trans_prob[prior]
# get prior -> state probability
state_prob = prior_prob[state]
au_v = state_prob["prob"]
# get previous pi
pi_kminus1_prior = pi_kminus1[priorIndex]
# calculate result
piResult = pi_kminus1_prior*au_v*bv_xk
if piResult > maxVal:
maxVal = piResult
argMax = priorIndex
return [maxVal, argMax]
lastIndex = len(sequence) - 1
for index, word in enumerate(sequence):
## Check if word exists in output probability
if word in output_prob:
result_dict = output_prob[word]
else:
result_dict = output_prob["NONE"]
# START is covered in zero states
if index != 0:
for stateIndex, state in enumerate(states):
# Check if state exists in word dict
if state in result_dict:
bv_xk = result_dict[state]['prob']
else:
result_dict_else = output_prob["NONE"]
bv_xk = result_dict_else[state]['prob']
# finding max and argmax
max_ArgMax_result = find_max(trans_prob, state, states, index, stateIndex, bv_xk, pi)
pi[index][stateIndex] = max_ArgMax_result[0]
backpointer[index][stateIndex] = max_ArgMax_result[1]
# Rescale to avoid numerical underflow on long tweets (scaling every state equally does not change the argmax)
if all(i <= 0.00001 for i in pi[index]):
pi[index] = [i * 10000 for i in pi[index]]
return [pi, backpointer]
def getBackPointer(pi, backpointer, sequence, states):
"""
Get backpointer results
"""
# Get last state and index
len_of_sequence = len(sequence)
pi_list = pi[len_of_sequence-1]
curr_index = np.argmax(pi_list)
state_result = [states[curr_index]]
path = [curr_index]
prob_path = [pi[len_of_sequence-1][curr_index]]
# access the relevant state
for index in range(len_of_sequence-1, 0, -1):
# Get index
curr_index = int(backpointer[index][curr_index])
# Get state
state_result += [states[curr_index]]
# Get path
path += [curr_index]
# Get prob of the best path at this earlier time step
prob_path += [pi[index-1][curr_index]]
# reverse to get actual result
list.reverse(state_result)
list.reverse(path)
list.reverse(prob_path)
return [state_result, path, prob_path]
# Initialise pi and backpointer, and compute results for START
init_pi, init_backpointer = init_zeroes(states, trans_prob, output_prob, sequence)
# Compute viterbi for the remaining sequence
pi, backpointer = compute_viterbi(states, trans_prob, output_prob, sequence, init_pi, init_backpointer)
# get the backpointer results, which is a tuple of 3 items: the state_result, the path, and the prob_path
backpointer_result = getBackPointer(pi, backpointer, sequence, states)
return backpointer_result
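# A minimal usage sketch (commented out; file names assumed to follow the pattern used elsewhere
# in this script, and the example tweet is made up):
# states = HMM.parse_tags_data(f"{ddir}/twitter_tags.txt")
# trans_prob = Viterbi.dataframe_to_dict(pd.read_csv(f"{ddir}/trans_probs.txt"), "Prior", "Next")
# output_prob = Viterbi.dataframe_to_dict(pd.read_csv(f"{ddir}/output_probs.txt"), "word", "tag")
# tags, path, probs = Viterbi.run_viterbi(states, trans_prob, output_prob, ["just", "landed", "lol"])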
#%%
# Implement the six functions below
def naive_predict(in_output_probs_filename, in_test_filename, out_prediction_filename):
# Read output probs from file
output_probs = pd.read_csv(in_output_probs_filename)
# Get all bj(w) assuming that count(y=j -> x=w) is 0 (ie there is no word w that exists with tag j in the training data)
# We consider these the baseline probabilities, and we will update them if training data exists (ie. if there exists a word x that was tagged as tag j in the training data)
# These "baseline" probabilities were stored as word == None in the output probs file, and the line below loads them as such
baseline_probs = output_probs[output_probs["word"] == "NONE"].drop("word",axis=1).set_index("tag")
# Read and parse test data from file
test_data = HMM.parse_data_no_tag(in_test_filename)
with open(out_prediction_filename, "w") as f:
for tweet in test_data:
for word in tweet:
prob_for_each_tag = baseline_probs.copy()
# Get output probs based on training data
training_data_output_probs = output_probs[output_probs.word == word].set_index("tag")
prob_for_each_tag.update(training_data_output_probs)
# Get the tag that gives us the maximum probability for this word
max_tag = prob_for_each_tag.idxmax().prob
f.write(max_tag + "\n")
f.write("\n")
#%%
def naive_predict2(
in_output_probs_filename,
in_train_filename,
in_test_filename,
out_prediction_filename,
):
"""
P ( y=j | x=w ) = P( x=w | y=j ) * P( y=j ) / P( x=w )
However, since we are only finding argmax, we can ignore the P( x=w ) term (the denominator)
"""
# Read output probs from file
output_probs = pd.read_csv(in_output_probs_filename)
# Calculate number of unique words in output_probs
num_words = len(output_probs.word.unique())
# Get bj(w) assuming that count(y=j -> x=w) is 0 (ie there is no word w that exists with tag j in the training data)
baseline_probs = output_probs[output_probs.word == "NONE"].drop("word",axis=1).set_index("tag")
# Read and parse test data from file
test_data = HMM.parse_data_no_tag(in_test_filename)
# Generate a dictionary with tags being the keys and P( y=j ) being the values
prob_y = {}
parsed_data = HMM.parse_twitter_training_data(in_train_filename)
for tweet in parsed_data:
for word_and_tag in tweet:
tag = word_and_tag[1]
if tag not in prob_y:
prob_y[tag] = 1
else:
prob_y[tag] += 1
number_of_occurences = sum(prob_y.values())
# Calculate P (y = j)
prob_y = { k:v/number_of_occurences for (k,v) in prob_y.items() }
with open(out_prediction_filename, "w") as f:
for tweet in test_data:
for word in tweet:
prob_for_each_tag = baseline_probs.copy()
# Update the probability of this word being of each tag using the output_probs previously generated
prob_for_each_tag.update(output_probs[output_probs.word == word].set_index("tag"))
# Iterate through the probability of this word being of a particular tag,
# and update it, changing it to P( x=w | y=j ) * P( y=j )
for index, row in prob_for_each_tag.iterrows():
new_prob = row.prob * prob_y[index]
prob_for_each_tag.loc[index, "prob"] = new_prob  # assign via .loc[row, col] to avoid chained-assignment issues
# Get the tag with the max probability, and write it to file
max_tag = prob_for_each_tag.idxmax().prob
f.write(max_tag + "\n")
f.write("\n")
# Uncomment to generate output_probs.txt and trans_probs.txt
# in_train_filename = f"{ddir}/twitter_train.txt"
# output_probs_filename = f"{ddir}/output_probs.txt"
# trans_probs_filename = f"{ddir}/trans_probs.txt"
# in_tags_filename = f"{ddir}/twitter_tags.txt"
# Viterbi.compute_Prob_MLE(in_train_filename, output_probs_filename, in_tags_filename, 0.01)
def viterbi_predict(in_tags_filename, in_trans_probs_filename, in_output_probs_filename, in_test_filename, out_predictions_filename):
# Import the relevant files
output_probability = pd.read_csv(in_output_probs_filename)
transition_probability = pd.read_csv(in_trans_probs_filename)
test_data = HMM.parse_data_no_tag(in_test_filename)
states = HMM.parse_tags_data(in_tags_filename)
# Convert transition and output probs to dict
output_prob = Viterbi.dataframe_to_dict(output_probability,"word","tag")
trans_prob = Viterbi.dataframe_to_dict(transition_probability,"Prior","Next")
# Initialise 3 lists to save the results for each tweet in the test data
state_result = []
path = []
prob_path = []
# iterate through all tweets
for tweet in test_data:
viterbi_predictions = Viterbi.run_viterbi(states,trans_prob,output_prob,tweet)
state_result += viterbi_predictions[0]
path += viterbi_predictions[1]
prob_path += viterbi_predictions[2]
# Write predictions to file
with open(out_predictions_filename, "w") as f:
for prediction in state_result:
f.write(prediction + "\n")
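# Uncomment to run the Viterbi predictor (the test and prediction file names here are assumed):
# in_tags_filename = f"{ddir}/twitter_tags.txt"
# trans_probs_filename = f"{ddir}/trans_probs.txt"
# output_probs_filename = f"{ddir}/output_probs.txt"
# in_test_filename = f"{ddir}/twitter_dev_no_tag.txt"
# viterbi_predictions_filename = f"{ddir}/viterbi_predictions.txt"
# viterbi_predict(in_tags_filename, trans_probs_filename, output_probs_filename, in_test_filename, viterbi_predictions_filename)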
#%%
# Defunct, included here for documentation
class Viterbi2:
def __init__():
pass
@classmethod
def create_n_gram(cls, in_train_filename, in_tags_filename, trans_probs_filename2, sigma):
"""
Generates n-gram given training dataset
"""
# Aggregate all occurrences (with duplicates) in a DataFrame first
parsed_data = HMM.parse_twitter_training_data(in_train_filename)
trigram = pd.DataFrame(columns = ["First", "Prior", "Curr"])
for tweet in parsed_data: # Go through each tweet
lenTweet = len(tweet) - 1 # Index of the last word (indices start from 0)
if len(tweet) > 3:
for index in range(len(tweet)): # Go through each word in the tweet
tag = tweet[index][1]
# START START ?
if index == 0:
prev_tag = tweet[index-1][1]
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": ["START"], "Curr": [tag]})
trigram = trigram.append(tempDF, ignore_index=True)
# START ? ?
elif index == 1:
prev_tag = tweet[index-1][1]
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": [prev_tag], "Curr": [tag]})
trigram = trigram.append(tempDF, ignore_index=True)
# ? ? END
elif index == lenTweet:
prev_tag = tweet[index-1][1]
tempDF = pd.DataFrame.from_dict({"First": [prev_tag], "Prior": [tag], "Curr": ["END"]})
trigram = trigram.append(tempDF, ignore_index=True)
# ? ? ?
else:
prev_tag = tweet[index-1][1]
first_tag = tweet[index-2][1]
tempDF = pd.DataFrame.from_dict({"First": [first_tag], "Prior": [prev_tag], "Curr": [tag]})
trigram = trigram.append(tempDF, ignore_index=True)
elif len(tweet) == 2:
first_tag = tweet[0][1]
second_tag = tweet[1][1]
# START START First
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": ["START"], "Curr": [first_tag]})
trigram = trigram.append(tempDF, ignore_index=True)
# START First Second
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": [first_tag], "Curr": [second_tag]})
trigram = trigram.append(tempDF, ignore_index=True)
# First Second END
tempDF = pd.DataFrame.from_dict({"First": [first_tag], "Prior": [second_tag], "Curr": ["END"]})
trigram = trigram.append(tempDF, ignore_index=True)
elif len(tweet) == 1:
only_tag = tweet[0][1]
# START START Only
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": ["START"], "Curr": [only_tag]})
trigram = trigram.append(tempDF, ignore_index=True)
# START Only End
tempDF = pd.DataFrame.from_dict({"First": ["START"], "Prior": [only_tag], "Curr": ["END"]})
trigram = trigram.append(tempDF, ignore_index=True)
#Get Trigram Prob
trigram_prob = trigram.copy()
trigram_prob = trigram_prob.groupby(["First", "Prior", "Curr"]).size().reset_index()
trigram_prob.columns = ["First", "Prior", "Curr", "Count Tags_Triple"]
trigram_prior = trigram.copy()
trigram_prior = trigram_prior.groupby(["First", "Prior"]).size().reset_index()
trigram_prior.columns = ["First", "Prior", "Count Tags_Left"]
trigram_merged = pd.merge(trigram_prob, trigram_prior)
# Apply Laplace Smoothing
parsed_tag_data = HMM.parse_tags_data(in_tags_filename)
allUniqueTags = len(parsed_tag_data) + 1 # For START
denominator = sigma*(allUniqueTags+1)
trigram_merged["prob"] = (trigram_merged["Count Tags_Triple"] + sigma) / (trigram_merged["Count Tags_Left"] + denominator)
trigram_merged = trigram_merged.drop(columns = ["Count Tags_Triple", "Count Tags_Left"])
# Find out what tags are already in trigram and what not
parsed_tag_data_Start = parsed_tag_data + ["START"]
parsed_tag_data_End = parsed_tag_data + ["END"]
first_unique_tags = trigram_merged["First"].unique()
tag_prob = sigma / denominator
# Iterate for First Tag