Source code for core.eval.carla_benchmark_evaluator

import os
import numpy as np
import pandas as pd
from collections import deque
from tqdm import tqdm
from itertools import product
import torch
from typing import Dict, Any, List, Callable, Optional, Tuple
from tensorboardX import SummaryWriter

from .base_evaluator import BaseEvaluator
from core.data.benchmark import ALL_SUITES
from core.data.benchmark.benchmark_utils import get_suites_list, gather_results, read_pose_txt, get_benchmark_dir
from ding.envs import BaseEnvManager
from ding.torch_utils.data_helper import to_tensor


class CarlaBenchmarkEvaluator(BaseEvaluator):
    """
    Evaluator used to evaluate a policy with Carla benchmark evaluation suites. It uses several environments
    in an ``EnvManager`` to run the policy. For every suite provided by the user, the evaluator first collects
    all available reset params from the benchmark files and stores them in a queue, so that each reset param
    is evaluated once and only once. The evaluation results are stored in a '.csv' file containing the reward,
    success/failure status and reset param of each episode.

    Note:
        Env manager must run WITHOUT auto reset.

    :Arguments:
        - cfg (Dict): Config dict.
        - env (BaseEnvManager): Env manager used to evaluate.
        - policy (Any): Policy to evaluate. Must have ``forward`` method.
        - tb_logger (SummaryWriter, optional): Tensorboard writer to store values in tensorboard. Defaults to None.
        - exp_name (str, optional): Name of the experiment. Used to build logger. Defaults to 'default_experiment'.
        - instance_name (str, optional): Name of the evaluator. Used to build logger. Defaults to 'benchmark_evaluator'.

    :Interfaces: reset, eval, close, should_eval

    :Properties:
        - env (BaseEnvManager): Env manager with several environments used to evaluate.
        - policy (Any): Policy instance to interact with envs.
    """

    config = dict(
        benchmark_dir=None,
        # dir path to resume & save eval .csv files
        result_dir='',
        # whether to transform obs into tensor manually
        transform_obs=False,
        # num of episodes to eval in a suite
        episodes_per_suite=100,
        # stop value of success rate
        stop_rate=1,
        # whether to resume an existing evaluation result in .csv
        resume=False,
        # suite name, can be str or list
        suite='FullTown01-v0',
        # manually set weathers rather than read from suite
        weathers=None,
        seed=0,
        # whether to save results as .csv file
        save_files=True,
    )

    def __init__(
            self,
            cfg: Dict,
            env: BaseEnvManager,
            policy: Any,
            tb_logger: Optional['SummaryWriter'] = None,  # noqa
            exp_name: Optional[str] = 'default_experiment',
            instance_name: Optional[str] = 'benchmark_evaluator',
    ) -> None:
        super().__init__(cfg, env, policy, tb_logger=tb_logger, exp_name=exp_name, instance_name=instance_name)
        self._benchmark_dir = self._cfg.benchmark_dir
        self._result_dir = self._cfg.result_dir
        self._transform_obs = self._cfg.transform_obs
        self._episodes_per_suite = self._cfg.episodes_per_suite
        self._resume = self._cfg.resume
        if self._benchmark_dir is None:
            self._benchmark_dir = get_benchmark_dir()
        suite = self._cfg.suite
        self._eval_suite_list = get_suites_list(suite)
        self._stop_rate = self._cfg.stop_rate
        self._seed = self._cfg.seed
        self._weathers = self._cfg.weathers
        self._save_files = self._cfg.save_files
        self._last_eval_iter = 0
        self._max_success_rate = 0

    @property
    def env(self) -> BaseEnvManager:
        return self._env_manager

    @env.setter
    def env(self, _env_manager: BaseEnvManager) -> None:
        assert not _env_manager._auto_reset, "auto reset for env manager should be closed!"
        self._end_flag = False
        self._env_manager = _env_manager
        self._env_manager.launch()
        self._env_num = self._env_manager.env_num
    def close(self) -> None:
        """
        Close the evaluator and the env manager if not already closed.
        """
        if self._close_flag:
            return
        self._close_flag = True
        self._env_manager.close()
        if self._tb_logger is not None:
            self._tb_logger.flush()
            self._tb_logger.close()
    def reset(self) -> None:
        """
        Reset evaluator and policies.
        """
        self._policy.reset([i for i in range(self._env_num)])
        self._last_eval_iter = 0
        self._max_success_rate = 0
    def should_eval(self, train_iter: int) -> bool:
        """
        Judge whether evaluation should run at the current training iteration, according to the
        configured evaluation frequency.

        :Arguments:
            - train_iter (int): Current training iteration.

        :Returns:
            bool: Whether evaluation should run.
        """
        if (train_iter - self._last_eval_iter) < self._cfg.eval_freq and train_iter != 0:
            return False
        self._last_eval_iter = train_iter
        return True
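    # Example of the gating above (hypothetical numbers): with ``cfg.eval_freq = 100``,
    # ``should_eval(0)`` returns True, ``should_eval(50)`` returns False, and
    # ``should_eval(120)`` returns True (``_last_eval_iter`` is then moved to 120).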
    def eval(
            self,
            save_ckpt_fn: Optional[Callable] = None,
            train_iter: int = -1,
            envstep: int = -1,
            policy_kwargs: Optional[Dict] = None,
            n_episode: Optional[int] = None
    ) -> Tuple[bool, float]:
        """
        Run evaluation with the provided policy arguments. It evaluates all available episodes of each
        benchmark suite, up to ``episodes_per_suite`` set in config (or ``n_episode`` if provided).

        :Arguments:
            - save_ckpt_fn (Callable, optional): Function to save ckpt. Called when a new best performance
              is reached. Defaults to None.
            - train_iter (int, optional): Current training iteration. Defaults to -1.
            - envstep (int, optional): Current env step. Defaults to -1.
            - policy_kwargs (Dict, optional): Additional arguments for policy forward. Defaults to None.
            - n_episode (int, optional): Number of episodes to eval. Defaults to the value set in config.

        :Returns:
            Tuple[bool, float]: Whether the stop value is reached, and the success rate.
        """
        if policy_kwargs is None:
            policy_kwargs = dict()
        if n_episode is None:
            n_episode = self._episodes_per_suite
        assert n_episode >= self._env_num, "Episode num must be no less than env num!"
        if self._result_dir != '':
            os.makedirs(self._result_dir, exist_ok=True)
        total_time = 0.0
        total_episodes = 0
        success_episodes = 0
        self.reset()

        for suite in self._eval_suite_list:
            # Collect reset params (poses and weathers) for the current suite.
            args, kwargs = ALL_SUITES[suite]
            assert len(args) == 0
            reset_params = kwargs.copy()
            poses_txt = reset_params.pop('poses_txt')
            weathers = reset_params.pop('weathers')
            suite_name = suite + '_seed%d' % self._seed
            summary_csv = os.path.join(self._result_dir, suite_name + ".csv")
            if os.path.exists(summary_csv) and self._resume:
                summary = pd.read_csv(summary_csv)
            else:
                summary = pd.DataFrame()
            if self._weathers is not None:
                weathers = self._weathers
            pose_pairs = read_pose_txt(self._benchmark_dir, poses_txt)

            episode_queue = deque()
            running_env_params = dict()
            results = []
            running_envs = 0
            # Assign the first reset params to running envs and queue the rest.
            for episode, (weather, (start, end)) in enumerate(product(weathers, pose_pairs)):
                if episode >= n_episode:
                    break
                param = reset_params.copy()
                param['start'] = start
                param['end'] = end
                param['weather'] = weather
                if self._resume and len(summary) > 0 and ((summary['start'] == start) & (summary['end'] == end) &
                                                          (summary['weather'] == weather)).any():
                    self._logger.info(
                        '[EVALUATOR] weather: {}, route: ({}, {}) already exist'.format(weather, start, end)
                    )
                    continue
                if running_envs < self._env_num:
                    running_env_params[running_envs] = param
                    running_envs += 1
                else:
                    episode_queue.append(param)

            if not running_env_params:
                self._logger.info("[EVALUATOR] Nothing to eval.")
            else:
                pbar = tqdm(total=len(running_env_params) + len(episode_queue))
                for env_id in running_env_params:
                    self._env_manager.seed({env_id: self._seed})
                self._env_manager.reset(running_env_params)
                with self._timer:
                    while True:
                        obs = self._env_manager.ready_obs
                        # Drop obs from envs that are no longer being evaluated
                        # (iterate over a copy to avoid mutating the dict while iterating).
                        for key in list(obs.keys()):
                            if key not in running_env_params:
                                obs.pop(key)
                        if not obs:
                            break
                        if self._transform_obs:
                            obs = to_tensor(obs, dtype=torch.float32)
                        policy_output = self._policy.forward(obs, **policy_kwargs)
                        actions = {env_id: output['action'] for env_id, output in policy_output.items()}
                        timesteps = self._env_manager.step(actions)
                        for i, t in timesteps.items():
                            if t.info.get('abnormal', False):
                                # Abnormal step: reset the policy and the env with the same params.
                                self._policy.reset([i])
                                self._env_manager.reset(reset_params={i: running_env_params[i]})
                                continue
                            if t.done:
                                self._policy.reset([i])
                                result = {
                                    'start': running_env_params[i]['start'],
                                    'end': running_env_params[i]['end'],
                                    'weather': running_env_params[i]['weather'],
                                    'reward': t.info['final_eval_reward'],
                                    'success': t.info['success'],
                                    'collided': t.info['collided'],
                                    'timecost': int(t.info['tick']),
                                }
                                results.append(result)
                                pbar.update(1)
                                # Start the next queued episode in this env, or retire the env.
                                if episode_queue:
                                    reset_param = episode_queue.pop()
                                    self._env_manager.reset({i: reset_param})
                                    running_env_params[i] = reset_param
                                else:
                                    running_env_params.pop(i)
                        if self._env_manager.done:
                            break
                duration = self._timer.value
                success_num = 0
                episode_num = 0
                episode_reward = []
                envstep_num = 0
                for result in results:
                    episode_num += 1
                    if result['success']:
                        success_num += 1
                    episode_reward.append(result['reward'])
                    envstep_num += result['timecost']
                if self._save_files:
                    results_pd = pd.DataFrame(results)
                    summary = pd.concat([summary, results_pd])
                    summary.to_csv(summary_csv, index=False)
                info = {
                    'suite': suite,
                    'train_iter': train_iter,
                    'ckpt_name': 'iteration_{}.pth.tar'.format(train_iter),
                    'avg_envstep_per_episode': envstep_num / n_episode,
                    'evaluate_time': duration,
                    'avg_time_per_episode': duration / n_episode,
                    'success_rate': 0 if episode_num == 0 else success_num / episode_num,
                    'reward_mean': np.mean(episode_reward),
                    'reward_std': np.std(episode_reward),
                }
                if train_iter == -1:
                    info.pop('train_iter')
                    info.pop('ckpt_name')
                elif self._tb_logger is not None:
                    for k, v in info.items():
                        if k in ['train_iter', 'ckpt_name', 'suite']:
                            continue
                        if not np.isscalar(v):
                            continue
                        self._tb_logger.add_scalar('{}_{}_iter/'.format(self._instance_name, suite) + k, v, train_iter)
                        self._tb_logger.add_scalar('{}_{}_step/'.format(self._instance_name, suite) + k, v, envstep)
                self._logger.info(self._logger.get_tabulate_vars_hor(info))
                total_episodes += episode_num
                success_episodes += success_num
                total_time += duration
                pbar.close()

        if self._save_files:
            results = gather_results(self._result_dir)
            print(results)
        success_rate = 0 if total_episodes == 0 else success_episodes / total_episodes
        if success_rate > self._max_success_rate:
            if save_ckpt_fn:
                save_ckpt_fn('ckpt_best.pth.tar')
            self._max_success_rate = success_rate
        stop_flag = success_rate > self._stop_rate and train_iter > 0
        if stop_flag:
            self._logger.info(
                "[EVALUATOR] " +
                "Current success rate: {} is greater than stop rate: {}".format(success_rate, self._stop_rate) +
                ", so the training is converged."
            )
        self._logger.info('[EVALUATOR] Total success: {}/{}.'.format(success_episodes, total_episodes))
        self._logger.info('[EVALUATOR] Total time: {:.3f} hours.'.format(total_time / 3600.0))
        return stop_flag, success_rate
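
Below is a minimal usage sketch of this evaluator; it is not part of the module source. It assumes a user-supplied Carla env factory ``make_carla_env`` and a policy object ``policy`` whose eval mode exposes ``forward`` and ``reset``; the concrete config values and the choice of ``SyncSubprocessEnvManager`` are illustrative assumptions, not requirements of this class.

# Minimal usage sketch (assumptions: `make_carla_env`, `policy` and the config values
# below are placeholders; only the evaluator interface defined above is taken as given).
from easydict import EasyDict
from ding.envs import SyncSubprocessEnvManager

eval_cfg = EasyDict(CarlaBenchmarkEvaluator.config)
eval_cfg.suite = ['FullTown01-v1', 'FullTown02-v1']  # a single suite name also works
eval_cfg.result_dir = './benchmark_results'
eval_cfg.episodes_per_suite = 25
eval_cfg.eval_freq = 5000  # consumed by should_eval()

manager_cfg = SyncSubprocessEnvManager.default_config()
manager_cfg.auto_reset = False  # required: the `env` setter asserts auto reset is off
env_manager = SyncSubprocessEnvManager(env_fn=[make_carla_env for _ in range(4)], cfg=manager_cfg)

evaluator = CarlaBenchmarkEvaluator(eval_cfg, env_manager, policy.eval_mode)
stop_flag, success_rate = evaluator.eval()  # returns (bool, float)
evaluator.close()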