Source code for ding.worker.coordinator.solo_parallel_commander
from typing import Optional
import time
import copy
from ding.policy import create_policy
from ding.utils import LimitedSpaceContainer, get_task_uid, build_logger, COMMANDER_REGISTRY
from .base_parallel_commander import BaseCommander
@COMMANDER_REGISTRY.register('solo')
class SoloCommander(BaseCommander):
r"""
Overview:
Parallel commander for solo games.
Interface:
__init__, get_collector_task, get_learner_task, finish_collector_task, finish_learner_task,
notify_fail_collector_task, notify_fail_learner_task, update_learner_info,
increase_collector_task_space, decrease_collector_task_space
"""
config = dict(
collector_task_space=1,
learner_task_space=1,
eval_interval=60,
)
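# A minimal sketch of the commander fields a user config might carry; in this
# module they are read from cfg.policy.other.commander, and the three keys
# below mirror the class-level defaults (the concrete numbers are only
# illustrative):
#
#     from easydict import EasyDict
#     commander_cfg = EasyDict(dict(
#         collector_task_space=2,  # at most two concurrent collector tasks
#         learner_task_space=1,    # a single concurrent learner task
#         eval_interval=60,        # minimum seconds between evaluation tasks
#     ))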
def __init__(self, cfg: dict) -> None:
r"""
Overview:
Init the solo commander according to config.
Arguments:
- cfg (:obj:`dict`): Dict type config.
"""
self._cfg = cfg
self._exp_name = cfg.exp_name
commander_cfg = self._cfg.policy.other.commander
self._commander_cfg = commander_cfg
self._collector_env_cfg = copy.deepcopy(self._cfg.env)
self._collector_env_cfg.pop('collector_episode_num')
self._collector_env_cfg.pop('evaluator_episode_num')
self._collector_env_cfg.manager.episode_num = self._cfg.env.collector_episode_num
self._evaluator_env_cfg = copy.deepcopy(self._cfg.env)
self._evaluator_env_cfg.pop('collector_episode_num')
self._evaluator_env_cfg.pop('evaluator_episode_num')
self._evaluator_env_cfg.manager.episode_num = self._cfg.env.evaluator_episode_num
self._collector_task_space = LimitedSpaceContainer(0, commander_cfg.collector_task_space)
self._learner_task_space = LimitedSpaceContainer(0, commander_cfg.learner_task_space)
self._learner_info = [{'learner_step': 0}]
# TODO(nyz) accumulate collect info
self._collector_info = []
self._total_collector_env_step = 0
self._evaluator_info = []
self._current_buffer_id = None
self._current_policy_id = None
self._last_eval_time = 0
# policy_cfg must be deep-copied before creating the command-mode policy
policy_cfg = copy.deepcopy(self._cfg.policy)
self._policy = create_policy(policy_cfg, enable_field=['command']).command_mode
self._logger, self._tb_logger = build_logger(
"./{}/log/commander".format(self._exp_name), "commander", need_tb=True
)
self._collector_logger, _ = build_logger(
"./{}/log/commander".format(self._exp_name), "commander_collector", need_tb=False
)
self._evaluator_logger, _ = build_logger(
"./{}/log/commander".format(self._exp_name), "commander_evaluator", need_tb=False
)
self._sub_logger = {
'collector': self._collector_logger,
'evaluator': self._evaluator_logger,
}
self._end_flag = False
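# A rough construction sketch, assuming `main_cfg` is a compiled parallel-mode
# DI-engine config (an EasyDict exposing exp_name, env and policy exactly as
# accessed above); in the full pipeline the coordinator normally obtains the
# commander through the 'solo' entry of COMMANDER_REGISTRY instead of
# instantiating it directly:
#
#     commander = SoloCommander(main_cfg)
#     task = commander.get_learner_task()  # a learner task is available first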
def get_collector_task(self) -> Optional[dict]:
r"""
Overview:
Return a new collector task when there is residual task space; otherwise return None.
Returns:
- task (:obj:`Optional[dict]`): New collector task.
"""
if self._end_flag:
return None
if self._collector_task_space.acquire_space():
if self._current_buffer_id is None or self._current_policy_id is None:
self._collector_task_space.release_space()
return None
cur_time = time.time()
if cur_time - self._last_eval_time > self._commander_cfg.eval_interval:
eval_flag = True
self._last_eval_time = time.time()
else:
eval_flag = False
collector_cfg = copy.deepcopy(self._cfg.policy.collect.collector)
# the newest learner info
info = self._learner_info[-1]
info['envstep'] = self._total_collector_env_step
collector_cfg.collect_setting = self._policy.get_setting_collect(info)
collector_cfg.policy_update_path = self._current_policy_id
collector_cfg.eval_flag = eval_flag
collector_cfg.policy = copy.deepcopy(self._cfg.policy)
collector_cfg.exp_name = self._exp_name
if eval_flag:
collector_cfg.env = self._evaluator_env_cfg
else:
collector_cfg.env = self._collector_env_cfg
return {
'task_id': 'collector_task_{}'.format(get_task_uid()),
'buffer_id': self._current_buffer_id,
'collector_cfg': collector_cfg,
}
else:
return None
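# Hedged sketch of the expected polling pattern; `dispatch_to_collector` is a
# hypothetical helper standing in for the coordinator's own task dispatching:
#
#     task = commander.get_collector_task()
#     if task is not None:
#         dispatch_to_collector(task['task_id'], task['buffer_id'], task['collector_cfg'])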
def get_learner_task(self) -> Optional[dict]:
r"""
Overview:
Return a new learner task when there is residual task space; otherwise return None.
Returns:
- task (:obj:`Optional[dict]`): New learner task.
"""
if self._end_flag:
return None
if self._learner_task_space.acquire_space():
learner_cfg = copy.deepcopy(self._cfg.policy.learn.learner)
learner_cfg.exp_name = self._exp_name
return {
'task_id': 'learner_task_{}'.format(get_task_uid()),
'policy_id': self._init_policy_id(),
'buffer_id': self._init_buffer_id(),
'learner_cfg': learner_cfg,
'replay_buffer_cfg': copy.deepcopy(self._cfg.policy.other.replay_buffer),
'policy': copy.deepcopy(self._cfg.policy),
}
else:
return None
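# Note that in a fresh run the learner task must be issued first:
# _init_policy_id/_init_buffer_id set _current_policy_id and _current_buffer_id,
# without which get_collector_task above keeps returning None. A hedged sketch
# (dispatch_to_learner is a hypothetical helper):
#
#     learner_task = commander.get_learner_task()
#     if learner_task is not None:
#         dispatch_to_learner(learner_task['task_id'], learner_task['learner_cfg'])
#     collector_task = commander.get_collector_task()  # can now succeed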
def finish_collector_task(self, task_id: str, finished_task: dict) -> bool:
r"""
Overview:
Get collector's finish_task_info and release collector_task_space.
If the finished task is an evaluation task, also judge convergence and return the result.
Arguments:
- task_id (:obj:`str`): The collector task_id.
- finished_task (:obj:`dict`): The finished task info.
Returns:
- convergence (:obj:`bool`): Whether the stop value is reached and the algorithm has converged. \
If True, the pipeline can be finished.
"""
self._collector_task_space.release_space()
evaluator_or_collector = "evaluator" if finished_task['eval_flag'] else "collector"
train_iter = finished_task['train_iter']
info = {
'train_iter': train_iter,
'episode_count': finished_task['real_episode_count'],
'step_count': finished_task['step_count'],
'avg_step_per_episode': finished_task['avg_step_per_episode'],
'avg_time_per_step': finished_task['avg_time_per_step'],
'avg_time_per_episode': finished_task['avg_time_per_episode'],
'reward_mean': finished_task['reward_mean'],
'reward_std': finished_task['reward_std'],
}
self._sub_logger[evaluator_or_collector].info(
"[{}] Task ends:\n{}".format(
evaluator_or_collector.upper(), '\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])
)
)
for k, v in info.items():
if k in ['train_iter']:
continue
self._tb_logger.add_scalar('{}_iter/'.format(evaluator_or_collector) + k, v, train_iter)
self._tb_logger.add_scalar('{}_step/'.format(evaluator_or_collector) + k, v, self._total_collector_env_step)
if finished_task['eval_flag']:
self._evaluator_info.append(finished_task)
eval_stop_value = self._cfg.env.stop_value
if eval_stop_value is not None and finished_task['reward_mean'] >= eval_stop_value:
self._logger.info(
"[DI-engine parallel pipeline] current episode_return: {} is greater than the stop_value: {}".
format(finished_task['reward_mean'], eval_stop_value) + ", so the total training program is over."
)
self._end_flag = True
return True
else:
self._collector_info.append(finished_task)
self._total_collector_env_step += finished_task['step_count']
return False
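# Hedged usage sketch: the boolean returned here is what lets the caller stop
# the whole pipeline (`finished_info` must carry the fields read above, e.g.
# 'eval_flag', 'train_iter', 'step_count' and 'reward_mean'):
#
#     if commander.finish_collector_task(task_id, finished_info):
#         shutdown_pipeline()  # hypothetical clean-up hook on convergence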
def finish_learner_task(self, task_id: str, finished_task: dict) -> str:
r"""
Overview:
Get learner's finish_task_info, release learner_task_space, reset corresponding variables.
Arguments:
- task_id (:obj:`str`): Learner task_id
- finished_task (:obj:`dict`): Learner's finish_learn_info.
Returns:
- buffer_id (:obj:`str`): Buffer id of the finished learner.
"""
self._learner_task_space.release_space()
buffer_id = finished_task['buffer_id']
self._current_buffer_id = None
self._current_policy_id = None
self._learner_info = [{'learner_step': 0}]
self._evaluator_info = []
self._last_eval_time = 0
return buffer_id
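# Hedged sketch: finishing a learner task forgets the current policy/buffer
# ids, so the next cycle has to start again from get_learner_task; the returned
# buffer_id identifies which replay buffer the caller should close:
#
#     buffer_id = commander.finish_learner_task(task_id, finished_info)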
def notify_fail_collector_task(self, task: dict) -> None:
r"""
Overview:
Release task space when a collector task fails.
"""
self._collector_task_space.release_space()
def notify_fail_learner_task(self, task: dict) -> None:
r"""
Overview:
Release task space when a learner task fails.
"""
self._learner_task_space.release_space()
def update_learner_info(self, task_id: str, info: dict) -> None:
r"""
Overview:
Append the given info to learner_info.
Arguments:
- task_id (:obj:`str`): Learner task_id
- info (:obj:`dict`): Dict type learner info.
"""
self._learner_info.append(info)
def _init_policy_id(self) -> str:
r"""
Overview:
Init the policy id and return it.
Returns:
- policy_id (:obj:`str`): New initialized policy id.
"""
policy_id = 'policy_{}'.format(get_task_uid())
self._current_policy_id = policy_id
return policy_id
def _init_buffer_id(self) -> str:
r"""
Overview:
Init the buffer id and return it.
Returns:
- buffer_id (:obj:`str`): New initialized buffer id.
"""
buffer_id = 'buffer_{}'.format(get_task_uid())
self._current_buffer_id = buffer_id
return buffer_id
def increase_collector_task_space(self) -> None:
r"""
Overview:
Increase task space when a new collector is added dynamically.
"""
self._collector_task_space.increase_space()
def decrease_collector_task_space(self) -> None:
r"""
Overview:
Decrease task space when a collector is removed dynamically.
"""
self._collector_task_space.decrease_space()
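# The task-space bookkeeping above relies on LimitedSpaceContainer:
# acquire_space() returns True and occupies a slot while capacity remains,
# release_space() frees a slot, and increase_space()/decrease_space() adjust
# the capacity. A rough illustration of the intended behaviour (not taken from
# this module):
#
#     space = LimitedSpaceContainer(0, 1)
#     assert space.acquire_space()          # one slot available
#     assert not space.acquire_space()      # capacity exhausted
#     space.release_space()
#     assert space.acquire_space()          # available again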