# Source code for ding.envs.env_manager.gym_vector_env_manager
from typing import Any, Union, List, Tuple, Dict, Callable, Optional
from ditk import logging
import numpy as np
from easydict import EasyDict
from collections import namedtuple
import gym
from gym.vector.async_vector_env import AsyncVectorEnv
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.torch_utils import to_ndarray, to_list
from ding.utils import PropagatingThread, LockContextType, LockContext, ENV_MANAGER_REGISTRY
from .base_env_manager import BaseEnvManager
from .base_env_manager import EnvState
@ENV_MANAGER_REGISTRY.register('gym_vector')
class GymVectorEnvManager(BaseEnvManager):
    """
    Overview:
        Create a GymVectorEnvManager to manage multiple environments.
        Each environment is run by a respective subprocess, through ``gym.vector.AsyncVectorEnv``.
    Interfaces:
        seed, ready_obs, step, reset, close
    """
    config = dict(shared_memory=False, episode_num=float("inf"))

    def __init__(self, env_fn: List[Callable], cfg: EasyDict) -> None:
        """
        Overview:
            Build the underlying ``AsyncVectorEnv`` plus the per-env bookkeeping (states, episode
            counters, accumulated returns).
        Arguments:
            - env_fn (:obj:`List[Callable]`): One factory per environment; each call returns an env instance.
            - cfg (:obj:`EasyDict`): Manager config; ``shared_memory`` and ``episode_num`` are read here.

        .. note::
            ``env_fn`` must create gym-type environment instances, which may differ from DI-engine environments.
        """
        self._cfg = cfg
        self._env_fn = env_fn
        self._env_num = len(self._env_fn)
        # The manager counts as closed until ``reset`` is called.
        self._closed = True
        self._env_replay_path = None
        # env_ref is used to acquire some common attributes of env, like obs_shape and act_shape
        self._env_ref = self._env_fn[0]()
        self._env_states = {i: EnvState.VOID for i in range(self._env_num)}
        self._episode_num = self._cfg.episode_num
        self._env_episode_count = {i: 0 for i in range(self.env_num)}
        self._env_manager = AsyncVectorEnv(
            env_fns=self._env_fn,
            shared_memory=cfg.shared_memory,
        )
        self._env_states = {i: EnvState.INIT for i in range(self._env_num)}
        self._eval_episode_return = [0. for _ in range(self._env_num)]

    def reset(self, reset_param: Optional[Dict] = None) -> None:
        """
        Overview:
            Reset all sub-environments at once, store their observations as the ready observations
            and mark every env as running. Per-env episode counters are not reset here; ``step``
            also calls this method to start a new round once every env has finished an episode.
        Arguments:
            - reset_param (:obj:`Optional[Dict]`): Must be ``None``; per-env reset parameters are not supported.
        """
        assert reset_param is None
        self._closed = False
        for env_id in range(self.env_num):
            self._env_states[env_id] = EnvState.RESET
        self._ready_obs = self._env_manager.reset()
        for env_id in range(self.env_num):
            self._env_states[env_id] = EnvState.RUN
        self._eval_episode_return = [0. for _ in range(self._env_num)]

    def step(self, actions: Dict[int, Any]) -> Dict[int, namedtuple]:
        """
        Overview:
            Step every sub-environment once. ``AsyncVectorEnv`` needs one action per env, so envs
            missing from ``actions`` are stepped with ``self._env_ref.random_action()``. Their results
            are not returned.
        Arguments:
            - actions (:obj:`Dict[int, Any]`): Mapping env_id -> ``np.ndarray`` action. The dict is not modified.
        Returns:
            - timesteps (:obj:`Dict[int, namedtuple]`): Mapping env_id -> ``BaseEnvTimestep`` for every
              env id present in ``actions``. On ``done`` the info dict also carries ``eval_episode_return``.
        Raises:
            - Exception: If the action for the first env is not an ``np.ndarray``.
        """
        assert isinstance(actions, Dict), type(actions)
        env_ids_given = set(actions.keys())
        # Build the full action list in env-id order without mutating the caller's dict;
        # gym's vector step consumes a list whose position is the env index.
        action_list = [
            actions[env_id] if env_id in env_ids_given else self._env_ref.random_action()
            for env_id in range(self.env_num)
        ]
        elem = action_list[0]
        if not isinstance(elem, np.ndarray):
            raise Exception('DI-engine only accept np.ndarray-type action!')
        if elem.shape == (1, ):
            action_list = [v.item() for v in action_list]

        obs, rewards, dones, infos = self._env_manager.step(action_list)
        # gym>=0.24 returns infos as a dict of per-env arrays, older versions as a sequence of per-env
        # dicts (https://github.com/openai/gym/pull/2773). Detect the layout from the data itself
        # rather than comparing version strings lexicographically.
        dict_infos = isinstance(infos, dict)

        timestep_collate_result = {}
        for i in range(self.env_num):
            if i not in env_ids_given:
                continue
            if dict_infos:
                info = {k: v[i] for k, v in infos.items()}
            else:
                info = infos[i]
            timestep = BaseEnvTimestep(obs[i], rewards[i], dones[i], info)
            timestep_collate_result[i] = timestep

            self._eval_episode_return[i] += timestep.reward
            if timestep.done:
                timestep.info['eval_episode_return'] = self._eval_episode_return[i]
                self._eval_episode_return[i] = 0
                self._env_episode_count[i] += 1
                if self._env_episode_count[i] >= self._episode_num:
                    self._env_states[i] = EnvState.DONE
                else:
                    self._env_states[i] = EnvState.RESET
            # gym's AsyncVectorEnv auto-resets a sub-env when it reports done, so on ``done`` this obs
            # should already belong to the env's next episode; storing it keeps ready_obs in sync with
            # the env instead of leaving a stale observation from the finished episode.
            self._ready_obs[i] = timestep.obs

        # Reset only after every env has been processed: resetting inside the loop would let envs
        # handled afterwards overwrite the fresh observations and zeroed returns with pre-reset data.
        if all(self._env_states[i] == EnvState.RESET for i in range(self.env_num)):
            self.reset()
        return timestep_collate_result

    @property
    def ready_obs(self) -> Dict[int, Any]:
        """
        Overview:
            Latest observation of every env that has not yet finished ``episode_num`` episodes.
        Returns:
            - ready_obs (:obj:`Dict[int, Any]`): Mapping env_id -> observation.
        """
        return {
            i: self._ready_obs[i]
            for i in range(len(self._ready_obs)) if self._env_episode_count[i] < self._episode_num
        }

    def seed(self, seed: Union[Dict[int, int], List[int], int], dynamic_seed: Optional[bool] = None) -> None:
        """
        Overview:
            Seed the sub-environments through ``AsyncVectorEnv.seed``.
        Arguments:
            - seed (:obj:`Union[Dict[int, int], List[int], int]`): An int, a list with one seed per env,
              or a dict env_id -> seed (must contain every env id).
            - dynamic_seed (:obj:`Optional[bool]`): Not supported; a warning is logged unless it is
              explicitly ``False``.
        """
        if isinstance(seed, dict):
            # gym's vector ``seed`` understands an int or a per-env sequence; iterating a dict would
            # yield its keys (the env ids) instead of the seeds, so pass an env-ordered list instead.
            seed = [seed[env_id] for env_id in range(self.env_num)]
        self._env_manager.seed(seed)
        # TODO dynamic_seed
        if dynamic_seed is not False:
            logging.warning("gym env doesn't support dynamic_seed in different episode")

    def close(self) -> None:
        """
        Overview:
            Release the environment resources: the reference env and the vector env with its worker
            subprocesses. Does nothing if the manager is already closed, which includes the case where
            ``reset`` was never called.
            Since ``BaseEnvManager.__init__`` is not called, there are no base-class resources to release.
        """
        if self._closed:
            return
        self._closed = True
        self._env_ref.close()
        self._env_manager.close()
        # terminate=True presumably force-stops any worker still alive after the graceful close above.
        self._env_manager.close_extras(terminate=True)