-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_comet_catalyst.py
97 lines (78 loc) · 3.11 KB
/
test_comet_catalyst.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import comet_ml
from comet_ml import API
from torch import nn, optim
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.loggers.comet import CometLogger
from catalyst_pytest_consts import EXPECTED_METRICS
import multiprocessing
class AlwaysEquals:
    """Wildcard placeholder that compares equal to any object.

    Useful as the value of a field whose concrete content is irrelevant to a
    comparison: a dict holding instances of this class matches any dict that
    has the same keys, whatever the other dict stores under them.
    """

    # An object that is equal to everything cannot offer a hash consistent
    # with ``__eq__``, so it is declared unhashable explicitly.
    __hash__ = None

    def __eq__(self, other):
        """Return ``True`` for every ``other``."""
        return True

    def __ne__(self, other):
        """Return ``False`` for every ``other``."""
        return False

    def __repr__(self):
        """Return a short, stable representation for assertion output."""
        return "AlwaysEquals()"
def train_model(logger):
    """
    Build a single-layer MNIST classifier and train it for one epoch.

    MNIST is downloaded into the current working directory. The given
    ``logger`` is handed to the runner under the key ``"comet"``.
    """
    net = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=0.02)

    # Datasets and loaders are created in train-then-valid order.
    train_set = MNIST(os.getcwd(), train=True, download=True, transform=ToTensor())
    train_loader = DataLoader(train_set, batch_size=32)
    valid_set = MNIST(os.getcwd(), train=False, download=True, transform=ToTensor())
    valid_loader = DataLoader(valid_set, batch_size=32)
    data_loaders = {"train": train_loader, "valid": valid_loader}

    supervised_runner = dl.SupervisedRunner(
        input_key="features",
        output_key="logits",
        target_key="targets",
        loss_key="loss",
    )

    metric_callbacks = [
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=10
        ),
    ]

    # model training
    supervised_runner.train(
        model=net,
        criterion=loss_fn,
        optimizer=adam,
        loaders=data_loaders,
        num_epochs=1,
        callbacks=metric_callbacks,
        logdir="./logs",
        valid_loader="valid",
        valid_metric="loss",
        minimize_valid_metric=True,
        verbose=True,
        load_best_on_end=True,
        loggers={"comet": logger},
    )
def build_result(name):
    """
    Build the expected metrics-summary entry for the metric ``name``.

    Every value/timestamp field is an ``AlwaysEquals`` wildcard, ``name`` is
    kept verbatim and ``editable`` is ``False``. Names that do not contain
    ``'sys.'`` additionally get the three ``step*`` wildcard fields.
    """
    wildcard_keys = [
        'valueMax', 'valueMin', 'valueCurrent',
        'timestampMax', 'timestampMin', 'timestampCurrent',
    ]
    if 'sys.' not in name:
        wildcard_keys += ['stepMax', 'stepMin', 'stepCurrent']

    # Insert in the same order as the literal layout: name, wildcards, editable.
    expected = {'name': name}
    for key in wildcard_keys:
        expected[key] = AlwaysEquals()
    expected['editable'] = False
    return expected
def get_system_cpus():
    """
    Return the per-CPU metric names for the machine running the test.

    One name is produced for each CPU reported by
    ``multiprocessing.cpu_count()``, numbered from 1 and zero-padded to two
    digits, e.g. ``sys.cpu.percent.01``.
    """
    cpu_total = multiprocessing.cpu_count()
    return [f'sys.cpu.percent.{index:02d}' for index in range(1, cpu_total + 1)]
def test_metrics():
    """
    Train with a Comet logger and check the expected metrics were recorded.

    After training, the experiment's metrics summary is fetched through the
    Comet API and must contain an entry for every name in
    ``EXPECTED_METRICS`` plus one per-CPU system metric name.
    """
    logger = CometLogger(project_name='catalyst-pytest', logging_frequency=10)
    train_model(logger=logger)

    experiment = comet_ml.get_global_experiment()
    # Fail with a readable message instead of an AttributeError on ``None.id``.
    assert experiment is not None, 'no global Comet experiment found after training'

    api_experiment = API().get_experiment_by_id(experiment.id)
    metrics = api_experiment.get_metrics_summary()

    expected_metric_names = EXPECTED_METRICS + get_system_cpus()
    # Collect every missing name so one failure reports the full gap.
    missing = [
        item for item in expected_metric_names
        if build_result(item) not in metrics
    ]
    assert not missing, (
        'metrics missing from the Comet metrics summary: {!r}'.format(missing)
    )