from collections import namedtuple
import os
from ding.torch_utils.data_helper import to_device, to_dtype, to_tensor
import torch
from torchvision import transforms
import numpy as np
from typing import Dict, List, Any, Optional
from .base_carla_policy import BaseCarlaPolicy
from core.models import PIDController, CustomController
from core.models.lbc_model import LBCBirdviewModel, LBCImageModel
from core.utils.model_utils import common
from ding.utils.data import default_collate, default_decollate
from core.utils.learner_utils.loss_utils import LocationLoss
# Number of waypoints predicted by the LBC model (iterated in _forward_eval).
STEPS = 5
# Number of leading waypoints used to estimate the target speed.
SPEED_STEPS = 3
# Number of discrete navigation commands; presumably matches the 4-dim one-hot
# built from torch.eye(4) below — not referenced directly in this file.
COMMANDS = 4
class LBCBirdviewPolicy(BaseCarlaPolicy):
    """
    LBC driving policy with Bird-eye View inputs. It has an LBC NN model which can handle
    observations from several environments by collating data into batch. Each environment
    has a PID controller related to it to get final control signals. In each updating, all
    envs should use the correct env id to make the PID controller work well, and the
    controller should be reset when starting a new episode.

    It contains 2 modes: `eval` and `learn`. The learn mode will calculate all losses,
    but will not back-propagate them. In `eval` mode, the output control signal will be
    postprocessed to standard control signal in Carla.

    :Arguments:
        - cfg (Dict): Config Dict.
        - enable_field (List): Enable policy field, default to ['eval', 'learn'].

    :Interfaces:
        reset, forward
    """
    config = dict(
        cuda=True,
        model=dict(),
        learn=dict(loss='l1', ),
        steer_points=None,
        pid=None,
        gap=5,
        dt=0.1,
        crop_size=192,
        pixels_per_meter=5,
    )

    def __init__(self, cfg: dict, enable_field: Optional[List] = None) -> None:
        # Avoid a shared mutable default argument; None stands for the full field list.
        if enable_field is None:
            enable_field = ['eval', 'learn']
        super().__init__(cfg, enable_field=enable_field)
        # Maps env id -> (speed PID controller, turn controller).
        self._controller_dict = dict()
        if self._cfg.cuda:
            if not torch.cuda.is_available():
                print('[POLICY] No cuda device found! Use cpu by default')
                self._device = torch.device('cpu')
            else:
                self._device = torch.device('cuda')
        else:
            self._device = torch.device('cpu')
        # 4-dim one-hot lookup table for the navigation command (commands are 1-based).
        self._one_hot = torch.FloatTensor(torch.eye(4))
        self._transform = transforms.ToTensor()
        self._gap = self._cfg.gap
        self._dt = self._cfg.dt
        self._crop_size = self._cfg.crop_size
        self._pixels_per_meter = self._cfg.pixels_per_meter
        self._steer_points = self._cfg.steer_points
        self._pid = self._cfg.pid
        if self._steer_points is None:
            # Index of the target waypoint used for steering, per command.
            self._steer_points = {"1": 3, "2": 2, "3": 2, "4": 2}
        if self._pid is None:
            self._pid = {
                "1": {
                    "Kp": 1.0,
                    "Ki": 0.1,
                    "Kd": 0
                },  # Left
                "2": {
                    "Kp": 1.0,
                    "Ki": 0.1,
                    "Kd": 0
                },  # Right
                "3": {
                    "Kp": 0.8,
                    "Ki": 0.1,
                    "Kd": 0
                },  # Straight
                "4": {
                    "Kp": 0.8,
                    "Ki": 0.1,
                    "Kd": 0
                },  # Follow
            }
        # Factories so each env gets its own stateful controller pair.
        self._speed_control_func = lambda: PIDController(K_P=1.0, K_I=0.1, K_D=2.5)
        self._turn_control_func = lambda: CustomController(self._pid)
        self._model = LBCBirdviewModel(**self._cfg.model)
        self._model.to(self._device)
        for field in self._enable_field:
            getattr(self, '_init_' + field)()

    def _init_learn(self) -> None:
        # BUGFIX: the 'l2' branch previously read ``self._cfg.policy.learn.loss``
        # while the 'l1' branch read ``self._cfg.learn.loss``; use the same path.
        if self._cfg.learn.loss == 'l1':
            self._criterion = LocationLoss(choice='l1')
        elif self._cfg.learn.loss == 'l2':
            self._criterion = LocationLoss(choice='l2')

    def _postprocess(self, steer, throttle, brake):
        # Clip raw controller outputs to the valid Carla control ranges.
        control = {}
        control.update(
            {
                'steer': np.clip(steer, -1.0, 1.0),
                'throttle': np.clip(throttle, 0.0, 1.0),
                'brake': np.clip(brake, 0.0, 1.0),
            }
        )
        return control

    def _reset_single(self, data_id):
        # Replace (or create) the controller pair for one env id.
        if data_id in self._controller_dict:
            self._controller_dict.pop(data_id)
        self._controller_dict[data_id] = (self._speed_control_func(), self._turn_control_func())

    def _reset(self, data_ids: Optional[List[int]] = None) -> None:
        if data_ids is not None:
            for data_id in data_ids:
                self._reset_single(data_id)
        else:
            # BUGFIX: iterate over a snapshot — ``_reset_single`` pops and re-inserts
            # keys, which must not happen while iterating the dict itself.
            for data_id in list(self._controller_dict):
                self._reset_single(data_id)

    @torch.no_grad()
    def _forward_eval(self, data: Dict) -> Dict[str, Any]:
        """
        Running forward to get control signal of `eval` mode.

        :Arguments:
            - data (Dict): Input dict, with env id in keys and related observations in values.

        :Returns:
            Dict: Control and waypoints dict stored in values for each provided env id.
        """
        data_ids = list(data.keys())
        data = default_collate(list(data.values()))
        birdview = to_dtype(data['birdview'], dtype=torch.float32)
        speed = data['speed']
        # Commands are 1-based; shift to 0-based indices into the one-hot table.
        command_index = [i.item() - 1 for i in data['command']]
        command = self._one_hot[command_index]
        if command.ndim == 1:
            command = command.unsqueeze(0)
        _birdview = birdview.to(self._device)
        _speed = speed.to(self._device)
        _command = command.to(self._device)
        if self._model._all_branch:
            _locations, _ = self._model(_birdview, _speed, _command)
        else:
            _locations = self._model(_birdview, _speed, _command)
        _locations = _locations.detach().cpu().numpy()
        map_locations = _locations
        actions = {}
        for index, data_id in enumerate(data_ids):
            # Pixel coordinates: model output is in [-1, 1], rescale to the crop.
            map_location = map_locations[index, ...]
            map_location = (map_location + 1) / 2 * self._crop_size
            targets = list()
            for i in range(STEPS):
                # Convert pixel offsets (ego at bottom-center of crop) to meters.
                pixel_dx, pixel_dy = map_location[i]
                pixel_dx = pixel_dx - self._crop_size / 2
                pixel_dy = self._crop_size - pixel_dy
                angle = np.arctan2(pixel_dx, pixel_dy)
                dist = np.linalg.norm([pixel_dx, pixel_dy]) / self._pixels_per_meter
                targets.append([dist * np.cos(angle), dist * np.sin(angle)])
            # Target speed: mean step length between the first SPEED_STEPS waypoints.
            target_speed = 0.0
            for i in range(1, SPEED_STEPS):
                pixel_dx, pixel_dy = map_location[i]
                prev_dx, prev_dy = map_location[i - 1]
                dx = pixel_dx - prev_dx
                dy = pixel_dy - prev_dy
                delta = np.linalg.norm([dx, dy])
                target_speed += delta / (self._pixels_per_meter * self._gap * self._dt) / (SPEED_STEPS - 1)
            _cmd = data['command'][index].item()
            _sp = data['speed'][index].item()
            n = self._steer_points.get(str(_cmd), 1)
            # Prepend the ego position, fit a circle through the waypoints and steer
            # towards the projection of the n-th waypoint onto that circle.
            targets = np.concatenate([[[0, 0]], targets], 0)
            c, r = ls_circle(targets)
            closest = common.project_point_to_circle(targets[n], c, r)
            v = [1.0, 0.0, 0.0]
            w = [closest[0], closest[1], 0.0]
            alpha = common.signed_angle(v, w)
            steer = self._controller_dict[data_id][1].run_step(alpha, _cmd)
            throttle = self._controller_dict[data_id][0].step(target_speed - _sp)
            brake = 0.0
            # Full stop when the predicted speed is near zero.
            if target_speed < 1.0:
                steer = 0.0
                throttle = 0.0
                brake = 1.0
            control = self._postprocess(steer, throttle, brake)
            control.update({'map_locations': map_location})
            actions[data_id] = {'action': control}
        return actions

    def _reset_eval(self, data_ids: Optional[List[int]] = None) -> None:
        """
        Reset policy of `eval` mode. It will change the NN model into 'eval' mode and reset
        the controllers in provided env id.

        :Arguments:
            - data_ids (List[int], optional): List of env id to reset. Defaults to None.
        """
        self._model.eval()
        self._reset(data_ids)

    def _forward_learn(self, data: Dict) -> Dict[str, Any]:
        """
        Running forward of `learn` mode to get loss.

        :Arguments:
            - data (Dict): Input dict, with env id in keys and related observations in values.

        :Returns:
            Dict: information about training loss.
        """
        birdview = to_dtype(data['birdview'], dtype=torch.float32)
        speed = to_dtype(data['speed'], dtype=torch.float32)
        command_index = [i.item() - 1 for i in data['command']]
        command = self._one_hot[command_index]
        if command.ndim == 1:
            command = command.unsqueeze(0)
        _birdview = birdview.to(self._device)
        _speed = speed.to(self._device)
        _command = command.to(self._device)
        if self._model._all_branch:
            _locations, _all_branch_locations = self._model(_birdview, _speed, _command)
        else:
            _locations = self._model(_birdview, _speed, _command)
        locations_pred = _locations
        if self._model._all_branch:
            all_branch_locations_pred = _all_branch_locations
        location_gt = data['location'].to(self._device)
        loss = self._criterion(locations_pred, location_gt)
        if self._model._all_branch:
            return {
                'loss': loss,
                'locations_pred': locations_pred,
                'all_branch_locations_pred': all_branch_locations_pred
            }
        return {
            'loss': loss,
            'locations_pred': locations_pred,
        }

    def _reset_learn(self, data_ids: Optional[List[int]] = None) -> None:
        """
        Reset policy of `learn` mode. It will change the NN model into 'train' mode.

        :Arguments:
            - data_ids (List[int], optional): List of env id to reset. Defaults to None.
        """
        self._model.train()
class LBCImagePolicy(BaseCarlaPolicy):
    """
    LBC driving policy with RGB image inputs. It has an LBC NN model which can handle
    observations from several environments by collating data into batch. Each environment
    has a PID controller related to it to get final control signals. In each updating, all
    envs should use the correct env id to make the PID controller work well, and the
    controller should be reset when starting a new episode.

    :Arguments:
        - cfg (Dict): Config Dict.
        - enable_field (List): Enable policy field, default to ['eval', 'learn'].

    :Interfaces:
        reset, forward
    """
    config = dict(
        cuda=True,
        model=dict(),
        learn=dict(loss='l1', ),
        camera_args=dict(
            fixed_offset=4.0,
            fov=90,
            h=160,
            w=384,
            world_y=1.4,
        ),
        steer_points=None,
        pid=None,
        gap=5,
        dt=0.1,
    )

    def __init__(self, cfg: dict, enable_field: Optional[List] = None) -> None:
        # Avoid a shared mutable default argument; None stands for the full field list.
        if enable_field is None:
            enable_field = ['eval', 'learn']
        super().__init__(cfg, enable_field=enable_field)
        # Maps env id -> (speed PID controller, turn controller).
        self._controller_dict = dict()
        if self._cfg.cuda:
            if not torch.cuda.is_available():
                print('[POLICY] No cuda device found! Use cpu by default')
                self._device = torch.device('cpu')
            else:
                self._device = torch.device('cuda')
        else:
            self._device = torch.device('cpu')
        # 4-dim one-hot lookup table for the navigation command (commands are 1-based).
        self._one_hot = torch.FloatTensor(torch.eye(4))
        self._transform = transforms.ToTensor()
        self._camera_args = self._cfg.camera_args
        self._fixed_offset = self._camera_args.fixed_offset
        w = float(self._camera_args.w)
        h = float(self._camera_args.h)
        self._img_size = np.array([w, h])
        self._gap = self._cfg.gap
        self._dt = self._cfg.dt
        self._steer_points = self._cfg.steer_points
        self._pid = self._cfg.pid
        if self._steer_points is None:
            # Index of the target waypoint used for steering, per command.
            self._steer_points = {"1": 4, "2": 3, "3": 2, "4": 2}
        if self._pid is None:
            self._pid = {
                "1": {
                    "Kp": 0.5,
                    "Ki": 0.20,
                    "Kd": 0.0
                },
                "2": {
                    "Kp": 0.7,
                    "Ki": 0.10,
                    "Kd": 0.0
                },
                "3": {
                    "Kp": 1.0,
                    "Ki": 0.10,
                    "Kd": 0.0
                },
                "4": {
                    "Kp": 1.0,
                    "Ki": 0.50,
                    "Kd": 0.0
                }
            }
        # Factories so each env gets its own stateful controller pair.
        self._speed_control_func = lambda: PIDController(K_P=.8, K_I=.08, K_D=0.)
        self._turn_control_func = lambda: CustomController(self._pid)
        self._engine_brake_threshold = 2.0
        self._brake_threshold = 2.0
        self._model = LBCImageModel(**self._cfg.model)
        self._model.to(self._device)
        for field in self._enable_field:
            getattr(self, '_init_' + field)()

    def _init_learn(self) -> None:
        # BUGFIX: the 'l2' branch previously read ``self._cfg.policy.learn.loss``
        # while the 'l1' branch read ``self._cfg.learn.loss``; use the same path.
        if self._cfg.learn.loss == 'l1':
            self._criterion = LocationLoss(choice='l1')
        elif self._cfg.learn.loss == 'l2':
            self._criterion = LocationLoss(choice='l2')

    def _reset_single(self, data_id):
        # Replace (or create) the controller pair for one env id.
        if data_id in self._controller_dict:
            self._controller_dict.pop(data_id)
        self._controller_dict[data_id] = (self._speed_control_func(), self._turn_control_func())

    def _reset(self, data_ids: Optional[List[int]] = None) -> None:
        if data_ids is not None:
            for data_id in data_ids:
                self._reset_single(data_id)
        else:
            # BUGFIX: iterate over a snapshot — ``_reset_single`` pops and re-inserts
            # keys, which must not happen while iterating the dict itself.
            for data_id in list(self._controller_dict):
                self._reset_single(data_id)

    def _postprocess(self, steer, throttle, brake):
        # Clip raw controller outputs to the valid Carla control ranges.
        control = {}
        control.update(
            {
                'steer': np.clip(steer, -1.0, 1.0),
                'throttle': np.clip(throttle, 0.0, 1.0),
                'brake': np.clip(brake, 0.0, 1.0),
            }
        )
        return control

    def _unproject(self, output, world_y=1.4, fov=90):
        # Back-project pixel coordinates onto the ground plane at height ``world_y``
        # using a pinhole model with the given horizontal ``fov``.
        cx, cy = self._img_size / 2
        w, h = self._img_size
        f = w / (2 * np.tan(fov * np.pi / 360))
        xt = (output[..., 0:1] - cx) / f
        yt = (output[..., 1:2] - cy) / f
        world_z = world_y / yt
        world_x = world_z * xt
        world_output = np.stack([world_x, world_z], axis=-1)
        if self._fixed_offset:
            # Compensate for the camera being mounted ahead of the ego origin.
            world_output[..., 1] -= self._fixed_offset
        world_output = world_output.squeeze()
        return world_output

    def _forward_eval(self, data: Dict) -> Dict:
        """
        Running forward to get control signal of `eval` mode.

        :Arguments:
            - data (Dict): Input dict, with env id in keys and related observations in values.

        :Returns:
            Dict: Control and waypoints dict stored in values for each provided env id.
        """
        data_ids = list(data.keys())
        data = default_collate(list(data.values()))
        rgb = to_dtype(data['rgb'], dtype=torch.float32)
        speed = data['speed']
        # Commands are 1-based; shift to 0-based indices into the one-hot table.
        command_index = [i.item() - 1 for i in data['command']]
        command = self._one_hot[command_index]
        if command.ndim == 1:
            command = command.unsqueeze(0)
        with torch.no_grad():
            _rgb = rgb.to(self._device)
            _speed = speed.to(self._device)
            _command = command.to(self._device)
            if self._model._all_branch:
                model_pred, _ = self._model(_rgb, _speed, _command)
            else:
                model_pred = self._model(_rgb, _speed, _command)
        model_pred = model_pred.detach().cpu().numpy()
        pixels_pred = model_pred
        actions = {}
        for index, data_id in enumerate(data_ids):
            # Project back to world coordinate: model output is in [-1, 1] per axis.
            pixel_pred = pixels_pred[index, ...]
            pixel_pred = (pixel_pred + 1) * self._img_size / 2
            world_pred = self._unproject(pixel_pred, self._camera_args.world_y, self._camera_args.fov)
            targets = [(0, 0)]
            for i in range(STEPS):
                pixel_dx, pixel_dy = world_pred[i]
                angle = np.arctan2(pixel_dx, pixel_dy)
                dist = np.linalg.norm([pixel_dx, pixel_dy])
                targets.append([dist * np.cos(angle), dist * np.sin(angle)])
            targets = np.array(targets)
            # Target speed: mean step length between consecutive waypoints.
            target_speed = np.linalg.norm(targets[:-1] - targets[1:], axis=1).mean() / (self._gap * self._dt)
            _cmd = data['command'][index].item()
            _sp = data['speed'][index].item()
            # Fit a circle through the waypoints and steer towards the projection
            # of the command-dependent n-th waypoint onto that circle.
            c, r = ls_circle(targets)
            n = self._steer_points.get(str(_cmd), 1)
            closest = common.project_point_to_circle(targets[n], c, r)
            v = [1.0, 0.0, 0.0]
            w = [closest[0], closest[1], 0.0]
            alpha = common.signed_angle(v, w)
            steer = self._controller_dict[data_id][1].run_step(alpha, _cmd)
            throttle = self._controller_dict[data_id][0].step(target_speed - _sp)
            brake = 0.0
            # Slow or stop.
            if target_speed <= self._engine_brake_threshold:
                steer = 0.0
                throttle = 0.0
            if target_speed <= self._brake_threshold:
                brake = 1.0
            control = self._postprocess(steer, throttle, brake)
            # BUGFIX: store this env's own predicted pixels (``pixel_pred``), not the
            # whole batch ``pixels_pred`` — consistent with LBCBirdviewPolicy.
            control.update({'map_locations': pixel_pred})
            actions[data_id] = {'action': control}
        return actions

    def _reset_eval(self, data_ids: Optional[List[int]] = None) -> None:
        """
        Reset policy of `eval` mode. It will change the NN model into 'eval' mode and reset
        the controllers in provided env id.

        :Arguments:
            - data_ids (List[int], optional): List of env id to reset. Defaults to None.
        """
        self._model.eval()
        self._reset(data_ids)

    def _forward_learn(self, data: Dict) -> Dict[str, Any]:
        """
        Running forward of `learn` mode to get loss.

        :Arguments:
            - data (Dict): Input dict, with env id in keys and related observations in values.

        :Returns:
            Dict: information about training loss.
        """
        rgb = to_dtype(data['rgb'], dtype=torch.float32)
        speed = to_dtype(data['speed'], dtype=torch.float32)
        command_index = [i.item() - 1 for i in data['command']]
        command = self._one_hot[command_index]
        if command.ndim == 1:
            command = command.unsqueeze(0)
        _rgb = rgb.to(self._device)
        _speed = speed.to(self._device)
        _command = command.to(self._device)
        if self._model._all_branch:
            _locations, _all_branch_locations = self._model(_rgb, _speed, _command)
        else:
            _locations = self._model(_rgb, _speed, _command)
        locations_pred = _locations
        if self._model._all_branch:
            all_branch_locations_pred = _all_branch_locations
        location_gt = data['location'].to(self._device)
        loss = self._criterion(locations_pred, location_gt)
        if self._model._all_branch:
            return {
                'loss': loss,
                'locations_pred': locations_pred,
                'all_branch_locations_pred': all_branch_locations_pred
            }
        return {
            'loss': loss,
            'locations_pred': locations_pred,
        }

    def _reset_learn(self, data_ids: Optional[List[int]] = None) -> None:
        """
        Reset policy of `learn` mode. It will change the NN model into 'train' mode.

        :Arguments:
            - data_ids (List[int], optional): List of env id to reset. Defaults to None.
        """
        self._model.train()
def ls_circle(points):
    """
    Least-squares circle fit to a set of 2D points.

    :Arguments:
        - points (np.ndarray): Nx2 array of (x, y) samples.

    :Returns:
        Tuple: center as ``np.array([cx, cy])`` and radius ``r``.
    """
    x = points[:, 0]
    y = points[:, 1]
    x_mean = np.mean(x)
    y_mean = np.mean(y)

    # Work in mean-centered coordinates for numerical stability.
    u = x - x_mean
    v = y - y_mean

    s_uu = np.sum(u * u)
    s_uv = np.sum(u * v)
    s_vv = np.sum(v * v)
    s_uuu = np.sum(u ** 3)
    s_uvv = np.sum(u * v * v)
    s_vvv = np.sum(v ** 3)
    s_vuu = np.sum(v * u * u)

    # Solve the 2x2 normal equations for the centered circle center.
    lhs = np.array([[s_uu, s_uv], [s_uv, s_vv]])
    rhs = 0.5 * np.array([s_uuu + s_uvv, s_vvv + s_vuu])
    uc, vc = np.linalg.solve(lhs, rhs)

    radius = np.sqrt(uc * uc + vc * vc + (s_uu + s_vv) / len(x))
    center = np.array([uc + x_mean, vc + y_mean])
    return center, radius