Source code for core.envs.simple_carla_env

import os
import time
import carla
import numpy as np
from typing import Any, Dict, Optional, Tuple
from gym import spaces
from collections import defaultdict

from .base_drive_env import BaseDriveEnv
from core.simulators import CarlaSimulator
from core.utils.others.visualizer import Visualizer
from core.utils.simulator_utils.carla_utils import visualize_birdview
from core.utils.env_utils.stuck_detector import StuckDetector
from core.utils.simulator_utils.carla_utils import lane_mid_distance


class SimpleCarlaEnv(BaseDriveEnv):
    """
    A simple deployment of a Carla environment with a single hero vehicle. It uses ``CarlaSimulator`` to
    interact with the Carla server and get its running status. The observation is obtained from the
    simulator's state, information and sensor data, along with a reward that can be calculated and retrieved.

    When created, it will initialize the environment with the config and the Carla TCP host & port. This
    method will NOT create a simulator instance. It only creates some data structures to store information
    when running the env.

    :Arguments:
        - cfg (Dict): Env config dict.
        - host (str, optional): Carla server IP host. Defaults to 'localhost'.
        - port (int, optional): Carla server IP port. Defaults to 9000.
        - tm_port (Optional[int], optional): Carla Traffic Manager port. Defaults to None.

    :Interfaces: reset, step, close, is_success, is_failure, render, seed

    :Properties:
        - hero_player (carla.Actor): Hero vehicle in simulator.
    """

    metadata = {'render.modes': ['rgb_array']}
    action_space = spaces.Dict({})
    observation_space = spaces.Dict({})
    reward_space = spaces.Box(low=-float('inf'), high=float('inf'), shape=(1, ))
    reward_type = ['goal', 'distance', 'speed', 'angle', 'steer', 'lane', 'failure']

    config = dict(
        simulator=dict(),
        # reward types the total reward takes into account
        reward_type=['goal', 'distance', 'speed', 'angle', 'failure'],
        # reward value given on success
        success_reward=10,
        # failure judgement
        col_is_failure=False,
        stuck_is_failure=False,
        ignore_light=False,
        ran_light_is_failure=False,
        off_road_is_failure=False,
        wrong_direction_is_failure=False,
        off_route_is_failure=False,
        # failure judgement hyper-parameters
        off_route_distance=6,
        success_distance=5,
        stuck_len=300,
        max_speed=5,
        # whether to open visualization
        visualize=None,
    )

    def __init__(
            self,
            cfg: Dict,
            host: str = 'localhost',
            port: int = 9000,
            tm_port: Optional[int] = None,
            carla_timeout: Optional[float] = 60.0,
            **kwargs,
    ) -> None:
        """
        Initialize environment with config and Carla TCP host & port.
        """
        super().__init__(cfg, **kwargs)

        self._simulator_cfg = self._cfg.simulator
        self._carla_host = host
        self._carla_port = port
        self._carla_tm_port = tm_port
        self._carla_timeout = carla_timeout

        self._use_local_carla = False
        if self._carla_host != 'localhost':
            self._use_local_carla = True
        self._simulator = None

        self._col_is_failure = self._cfg.col_is_failure
        self._stuck_is_failure = self._cfg.stuck_is_failure
        self._ignore_light = self._cfg.ignore_light
        self._ran_light_is_failure = self._cfg.ran_light_is_failure
        self._off_road_is_failure = self._cfg.off_road_is_failure
        self._wrong_direction_is_failure = self._cfg.wrong_direction_is_failure
        self._off_route_is_failure = self._cfg.off_route_is_failure
        self._off_route_distance = self._cfg.off_route_distance

        self._reward_type = self._cfg.reward_type
        assert set(self._reward_type).issubset(self.reward_type), set(self._reward_type)
        self._success_distance = self._cfg.success_distance
        self._success_reward = self._cfg.success_reward
        self._max_speed = self._cfg.max_speed
        self._collided = False
        self._stuck = False
        self._ran_light = False
        self._off_road = False
        self._wrong_direction = False
        self._off_route = False
        self._stuck_detector = StuckDetector(self._cfg.stuck_len)

        self._tick = 0
        self._timeout = float('inf')
        self._launched_simulator = False

        self._visualize_cfg = self._cfg.visualize
        self._simulator_databuffer = dict()
        self._visualizer = None
        self._last_canvas = None

    def _init_carla_simulator(self) -> None:
        if not self._use_local_carla:
            print("------ Run Carla on Port: %d, GPU: %d ------" % (self._carla_port, 0))
            # self.carla_process = subprocess.Popen()
            self._simulator = CarlaSimulator(
                cfg=self._simulator_cfg,
                client=None,
                host=self._carla_host,
                port=self._carla_port,
                tm_port=self._carla_tm_port,
                timeout=self._carla_timeout,
            )
        else:
            print('------ Using Remote carla @ {}:{} ------'.format(self._carla_host, self._carla_port))
            self._simulator = CarlaSimulator(
                cfg=self._simulator_cfg,
                client=None,
                host=self._carla_host,
                port=self._carla_port,
                tm_port=self._carla_tm_port
            )
        self._launched_simulator = True
    def reset(self, **kwargs) -> Dict:
        """
        Reset the environment to start a new episode, with provided reset params. If there is no simulator,
        this method will create a new simulator instance. The reset params are sent to the simulator's
        ``init`` method to reset the simulator, then all states recording the running status are reset, and
        a visualizer is created if needed. It returns the first frame observation.

        :Returns:
            Dict: The initial observation.
        """
        if not self._launched_simulator:
            self._init_carla_simulator()

        self._simulator.init(**kwargs)

        if self._visualize_cfg is not None:
            if self._visualizer is not None:
                self._visualizer.done()
            else:
                self._visualizer = Visualizer(self._visualize_cfg)

            if 'name' in kwargs:
                vis_name = kwargs['name']
            else:
                vis_name = "{}_{}".format(
                    self._simulator.town_name, time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
                )

            self._visualizer.init(vis_name)

        if 'col_is_failure' in kwargs:
            self._col_is_failure = kwargs['col_is_failure']
        if 'stuck_is_failure' in kwargs:
            self._stuck_is_failure = kwargs['stuck_is_failure']
        self._simulator_databuffer.clear()
        self._collided = False
        self._stuck = False
        self._ran_light = False
        self._off_road = False
        self._wrong_direction = False
        self._off_route = False
        self._stuck_detector.clear()
        self._tick = 0
        self._reward = 0
        self._last_steer = 0
        self._last_distance = None
        self._timeout = self._simulator.end_timeout

        return self.get_observations()
    def step(self, action: Dict) -> Tuple[Any, float, bool, Dict]:
        """
        Run one time step of the environment, get the observation from the simulator and calculate the
        reward. The environment is set to 'done' only on success or failure, in which case all visualizers
        end. Its interface follows the standard definition of ``gym.Env``.

        :Arguments:
            - action (Dict): Action provided by policy.

        :Returns:
            Tuple[Any, float, bool, Dict]: A tuple containing observation, reward, done and information.
        """
        if action is not None:
            # Clip control values into their valid ranges before applying them.
            for key in ['throttle', 'brake']:
                if key in action:
                    action[key] = float(np.clip(action[key], 0, 1))
            if 'steer' in action:
                action['steer'] = float(np.clip(action['steer'], -1, 1))
            self._simulator.apply_control(action)
            self._simulator_databuffer['action'] = action
        else:
            self._simulator_databuffer['action'] = dict()
        self._simulator.run_step()
        self._tick += 1

        obs = self.get_observations()

        self._collided = self._simulator.collided
        self._stuck = self._stuck_detector.stuck
        self._ran_light = self._simulator.ran_light
        self._off_road = self._simulator.off_road
        self._wrong_direction = self._simulator.wrong_direction

        location = self._simulator_databuffer['state']['location'][:2]
        target = self._simulator_databuffer['navigation']['target']
        self._off_route = np.linalg.norm(location - target) >= self._off_route_distance

        self._reward, reward_info = self.compute_reward()
        info = self._simulator.get_information()
        info.update(reward_info)
        info.update(
            {
                'collided': self._collided,
                'stuck': self._stuck,
                'ran_light': self._ran_light,
                'off_road': self._off_road,
                'wrong_direction': self._wrong_direction,
                'off_route': self._off_route,
                'timeout': self._tick > self._timeout,
                'success': self.is_success(),
            }
        )

        done = self.is_success() or self.is_failure()
        if done:
            self._simulator.clean_up()
            if self._visualizer is not None:
                self._visualizer.done()
                self._visualizer = None

        return obs, self._reward, done, info
    def close(self) -> None:
        """
        Delete simulator & visualizer instances and close the environment.
        """
        if self._launched_simulator:
            self._simulator.clean_up()
            self._simulator._set_sync_mode(False)
            del self._simulator
            self._launched_simulator = False
        if self._visualizer is not None:
            self._visualizer.done()
            self._visualizer = None
    def is_success(self) -> bool:
        """
        Check whether the task succeeds. It only happens when the hero vehicle is close to the target
        waypoint.

        :Returns:
            bool: Whether success.
        """
        if self._simulator.end_distance < self._success_distance:
            return True
        return False
    def is_failure(self) -> bool:
        """
        Check whether the env fails: colliding, being stuck, running a red light, running off road, driving
        in the wrong direction or off route, each according to config. It always fails when the time is out.

        :Returns:
            bool: Whether failure.
        """
        if self._stuck_is_failure and self._stuck:
            return True
        if self._col_is_failure and self._collided:
            return True
        if self._ran_light_is_failure and self._ran_light:
            return True
        if self._off_road_is_failure and self._off_road:
            return True
        if self._wrong_direction_is_failure and self._wrong_direction:
            return True
        if self._off_route_is_failure and self._off_route:
            return True
        if self._tick > self._timeout:
            return True

        return False
    def get_observations(self) -> Dict:
        """
        Get observations from the simulator. The sensor data, navigation, state and information from the
        simulator are used, but not all of them are added into the observation dict.

        :Returns:
            Dict: Observation dict.
        """
        obs = dict()
        state = self._simulator.get_state()
        navigation = self._simulator.get_navigation()
        sensor_data = self._simulator.get_sensor_data()
        information = self._simulator.get_information()

        self._simulator_databuffer['state'] = state
        self._simulator_databuffer['navigation'] = navigation
        self._simulator_databuffer['information'] = information
        if 'action' not in self._simulator_databuffer:
            self._simulator_databuffer['action'] = dict()
        # Do not tick the stuck detector while waiting at a red light (agent_state == 4),
        # unless traffic lights are ignored.
        if navigation['agent_state'] != 4 or self._ignore_light:
            self._stuck_detector.tick(state['speed'])

        obs.update(sensor_data)
        obs.update(
            {
                'tick': information['tick'],
                'timestamp': np.float32(information['timestamp']),
                'agent_state': navigation['agent_state'],
                'node': navigation['node'],
                'node_forward': navigation['node_forward'],
                'target': np.float32(navigation['target']),
                'target_forward': np.float32(navigation['target_forward']),
                'command': navigation['command'],
                'speed': np.float32(state['speed']),
                'speed_limit': np.float32(navigation['speed_limit']),
                'location': np.float32(state['location']),
                'forward_vector': np.float32(state['forward_vector']),
                'acceleration': np.float32(state['acceleration']),
                'velocity': np.float32(state['velocity']),
                'angular_velocity': np.float32(state['angular_velocity']),
                'rotation': np.float32(state['rotation']),
                'is_junction': np.float32(state['is_junction']),
                'tl_state': state['tl_state'],
                'tl_dis': np.float32(state['tl_dis']),
                'waypoint_list': navigation['waypoint_list'],
                'direction_list': navigation['direction_list'],
            }
        )

        if self._visualizer is not None:
            if self._visualize_cfg.type not in sensor_data:
                raise ValueError("visualize type {} not in sensor data!".format(self._visualize_cfg.type))
            self._render_buffer = sensor_data[self._visualize_cfg.type].copy()
            if self._visualize_cfg.type == 'birdview':
                self._render_buffer = visualize_birdview(self._render_buffer)
        return obs
    def compute_reward(self) -> Tuple[float, Dict]:
        """
        Compute the reward for the current frame and return the details in a dict. In short, it contains the
        goal reward, a route-following reward calculated from the route length in the current and last frame,
        some navigation attitude rewards with respect to the target waypoint, and a failure reward obtained
        by checking each failure event.

        :Returns:
            Tuple[float, Dict]: Total reward value and detail for each value.
        """

        def dist(loc1, loc2):
            return ((loc1[0] - loc2[0]) ** 2 + (loc1[1] - loc2[1]) ** 2) ** 0.5

        def angle(vec1, vec2):
            cos = vec1.dot(vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2) + 1e-10)
            cos = np.clip(cos, -1, 1)
            angle = np.arccos(cos)
            angle = np.clip(angle, 0, np.pi)
            return angle

        # goal reward
        goal_reward = 0
        plan_distance = self._simulator.end_distance
        if self.is_success():
            goal_reward += self._success_reward
        elif self.is_failure():
            goal_reward -= 1

        # distance reward
        location = self._simulator_databuffer['state']['location']
        target = self._simulator_databuffer['navigation']['target']
        target_distance = dist(target, location)
        cur_distance = plan_distance + target_distance
        if self._last_distance is None:
            distance_reward = 0
        else:
            distance_reward = np.clip((self._last_distance - cur_distance) * 2, 0, 1)
        self._last_distance = cur_distance

        # state reward: speed, angle, steer, mid lane
        speed = self._simulator_databuffer['state']['speed'] / 3.6
        speed_limit = self._simulator_databuffer['navigation']['speed_limit'] / 3.6
        speed_limit = min(self._max_speed, speed_limit)
        agent_state = self._simulator_databuffer['navigation']['agent_state']
        target_speed = speed_limit
        if agent_state == 2 or agent_state == 3:
            target_speed = 0
        elif agent_state == 4 and not self._ignore_light:
            target_speed = 0
        # speed_reward = 1 - abs(speed - target_speed) / speed_limit
        speed_reward = 0
        if speed < target_speed / 5:
            speed_reward -= 1
        if speed > target_speed:
            speed_reward -= 1

        forward_vector = self._simulator_databuffer['state']['forward_vector']
        target_forward = self._simulator_databuffer['navigation']['target_forward']
        angle_reward = 3 * (0.1 - angle(forward_vector, target_forward) / np.pi)

        steer = self._simulator_databuffer['action'].get('steer', 0)
        command = self._simulator_databuffer['navigation']['command']
        steer_reward = 0.5
        if abs(steer - self._last_steer) > 0.5:
            steer_reward -= 0.2
        if command == 1 and steer > 0.1:
            steer_reward = 0
        elif command == 2 and steer < -0.1:
            steer_reward = 0
        elif (command == 3 or command == 4) and abs(steer) > 0.3:
            steer_reward = 0
        self._last_steer = steer

        waypoint_list = self._simulator_databuffer['navigation']['waypoint_list']
        lane_mid_dis = lane_mid_distance(waypoint_list, location)
        lane_reward = -0.5 * lane_mid_dis

        failure_reward = 0
        if self._col_is_failure and self._collided:
            failure_reward -= 5
        elif self._stuck_is_failure and self._stuck:
            failure_reward -= 5
        elif self._off_road_is_failure and self._off_road:
            failure_reward -= 5
        elif self._ran_light_is_failure and not self._ignore_light and self._ran_light:
            failure_reward -= 5
        elif self._wrong_direction_is_failure and self._wrong_direction:
            failure_reward -= 5

        reward_info = {}
        total_reward = 0
        reward_info['goal_reward'] = goal_reward
        reward_info['distance_reward'] = distance_reward
        reward_info['speed_reward'] = speed_reward
        reward_info['angle_reward'] = angle_reward
        reward_info['steer_reward'] = steer_reward
        reward_info['lane_reward'] = lane_reward
        reward_info['failure_reward'] = failure_reward
        reward_dict = {
            'goal': goal_reward,
            'distance': distance_reward,
            'speed': speed_reward,
            'angle': angle_reward,
            'steer': steer_reward,
            'lane': lane_reward,
            'failure': failure_reward,
        }
        # Only the reward types listed in the config contribute to the total reward.
        for rt in self._reward_type:
            total_reward += reward_dict[rt]

        return total_reward, reward_info
    def render(self, mode='rgb_array') -> Any:
        """
        Render a runtime visualization on screen, and save a gif or video according to the visualizer config.
        The main canvas comes from a specific sensor's data. It only works when 'visualize' is set in the
        config dict.

        :Returns:
            Any: Visualized canvas, mainly used by TensorBoard and the gym monitor wrapper.
        """
        if self._visualizer is None:
            return self._last_canvas

        render_info = {
            'collided': self._collided,
            'off_road': self._off_road,
            'wrong_direction': self._wrong_direction,
            'off_route': self._off_route,
            'reward': self._reward,
            'tick': self._tick,
            'end_timeout': self._simulator.end_timeout,
            'end_distance': self._simulator.end_distance,
            'total_distance': self._simulator.total_distance,
        }
        render_info.update(self._simulator_databuffer['state'])
        render_info.update(self._simulator_databuffer['navigation'])
        render_info.update(self._simulator_databuffer['information'])
        render_info.update(self._simulator_databuffer['action'])

        self._visualizer.paint(self._render_buffer, render_info)
        self._visualizer.run_visualize()
        self._last_canvas = self._visualizer.canvas
        return self._visualizer.canvas
    def seed(self, seed: int) -> None:
        """
        Set random seed for environment.

        :Arguments:
            - seed (int): Random seed value.
        """
        print('[ENV] Setting seed:', seed)
        np.random.seed(seed)
    def __repr__(self) -> str:
        return "SimpleCarlaEnv - host %s : port %s" % (self._carla_host, self._carla_port)

    @property
    def hero_player(self) -> carla.Actor:
        return self._simulator.hero_player
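
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the class above). It
# assumes a Carla server is already listening on localhost:9000, that
# ``BaseDriveEnv`` accepts a dict-like (EasyDict) config built from the class
# defaults, and that the empty default simulator/visualize sub-configs are
# acceptable to ``CarlaSimulator``; a real setup would fill in sensor, town
# and route settings. The constant action below stands in for a policy.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(SimpleCarlaEnv.config)
    env = SimpleCarlaEnv(cfg, host='localhost', port=9000)
    obs = env.reset()
    for _ in range(100):
        # Constant throttle, no steering: a real policy would map obs -> action.
        obs, reward, done, info = env.step({'throttle': 0.5, 'steer': 0.0, 'brake': 0.0})
        if done:
            print('Episode finished. success: {}'.format(info['success']))
            break
    env.close()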