lightrft.strategy.utils.data_utils¶
Distributed Sampling Module for PyTorch
This module provides utilities for distributed data sampling in PyTorch, particularly useful for distributed training scenarios. It includes a customized DistributedSampler that extends PyTorch’s sampling capabilities with additional features like handling consumed samples for resuming training.
The module is designed to work seamlessly with PyTorch’s distributed training infrastructure and provides proper data partitioning across multiple processes.
DistributedSampler¶
- class lightrft.strategy.utils.data_utils.DistributedSampler(*args: Any, **kwargs: Any)[source]¶
Sampler that restricts data loading to a subset of the dataset.
It is especially useful in conjunction with
torch.nn.parallel.DistributedDataParallel. In such a case, each process can pass aDistributedSamplerinstance as aDataLoadersampler, and load a subset of the original dataset that is exclusive to it.Note
Dataset is assumed to be of constant size and that any instance of it always returns the same elements in the same order.
- Parameters:
dataset (Dataset) – Dataset used for sampling.
num_replicas (Optional[int]) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.
rank (Optional[int]) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.
shuffle (bool) – If True (default), sampler will shuffle the indices.
seed (int) – Random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group.
drop_last (bool) – If True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas.
consumed_samples (int) – Number of samples that have been consumed already, useful for resuming training.
Warning
In distributed mode, calling the
set_epoch()method at the beginning of each epoch before creating theDataLoaderiterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will be always used.Example:
>>> # xdoctest: +SKIP >>> sampler = DistributedSampler(dataset) if is_distributed else None >>> loader = DataLoader(dataset, shuffle=(sampler is None), ... sampler=sampler) >>> for epoch in range(start_epoch, n_epochs): ... if is_distributed: ... sampler.set_epoch(epoch) ... train(loader)
- __iter__() Iterator[_T_co][source]¶
Iterate over the indices of the dataset.
- Returns:
An iterator over the indices of the dataset.
- Return type:
Iterator[_T_co]
- __len__() int[source]¶
Return the length of the sampler.
- Returns:
The number of samples in this sampler.
- Return type:
int
- set_epoch(epoch: int, consumed_samples: int = 0) None[source]¶
Set the epoch for this sampler.
When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters:
epoch (int) – Epoch number.
consumed_samples (int) – Number of samples already consumed in this epoch.