import gym
import copy
import numpy as np
from typing import Any, Dict, Optional
from easydict import EasyDict
from itertools import product
from core.data.benchmark import ALL_SUITES
from core.eval.carla_benchmark_evaluator import get_suites_list, read_pose_txt, get_benchmark_dir
from .base_drive_env import BaseDriveEnv
from ding.utils.default_helper import deep_merge_dicts
from ding.envs.env.base_env import BaseEnvTimestep
from ding.envs.common.env_element import EnvElementInfo
from ding.torch_utils.data_helper import to_ndarray
class DriveEnvWrapper(gym.Wrapper):
"""
Environment wrapper to make ``gym.Env`` align with DI-engine definitions, so as to use utilities in DI-engine.
    It changes the ``step``, ``reset`` and ``info`` methods of ``gym.Env``, while other methods are
    passed through unchanged.
:Arguments:
- env (BaseDriveEnv): The environment to be wrapped.
- cfg (Dict): Config dict.
:Interfaces: reset, step, info, render, seed, close
"""
config = dict()
    def __init__(self, env: BaseDriveEnv, cfg: Optional[Dict] = None, **kwargs) -> None:
if cfg is None:
self._cfg = self.__class__.default_config()
elif 'cfg_type' not in cfg:
self._cfg = self.__class__.default_config()
self._cfg = deep_merge_dicts(self._cfg, cfg)
else:
self._cfg = cfg
self.env = env
if not hasattr(self.env, 'reward_space'):
self.reward_space = gym.spaces.Box(low=-float('inf'), high=float('inf'), shape=(1, ))
    def reset(self, *args, **kwargs) -> Any:
"""
        Wrapper of ``reset`` method in env. The observation is converted to ``np.ndarray`` and the
        final eval reward accumulator is reset.
:Returns:
Any: Observations from environment
"""
obs = self.env.reset(*args, **kwargs)
obs = to_ndarray(obs, dtype=np.float32)
if isinstance(obs, np.ndarray) and len(obs.shape) == 3:
obs = obs.transpose((2, 0, 1))
# elif isinstance(obs, dict):
# if 'birdview' in obs:
# obs['birdview'] = obs['birdview'].transpose((2, 0, 1))
# if 'rgb' in obs:
# obs['rgb'] = obs['rgb'].transpose((2, 0, 1))
self._final_eval_reward = 0.0
return obs
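
    # Shape-convention sketch: image observations are converted from HWC to CHW
    # above. Assuming a hypothetical env whose ``reset`` returns a 96x96 RGB frame:
    #
    #   obs = wrapped_env.reset()
    #   assert obs.dtype == np.float32
    #   assert obs.shape == (3, 96, 96)  # channel-first, as PyTorch models expect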
    def step(self, action: Any = None) -> BaseEnvTimestep:
"""
Wrapper of ``step`` method in env. This aims to convert the returns of ``gym.Env`` step method into
that of ``ding.envs.BaseEnv``, from ``(obs, reward, done, info)`` tuple to a ``BaseEnvTimestep``
        namedtuple defined in DI-engine. It will also convert actions, observations and rewards into
        ``np.ndarray``, and check the legality of actions that contain control signals.
:Arguments:
- action (Any, optional): Actions sent to env. Defaults to None.
:Returns:
BaseEnvTimestep: DI-engine format of env step returns.
"""
action = to_ndarray(action)
obs, rew, done, info = self.env.step(action)
self._final_eval_reward += rew
obs = to_ndarray(obs, dtype=np.float32)
if isinstance(obs, np.ndarray) and len(obs.shape) == 3:
obs = obs.transpose((2, 0, 1))
# elif isinstance(obs, dict):
# if 'birdview' in obs:
# obs['birdview'] = obs['birdview'].transpose((2, 0, 1))
# if 'rgb' in obs:
# obs['rgb'] = obs['rgb'].transpose((2, 0, 1))
rew = to_ndarray([rew], dtype=np.float32)
if done:
info['final_eval_reward'] = self._final_eval_reward
return BaseEnvTimestep(obs, rew, done, info)
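
    # Return-format sketch: ``BaseEnvTimestep`` is a namedtuple with fields
    # ``obs``, ``reward``, ``done``, ``info``. A minimal consumer loop
    # (hypothetical ``wrapped_env`` and ``action``):
    #
    #   timestep = wrapped_env.step(action)
    #   assert timestep.reward.shape == (1, )  # scalar reward wrapped as ndarray
    #   if timestep.done:
    #       print(timestep.info['final_eval_reward'])  # cumulative episode reward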
    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
if replay_path is None:
replay_path = './video'
self._replay_path = replay_path
self.env = gym.wrappers.Monitor(self.env, self._replay_path, video_callable=lambda episode_id: True, force=True)
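
    # Usage sketch: ``gym.wrappers.Monitor`` records every episode here
    # (``video_callable=lambda episode_id: True``), so call this before ``reset``:
    #
    #   env.enable_save_replay('./video')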
@classmethod
def default_config(cls: type) -> EasyDict:
cfg = EasyDict(cls.config)
cfg.cfg_type = cls.__name__ + 'Config'
return copy.deepcopy(cfg)
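
    # Config-merge sketch: in ``__init__`` above, a user cfg without 'cfg_type'
    # is overlaid onto this class default via ``deep_merge_dicts``, so partial
    # configs only need the keys they change. E.g. for ``BenchmarkEnvWrapper``
    # below, ``cfg=dict(mode='order')`` keeps ``suite='FullTown01-v0'`` and
    # ``benchmark_dir=None`` while switching the mode.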
def __repr__(self) -> str:
return repr(self.env)
def render(self):
self.env.render()
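
# Usage sketch for DriveEnvWrapper (``SimpleCarlaEnv`` is a hypothetical
# BaseDriveEnv subclass standing in for any concrete drive env):
#
#   env = DriveEnvWrapper(SimpleCarlaEnv(cfg=...))
#   env.seed(0)
#   obs = env.reset()
#   timestep = env.step(env.action_space.sample())
#   env.close()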
class BenchmarkEnvWrapper(DriveEnvWrapper):
"""
    Environment wrapper for Carla benchmark suite evaluations. It wraps an environment with a
    benchmark suite so that the env always runs with a benchmark suite setting. It has two modes for
    choosing reset params from a suite: 'random' picks a reset param at random, while 'order'
    iterates over all reset params in order.
:Arguments:
- env (BaseDriveEnv): The environment to be wrapped.
- cfg (Dict): Config dict.
"""
config = dict(
suite='FullTown01-v0',
benchmark_dir=None,
mode='random',
)
def __init__(self, env: BaseDriveEnv, cfg: Dict, **kwargs) -> None:
super().__init__(env, cfg=cfg, **kwargs)
suite = self._cfg.suite
benchmark_dir = self._cfg.benchmark_dir
self._mode = self._cfg.mode
if benchmark_dir is None:
benchmark_dir = get_benchmark_dir()
assert self._mode in ['random', 'order'], self._mode
self._param = dict()
suite_list = get_suites_list(suite)
self._reset_param_list = []
for suite in suite_list:
args, kwargs = ALL_SUITES[suite]
assert len(args) == 0
reset_params = kwargs.copy()
poses_txt = reset_params.pop('poses_txt')
weathers = reset_params.pop('weathers')
pose_pairs = read_pose_txt(benchmark_dir, poses_txt)
for (start, end), weather in product(pose_pairs, weathers):
param = reset_params.copy()
param['start'] = start
param['end'] = end
param['weather'] = weather
param['col_is_failure'] = True
self._reset_param_list.append(param)
self._reset_param_index = 0
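
        # Illustrative entry of ``self._reset_param_list`` (values are made up;
        # real ones come from the suite's pose txt and weather list):
        #
        #   {'start': 36, 'end': 40, 'weather': 1, 'col_is_failure': True, ...}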
    def reset(self, *args, **kwargs) -> Any:
"""
        Wrapped ``reset`` method for env. It ignores all incoming arguments and chooses one of the
        suite's reset parameters according to the configured mode.

        :Returns:
            Any: Returns of env ``reset`` method.
"""
if self._mode == 'random':
self._param = np.random.choice(self._reset_param_list)
elif self._mode == 'order':
self._param = self._reset_param_list[self._reset_param_index]
            self._reset_param_index += 1
if self._reset_param_index >= len(self._reset_param_list):
self._reset_param_index = 0
return super().reset(**self._param)
    def step(self, action: Any) -> Any:
"""
        Wrapped ``step`` method for env. It prints a summary log when an episode ends.
:Arguments:
- action (Any): Actions sent to env.
:Returns:
Any: Env step result.
"""
timestep = super().step(action)
done = timestep.done
info = timestep.info
if done:
done_tick = info['tick']
done_reward = info['final_eval_reward']
if info['success']:
done_state = 'Success'
elif info['collided']:
done_state = "Collided"
elif info['wrong_direction']:
done_state = "Wrong Direction"
elif info['off_road']:
done_state = "Off road"
elif info['stuck']:
done_state = "Stuck"
elif info['timeout']:
done_state = "Timeout"
else:
done_state = 'None'
print(
"[ENV] {} done with tick: {}, state: {}, reward: {}".format(
repr(self.env), done_tick, done_state, done_reward
)
)
return timestep
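
    # Usage sketch: deterministic benchmark sweep (``SimpleCarlaEnv`` is again a
    # hypothetical BaseDriveEnv subclass; cfg keys are the ones defined in
    # ``config`` above, merged with the class default):
    #
    #   env = BenchmarkEnvWrapper(SimpleCarlaEnv(...), EasyDict(dict(mode='order')))
    #   obs = env.reset()  # loads the first (start, end, weather) combination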
# TODO: complete scenario env wrapper
class ScenarioEnvWrapper(DriveEnvWrapper):
config = dict()
def __init__(self, env: BaseDriveEnv, cfg: Dict, **kwargs) -> None:
super().__init__(env, cfg=cfg, **kwargs)