Source code for core.utils.planner.behavior_planner

import numpy as np
from typing import Dict, List, Tuple
import carla

from .basic_planner import AgentState, BasicPlanner, RoadOption
from core.utils.simulator_utils.carla_agents.navigation.types_behavior import Cautious, Aggressive, Normal
from core.simulators.carla_data_provider import CarlaDataProvider
from core.utils.simulator_utils.carla_agents.tools.misc import is_within_distance, compute_distance, positive
from core.utils.planner.planner_utils import get_next_until_junction, generate_change_lane_route


[docs]class BehaviorPlanner(BasicPlanner): """ Behavior local planner for Carla simulator. It can set route the same way as ``BasicPlanner``. BehaviorPlanner can check the speed limitations, traffic lights in evironment, and also take nearby vehicles into account. Lane changing decisions can be taken by analyzing the surrounding environment, such as overtaking or tailgating avoidance. Besides, it can also keep safety distance from a car in front of it by tracking the instantaneous time to collision and keeping it in a certain range. Different sets of behaviors are encoded in the agent, from cautious to a more aggressive ones. :Arguments: - cfg (Dict): Config dict. :Interfaces: set_destination, set_route, run_step, get_waypoints_list, clean_up """ config = dict( min_distance=5.0, resolution=5.0, fps=10, debug=False, behavior='normal', min_speed=5, ) def __init__(self, cfg: Dict) -> None: super().__init__(cfg) self._speed = 0 self._incoming_waypoint = None self._incoming_direction = None self._light_state = "Green" self._light_id_to_ignore = -1 behavior = self._cfg.behavior self._min_speed = self._cfg.min_speed # Parameters for agent behavior if behavior == 'cautious': self.behavior = Cautious() elif behavior == 'normal': self.behavior = Normal() elif behavior == 'aggressive': self.behavior = Aggressive() else: raise ValueError( 'behavior must in ["normal", "aggresive", and "cautious"], got {} instead.'.format(behavior) ) def _behavior_is_vehicle_hazard( self, vehicle_list: List, proximity_th: float, up_angle_th: float, low_angle_th: float = 0, lane_offset: int = 0 ) -> Tuple[bool, carla.Actor, float]: # Get the right offset if self.current_waypoint.lane_id < 0 and lane_offset != 0: lane_offset *= -1 for target_vehicle in vehicle_list: target_vehicle_loc = target_vehicle.get_location() # If the object is not in our next or current lane it's not an obstacle target_wpt = self._map.get_waypoint(target_vehicle_loc) if target_wpt.road_id != self.current_waypoint.road_id or \ target_wpt.lane_id != self.current_waypoint.lane_id + lane_offset: find_in_next = False for i in range(1, 5): next_wpt = self.get_incoming_waypoint_and_direction(steps=i)[0] if next_wpt is None: continue if target_wpt.road_id != next_wpt.road_id or \ target_wpt.lane_id != next_wpt.lane_id + lane_offset: continue find_in_next = True if not find_in_next: continue if is_within_distance(target_vehicle_loc, self._vehicle_location, self._hero_vehicle.get_transform().rotation.yaw, proximity_th, up_angle_th, low_angle_th): return (True, target_vehicle, compute_distance(target_vehicle_loc, self._vehicle_location)) return (False, None, -1) def change_route_in_current_road(self, route): route_end = route[-1][0] find_in_buffer = False wpt_index = -1 if self._waypoints_buffer: for i, (waypoint, _) in enumerate(self._waypoints_buffer): if waypoint.road_id != route_end.road_id: wpt_index = i find_in_buffer = True break if find_in_buffer: for i in range(wpt_index): self._waypoints_buffer.popleft() self.distance_to_goal -= self.distances.popleft() else: for i in range(len(self._waypoints_buffer)): self._waypoints_buffer.popleft() if self.distances: self.distance_to_goal -= self.distances.popleft() for i, (waypoint, _) in enumerate(self._waypoints_queue): if waypoint.road_id != route_end.road_id: wpt_index = i break if wpt_index >= 0: for i in range(wpt_index): self._waypoints_queue.popleft() if self.distances: self.distance_to_goal -= self.distances.popleft() else: self._waypoints_queue.clear() self.distances.clear() self.distance_to_goal = 0 self._waypoints_queue.append((self.end_waypoint, RoadOption.LANEFOLLOW)) if self._waypoints_buffer: old_route_start = self._waypoints_buffer[0][0] else: old_route_start = self._waypoints_queue[0][0] link_route = self._grp.trace_route(route_end.transform.location, old_route_start.transform.location) self.add_route_in_front(link_route) self.add_route_in_front(route)
[docs] def run_step(self) -> None: """ Run one step of local planner. It will update node and target waypoint and road option, and check agent states. Besides, it may have some behavior like overtaking, tailgating, safe distance keeping and so on """ assert self._route is not None vehicle_transform = CarlaDataProvider.get_transform(self._hero_vehicle) self._vehicle_location = vehicle_transform.location self.current_waypoint = self._map.get_waypoint( self._vehicle_location, lane_type=carla.LaneType.Driving, project_to_road=True ) # Add waypoints into buffer if empty if not self._waypoints_buffer: for i in range(min(self._buffer_size, len(self._waypoints_queue))): if self._waypoints_queue: self._waypoints_buffer.append(self._waypoints_queue.popleft()) else: break # If no waypoints return with current waypoint if not self._waypoints_buffer: self.target_waypoint = self.current_waypoint self.node_waypoint = self.current_waypoint self.target_road_option = RoadOption.VOID self.node_road_option = RoadOption.VOID self.agent_state = AgentState.VOID return # Find the most far waypoint within min distance max_index = -1 for i, (waypoint, _) in enumerate(self._waypoints_buffer): cur_dis = waypoint.transform.location.distance(vehicle_transform.location) if cur_dis < self._min_distance: max_index = i if max_index >= 0: for i in range(max_index + 1): self.node_waypoint, self.node_road_option = self._waypoints_buffer.popleft() if self._waypoints_queue: self._waypoints_buffer.append(self._waypoints_queue.popleft()) if self.distances: self.distance_to_goal -= self.distances.popleft() # Update information if self._waypoints_buffer: self.target_waypoint, self.target_road_option = self._waypoints_buffer[0] self.agent_state = AgentState.NAVIGATING self._speed = CarlaDataProvider.get_speed(self._hero_vehicle) * 3.6 self.speed_limit = self._hero_vehicle.get_speed_limit() look_ahead_steps = int((self.speed_limit) / 10) incoming_waypoint, incoming_direction = self.get_incoming_waypoint_and_direction(look_ahead_steps) # 1: Red lights and stops behavior light_state, _ = CarlaDataProvider.is_light_red(self._hero_vehicle) if light_state: self.agent_state = AgentState.BLOCKED_RED_LIGHT return # 2.1: Pedestrian avoidancd behaviors walker_state, walker, w_distance = self._pedestrian_avoid_manager() if walker_state: # Distance is computed from the center of the two cars, # we use bounding boxes to calculate the actual distance distance = w_distance - max(walker.bounding_box.extent.y, walker.bounding_box.extent.x) - max( self._hero_vehicle.bounding_box.extent.y, self._hero_vehicle.bounding_box.extent.x ) # Emergency brake if the car is very close. if distance < self.behavior.braking_distance: self.agent_state = AgentState.BLOCKED_BY_WALKER return # 2.2: BIke avoidancd behaviors bike_state, bike = CarlaDataProvider.is_bike_hazard(self._hero_vehicle) if bike_state: self.agent_state = AgentState.BLOCKED_BY_BIKE # 2.3: Car changelane behaviors lane_vehicle_state, vehicle = CarlaDataProvider.is_lane_vehicle_hazard( self._hero_vehicle, self.target_road_option ) if lane_vehicle_state: self.agent_state = AgentState.BLOCKED_BY_VEHICLE # 2.4: Car in junction behaviors junction_vehicle_state, vehicle = CarlaDataProvider.is_junction_vehicle_hazard( self._hero_vehicle, self.target_road_option ) if junction_vehicle_state: self.agent_state = AgentState.BLOCKED_BY_VEHICLE # 2.5: Car following behaviors vehicle_state, vehicle, distance = self._collision_and_car_avoid_manager() if vehicle_state: # Distance is computed from the center of the two cars, # we use bounding boxes to calculate the actual distance distance = distance - max(vehicle.bounding_box.extent.y, vehicle.bounding_box.extent.x) - max( self._hero_vehicle.bounding_box.extent.y, self._hero_vehicle.bounding_box.extent.x ) # Emergency brake if the car is very close. if distance < self.behavior.braking_distance: self.agent_state = AgentState.BLOCKED_BY_VEHICLE else: vehicle_speed = CarlaDataProvider.get_speed(vehicle) * 3.6 delta_v = max(1, (self._speed - vehicle_speed) / 3.6) ttc = distance / delta_v if delta_v != 0 else distance / np.nextafter(0., 1.) # Under safety time distance, slow down. if self.behavior.safety_time > ttc > 0.0: self.speed_limit = min( positive(vehicle_speed - self.behavior.speed_decrease), min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist) ) # Actual safety distance area, try to follow the speed of the vehicle in front. elif 2 * self.behavior.safety_time > ttc >= self.behavior.safety_time: self.speed_limit = min( max(self._min_speed, vehicle_speed), min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist) ) # Normal behavior. else: self.speed_limit = min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist) # 4: Intersection behavior # Checking if there's a junction nearby to slow down elif incoming_waypoint.is_junction and (incoming_direction == RoadOption.LEFT or incoming_direction == RoadOption.RIGHT): self.speed_limit = min(self.behavior.max_speed, self.speed_limit - 5) # 5: Change lane behavior elif incoming_waypoint.lane_id != self.current_waypoint.lane_id \ or (incoming_direction == RoadOption.CHANGELANELEFT or incoming_direction == RoadOption.CHANGELANERIGHT): self.speed_limit = max( min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist) * 0.6, 25 ) # 6: Normal behavior else: self.speed_limit = min(self.behavior.max_speed, self.speed_limit - self.behavior.speed_lim_dist)
def _traffic_light_manager(self) -> int: """ This method is in charge of behaviors for red lights and stops. WARNING: What follows is a proxy to avoid having a car brake after running a yellow light. This happens because the car is still under the influence of the semaphore, even after passing it. So, the semaphore id is temporarely saved to ignore it and go around this issue, until the car is near a new one. """ light_id = self._hero_vehicle.get_traffic_light( ).id if self._hero_vehicle.get_traffic_light() is not None else -1 light_state = str(self._hero_vehicle.get_traffic_light_state()) if light_state == "Red": if not self.current_waypoint.is_junction and (self._light_id_to_ignore != light_id or light_id == -1): return 1 elif self.current_waypoint.is_junction and light_id != -1: self.light_id_to_ignore = light_id if self._light_id_to_ignore != light_id: self._light_id_to_ignore = -1 return 0 def _pedestrian_avoid_manager(self) -> Tuple[bool, carla.Actor, float]: """ This module is in charge of warning in case of a collision with any pedestrian. :return vehicle_state: True if there is a walker nearby, False if not :return vehicle: nearby walker :return distance: distance to nearby walker """ walker_list = self._world.get_actors().filter("*walker.pedestrian*") def dist(w): return w.get_location().distance(self.current_waypoint.transform.location) walker_list = [w for w in walker_list if dist(w) < 10] if self.target_road_option == RoadOption.CHANGELANELEFT: walker_state, walker, distance = self._behavior_is_vehicle_hazard( walker_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=-1 ) elif self.target_road_option == RoadOption.CHANGELANERIGHT: walker_state, walker, distance = self._behavior_is_vehicle_hazard( walker_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=90, lane_offset=1 ) else: walker_state, walker, distance = self._behavior_is_vehicle_hazard( walker_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=60 ) return walker_state, walker, distance def _collision_and_car_avoid_manager(self) -> Tuple[bool, carla.Actor, float]: """ This module is in charge of warning in case of a collision and managing possible overtaking or tailgating chances. :return vehicle_state: True if there is a vehicle nearby, False if not :return vehicle: nearby vehicle :return distance: distance to nearby vehicle """ vehicle_list = self._world.get_actors().filter("*vehicle*") def dist(v): return v.get_location().distance(self.current_waypoint.transform.location) vehicle_list = [v for v in vehicle_list if dist(v) < 45 and v.id != self._hero_vehicle.id] if self.target_road_option == RoadOption.CHANGELANELEFT: vehicle_state, vehicle, distance = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1 ) elif self.target_road_option == RoadOption.CHANGELANERIGHT: vehicle_state, vehicle, distance = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1 ) else: vehicle_state, vehicle, distance = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=30 ) # Check for overtaking if vehicle_state and self.target_road_option == RoadOption.LANEFOLLOW \ and not self.current_waypoint.is_junction and self._speed > 10 \ and self.behavior.overtake_counter == 0 \ and self._speed > CarlaDataProvider.get_speed(vehicle) * 3.6: self._overtake(vehicle_list) # Check for tailgating elif not vehicle_state and self.target_road_option == RoadOption.LANEFOLLOW \ and not self.current_waypoint.is_junction and self._speed > 10 \ and self.behavior.tailgate_counter == 0: self._tailgating(vehicle_list) return vehicle_state, vehicle, distance def _overtake(self, vehicle_list: List[carla.Actor]) -> None: """ This method is in charge of overtaking behaviors. :param vehicle_list: list of all the nearby vehicles """ left_turn = self.current_waypoint.left_lane_marking.lane_change right_turn = self.current_waypoint.right_lane_marking.lane_change left_wpt = self.current_waypoint.get_left_lane() right_wpt = self.current_waypoint.get_right_lane() if (left_turn == carla.LaneChange.Left or left_turn == carla.LaneChange.Both) and \ self.current_waypoint.lane_id * left_wpt.lane_id > 0 and left_wpt.lane_type == carla.LaneType.Driving: new_vehicle_state, _, _ = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=-1 ) if not new_vehicle_state: _, left_end_dis = get_next_until_junction(left_wpt, 100) if left_end_dis > 40: print("[PLANNER] Overtaking to the left!") self.behavior.overtake_counter = 200 left_plan = generate_change_lane_route(self.current_waypoint, 'left', 0, 5, left_end_dis - 20) self.change_route_in_current_road(left_plan) elif right_turn == carla.LaneChange.Right and self.current_waypoint.lane_id * right_wpt.lane_id > 0 and \ right_wpt.lane_type == carla.LaneType.Driving: new_vehicle_state, _, _ = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 3), up_angle_th=180, lane_offset=1 ) if not new_vehicle_state: _, right_end_dis = get_next_until_junction(right_wpt, 100) if right_end_dis > 40: print("[PLANNER] Overtaking to the right!") self.behavior.overtake_counter = 200 right_plan = generate_change_lane_route(self.current_waypoint, 'right', 0, 5, right_end_dis - 20) self.change_route_in_current_road(right_plan) def _tailgating(self, vehicle_list: List[carla.Actor]) -> None: """ This method is in charge of tailgating behaviors. :param location: current location of the agent :param waypoint: current waypoint of the agent :param vehicle_list: list of all the nearby vehicles """ left_turn = self.current_waypoint.left_lane_marking.lane_change right_turn = self.current_waypoint.right_lane_marking.lane_change left_wpt = self.current_waypoint.get_left_lane() right_wpt = self.current_waypoint.get_right_lane() behind_vehicle_state, behind_vehicle, _ = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, low_angle_th=160 ) if behind_vehicle_state and self._speed < CarlaDataProvider.get_speed(behind_vehicle) * 3.6: if (right_turn == carla.LaneChange.Right or right_turn == carla.LaneChange.Both) and \ self.current_waypoint.lane_id * right_wpt.lane_id > 0 and \ right_wpt.lane_type == carla.LaneType.Driving: new_vehicle_state, _, _ = self._behavior_is_vehicle_hazard( vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=1 ) if not new_vehicle_state: print("[PLANNER] Tailgating, moving to the right!") self.behavior.tailgate_counter = 200 self.set_destination(right_wpt.transform.location, self.end_waypoint.transform.location, clean=True) elif left_turn == carla.LaneChange.Left and self.current_waypoint.lane_id * left_wpt.lane_id > 0 and \ left_wpt.lane_type == carla.LaneType.Driving: new_vehicle_state, _, _ = self._behavior_is_vehicle_hazard( self.current_waypoint, self._vehicle_location, vehicle_list, max(self.behavior.min_proximity_threshold, self.speed_limit / 2), up_angle_th=180, lane_offset=-1 ) if not new_vehicle_state: print("[PLANNER] Tailgating, moving to the left!") self.behavior.tailgate_counter = 200 self.set_destination(left_wpt.transform.location, self.end_waypoint.transform.location, clean=True)