#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : layers.py
# Author : Yuanfei Wang <yuanfei_wang@pku.edu.cn>
# Date : 05.22.2022
# Last Modified Date: 05.22.2022
# Last Modified By : Yuanfei Wang <yuanfei_wang@pku.edu.cn>
import torch
import torch.nn as nn
from torch.nn.modules.module import Module


class GraphConvolutionLayer(Module):
    """Single relational graph convolution over a multi-edge-type adjacency."""

    def __init__(self, in_features, u, activation, edge_type_num, dropout_rate=0.):
        super(GraphConvolutionLayer, self).__init__()
        self.edge_type_num = edge_type_num
        self.u = u
        # One linear transform per edge type.
        self.adj_list = nn.ModuleList()
        for _ in range(self.edge_type_num):
            self.adj_list.append(nn.Linear(in_features, u))
        # Self-connection transform, applied to the annotations directly.
        self.linear_2 = nn.Linear(in_features, u)
        self.activation = activation
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, n_tensor, adj_tensor, h_tensor=None):
        # Concatenate the previous hidden state (if any) onto the node features.
        if h_tensor is not None:
            annotations = torch.cat((n_tensor, h_tensor), -1)
        else:
            annotations = n_tensor
        # (batch, edge_type_num, nodes, u): one transformed copy per edge type.
        output = torch.stack([self.adj_list[i](annotations) for i in range(self.edge_type_num)], 1)
        # Propagate along each edge type's adjacency, then sum over edge types
        # and add the self-connection term.
        output = torch.matmul(adj_tensor, output)
        out_sum = torch.sum(output, 1)
        out_linear_2 = self.linear_2(annotations)
        output = out_sum + out_linear_2
        output = self.activation(output) if self.activation is not None else output
        output = self.dropout(output)
        return output
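

# A minimal usage sketch for GraphConvolutionLayer. The batch/node/feature
# sizes below are illustrative assumptions, not values taken from this file.
def _demo_graph_convolution_layer():
    batch, nodes, in_f, u, edge_types = 16, 9, 32, 64, 4
    layer = GraphConvolutionLayer(in_f, u, nn.Tanh(), edge_types)
    n_tensor = torch.randn(batch, nodes, in_f)                  # node features
    adj_tensor = torch.randn(batch, edge_types, nodes, nodes)   # one adjacency per edge type
    out = layer(n_tensor, adj_tensor)
    assert out.shape == (batch, nodes, u)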


class MultiGraphConvolutionLayers(Module):
    """Stack of GraphConvolutionLayers. Each layer after the first also sees
    the original node features, which GraphConvolutionLayer.forward
    concatenates onto the hidden state."""

    def __init__(self, in_features, units, activation, edge_type_num, with_features=False, f=0, dropout_rate=0.):
        super(MultiGraphConvolutionLayers, self).__init__()
        self.conv_nets = nn.ModuleList()
        self.units = units
        # Layer k > 0 consumes units[k-1] + in_features inputs, because the
        # previous hidden state is concatenated with the node features.
        in_units = [u + in_features for u in self.units]
        # The first layer additionally sees f extra feature dims if requested.
        first_in = in_features + f if with_features else in_features
        for u0, u1 in zip([first_in] + in_units[:-1], self.units):
            self.conv_nets.append(GraphConvolutionLayer(u0, u1, activation, edge_type_num, dropout_rate))

    def forward(self, n_tensor, adj_tensor, h_tensor=None):
        hidden_tensor = h_tensor
        for conv_idx in range(len(self.units)):
            hidden_tensor = self.conv_nets[conv_idx](n_tensor, adj_tensor, hidden_tensor)
        return hidden_tensor
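

# A minimal sketch of stacking two graph convolutions; it shows why the stack
# wires u0 = units[k-1] + in_features for every layer after the first. The
# sizes are illustrative assumptions.
def _demo_multi_graph_convolution_layers():
    batch, nodes, in_f, edge_types = 16, 9, 32, 4
    stack = MultiGraphConvolutionLayers(in_f, [64, 128], nn.Tanh(), edge_types)
    n_tensor = torch.randn(batch, nodes, in_f)
    adj_tensor = torch.randn(batch, edge_types, nodes, nodes)
    out = stack(n_tensor, adj_tensor)
    assert out.shape == (batch, nodes, 128)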


class GraphConvolution(Module):
    """Convenience wrapper: a MultiGraphConvolutionLayers stack with a fixed
    Tanh activation."""

    def __init__(self, in_features, graph_conv_units, edge_type_num, with_features=False, f_dim=0, dropout_rate=0.):
        super(GraphConvolution, self).__init__()
        self.in_features = in_features
        self.graph_conv_units = graph_conv_units
        self.activation_f = torch.nn.Tanh()
        self.multi_graph_convolution_layers = \
            MultiGraphConvolutionLayers(in_features, self.graph_conv_units, self.activation_f, edge_type_num,
                                        with_features, f_dim, dropout_rate)

    def forward(self, n_tensor, adj_tensor, h_tensor=None):
        output = self.multi_graph_convolution_layers(n_tensor, adj_tensor, h_tensor)
        return output
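

# Since GraphConvolution is a thin wrapper over the stack above, the call is
# the same; a minimal sketch with assumed sizes:
def _demo_graph_convolution():
    gcn = GraphConvolution(32, [64, 128], edge_type_num=4)
    out = gcn(torch.randn(16, 9, 32), torch.randn(16, 4, 9, 9))
    assert out.shape == (16, 9, 128)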


class GraphConvolution2(Module):
    """Two fixed relational graph convolutions sharing one forward pass."""

    def __init__(self, in_features, out_feature_list, b_dim, dropout):
        super(GraphConvolution2, self).__init__()
        self.in_features = in_features
        self.out_feature_list = out_feature_list
        self.linear1 = nn.Linear(in_features, out_feature_list[0])
        self.linear2 = nn.Linear(out_feature_list[0], out_feature_list[1])
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs, adj, activation=None):
        # inputs: (batch, nodes, in_features), e.g. 16x9x9
        # adj   : (batch, b_dim, nodes, nodes), e.g. 16x4x9x9
        # First convolution: propagate linear1 features along every adjacency
        # channel, sum the channels, and add the self-connection term.
        hidden = torch.stack([self.linear1(inputs) for _ in range(adj.size(1))], 1)
        hidden = torch.einsum('bijk,bikl->bijl', (adj, hidden))
        hidden = torch.sum(hidden, 1) + self.linear1(inputs)
        hidden = activation(hidden) if activation is not None else hidden
        hidden = self.dropout(hidden)
        # Second convolution, same pattern with linear2.
        output = torch.stack([self.linear2(hidden) for _ in range(adj.size(1))], 1)
        output = torch.einsum('bijk,bikl->bijl', (adj, output))
        output = torch.sum(output, 1) + self.linear2(hidden)
        output = activation(output) if activation is not None else output
        output = self.dropout(output)
        return output
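

# A minimal sketch for GraphConvolution2, following the 16x9x9 / 16x4x9x9
# shapes noted in the comments above (assumed, for illustration only):
def _demo_graph_convolution2():
    gcn2 = GraphConvolution2(in_features=9, out_feature_list=[128, 64], b_dim=4, dropout=0.)
    inputs = torch.randn(16, 9, 9)    # e.g. one-hot node annotations
    adj = torch.randn(16, 4, 9, 9)    # b_dim adjacency channels
    out = gcn2(inputs, adj, activation=torch.tanh)
    assert out.shape == (16, 9, 64)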


class GraphAggregation(Module):
    """Gated readout: a sigmoid gate i times a transform j, summed over the
    node dimension to produce one graph-level vector per sample."""

    def __init__(self, in_features, aux_units, activation, with_features=False, f_dim=0,
                 dropout_rate=0.):
        super(GraphAggregation, self).__init__()
        self.with_features = with_features
        self.activation = activation
        in_dim = in_features + f_dim if self.with_features else in_features
        self.i = nn.Sequential(nn.Linear(in_dim, aux_units),
                               nn.Sigmoid())
        j_layers = [nn.Linear(in_dim, aux_units)]
        if self.activation is not None:
            j_layers.append(self.activation)
        self.j = nn.Sequential(*j_layers)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, n_tensor, out_tensor, h_tensor=None):
        if h_tensor is not None:
            annotations = torch.cat((out_tensor, h_tensor, n_tensor), -1)
        else:
            annotations = torch.cat((out_tensor, n_tensor), -1)
        # i acts as a sigmoid gate (soft attention) over nodes; j carries the content.
        i = self.i(annotations)
        j = self.j(annotations)
        output = torch.sum(torch.mul(i, j), 1)
        if self.activation is not None:
            output = self.activation(output)
        output = self.dropout(output)
        return output
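

# A minimal sketch for GraphAggregation: it reads out one graph-level vector
# from per-node embeddings. in_features must equal the width of the
# concatenated (out_tensor, n_tensor) annotations. Sizes are assumptions.
def _demo_graph_aggregation():
    batch, nodes, node_f, hidden_f, aux = 16, 9, 32, 64, 128
    agg = GraphAggregation(node_f + hidden_f, aux, nn.Tanh())
    out_tensor = torch.randn(batch, nodes, hidden_f)   # graph-conv output
    n_tensor = torch.randn(batch, nodes, node_f)       # raw node features
    graph_vec = agg(n_tensor, out_tensor)
    assert graph_vec.shape == (batch, aux)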


class GraphAggregation2(Module):
    """Gated readout with fixed Sigmoid/Tanh branches over pre-concatenated inputs."""

    def __init__(self, in_features, out_features, b_dim, dropout):
        super(GraphAggregation2, self).__init__()
        self.sigmoid_linear = nn.Sequential(nn.Linear(in_features+b_dim, out_features),
                                            nn.Sigmoid())
        self.tanh_linear = nn.Sequential(nn.Linear(in_features+b_dim, out_features),
                                         nn.Tanh())
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs, activation):
        # Gate (sigmoid branch) times content (tanh branch), summed over nodes.
        i = self.sigmoid_linear(inputs)
        j = self.tanh_linear(inputs)
        output = torch.sum(torch.mul(i, j), 1)
        output = activation(output) if activation is not None else output
        output = self.dropout(output)
        return output
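

# GraphAggregation2 expects the caller to have already concatenated the extra
# b_dim features onto the node embeddings; a minimal sketch with assumed sizes:
def _demo_graph_aggregation2():
    agg2 = GraphAggregation2(in_features=64, out_features=128, b_dim=9, dropout=0.)
    inputs = torch.randn(16, 9, 64 + 9)
    graph_vec = agg2(inputs, activation=torch.tanh)
    assert graph_vec.shape == (16, 128)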


class MultiDenseLayer(Module):
    """Plain MLP: Linear -> Dropout -> activation, repeated per entry in linear_units."""

    def __init__(self, aux_unit, linear_units, activation=None, dropout_rate=0.):
        super(MultiDenseLayer, self).__init__()
        layers = []
        for c0, c1 in zip([aux_unit] + linear_units[:-1], linear_units):
            layers.append(nn.Linear(c0, c1))
            layers.append(nn.Dropout(dropout_rate))
            if activation is not None:
                layers.append(activation)
        self.linear_layer = nn.Sequential(*layers)

    def forward(self, inputs):
        h = self.linear_layer(inputs)
        return h
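

# A minimal sketch for MultiDenseLayer, plus a smoke test that runs all of the
# illustrative demos above (sizes throughout are assumptions, not repo values):
def _demo_multi_dense_layer():
    mlp = MultiDenseLayer(128, [256, 64], activation=nn.Tanh())
    out = mlp(torch.randn(16, 128))
    assert out.shape == (16, 64)


if __name__ == "__main__":
    _demo_graph_convolution_layer()
    _demo_multi_graph_convolution_layers()
    _demo_graph_convolution()
    _demo_graph_convolution2()
    _demo_graph_aggregation()
    _demo_graph_aggregation2()
    _demo_multi_dense_layer()
    print("all layer demos passed")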