import os
import copy
import logging
import time
import numpy as np
import carla
import math
import weakref
import shapely.geometry
from enum import Enum
from easydict import EasyDict
from collections import deque
from typing import Any, Dict, List, Optional, Tuple, Union
from core.simulators.carla_data_provider import CarlaDataProvider
from ding.utils.default_helper import deep_merge_dicts
DEFAULT_CAMERA_CONFIG = {
'size': [384, 160],
'fov': 90,
'position': [2.0, 0.0, 1.4],
'rotation': [0, 0, 0],
}
DEFAULT_CAMERA_AUG_CONFIG = {'position_range': [0, 0, 0], 'rotation_range': [0, 0, 0]}
DEFAULT_LIDAR_CONFIG = {
'channels': 1,
'range': 2000,
'points_per_second': 1000,
'rotation_frequency': 10,
'upper_fov': -3,
'lower_fov': -3,
'position': [0, 0.0, 1.4],
'rotation': [0, -90, 0],
'draw': False,
}
DEFAULT_GNSS_CONFIG = {
'position': [0.0, 0.0, 1.4],
}
class TrafficLightState(Enum):
RED = 0
YELLOW = 1
GREEN = 2
OFF = 3
def get_random_sample(range_list):
res = []
for _range in range_list:
num = np.random.random() * _range * 2 - _range
res.append(num)
return res
[docs]class SensorHelper(object):
"""
Interfaces for sensors required for vehicles and data buffer for all sensor data in Carla. The updating for Carla
sensors are not synchronous. In each tick, the newest sensor data is obtained from sensor data buffer and returned
to the simulator. This class provides an interface that can easily create, receive data and destroy all
kinds of sensors in Carla according to config, and apply the same sensor augmantation to all camera sensors.
:Arguments:
- obs_cfg (Dict): Config dict for sensor
- aug_cfg (Dict, optional): Config dict for sensor augmentation. Defaults to None.
:Interfaces: setup_sensors, get_sensors_data, clear_up
"""
def __init__(
self,
obs_cfg: Dict,
aug_cfg: Optional[Dict] = None,
) -> None:
self._obs_cfg = obs_cfg
self._aug_cfg = aug_cfg
self._sensors_dict = {}
self._data_buffers = {}
self._timestamps = {}
self._random_aug_pos = None
self._random_aug_rot = None
[docs] def setup_sensors(self, world: carla.World, vehicle: carla.Actor) -> None:
"""
Create the sensors defined in config and attach them to the hero vehicle
:Arguments:
- world (carla.World): Carla world
- vehicle (carla.Actor): ego vehicle
"""
bp_library = world.get_blueprint_library()
if self._aug_cfg:
self._aug_cfg = EasyDict(deep_merge_dicts(DEFAULT_CAMERA_AUG_CONFIG, self._aug_cfg))
if min(self._aug_cfg.position_range) < 0 or min(self._aug_cfg.rotation_range) < 0:
raise ValueError('Augmentation parameters must greater than 0!')
self._random_aug_pos = get_random_sample(self._aug_cfg.position_range)
self._random_aug_rot = get_random_sample(self._aug_cfg.rotation_range)
else:
self._random_aug_pos = [0, 0, 0]
self._random_aug_rot = [0, 0, 0]
for obs_item in self._obs_cfg:
if obs_item.type in ['rgb', 'depth', 'segmentation']:
obs_item = EasyDict(deep_merge_dicts(DEFAULT_CAMERA_CONFIG, obs_item))
bp_name = {
'rgb': 'sensor.camera.rgb',
'depth': 'sensor.camera.depth',
'segmentation': 'sensor.camera.semantic_segmentation',
}[obs_item.type]
sensor_bp = bp_library.find(bp_name)
sensor_bp.set_attribute('image_size_x', str(obs_item.size[0]))
sensor_bp.set_attribute('image_size_y', str(obs_item.size[1]))
sensor_bp.set_attribute('fov', str(obs_item.fov))
sensor_location = carla.Location(
obs_item.position[0] + self._random_aug_pos[0], obs_item.position[1] + self._random_aug_pos[1],
obs_item.position[2] + self._random_aug_pos[2]
)
sensor_rotation = carla.Rotation(
obs_item.rotation[0] + self._random_aug_rot[0], obs_item.rotation[1] + self._random_aug_rot[1],
obs_item.rotation[2] + self._random_aug_rot[2]
)
elif obs_item.type == 'lidar':
obs_item = EasyDict(deep_merge_dicts(DEFAULT_LIDAR_CONFIG, obs_item))
sensor_bp = bp_library.find('sensor.lidar.ray_cast')
sensor_bp.set_attribute('range', str(obs_item.range))
sensor_bp.set_attribute('rotation_frequency', str(obs_item.rotation_frequency))
sensor_bp.set_attribute('channels', str(obs_item.channels))
sensor_bp.set_attribute('upper_fov', str(obs_item.upper_fov))
sensor_bp.set_attribute('lower_fov', str(obs_item.lower_fov))
sensor_bp.set_attribute('points_per_second', str(obs_item.points_per_second))
sensor_location = carla.Location(obs_item.position[0], obs_item.position[1], obs_item.position[2])
sensor_rotation = carla.Rotation(obs_item.rotation[0], obs_item.rotation[1], obs_item.rotation[2])
elif obs_item.type == 'gnss':
obs_item = EasyDict(deep_merge_dicts(DEFAULT_GNSS_CONFIG, obs_item))
obs_item.update(obs_item)
sensor_bp = bp_library.find('sensor.other.gnss')
sensor_location = carla.Location(obs_item.position[0], obs_item.position[1], obs_item.position[2])
sensor_rotation = carla.Rotation()
else:
continue
sensor_transform = carla.Transform(sensor_location, sensor_rotation)
sensor = world.spawn_actor(sensor_bp, sensor_transform, attach_to=vehicle)
sensor.listen(CallBack(obs_item.name, obs_item.type, self))
self.register_sensor(obs_item.name, sensor)
[docs] def clean_up(self) -> None:
"""
Remove and destroy all sensors
"""
for key in self._sensors_dict:
if self._sensors_dict[key] is not None:
if self._sensors_dict[key].is_alive:
self._sensors_dict[key].stop()
self._sensors_dict[key].destroy()
self._sensors_dict[key] = None
time.sleep(0.1)
self._sensors_dict.clear()
self._data_buffers.clear()
self._timestamps.clear()
def register_sensor(self, tag: str, sensor: Any) -> None:
"""
Registers the sensors
"""
if tag in self._sensors_dict:
raise ValueError("Duplicated sensor tag [{}]".format(tag))
self._sensors_dict[tag] = sensor
self._data_buffers[tag] = None
self._timestamps[tag] = -1
def update_sensor(self, tag: str, data: Any, timestamp: Any) -> None:
"""
Updates the sensor
"""
if tag not in self._sensors_dict:
raise ValueError("The sensor with tag [{}] has not been created!".format(tag))
self._data_buffers[tag] = data
self._timestamps[tag] = timestamp
def all_sensors_ready(self) -> bool:
"""
Checks if all the sensors have sent data at least once
"""
for key in self._sensors_dict:
if self._data_buffers[key] is None:
return False
return True
[docs] def get_sensors_data(self) -> Dict:
"""
Get all registered sensor data from buffer
:Returns:
Dict: all newest sensor data
"""
sensor_data = {}
for obs_item in self._obs_cfg:
if obs_item.type in ['rgb', 'segmentation', 'lidar', 'gnss']:
key = obs_item.name
img = self._data_buffers[key]
sensor_data[key] = img
elif obs_item.type == 'depth':
key = obs_item.name
raw = self._data_buffers[key]
img = raw.astype(np.float64)
R = img[..., 0]
G = img[..., 1]
B = img[..., 2]
depth = (R + G * 256 + B * 256 * 256) / (256 * 256 * 256 - 1)
depth = 1000 * depth
sensor_data[key] = depth
if self._aug_cfg:
sensor_data['aug'] = {
'aug_pos': np.array(self._random_aug_pos),
'aug_rot': np.array(self._random_aug_rot),
}
return sensor_data
class CallBack(object):
"""
Class the sensors listen to in order to receive their data each frame
"""
def __init__(self, tag: str, type: str, wrapper: Any) -> None:
"""
Initializes the call back
"""
self._tag = tag
self._type = type
self._data_wrapper = wrapper
def __call__(self, data: Any) -> None:
"""
call function
"""
if isinstance(data, carla.Image):
self._parse_image_cb(data, self._tag)
elif isinstance(data, carla.LidarMeasurement):
self._parse_lidar_cb(data, self._tag)
elif isinstance(data, carla.GnssMeasurement):
self._parse_gnss_cb(data, self._tag)
else:
logging.error('No callback method for this sensor.')
# Parsing CARLA physical Sensors
def _parse_image_cb(self, image: Any, tag: str) -> None:
"""
parses cameras
"""
if self._type == 'rgb':
image.convert(carla.ColorConverter.Raw)
if self._type == 'segmentation':
image.convert(carla.ColorConverter.CityScapesPalette)
img = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
img = np.reshape(img, (image.height, image.width, 4))
img = img[:, :, :3]
img = img[:, :, ::-1]
img = copy.deepcopy(img)
self._data_wrapper.update_sensor(tag, img, image.frame)
def _parse_lidar_cb(self, lidar_data: Any, tag: str) -> None:
"""
parses lidar sensors
"""
points = np.frombuffer(lidar_data.raw_data, dtype=np.dtype('f4'))
points = copy.deepcopy(points)
points = np.reshape(points, (int(points.shape[0] / 3), 3))
self._data_wrapper.update_sensor(tag, points, lidar_data.frame)
def _parse_gnss_cb(self, gnss_data: Any, tag: str) -> None:
"""
parses gnss sensors
"""
array = np.array([gnss_data.latitude, gnss_data.longitude, gnss_data.altitude], dtype=np.float64)
self._data_wrapper.update_sensor(tag, array, gnss_data.frame)
[docs]class CollisionSensor(object):
"""
Carla sensor interface used to detect collision info in simulator. Once created,
it will automatically update every tick.
:Arguments:
- parent_actor (carla.Actor): Actor to detect collision
- col_threshold (float): Threshold value of collided impulse
:Interfaces: clear
"""
def __init__(self, parent_actor: carla.Actor, col_threshold: float) -> None:
self.sensor = None
self._history = deque(maxlen=500)
self._parent = parent_actor
self._threshold = col_threshold
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
self.collided = False
self.collided_frame = -1
@staticmethod
def _on_collision(weak_self, event: Any) -> None:
self = weak_self()
if not self:
return
impulse = event.normal_impulse
intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
self._history.append((event.frame, intensity))
if intensity > self._threshold:
self.collided = True
self.collided_frame = event.frame
[docs] def clear(self) -> None:
"""
Clear collision sensor in Carla world.
"""
self._history.clear()
if self.sensor.is_alive:
self.sensor.stop()
self.sensor.destroy()
[docs]class TrafficLightHelper(object):
"""
Interface of traffic light detector and recorder. It detects next traffic light state,
calculates distance from hero vehicle to the end of this road, and if hero vehicle crosses
this line when correlated light is red, it will record running a red light
:Arguments:
- hero_vehicle (carla.Actor): Hero vehicle
:Interfaces:
- tick
"""
def __init__(self, hero_vehicle: carla.Actor, debug: bool = False) -> None:
self._hero_vehicle = hero_vehicle
self._world = CarlaDataProvider.get_world()
self._map = CarlaDataProvider.get_map()
self._light_dis_thresh = 20
self._active_light = None
self._last_light = None
self.total_lights_ran = 0
self.total_lights = 0
self.ran_light = False
self.active_light_state = TrafficLightState.OFF
self.active_light_dis = 200
self._debug = debug
[docs] def tick(self) -> None:
"""
Tick one step. It will check the next traffic light and its state, update the count number
of traffic light if needed. It will check the running light event by getting the last waypoints
in current road and check if the vehicle has crossed them.
"""
self.ran_light = False
vehicle_transform = CarlaDataProvider.get_transform(self._hero_vehicle)
vehicle_location = vehicle_transform.location
self._active_light, light_trigger_location = self._get_active_light()
if self._active_light is not None:
if self._debug:
self._world.debug.draw_point(light_trigger_location + carla.Location(z=2), size=0.1)
self.active_light_state = {
carla.TrafficLightState.Green: TrafficLightState.GREEN,
carla.TrafficLightState.Yellow: TrafficLightState.YELLOW,
carla.TrafficLightState.Red: TrafficLightState.RED,
carla.TrafficLightState.Off: TrafficLightState.OFF,
}[self._active_light.state]
delta = vehicle_location - light_trigger_location
distance = np.sqrt(sum([delta.x ** 2, delta.y ** 2, delta.z ** 2]))
self.active_light_dis = min(200, distance)
if self.active_light_dis < self._light_dis_thresh:
if self._last_light is None or self._active_light.id != self._last_light.id:
self.total_lights += 1
self._last_light = self._active_light
else:
self.active_light_state = TrafficLightState.OFF
self.active_light_dis = 200
if self._last_light is not None:
if self._last_light.state != carla.TrafficLightState.Red:
return
veh_extent = self._hero_vehicle.bounding_box.extent.x
tail_close_pt = self._rotate_point(
carla.Vector3D(-0.8 * veh_extent, 0.0, vehicle_location.z), vehicle_transform.rotation.yaw
)
tail_close_pt = vehicle_location + carla.Location(tail_close_pt)
tail_far_pt = self._rotate_point(
carla.Vector3D(-veh_extent - 1, 0.0, vehicle_location.z), vehicle_transform.rotation.yaw
)
tail_far_pt = vehicle_location + carla.Location(tail_far_pt)
trigger_waypoints = self._get_traffic_light_trigger_waypoints(self._last_light)
if self._debug:
z = 2.1
if self._last_light.state == carla.TrafficLightState.Red:
color = carla.Color(155, 0, 0)
elif self._last_light.state == carla.TrafficLightState.Green:
color = carla.Color(0, 155, 0)
else:
color = carla.Color(155, 155, 0)
for wp in trigger_waypoints:
text = "{}.{}".format(wp.road_id, wp.lane_id)
self._world.debug.draw_string(wp.transform.location + carla.Location(x=1, z=z), text, color=color)
self._world.debug.draw_point(wp.transform.location + carla.Location(z=z), size=0.1, color=color)
for wp in trigger_waypoints:
tail_wp = self._map.get_waypoint(tail_far_pt)
# Calculate the dot product (Might be unscaled, as only its sign is important)
ve_dir = vehicle_transform.get_forward_vector()
wp_dir = wp.transform.get_forward_vector()
dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z
# Check the lane until all the "tail" has passed
if tail_wp.road_id == wp.road_id and tail_wp.lane_id == wp.lane_id and dot_ve_wp > 0:
# This light is red and is affecting our lane
yaw_wp = wp.transform.rotation.yaw
lane_width = wp.lane_width
location_wp = wp.transform.location
lft_lane_wp = self._rotate_point(carla.Vector3D(0.4 * lane_width, 0.0, location_wp.z), yaw_wp + 90)
lft_lane_wp = location_wp + carla.Location(lft_lane_wp)
rgt_lane_wp = self._rotate_point(carla.Vector3D(0.4 * lane_width, 0.0, location_wp.z), yaw_wp - 90)
rgt_lane_wp = location_wp + carla.Location(rgt_lane_wp)
# Is the vehicle traversing the stop line?
if self._is_vehicle_crossing_line((tail_close_pt, tail_far_pt), (lft_lane_wp, rgt_lane_wp)):
self.ran_light = True
self.total_lights_ran += 1
self._last_light = None
def _get_active_light(self) -> Tuple[Optional[carla.Actor], Optional[carla.Vector3D]]:
lights_list = CarlaDataProvider.get_actor_list().filter("*traffic_light*")
vehicle_transform = CarlaDataProvider.get_transform(self._hero_vehicle)
vehicle_location = vehicle_transform.location
vehicle_waypoint = CarlaDataProvider._map.get_waypoint(vehicle_location)
for traffic_light in lights_list:
object_location = CarlaDataProvider.get_trafficlight_trigger_location(traffic_light)
object_waypoint = CarlaDataProvider._map.get_waypoint(object_location)
if object_waypoint.road_id != vehicle_waypoint.road_id:
continue
ve_dir = vehicle_waypoint.transform.get_forward_vector()
wp_dir = object_waypoint.transform.get_forward_vector()
dot_ve_wp = ve_dir.x * wp_dir.x + ve_dir.y * wp_dir.y + ve_dir.z * wp_dir.z
if dot_ve_wp < 0:
continue
while not object_waypoint.is_intersection:
next_waypoint = object_waypoint.next(0.5)[0]
if next_waypoint and not next_waypoint.is_intersection:
object_waypoint = next_waypoint
else:
break
return traffic_light, object_waypoint.transform.location
return None, None
def _get_traffic_light_trigger_waypoints(self, traffic_light: carla.Actor) -> List[carla.Waypoint]:
base_transform = traffic_light.get_transform()
base_rot = base_transform.rotation.yaw
area_loc = base_transform.transform(traffic_light.trigger_volume.location)
# Discretize the trigger box into points
area_ext = traffic_light.trigger_volume.extent
x_values = np.arange(-0.9 * area_ext.x, 0.9 * area_ext.x, 1.0) # 0.9 to avoid crossing to adjacent lanes
area = []
for x in x_values:
point = self._rotate_point(carla.Vector3D(x, 0, area_ext.z), base_rot)
point_location = area_loc + carla.Location(x=point.x, y=point.y)
area.append(point_location)
# Get the waypoints of these points, removing duplicates
ini_wps = []
for pt in area:
wpx = self._map.get_waypoint(pt)
# As x_values are arranged in order, only the last one has to be checked
if not ini_wps or ini_wps[-1].road_id != wpx.road_id or ini_wps[-1].lane_id != wpx.lane_id:
ini_wps.append(wpx)
# Advance them until the intersection
wps = []
for wpx in ini_wps:
while not wpx.is_intersection:
next_wp = wpx.next(0.5)[0]
if next_wp and not next_wp.is_intersection:
wpx = next_wp
else:
break
wps.append(wpx)
return wps
def _is_vehicle_crossing_line(self, seg1: List, seg2: List) -> bool:
"""
check if vehicle crosses a line segment
"""
line1 = shapely.geometry.LineString([(seg1[0].x, seg1[0].y), (seg1[1].x, seg1[1].y)])
line2 = shapely.geometry.LineString([(seg2[0].x, seg2[0].y), (seg2[1].x, seg2[1].y)])
inter = line1.intersection(line2)
return not inter.is_empty
def _rotate_point(self, point: carla.Vector3D, angle: float) -> carla.Vector3D:
"""
rotate a given point by a given angle
"""
x_ = math.cos(math.radians(angle)) * point.x - math.sin(math.radians(angle)) * point.y
y_ = math.sin(math.radians(angle)) * point.x + math.cos(math.radians(angle)) * point.y
return carla.Vector3D(x_, y_, point.z)