-
Notifications
You must be signed in to change notification settings - Fork 0
/
callbacks.py
99 lines (82 loc) · 3.74 KB
/
callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import numpy as np
from typing import Union, List
from stable_baselines3.common.callbacks import EventCallback
from stable_baselines3.common.vec_env import VecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from torch.utils.tensorboard import SummaryWriter
import gymnasium as gym
class EvalSaveCallback(EventCallback):
    """
    Callback for evaluating an agent and saving the best and latest models.

    Every ``eval_freq`` calls, the current model is evaluated on ``eval_env``.
    The mean and standard deviation of the episode rewards are written to
    TensorBoard, the model is saved to ``latest_model.zip`` and, whenever the
    mean reward exceeds the best value seen so far, also to ``best_model.zip``
    (both inside ``log_dir``).

    :param eval_env: The environment used for evaluation
    :param log_dir: Directory where logs and models are saved
    :param eval_freq: Evaluate every ``eval_freq`` calls of the callback
        (compared against ``self.n_calls``, not ``self.num_timesteps``).
        A value <= 0 disables evaluation.
    :param n_eval_episodes: Number of episodes to evaluate the model
    :param deterministic: Whether to use deterministic actions during evaluation
    :param verbose: Verbosity level (0: no output, 1: info messages, 2: debug messages)
    """

    def __init__(
        self,
        eval_env: Union[gym.Env, VecEnv],
        log_dir: str,
        eval_freq: int = 10000,
        n_eval_episodes: int = 5,
        deterministic: bool = True,
        verbose: int = 1,
    ):
        super().__init__(verbose=verbose)
        self.eval_env = eval_env
        self.eval_freq = eval_freq
        self.n_eval_episodes = n_eval_episodes
        self.deterministic = deterministic
        # Start at -inf so the first evaluation always becomes the best model.
        self.best_mean_reward = -np.inf

        self.log_dir = log_dir
        self.checkpoint_dir = os.path.join(log_dir, "checkpoints")
        self.best_model_path = os.path.join(log_dir, "best_model.zip")
        self.latest_model_path = os.path.join(log_dir, "latest_model.zip")
        # Mean reward of every evaluation run so far, in chronological order.
        self.evaluations: List[float] = []

        # Ensure directories exist (creating checkpoint_dir also creates log_dir).
        os.makedirs(self.checkpoint_dir, exist_ok=True)

        # TensorBoard writer
        self.writer = SummaryWriter(log_dir)

    def _on_step(self) -> bool:
        """
        Run an evaluation/save cycle every ``eval_freq`` calls.

        :return: Always ``True``; this callback never requests that training stop.
        """
        # A non-positive frequency disables evaluation; this also avoids a
        # ZeroDivisionError in the modulo below when eval_freq == 0.
        if self.eval_freq <= 0 or self.n_calls % self.eval_freq != 0:
            return True

        if self.verbose >= 1:
            print("Evaluating model...")
        mean_reward, std_reward = self.evaluate_and_log()

        # Log results to TensorBoard
        self.writer.add_scalar('eval/mean_reward', mean_reward, self.num_timesteps)
        self.writer.add_scalar('eval/std_reward', std_reward, self.num_timesteps)
        # Flush now so the scalars are not left in the writer's buffer if the
        # run stops before _on_training_end() closes the writer.
        self.writer.flush()

        # Save the latest model checkpoint
        self.model.save(self.latest_model_path)
        if self.verbose >= 1:
            print(f"Saved latest model to {self.latest_model_path}")

        # If current mean reward is the best we have seen, save the model
        if mean_reward > self.best_mean_reward:
            self.best_mean_reward = mean_reward
            self.model.save(self.best_model_path)
            if self.verbose >= 1:
                print(f"New best model with mean reward {mean_reward:.2f} saved to {self.best_model_path}")

        return True

    def evaluate_and_log(self):
        """
        Evaluate the model and log results.

        Runs ``evaluate_policy`` for ``n_eval_episodes`` episodes on
        ``eval_env`` and appends the mean reward to ``self.evaluations``.

        :return: Tuple ``(mean_reward, std_reward)`` computed with NumPy over
            the per-episode rewards.
        """
        # return_episode_rewards=True yields per-episode values instead of the
        # aggregated mean/std; the second element is not needed here.
        episode_rewards, _ = evaluate_policy(
            self.model,
            self.eval_env,
            n_eval_episodes=self.n_eval_episodes,
            deterministic=self.deterministic,
            return_episode_rewards=True,
        )
        mean_reward = np.mean(episode_rewards)
        std_reward = np.std(episode_rewards)
        # The history is kept in memory only; it is not persisted to disk.
        self.evaluations.append(mean_reward)

        # Log results
        if self.verbose >= 1:
            print(f"Evaluation: Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")

        return mean_reward, std_reward

    def _on_training_end(self) -> None:
        """Close the TensorBoard writer when training ends."""
        self.writer.close()