Source code for core.models.model_wrappers

import random
import numpy as np
from typing import Any, Callable, Dict, Optional


[docs]class SteerNoiseWrapper(object): """ Model wrapper to add noise in steer frequently. :Arguments: - model (Any): Model to wrap. - noise_type (str): Type to generate noise. Must in ['gauss', 'uniform']. - noise_kwargs (dict, optional): Arguments to set up noise generator. Defaults to {}. - noise_len (int, optional): Length of frames to apply noise. Defaults to 5. - drive_len (int, optional): Length of noise. Defaults to 100. - noise_range (dict, optional): Range of noise value, containing 'min' and 'max'. Defaults to None. :Instance: forward """ def __init__( self, model: Any, noise_type: str = 'uniform', noise_args: dict = {}, noise_len: int = 5, drive_len: int = 100, noise_range: Optional[dict] = None, ) -> None: self._model = model self._noise_func = get_noise_generator(noise_type, noise_args) self._noise_seq = {'drive': [noise_len, 'noise'], 'noise': [drive_len, 'drive']} self._noise_range = noise_range self._noise_state = 'drive' self._num_steps = drive_len self._state_step = 0 self._last_throttle = 0 self._last_brake = 0
[docs] def forward(self, *args, **kwargs) -> Dict: """ Running forward and add noise. :Returns: Dict: Noised control signal with real control. """ real_control = self._model.forward(*args, **kwargs) assert isinstance(real_control, dict), "model output must be dict, but find {}".format(type(real_control)) control = {'brake': 0} for k, v in real_control.items(): control['real_' + k] = v self._state_step += 1 last_state = self._noise_state if self._noise_state == 'noise': control['steer'] = self._noise_steer control['throttle'] = self._last_throttle control['brake'] = self._last_brake else: control['steer'] = real_control['steer'] control['throttle'] = real_control['throttle'] control['brake'] = real_control['brake'] control['noise_state'] = self._noise_state if self._state_step >= self._num_steps: self._state_step = 0 self._num_steps, self._noise_state = self._noise_seq[self._noise_state] self._num_steps += random.randint(-self._num_steps // 10, self._num_steps // 10) assert last_state != self._noise_state self._noise_steer = self._noise_func() if self._noise_range is not None: self._noise_steer = np.clip(self._noise_steer, self._noise_range['min'], self._noise_range['max']) self._last_throttle = real_control['throttle'] self._last_brake = real_control['brake'] return control
def get_noise_generator(noise_type: str, noise_args: Dict) -> Callable: noise_dict = { 'gauss': np.random.normal, 'uniform': np.random.uniform, } if noise_type not in noise_dict: raise KeyError("not support noise type: {}".format(noise_type)) else: noise_func = noise_dict[noise_type] return lambda: noise_func(**noise_args)