Source code for core.envs.md_macro_env

import os
import copy
import time
import gym
import numpy as np
from gym import spaces
from collections import defaultdict
from typing import Union, Dict, AnyStr, Tuple, Optional
import logging

from ding.utils import ENV_REGISTRY
from core.utils.simulator_utils.md_utils.discrete_policy import DiscreteMetaAction
from core.utils.simulator_utils.md_utils.agent_manager_utils import MacroAgentManager
from core.utils.simulator_utils.md_utils.engine_utils import MacroEngine
from core.utils.simulator_utils.md_utils.traffic_manager_utils import TrafficMode

from metadrive.envs.base_env import BaseEnv
from metadrive.component.map.base_map import BaseMap
from metadrive.component.map.pg_map import parse_map_config, MapGenerateMethod
# from metadrive.manager.traffic_manager import TrafficMode
from metadrive.component.pgblock.first_block import FirstPGBlock
from metadrive.constants import DEFAULT_AGENT, TerminationState
from metadrive.engine.base_engine import BaseEngine
from metadrive.utils import Config, merge_dicts, get_np_random, clip

from metadrive.envs.base_env import BASE_DEFAULT_CONFIG
from metadrive.obs.top_down_obs_multi_channel import TopDownMultiChannel

DIDRIVE_DEFAULT_CONFIG = dict(
    # ===== Generalization =====
    start_seed=0,
    use_render=False,
    environment_num=1,

    # ===== Map Config =====
    map='SSSSSSSSSS',  # int or string: an easy way to fill map_config ('S' is MetaDrive's straight-road block ID)
    random_lane_width=True,
    random_lane_num=True,
    map_config={
        BaseMap.GENERATE_TYPE: MapGenerateMethod.BIG_BLOCK_SEQUENCE,
        BaseMap.GENERATE_CONFIG: 'SSSSSSSSSS',  # it can be a file path / block num / block ID sequence
        BaseMap.LANE_WIDTH: 3.5,
        BaseMap.LANE_NUM: 3,
        "exit_length": 70,
    },

    # ===== Traffic =====
    traffic_density=0.3,
    on_screen=False,
    rgb_clip=True,
    need_inverse_traffic=False,
    traffic_mode=TrafficMode.Synch,  # other options: TrafficMode.Respawn, TrafficMode.Trigger
    random_traffic=True,  # Traffic is randomized by default.
    # this will update the vehicle_config and be applied to traffic vehicles
    traffic_vehicle_config=dict(
        show_navi_mark=False,
        show_dest_mark=False,
        enable_reverse=False,
        show_lidar=False,
        show_lane_line_detector=False,
        show_side_detector=False,
    ),

    # ===== Object =====
    accident_prob=0.,  # an accident may happen on each block with this probability, except the multi-exit block

    # ===== Others =====
    use_AI_protector=False,
    save_level=0.5,
    is_multi_agent=False,
    vehicle_config=dict(spawn_lane_index=(FirstPGBlock.NODE_1, FirstPGBlock.NODE_2, 0)),

    # ===== Agent =====
    target_vehicle_configs={
        DEFAULT_AGENT: dict(use_special_color=True, spawn_lane_index=(FirstPGBlock.NODE_1, FirstPGBlock.NODE_2, 2))
    },

    # ===== Reward Scheme =====
    # See: https://github.com/decisionforce/metadrive/issues/283
    success_reward=10.0,
    out_of_road_penalty=5.0,
    crash_vehicle_penalty=1.0,
    crash_object_penalty=5.0,
    driving_reward=1.0,
    speed_reward=0.1,
    use_lateral=False,

    # ===== Cost Scheme =====
    crash_vehicle_cost=1.0,
    crash_object_cost=1.0,
    out_of_road_cost=1.0,

    # ===== Termination Scheme =====
    out_of_route_done=True,
    physics_world_step_size=3e-2,
)
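
# A minimal usage sketch (illustrative, assuming MetaDrive assets are installed):
# any key in DIDRIVE_DEFAULT_CONFIG above can be overridden through the config
# dict passed to the env constructor.
#
#   from core.envs.md_macro_env import MetaDriveMacroEnv
#   env = MetaDriveMacroEnv(dict(map='SSS', traffic_density=0.1))
#   obs = env.reset()  # top-down multi-channel image, resolution (200, 200)
#   obs, rew, done, info = env.step(env.action_space.sample())
#   env.close()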


@ENV_REGISTRY.register("md_macro")
class MetaDriveMacroEnv(BaseEnv):
    """
    MetaDrive single-agent env controlled by a "macro" action. The agent is controlled by a
    discrete action set. Each action corresponds to a series of control signals that accomplish
    the macro action defined in the set. The observation is a top-down view image with 5 channels
    containing the current and historical information of the surroundings. This env is registered
    and can be used via `gym.make`.

    :Arguments:
        - config (Dict): Env config dict.

    :Interfaces: reset, step, close, render, seed
    """

    @classmethod
    def default_config(cls) -> "Config":
        config = Config(BASE_DEFAULT_CONFIG)
        config.update(DIDRIVE_DEFAULT_CONFIG)
        config.register_type("map", str, int)
        config["map_config"].register_type("config", None)
        return config

    def __init__(self, config: dict = None) -> None:
        merged_config = self._merge_extra_config(config)
        global_config = self._post_process_config(merged_config)
        self.config = global_config

        # agent check
        self.num_agents = self.config["num_agents"]
        self.is_multi_agent = self.config["is_multi_agent"]
        if not self.is_multi_agent:
            assert self.num_agents == 1
        assert isinstance(self.num_agents, int) and (self.num_agents > 0 or self.num_agents == -1)

        # observation and action space
        self.agent_manager = MacroAgentManager(
            init_observations=self._get_observations(), init_action_space=self._get_action_space()
        )
        self.action_type = DiscreteMetaAction()

        # lazy initialization, create the main vehicle in the lazy_init() func
        self.engine: Optional[BaseEngine] = None
        self._top_down_renderer = None
        self.episode_steps = 0

        # In MARL envs with respawn mechanism, varying episode lengths might happen.
        self.dones = None
        self.episode_rewards = defaultdict(float)
        self.episode_lengths = defaultdict(int)

        self.start_seed = self.config["start_seed"]
        self.env_num = self.config["environment_num"]
        self.time = 0

    def step(self, actions: Union[np.ndarray, Dict[AnyStr, np.ndarray]]):
        self.episode_steps += 1
        macro_actions = self._preprocess_macro_actions(actions)
        step_infos = self._step_macro_simulator(macro_actions)
        o, r, d, i = self._get_step_return(actions, step_infos)
        return o, r, d, i

    def _merge_extra_config(self, config: Union[dict, "Config"]) -> "Config":
        config = self.default_config().update(config, allow_add_new_key=True)
        if config["vehicle_config"]["lidar"]["distance"] > 50:
            config["max_distance"] = config["vehicle_config"]["lidar"]["distance"]
        return config

    def _post_process_config(self, config):
        config = super(MetaDriveMacroEnv, self)._post_process_config(config)
        if not config["rgb_clip"]:
            logging.warning(
                "You have set rgb_clip = False, which means the observation will be uint8 values in [0, 255]. "
                "Please make sure you have parsed them later before feeding them to network!"
            )
        config["map_config"] = parse_map_config(
            easy_map_config=config["map"], new_map_config=config["map_config"], default_config=self.default_config()
        )
        config["vehicle_config"]["rgb_clip"] = config["rgb_clip"]
        config["vehicle_config"]["random_agent_model"] = config["random_agent_model"]
        if config.get("gaussian_noise", 0) > 0:
            assert config["vehicle_config"]["lidar"]["gaussian_noise"] == 0, "You already provide config!"
            assert config["vehicle_config"]["side_detector"]["gaussian_noise"] == 0, "You already provide config!"
            assert config["vehicle_config"]["lane_line_detector"]["gaussian_noise"] == 0, "You already provide config!"
            config["vehicle_config"]["lidar"]["gaussian_noise"] = config["gaussian_noise"]
            config["vehicle_config"]["side_detector"]["gaussian_noise"] = config["gaussian_noise"]
            config["vehicle_config"]["lane_line_detector"]["gaussian_noise"] = config["gaussian_noise"]
        if config.get("dropout_prob", 0) > 0:
            assert config["vehicle_config"]["lidar"]["dropout_prob"] == 0, "You already provide config!"
            assert config["vehicle_config"]["side_detector"]["dropout_prob"] == 0, "You already provide config!"
            assert config["vehicle_config"]["lane_line_detector"]["dropout_prob"] == 0, "You already provide config!"
            config["vehicle_config"]["lidar"]["dropout_prob"] = config["dropout_prob"]
            config["vehicle_config"]["side_detector"]["dropout_prob"] = config["dropout_prob"]
            config["vehicle_config"]["lane_line_detector"]["dropout_prob"] = config["dropout_prob"]
        target_v_config = copy.deepcopy(config["vehicle_config"])
        if not config["is_multi_agent"]:
            target_v_config.update(config["target_vehicle_configs"][DEFAULT_AGENT])
            config["target_vehicle_configs"][DEFAULT_AGENT] = target_v_config
        return config

    def _get_observations(self):
        return {DEFAULT_AGENT: self.get_single_observation(self.config["vehicle_config"])}

    def done_function(self, vehicle_id: str):
        vehicle = self.vehicles[vehicle_id]
        done = False
        done_info = dict(
            crash_vehicle=False, crash_object=False, crash_building=False, out_of_road=False, arrive_dest=False
        )
        if vehicle.arrive_destination:
            done = True
            logging.info("Episode ended! Reason: arrive_dest.")
            done_info[TerminationState.SUCCESS] = True
        elif hasattr(vehicle, 'macro_succ') and vehicle.macro_succ:
            done = True
            logging.info("Episode ended! Reason: arrive_dest.")
            done_info[TerminationState.SUCCESS] = True
        elif hasattr(vehicle, 'macro_crash') and vehicle.macro_crash:
            done = True
            logging.info("Episode ended! Reason: crash_vehicle.")
            done_info[TerminationState.CRASH_VEHICLE] = True
        if self._is_out_of_road(vehicle):
            done = True
            logging.info("Episode ended! Reason: out_of_road.")
            done_info[TerminationState.OUT_OF_ROAD] = True
        if vehicle.crash_vehicle:
            done = True
            logging.info("Episode ended! Reason: crash_vehicle.")
            done_info[TerminationState.CRASH_VEHICLE] = True
        if vehicle.crash_object:
            done = True
            done_info[TerminationState.CRASH_OBJECT] = True
            logging.info("Episode ended! Reason: crash_object.")
        if vehicle.crash_building:
            done = True
            done_info[TerminationState.CRASH_BUILDING] = True
            logging.info("Episode ended! Reason: crash_building.")
        # for compatibility: crash almost equals to crashing with vehicles
        done_info[TerminationState.CRASH] = (
            done_info[TerminationState.CRASH_VEHICLE] or done_info[TerminationState.CRASH_OBJECT]
            or done_info[TerminationState.CRASH_BUILDING]
        )
        return done, done_info

    def cost_function(self, vehicle_id: str):
        vehicle = self.vehicles[vehicle_id]
        step_info = dict()
        step_info["cost"] = 0
        if self._is_out_of_road(vehicle):
            step_info["cost"] = self.config["out_of_road_cost"]
        elif vehicle.crash_vehicle:
            step_info["cost"] = self.config["crash_vehicle_cost"]
        elif vehicle.crash_object:
            step_info["cost"] = self.config["crash_object_cost"]
        return step_info['cost'], step_info

    def _is_out_of_road(self, vehicle):
        # A specified function to determine whether this vehicle should be done.
        ret = vehicle.on_yellow_continuous_line or vehicle.on_white_continuous_line or \
            (not vehicle.on_lane) or vehicle.crash_sidewalk
        if self.config["out_of_route_done"]:
            ret = ret or vehicle.out_of_route
        return ret

    def reward_function(self, vehicle_id: str):
        """
        Override this func to get a new reward function.

        :param vehicle_id: id of BaseVehicle
        :return: reward
        """
        vehicle = self.vehicles[vehicle_id]
        step_info = dict()

        # Reward for moving forward in the current lane
        if vehicle.lane in vehicle.navigation.current_ref_lanes:
            current_lane = vehicle.lane
            positive_road = 1
        else:
            current_lane = vehicle.navigation.current_ref_lanes[0]
            current_road = vehicle.navigation.current_road
            positive_road = 1 if not current_road.is_negative_road() else -1
        long_last, _ = current_lane.local_coordinates(vehicle.last_macro_position)
        long_now, lateral_now = current_lane.local_coordinates(vehicle.position)

        # Reward for lane keeping; without it the vehicle can learn to overtake but fails to keep in lane
        if self.config["use_lateral"]:
            lateral_factor = clip(1 - 2 * abs(lateral_now) / vehicle.navigation.get_current_lane_width(), 0.0, 1.0)
        else:
            lateral_factor = 1.0

        reward = 0.0
        lateral_factor *= 0.02
        reward += self.config["driving_reward"] * (long_now - long_last) * lateral_factor * positive_road
        reward += self.config["speed_reward"] * (vehicle.speed / vehicle.max_speed) * positive_road
        speed_rwd = -0.3 if vehicle.speed < 60 else 0
        reward += speed_rwd
        step_info["step_reward"] = reward

        if vehicle.arrive_destination:
            reward = +self.config["success_reward"]
        elif vehicle.macro_succ:
            reward = +self.config["success_reward"]
        elif self._is_out_of_road(vehicle):
            reward = -self.config["out_of_road_penalty"]
        elif vehicle.crash_vehicle:
            reward = -self.config["crash_vehicle_penalty"]
        elif vehicle.macro_crash:
            reward = -self.config["crash_vehicle_penalty"]
        elif vehicle.crash_object:
            reward = -self.config["crash_object_penalty"]
        return reward, step_info

    def switch_to_third_person_view(self) -> None:
        if self.main_camera is None:
            return
        self.main_camera.reset()
        if self.config["prefer_track_agent"] is not None and self.config["prefer_track_agent"] in self.vehicles.keys():
            new_v = self.vehicles[self.config["prefer_track_agent"]]
            current_track_vehicle = new_v
        else:
            if self.main_camera.is_bird_view_camera():
                current_track_vehicle = self.current_track_vehicle
            else:
                vehicles = list(self.engine.agents.values())
                if len(vehicles) <= 1:
                    return
                if self.current_track_vehicle in vehicles:
                    vehicles.remove(self.current_track_vehicle)
                new_v = get_np_random().choice(vehicles)
                current_track_vehicle = new_v
        self.main_camera.track(current_track_vehicle)
        return

    def switch_to_top_down_view(self):
        self.main_camera.stop_track()

    def setup_engine(self):
        super(MetaDriveMacroEnv, self).setup_engine()
        self.engine.accept("b", self.switch_to_top_down_view)
        self.engine.accept("q", self.switch_to_third_person_view)
        from core.utils.simulator_utils.md_utils.traffic_manager_utils import MacroTrafficManager
        from core.utils.simulator_utils.md_utils.map_manager_utils import MacroMapManager
        self.engine.register_manager("map_manager", MacroMapManager())
        self.engine.register_manager("traffic_manager", MacroTrafficManager())

    def _reset_global_seed(self, force_seed=None):
        current_seed = force_seed if force_seed is not None else \
            get_np_random(self._DEBUG_RANDOM_SEED).randint(self.start_seed, self.start_seed + self.env_num)
        self.seed(current_seed)

    def _preprocess_macro_actions(self, actions: Union[np.ndarray, Dict[AnyStr, np.ndarray]]) \
            -> Union[np.ndarray, Dict[AnyStr, np.ndarray]]:
        if not self.is_multi_agent:
            actions = int(actions)
            actions = {v_id: actions for v_id in self.vehicles.keys()}
        else:
            if self.config["vehicle_config"]["action_check"]:
                # Check whether some actions are not provided.
                given_keys = set(actions.keys())
                have_keys = set(self.vehicles.keys())
                assert given_keys == have_keys, "The input actions: {} have incompatible keys with existing {}!".format(
                    given_keys, have_keys
                )
            else:
                # It is OK if extra actions are given. This is because, when evaluating a policy with a naive
                # implementation, the "termination observation" will still be given at T=t-1. And at T=t, when you
                # collect an action from policy(last_obs) without masking, the action for the "termination
                # observation" will still be computed. We just filter it out here.
                actions = {v_id: actions[v_id] for v_id in self.vehicles.keys()}
        return actions

    def _step_macro_simulator(self, actions):
        simulation_frequency = 30
        policy_frequency = 1
        frames = int(simulation_frequency / policy_frequency)
        self.time = 0
        actions = {vid: self.action_type.actions[vvalue] for vid, vvalue in actions.items()}
        for frame in range(frames):
            if self.time % int(simulation_frequency / policy_frequency) == 0:
                scene_manager_before_step_infos = self.engine.before_step_macro(actions)
                self.engine.step()
            else:
                _ = self.engine.before_step_macro()
                self.engine.step()
            scene_manager_after_step_infos = self.engine.after_step()
            self.time += 1
        return merge_dicts(
            scene_manager_after_step_infos, scene_manager_before_step_infos, allow_new_keys=True, without_copy=True
        )

    @property
    def action_space(self) -> gym.Space:
        """
        Return the action space of the active and controllable vehicles.

        :return: gym.Space
        """
        return self.action_type.space()

    def _get_reset_return(self):
        ret = {}
        self.engine.after_step()
        o = None
        for v_id, v in self.vehicles.items():
            self.observations[v_id].reset(self, v)
            ret[v_id] = self.observations[v_id].observe(v)
            o = self.observations[v_id].observe(v)
            if hasattr(v, 'macro_succ'):
                v.macro_succ = False
            if hasattr(v, 'macro_crash'):
                v.macro_crash = False
        self.remove_init_stop = False
        if self.remove_init_stop:
            return o
        # Warm up the episode: hold still for a few macro steps, then set a low
        # initial speed and take one IDLE step before handing control to the agent.
        for i in range(8):
            o, r, d, info = self.step(self.action_type.actions_indexes["Holdon"])
        for v_id, v in self.vehicles.items():
            if hasattr(v, 'macro_succ'):
                p = self.engine.get_policy(v.name)
                target_speed = p.NORMAL_SPEED * 0.1
                v.set_velocity(v.heading, target_speed)
        for i in range(1):
            o, r, d, info = self.step(self.action_type.actions_indexes["IDLE"])
        return o

    def lazy_init(self):
        """
        Only init once in runtime; the variables created here exist until the env is closed.

        :return: None
        """
        # This is the true init() func that creates the main vehicle and its modules.
        # It is deferred to here to avoid being incompatible with ray.
        if MacroEngine.singleton is not None:
            return
        MacroEngine.singleton = MacroEngine(self.config)
        self.engine = MacroEngine.singleton
        # engine setup
        self.setup_engine()
        # other optional initialization
        self._after_lazy_init()

    def get_single_observation(self, _=None):
        o = TopDownMultiChannel(
            self.config["vehicle_config"],
            self.config["on_screen"],
            self.config["rgb_clip"],
            frame_stack=3,
            post_stack=10,
            frame_skip=1,
            resolution=(200, 200),
            max_distance=50
        )
        return o