# Source code for core.envs.scenario_carla_env

import os
import time
import numpy as np
from datetime import datetime
from typing import Any, Dict
from gym import spaces

from .base_drive_env import BaseDriveEnv
from core.simulators import CarlaScenarioSimulator
from core.utils.others.visualizer import Visualizer
from core.utils.simulator_utils.carla_utils import visualize_birdview


class ScenarioCarlaEnv(BaseDriveEnv):
    """
    Carla Scenario Environment with a single hero vehicle. It uses ``CarlaScenarioSimulator`` to load
    scenario configurations and interacts with the Carla server to get running status. The env is
    initialized with a scenario config, which could be a route with scenarios or a single scenario.
    The observation, sensor settings and visualizer are the same as in ``SimpleCarlaEnv``. The reward
    is derived from the scenario criteria in each tick. The criteria are also related to the
    success and failure judgement which is used to end an episode.

    When created, it will initialize the environment with config and Carla TCP host & port. This
    method will NOT create the simulator instance. It only creates some data structures to store
    information when running env.

    :Arguments:
        - cfg (Dict): Env config dict.
        - host (str, optional): Carla server IP host. Defaults to 'localhost'.
        - port (int, optional): Carla server IP port. Defaults to None.
        - tm_port (int, optional): Carla Traffic Manager port. Defaults to None.

    :Interfaces: reset, step, close, is_success, is_failure, render, seed

    :Properties:
        - hero_player (carla.Actor): Hero vehicle in simulator.
    """
    action_space = spaces.Dict({})
    observation_space = spaces.Dict({})
    reward_space = spaces.Box(low=-float('inf'), high=float('inf'), shape=(1, ))

    config = dict(
        simulator=dict(),
        # reward value if success
        success_reward=10,
        # whether open visualize
        visualize=None,
        # outputs of scenario conclusion
        outputs=[],
        output_dir='',
    )

    def __init__(
            self,
            cfg: Dict,
            host: str = 'localhost',
            port: int = None,
            tm_port: int = None,
            **kwargs,
    ) -> None:
        """
        Initialize environment with config and Carla TCP host & port.
        """
        super().__init__(cfg, **kwargs)
        self.cfg = cfg
        self._simulator_cfg = self._cfg.simulator
        self._carla_host = host
        self._carla_port = port
        self._carla_tm_port = tm_port

        # Any host other than 'localhost' is treated as a remote Carla server.
        self._use_local_carla = False
        if self._carla_host != 'localhost':
            self._use_local_carla = True
        self._simulator = None

        self._output_dir = self._cfg.output_dir
        self._outputs = self._cfg.outputs
        self._success_reward = self._cfg.success_reward

        # Episode status flags, refreshed on every reset.
        self._is_success = False
        self._is_failure = False
        self._collided = False
        self._tick = 0
        self._timeout = float('inf')
        self._launched_simulator = False
        self._config = None

        self._visualize_cfg = self._cfg.visualize
        self._simulator_databuffer = dict()
        self._visualizer = None

    def _init_carla_simulator(self) -> None:
        # Lazily create the simulator on first reset. The construction is identical for
        # local and remote Carla; only the log message differs.
        if not self._use_local_carla:
            print("------ Run Carla on Port: %d, GPU: %d ------" % (self._carla_port, 0))
        else:
            print('------ Using Remote Carla @ {}:{} ------'.format(self._carla_host, self._carla_port))
        self._simulator = CarlaScenarioSimulator(
            cfg=self._simulator_cfg,
            client=None,
            host=self._carla_host,
            port=self._carla_port,
            tm_port=self._carla_tm_port
        )
        self._launched_simulator = True

    def reset(self, config: Any) -> Dict:
        """
        Reset environment to start a new episode, with provided reset params. If there is no simulator,
        this method will create a new simulator instance. The reset param is sent to simulator's
        ``init`` method to reset simulator, then reset all statues recording running states, and create
        a visualizer if needed. It returns the first frame observation.

        :Arguments:
            - config (Any): Configuration instance of the scenario

        :Returns:
            Dict: The initial observation.
        """
        if not self._launched_simulator:
            self._init_carla_simulator()

        self._config = config
        self._simulator.init(self._config)

        if self._visualize_cfg is not None:
            # Reuse the existing visualizer across episodes; only create it once.
            if self._visualizer is not None:
                self._visualizer.done()
            else:
                self._visualizer = Visualizer(self._visualize_cfg)
            # Route-based configs are named after their scenario file instead.
            if 'Route' in config.name:
                config_name = os.path.splitext(os.path.split(config.scenario_file)[-1])[0]
            else:
                config_name = config.name
            vis_name = "{}_{}".format(config_name, time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()))
            self._visualizer.init(vis_name)

        self._simulator_databuffer.clear()
        self._collided = False
        self._criteria_last_value = dict()
        self._is_success = False
        self._is_failure = False
        self._reward = 0
        self._tick = 0
        self._timeout = self._simulator.end_timeout

        return self.get_observations()

    def step(self, action):
        """
        Run one time step of environment, get observation from simulator and calculate reward. The
        environment will be set to 'done' only at success or failure. And if so, all visualizers will
        end. Its interfaces follow the standard definition of ``gym.Env``.

        :Arguments:
            - action (Dict): Action provided by policy.

        :Returns:
            Tuple[Any, float, bool, Dict]: A tuple contains observation, reward, done and information.
        """
        if action is not None:
            self._simulator.apply_control(action)
            self._simulator_databuffer.update({'action': action})
        self._simulator.run_step()
        self._tick += 1
        obs = self.get_observations()
        self._collided = self._simulator.collided

        # Cache the scenario status flags for reward computation this tick.
        res = self._simulator.scenario_manager.get_scenario_status()
        if res == 'SUCCESS':
            self._is_success = True
        elif res in ['FAILURE', 'INVALID']:
            self._is_failure = True

        self._reward, reward_info = self.compute_reward()
        done = self.is_success() or self.is_failure()

        if done:
            self._simulator.end_scenario()
            self._conclude_scenario(self._config)
            if self._visualizer is not None:
                self._visualizer.done()

        info = self._simulator.get_information()
        info.update(reward_info)
        info.update({
            'failure': self.is_failure(),
            'success': self.is_success(),
        })
        return obs, self._reward, done, info

    def close(self):
        """
        Delete simulator & visualizer instances and close environment.
        """
        if self._launched_simulator:
            self._simulator.clean_up()
            self._simulator._set_sync_mode(False)
            del self._simulator
            self._launched_simulator = False
        if self._visualizer is not None:
            self._visualizer.done()

    def is_success(self) -> bool:
        """
        Check if the task succeed. It only happens when behavior tree ends successfully.

        :Returns:
            bool: Whether success.
        """
        res = self._simulator.scenario_manager.get_scenario_status()
        if res == 'SUCCESS':
            return True
        return False

    def is_failure(self) -> bool:
        """
        Check if the task fails. It may happen when behavior tree ends unsuccessfully or some criteria
        trigger.

        :Returns:
            bool: Whether failure.
        """
        res = self._simulator.scenario_manager.get_scenario_status()
        if res in ['FAILURE', 'INVALID']:
            return True
        return False

    def get_observations(self):
        """
        Get observations from simulator. The sensor data, navigation, state and information in
        simulator are used, while not all these are added into observation dict.

        :Returns:
            Dict: Observation dict.
        """
        obs = dict()
        state = self._simulator.get_state()
        sensor_data = self._simulator.get_sensor_data()
        navigation = self._simulator.get_navigation()
        information = self._simulator.get_information()

        # Buffered for render(), which merges these dicts into the on-screen info panel.
        self._simulator_databuffer['state'] = state
        self._simulator_databuffer['navigation'] = navigation
        self._simulator_databuffer['information'] = information

        obs.update(sensor_data)
        obs.update(
            {
                'tick': information['tick'],
                'timestamp': np.float32(information['timestamp']),
                'agent_state': navigation['agent_state'],
                'node': navigation['node'],
                'node_forward': navigation['node_forward'],
                'target': np.float32(navigation['target']),
                'target_forward': np.float32(navigation['target_forward']),
                'command': navigation['command'],
                'speed': np.float32(state['speed']),
                'speed_limit': np.float32(navigation['speed_limit']),
                'location': np.float32(state['location']),
                'forward_vector': np.float32(state['forward_vector']),
                'acceleration': np.float32(state['acceleration']),
                'velocity': np.float32(state['velocity']),
                'angular_velocity': np.float32(state['angular_velocity']),
                'rotation': np.float32(state['rotation']),
                'is_junction': np.float32(state['is_junction']),
                'tl_state': state['tl_state'],
                'tl_dis': np.float32(state['tl_dis']),
                'waypoint_list': navigation['waypoint_list'],
                'direction_list': navigation['direction_list'],
            }
        )

        if self._visualizer is not None:
            if self._visualize_cfg.type not in sensor_data:
                raise ValueError("visualize type {} not in sensor data!".format(self._visualize_cfg.type))
            self._render_buffer = sensor_data[self._visualize_cfg.type].copy()
            if self._visualize_cfg.type == 'birdview':
                self._render_buffer = visualize_birdview(self._render_buffer)
        return obs

    def compute_reward(self):
        """
        Compute reward for current frame, and return details in a dict. In short, it contains goal
        reward, route following reward calculated by criteria in current and last frame, and failure
        reward by checking criteria in each frame.

        :Returns:
            Tuple[float, Dict]: Total reward value and detail for each value.
        """
        goal_reward = 0
        if self._is_success:
            goal_reward += self._success_reward
        elif self._is_failure:
            goal_reward -= self._success_reward

        criteria_dict = self._simulator.get_criteria()
        failure_reward = 0
        complete_reward = 0
        for info, value in criteria_dict.items():
            if value[0] == 'FAILURE':
                # Penalize only on the tick a criterion's value actually changes,
                # so a persisting failure state is not penalized every frame.
                if info in self._criteria_last_value and value[1] != self._criteria_last_value[info][1]:
                    if 'Collision' in info:
                        failure_reward -= 10
                    elif 'RunningRedLight' in info:
                        failure_reward -= 10
                    elif 'OutsideRouteLanes' in info:
                        failure_reward -= 10
            if 'RouteCompletion' in info and info in self._criteria_last_value:
                # BUGFIX: was `self._finish_reward`, an attribute that is never defined
                # anywhere in this class (would raise AttributeError); the intended
                # scale is the configured success reward.
                complete_reward = self._success_reward / 100 * (value[1] - self._criteria_last_value[info][1])
            self._criteria_last_value[info] = value

        reward_info = dict()
        reward_info['goal_reward'] = goal_reward
        reward_info['complete_reward'] = complete_reward
        reward_info['failure_reward'] = failure_reward

        total_reward = goal_reward + failure_reward + complete_reward
        return total_reward, reward_info

    def render(self):
        """
        Render a runtime visualization on screen, save a gif video file according to visualizer
        config. The main canvas is from a specific sensor data. It only works when 'visualize' is set
        in config dict.
        """
        if self._visualizer is None:
            return

        render_info = {
            'collided': self._collided,
            'reward': self._reward,
            'tick': self._tick,
            'end_timeout': self._simulator.end_timeout,
            'end_distance': self._simulator.end_distance,
            'total_distance': self._simulator.total_distance,
        }
        render_info.update(self._simulator_databuffer['state'])
        render_info.update(self._simulator_databuffer['navigation'])
        render_info.update(self._simulator_databuffer['information'])
        render_info.update(self._simulator_databuffer['action'])

        self._visualizer.paint(self._render_buffer, render_info)
        self._visualizer.run_visualize()

    def seed(self, seed: int) -> None:
        """
        Set random seed for environment.

        :Arguments:
            - seed (int): Random seed value.
        """
        print('[ENV] Setting seed:', seed)
        np.random.seed(seed)

    def __repr__(self) -> str:
        return "ScenarioCarlaEnv with host %s, port %s." % (self._carla_host, self._carla_port)

    def _conclude_scenario(self, config: Any) -> None:
        """
        Provide feedback about success/failure of a scenario, optionally writing
        junit/text report files into the configured output directory.
        """
        # Create the filename
        current_time = str(datetime.now().strftime('%Y-%m-%d-%H-%M-%S'))
        junit_filename = None
        filename = None
        config_name = config.name
        if self._output_dir != '':
            os.makedirs(self._output_dir, exist_ok=True)
            config_name = os.path.join(self._output_dir, config_name)
        if 'junit' in self._outputs:
            junit_filename = config_name + '_' + current_time + ".xml"
        if 'file' in self._outputs:
            # BUGFIX: was `+ "txt"` (missing dot), which produced files like
            # "name_<time>txt" with no extension separator.
            filename = config_name + '_' + current_time + ".txt"

        if self._simulator.scenario_manager.analyze_scenario(True, filename, junit_filename):
            self._is_failure = True
        else:
            self._is_success = True

    @property
    def hero_player(self):
        return self._simulator.hero_player