Source code for core.policy.auto_policy

import numpy as np
from typing import List, Dict, Optional

from .base_carla_policy import BaseCarlaPolicy
from core.models import VehiclePIDController, SteerNoiseWrapper, MPCController
from ding.utils.default_helper import deep_merge_dicts
from ding.torch_utils.data_helper import to_ndarray

DEFAULT_LATERAL_DICT = {'K_P': 1, 'K_D': 0.1, 'K_I': 0, 'dt': 0.1}
DEFAULT_LONGITUDINAL_DICT = {'K_P': 1.0, 'K_D': 0, 'K_I': 0.05, 'dt': 0.1}
DEFAULT_LAT_HW_DICT = {'K_P': 0.75, 'K_D': 0.01, 'K_I': 0.2, 'dt': 0.1}
DEFAULT_LAT_CITY_DICT = {'K_P': 0.58, 'K_D': 0.01, 'K_I': 0.25, 'dt': 0.1}
DEFAULT_LONG_HW_DICT = {'K_P': 0.37, 'K_D': 0.012, 'K_I': 0.016, 'dt': 0.1}
DEFAULT_LONG_CITY_DICT = {'K_P': 0.15, 'K_D': 0.025, 'K_I': 0.035, 'dt': 0.1}

DEFAULT_MPC_ARGS = {'CTE_W': 1, 'ETH_W': 2, 'V_W': 1, 'ST_W': 0.5, 'ACC_W': 0.01}


[docs]class AutoPIDPolicy(BaseCarlaPolicy): """ Autonomous Driving policy follows target waypoint in env observations. It uses a Vehicle PID controller for each env with a specific env id related to it. In each updating, all envs should use the correct env id to make the PID controller works well, and the controller should be reset when starting a new episode. The policy has 2 modes: `collect` and `eval`. Their interfaces operate in the same way. The only difference is that in `collect` mode the ``forward`` method may add noises to steer if set in config. :Arguments: - cfg (Dict): Config Dict. :Interfaces: reset, forward """ config = dict( # target speed in km/h target_speed=25, # max control value max_brake=0.3, max_throttle=0.75, max_steer=0.8, # whether consider traffic light ignore_light=False, # whether consider speed limit provided by planner ignore_speed_limit=False, # traffic light distance threshold tl_threshold=10, # control param lateral_dict=DEFAULT_LATERAL_DICT, longitudinal_dict=DEFAULT_LONGITUDINAL_DICT, # whether add noise in steer noise=False, noise_kwargs=dict( noise_len=5, drive_len=100, noise_type='uniform', noise_args=dict( low=-0.3, high=0.3, ), ), debug=False, ) def __init__( self, cfg: Dict, ) -> None: super().__init__(cfg, enable_field=set(['collect', 'eval'])) self._controller_dict = dict() self._last_steer_dict = dict() self.target_speed = self._cfg.target_speed self._max_brake = self._cfg.max_brake self._max_throttle = self._cfg.max_throttle self._max_steer = self._cfg.max_steer self._ignore_traffic_light = self._cfg.ignore_light self._ignore_speed_limit = self._cfg.ignore_speed_limit self._tl_threshold = self._cfg.tl_threshold self._lateral_dict = self._cfg.lateral_dict self._longitudinal_dict = self._cfg.longitudinal_dict self._debug = self._cfg.debug def _reset(self, data_id: int, noise: bool = False) -> None: if data_id in self._controller_dict: self._controller_dict.pop(data_id) if self._lateral_dict is None: if self.target_speed > 50: self._lateral_dict = DEFAULT_LAT_HW_DICT else: self._lateral_dict = DEFAULT_LAT_CITY_DICT if self._longitudinal_dict is None: if self.target_speed > 50: self._longitudinal_dict = DEFAULT_LONG_HW_DICT else: self._longitudinal_dict = DEFAULT_LONG_CITY_DICT controller = VehiclePIDController( args_lateral=self._lateral_dict, args_longitudinal=self._longitudinal_dict, max_throttle=self._max_throttle, max_brake=self._max_brake, max_steering=self._max_steer, ) if noise: noise_controller = SteerNoiseWrapper( model=controller, **self._cfg.noise_kwargs, ) self._controller_dict[data_id] = noise_controller else: self._controller_dict[data_id] = controller self._last_steer_dict[data_id] = 0 def _forward(self, data_id: int, obs: Dict) -> Dict: controller = self._controller_dict[data_id] if obs['command'] == -1: control = self._emergency_stop(data_id) elif obs['agent_state'] == 2 or obs['agent_state'] == 3 or obs['agent_state'] == 5: control = self._emergency_stop(data_id) elif not self._ignore_traffic_light and obs['agent_state'] == 4: control = self._emergency_stop(data_id) elif not self._ignore_traffic_light and obs['tl_state'] in [0, 1] and obs['tl_dis'] < self._tl_threshold: control = self._emergency_stop(data_id) else: current_speed = obs['speed'] current_location = obs['location'] current_vector = obs['forward_vector'] target_location = obs['target'] if not self._ignore_speed_limit: target_speed = min(self.target_speed, obs['speed_limit']) else: target_speed = self.target_speed control = controller.forward( current_speed, current_location, current_vector, target_speed, target_location, ) if abs(control['steer'] > 0.1 and current_speed > 15): control['throttle'] = min(control['throttle'], 0.3) self._last_steer_dict[data_id] = control['steer'] return control def _emergency_stop(self, data_id: int) -> Dict: control = { 'steer': self._last_steer_dict[data_id], 'throttle': 0.0, 'brake': 1.0, } return control
[docs] def _forward_eval(self, data: Dict) -> Dict: """ Running forward to get control signal of `eval` mode. :Arguments: - data (Dict): Input dict, with env id in keys and related observations in values, :Returns: Dict: Control dict stored in values for each provided env id. """ data = to_ndarray(data) actions = dict() for i in data.keys(): obs = data[i] action = self._forward(i, obs) actions[i] = {'action': action} return actions
[docs] def _forward_collect(self, data: Dict) -> Dict: """ Running forward to get control signal of `collect` mode. :Arguments: - data (Dict): Input dict, with env id in keys and related observations in values, :Returns: Dict: Control dict stored in values for each provided env id. """ data = to_ndarray(data) actions = dict() for i in data.keys(): obs = data[i] action = self._forward(i, obs) actions[i] = {'action': action} return actions
[docs] def _reset_eval(self, data_id: Optional[List[int]] = None) -> None: """ Reset policy of `eval` mode. It will reset the controllers in providded env id. :Arguments: - data_id (List[int], optional): List of env id to reset. Defaults to None. """ if data_id is not None: for id in data_id: self._reset(id) else: for id in self._controller_dict: self._reset(id)
[docs] def _reset_collect(self, data_id: Optional[List[int]] = None) -> None: """ Reset policy of `collect` mode. It will reset the controllers in provided env id. Noise will be add to the controller according to config. :Arguments: - data_id (List[int], optional): List of env id to reset. Defaults to None. """ noise = self._cfg.noise if data_id is not None: for id in data_id: self._reset(id, noise) else: for id in self._controller_dict: self._reset(id, noise)
[docs]class AutoMPCPolicy(BaseCarlaPolicy): """ Autonomous Driving policy follows target waypoint list in env observations. It uses an MPC controller for each env with a specific env id related to it. In each updating, all envs should use the correct env id to make the MPC controller works well, and the controller should be reset when starting a new episode. The policy has 2 modes: `collect` and `eval`. Their interfaces operate in the same way. The only difference is that in `collect` mode the ``forward`` method may add noises to steer if set in config. :Arguments: - cfg (Dict): Config Dict. :Interfaces: reset, forward """ config = dict( target_speed=25, mpc_args=None, ignore_light=False, ignore_speed_limit=False, horizon=5, fps=4, noise=False, debug=False, ) def __init__( self, cfg: Dict, ) -> None: super().__init__(cfg, enable_field=set(['collect', 'eval'])) self._controller_dict = dict() self._last_steer_dict = dict() self.target_speed = self._cfg.target_speed self._ignore_traffic_light = self._cfg.ignore_light self._ignore_speed_limit = self._cfg.ignore_speed_limit if self._cfg.mpc_args is None: self._mpc_args = DEFAULT_MPC_ARGS else: self._mpc_args = self._cfg.mpc_args self._horizon = self._cfg.horizon self._fps = self._cfg.fps self._debug = self._cfg.debug def _reset(self, data_id: int, noise: bool = False) -> None: if data_id in self._controller_dict: self._controller_dict.pop(data_id) controller = MPCController(self._mpc_args, self._horizon, self._fps) if noise: noise_controller = SteerNoiseWrapper( model=controller, noise_type='uniform', noise_kwargs={ 'low': -0.3, 'high': 0.3 }, ) self._controller_dict[data_id] = noise_controller else: self._controller_dict[data_id] = controller self._last_steer_dict[data_id] = 0 def _forward(self, data_id: int, obs: Dict) -> Dict: controller = self._controller_dict[data_id] if obs['command'] == -1: control = self._emergency_stop(data_id) elif obs['agent_state'] == 2 or obs['agent_state'] == 3 or obs['agent_state'] == 5: control = self._emergency_stop(data_id) elif not self._ignore_traffic_light and obs['agent_state'] == 4: control = self._emergency_stop(data_id) elif not self._ignore_traffic_light and obs['tl_state'] in [0, 1] and obs['tl_dis'] < 10: control = self._emergency_stop(data_id) else: ego_pose = [ obs['location'][0], obs['location'][1], obs['rotation'][1], obs['speed'] / 3.6, ] waypoints = obs['waypoint_list'] if not self._ignore_speed_limit: target_speed = min(self.target_speed, obs['speed_limit']) else: target_speed = self.target_speed control = controller.forward( ego_pose, target_speed / 3.6, waypoints, ) self._last_steer_dict[data_id] = control['steer'] return control def _emergency_stop(self, data_id: int) -> Dict: control = { 'steer': self._last_steer_dict[data_id], 'throttle': 0.0, 'brake': 1.0, } return control
[docs] def _forward_eval(self, data: Dict) -> Dict: """ Running forward to get control signal of `eval` mode. :Arguments: - data (Dict): Input dict, with env id in keys and related observations in values, :Returns: Dict: Control dict stored in values for each provided env id. """ data = to_ndarray(data) actions = dict() for i in data.keys(): obs = data[i] action = self._forward(i, obs) actions[i] = {'action': action} return actions
[docs] def _forward_collect(self, data: Dict) -> Dict: """ Running forward to get control signal of `collect` mode. :Arguments: - data (Dict): Input dict, with env id in keys and related observations in values, :Returns: Dict: Control dict stored in values for each provided env id. """ data = to_ndarray(data) actions = dict() for i in data.keys(): obs = data[i] action = self._forward(i, obs) actions[i] = {'action': action} return actions
[docs] def _reset_eval(self, data_id: Optional[List[int]] = None) -> None: """ Reset policy of `eval` mode. It will reset the controllers in providded env id. :Arguments: - data_id (List[int], optional): List of env id to reset. Defaults to None. """ if data_id is not None: for id in data_id: self._reset(id) else: for id in self._controller_dict: self._reset(id)
[docs] def _reset_collect(self, data_id: Optional[List[int]] = None) -> None: """ Reset policy of `collect` mode. It will reset the controllers in provided env id. Noise will be add to the controller according to config. :Arguments: - data_id (List[int], optional): List of env id to reset. Defaults to None. """ noise = self._cfg.noise if data_id is not None: for id in data_id: self._reset(id, noise) else: for id in self._controller_dict: self._reset(id, noise)