Using DDP Distributed Training in DI-engine
Distributed Data Parallel (DDP) is an effective approach to accelerating reinforcement learning training. This document provides detailed instructions on configuring and using DDP training in DI-engine, using pong_dqn_ddp_config.py as the example.
Launching DDP Training
To launch DDP training, use PyTorch's distributed.launch module. Run the following command:
python -m torch.distributed.launch --nproc_per_node=2 --master_port=29501 ./dizoo/atari/config/serial/pong/pong_dqn_ddp_config.py
Where:
- --nproc_per_node=2: Specifies the use of 2 GPUs for training.
- --master_port=29501: Specifies the port number for the master process.
- The final argument is the path to the configuration file.
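On newer PyTorch releases, the torchrun launcher can be used as an alternative to torch.distributed.launch. The command below is a hedged equivalent of the one above, assuming the entry script reads its rank from environment variables (RANK, LOCAL_RANK) rather than from a --local_rank argument:

torchrun --nproc_per_node=2 --master_port=29501 ./dizoo/atari/config/serial/pong/pong_dqn_ddp_config.py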
Configuration for DDP Training
Compared to single-GPU training (e.g., pong_dqn_config.py), the configuration file pong_dqn_ddp_config.py that enables multi-GPU DDP training introduces two key changes:
Enable multi-GPU support in the policy configuration:
policy = dict(
    multi_gpu=True,  # Enable multi-GPU training mode
    cuda=True,       # Use CUDA acceleration
    ...
)
The core code for initializing multi-GPU training is located in base_policy.py (base_policy.py#L167). Gradient synchronization occurs in policy._forward_learn() (dqn.py#L281).
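Conceptually, gradient synchronization averages each parameter's gradient across all processes after the backward pass, so that every rank applies the same update. The following is a minimal sketch of that idea using torch.distributed directly; it is an illustration only, not DI-engine's actual implementation (see the linked code for that):

import torch
import torch.distributed as dist

def sync_gradients(model: torch.nn.Module) -> None:
    # Sum gradients over all ranks, then divide by the world size so that
    # optimizer.step() produces identical parameters on every process.
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data.div_(world_size)

# Typical use inside a learn step (only needed when multi_gpu=True):
#   loss.backward()
#   sync_gradients(model)
#   optimizer.step()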
Use ``DDPContext`` to manage the distributed training process:
if __name__ == '__main__':
    from ding.utils import DDPContext
    from ding.entry import serial_pipeline
    with DDPContext():
        serial_pipeline((main_config, create_config), seed=0, max_env_step=int(3e6))
DDPContext initializes the distributed training environment and releases the distributed resources after training completes.
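As a rough mental model, such a context manager wraps PyTorch's standard process-group setup and teardown. The sketch below is a simplified illustration (the name simple_ddp_context and the backend choice are assumptions, not DI-engine's actual implementation):

import os
from contextlib import contextmanager

import torch
import torch.distributed as dist

@contextmanager
def simple_ddp_context(backend: str = 'nccl'):
    # Initialize the default process group from the environment variables
    # set by the launcher (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT).
    dist.init_process_group(backend=backend)
    torch.cuda.set_device(int(os.environ.get('LOCAL_RANK', '0')))
    try:
        yield
    finally:
        # Release distributed resources once training has finished.
        dist.destroy_process_group()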
DDP Implementation in DI-engine
The DDP implementation in DI-engine includes the following key components:
Distributed Processing of Data Collection in the Collector:
In SampleSerialCollector, each process independently collects data samples. After collection, statistical data is synchronized across processes using allreduce:

if self._world_size > 1:
    collected_sample = allreduce_data(collected_sample, 'sum')
    collected_step = allreduce_data(collected_step, 'sum')
    collected_episode = allreduce_data(collected_episode, 'sum')
    collected_duration = allreduce_data(collected_duration, 'sum')
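For illustration, a simplified stand-in for this kind of sum-allreduce on a scalar counter can be written with torch.distributed directly (assuming an initialized process group; this is not the actual allreduce_data implementation):

import torch
import torch.distributed as dist

def allreduce_scalar_sum(value: float) -> float:
    # Sum a per-process scalar (e.g. the number of collected samples) over
    # all ranks, so that every process sees the global total.
    device = torch.device('cuda') if dist.get_backend() == 'nccl' else torch.device('cpu')
    tensor = torch.tensor([value], dtype=torch.float64, device=device)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    return tensor.item()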
Distributed Processing of the Evaluator:
The evaluation logic runs only on the rank 0 process:
if get_rank() == 0:
    # Perform evaluation logic
    ...
After evaluation, the results are broadcast to other processes:
if get_world_size() > 1:
    objects = [stop_flag, episode_info]
    broadcast_object_list(objects, src=0)
    stop_flag, episode_info = objects
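Note that broadcast_object_list fills the given list in place, so ranks other than the source must pass a list of the same length, pre-filled with placeholders. A minimal self-contained illustration with torch.distributed (the example values are hypothetical):

import torch.distributed as dist

# Rank 0 produces the real results; the other ranks provide placeholders of
# the same length, which broadcast_object_list overwrites in place.
if dist.get_rank() == 0:
    stop_flag, episode_info = True, {'eval_episode_return': 20.0}  # example values
    objects = [stop_flag, episode_info]
else:
    objects = [None, None]
dist.broadcast_object_list(objects, src=0)
stop_flag, episode_info = objects  # now identical on every rank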
Distributed Logging:
The logger is initialized only on the rank 0 process:
tb_logger = SummaryWriter(os.path.join('./{}/log/'.format(cfg.exp_name), 'serial')) if get_rank() == 0 else None
Logging is restricted to the rank 0 process:
if self._rank == 0:
    if tb_logger is not None:
        self._logger, _ = build_logger(
            path='./{}/log/{}'.format(self._exp_name, self._instance_name),
            name=self._instance_name,
            need_tb=False
        )
        self._tb_logger = tb_logger
    else:
        self._logger, self._tb_logger = build_logger(
            path='./{}/log/{}'.format(self._exp_name, self._instance_name),
            name=self._instance_name
        )
else:
    self._logger, _ = build_logger(
        path='./{}/log/{}'.format(self._exp_name, self._instance_name),
        name=self._instance_name,
        need_tb=False
    )
    self._tb_logger = None
Printing logs is also restricted to the rank 0 process:
if self._rank != 0:
    return
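The same guard can be wrapped in a small helper for rank-aware logging. This is a minimal sketch (the helper log_on_rank0 is not part of DI-engine):

import torch.distributed as dist

def log_on_rank0(logger, message: str) -> None:
    # Only rank 0 writes log messages to avoid duplicated output;
    # all other ranks return immediately.
    rank = dist.get_rank() if dist.is_available() and dist.is_initialized() else 0
    if rank != 0:
        return
    logger.info(message)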
Summary
In DI-engine, DDP training makes full use of the computational power of multiple GPUs to accelerate the training process through distributed data collection, evaluation, and logging. The core logic of DDP relies on PyTorch's distributed framework, while DDPContext provides unified management of the distributed environment, simplifying configuration and usage for developers.
For more details on the implementation, refer to the code references linked above (base_policy.py#L167 and dqn.py#L281).