Shortcuts

worker.replay_buffer

replay buffer

IBuffer

class ding.worker.replay_buffer.base_buffer.IBuffer[source]
Overview:

Buffer interface

Interfaces:

default_config, push, update, sample, clear, count, state_dict, load_state_dict

abstract clear() None[source]
Overview:

Clear all the data and reset the related variables.

abstract count() int[source]
Overview:

Count how many valid datas there are in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict[source]
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

abstract load_state_dict(_state_dict: Dict[str, Any]) None[source]
Overview:

Load state dict to reproduce the buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

abstract push(data: List[Any] | Any, cur_collector_envstep: int) None[source]
Overview:

Push a data into buffer.

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (in Any type), or many(int List[Any] type).

  • cur_collector_envstep (int): Collector’s current env step.

abstract sample(batch_size: int, cur_learner_iter: int) list[source]
Overview:

Sample data with length batch_size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration.

Returns:
  • sampled_data (list): A list of data with length batch_size.

abstract state_dict() Dict[str, Any][source]
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

abstract update(info: Dict[str, list]) None[source]
Overview:

Update data info, e.g. priority.

Arguments:
  • info (Dict[str, list]): Info dict. Keys depends on the specific buffer type.

NaiveReplayBuffer

class ding.worker.replay_buffer.naive_buffer.NaiveReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

Naive replay buffer, can store and sample data. An naive implementation of replay buffer with no priority or any other advanced features. This buffer refers to multi-thread/multi-process and guarantees thread-safe, which means that methods like sample, push, clear are all mutual to each other.

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property:

replay_buffer_size, push_count

clear() None[source]
Overview:

Clear all the data and reset the related variables.

close() None[source]
Overview:

Clear the buffer; Join the buffer’s used_data_remover thread if enables track_used_data.

count() int[source]
Overview:

Count how many valid datas there are in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict) None[source]
Overview:

Load state dict to reproduce the buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None[source]
Overview:

Push a data into buffer.

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (in Any type), or many(int List[Any] type).

  • cur_collector_envstep (int): Collector’s current env step.

    Not used in naive buffer, but preserved for compatibility.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None[source]
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means only sample among the last 10 data

  • replace (bool): Whether sample with replacement

Returns:
  • sample_data (list): A list of data with length size.

start() None[source]
Overview:

Start the buffer’s used_data_remover thread if enables track_used_data.

state_dict() dict[source]
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None[source]
Overview:

Naive Buffer does not need to update any info, but this method is preserved for compatibility.

AdvancedReplayBuffer

class ding.worker.replay_buffer.advanced_buffer.AdvancedReplayBuffer(cfg: dict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

Prioritized replay buffer derived from NaiveReplayBuffer. This replay buffer adds:

  1. Prioritized experience replay implemented by segment tree.

  2. Data quality monitor. Monitor use count and staleness of each data.

  3. Throughput monitor and control.

  4. Logger. Log 2) and 3) in tensorboard or text.

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

Property:

beta, replay_buffer_size, push_count

clear() None[source]
Overview:

Clear all the data and reset the related variables.

close() None[source]
Overview:

Clear the buffer; Join the buffer’s used_data_remover thread if enables track_used_data. Join periodic throughtput monitor, flush tensorboard logger.

count() int[source]
Overview:

Count how many valid datas there are in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict, deepcopy: bool = False) None[source]
Overview:

Load state dict to reproduce the buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None[source]
Overview:

Push a data into buffer.

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (in Any type), or many(int List[Any] type).

  • cur_collector_envstep (int): Collector’s current env step.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None) list | None[source]
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration, used to calculate staleness.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means only sample among the last 10 data

Returns:
  • sample_data (list): A list of data with length size

ReturnsKeys:
  • necessary: original keys(e.g. obs, action, next_obs, reward, info), replay_unique_id, replay_buffer_idx

  • optional(if use priority): IS, priority

start() None[source]
Overview:

Start the buffer’s used_data_remover thread if enables track_used_data.

state_dict() dict[source]
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None[source]
Overview:

Update a data’s priority. Use repaly_buffer_idx to locate, and use replay_unique_id to verify.

Arguments:
  • info (dict): Info dict containing all necessary keys for priority update.

ArgumentsKeys:
  • necessary: replay_unique_id, replay_buffer_idx, priority. All values are lists with the same length.

EpisodeReplayBuffer

class ding.worker.replay_buffer.episode_buffer.EpisodeReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]
Overview:

Episode replay buffer is a buffer to store complete episodes, i.e. Each element in episode buffer is an episode. Some algorithms do not want to sample batch_size complete episodes, however, they want some transitions with some fixed length. As a result, sample should be overwritten for those requirements.

Interface:

start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config

__init__(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer') None
Overview:

Initialize the buffer

Arguments:
  • cfg (dict): Config dict.

  • tb_logger (Optional['SummaryWriter']): Outer tb logger. Usually get this argument in serial mode.

  • exp_name (Optional[str]): Name of this experiment.

  • instance_name (Optional[str]): Name of this instance.

clear() None
Overview:

Clear all the data and reset the related variables.

close() None
Overview:

Clear the buffer; Join the buffer’s used_data_remover thread if enables track_used_data.

count() int
Overview:

Count how many valid datas there are in the buffer.

Returns:
  • count (int): Number of valid data.

classmethod default_config() EasyDict
Overview:

Default config of this buffer class.

Returns:
  • default_config (EasyDict)

load_state_dict(_state_dict: dict) None
Overview:

Load state dict to reproduce the buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer.

push(data: List[Any] | Any, cur_collector_envstep: int) None
Overview:

Push a data into buffer.

Arguments:
  • data (Union[List[Any], Any]): The data which will be pushed into buffer. Can be one

    (in Any type), or many(int List[Any] type).

  • cur_collector_envstep (int): Collector’s current env step.

    Not used in naive buffer, but preserved for compatibility.

sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None
Overview:

Sample data with length size.

Arguments:
  • size (int): The number of the data that will be sampled.

  • cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.

  • sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means only sample among the last 10 data

  • replace (bool): Whether sample with replacement

Returns:
  • sample_data (list): A list of data with length size.

start() None
Overview:

Start the buffer’s used_data_remover thread if enables track_used_data.

state_dict() dict
Overview:

Provide a state dict to keep a record of current buffer.

Returns:
  • state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.

update(info: dict) None
Overview:

Naive Buffer does not need to update any info, but this method is preserved for compatibility.

create_buffer

Overview:

Create a buffer according to cfg and other arguments.

Arguments:
  • cfg (EasyDict): Buffer config.

ArgumentsKeys:
  • necessary: type

get_buffer_cls

Overview:

Get a buffer class according to cfg.

Arguments:
  • cfg (EasyDict): Buffer config.

ArgumentsKeys:
  • necessary: type

utils

UsedDataRemover

class ding.worker.replay_buffer.utils.UsedDataRemover[source]
Overview:

UsedDataRemover is a tool to remove file datas that will no longer be used anymore.

Interface:

start, close, add_used_data

add_used_data(data: Any) None[source]
Overview:

Delete all datas in self._used_data. Then join the delete_used_data thread.

Arguments:
  • data (Any): Add a used data item into self._used_data for further remove.

close() None[source]
Overview:

Delete all datas in self._used_data. Then join the delete_used_data thread.

start() None[source]
Overview:

Start the delete_used_data thread.

SampledDataAttrMonitor

class ding.worker.replay_buffer.utils.SampledDataAttrMonitor(time_: BaseTime, expire: int | float)[source]
Overview:

SampledDataAttrMonitor is to monitor read-out indicators for expire times recent read-outs. Indicators include: read out time; average and max of read out data items’ use; average, max and min of read out data items’ priorityl; average and max of staleness.

Interface:

__init__, fixed_time, current_time, freeze, unfreeze, register_attribute_value, __getattr__

Property:

time, expire

PeriodicThruputMonitor

class ding.worker.replay_buffer.utils.PeriodicThruputMonitor(name, cfg, logger, tb_logger)[source]
Overview:

PeriodicThruputMonitor is a tool to record and print logs(text & tensorboard) how many datas are pushed/sampled/removed/valid in a period of time. For tensorboard, you can view it in ‘buffer_{$NAME}_sec’.

Interface:

close

Property:

push_data_count, sample_data_count, remove_data_count, valid_count

Note

thruput_log thread is initialized and started in __init__ method, so PeriodicThruputMonitor only provide one signle interface close