'''
Copyright 2021 OpenDILab. All Rights Reserved:
Description: Carla simulator.
'''
import os
import numpy as np
import random
from typing import Any, Union, Optional, Dict, List
from distutils.version import LooseVersion
import pkg_resources
from collections import defaultdict
from .base_simulator import BaseSimulator
from core.utils.simulator_utils.sensor_utils import SensorHelper, CollisionSensor, TrafficLightHelper
from core.utils.simulator_utils.map_utils import BeVWrapper
from core.simulators.carla_data_provider import CarlaDataProvider
from core.utils.simulator_utils.carla_utils import control_to_signal, get_birdview
from core.utils.planner import BasicPlanner, BehaviorPlanner, LBCPlannerNew
from core.utils.others.tcp_helper import find_traffic_manager_port
import carla
from carla import WeatherParameters
PRESET_WEATHERS = {
1: WeatherParameters.ClearNoon,
2: WeatherParameters.CloudyNoon,
3: WeatherParameters.WetNoon,
4: WeatherParameters.WetCloudyNoon,
5: WeatherParameters.MidRainyNoon,
6: WeatherParameters.HardRainNoon,
7: WeatherParameters.SoftRainNoon,
8: WeatherParameters.ClearSunset,
9: WeatherParameters.CloudySunset,
10: WeatherParameters.WetSunset,
11: WeatherParameters.WetCloudySunset,
12: WeatherParameters.MidRainSunset,
13: WeatherParameters.HardRainSunset,
14: WeatherParameters.SoftRainSunset,
}
VEHICLE_NAME = 'vehicle.tesla.model3'
ROLE_NAME = 'hero'
OBS_TYPE_LIST = ['state', 'depth', 'rgb', 'segmentation', 'bev', 'lidar', 'gnss']
PLANNER_DICT = {
'basic': BasicPlanner,
'behavior': BehaviorPlanner,
'lbc': LBCPlannerNew,
}
[docs]class CarlaSimulator(BaseSimulator):
"""
Common Carla Simulator.
The simulator creates a client to Carla server, and is able to get observation, send
control signals to the hero vehicle and record essential data from the simulated world.
In the initialization period, the simulator may change the environment parameters including
maps and weathers and can add actors (including NPC vehicles, pedestrians as well as sensors
mounted on the hero vehicle),
During the running period the simulator will achieve running state and information about
the hero vehicle (such as running speed, angle, navigation goal and reference path), data
from the sensors (such as camera images, lidar points) as well as running status(including
collision, running off road, red light, distance and timeout to end waypoint).
Once it is created, it will set up Carla client and set the parameters in the configuration
dict as its default. When actually calling the ``init`` method to start an episode, some of
the configurations may be changed by the input arguments while others remain by default.
The simulator stores and gets some information from a static class ``CarlaDataProvider``
to avoid frequently sending message to Carla server and speed up.
Up to now, it uses Carla version higher than 0.9.9.
If no traffic manager port is provided, it will find random free port in system.
:Arguments:
- cfg (Dict): Config Dict.
- client (carla.Client, optional): Already established Carla client. Defaults to None.
- host (str, optional): TCP host Carla client link to. Defaults to 'localhost'.
- port (int, optional): TCP port Carla client link to. Defaults to 9000.
- tm_port (int, optional): Traffic manager port Carla client link to. Defaults to None.
- timeout (float, optional): Carla client link timeout. Defaults to 60.0.
:Interfaces:
init, get_state, get_sensor_data, get_navigation, get_information, apply_planner,
apply_control, run_step, clean_up
:Properties:
- town_name (str): Current town name.
- hero_player (carla.Actor): hero actor in simulation.
- collided (bool): Whether collided in current episode.
- ran_light (bool): Whether ran light in current frame.
- off_road (bool): Whether ran off road in current frame.
- wrong_direction (bool): Whether ran in wrong direction in current frame.
- end_distance (float): Distance to target in current frame.
- end_timeout (float): Timeout for entire route provided by planner.
- total_distance (float): Distance for entire route provided by planner.
"""
config = dict(
town='Town01',
weather='random',
sync_mode=True,
delta_seconds=0.1,
no_rendering=False,
auto_pilot=False,
n_vehicles=0,
n_pedestrians=0,
disable_two_wheels=False,
col_threshold=400,
waypoint_num=20,
obs=list(),
planner=dict(),
aug=None,
verbose=True,
debug=False,
)
def __init__(
self,
cfg: Dict,
client: Optional[carla.Client] = None,
host: str = 'localhost',
port: int = 9000,
tm_port: Optional[int] = None,
timeout: float = 60.0,
**kwargs
) -> None:
"""
Init Carla simulator.
"""
super().__init__(cfg)
# Check Carla API version
dist = pkg_resources.get_distribution("carla")
if LooseVersion(dist.version) < LooseVersion('0.9.8'):
raise ImportError("CARLA version 0.9.8 or newer required. CARLA version found: {}".format(dist))
# Create the client that will send the requests to the simulator
if client is None:
self._client = carla.Client(host, port)
else:
self._client = client
self._client_timeout = timeout
self._client.set_timeout(self._client_timeout)
if tm_port is None:
print("[SIMULATOR] Not providing TM port, try finding free")
max_retry = 0
while True:
try:
tm_port = find_traffic_manager_port()
self._tm = self._client.get_trafficmanager(tm_port)
print("[SIMULATOR] Using TM port:", tm_port)
break
except Exception as e:
max_retry += 1
if max_retry > 10:
raise e
else:
self._tm = self._client.get_trafficmanager(tm_port)
self._tm.set_global_distance_to_leading_vehicle(2.0)
self._tm.set_hybrid_physics_mode(True)
self._world = None
self._map = None
self._sync_mode = self._cfg.sync_mode
self._no_rendering = self._cfg.no_rendering
self._delta_seconds = self._cfg.delta_seconds
self._apply_world_setting(**self._cfg)
self._col_threshold = self._cfg.col_threshold
self._waypoint_num = self._cfg.waypoint_num
self._obs_cfg = self._cfg.obs
self._planner_cfg = self._cfg.planner
self._camera_aug_cfg = self._cfg.aug
self._verbose = self._cfg.verbose
self._tick = 0
self._timestamp = 0
self._end_location = None
self._collided = False
self._ran_light = False
self._off_road = False
self._wrong_direction = False
self._end_distance = float('inf')
self._end_timeout = float('inf')
self._hero_actor = None
self._start_location = None
self._sensor_helper = None
self._bev_wrapper = None
self._planner = None
self._collision_sensor = None
self._traffic_light_helper = None
self._actor_map = defaultdict(list)
self._debug = self._cfg.debug
def _apply_world_setting(self, **world_param) -> None:
for k in world_param:
if k is None:
world_param.pop(k)
if 'town' in world_param:
self._town_name = world_param['town']
if 'weather' in world_param:
self._weather = world_param['weather']
if 'n_vehicles' in world_param:
self._n_vehicles = world_param['n_vehicles']
if 'n_pedestrians' in world_param:
self._n_pedestrians = world_param['n_pedestrians']
if 'autopilot' in world_param:
self._autopilot = world_param['autopilot']
if 'disable_two_wheels' in world_param:
self._disable_two_wheels = world_param['disable_two_wheels']
[docs] def init(self, start: int = 0, end: int = 1, **kwargs) -> None:
"""
Init simulator episode with provided args.
This method takes start and end waypoint indexes to set a navigation goal, and will use planner to build a route
to generate target waypoint and road options in each tick. It will set world, map, vehicles, pedestrians dut to
default config, and provided args, which will be stored to replace old config.
If no collision happens when creating actors, the init will end and return.
:Arguments:
- start (int, optional): Index of start waypoint. Defaults to 0.
- end (int, optional): Index of end waypoint. Defaults to 1.
:Optional arguments: town, weather, n_vehicles, n_pedestrians, autopilot, disable_two_wheels
"""
self._apply_world_setting(**kwargs)
self._set_town(self._town_name)
self._set_weather(self._weather)
self._blueprints = self._world.get_blueprint_library()
while True:
self.clean_up()
CarlaDataProvider.set_client(self._client)
CarlaDataProvider.set_world(self._world)
CarlaDataProvider.set_traffic_manager_port(self._tm.get_port())
self._spawn_hero_vehicle(start_pos=start)
self._prepare_observations()
self._spawn_vehicles()
self._spawn_pedestrians()
CarlaDataProvider.on_carla_tick()
self.apply_planner(end)
self._collided = False
self._ran_light = False
self._off_road = False
self._wrong_direction = False
if self._ready():
if self._debug:
self._count_actors()
break
def _set_town(self, town: str) -> None:
if self._world is None:
self._world = self._client.load_world(town)
elif self._map.name != town:
self._world = self._client.load_world(town)
else:
self._world = self._client.get_world()
if self._world.get_snapshot().timestamp.frame > 1e6:
self._world = self._client.load_world(town)
self._map = self._world.get_map()
self._set_sync_mode(self._sync_mode, self._delta_seconds)
def _set_sync_mode(self, sync: bool, delta_seconds: float = 0.1) -> None:
settings = self._world.get_settings()
if settings.synchronous_mode is not sync:
settings.synchronous_mode = sync
settings.fixed_delta_seconds = delta_seconds
settings.no_rendering_mode = self._no_rendering
self._world.apply_settings(settings)
#self._tm.set_synchronous_mode(True)
def _set_weather(self, weather_string):
if self._verbose:
print('[SIMULATOR] Setting weather: ', weather_string)
if weather_string == 'random':
weather = np.random.choice(list(PRESET_WEATHERS.values()))
else:
weather = PRESET_WEATHERS[weather_string]
self._world.set_weather(weather)
def _spawn_hero_vehicle(self, start_pos: int = 0) -> None:
start_waypoint = CarlaDataProvider.get_spawn_point(start_pos)
self._start_location = start_waypoint.location
self._hero_actor = CarlaDataProvider.request_new_actor(VEHICLE_NAME, start_waypoint, ROLE_NAME)
def _spawn_vehicles(self) -> None:
blueprints = self._blueprints.filter('vehicle.*')
if self._disable_two_wheels:
blueprints = [x for x in blueprints if int(x.get_attribute('number_of_wheels')) == 4]
spawn_points = self._map.get_spawn_points()
random.shuffle(spawn_points)
SpawnActor = carla.command.SpawnActor
SetAutopilot = carla.command.SetAutopilot
FutureActor = carla.command.FutureActor
batch = []
for n, transform in enumerate(spawn_points):
if n >= self._n_vehicles:
break
blueprint = random.choice(blueprints)
if blueprint.has_attribute('color'):
color = random.choice(blueprint.get_attribute('color').recommended_values)
blueprint.set_attribute('color', color)
if blueprint.has_attribute('driver_id'):
driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values)
blueprint.set_attribute('driver_id', driver_id)
blueprint.set_attribute('role_name', 'autopilot')
# spawn the cars and set their autopilot and light state all together
batch.append(SpawnActor(blueprint, transform).then(SetAutopilot(FutureActor, True, self._tm.get_port())))
for response in self._client.apply_batch_sync(batch, True):
if response.error:
if self._verbose:
print('[SIMULATOR]', response.error)
else:
CarlaDataProvider.register_actor(self._world.get_actor(response.actor_id))
def _spawn_pedestrians(self) -> None:
blueprints = self._blueprints.filter('walker.pedestrian.*')
SpawnActor = carla.command.SpawnActor
pedestrians_running = 30.0 # how many pedestrians will run
pedestrians_crossing = 30.0
peds_spawned = 0
walkers = []
controllers = []
walker_speed = []
while peds_spawned < self._n_pedestrians:
spawn_points = []
_walkers = []
_controllers = []
_walker_speed = []
# 1. take all the random locations to spawn
for i in range(self._n_pedestrians - peds_spawned):
spawn_point = carla.Transform()
loc = self._world.get_random_location_from_navigation()
if loc is not None:
spawn_point.location = loc
spawn_points.append(spawn_point)
# 2. spawn the walker object
batch = []
for spawn_point in spawn_points:
walker_bp = random.choice(blueprints)
# set as not invincible
if walker_bp.has_attribute('is_invincible'):
walker_bp.set_attribute('is_invincible', 'false')
# set the max speed
if walker_bp.has_attribute('speed'):
if random.random() > pedestrians_running:
# walking
_walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1])
else:
# running
_walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2])
else:
if self._verbose:
print("[SIMULATOR] Walker has no speed")
_walker_speed.append(0.0)
batch.append(SpawnActor(walker_bp, spawn_point))
results = self._client.apply_batch_sync(batch, True)
_walker_speed2 = []
for i in range(len(results)):
if results[i].error:
if self._verbose:
print('[SIMULATOR] Walker ', results[i].error)
else:
peds_spawned += 1
_walkers.append(results[i].actor_id)
_walker_speed2.append(_walker_speed[i])
_walker_speed = _walker_speed2
# 3. spawn the walker controller
walker_controller_bp = self._blueprints.find('controller.ai.walker')
batch = [SpawnActor(walker_controller_bp, carla.Transform(), walker) for walker in _walkers]
for result in self._client.apply_batch_sync(batch, True):
if result.error:
if self._verbose:
print('[SIMULATOR] Walker controller ', result.error)
else:
_controllers.append(result.actor_id)
# 4. add peds and controllers into actor dict
controllers.extend(_controllers)
walkers.extend(_walkers)
walker_speed.extend(_walker_speed)
CarlaDataProvider.register_actors(self._world.get_actors(walkers))
# CarlaDataProvider.register_actors(self._world.get_actors(controllers))
# wait for a tick to ensure client receives the last transform of the walkers we have just created
self._world.tick()
# 5. initialize each controller and set target to walk to (list is [controller, actor, controller, actor ...])
# set how many pedestrians can cross the road
self._world.set_pedestrians_cross_factor(pedestrians_crossing)
for i, controller_id in enumerate(controllers):
controller = self._world.get_actor(controller_id)
#controller = CarlaDataProvider.get_actor_by_id(controller_id)
controller.start()
controller.go_to_location(self._world.get_random_location_from_navigation())
controller.set_max_speed(float(walker_speed[i]))
self._actor_map['walker_controller'].append(controller)
# example of how to use parameters
self._tm.global_percentage_speed_difference(30.0)
self._world.tick()
def _prepare_observations(self) -> None:
self._sensor_helper = SensorHelper(self._obs_cfg, self._camera_aug_cfg)
self._sensor_helper.setup_sensors(self._world, self._hero_actor)
while not self._sensor_helper.all_sensors_ready():
self._world.tick()
for obs_item in self._obs_cfg:
if obs_item.type == 'bev':
self._bev_wrapper = BeVWrapper(obs_item)
self._bev_wrapper.init(self._client, self._world, self._map, self._hero_actor)
planner_cls = PLANNER_DICT[self._planner_cfg.get('type', 'basic')]
self._planner = planner_cls(self._planner_cfg)
self._collision_sensor = CollisionSensor(self._hero_actor, self._col_threshold)
self._traffic_light_helper = TrafficLightHelper(self._hero_actor)
def _ready(self, ticks: int = 30) -> bool:
for _ in range(ticks):
self.run_step()
self.get_state()
self._tick = 0
self._timestamp = 0
return not self._collided
def _count_actors(self) -> None:
vehicles = []
traffic_lights = []
speed_limits = []
walkers = []
sensors = []
others = []
actors = self._world.get_actors()
actors_with_transforms = [(actor, actor.get_transform()) for actor in actors]
for actor_with_transform in actors_with_transforms:
actor = actor_with_transform[0]
if 'vehicle' in actor.type_id:
vehicles.append(actor_with_transform)
elif 'traffic_light' in actor.type_id:
traffic_lights.append(actor_with_transform)
elif 'speed_limit' in actor.type_id:
speed_limits.append(actor_with_transform)
elif 'walker.pedestrian' in actor.type_id:
walkers.append(actor_with_transform)
elif 'sensor' in actor.type_id:
sensors.append(actor_with_transform)
else:
others.append(actor_with_transform)
print("[SIMULATOR] vehicles:")
for veh in vehicles:
print('\t', veh[0].id, veh[0].type_id, veh[0].attributes['role_name'])
print("[SIMULATOR] walkers:", len(walkers))
#print("[SIMULATOR] lights:", len(traffic_lights))
#print("[SIMULATOR] speed limits:", len(speed_limits))
print("[SIMULATOR] sensors:")
for ss in sensors:
print('\t', ss[0])
print("[SIMULATOR] others:", len(others))
[docs] def apply_planner(self, end_idx: int) -> Dict:
"""
Apply goal waypoint to planner in simulator. The start point is set to current hero vehicle waypoint.
:Arguments:
- end_idx (int): Index of end waypoint.
:Returns:
Dict: [description]
"""
assert self._start_location is not None
self._end_location = CarlaDataProvider.get_spawn_point(end_idx).location
self._planner.set_destination(self._start_location, self._end_location, clean=True)
self._total_distance = self._planner.distance_to_goal
self._end_timeout = self._planner.timeout
[docs] def get_state(self) -> Dict:
"""
Get running state from current world. It contains location, orientation, speed, acc,
and the state of surrounding road info such as traffic light and junction.
:Returns:
Dict: State dict.
"""
speed = CarlaDataProvider.get_speed(self._hero_actor) * 3.6
transform = CarlaDataProvider.get_transform(self._hero_actor)
location = transform.location
forward_vector = transform.get_forward_vector()
acceleration = CarlaDataProvider.get_acceleration(self._hero_actor)
angular_velocity = CarlaDataProvider.get_angular_velocity(self._hero_actor)
velocity = CarlaDataProvider.get_speed_vector(self._hero_actor)
light_state = self._traffic_light_helper.active_light_state.value
drive_waypoint = self._map.get_waypoint(
location,
project_to_road=False,
)
is_junction = False
if drive_waypoint is not None:
is_junction = drive_waypoint.is_junction
self._off_road = False
else:
self._off_road = True
lane_waypoint = self._map.get_waypoint(location, project_to_road=True, lane_type=carla.LaneType.Driving)
lane_location = lane_waypoint.transform.location
lane_forward_vector = lane_waypoint.transform.rotation.get_forward_vector()
state = {
'speed': speed,
'location': np.array([location.x, location.y, location.z]),
'forward_vector': np.array([forward_vector.x, forward_vector.y]),
'acceleration': np.array([acceleration.x, acceleration.y, acceleration.z]),
'velocity': np.array([velocity.x, velocity.y, velocity.z]),
'angular_velocity': np.array([angular_velocity.x, angular_velocity.y, angular_velocity.z]),
'rotation': np.array([transform.rotation.pitch, transform.rotation.yaw, transform.rotation.roll]),
'is_junction': is_junction,
'lane_location': np.array([lane_location.x, lane_location.y]),
'lane_forward': np.array([lane_forward_vector.x, lane_forward_vector.y]),
'tl_state': light_state,
'tl_dis': self._traffic_light_helper.active_light_dis,
}
if lane_waypoint is None:
state['lane_forward'] = None
else:
lane_forward_vector = lane_waypoint.transform.get_forward_vector()
state['lane_forward'] = np.array([lane_forward_vector.x, lane_forward_vector.y])
return state
[docs] def get_sensor_data(self) -> Dict:
"""
Get all sensor data and bird-eye view data if exist in current world. Bird-eye view will be
converted to an multi-channel image.
:Returns:
Dict: Sensor and Bev data dict.
"""
sensor_data = self._sensor_helper.get_sensors_data()
for obs_item in self._obs_cfg:
if obs_item.type not in OBS_TYPE_LIST:
raise NotImplementedError("observation type %s not implemented" % obs_item.type)
elif obs_item.type == 'bev':
key = obs_item.name
sensor_data.update({key: get_birdview(self._bev_wrapper.get_bev_data())})
return sensor_data
[docs] def get_navigation(self) -> Dict:
"""
Get navigation info in current world. Most of the contains come from planner.
:Returns:
Dict: Navigation dict.
"""
command = self._planner.node_road_option
node_location = self._planner.node_waypoint.transform.location
node_forward = self._planner.node_waypoint.transform.rotation.get_forward_vector()
target_location = self._planner.target_waypoint.transform.location
target_forward = self._planner.target_waypoint.transform.rotation.get_forward_vector()
waypoint_list = self._planner.get_waypoints_list(self._waypoint_num)
direction_list = self._planner.get_direction_list(self._waypoint_num)
agent_state = self._planner.agent_state
speed_limit = self._planner.speed_limit
self._end_distance = self._planner.distance_to_goal
self._end_timeout = self._planner.timeout
if self._bev_wrapper is not None:
self._bev_wrapper.update_waypoints(waypoint_list)
waypoint_location_list = []
for wp in waypoint_list:
wp_loc = wp.transform.location
wp_vec = wp.transform.rotation.get_forward_vector()
waypoint_location_list.append([wp_loc.x, wp_loc.y, wp_vec.x, wp_vec.y])
if not self._off_road:
current_waypoint = self._planner.current_waypoint
node_waypoint = self._planner.node_waypoint
# Lanes and roads are too chaotic at junctions
if current_waypoint.is_junction or node_waypoint.is_junction:
self._wrong_direction = False
else:
node_yaw = node_waypoint.transform.rotation.yaw % 360
cur_yaw = current_waypoint.transform.rotation.yaw % 360
wp_angle = (node_yaw - cur_yaw) % 360
if 150 <= wp_angle <= (360 - 150):
self._wrong_direction = True
else:
# Changing to a lane with the same direction
self._wrong_direction = False
navigation = {
'agent_state': agent_state.value,
'command': command.value,
'node': np.array([node_location.x, node_location.y]),
'node_forward': np.array([node_forward.x, node_forward.y]),
'target': np.array([target_location.x, target_location.y]),
'target_forward': np.array([target_forward.x, target_forward.y]),
'waypoint_list': np.array(waypoint_location_list),
'speed_limit': np.array(speed_limit),
'direction_list': np.array(direction_list)
}
return navigation
[docs] def run_step(self) -> None:
"""
Run one step simulation.
This will tick Carla world and update information for all sensors and measurement.
"""
self._world.tick()
self._tick += 1
world_snapshot = self._world.get_snapshot()
self._timestamp = world_snapshot.timestamp.elapsed_seconds
CarlaDataProvider.on_carla_tick()
if self._planner is not None:
self._planner.run_step()
self._collided = self._collision_sensor.collided
self._traffic_light_helper.tick()
self._ran_light = self._traffic_light_helper.ran_light
if self._bev_wrapper is not None:
if CarlaDataProvider._hero_vehicle_route is not None:
self._bev_wrapper.tick()
[docs] def apply_control(self, control: Dict = None) -> None:
"""
Apply control signal for hero player in simulator.
This will send message to the client and the control takes effect in next tick
:Arguments:
- control (dict, optional): Control signal dict. Default to None.
"""
if control is not None:
control_signal = control_to_signal(control)
self._hero_actor.apply_control(control_signal)
[docs] def clean_up(self) -> None:
"""
Destroy all actors and sensors in current world. Clear all messages saved in simulator and data provider.
This will NOT destroy the Carla client, so simulator can use same carla client to start next episode.
"""
for actor in self._actor_map['walker_controller']:
if actor.is_alive:
actor.stop()
actor.destroy()
self._actor_map['walker_controller'].clear()
self._actor_map.clear()
if self._sensor_helper is not None:
self._sensor_helper.clean_up()
if self._bev_wrapper is not None:
self._bev_wrapper.clear()
if self._collision_sensor is not None:
self._collision_sensor.clear()
if self._planner is not None:
self._planner.clean_up()
self._tick = 0
self._timestamp = 0
self._collided = False
self._ran_light = False
self._off_road = False
self._wrong_direction = False
self._end_distance = float('inf')
self._end_timeout = float('inf')
CarlaDataProvider.clean_up()
if self._debug:
print('after')
self._count_actors()
@property
def town_name(self) -> str:
return self._town_name
@property
def hero_player(self) -> carla.Actor:
return self._hero_actor
@property
def collided(self) -> bool:
return self._collided
@property
def ran_light(self) -> bool:
return self._ran_light
@property
def off_road(self) -> bool:
return self._off_road
@property
def wrong_direction(self) -> bool:
return self._wrong_direction
@property
def end_distance(self) -> float:
return self._end_distance
@property
def end_timeout(self) -> float:
return self._end_timeout
@property
def total_distance(self) -> float:
return self._total_distance