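"""Source code for ``ding.worker.coordinator.base_parallel_commander``.

Defines the parallel commander base class, the naive commander implementation,
and helpers to create or look up a registered commander.
"""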

import copy
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Optional

from easydict import EasyDict

from ding.utils import import_module, COMMANDER_REGISTRY, LimitedSpaceContainer


class BaseCommander(ABC):
    r"""
    Overview:
        Base parallel commander abstract class.
    Interface:
        default_config, get_collector_task, judge_collector_finish, judge_learner_finish
    """

    @classmethod
    def default_config(cls: type) -> EasyDict:
        r"""
        Overview:
            Build the default config of this commander class: a deep copy of the class-level \
            ``config`` dict wrapped in ``EasyDict`` and tagged with ``cfg_type = '<ClassName>Dict'``.
        Returns:
            - cfg (:obj:`EasyDict`): The default config.
        .. note::
            ``BaseCommander`` itself defines no ``config`` attribute; concrete subclasses must \
            provide one, otherwise ``cls.config`` raises ``AttributeError``.
        """
        cfg = EasyDict(copy.deepcopy(cls.config))
        cfg.cfg_type = cls.__name__ + 'Dict'
        return cfg

    @abstractmethod
    def get_collector_task(self) -> dict:
        r"""
        Overview:
            Return a new collector task. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def judge_collector_finish(self, task_id: str, info: dict) -> bool:
        r"""
        Overview:
            Judge whether a collector task has finished according to the reported info.
        Arguments:
            - task_id (:obj:`str`): The collector task id (not used by this default implementation).
            - info (:obj:`dict`): Info reported for the task.
        Returns:
            - finished (:obj:`bool`): ``True`` iff ``info['collector_done']`` exists and is truthy.
        """
        return bool(info.get('collector_done', False))

    def judge_learner_finish(self, task_id: str, info: dict) -> bool:
        r"""
        Overview:
            Judge whether a learner task has finished according to the reported info.
        Arguments:
            - task_id (:obj:`str`): The learner task id (not used by this default implementation).
            - info (:obj:`dict`): Info reported for the task.
        Returns:
            - finished (:obj:`bool`): ``True`` iff ``info['learner_done']`` exists and is truthy.
        """
        return bool(info.get('learner_done', False))
@COMMANDER_REGISTRY.register('naive')
class NaiveCommander(BaseCommander):
    r"""
    Overview:
        A naive implementation of parallel commander. Collector and learner task slots are tracked
        with ``LimitedSpaceContainer``; a new task is only issued when a slot can be acquired, and
        the slot is released when the task finishes or fails. All tasks use the fixed buffer id
        ``'test'`` and policy path ``'test.pth'``.
    Interface:
        __init__, get_collector_task, get_learner_task, finish_collector_task, finish_learner_task,
        notify_fail_collector_task, notify_fail_learner_task, update_learner_info,
        increase_collector_task_space, decrease_collector_task_space
    """
    config = dict(
        collector_task_space=1,
        learner_task_space=1,
        eval_interval=60,
    )

    def __init__(self, cfg: dict) -> None:
        r"""
        Overview:
            Init the naive commander according to config.
        Arguments:
            - cfg (:obj:`dict`): The whole config. It must provide ``exp_name``, ``env`` (with \
                ``collector_episode_num``, ``evaluator_episode_num`` and ``manager``), \
                ``policy.other.commander`` (with ``collector_task_space`` and ``learner_task_space``), \
                ``policy.collect.collector``, ``policy.learn.learner`` and ``policy.other.replay_buffer``.
        """
        self._cfg = cfg
        self._exp_name = cfg.exp_name
        commander_cfg = self._cfg.policy.other.commander
        self._collector_task_space = LimitedSpaceContainer(0, commander_cfg.collector_task_space)
        self._learner_task_space = LimitedSpaceContainer(0, commander_cfg.learner_task_space)

        # Template env config for collectors: the top-level episode-count keys are removed and the
        # collector episode count is moved into ``manager.episode_num``.
        self._collector_env_cfg = copy.deepcopy(self._cfg.env)
        self._collector_env_cfg.pop('collector_episode_num')
        self._collector_env_cfg.pop('evaluator_episode_num')
        self._collector_env_cfg.manager.episode_num = self._cfg.env.collector_episode_num

        self._collector_task_count = 0
        self._learner_task_count = 0
        # learner task_id -> list of info dicts appended by ``update_learner_info``
        self._learner_info = defaultdict(list)
        self._learner_task_finish_count = 0
        self._collector_task_finish_count = 0

    def get_collector_task(self) -> Optional[dict]:
        r"""
        Overview:
            Get a new collector task if a collector task slot can be acquired.
        Returns:
            - task (:obj:`Optional[dict]`): New collector task with keys ``task_id``, ``buffer_id`` and \
                ``collector_cfg``, or ``None`` when no slot is available.
        """
        if not self._collector_task_space.acquire_space():
            return None
        self._collector_task_count += 1
        collector_cfg = copy.deepcopy(self._cfg.policy.collect.collector)
        collector_cfg.collect_setting = {'eps': 0.9}
        collector_cfg.eval_flag = False
        collector_cfg.policy = copy.deepcopy(self._cfg.policy)
        collector_cfg.policy_update_path = 'test.pth'
        # Give each task its own copy so tasks never alias (and mutate) the shared template.
        collector_cfg.env = copy.deepcopy(self._collector_env_cfg)
        collector_cfg.exp_name = self._exp_name
        return {
            'task_id': 'collector_task_id{}'.format(self._collector_task_count),
            'buffer_id': 'test',
            'collector_cfg': collector_cfg,
        }

    def get_learner_task(self) -> Optional[dict]:
        r"""
        Overview:
            Get a new learner task if a learner task slot can be acquired.
        Returns:
            - task (:obj:`Optional[dict]`): New learner task with keys ``task_id``, ``policy_id``, \
                ``buffer_id``, ``learner_cfg``, ``replay_buffer_cfg`` and ``policy``, or ``None`` \
                when no slot is available.
        """
        if not self._learner_task_space.acquire_space():
            return None
        self._learner_task_count += 1
        learner_cfg = copy.deepcopy(self._cfg.policy.learn.learner)
        learner_cfg.exp_name = self._exp_name
        return {
            'task_id': 'learner_task_id{}'.format(self._learner_task_count),
            'policy_id': 'test.pth',
            'buffer_id': 'test',
            'learner_cfg': learner_cfg,
            'replay_buffer_cfg': copy.deepcopy(self._cfg.policy.other.replay_buffer),
            'policy': copy.deepcopy(self._cfg.policy),
        }

    def finish_collector_task(self, task_id: str, finished_task: dict) -> None:
        r"""
        Overview:
            Mark a collector task as finished: release its slot and increment \
            ``collector_task_finish_count``.
        Arguments:
            - task_id (:obj:`str`): The finished collector task id (unused here).
            - finished_task (:obj:`dict`): Info of the finished task (unused here).
        """
        self._collector_task_space.release_space()
        self._collector_task_finish_count += 1

    def finish_learner_task(self, task_id: str, finished_task: dict) -> str:
        r"""
        Overview:
            Mark a learner task as finished: increment ``learner_task_finish_count`` and release its slot.
        Arguments:
            - task_id (:obj:`str`): The finished learner task id (unused here).
            - finished_task (:obj:`dict`): Info of the finished task; must contain ``buffer_id``.
        Returns:
            - buffer_id (:obj:`str`): The finished task's ``buffer_id``, so the caller can close that buffer.
        """
        self._learner_task_finish_count += 1
        self._learner_task_space.release_space()
        return finished_task['buffer_id']

    def notify_fail_collector_task(self, task: dict) -> None:
        r"""
        Overview:
            Handle a failed collector task by releasing its collector task slot. The finish count \
            is not incremented.
        Arguments:
            - task (:obj:`dict`): The failed task (unused here).
        """
        self._collector_task_space.release_space()

    def notify_fail_learner_task(self, task: dict) -> None:
        r"""
        Overview:
            Handle a failed learner task by releasing its learner task slot. The finish count \
            is not incremented.
        Arguments:
            - task (:obj:`dict`): The failed task (unused here).
        """
        self._learner_task_space.release_space()

    def update_learner_info(self, task_id: str, info: dict) -> None:
        r"""
        Overview:
            Append info reported by a learner task to that task's info history.
        Arguments:
            - task_id (:obj:`str`): The learner task id.
            - info (:obj:`dict`): The info to append.
        """
        self._learner_info[task_id].append(info)

    def increase_collector_task_space(self) -> None:
        r"""
        Overview:
            Increase the collector task space when a new collector has been added dynamically.
        """
        self._collector_task_space.increase_space()

    def decrease_collector_task_space(self) -> None:
        r"""
        Overview:
            Decrease the collector task space when a collector has been removed dynamically.
        """
        self._collector_task_space.decrease_space()
def create_parallel_commander(cfg: EasyDict) -> BaseCommander:
    r"""
    Overview:
        Create a parallel commander according to the whole config.
    Arguments:
        - cfg (:obj:`dict`): The whole config. ``cfg.policy.other.commander`` must contain ``type`` \
            (the name the commander class is registered under in ``COMMANDER_REGISTRY``) and may \
            contain ``import_names`` (module names passed to ``import_module`` before the registry \
            lookup; defaults to an empty list).
    Returns:
        - commander (:obj:`BaseCommander`): The commander instance, built with ``cfg=cfg``.
    """
    cfg = EasyDict(cfg)
    commander_cfg = cfg.policy.other.commander
    # ``import_names`` is optional, consistent with ``get_parallel_commander_cls``.
    import_module(commander_cfg.get('import_names', []))
    return COMMANDER_REGISTRY.build(commander_cfg.type, cfg=cfg)


def get_parallel_commander_cls(cfg: EasyDict) -> type:
    r"""
    Overview:
        Look up a registered commander class without instantiating it.
    Arguments:
        - cfg (:obj:`dict`): The commander config itself (not the whole config). It must contain \
            ``type`` and may contain ``import_names`` (defaults to an empty list).
    Returns:
        - cls (:obj:`type`): The class registered in ``COMMANDER_REGISTRY`` under ``cfg.type``.
    """
    cfg = EasyDict(cfg)
    import_module(cfg.get('import_names', []))
    return COMMANDER_REGISTRY.get(cfg.type)