ding.utils¶
autolog¶
Please refer to ding/utils/autolog
for more details.
TimeMode¶
RangedData¶
TimeRangedData¶
LoggedModel¶
BaseTime¶
NaturalTime¶
TickTime¶
TimeProxy¶
LoggedValue¶
data.structure¶
Please refer to ding/utils/data/structure
for more details.
Cache¶
LifoDeque¶
data.base_dataloader¶
Please refer to ding/utils/data/base_dataloader
for more details.
IDataLoader¶
data.collate_fn¶
Please refer to ding/utils/data/collate_fn
for more details.
ttorch_collate¶
default_collate¶
timestep_collate¶
diff_shape_collate¶
default_decollate¶
data.dataloader¶
Please refer to ding/utils/data/dataloader
for more details.
AsyncDataLoader¶
data.dataset¶
Please refer to ding/utils/data/dataset
for more details.
DatasetStatistics¶
NaiveRLDataset¶
D4RLDataset¶
HDF5Dataset¶
D4RLTrajectoryDataset¶
D4RLDiffuserDataset¶
FixedReplayBuffer¶
PCDataset¶
load_bfs_datasets¶
BCODataset¶
SequenceDataset¶
hdf5_save¶
naive_save¶
offline_data_save_type¶
create_dataset¶
bfs_helper¶
Please refer to ding/utils/bfs_helper
for more details.
get_vi_sequence¶
- ding.utils.bfs_helper.get_vi_sequence(env: gym.core.Env, observation: numpy.ndarray) Tuple[numpy.ndarray, List] [source]¶
- Overview:
Given an instance of the maze environment and the current observation, using Broad-First-Search (BFS) algorithm to plan an optimal path and record the result.
- Arguments:
env (
Env
): The instance of the maze environment.observation (
np.ndarray
): The current observation.
- Returns:
output (
Tuple[np.ndarray, List]
): The BFS result.output[0]
contains the BFS map after each iteration andoutput[1]
contains the optimal actions before reaching the finishing point.
collection_helper¶
Please refer to ding/utils/collection_helper
for more details.
iter_mapping¶
- ding.utils.collection_helper.iter_mapping(iter_: Iterable[ding.utils.collection_helper._IterType], mapping: Callable[[ding.utils.collection_helper._IterType], ding.utils.collection_helper._IterTargetType])[source]¶
- Overview:
Map a list of iterable elements to input iteration callable
- Arguments:
iter_(
_IterType list
): The list for iterationmapping (
Callable [[_IterType], _IterTargetType]
): A callable that maps iterable elements function.
- Return:
(
iter_mapping object
): Iteration results
- Example:
>>> iterable_list = [1, 2, 3, 4, 5] >>> _iter = iter_mapping(iterable_list, lambda x: x ** 2) >>> print(list(_iter)) [1, 4, 9, 16, 25]
compression_helper¶
Please refer to ding/utils/compression_helper
for more details.
CloudPickleWrapper¶
dummy_compressor¶
zlib_data_compressor¶
- ding.utils.compression_helper.zlib_data_compressor(data: Any) bytes [source]¶
- Overview:
Takes the input compressed data and return the compressed original data (zlib compressor) in binary format.
- Arguments:
data (
Any
): The input data of the compressor.
- Returns:
output (
bytes
): The compressed byte-like result.
- Examples:
>>> zlib_data_compressor("Hello")
lz4_data_compressor¶
- ding.utils.compression_helper.lz4_data_compressor(data: Any) bytes [source]¶
- Overview:
Return the compressed original data (lz4 compressor).The compressor outputs in binary format.
- Arguments:
data (
Any
): The input data of the compressor.
- Returns:
output (
bytes
): The compressed byte-like result.
- Examples:
>>> lz4.block.compress(pickle.dumps("Hello")) b'R Hello.'
jpeg_data_compressor¶
- ding.utils.compression_helper.jpeg_data_compressor(data: numpy.ndarray) bytes [source]¶
- Overview:
To reduce memory usage, we can choose to store the jpeg strings of image instead of the numpy array in the buffer. This function encodes the observation numpy arr to the jpeg strings.
- Arguments:
data (
np.array
): the observation numpy arr.
- Returns:
img_str (
bytes
): The compressed byte-like result.
get_data_compressor¶
- ding.utils.compression_helper.get_data_compressor(name: str)[source]¶
- Overview:
Get the data compressor according to the input name.
- Arguments:
name(
str
): Name of the compressor, support['lz4', 'zlib', 'jpeg', 'none']
- Return:
compressor (
Callable
): Corresponding data_compressor, taking input data returning compressed data.
- Example:
>>> compress_fn = get_data_compressor('lz4') >>> compressed_data = compressed(input_data)
dummy_decompressor¶
lz4_data_decompressor¶
zlib_data_decompressor¶
jpeg_data_decompressor¶
- ding.utils.compression_helper.jpeg_data_decompressor(compressed_data: bytes, gray_scale=False) numpy.ndarray [source]¶
- Overview:
To reduce memory usage, we can choose to store the jpeg strings of image instead of the numpy array in the buffer. This function decodes the observation numpy arr from the jpeg strings.
- Arguments:
compressed_data (
bytes
): The jpeg strings.- gray_scale (
bool
): If the observation is gray,gray_scale=True
, if the observation is RGB,
gray_scale=False
.
- gray_scale (
- Returns:
arr (
np.ndarray
): The decompressed numpy array.
get_data_decompressor¶
- ding.utils.compression_helper.get_data_decompressor(name: str) Callable [source]¶
- Overview:
Get the data decompressor according to the input name.
- Arguments:
name(
str
): Name of the decompressor, support['lz4', 'zlib', 'none']
Note
For all the decompressors, the input of a bytes-like object is required.
- Returns:
decompressor (
Callable
): Corresponding data decompressor.
- Examples:
>>> decompress_fn = get_data_decompressor('lz4') >>> origin_data = compressed(compressed_data)
default_helper¶
Please refer to ding/utils/default_helper
for more details.
get_shape0¶
- ding.utils.default_helper.get_shape0(data: Union[List, Dict, torch.Tensor, treetensor.torch.tensor.Tensor]) int [source]¶
- Overview:
Get shape[0] of data’s torch tensor or treetensor
- Arguments:
data (
Union[List,Dict,torch.Tensor,ttorch.Tensor]
): data to be analysed
- Returns:
shape[0] (
int
): first dimension length of data, usually the batchsize.
lists_to_dicts¶
- ding.utils.default_helper.lists_to_dicts(data: Union[List[Union[dict, NamedTuple]], Tuple[Union[dict, NamedTuple]]], recursive: bool = False) Union[Mapping[object, object], NamedTuple] [source]¶
- Overview:
Transform a list of dicts to a dict of lists.
- Arguments:
- data (
Union[List[Union[dict, NamedTuple]], Tuple[Union[dict, NamedTuple]]]
): A dict of lists need to be transformed
- data (
recursive (
bool
): whether recursively deals with dict element
- Returns:
newdata (
Union[Mapping[object, object], NamedTuple]
): A list of dicts as a result
- Example:
>>> from ding.utils import * >>> lists_to_dicts([{1: 1, 10: 3}, {1: 2, 10: 4}]) {1: [1, 2], 10: [3, 4]}
dicts_to_lists¶
- ding.utils.default_helper.dicts_to_lists(data: Mapping[object, List[object]]) List[Mapping[object, object]] [source]¶
- Overview:
Transform a dict of lists to a list of dicts.
- Arguments:
data (
Mapping[object, list]
): A list of dicts need to be transformed
- Returns:
newdata (
List[Mapping[object, object]]
): A dict of lists as a result
- Example:
>>> from ding.utils import * >>> dicts_to_lists({1: [1, 2], 10: [3, 4]}) [{1: 1, 10: 3}, {1: 2, 10: 4}]
override¶
squeeze¶
default_get¶
- ding.utils.default_helper.default_get(data: dict, name: str, default_value: Optional[Any] = None, default_fn: Optional[Callable] = None, judge_fn: Optional[Callable] = None) Any [source]¶
- Overview:
Getting the value by input, checks generically on the inputs with at least
data
andname
. Ifname
exists indata
, get the value atname
; else, addname
todefault_get_set
with value generated bydefault_fn
(or directly asdefault_value
) that is checked by `` judge_fn`` to be legal.- Arguments:
data(
dict
): Data input dictionaryname(
str
): Key namedefault_value(
Optional[Any]
) = None,default_fn(
Optional[Callable]
) = Valuejudge_fn(
Optional[Callable]
) = None
- Returns:
ret(
list
): Splitted dataresidual(
list
): Residule list
list_split¶
- ding.utils.default_helper.list_split(data: list, step: int) List[list] [source]¶
- Overview:
Split list of data by step.
- Arguments:
data(
list
): List of data for splitingstep(
int
): Number of step for spliting
- Returns:
ret(
list
): List of splitted data.residual(
list
): Residule list. This value isNone
whendata
dividessteps
.
- Example:
>>> list_split([1,2,3,4],2) ([[1, 2], [3, 4]], None) >>> list_split([1,2,3,4],3) ([[1, 2, 3]], [4])
error_wrapper¶
- ding.utils.default_helper.error_wrapper(fn, default_ret, warning_msg='')[source]¶
- Overview:
wrap the function, so that any Exception in the function will be catched and return the default_ret
- Arguments:
fn (
Callable
): the function to be wrapeddefault_ret (
obj
): the default return when an Exception occurred in the function
- Returns:
wrapper (
Callable
): the wrapped function
- Examples:
>>> # Used to checkfor Fakelink (Refer to utils.linklink_dist_helper.py) >>> def get_rank(): # Get the rank of linklink model, return 0 if use FakeLink. >>> if is_fake_link: >>> return 0 >>> return error_wrapper(link.get_rank, 0)()
LimitedSpaceContainer¶
- class ding.utils.default_helper.LimitedSpaceContainer(min_val: int, max_val: int)[source]¶
- Overview:
A space simulator.
- Interfaces:
__init__
,get_residual_space
,release_space
- __init__(min_val: int, max_val: int) None [source]¶
- Overview:
Set
min_val
andmax_val
of the container, also setcur
tomin_val
for initialization.- Arguments:
min_val (
int
): Min volume of the container, usually 0.max_val (
int
): Max volume of the container.
- acquire_space() bool [source]¶
- Overview:
Try to get one pice of space. If there is one, return True; Otherwise return False.
- Returns:
flag (
bool
): Whether there is any piece of residual space.
deep_merge_dicts¶
deep_update¶
- ding.utils.default_helper.deep_update(original: dict, new_dict: dict, new_keys_allowed: bool = False, whitelist: Optional[List[str]] = None, override_all_if_type_changes: Optional[List[str]] = None)[source]¶
- Overview:
Update original dict with values from new_dict recursively.
- Arguments:
original (
dict
): Dictionary with default values.new_dict (
dict
): Dictionary with values to be updatednew_keys_allowed (
bool
): Whether new keys are allowed.- whitelist (
Optional[List[str]]
): List of keys that correspond to dict values where new subkeys can be introduced. This is only at the top level.
- whitelist (
- override_all_if_type_changes(
Optional[List[str]]
): List of top level keys with value=dict, for which we always simply override the entire value (
dict
), if the “type” key in that value dict changes.
- override_all_if_type_changes(
Note
If new key is introduced in new_dict, then if new_keys_allowed is not True, an error will be thrown. Further, for sub-dicts, if the key is in the whitelist, then new subkeys can be introduced.
flatten_dict¶
- ding.utils.default_helper.flatten_dict(data: dict, delimiter: str = '/') dict [source]¶
- Overview:
Flatten the dict, see example
- Arguments:
data (
dict
): Original nested dictdelimiter (str): Delimiter of the keys of the new dict
- Returns:
data (
dict
): Flattened nested dict
- Example:
>>> a {'a': {'b': 100}} >>> flatten_dict(a) {'a/b': 100}
set_pkg_seed¶
- ding.utils.default_helper.set_pkg_seed(seed: int, use_cuda: bool = True) None [source]¶
- Overview:
Side effect function to set seed for
random
,numpy random
, andtorch's manual seed
. This is usaually used in entry scipt in the section of setting random seed for all package and instance- Argument:
seed(
int
): Set seeduse_cuda(
bool
) Whether use cude
- Examples:
>>> # ../entry/xxxenv_xxxpolicy_main.py >>> ... # Set random seed for all package and instance >>> collector_env.seed(seed) >>> evaluator_env.seed(seed, dynamic_seed=False) >>> set_pkg_seed(seed, use_cuda=cfg.policy.cuda) >>> ... # Set up RL Policy, etc. >>> ...
one_time_warning¶
split_fn¶
split_data_generator¶
RunningMeanStd¶
- class ding.utils.default_helper.RunningMeanStd(epsilon=0.0001, shape=(), device=device(type='cpu'))[source]¶
- Overview:
Wrapper to update new variable, new mean, and new count
- Interfaces:
__init__
,update
,reset
,new_shape
- Properties:
mean
,std
,_epsilon
,_shape
,_mean
,_var
,_count
- __init__(epsilon=0.0001, shape=(), device=device(type='cpu'))[source]¶
- Overview:
Initialize
self.
Seehelp(type(self))
for accurate signature; setup the properties.- Arguments:
env (
gym.Env
): the environment to wrap.epsilon (
Float
): the epsilon used for self for the std outputshape (:obj: np.array): the np array shape used for the expression of this wrapper on attibutes of mean and variance
- property mean: numpy.ndarray¶
- Overview:
Property
mean
gotten fromself._mean
- static new_shape(obs_shape, act_shape, rew_shape)[source]¶
- Overview:
Get new shape of observation, acton, and reward; in this case unchanged.
- Arguments:
obs_shape (
Any
), act_shape (Any
), rew_shape (Any
)- Returns:
obs_shape (
Any
), act_shape (Any
), rew_shape (Any
)
- reset()[source]¶
- Overview:
Resets the state of the environment and reset properties:
_mean
,_var
,_count
- property std: numpy.ndarray¶
- Overview:
Property
std
calculated fromself._var
and the epsilon value ofself._epsilon
make_key_as_identifier¶
- ding.utils.default_helper.make_key_as_identifier(data: Dict[str, Any]) Dict[str, Any] [source]¶
- Overview:
Make the key of dict into legal python identifier string so that it is compatible with some python magic method such as
__getattr
.- Arguments:
data (
Dict[str, Any]
): The original dict data.
- Return:
new_data (
Dict[str, Any]
): The new dict data with legal identifier keys.
remove_illegal_item¶
- ding.utils.default_helper.remove_illegal_item(data: Dict[str, Any]) Dict[str, Any] [source]¶
- Overview:
Remove illegal item in dict info, like str, which is not compatible with Tensor.
- Arguments:
data (
Dict[str, Any]
): The original dict data.
- Return:
new_data (
Dict[str, Any]
): The new dict data without legal items.
design_helper¶
Please refer to ding/utils/design_helper
for more details.
SingletonMetaclass¶
fake_linklink¶
Please refer to ding/utils/fake_linklink
for more details.
FakeClass¶
FakeNN¶
FakeLink¶
fast_copy¶
Please refer to ding/utils/fast_copy
for more details.
_FastCopy¶
- class ding.utils.fast_copy._FastCopy[source]¶
- Overview:
The idea of this class comes from this article https://newbedev.com/what-is-a-fast-pythonic-way-to-deepcopy-just-data-from-a-python-dict-or-list. We use recursive calls to copy each object that needs to be copied, which will be 5x faster than copy.deepcopy.
- Interfaces:
__init__
,_copy_list
,_copy_dict
,_copy_tensor
,_copy_ndarray
,copy
.
- _copy_dict(d: dict) dict [source]¶
- Overview:
Copy the dict.
- Arguments:
d (
dict
): The dict to be copied.
- _copy_list(l: List) dict [source]¶
- Overview:
Copy the list.
- Arguments:
l (
List
): The list to be copied.
- _copy_ndarray(a: numpy.ndarray) numpy.ndarray [source]¶
- Overview:
Copy the ndarray.
- Arguments:
a (
np.ndarray
): The ndarray to be copied.
file_helper¶
Please refer to ding/utils/file_helper
for more details.
read_from_ceph¶
_get_redis¶
read_from_redis¶
_ensure_rediscluster¶
- ding.utils.file_helper._ensure_rediscluster(startup_nodes=[{'host': '127.0.0.1', 'port': '7000'}])[source]¶
- Overview:
Ensures redis usage
- Arguments:
- List of startup nodes (
dict
) of host (
str
): Host stringport (
int
): Port number
- List of startup nodes (
- Returns:
(
RedisCluster(object)
): RedisCluster object with givenhost
,port
, andFalse
fordecode_responses
in default.
read_from_rediscluster¶
read_from_file¶
_ensure_memcached¶
read_from_mc¶
read_from_path¶
save_file_ceph¶
save_file_redis¶
save_file_rediscluster¶
read_file¶
- ding.utils.file_helper.read_file(path: str, fs_type: Union[None, str] = None, use_lock: bool = False) object [source]¶
- Overview:
Read file from path
- Arguments:
path (
str
): The path of file to readfs_type (
str
orNone
): The file system type, support{'normal', 'ceph'}
use_lock (
bool
): Whetheruse_lock
is in local normal file system
save_file¶
- ding.utils.file_helper.save_file(path: str, data: object, fs_type: Union[None, str] = None, use_lock: bool = False) None [source]¶
- Overview:
Save data to file of path
- Arguments:
path (
str
): The path of file to save todata (
object
): The data to savefs_type (
str
orNone
): The file system type, support{'normal', 'ceph'}
use_lock (
bool
): Whetheruse_lock
is in local normal file system
remove_file¶
import_helper¶
Please refer to ding/utils/import_helper
for more details.
try_import_ceph¶
try_import_mc¶
try_import_redis¶
try_import_rediscluster¶
try_import_link¶
import_module¶
k8s_helper¶
Please refer to ding/utils/k8s_helper
for more details.
get_operator_server_kwargs¶
exist_operator_server¶
pod_exec_command¶
K8sType¶
K8sLauncher¶
- class ding.utils.k8s_helper.K8sLauncher(config_path: str)[source]¶
- Overview:
object to manage the K8s cluster
- Interfaces:
__init__
,_load
,create_cluster
,_check_k3d_tools
,delete_cluster
,preload_images
- __init__(config_path: str) None [source]¶
- Overview:
Initialize the K8sLauncher object.
- Arguments:
config_path (
str
): The path of the config file.
linklink_dist_helper¶
Please refer to ding/utils/linklink_dist_helper
for more details.
get_rank¶
get_world_size¶
broadcast¶
allreduce¶
allreduce_async¶
get_group¶
dist_mode¶
dist_init¶
dist_finalize¶
DistContext¶
simple_group_split¶
synchronize¶
lock_helper¶
Please refer to ding/utils/lock_helper
for more details.
LockContextType¶
LockContext¶
- class ding.utils.lock_helper.LockContext(lock_type: ding.utils.lock_helper.LockContextType = LockContextType.THREAD_LOCK)[source]¶
- Overview:
Generate a LockContext in order to make sure the thread safety.
- Interfaces:
__init__
,__enter__
,__exit__
.- Example:
>>> with LockContext() as lock: >>> print("Do something here.")
- __init__(lock_type: ding.utils.lock_helper.LockContextType = LockContextType.THREAD_LOCK)[source]¶
- Overview:
Init the lock according to the given type.
- Arguments:
lock_type (
LockContextType
): The type of lock to be used. Defaults to LockContextType.THREAD_LOCK.
get_rw_file_lock¶
FcntlContext¶
- class ding.utils.lock_helper.FcntlContext(lock_path: str)[source]¶
- Overview:
A context manager that acquires an exclusive lock on a file using fcntl. This is useful for preventing multiple processes from running the same code.
- Interfaces:
__init__
,__enter__
,__exit__
.- Example:
>>> lock_path = "/path/to/lock/file" >>> with FcntlContext(lock_path) as lock: >>> # Perform operations while the lock is held
get_file_lock¶
- ding.utils.lock_helper.get_file_lock(name: str, op: str) ding.utils.lock_helper.FcntlContext [source]¶
- Overview:
Acquires a file lock for the specified file.
- Arguments:
name (
str
): The name of the file.op (
str
): The operation to perform on the file lock.
log_helper¶
Please refer to ding/utils/log_helper
for more details.
build_logger¶
- ding.utils.log_helper.build_logger(path: str, name: Optional[str] = None, need_tb: bool = True, need_text: bool = True, text_level: Union[int, str] = 20) Tuple[Optional[logging.Logger], Optional[SummaryWriter]] [source]¶
- Overview:
Build text logger and tensorboard logger.
- Arguments:
path (
str
): Logger(Textlogger
&SummaryWriter
)’s saved dirname (
str
): The logger file nameneed_tb (
bool
): WhetherSummaryWriter
instance would be created and returnedneed_text (
bool
): WhetherloggingLogger
instance would be created and returnedtext_level (
int`
orstr
): Logging level oflogging.Logger
, default set tologging.INFO
- Returns:
logger (
Optional[logging.Logger]
): Logger that displays terminal outputtb_logger (
Optional['SummaryWriter']
): Saves output to tfboard, only return whenneed_tb
.
TBLoggerFactory¶
- class ding.utils.log_helper.TBLoggerFactory[source]¶
- Overview:
TBLoggerFactory is a factory class for
SummaryWriter
.- Interfaces:
create_logger
- Properties:
tb_loggers
(Dict[str, SummaryWriter]
): A dict that storesSummaryWriter
instances.
- classmethod create_logger(logdir: str) ding.utils.log_writer_helper.DistributedWriter [source]¶
- tb_loggers = {}¶
LoggerFactory¶
- class ding.utils.log_helper.LoggerFactory[source]¶
- Overview:
LoggerFactory is a factory class for
logging.Logger
.- Interfaces:
create_logger
,get_tabulate_vars
,get_tabulate_vars_hor
- classmethod create_logger(path: str, name: str = 'default', level: Union[int, str] = 20) logging.Logger [source]¶
- Overview:
Create logger using logging
- Arguments:
name (
str
): Logger’s namepath (
str
): Logger’s save dirlevel (
int
orstr
): Used to set the level. Reference:Logger.setLevel
method.
- Returns:
(
logging.Logger
): new logging logger
pretty_print¶
log_writer_helper¶
Please refer to ding/utils/log_writer_helper
for more details.
DistributedWriter¶
- class ding.utils.log_writer_helper.DistributedWriter(*args, **kwargs)[source]¶
- Overview:
A simple subclass of SummaryWriter that supports writing to one process in multi-process mode. The best way is to use it in conjunction with the
router
to take advantage of the message and event components of the router (seewriter.plugin
).- Interfaces:
get_instance
,plugin
,initialize
,__del__
- classmethod get_instance(*args, **kwargs) ding.utils.log_writer_helper.DistributedWriter [source]¶
- Overview:
Get instance and set the root level instance on the first called. If args and kwargs is none, this method will return root instance.
- Arguments:
args (
Tuple
): The arguments passed to the__init__
function of the parent class, SummaryWriter.kwargs (
Dict
): The keyword arguments passed to the__init__
function of the parent class, SummaryWriter.
- plugin(router: Parallel, is_writer: bool = False) DistributedWriter [source]¶
- Overview:
Plugin
router
, so when using this writer with active router, it will automatically send requests to the main writer instead of writing it to the disk. So we can collect data from multiple processes and write them into one file.- Arguments:
router (
Parallel
): The router to be plugged in.is_writer (
bool
): Whether this writer is the main writer.
- Examples:
>>> DistributedWriter().plugin(router, is_writer=True)
enable_parallel¶
normalizer_helper¶
Please refer to ding/utils/normalizer_helper
for more details.
DatasetNormalizer¶
- class ding.utils.normalizer_helper.DatasetNormalizer(dataset: numpy.ndarray, normalizer: str, path_lengths: Optional[list] = None)[source]¶
- Overview:
The DatasetNormalizer class provides functionality to normalize and unnormalize data in a dataset. It takes a dataset as input and applies a normalizer function to each key in the dataset.
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
- __init__(dataset: numpy.ndarray, normalizer: str, path_lengths: Optional[list] = None)[source]¶
- Overview:
Initialize the NormalizerHelper object.
- Arguments:
dataset (
np.ndarray
): The dataset to be normalized.normalizer (
str
): The type of normalizer to be used. Can be a string representing the name of the normalizer class.path_lengths (
list
): The length of the paths in the dataset. Defaults to None.
- normalize(x: numpy.ndarray, key: str) numpy.ndarray [source]¶
- Overview:
Normalize the input data using the specified key.
- Arguments:
x (
np.ndarray
): The input data to be normalized.key (:obj`str`): The key to identify the normalizer.
- Returns:
ret (
np.ndarray
): The normalized value of the input data.
flatten¶
- ding.utils.normalizer_helper.flatten(dataset: dict, path_lengths: list) dict [source]¶
- Overview:
Flattens dataset of { key: [ n_episodes x max_path_length x dim ] } to { key : [ (n_episodes * sum(path_lengths)) x dim ] }
- Arguments:
dataset (
dict
): The dataset to be flattened.path_lengths (
list
): A list of path lengths for each episode.
- Returns:
flattened (
dict
): The flattened dataset.
Normalizer¶
- class ding.utils.normalizer_helper.Normalizer(X)[source]¶
- Overview:
Parent class, subclass by defining the normalize and unnormalize methods
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
- __init__(X)[source]¶
- Overview:
Initialize the Normalizer object.
- Arguments:
X (
np.ndarray
): The data to be normalized.
GaussianNormalizer¶
- class ding.utils.normalizer_helper.GaussianNormalizer(*args, **kwargs)[source]¶
- Overview:
A class that normalizes data to zero mean and unit variance.
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
- __init__(*args, **kwargs)[source]¶
- Overview:
Initialize the GaussianNormalizer object.
- Arguments:
args (
list
): The arguments passed to the__init__
function of the parent class, i.e., the Normalizer class.kwargs (
dict
): The keyword arguments passed to the__init__
function of the parent class, i.e., the Normalizer class.
CDFNormalizer¶
- class ding.utils.normalizer_helper.CDFNormalizer(X)[source]¶
- Overview:
A class that makes training data uniform (over each dimension) by transforming it with marginal CDFs.
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
- __init__(X)[source]¶
- Overview:
Initialize the CDFNormalizer object.
- Arguments:
X (
np.ndarray
): The data to be normalized.
- normalize(x: numpy.ndarray) numpy.ndarray [source]¶
- Overview:
Normalizes the input data.
- Arguments:
x (
np.ndarray
): The input data.
- Returns:
ret (
np.ndarray
): The normalized data.
CDFNormalizer1d¶
- class ding.utils.normalizer_helper.CDFNormalizer1d(X: numpy.ndarray)[source]¶
- Overview:
CDF normalizer for a single dimension. This class provides methods to normalize and unnormalize data using the Cumulative Distribution Function (CDF) approach.
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
- __init__(X: numpy.ndarray)[source]¶
- Overview:
Initialize the CDFNormalizer1d object.
- Arguments:
X (
np.ndarray
): The data to be normalized.
empirical_cdf¶
- ding.utils.normalizer_helper.empirical_cdf(sample: numpy.ndarray) -> (<class 'numpy.ndarray'>, <class 'numpy.ndarray'>)[source]¶
- Overview:
Compute the empirical cumulative distribution function (CDF) of a given sample.
- Arguments:
sample (
np.ndarray
): The input sample for which to compute the empirical CDF.
- Returns:
quantiles (
np.ndarray
): The unique values in the sample.cumprob (
np.ndarray
): The cumulative probabilities corresponding to the quantiles.
- References:
Stack Overflow: https://stackoverflow.com/a/33346366
atleast_2d¶
LimitsNormalizer¶
- class ding.utils.normalizer_helper.LimitsNormalizer(X)[source]¶
- Overview:
A class that normalizes and unnormalizes values within specified limits. This class maps values within the range [xmin, xmax] to the range [-1, 1].
- Interfaces:
__init__
,__repr__
,normalize
,unnormalize
.
orchestrator_launcher¶
Please refer to ding/utils/orchestrator_launcher
for more details.
OrchestratorLauncher¶
- class ding.utils.orchestrator_launcher.OrchestratorLauncher(version: str, name: str = 'di-orchestrator', cluster: Optional[ding.utils.k8s_helper.K8sLauncher] = None, registry: str = 'diorchestrator', cert_manager_version: str = 'v1.3.1', cert_manager_registry: str = 'quay.io/jetstack')[source]¶
- Overview:
Object to manage di-orchestrator in existing k8s cluster
- Interfaces:
__init__
,create_orchestrator
,delete_orchestrator
- __init__(version: str, name: str = 'di-orchestrator', cluster: Optional[ding.utils.k8s_helper.K8sLauncher] = None, registry: str = 'diorchestrator', cert_manager_version: str = 'v1.3.1', cert_manager_registry: str = 'quay.io/jetstack') None [source]¶
- Overview:
Initialize the OrchestratorLauncher object.
- Arguments:
version (
str
): The version of di-orchestrator.name (
str
): The name of di-orchestrator.cluster (
K8sLauncher
): The k8s cluster to deploy di-orchestrator.registry (
str
): The docker registry to pull images.cert_manager_version (
str
): The version of cert-manager.cert_manager_registry (
str
): The docker registry to pull cert-manager images.
create_components_from_config¶
wait_to_be_ready¶
- ding.utils.orchestrator_launcher.wait_to_be_ready(namespace: str, component: str, timeout: int = 120) None [source]¶
- Overview:
Wait for the component to be ready.
- Arguments:
namespace (
str
): The namespace of the component.component (
str
): The name of the component.timeout (
int
): The timeout of waiting.
profiler_helper¶
Please refer to ding/utils/profiler_helper
for more details.
Profiler¶
- class ding.utils.profiler_helper.Profiler[source]¶
- Overview:
A class for profiling code execution. It can be used as a context manager or a decorator.
- Interfaces:
__init__
,mkdir
,write_profile
,profile
.
- mkdir(directory: str)[source]¶
- OverView:
Create a directory if it doesn’t exist.
- Arguments:
directory (
str
): The path of the directory to be created.
pytorch_ddp_dist_helper¶
Please refer to ding/utils/pytorch_ddp_dist_helper
for more details.
get_rank¶
get_world_size¶
allreduce¶
allreduce_async¶
reduce_data¶
allreduce_data¶
get_group¶
dist_mode¶
dist_init¶
dist_finalize¶
DDPContext¶
simple_group_split¶
to_ddp_config¶
registry¶
Please refer to ding/utils/registry
for more details.
Registry¶
- class ding.utils.registry.Registry(*args, **kwargs)[source]¶
- Overview:
A helper class for managing registering modules, it extends a dictionary and provides a register functions.
- Interfaces:
__init__
,register
,get
,build
,query
,query_details
- Examples (creating):
>>> some_registry = Registry({"default": default_module})
- Examples (registering: normal way):
>>> def foo(): >>> ... >>> some_registry.register("foo_module", foo)
- Examples (registering: decorator way):
>>> @some_registry.register("foo_module") >>> @some_registry.register("foo_modeul_nickname") >>> def foo(): >>> ...
- Examples (accessing):
>>> f = some_registry["foo_module"]
- __init__(*args, **kwargs) None [source]¶
- Overview:
Initialize the Registry object.
- Arguments:
args (
Tuple
): The arguments passed to the__init__
function of the parent class, dict.kwargs (
Dict
): The keyword arguments passed to the__init__
function of the parent class, dict.
- static _register_generic(module_dict: dict, module_name: str, module: Callable, force_overwrite: bool = False) None [source]¶
- Overview:
Register the module.
- Arguments:
module_dict (
dict
): The dict to store the module.module_name (
str
): The name of the module.module (
Callable
): The module to be registered.force_overwrite (
bool
): Whether to overwrite the module with the same name.
- build(obj_type: str, *obj_args, **obj_kwargs) object [source]¶
- Overview:
Build the object.
- Arguments:
obj_type (
str
): The type of the object.obj_args (
Tuple
): The arguments passed to the object.obj_kwargs (
Dict
): The keyword arguments passed to the object.
- get(module_name: str) Callable [source]¶
- Overview:
Get the module.
- Arguments:
module_name (
str
): The name of the module.
- query_details(aliases: Optional[Iterable] = None) collections.OrderedDict [source]¶
- Overview:
Get the details of the registered modules.
- Arguments:
aliases (
Optional[Iterable]
): The aliases of the modules.
- register(module_name: Union[None, str] = None, module: Optional[Callable] = None, force_overwrite: bool = False) Callable [source]¶
- Overview:
Register the module.
- Arguments:
module_name (
Optional[str]
): The name of the module.module (
Optional[Callable]
): The module to be registered.force_overwrite (
bool
): Whether to overwrite the module with the same name.
render_helper¶
Please refer to ding/utils/render_helper
for more details.
render_env¶
render¶
get_env_fps¶
fps¶
scheduler_helper¶
Please refer to ding/utils/scheduler_helper
for more details.
Scheduler¶
- class ding.utils.scheduler_helper.Scheduler(merged_scheduler_config: easydict.EasyDict)[source]¶
- Overview:
Update learning parameters when the trueskill metrics has stopped improving. For example, models often benefits from reducing entropy weight once the learning process stagnates. This scheduler reads a metrics quantity and if no improvement is seen for a ‘patience’ number of epochs, the corresponding parameter is increased or decreased, which decides on the ‘schedule_mode’.
- Arguments:
- schedule_flag (
bool
): Indicates whether to use scheduler in training pipeline. Default: False
- schedule_flag (
- schedule_mode (
str
): One of ‘reduce’, ‘add’,’multi’,’div’. The schecule_mode decides the way of updating the parameters. Default:’reduce’.
- schedule_mode (
- factor (
float
)Amount (greater than 0) by which the parameter will be increased/decreased. Default: 0.05
- factor (
- change_range (
list
): Indicates the minimum and maximum value the parameter can reach respectively. Default: [-1,1]
- change_range (
- threshold (
float
): Threshold for measuring the new optimum, to only focus on significant changes. Default: 1e-4.
- threshold (
- optimize_mode (
str
): One of ‘min’, ‘max’, which indicates the sign of optimization objective. Dynamic_threshold = last_metrics + threshold in max mode or last_metrics - threshold in min mode. Default: ‘min’
- optimize_mode (
- patience (
int
): Number of epochs with no improvement after which the parameter will be updated. For example, if patience = 2, then we will ignore the first 2 epochs with no improvement, and will only update the parameter after the 3rd epoch if the metrics still hasn’t improved then. Default: 10.
- patience (
- cooldown (
int
): Number of epochs to wait before resuming normal operation after the parameter has been updated. Default: 0.
- cooldown (
- Interfaces:
__init__, update_param, step
- Property:
in_cooldown, is_better
- __init__(merged_scheduler_config: easydict.EasyDict) None [source]¶
- Overview:
Initialize the scheduler.
- Arguments:
- merged_scheduler_config (
EasyDict
): the scheduler config, which merges the user config and defaul config
- merged_scheduler_config (
- config = {'change_range': [-1, 1], 'cooldown': 0, 'factor': 0.05, 'optimize_mode': 'min', 'patience': 10, 'schedule_flag': False, 'schedule_mode': 'reduce', 'threshold': 0.0001}¶
- property in_cooldown: bool¶
- Overview:
Checks whether the scheduler is in cooldown peried. If in cooldown, the scheduler will ignore any bad epochs.
- is_better(cur: float) bool [source]¶
- Overview:
Checks whether the current metrics is better than last matric with respect to threshold.
- Args:
cur (
float
): current metrics
segment_tree¶
Please refer to ding/utils/segment_tree
for more details.
njit¶
SegmentTree¶
- class ding.utils.segment_tree.SegmentTree(capacity: int, operation: Callable, neutral_element: Optional[float] = None)[source]¶
- Overview:
Segment tree data structure, implemented by the tree-like array. Only the leaf nodes are real value, non-leaf nodes are to do some operations on its left and right child.
- Interfaces:
__init__
,reduce
,__setitem__
,__getitem__
- __init__(capacity: int, operation: Callable, neutral_element: Optional[float] = None) None [source]¶
- Overview:
Initialize the segment tree. Tree’s root node is at index 1.
- Arguments:
capacity (
int
): Capacity of the tree (the number of the leaf nodes), should be the power of 2.operation (
function
): The operation function to construct the tree, e.g. sum, max, min, etc.neutral_element (
float
orNone
): The value of the neutral element, which is used to init all nodes value in the tree.
- reduce(start: int = 0, end: Optional[int] = None) float [source]¶
- Overview:
Reduce the tree in range
[start, end)
- Arguments:
start (
int
): Start index(relative index, the first leaf node is 0), default set to 0end (
int
orNone
): End index(relative index), default set toself.capacity
- Returns:
reduce_result (
float
): The reduce result value, which is dependent on data type and operation
SumSegmentTree¶
- class ding.utils.segment_tree.SumSegmentTree(capacity: int)[source]¶
- Overview:
Sum segment tree, which is inherited from
SegmentTree
. Init by passingoperation='sum'
.- Interfaces:
__init__
,find_prefixsum_idx
- __init__(capacity: int) None [source]¶
- Overview:
Init sum segment tree by passing
operation='sum'
- Arguments:
capacity (
int
): Capacity of the tree (the number of the leaf nodes).
- find_prefixsum_idx(prefixsum: float, trust_caller: bool = True) int [source]¶
- Overview:
Find the highest non-zero index i, sum_{j}leaf[j] <=
prefixsum
(where 0 <= j < i) and sum_{j}leaf[j] >prefixsum
(where 0 <= j < i+1)- Arguments:
prefixsum (
float
): The target prefixsum.- trust_caller (
bool
): Whether to trust caller, which means whether to check whether this tree’s sum is greater than the inputprefixsum
by callingreduce
function. Default set to True.
- trust_caller (
- Returns:
idx (
int
): Eligible index.
MinSegmentTree¶
_setitem¶
- ding.utils.segment_tree._setitem(tree: numpy.ndarray, idx: int, val: float, operation: str) None ¶
- Overview:
Set
tree[idx] = val
; Then update the related nodes.- Arguments:
tree (
np.ndarray
): The tree array.idx (
int
): The index of the leaf node.val (
float
): The value that will be assigned toleaf[idx]
.operation (
str
): The operation function to construct the tree, e.g. sum, max, min, etc.
_reduce¶
- ding.utils.segment_tree._reduce(tree: numpy.ndarray, start: int, end: int, neutral_element: float, operation: str) float ¶
- Overview:
Reduce the tree in range
[start, end)
- Arguments:
tree (
np.ndarray
): The tree array.start (
int
): Start index(relative index, the first leaf node is 0).end (
int
): End index(relative index).neutral_element (
float
): The value of the neutral element, which is used to init all nodes value in the tree.operation (
str
): The operation function to construct the tree, e.g. sum, max, min, etc.
_find_prefixsum_idx¶
- ding.utils.segment_tree._find_prefixsum_idx(tree: numpy.ndarray, capacity: int, prefixsum: float, neutral_element: float) int ¶
- Overview:
Find the highest non-zero index i, sum_{j}leaf[j] <=
prefixsum
(where 0 <= j < i) and sum_{j}leaf[j] >prefixsum
(where 0 <= j < i+1)- Arguments:
tree (
np.ndarray
): The tree array.capacity (
int
): Capacity of the tree (the number of the leaf nodes).prefixsum (
float
): The target prefixsum.neutral_element (
float
): The value of the neutral element, which is used to init all nodes value in the tree.
slurm_helper¶
Please refer to ding/utils/slurm_helper
for more details.
get_ip¶
get_manager_node_ip¶
get_cls_info¶
node_to_partition¶
node_to_host¶
find_free_port_slurm¶
system_helper¶
Please refer to ding/utils/system_helper
for more details.
get_ip¶
get_pid¶
get_task_uid¶
PropagatingThread¶
- class ding.utils.system_helper.PropagatingThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)[source]¶
- Overview:
Subclass of Thread that propagates execution exception in the thread to the caller
- Interfaces:
run
,join
- Examples:
>>> def func(): >>> raise Exception() >>> t = PropagatingThread(target=func, args=()) >>> t.start() >>> t.join()
find_free_port¶
time_helper_base¶
Please refer to ding/utils/time_helper_base
for more details.
TimeWrapper¶
time_helper_cuda¶
Please refer to ding/utils/time_helper_cuda
for more details.
get_cuda_time_wrapper¶
- ding.utils.time_helper_cuda.get_cuda_time_wrapper() Callable[[], ding.utils.time_helper_base.TimeWrapper] [source]¶
- Overview:
Return the
TimeWrapperCuda
class, this wrapper aims to ensure compatibility in no cuda device- Returns:
TimeWrapperCuda(
class
): SeeTimeWrapperCuda
class
Note
Must use
torch.cuda.synchronize()
, reference: <https://blog.csdn.net/u013548568/article/details/81368019>
time_helper¶
Please refer to ding/utils/time_helper
for more details.
build_time_helper¶
- ding.utils.time_helper.build_time_helper(cfg: Optional[easydict.EasyDict] = None, wrapper_type: Optional[str] = None) Callable[[], ding.utils.time_helper_base.TimeWrapper] [source]¶
- Overview:
Build the timehelper
- Arguments:
- cfg (
dict
): The config file, which is a multilevel dict, have large domain like evaluate, common, model, train etc, and each large domain has it’s smaller domain.
- cfg (
wrapper_type (
str
): The type of wrapper returned, support['time', 'cuda']
- Returns:
- time_wrapper (
TimeWrapper
): Return the corresponding timewrapper, Reference:
ding.utils.timehelper.TimeWrapperTime
andding.utils.timehelper.get_cuda_time_wrapper
.
- time_wrapper (
EasyTimer¶
TimeWrapperTime¶
- class ding.utils.time_helper.TimeWrapperTime[source]¶
- Overview:
A class method that inherit from
TimeWrapper
class- Interfaces:
start_time
,end_time
WatchDog¶
- class ding.utils.time_helper.WatchDog(timeout: int = 1)[source]¶
- Overview:
Simple watchdog timer to detect timeouts
- Arguments:
timeout (
int
): Timeout value of thewatchdog [seconds]
.
Note
If it is not reset before exceeding this value,
TimeourError
raised.- Interfaces:
start
,stop
- Examples:
>>> watchdog = WatchDog(x) # x is a timeout value >>> ... >>> watchdog.start() >>> ... # Some function
- __init__(timeout: int = 1)[source]¶
- Overview:
Initialize watchdog with
timeout
value.- Arguments:
timeout (
int
): Timeout value of thewatchdog [seconds]
.
loader.base¶
Please refer to ding/utils/loader/base
for more details.
ILoaderClass¶
loader.collection¶
Please refer to ding/utils/loader/collection
for more details.
CollectionError¶
collection¶
tuple¶
length¶
length_is¶
contains¶
cofilter¶
tpselector¶
loader.dict¶
Please refer to ding/utils/loader/dict
for more details.
DictError¶
dict¶
loader.exception¶
Please refer to ding/utils/loader/exception
for more details.
CompositeStructureError¶
loader.mapping¶
Please refer to ding/utils/loader/mapping
for more details.
MappingError¶
mapping¶
mpfilter¶
mpkeys¶
mpvalues¶
mpitems¶
item¶
item_or¶
loader.norm¶
Please refer to ding/utils/loader/norm
for more details.
_callable_to_norm¶
norm¶
normfunc¶
_unary¶
_binary¶
_binary_reducing¶
INormClass¶
lcmp¶
loader.number¶
Please refer to ding/utils/loader/number
for more details.
numeric¶
interval¶
is_negative¶
is_positive¶
non_negative¶
non_positive¶
negative¶
positive¶
_math_binary¶
plus¶
minus¶
minus_with¶
multi¶
divide¶
divide_with¶
power¶
power_with¶
msum¶
mmulti¶
_msinglecmp¶
mcmp¶
loader.string¶
Please refer to ding/utils/loader/string
for more details.
enum¶
_to_regexp¶
rematch¶
regrep¶
loader.types¶
Please refer to ding/utils/loader/types
for more details.
is_type¶
to_type¶
is_callable¶
prop¶
method¶
fcall¶
fpartial¶
loader.utils¶
Please refer to ding/utils/loader/utils
for more details.