Source code for ding.utils.data.collate_fn
from collections.abc import Sequence, Mapping
from typing import List, Dict, Union, Any
import torch
import treetensor.torch as ttorch
import re
import collections.abc as container_abcs
from ding.compatibility import torch_ge_131
int_classes = int
string_classes = (str, bytes)
np_str_obj_array_pattern = re.compile(r'[SaUO]')
default_collate_err_msg_format = (
"default_collate: batch must contain tensors, numpy arrays, numbers, "
"dicts or lists; found {}"
)
[docs]def ttorch_collate(x, json: bool = False, cat_1dim: bool = True):
"""
Overview:
Collates a list of tensors or nested dictionaries of tensors into a single tensor or nested \
dictionary of tensors.
Arguments:
- x : The input list of tensors or nested dictionaries of tensors.
- json (:obj:`bool`): If True, converts the output to JSON format. Defaults to False.
- cat_1dim (:obj:`bool`): If True, concatenates tensors with shape (B, 1) along the last dimension. \
Defaults to True.
Returns:
The collated output tensor or nested dictionary of tensors.
Examples:
>>> # case 1: Collate a list of tensors
>>> tensors = [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6]), torch.tensor([7, 8, 9])]
>>> collated = ttorch_collate(tensors)
collated = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
>>> # case 2: Collate a nested dictionary of tensors
>>> nested_dict = {
'a': torch.tensor([1, 2, 3]),
'b': torch.tensor([4, 5, 6]),
'c': torch.tensor([7, 8, 9])
}
>>> collated = ttorch_collate(nested_dict)
collated = {
'a': torch.tensor([1, 2, 3]),
'b': torch.tensor([4, 5, 6]),
'c': torch.tensor([7, 8, 9])
}
>>> # case 3: Collate a list of nested dictionaries of tensors
>>> nested_dicts = [
{'a': torch.tensor([1, 2, 3]), 'b': torch.tensor([4, 5, 6])},
{'a': torch.tensor([7, 8, 9]), 'b': torch.tensor([10, 11, 12])}
]
>>> collated = ttorch_collate(nested_dicts)
collated = {
'a': torch.tensor([[1, 2, 3], [7, 8, 9]]),
'b': torch.tensor([[4, 5, 6], [10, 11, 12]])
}
"""
def inplace_fn(t):
for k in t.keys():
if isinstance(t[k], torch.Tensor):
if len(t[k].shape) == 2 and t[k].shape[1] == 1: # reshape (B, 1) -> (B)
t[k] = t[k].squeeze(-1)
else:
inplace_fn(t[k])
x = ttorch.stack(x)
if cat_1dim:
inplace_fn(x)
if json:
x = x.json()
return x
[docs]def default_collate(batch: Sequence,
cat_1dim: bool = True,
ignore_prefix: list = ['collate_ignore']) -> Union[torch.Tensor, Mapping, Sequence]:
"""
Overview:
Put each data field into a tensor with outer dimension batch size.
Arguments:
- batch (:obj:`Sequence`): A data sequence, whose length is batch size, whose element is one piece of data.
- cat_1dim (:obj:`bool`): Whether to concatenate tensors with shape (B, 1) to (B), defaults to True.
- ignore_prefix (:obj:`list`): A list of prefixes to ignore when collating dictionaries, \
defaults to ['collate_ignore'].
Returns:
- ret (:obj:`Union[torch.Tensor, Mapping, Sequence]`): the collated data, with batch size into each data \
field. The return dtype depends on the original element dtype, can be [torch.Tensor, Mapping, Sequence].
Example:
>>> # a list with B tensors shaped (m, n) -->> a tensor shaped (B, m, n)
>>> a = [torch.zeros(2,3) for _ in range(4)]
>>> default_collate(a).shape
torch.Size([4, 2, 3])
>>>
>>> # a list with B lists, each list contains m elements -->> a list of m tensors, each with shape (B, )
>>> a = [[0 for __ in range(3)] for _ in range(4)]
>>> default_collate(a)
[tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0])]
>>>
>>> # a list with B dicts, whose values are tensors shaped :math:`(m, n)` -->>
>>> # a dict whose values are tensors with shape :math:`(B, m, n)`
>>> a = [{i: torch.zeros(i,i+1) for i in range(2, 4)} for _ in range(4)]
>>> print(a[0][2].shape, a[0][3].shape)
torch.Size([2, 3]) torch.Size([3, 4])
>>> b = default_collate(a)
>>> print(b[2].shape, b[3].shape)
torch.Size([4, 2, 3]) torch.Size([4, 3, 4])
"""
if isinstance(batch, ttorch.Tensor):
return batch.json()
elem = batch[0]
elem_type = type(elem)
if isinstance(elem, torch.Tensor):
out = None
if torch_ge_131() and torch.utils.data.get_worker_info() is not None:
# If we're in a background process, directly concatenate into a
# shared memory tensor to avoid an extra copy
numel = sum([x.numel() for x in batch])
storage = elem.storage()._new_shared(numel)
out = elem.new(storage)
if elem.shape == (1, ) and cat_1dim:
# reshape (B, 1) -> (B)
return torch.cat(batch, 0, out=out)
# return torch.stack(batch, 0, out=out)
else:
return torch.stack(batch, 0, out=out)
elif isinstance(elem, ttorch.Tensor):
return ttorch_collate(batch, json=True, cat_1dim=cat_1dim)
elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
and elem_type.__name__ != 'string_':
if elem_type.__name__ == 'ndarray':
# array of string classes and object
if np_str_obj_array_pattern.search(elem.dtype.str) is not None:
raise TypeError(default_collate_err_msg_format.format(elem.dtype))
return default_collate([torch.as_tensor(b) for b in batch], cat_1dim=cat_1dim)
elif elem.shape == (): # scalars
return torch.as_tensor(batch)
elif isinstance(elem, float):
return torch.tensor(batch, dtype=torch.float32)
elif isinstance(elem, int_classes):
dtype = torch.bool if isinstance(elem, bool) else torch.int64
return torch.tensor(batch, dtype=dtype)
elif isinstance(elem, string_classes):
return batch
elif isinstance(elem, container_abcs.Mapping):
ret = {}
for key in elem:
if any([key.startswith(t) for t in ignore_prefix]):
ret[key] = [d[key] for d in batch]
else:
ret[key] = default_collate([d[key] for d in batch], cat_1dim=cat_1dim)
return ret
elif isinstance(elem, tuple) and hasattr(elem, '_fields'): # namedtuple
return elem_type(*(default_collate(samples, cat_1dim=cat_1dim) for samples in zip(*batch)))
elif isinstance(elem, container_abcs.Sequence):
transposed = zip(*batch)
return [default_collate(samples, cat_1dim=cat_1dim) for samples in transposed]
raise TypeError(default_collate_err_msg_format.format(elem_type))
[docs]def timestep_collate(batch: List[Dict[str, Any]]) -> Dict[str, Union[torch.Tensor, list]]:
"""
Overview:
Collates a batch of timestepped data fields into tensors with the outer dimension being the batch size. \
Each timestepped data field is represented as a tensor with shape [T, B, any_dims], where T is the length \
of the sequence, B is the batch size, and any_dims represents the shape of the tensor at each timestep.
Arguments:
- batch(:obj:`List[Dict[str, Any]]`): A list of dictionaries with length B, where each dictionary represents \
a timestepped data field. Each dictionary contains a key-value pair, where the key is the name of the \
data field and the value is a sequence of torch.Tensor objects with any shape.
Returns:
- ret(:obj:`Dict[str, Union[torch.Tensor, list]]`): The collated data, with the timestep and batch size \
incorporated into each data field. The shape of each data field is [T, B, dim1, dim2, ...].
Examples:
>>> batch = [
{'data0': [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6])]},
{'data1': [torch.tensor([7, 8, 9]), torch.tensor([10, 11, 12])]}
]
>>> collated_data = timestep_collate(batch)
>>> print(collated_data['data'].shape)
torch.Size([2, 2, 3])
"""
def stack(data):
if isinstance(data, container_abcs.Mapping):
return {k: stack(data[k]) for k in data}
elif isinstance(data, container_abcs.Sequence) and isinstance(data[0], torch.Tensor):
return torch.stack(data)
else:
return data
elem = batch[0]
assert isinstance(elem, (container_abcs.Mapping, list)), type(elem)
if isinstance(batch[0], list): # new pipeline + treetensor
prev_state = [[b[i].get('prev_state') for b in batch] for i in range(len(batch[0]))]
batch_data = ttorch.stack([ttorch_collate(b) for b in batch]) # (B, T, *)
del batch_data.prev_state
batch_data = batch_data.transpose(1, 0)
batch_data.prev_state = prev_state
else:
prev_state = [b.pop('prev_state') for b in batch]
batch_data = default_collate(batch) # -> {some_key: T lists}, each list is [B, some_dim]
batch_data = stack(batch_data) # -> {some_key: [T, B, some_dim]}
transformed_prev_state = list(zip(*prev_state))
batch_data['prev_state'] = transformed_prev_state
# append back prev_state, avoiding multi batch share the same data bug
for i in range(len(batch)):
batch[i]['prev_state'] = prev_state[i]
return batch_data
[docs]def diff_shape_collate(batch: Sequence) -> Union[torch.Tensor, Mapping, Sequence]:
"""
Overview:
Collates a batch of data with different shapes.
This function is similar to `default_collate`, but it allows tensors in the batch to have `None` values, \
which is common in StarCraft observations.
Arguments:
- batch (:obj:`Sequence`): A sequence of data, where each element is a piece of data.
Returns:
- ret (:obj:`Union[torch.Tensor, Mapping, Sequence]`): The collated data, with the batch size applied \
to each data field. The return type depends on the original element type and can be a torch.Tensor, \
Mapping, or Sequence.
Examples:
>>> # a list with B tensors shaped (m, n) -->> a tensor shaped (B, m, n)
>>> a = [torch.zeros(2,3) for _ in range(4)]
>>> diff_shape_collate(a).shape
torch.Size([4, 2, 3])
>>>
>>> # a list with B lists, each list contains m elements -->> a list of m tensors, each with shape (B, )
>>> a = [[0 for __ in range(3)] for _ in range(4)]
>>> diff_shape_collate(a)
[tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0]), tensor([0, 0, 0, 0])]
>>>
>>> # a list with B dicts, whose values are tensors shaped :math:`(m, n)` -->>
>>> # a dict whose values are tensors with shape :math:`(B, m, n)`
>>> a = [{i: torch.zeros(i,i+1) for i in range(2, 4)} for _ in range(4)]
>>> print(a[0][2].shape, a[0][3].shape)
torch.Size([2, 3]) torch.Size([3, 4])
>>> b = diff_shape_collate(a)
>>> print(b[2].shape, b[3].shape)
torch.Size([4, 2, 3]) torch.Size([4, 3, 4])
"""
elem = batch[0]
elem_type = type(elem)
if any([isinstance(elem, type(None)) for elem in batch]):
return batch
elif isinstance(elem, torch.Tensor):
shapes = [e.shape for e in batch]
if len(set(shapes)) != 1:
return batch
else:
return torch.stack(batch, 0)
elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
and elem_type.__name__ != 'string_':
if elem_type.__name__ == 'ndarray':
return diff_shape_collate([torch.as_tensor(b) for b in batch]) # todo
elif elem.shape == (): # scalars
return torch.as_tensor(batch)
elif isinstance(elem, float):
return torch.tensor(batch, dtype=torch.float32)
elif isinstance(elem, int_classes):
dtype = torch.bool if isinstance(elem, bool) else torch.int64
return torch.tensor(batch, dtype=dtype)
elif isinstance(elem, Mapping):
return {key: diff_shape_collate([d[key] for d in batch]) for key in elem}
elif isinstance(elem, tuple) and hasattr(elem, '_fields'): # namedtuple
return elem_type(*(diff_shape_collate(samples) for samples in zip(*batch)))
elif isinstance(elem, Sequence):
transposed = zip(*batch)
return [diff_shape_collate(samples) for samples in transposed]
raise TypeError('not support element type: {}'.format(elem_type))
[docs]def default_decollate(
batch: Union[torch.Tensor, Sequence, Mapping],
ignore: List[str] = ['prev_state', 'prev_actor_state', 'prev_critic_state']
) -> List[Any]:
"""
Overview:
Drag out batch_size collated data's batch size to decollate it, which is the reverse operation of \
``default_collate``.
Arguments:
- batch (:obj:`Union[torch.Tensor, Sequence, Mapping]`): The collated data batch. It can be a tensor, \
sequence, or mapping.
- ignore(:obj:`List[str]`): A list of names to be ignored. Only applicable if the input ``batch`` is a \
dictionary. If a key is in this list, its value will remain the same without decollation. Defaults to \
['prev_state', 'prev_actor_state', 'prev_critic_state'].
Returns:
- ret (:obj:`List[Any]`): A list with B elements, where B is the batch size.
Examples:
>>> batch = {
'a': [
[1, 2, 3],
[4, 5, 6]
],
'b': [
[7, 8, 9],
[10, 11, 12]
]}
>>> default_decollate(batch)
{
0: {'a': [1, 2, 3], 'b': [7, 8, 9]},
1: {'a': [4, 5, 6], 'b': [10, 11, 12]},
}
"""
if isinstance(batch, torch.Tensor):
batch = torch.split(batch, 1, dim=0)
# Squeeze if the original batch's shape is like (B, dim1, dim2, ...);
# otherwise, directly return the list.
if len(batch[0].shape) > 1:
batch = [elem.squeeze(0) for elem in batch]
return list(batch)
elif isinstance(batch, Sequence):
return list(zip(*[default_decollate(e) for e in batch]))
elif isinstance(batch, Mapping):
tmp = {k: v if k in ignore else default_decollate(v) for k, v in batch.items()}
B = len(list(tmp.values())[0])
return [{k: tmp[k][i] for k in tmp.keys()} for i in range(B)]
elif isinstance(batch, torch.distributions.Distribution): # For compatibility
return [None for _ in range(batch.batch_shape[0])]
raise TypeError("Not supported batch type: {}".format(type(batch)))