lightrft.strategy.utils.distributed_util

lightrft.strategy.utils.distributed_util.all_gather_all_prompt_token_ids(all_prompt_token_ids: List[List[int]], group: torch.distributed.ProcessGroup) → List[List[int]]

Gather prompt token IDs of varying lengths across processes, handling padding and alignment.

This function gathers prompt token lists from all processes in the distributed group. It handles sequences of different lengths by padding them to the maximum length, performing the all-gather operation, and then removing the padding from the results.

Parameters:
  • all_prompt_token_ids (List[List[int]]) – List of prompt token lists from the current process. Each inner list represents tokens for one prompt.

  • group (dist.ProcessGroup) – Distributed communication group for gathering operations.

Returns:

Collected and processed prompt token lists from all processes. The padding tokens (-1) are removed from the final result.

Return type:

List[List[int]]

Raises:

AssertionError – If the distributed environment is not initialized.

Example:

>>> # Gather tokens across processes
>>> tokens = [[1, 2, 3], [4, 5]]  # Current process tokens
>>> gathered_tokens = all_gather_all_prompt_token_ids(tokens, process_group)
>>> # Result contains tokens from all processes in the group
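
The pad, gather, and unpad steps described for all_gather_all_prompt_token_ids can be illustrated without a distributed runtime. The sketch below is a single-process simulation, not LightRFT's implementation: the name simulate_all_gather_prompt_token_ids and its per_rank_token_ids argument are hypothetical, and the "gather" is a plain concatenation in rank order standing in for the collective on group. It also assumes that -1 never occurs as a real token ID.

```python
from typing import List

PAD_TOKEN_ID = -1  # padding value named in the docs; assumed never to be a real token ID


def simulate_all_gather_prompt_token_ids(
    per_rank_token_ids: List[List[List[int]]],
) -> List[List[int]]:
    """Single-process stand-in for the pad -> all-gather -> unpad sequence."""
    # Step 1: find the longest prompt on any rank. A real run would have to
    # agree on this value across processes before padding.
    max_len = max(
        (len(prompt) for rank_prompts in per_rank_token_ids for prompt in rank_prompts),
        default=0,
    )

    # Step 2: pad every prompt to max_len so all rows share one shape.
    padded_per_rank = [
        [prompt + [PAD_TOKEN_ID] * (max_len - len(prompt)) for prompt in rank_prompts]
        for rank_prompts in per_rank_token_ids
    ]

    # Step 3: simulate the all-gather by concatenating rows in rank order.
    gathered = [row for rank_rows in padded_per_rank for row in rank_rows]

    # Step 4: strip the padding to recover the variable-length prompts.
    return [[tok for tok in row if tok != PAD_TOKEN_ID] for row in gathered]


rank0 = [[1, 2, 3], [4, 5]]      # prompts held by a hypothetical rank 0
rank1 = [[6], [7, 8, 9, 10]]     # prompts held by a hypothetical rank 1
result = simulate_all_gather_prompt_token_ids([rank0, rank1])
print(result)  # [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
```

Padding is needed because tensor collectives expect every rank to contribute a buffer of the same shape; the sentinel value lets the receiver recover each prompt's original length afterwards.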

lightrft.strategy.utils.distributed_util.create_sub_group(group_size: int, backend: str = 'nccl') → Tuple[torch.distributed.ProcessGroup, torch.distributed.ProcessGroup]

Create process subgroups for distributed computing with validation and communication testing.

This function creates two types of process groups for distributed computing:

  1. Regular groups: processes are grouped consecutively (e.g., [0,1,2,3], [4,5,6,7]).
  2. Orthogonal groups: processes are grouped with a stride equal to group_size (e.g., [0,4], [1,5], [2,6], [3,7]).

The function also runs a communication test to verify that the groups work correctly.

Parameters:
  • group_size (int) – Size of each process subgroup. Must be a divisor of world_size.

  • backend (str) – Backend for distributed communication (“nccl” for GPU; other backends, such as “gloo”, for CPU).

Returns:

Tuple of (regular process group, orthogonal process group).

Return type:

Tuple[dist.ProcessGroup, dist.ProcessGroup]

Raises:

ValueError – If world_size is not divisible by group_size.

Example:

>>> # Create subgroups with size 4 using NCCL backend
>>> regular_group, orthogonal_group = create_sub_group(4, "nccl")
>>> # With world_size=8, this creates:
>>> # Regular groups: [0,1,2,3] and [4,5,6,7]
>>> # Orthogonal groups: [0,4], [1,5], [2,6], [3,7]
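
The rank layout of the regular and orthogonal groups can be computed with plain arithmetic. The following is a minimal sketch of that layout only; sub_group_ranks is a hypothetical helper, not part of lightrft, and it does not create any process groups or run the communication test.

```python
from typing import List, Tuple


def sub_group_ranks(world_size: int, group_size: int) -> Tuple[List[List[int]], List[List[int]]]:
    """Return (regular, orthogonal) rank lists for the documented layout."""
    if group_size <= 0 or world_size % group_size != 0:
        raise ValueError(
            f"world_size ({world_size}) must be divisible by group_size ({group_size})"
        )

    num_groups = world_size // group_size

    # Regular groups: consecutive blocks of group_size ranks.
    regular = [
        list(range(g * group_size, (g + 1) * group_size)) for g in range(num_groups)
    ]

    # Orthogonal groups: one rank from each regular group, stride = group_size.
    orthogonal = [
        list(range(offset, world_size, group_size)) for offset in range(group_size)
    ]
    return regular, orthogonal


regular, orthogonal = sub_group_ranks(world_size=8, group_size=4)
print(regular)     # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(orthogonal)  # [[0, 4], [1, 5], [2, 6], [3, 7]]
```

Every rank belongs to exactly one regular group and one orthogonal group. When such rank lists are turned into real groups with torch.distributed.new_group, PyTorch requires every process to make the same new_group calls in the same order, including for groups it does not belong to.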

lightrft.strategy.utils.distributed_util.gather_inputs_object_for_inference(input_data: List[Any], group: torch.distributed.ProcessGroup) → List[Any]

All-gather data across the inference engine's tensor-parallel group.

This function collects data from all processes in the specified process group and returns a combined list of all items. It’s useful for aggregating distributed inputs before processing in a tensor-parallel inference setup. The function handles arbitrary Python objects using PyTorch’s object gathering mechanism.

Parameters:
  • input_data (List[Any]) – List of objects to be gathered from the current process. Can contain any serializable Python objects.

  • group (torch.distributed.ProcessGroup) – Inference engine tensor parallel process group that defines the communication context for gathering operations.

Returns:

Combined list of objects from all processes in the group. Items from each process are flattened into a single list.

Return type:

List[Any]

Example:

>>> # Gather inference inputs across tensor parallel processes
>>> local_inputs = [{"prompt": "Hello"}, {"prompt": "World"}]
>>> all_inputs = gather_inputs_object_for_inference(local_inputs, tp_group)
>>> # Result contains inputs from all processes in the tensor parallel group
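
The gather-then-flatten behaviour of gather_inputs_object_for_inference can be sketched in a single process. This is an illustration under assumptions, not the library's code: simulate_gather_inputs is a hypothetical name, the pickle round trip stands in for the serialization that PyTorch's object collectives perform, and flattening in rank order is assumed rather than confirmed by the documentation.

```python
import pickle
from itertools import chain
from typing import Any, List


def simulate_gather_inputs(per_rank_inputs: List[List[Any]]) -> List[Any]:
    """Single-process stand-in: one list per rank in, one flat list out."""
    # Object collectives pickle each rank's payload, so every item must be
    # picklable. The round trip here mimics that requirement.
    received = [pickle.loads(pickle.dumps(inputs)) for inputs in per_rank_inputs]

    # Flatten the per-rank lists into a single list (rank order is assumed).
    return list(chain.from_iterable(received))


rank0_inputs = [{"prompt": "Hello"}, {"prompt": "World"}]  # hypothetical rank 0 data
rank1_inputs = [{"prompt": "Foo"}]                          # hypothetical rank 1 data
all_inputs = simulate_gather_inputs([rank0_inputs, rank1_inputs])
print(all_inputs)
```

Because the payload is serialized, unpicklable objects such as open file handles or lambdas cannot be passed as items of input_data.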

lightrft.strategy.utils.distributed_util.init_process_group(backend: str | torch.distributed.distributed_c10d.Backend = None, init_method: str | None = None, timeout: timedelta | None = None, world_size: int = -1, rank: int = -1, store: torch.distributed.distributed_c10d.Store | None = None, group_name: str = None, pg_options: Any | None = None)

Initialize the distributed process group for multi-process training.

This function is a custom wrapper around torch.distributed.init_process_group that allows creating multiple main process groups, which is not supported by the standard PyTorch API. It handles the rendezvous process, backend initialization, and provides additional validation.

Process Groups Overview: A process group is a collection of processes that can communicate with each other using collective operations (e.g., all_reduce, all_gather). Process groups are essential for distributed training, enabling data parallelism, model parallelism, and other distributed computing patterns.

Initialization Methods:

  • env:// (default): Uses environment variables (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE).
  • tcp://: Explicitly specifies the master address and port (e.g., tcp://10.1.1.20:23456).
  • file://: Uses a shared file system for coordination (e.g., file:///mnt/nfs/sharedfile).

Backends:

  • nccl: Recommended for GPU training, optimized for NVIDIA GPUs.
  • gloo: Works for both CPU and GPU, recommended for CPU training.
  • mpi: Requires an MPI installation, useful for HPC environments.

Rendezvous Process: The rendezvous mechanism coordinates all processes to discover each other and establish communication channels. This function handles the rendezvous automatically based on the init_method or store provided.

For more details, see PyTorch distributed documentation: https://pytorch.org/docs/stable/distributed.html

Parameters:
  • backend (Union[str, Backend], optional) – Backend to use (e.g., ‘nccl’, ‘gloo’, ‘mpi’).

  • init_method (Optional[str]) – URL specifying how to initialize the process group. Defaults to ‘env://’ if not specified.

  • timeout (Optional[timedelta]) – Timeout for operations executed against the process group. Defaults to 30 minutes if not specified.

  • world_size (int) – Total number of processes participating in the job. Required if using store-based initialization.

  • rank (int) – Rank of the current process (0 to world_size - 1). Required if using store-based initialization.

  • store (Optional[Store]) – Key/value store accessible to all workers for coordination. If provided, init_method must be None.

  • group_name (str, optional) – Name of the process group for identification. Useful when creating multiple process groups.

  • pg_options (Optional[Any]) – Process group options (backend-specific configuration). Note: Renamed to backend_options in PyTorch 2.6.0+.

Returns:

The initialized process group.

Return type:

dist.ProcessGroup

Raises:

AssertionError – If both init_method and store are specified, or if world_size/rank are invalid when using store.
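
The documented argument rules (init_method and store are mutually exclusive, world_size and rank must be valid when a store is used, and defaults of env:// and 30 minutes) can be expressed as a small validation routine. This is a sketch of those rules as stated above, not the wrapper's actual code; resolve_init_args is a hypothetical helper and the exact checks in lightrft may differ.

```python
from datetime import timedelta
from typing import Any, Optional, Tuple

DEFAULT_TIMEOUT = timedelta(minutes=30)  # default stated in the parameter docs


def resolve_init_args(
    init_method: Optional[str] = None,
    store: Optional[Any] = None,
    world_size: int = -1,
    rank: int = -1,
    timeout: Optional[timedelta] = None,
) -> Tuple[Optional[str], timedelta]:
    """Validate rendezvous arguments and fill in the documented defaults."""
    # init_method and store are two alternative ways to rendezvous.
    assert store is None or init_method is None, "Cannot specify both init_method and store."

    if store is not None:
        # Store-based initialization needs an explicit world_size and rank.
        assert world_size > 0, "world_size must be positive if using store"
        assert 0 <= rank < world_size, "rank must be in [0, world_size) if using store"
    elif init_method is None:
        # Fall back to environment-variable rendezvous.
        init_method = "env://"

    return init_method, timeout if timeout is not None else DEFAULT_TIMEOUT


print(resolve_init_args())                                  # env:// with the 30-minute default
print(resolve_init_args(init_method="tcp://10.1.1.20:23456"))
print(resolve_init_args(store=object(), world_size=2, rank=0))
```

With the env:// default, MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE are expected to be set in the environment of every process before the call.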