# test.py
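"""Evaluate a trained power-flow GNN on the PowerFlowData test split.

Loads the normalization statistics and model weights saved for a fixed run_id,
then reports masked L2/L1 errors (normalized and de-normalized), MSE, and the
PowerImbalance loss over the test set.
"""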
import os
import logging
from functools import partial
import torch
import torch_geometric
from datasets.PowerFlowData import PowerFlowData, denormalize
from networks.MPN import MPN, MPN_simplenet, SkipMPN, MaskEmbdMPN, MultiConvNet, MultiMPN, MaskEmbdMultiMPN
from utils.evaluation import load_model
from torch_geometric.loader import DataLoader
from utils.evaluation import evaluate_epoch, evaluate_epoch_v2
from utils.argument_parser import argument_parser
from utils.custom_loss_functions import Masked_L2_loss, PowerImbalance, MixedMSEPoweImbalance, MaskedL2V2, MaskedL1

logger = logging.getLogger(__name__)

LOG_DIR = 'logs'
SAVE_DIR = 'models'


@torch.no_grad()
def main():
    run_id = '20240503-29'
    # logging.basicConfig(filename=f'test_{run_id}.log', level=100)

    # Map model names accepted on the command line to their network classes.
    models = {
        'MPN': MPN,
        'MPN_simplenet': MPN_simplenet,
        'SkipMPN': SkipMPN,
        'MaskEmbdMPN': MaskEmbdMPN,
        'MultiConvNet': MultiConvNet,
        'MultiMPN': MultiMPN,
        'MaskEmbdMultiMPN': MaskEmbdMultiMPN,
    }
    args = argument_parser()
    batch_size = args.batch_size
    grid_case = args.case
    data_dir = args.data_dir
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Normalization statistics saved at training time for this run.
    data_param_path = os.path.join(data_dir, 'params', f'data_params_{run_id}.pt')
    data_param = torch.load(data_param_path, map_location='cpu')
    xymean, xystd = data_param['xymean'], data_param['xystd']
    edgemean, edgestd = data_param['edgemean'], data_param['edgestd']

    # Build the test split with the same normalization as training.
    testset = PowerFlowData(root=data_dir, case=grid_case,
                            split=[.5, .2, .3], task='test',
                            xymean=xymean, xystd=xystd, edgemean=edgemean, edgestd=edgestd)
    print('data value range')
    print(f'{testset.data.y.shape}')

    # De-normalize the targets and report range/spread of the unknown
    # quantities per bus type (slack = 0, PV = 1, PQ = 2).
    _y = testset.data.y * xystd + xymean
    is_slack = testset.data.bus_type == 0
    is_pv = testset.data.bus_type == 1
    is_pq = testset.data.bus_type == 2
    _std = lambda x: ((x - x.mean()).square().sum() / x.numel()).sqrt().item()
    _l1 = lambda x: ((x - x.mean()).abs().sum() / x.numel()).item()
    _v = _y[is_pq, 0]                               # voltage magnitude, unknown at PQ buses
    _a = _y[torch.logical_or(is_pv, is_pq), 1]      # voltage angle, unknown at PV and PQ buses
    _p = _y[is_slack, 2]                            # active power, unknown at the slack bus
    _q = _y[torch.logical_or(is_slack, is_pv), 3]   # reactive power, unknown at slack and PV buses
    print(f'v: {_v.min().item():.4f}, {_v.max().item():.4f}, STD {_std(_v):.4f}, L1 {_l1(_v):.4f}')
    print(f'a: {_a.min().item():.4f}, {_a.max().item():.4f}, STD {_std(_a):.4f}, L1 {_l1(_a):.4f}')
    print(f'p: {_p.min().item():.4f}, {_p.max().item():.4f}, STD {_std(_p):.4f}, L1 {_l1(_p):.4f}')
    print(f'q: {_q.min().item():.4f}, {_q.max().item():.4f}, STD {_std(_q):.4f}, L1 {_l1(_q):.4f}')
    test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False)
    _sample = testset[0]
    print(f'mean of vm,va,p,q:\t{xymean}')
    # print(f'std of vm,va,p,q:\t{xystd}')
    print(f'#slack:{(_sample.bus_type==0).sum()},\t#pv:{(_sample.bus_type==1).sum()},\t#pq:{(_sample.bus_type==2).sum()}')

    # Loss functions reported on the test set.
    pwr_imb_loss = PowerImbalance(*testset.get_data_means_stds()).to(device)
    mse_loss = torch.nn.MSELoss(reduction='mean').to(device)
    masked_l2 = Masked_L2_loss(regularize=False).to(device)
    all_losses = {
        'PowerImbalance': pwr_imb_loss,
        'Masked_L2_loss': masked_l2,
        'MSE': mse_loss,
    }
    # Network parameters from the CLI; the input, output, and edge dimensions
    # are taken from the dataset below and take precedence over the CLI values.
    nfeature_dim = args.nfeature_dim
    efeature_dim = args.efeature_dim
    hidden_dim = args.hidden_dim
    output_dim = args.output_dim
    n_gnn_layers = args.n_gnn_layers
    conv_K = args.K
    dropout_rate = args.dropout_rate
    model_class = models[args.model]

    node_in_dim, node_out_dim, edge_dim = testset.get_data_dimensions()
    model = model_class(
        nfeature_dim=node_in_dim,
        efeature_dim=edge_dim,
        output_dim=node_out_dim,
        hidden_dim=hidden_dim,
        n_gnn_layers=n_gnn_layers,
        K=conv_K,
        dropout_rate=dropout_rate,
    ).to(device)  # 40k params

    # Load the trained weights for this run and switch to evaluation mode.
    model, _ = load_model(model, run_id, device)
    model.eval()
    print(f"Model: {args.model}")
    print(f"Case: {grid_case}")
    # Per-quantity masked errors, in normalized and de-normalized units.
    _loss = MaskedL2V2()
    masked_l2_terms = evaluate_epoch_v2(model, test_loader, _loss, device)
    for key, value in masked_l2_terms.items():
        print(f"MaskedL2 {key}:\t{value:.6f}")
    masked_l2_terms_de = evaluate_epoch_v2(model, test_loader, _loss,
                                           pre_loss_fn=partial(denormalize, mean=xymean, std=xystd), device=device)
    for key, value in masked_l2_terms_de.items():
        print(f"MaskedL2(denorm) {key}:\t{value:.6f}")
    masked_l1_terms_de = evaluate_epoch_v2(model, test_loader, MaskedL1(),
                                           pre_loss_fn=partial(denormalize, mean=xymean, std=xystd), device=device)
    for key, value in masked_l1_terms_de.items():
        print(f"MaskedL1(denorm) {key}:\t{value:.6f}")

    # Aggregate losses (total and, where available, a reference value).
    for name, loss_fn in all_losses.items():
        test_loss_terms = evaluate_epoch_v2(model, test_loader, loss_fn, device)
        print(f"{name}:\t{test_loss_terms['total']:.6f}")
        if 'ref' in test_loss_terms:
            print(f"{name}(ref):\t{test_loss_terms['ref']:.6f}")


if __name__ == "__main__":
    main()
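
# Example invocation (a sketch, not from the repo docs): the flag names below are
# assumptions inferred from the attributes read off argument_parser() (args.case,
# args.data_dir, args.model, args.batch_size, ...); check utils/argument_parser.py
# for the actual names and defaults.
#   python test.py --case 118 --data_dir ./data --model MaskEmbdMultiMPN \
#       --batch_size 128 --hidden_dim 128 --n_gnn_layers 4 --K 2 --dropout_rate 0.2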