# MTFN.py (forked from thuiar/MMSA)
"""
paper: Tensor Fusion Network for Multimodal Sentiment Analysis
ref: https://github.com/A2Zadeh/TensorFusionNetwork
"""
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.nn.parameter import Parameter
from torch.nn.init import xavier_uniform, xavier_normal, orthogonal
# Names exported by `from <this module> import *`: only the MTFN model class.
__all__ = ['MTFN']
class SubNet(nn.Module):
    '''
    The subnetwork that is used in TFN for video and audio in the pre-fusion stage.

    Pipeline: BatchNorm1d -> Dropout -> three Linear layers, each followed by ReLU.
    '''

    def __init__(self, in_size, hidden_size, dropout):
        '''
        Args:
            in_size: input dimension
            hidden_size: hidden layer dimension (also the output dimension)
            dropout: dropout probability applied right after batch normalisation
        Output:
            (return value in forward) a tensor of shape (batch_size, hidden_size)
        '''
        super(SubNet, self).__init__()
        self.norm = nn.BatchNorm1d(in_size)
        self.drop = nn.Dropout(p=dropout)
        self.linear_1 = nn.Linear(in_size, hidden_size)
        self.linear_2 = nn.Linear(hidden_size, hidden_size)
        self.linear_3 = nn.Linear(hidden_size, hidden_size)

    def forward(self, x):
        '''
        Args:
            x: tensor of shape (batch_size, in_size)
        Returns:
            tensor of shape (batch_size, hidden_size); non-negative because the
            last operation is a ReLU.
        '''
        hidden = self.drop(self.norm(x))
        # Same three Linear+ReLU stages as before, applied in order.
        for layer in (self.linear_1, self.linear_2, self.linear_3):
            hidden = F.relu(layer(hidden))
        return hidden
class TextSubNet(nn.Module):
    '''
    The LSTM-based subnetwork that is used in TFN for text.

    The final hidden state of the LSTM is passed through dropout and a linear
    projection to `out_size`.
    '''

    def __init__(self, in_size, hidden_size, out_size, num_layers=1, dropout=0.2, bidirectional=False):
        '''
        Args:
            in_size: input dimension
            hidden_size: hidden layer dimension
            out_size: dimension of the projected output
            num_layers: specify the number of layers of LSTMs.
            dropout: dropout probability
            bidirectional: specify usage of bidirectional LSTM
        Output:
            (return value in forward) a tensor of shape (batch_size, out_size)
        '''
        super(TextSubNet, self).__init__()
        # nn.LSTM applies dropout only between stacked layers, so it is zeroed
        # for a single-layer LSTM. Keep that in a separate variable: the
        # original overwrote `dropout` itself, which also silently disabled the
        # output dropout below for the default single-layer configuration.
        rnn_dropout = 0.0 if num_layers == 1 else dropout
        self.rnn = nn.LSTM(
            in_size,
            hidden_size,
            num_layers=num_layers,
            dropout=rnn_dropout,
            bidirectional=bidirectional,
            batch_first=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.linear_1 = nn.Linear(hidden_size, out_size)

    def forward(self, x):
        '''
        Args:
            x: tensor of shape (batch_size, sequence_len, in_size)
        Returns:
            tensor of shape (batch_size, out_size) for a single-layer,
            unidirectional LSTM. With more layers/directions the leading
            (num_layers * num_directions) axis of the hidden state is kept.
        '''
        _, (h_n, _) = self.rnn(x)
        # h_n is (num_layers * num_directions, batch_size, hidden_size). Drop only
        # the leading axis: a bare squeeze() would also remove the batch axis
        # when batch_size == 1 and break downstream concatenation on dim=1.
        h = self.dropout(h_n.squeeze(0))
        return self.linear_1(h)
class MTFN(nn.Module):
    '''
    Implements the Tensor Fusion Networks for multimodal sentiment analysis as is described in:
    Zadeh, Amir, et al. "Tensor fusion network for multimodal sentiment analysis." EMNLP 2017 Oral.

    In addition to the fused prediction, this variant attaches one prediction
    head per modality (text, audio, video) and returns all four outputs.
    '''

    def __init__(self, args):
        '''
        Args:
            args: configuration object exposing the following attributes
                feature_dims - length-3 sequence (text_dim, audio_dim, video_dim)
                hidden_dims - length-3 sequence (text_hidden, audio_hidden, video_hidden)
                text_out - int, output dimension of the text subnetwork
                dropouts - length-3 sequence (audio_dropout, video_dropout, text_dropout);
                    NOTE: this order differs from feature_dims / hidden_dims
                post_dropouts - length-4 sequence
                    (post_text_dropout, post_audio_dropout, post_video_dropout, post_fusion_dropout)
                post_fusion_dim - int, width of the post-fusion layers
                post_text_dim / post_audio_dim / post_video_dim - int, width of the
                    per-modality prediction heads
        Output:
            (return value in forward) a dict of features and predictions, see forward()
        '''
        super(MTFN, self).__init__()

        # feature_dims and hidden_dims are unpacked in the order text, audio, video
        self.text_in, self.audio_in, self.video_in = args.feature_dims
        self.text_hidden, self.audio_hidden, self.video_hidden = args.hidden_dims
        self.text_out = args.text_out

        # dropouts is unpacked in the order audio, video, text
        self.audio_prob, self.video_prob, self.text_prob = args.dropouts
        self.post_text_prob, self.post_audio_prob, self.post_video_prob, self.post_fusion_prob = args.post_dropouts

        self.post_fusion_dim = args.post_fusion_dim
        self.post_text_dim = args.post_text_dim
        self.post_audio_dim = args.post_audio_dim
        self.post_video_dim = args.post_video_dim

        # define the pre-fusion subnetworks
        self.audio_subnet = SubNet(self.audio_in, self.audio_hidden, self.audio_prob)
        self.video_subnet = SubNet(self.video_in, self.video_hidden, self.video_prob)
        self.text_subnet = TextSubNet(self.text_in, self.text_hidden, self.text_out, dropout=self.text_prob)

        # define the post_fusion layers; the input width is the size of the
        # flattened outer product of the three 1-augmented modality vectors
        fusion_in = (self.text_out + 1) * (self.video_hidden + 1) * (self.audio_hidden + 1)
        self.post_fusion_dropout = nn.Dropout(p=self.post_fusion_prob)
        self.post_fusion_layer_1 = nn.Linear(fusion_in, self.post_fusion_dim)
        self.post_fusion_layer_2 = nn.Linear(self.post_fusion_dim, self.post_fusion_dim)
        self.post_fusion_layer_3 = nn.Linear(self.post_fusion_dim, 1)

        # define the classify layer for text
        self.post_text_dropout = nn.Dropout(p=self.post_text_prob)
        self.post_text_layer_1 = nn.Linear(self.text_out, self.post_text_dim)
        self.post_text_layer_2 = nn.Linear(self.post_text_dim, self.post_text_dim)
        self.post_text_layer_3 = nn.Linear(self.post_text_dim, 1)

        # define the classify layer for audio
        self.post_audio_dropout = nn.Dropout(p=self.post_audio_prob)
        self.post_audio_layer_1 = nn.Linear(self.audio_hidden, self.post_audio_dim)
        self.post_audio_layer_2 = nn.Linear(self.post_audio_dim, self.post_audio_dim)
        self.post_audio_layer_3 = nn.Linear(self.post_audio_dim, 1)

        # define the classify layer for video
        self.post_video_dropout = nn.Dropout(p=self.post_video_prob)
        self.post_video_layer_1 = nn.Linear(self.video_hidden, self.post_video_dim)
        self.post_video_layer_2 = nn.Linear(self.post_video_dim, self.post_video_dim)
        self.post_video_layer_3 = nn.Linear(self.post_video_dim, 1)

        # in TFN we are doing a regression with constrained output range: (-3, 3), hence we'll apply sigmoid to output
        # shrink it to (0, 1), and scale/shift it back to range (-3, 3)
        self.output_range = Parameter(torch.FloatTensor([6]), requires_grad=False)
        self.output_shift = Parameter(torch.FloatTensor([-3]), requires_grad=False)

    def _mlp_head(self, features, dropout, layer_1, layer_2, layer_3):
        '''Apply dropout -> Linear+ReLU -> Linear+ReLU -> Linear and return the raw output.'''
        hidden = dropout(features)
        hidden = F.relu(layer_1(hidden), inplace=True)
        hidden = F.relu(layer_2(hidden), inplace=True)
        return layer_3(hidden)

    def forward(self, text_x, audio_x, video_x):
        '''
        Args:
            text_x: tensor of shape (batch_size, sequence_len, text_in)
            audio_x: tensor of shape (batch_size, audio_in); a singleton axis at
                dim 1, i.e. (batch_size, 1, audio_in), is squeezed away
            video_x: tensor of shape (batch_size, video_in); a singleton axis at
                dim 1 is squeezed away
        Returns:
            dict with keys
                'Feature_t', 'Feature_a', 'Feature_v': outputs of the modality subnetworks
                'Feature_f': flattened fusion tensor (before post-fusion dropout)
                'M': fused prediction, sigmoid output rescaled to the range (-3, 3)
                'T', 'A', 'V': raw outputs of the text / audio / video heads
        '''
        audio_x = audio_x.squeeze(1)
        video_x = video_x.squeeze(1)

        audio_h = self.audio_subnet(audio_x)
        video_h = self.video_subnet(video_x)
        text_h = self.text_subnet(text_x)

        # per-modality prediction heads
        output_text = self._mlp_head(
            text_h, self.post_text_dropout,
            self.post_text_layer_1, self.post_text_layer_2, self.post_text_layer_3)
        output_audio = self._mlp_head(
            audio_h, self.post_audio_dropout,
            self.post_audio_layer_1, self.post_audio_layer_2, self.post_audio_layer_3)
        # The video head now receives the dropped-out features like the other
        # heads; previously the dropout result was computed and then ignored.
        output_video = self._mlp_head(
            video_h, self.post_video_dropout,
            self.post_video_layer_1, self.post_video_layer_2, self.post_video_layer_3)

        batch_size = audio_h.shape[0]

        # next we perform "tensor fusion", which is essentially appending 1s to the tensors and take Kronecker product
        add_one = audio_h.new_ones((batch_size, 1))  # same dtype/device as audio_h
        _audio_h = torch.cat((add_one, audio_h), dim=1)
        _video_h = torch.cat((add_one, video_h), dim=1)
        _text_h = torch.cat((add_one, text_h), dim=1)

        # _audio_h has shape (batch_size, audio_hidden + 1), _video_h has shape (batch_size, video_hidden + 1)
        # we want to perform outer product between the two batches, hence we unsqueeze them to get
        # (batch_size, audio_hidden + 1, 1) X (batch_size, 1, video_hidden + 1)
        # fusion_tensor will have shape (batch_size, audio_hidden + 1, video_hidden + 1)
        fusion_tensor = torch.bmm(_audio_h.unsqueeze(2), _video_h.unsqueeze(1))

        # next we do kronecker product between fusion_tensor and _text_h:
        # flatten the audio-video product to a column, multiply by the text row,
        # and flatten the result instead of keeping the 3-D tensor
        fusion_tensor = fusion_tensor.view(-1, (self.audio_hidden + 1) * (self.video_hidden + 1), 1)
        fusion_tensor = torch.bmm(fusion_tensor, _text_h.unsqueeze(1)).view(batch_size, -1)

        fusion_logit = self._mlp_head(
            fusion_tensor, self.post_fusion_dropout,
            self.post_fusion_layer_1, self.post_fusion_layer_2, self.post_fusion_layer_3)
        # sigmoid maps to (0, 1); range/shift rescale it to (-3, 3)
        output_fusion = torch.sigmoid(fusion_logit) * self.output_range + self.output_shift

        res = {
            'Feature_t': text_h,
            'Feature_a': audio_h,
            'Feature_v': video_h,
            'Feature_f': fusion_tensor,
            'M': output_fusion,
            'T': output_text,
            'A': output_audio,
            'V': output_video
        }
        return res