import os
import numpy as np
from collections import defaultdict
import torch
from typing import Dict, Any, List, Optional, Callable, Tuple
from .base_evaluator import BaseEvaluator
from core.data.benchmark import ALL_SUITES
from ding.envs import BaseEnvManager
from ding.torch_utils.data_helper import to_tensor
from ding.utils import build_logger, EasyTimer
class SerialEvaluator(BaseEvaluator):
"""
    Evaluator used to serially evaluate a policy for a given number of episodes. It is mainly used during training to
    measure the policy's performance periodically and to save checkpoints of the best iterations. Unlike the serial
    evaluator in `DI-engine`, this evaluator compares iterations by success rate rather than by reward. A TensorBoard
    logger can be provided to record scalars during training. A usage sketch of a typical training loop is given at
    the end of this module.
    Note:
        Env manager must run WITH auto reset enabled.
    :Arguments:
        - cfg (Dict): Config dict.
        - env (BaseEnvManager): Env manager used to evaluate.
        - policy (Any): Policy to evaluate. Must have ``forward`` method.
        - tb_logger (SummaryWriter, optional): TensorBoard writer to store values in TensorBoard. Defaults to None.
        - exp_name (str, optional): Name of the experiment. Used to build the logger. Defaults to 'default_experiment'.
        - instance_name (str, optional): Name of this evaluator instance, used in logging. Defaults to 'serial_evaluator'.
:Interfaces: reset, eval, close, should_eval
:Properties:
- env (BaseEnvManager): Env manager with several environments used to evaluate.
- policy (Any): Policy instance to interact with envs.
"""
    config = dict(
        # Whether to manually transform observations into tensors.
        transform_obs=False,
        # Evaluate every ``eval_freq`` training iterations.
        eval_freq=100,
        # Number of episodes to run in each evaluation.
        n_episode=10,
        # Success rate threshold at which training is considered converged.
        stop_rate=1,
    )
def __init__(
self,
cfg: Dict,
env: BaseEnvManager,
policy: Any,
tb_logger: Optional['SummaryWriter'] = None, # noqa
exp_name: Optional[str] = 'default_experiment',
instance_name: Optional[str] = 'serial_evaluator',
) -> None:
super().__init__(cfg, env, policy, tb_logger=tb_logger, exp_name=exp_name, instance_name=instance_name)
self._transform_obs = self._cfg.transform_obs
self._default_n_episode = self._cfg.n_episode
self._stop_rate = self._cfg.stop_rate
self._last_eval_iter = 0
self._max_success_rate = 0
@property
def env(self) -> BaseEnvManager:
return self._env_manager
@env.setter
def env(self, _env_manager: BaseEnvManager) -> None:
        assert _env_manager._auto_reset, "Auto reset for env manager must be enabled!"
self._end_flag = False
self._env_manager = _env_manager
self._env_manager.launch()
self._env_num = self._env_manager.env_num
    def close(self) -> None:
"""
        Close the evaluator and its env manager if not already closed.
"""
if self._close_flag:
return
self._close_flag = True
self._env_manager.close()
if self._tb_logger is not None:
self._tb_logger.flush()
self._tb_logger.close()
    def reset(self) -> None:
"""
Reset evaluator and policies.
"""
self._policy.reset([i for i in range(self._env_num)])
self._last_eval_iter = 0
self._max_success_rate = 0
    def should_eval(self, train_iter: int) -> bool:
"""
        Judge whether evaluation should run at the current training iteration, based on ``eval_freq``.
:Arguments:
- train_iter (int): Current training iteration
:Returns:
            bool: Whether evaluation should run at this iteration.
"""
if (train_iter - self._last_eval_iter) < self._cfg.eval_freq and train_iter > 0:
return False
self._last_eval_iter = train_iter
return True
    def eval(
self,
save_ckpt_fn: Callable = None,
train_iter: int = -1,
envstep: int = -1,
policy_kwargs: Optional[Dict] = None,
n_episode: Optional[int] = None
) -> Tuple[bool, float]:
"""
        Run evaluation for ``n_episode`` episodes with the provided policy arguments, and compute the success rate
        and reward statistics. If the best success rate so far is improved, a checkpoint is saved via ``save_ckpt_fn``.
:Arguments:
- save_ckpt_fn (Callable, optional): Function to save ckpt. Will be called if at best performance.
Defaults to None.
- train_iter (int, optional): Current training iterations. Defaults to -1.
- envstep (int, optional): Current env steps. Defaults to -1.
- policy_kwargs (Dict, optional): Additional arguments in policy forward. Defaults to None.
            - n_episode (int, optional): Number of episodes to evaluate. Defaults to the value set in config.
        :Returns:
            Tuple[bool, float]: Whether the stop rate has been reached, and the success rate.
"""
if policy_kwargs is None:
policy_kwargs = dict()
if n_episode is None:
n_episode = self._default_n_episode
assert n_episode is not None, "please indicate eval n_episode"
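        # Start each evaluation from a clean state: reset all environments and the per-env policy state.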
self._env_manager.reset()
self._policy.reset([i for i in range(self._env_num)])
episode_count = 0
results = defaultdict(list)
with self._timer:
while episode_count < n_episode:
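                # Fetch observations from the environments that are ready for a new action.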
obs = self._env_manager.ready_obs
if self._transform_obs:
obs = to_tensor(obs, dtype=torch.float32)
policy_output = self._policy.forward(obs, **policy_kwargs)
actions = {env_id: output['action'] for env_id, output in policy_output.items()}
timesteps = self._env_manager.step(actions)
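                # Handle each environment's transition: skip abnormal steps and record finished episodes.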
for env_id, t in timesteps.items():
if t.info.get('abnormal', False):
self._policy.reset([env_id])
continue
if t.done:
self._policy.reset([env_id])
result = {
'reward': t.info['final_eval_reward'],
'success': t.info['success'],
'step': int(t.info['tick']),
}
episode_count += 1
for k, v in result.items():
results[k].append(v)
self._logger.info(
"[EVALUATOR] env {} finish episode, final reward: {}, current episode: {}".format(
env_id, result['reward'], episode_count
)
)
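                # Stop early once every environment has finished its episodes.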
if self._env_manager.done:
break
duration = self._timer.value
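        # Aggregate per-episode results into summary statistics.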
episode_reward = results['reward']
envstep_count = np.sum(results['step'])
success_count = np.sum(results['success'])
success_rate = 0 if episode_count == 0 else success_count / episode_count
info = {
'train_iter': train_iter,
'ckpt_name': 'iteration_{}.pth.tar'.format(train_iter),
'avg_envstep_per_episode': envstep_count / n_episode,
'evaluate_time': duration,
'avg_time_per_episode': duration / n_episode,
'success_rate': success_rate,
'reward_mean': np.mean(episode_reward),
'reward_std': np.std(episode_reward),
}
self._logger.info(self._logger.get_tabulate_vars_hor(info))
if self._tb_logger is not None:
for k, v in info.items():
if k in ['train_iter', 'ckpt_name', 'each_reward']:
continue
if not np.isscalar(v):
continue
self._tb_logger.add_scalar('{}_iter/'.format(self._instance_name) + k, v, train_iter)
self._tb_logger.add_scalar('{}_step/'.format(self._instance_name) + k, v, envstep)
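        # Save a checkpoint whenever the best success rate so far is improved.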
if success_rate > self._max_success_rate:
if save_ckpt_fn:
save_ckpt_fn('ckpt_best.pth.tar')
self._max_success_rate = success_rate
        stop_flag = success_rate >= self._stop_rate and train_iter > 0
if stop_flag:
self._logger.info(
"[EVALUATOR] " +
"Current success rate: {} is greater than stop rate: {}".format(success_rate, self._stop_rate) +
", so the training is converged."
)
return stop_flag, success_rate
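

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the evaluator itself).
# ``learner`` and ``collector`` are hypothetical objects assumed to expose
# ``train_iter`` / ``save_checkpoint`` and ``envstep`` respectively, and
# ``env_manager`` / ``policy`` are assumed to be built elsewhere (with auto
# reset enabled and a ``forward`` method). This function is never called by
# the module itself.
# ---------------------------------------------------------------------------
def _example_training_loop(cfg, env_manager, policy, learner, collector):  # pragma: no cover
    evaluator = SerialEvaluator(cfg, env_manager, policy)
    while True:
        # Evaluate only every ``eval_freq`` training iterations.
        if evaluator.should_eval(learner.train_iter):
            stop, success_rate = evaluator.eval(
                save_ckpt_fn=learner.save_checkpoint,
                train_iter=learner.train_iter,
                envstep=collector.envstep,
            )
            if stop:
                break
        ...  # collect data and update the policy here
    evaluator.close()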