envs

BaseDriveEnv

class core.envs.BaseDriveEnv(cfg: Dict, **kwargs)[source]

Base class for environments. It inherits from gym.Env and uses the same interfaces. All Drive Env classes are supposed to inherit from this class.

Note:

To run Reinforcement Learning on DI-engine platform, the environment should be wrapped with DingEnvWrapper.

Arguments:
  • cfg (Dict): Config Dict.

Interfaces:

reset, step, close, seed

abstract close() → None[source]

Release all resources in environment and close.

abstract reset(*args, **kwargs) → Any[source]

Reset current environment.

abstract seed(seed: int) → None[source]

Set random seed.

abstract step(action: Any) → Any[source]

Run one step of the environment and return the observation dict.
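
Example:

A minimal sketch of a custom environment inheriting from BaseDriveEnv. The class name, observation contents and episode-length rule below are hypothetical and only illustrate how the abstract interfaces fit together; a real subclass would drive an actual simulator.

    from typing import Any, Dict

    from core.envs import BaseDriveEnv


    class MyDriveEnv(BaseDriveEnv):

        def __init__(self, cfg: Dict, **kwargs) -> None:
            # Real subclasses usually merge cfg with a default config;
            # a plain assignment is enough for this sketch.
            self._cfg = cfg
            self._step_count = 0

        def reset(self, *args, **kwargs) -> Any:
            # Set up episode state and return the first observation.
            self._step_count = 0
            return {}

        def step(self, action: Any) -> Any:
            # Apply the action and return gym-style (obs, reward, done, info).
            self._step_count += 1
            return {}, 0.0, self._step_count >= 100, {}

        def seed(self, seed: int) -> None:
            self._seed = seed

        def close(self) -> None:
            # Release simulator handles, sensor buffers, etc.
            pass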

SimpleCarlaEnv

class core.envs.SimpleCarlaEnv(cfg: Dict, host: str = 'localhost', port: int = 9000, tm_port: int | None = None, carla_timeout: int | None = 60.0, **kwargs)[source]

A simple deployment of the Carla environment with a single hero vehicle. It uses CarlaSimulator to interact with the Carla server and get running status. The observation is obtained from the simulator’s state, information and sensor data, along with a reward that can be calculated and retrieved.

When created, it initializes the environment with the config and the Carla TCP host & port. Creation does NOT start a simulator instance; it only sets up the data structures used to store information while the env is running.

Arguments:
  • cfg (Dict): Env config dict.

  • host (str, optional): Carla server IP host. Defaults to ‘localhost’.

  • port (int, optional): Carla server IP port. Defaults to 9000.

  • tm_port (Optional[int], optional): Carla Traffic Manager port. Defaults to None.

Interfaces:

reset, step, close, is_success, is_failure, render, seed

Properties:
  • hero_player (carla.Actor): Hero vehicle in simulator.

close() → None[source]

Delete simulator & visualizer instances and close the environment.

compute_reward() → Tuple[float, Dict][source]

Compute the reward for the current frame, with details returned in a dict. In short, it contains a goal reward, a route-following reward calculated from the route length in the current and last frames, navigation attitude rewards with respect to the target waypoint, and a failure reward obtained by checking each failure event.

Returns:

Tuple[float, Dict]: Total reward value and details for each component.

get_observations() → Dict[source]

Get observations from the simulator. The sensor data, navigation, state and information in the simulator are used, though not all of them are added into the observation dict.

Returns:

Dict: Observation dict.

is_failure() → bool[source]

Check if the env fails: colliding, being stuck, running a red light, running off road, or running in the wrong direction, according to the config. Failure always occurs when the time limit runs out.

Returns:

bool: Whether the episode failed.

is_success() → bool[source]

Check if the task succeeds. This only happens when the hero vehicle is close to the target waypoint.

Returns:

bool: Whether the episode succeeded.

reset(**kwargs) → Dict[source]

Reset the environment to start a new episode, with the provided reset params. If there is no simulator, this method will create a new simulator instance. The reset params are sent to the simulator’s init method to reset the simulator; all statuses recording running states are then reset, and a visualizer is created if needed. It returns the first frame’s observation.

Returns:

Dict: The initial observation.

seed(seed: int) → None[source]

Set random seed for environment.

Arguments:
  • seed (int): Random seed value.

step(action: Dict) → Tuple[Any, float, bool, Dict][source]

Run one time step of the environment, get the observation from the simulator and calculate the reward. The environment is set to ‘done’ only at success or failure, in which case all visualizers end. Its interfaces follow the standard definition of gym.Env.

Arguments:
  • action (Dict): Action provided by policy.

Returns:

Tuple[Any, float, bool, Dict]: A tuple containing observation, reward, done and info.
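
Example:

A minimal interaction loop, assuming a Carla server is already listening on localhost:9000 and that env_cfg is a valid SimpleCarlaEnv config dict prepared elsewhere. The action dict below is a placeholder; the exact control keys expected depend on the env config.

    from core.envs import SimpleCarlaEnv

    # env_cfg: assumed SimpleCarlaEnv config dict prepared elsewhere.
    env = SimpleCarlaEnv(env_cfg, host='localhost', port=9000)
    env.seed(0)
    obs = env.reset()
    done = False
    while not done:
        # Placeholder control signal; a policy would produce this from obs.
        action = {'steer': 0.0, 'throttle': 0.3, 'brake': 0.0}
        obs, reward, done, info = env.step(action)
    print('success' if env.is_success() else 'failure')
    env.close()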

ScenarioCarlaEnv

class core.envs.ScenarioCarlaEnv(cfg: Dict, host: str = 'localhost', port: int | None = None, tm_port: int | None = None, **kwargs)[source]

Carla Scenario Environment with a single hero vehicle. It uses CarlaScenarioSimulator to load scenario configurations and interacts with the Carla server to get running status. The env is initialized with a scenario config, which can be a route with scenarios or a single scenario. The observation, sensor settings and visualizer are the same as in SimpleCarlaEnv. The reward is derived from the scenario criteria at each tick. The criteria also drive the success and failure judgement used to end an episode.

When created, it initializes the environment with the config and the Carla TCP host & port. Creation does NOT start a simulator instance; it only sets up the data structures used to store information while the env is running.

Arguments:
  • cfg (Dict): Env config dict.

  • host (str, optional): Carla server IP host. Defaults to ‘localhost’.

  • port (Optional[int], optional): Carla server IP port. Defaults to None.

  • tm_port (Optional[int], optional): Carla Traffic Manager port. Defaults to None.

Interfaces:

reset, step, close, is_success, is_failure, render, seed

Properties:
  • hero_player (carla.Actor): Hero vehicle in simulator.

close()[source]

Delete simulator & visualizer instances and close the environment.

compute_reward()[source]

Compute the reward for the current frame and return details in a dict. In short, it contains a goal reward, a route-following reward calculated from the criteria in the current and last frames, and a failure reward obtained by checking the criteria in each frame.

Returns:

Tuple[float, Dict]: Total reward value and detail for each value.

get_observations()[source]

Get observations from the simulator. The sensor data, navigation, state and information in the simulator are used, though not all of them are added into the observation dict.

Returns:

Dict: Observation dict.

is_failure() → bool[source]

Check if the task fails. This may happen when the behavior tree ends unsuccessfully or some criteria are triggered.

Returns:

bool: Whether the episode failed.

is_success() → bool[source]

Check if the task succeeds. This only happens when the behavior tree ends successfully.

Returns:

bool: Whether the episode succeeded.

reset(config: Any) → Dict[source]

Reset the environment to start a new episode, with the provided reset params. If there is no simulator, this method will create a new simulator instance. The reset param is sent to the simulator’s init method to reset the simulator; all statuses recording running states are then reset, and a visualizer is created if needed. It returns the first frame’s observation.

Arguments:
  • config (Any): Configuration instance of the scenario.

Returns:

Dict: The initial observation.

seed(seed: int) → None[source]

Set random seed for environment.

Arguments:
  • seed (int): Random seed value.

step(action)[source]

Run one time step of the environment, get the observation from the simulator and calculate the reward. The environment is set to ‘done’ only at success or failure, in which case all visualizers end. Its interfaces follow the standard definition of gym.Env.

Arguments:
  • action (Dict): Action provided by policy.

Returns:

Tuple[Any, float, bool, Dict]: A tuple containing observation, reward, done and info.
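
Example:

A sketch of running a single scenario episode. Here env_cfg, scenario_config and policy are assumed to be prepared elsewhere: env_cfg is the env config dict, scenario_config is the scenario (or route) configuration instance passed to reset, and policy is any callable mapping an observation to an action dict.

    from core.envs import ScenarioCarlaEnv

    # env_cfg, scenario_config, policy: assumed to be prepared elsewhere.
    env = ScenarioCarlaEnv(env_cfg, host='localhost', port=9000)
    obs = env.reset(scenario_config)
    done = False
    while not done:
        obs, reward, done, info = env.step(policy(obs))
    print('success' if env.is_success() else 'failure')
    env.close()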

MetaDriveMacroEnv

class core.envs.MetaDriveMacroEnv(config: dict | None = None)[source]

MetaDrive single-agent env controlled by “macro” actions. The agent is controlled by a discrete action set, each element of which maps to a series of control signals that accomplish the macro action it defines. The observation is a top-down view image with 5 channels containing current and historical information about the surroundings. This env is registered and can be created via gym.make.

Arguments:
  • config (Dict): Env config dict.

Interfaces:

reset, step, close, render, seed
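
Example:

A minimal rollout sketch with random macro actions, assuming the standard gym action_space interface. The env can also be created through gym.make with its registered id, which is omitted here.

    from core.envs import MetaDriveMacroEnv

    env = MetaDriveMacroEnv()
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # one discrete macro action
        obs, reward, done, info = env.step(action)
    env.close()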

MetaDriveTrajEnv

class core.envs.MetaDriveTrajEnv(config: dict | None = None)[source]

MetaDrive single-agent trajectory env. The agent is controlled by a trajectory (a list of waypoints) covering a period of time. The trajectory length determines how many env steps the agent tracks it for. The vehicle executes actions along the trajectory via the simulator’s ‘move_to’ method rather than physical control. The position is calculated from the trajectory with kinematic constraints before the vehicle is moved. The observation is a 5-channel top-down view image plus a vector of structured information by default. This env is registered and can be created via gym.make.

Arguments:
  • config (Dict): Env config dict.

Interfaces:

reset, step, close, render, seed
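
Example:

A minimal sketch of a single step, assuming the standard gym action_space interface. The sampled action only stands in for the trajectory a trained policy would produce.

    from core.envs import MetaDriveTrajEnv

    env = MetaDriveTrajEnv()
    obs = env.reset()
    # Sampled action as a stand-in for a policy-produced trajectory.
    obs, reward, done, info = env.step(env.action_space.sample())
    env.close()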

DriveEnvWrapper

class core.envs.DriveEnvWrapper(env: BaseDriveEnv, cfg: Dict | None = None, **kwargs)[source]

Environment wrapper to make gym.Env align with DI-engine definitions, so that DI-engine utilities can be used. It changes the step, reset and info methods of gym.Env, while the other methods are passed through unchanged.

Arguments:
  • env (BaseDriveEnv): The environment to be wrapped.

  • cfg (Dict): Config dict.

Interfaces:

reset, step, info, render, seed, close

reset(*args, **kwargs) → Any[source]

Wrapper of the reset method in env. The observations are converted to np.ndarray and the final reward is recorded.

Returns:

Any: Observations from the environment.

seed(seed: int, dynamic_seed: bool = True) → None[source]

Seeds the environment.

step(action: Any | None = None) → BaseEnvTimestep[source]

Wrapper of the step method in env. It converts the return of the gym.Env step method into that of ding.envs.BaseEnv, from an (obs, reward, done, info) tuple to a BaseEnvTimestep namedtuple defined in DI-engine. It also converts actions, observations and rewards into np.ndarray, and checks legality if the action contains a control signal.

Arguments:
  • action (Any, optional): Actions sent to env. Defaults to None.

Returns:

BaseEnvTimestep: DI-engine format of env step returns.
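
Example:

A sketch of wrapping a DI-drive env for DI-engine, assuming env_cfg and action are prepared elsewhere. Note that step returns a BaseEnvTimestep namedtuple rather than a raw tuple.

    from core.envs import SimpleCarlaEnv, DriveEnvWrapper

    # env_cfg and action: assumed to be prepared elsewhere.
    env = DriveEnvWrapper(SimpleCarlaEnv(env_cfg, host='localhost', port=9000))
    obs = env.reset()              # observation converted to np.ndarray
    timestep = env.step(action)    # BaseEnvTimestep(obs, reward, done, info)
    if timestep.done:
        print(timestep.info)
    env.close()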

BenchmarkEnvWrapper

class core.envs.BenchmarkEnvWrapper(env: BaseDriveEnv, cfg: Dict, **kwargs)[source]

Environment wrapper for Carla Benchmark suite evaluations. It wraps an environment with a benchmark suite so that the env always runs with benchmark suite settings. It has 2 modes to get reset params from a suite: ‘random’ picks a reset param at random, ‘order’ goes through all reset params in order.

Arguments:
  • env (BaseDriveEnv): The environment to be wrapped.

  • cfg (Dict): Config dict.

reset(*args, **kwargs) → Any[source]

Wrapped reset method for the env. It ignores all incoming arguments and chooses one of the suite’s reset parameters according to the config.

Returns:

Any: Returns of Env reset method.

step(action: Dict) → Any[source]

Wrapped step method for the env. It adds a print log when the env is done.

Arguments:
  • action (Any): Actions sent to env.

Returns:

Any: Env step result.
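
Example:

A sketch of wrapping SimpleCarlaEnv with a benchmark suite, assuming env_cfg is prepared elsewhere. The config keys and suite name shown here are illustrative; check the wrapper’s default config for the exact fields it expects.

    from easydict import EasyDict

    from core.envs import SimpleCarlaEnv, BenchmarkEnvWrapper

    # env_cfg: assumed env config dict; suite/mode keys are illustrative.
    benchmark_cfg = EasyDict(dict(suite='FullTown01-v0', mode='random'))
    env = BenchmarkEnvWrapper(SimpleCarlaEnv(env_cfg, port=9000), benchmark_cfg)
    obs = env.reset()   # reset params come from the suite; arguments are ignored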