worker.collector.comm¶
base_comm_collector¶
Please Reference ding/worker/collector/comm/base_comm_collector.py for usage
BaseCommCollector¶
- class ding.worker.collector.comm.base_comm_collector.BaseCommCollector(cfg)[source]¶
- Overview:
Abstract baseclass for common collector.
- Interfaces:
__init__, get_policy_update_info, send_metadata, send_stepdata start, close, _create_collector
- Property:
collector_uid
- _create_collector(task_info: dict) BaseParallelCollector [source]¶
- Overview:
Receive
task_info
passed from coordinator and create a collector.- Arguments:
task_info (
dict
): Task info dict from coordinator. Should be like Returns:collector (
BaseParallelCollector
): Created base collector.
- Note:
Four methods(‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’), and policy are set. The reason why they are set here rather than base collector is, they highly depend on the specific task. Only after task info is passed from coordinator to comm collector through learner slave, can they be clarified and initialized.
- abstract get_policy_update_info(path: str) Any [source]¶
- Overview:
Get policy information in corresponding path. Will be registered in base collector.
- Arguments:
path (
str
): path to policy update information.
- abstract send_metadata(metadata: Any) None [source]¶
- Overview:
Store meta data in queue, which will be retrieved by callback function “deal_with_collector_data” in collector slave, then will be sent to coordinator. Will be registered in base collector.
- Arguments:
metadata (
Any
): meta data.
create_comm_collector¶
- Overview:
Given the key(comm_collector_name), create a new comm collector instance if in comm_map’s values, or raise an KeyError. In other words, a derived comm collector must first register, then can call
create_comm_collector
to get the instance.- Arguments:
cfg (
EasyDict
): Collector config. Necessary keys: [import_names, comm_collector_type].
- Returns:
collector (
BaseCommCollector
): The created new comm collector, should be an instance of one of comm_map’s values.
flask_fs_collector¶
Please Reference ding/worker/collector/comm/flask_fs_collector.py for usage
CollectorSlave¶
- class ding.worker.collector.comm.flask_fs_collector.CollectorSlave(*args, callback_fn: Dict[str, Callable], **kwargs)[source]¶
- Overview:
A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.
- Interfaces:
__init__, _process_task
- __init__(*args, callback_fn: Dict[str, Callable], **kwargs) None [source]¶
- Overview:
Init callback functions additionally. Callback functions are methods in comm collector.
- _process_task(task: dict) dict | TaskFail [source]¶
- Overview:
Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.
- Arguments:
cfg (
EasyDict
): Task dict. Must contain key “name”.
- Returns:
result (
Union[dict, TaskFail]
): Task result dict, or task fail exception.
FlaskFileSystemCollector¶
- class ding.worker.collector.comm.flask_fs_collector.FlaskFileSystemCollector(cfg: dict)[source]¶
- Overview:
An implementation of CommLearner, using flask and the file system.
- Interfaces:
__init__, deal_with_resource, deal_with_collector_start, deal_with_collector_data, deal_with_collector_close, get_policy_update_info, send_stepdata, send_metadata, start, close
- __init__(cfg: dict) None [source]¶
- Overview:
Initialization method.
- Arguments:
cfg (
EasyDict
): Config dict
- deal_with_collector_data() dict [source]¶
- Overview:
Callback function in
CollectorSlave
. Get data sample dict from_metadata_queue
, which will be sent to coordinator afterwards.- Returns:
data (
Any
): Data sample dict.
- deal_with_collector_start(task_info: dict) None [source]¶
- Overview:
Callback function in
CollectorSlave
. Create a collector and start a collector thread of the created one.- Arguments:
task_info (
dict
): Task info dict.
- Note:
In
_create_collector
method in base classBaseCommCollector
, 4 methods ‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’, and policy are set. You can refer to it for details.
- deal_with_resource() dict [source]¶
- Overview:
Callback function in
CollectorSlave
. Return how many resources are needed to start current collector.- Returns:
resource (
dict
): Resource info dict, including [‘gpu’, ‘cpu’].
- get_policy_update_info(path: str) dict [source]¶
- Overview:
Get policy information in corresponding path.
- Arguments:
path (
str
): path to policy update information.
- send_metadata(metadata: dict) None [source]¶
- Overview:
Store learn info dict in queue, which will be retrieved by callback function “deal_with_collector_learn” in collector slave, then will be sent to coordinator.
- Arguments:
metadata (
Any
): meta data.
utils¶
Please Reference ding/worker/collector/comm/utils.py for usage
NaiveCollector¶
- class ding.worker.collector.comm.utils.NaiveCollector(*args, prefix='', **kwargs)[source]¶
- Overview:
A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.
- Interfaces:
_process_task, _get_timestep
- _process_task(task)[source]¶
- Overview:
Process a task according to input task info dict, which is passed in by master coordinator. For each type of task, you can refer to corresponding callback function in comm collector for details.
- Arguments:
cfg (
EasyDict
): Task dict. Must contain key “name”.
- Returns:
result (
Union[dict, TaskFail]
): Task result dict, or task fail exception.