worker.replay_buffer¶
replay buffer¶
IBuffer¶
- class ding.worker.replay_buffer.base_buffer.IBuffer[source]¶
- Overview:
Buffer interface
- Interfaces:
default_config, push, update, sample, clear, count, state_dict, load_state_dict
- abstract count() int [source]¶
- Overview:
Count the number of valid data items in the buffer.
- Returns:
count (int): Number of valid data items.
- classmethod default_config() EasyDict [source]¶
- Overview:
Default config of this buffer class.
- Returns:
default_config (EasyDict)
- abstract load_state_dict(_state_dict: Dict[str, Any]) None [source]¶
- Overview:
Load state dict to reproduce the buffer.
- Arguments:
_state_dict (Dict[str, Any]): A dict containing all important values in the buffer.
- abstract push(data: List[Any] | Any, cur_collector_envstep: int) None [source]¶
- Overview:
Push one or more data items into the buffer.
- Arguments:
data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or several items (of type List[Any]).
cur_collector_envstep (int): Collector’s current env step.
- abstract sample(batch_size: int, cur_learner_iter: int) list [source]¶
- Overview:
Sample data with length batch_size.
- Arguments:
batch_size (int): The number of data items to sample.
cur_learner_iter (int): Learner’s current iteration.
- Returns:
sampled_data (list): A list of data with length batch_size.
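The interface above can be pictured as an abstract base class plus one concrete subclass. The following is a minimal sketch and not DI-engine's source: `SketchBuffer` and `ListBuffer` are hypothetical names, and `default_config` and `update` are left out for brevity.

```python
import random
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union


class SketchBuffer(ABC):
    """Hypothetical stand-in that mirrors the IBuffer method names."""

    @abstractmethod
    def push(self, data: Union[List[Any], Any], cur_collector_envstep: int) -> None: ...

    @abstractmethod
    def sample(self, batch_size: int, cur_learner_iter: int) -> list: ...

    @abstractmethod
    def clear(self) -> None: ...

    @abstractmethod
    def count(self) -> int: ...

    @abstractmethod
    def state_dict(self) -> Dict[str, Any]: ...

    @abstractmethod
    def load_state_dict(self, _state_dict: Dict[str, Any]) -> None: ...


class ListBuffer(SketchBuffer):
    """Toy list-backed implementation, for illustration only."""

    def __init__(self) -> None:
        self._data: List[Any] = []

    def push(self, data, cur_collector_envstep):
        # Accept either a single item or a list of items.
        if isinstance(data, list):
            self._data.extend(data)
        else:
            self._data.append(data)

    def sample(self, batch_size, cur_learner_iter):
        # Uniform sampling without replacement.
        return random.sample(self._data, batch_size)

    def clear(self):
        self._data.clear()

    def count(self):
        return len(self._data)

    def state_dict(self):
        return {'data': list(self._data)}

    def load_state_dict(self, _state_dict):
        self._data = list(_state_dict['data'])
```

A buffer restored through `load_state_dict(other.state_dict())` holds the same items as the original, which is the round trip the two state methods are meant to support.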
NaiveReplayBuffer¶
- class ding.worker.replay_buffer.naive_buffer.NaiveReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]¶
- Overview:
Naive replay buffer that can store and sample data. It is a simple implementation of a replay buffer with no priority or other advanced features. The buffer is designed for multi-thread/multi-process use and is thread-safe, which means that methods such as sample, push, and clear are mutually exclusive.
- Interface:
start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
- Property:
replay_buffer_size, push_count
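One common way to make `push`, `sample`, and `clear` mutually exclusive is to guard them with a single shared lock. The sketch below illustrates that idea only; `LockedBuffer` is a hypothetical class, and whether NaiveReplayBuffer uses exactly this locking scheme is an assumption.

```python
import random
import threading
from typing import Any, List


class LockedBuffer:
    """Hypothetical buffer whose public methods share one lock."""

    def __init__(self) -> None:
        self._data: List[Any] = []
        self._lock = threading.Lock()

    def push(self, item: Any) -> None:
        with self._lock:  # no sample/clear can run while an item is pushed
            self._data.append(item)

    def sample(self, size: int) -> list:
        with self._lock:
            return random.sample(self._data, size)

    def clear(self) -> None:
        with self._lock:
            self._data.clear()

    def count(self) -> int:
        with self._lock:
            return len(self._data)


def fill(target: LockedBuffer, n: int) -> None:
    for i in range(n):
        target.push(i)


# Four producer threads push concurrently; no item is lost.
buffer = LockedBuffer()
threads = [threading.Thread(target=fill, args=(buffer, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```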
- close() None [source]¶
- Overview:
Clear the buffer, and join the buffer’s used_data_remover thread if track_used_data is enabled.
- count() int [source]¶
- Overview:
Count the number of valid data items in the buffer.
- Returns:
count (int): Number of valid data items.
- classmethod default_config() EasyDict ¶
- Overview:
Default config of this buffer class.
- Returns:
default_config (EasyDict)
- load_state_dict(_state_dict: dict) None [source]¶
- Overview:
Load state dict to reproduce the buffer.
- Arguments:
_state_dict (Dict[str, Any]): A dict containing all important values in the buffer.
- push(data: List[Any] | Any, cur_collector_envstep: int) None [source]¶
- Overview:
Push one or more data items into the buffer.
- Arguments:
data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or several items (of type List[Any]).
cur_collector_envstep (int): Collector’s current env step. Not used in naive buffer, but preserved for compatibility.
- sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None [source]¶
- Overview:
Sample data with length size.
- Arguments:
size (int): The number of data items to sample.
cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.
sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means sampling only among the last 10 data items.
replace (bool): Whether to sample with replacement.
- Returns:
sample_data (list): A list of data with length size.
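To make the roles of `sample_range` and `replace` concrete, here is a small sketch of index selection. `sample_indices` is a hypothetical helper, not a DI-engine function, and returning `None` when there is not enough data is an assumption based only on the `list | None` return annotation.

```python
import random
from typing import List, Optional


def sample_indices(
    total: int, size: int, sample_range: Optional[slice] = None, replace: bool = False
) -> Optional[List[int]]:
    """Pick `size` indices out of `total` stored items (illustrative only)."""
    candidates = list(range(total))
    if sample_range is not None:
        # e.g. slice(-10, None) keeps only the last 10 indices
        candidates = candidates[sample_range]
    if replace:
        if not candidates:
            return None
        return random.choices(candidates, k=size)  # duplicates allowed
    if len(candidates) < size:
        # Assumption: signal "not enough data" with None.
        return None
    return random.sample(candidates, size)  # all indices distinct
```

With `replace=True`, a request larger than the candidate set can still be served, because an index may be drawn more than once.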
- start() None [source]¶
- Overview:
Start the buffer’s used_data_remover thread if track_used_data is enabled.
AdvancedReplayBuffer¶
- class ding.worker.replay_buffer.advanced_buffer.AdvancedReplayBuffer(cfg: dict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]¶
- Overview:
Prioritized replay buffer derived from NaiveReplayBuffer. This replay buffer adds:
1) Prioritized experience replay implemented by segment tree.
2) Data quality monitor. Monitors the use count and staleness of each data item.
3) Throughput monitor and control.
4) Logger. Logs 2) and 3) in tensorboard or text.
- Interface:
start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
- Property:
beta, replay_buffer_size, push_count
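Prioritized sampling with a segment tree usually relies on a sum tree: leaves store priorities, inner nodes store the sum of their children, and a sample is drawn by walking down from the root with a random "mass". The sketch below shows that general technique; `SumTree` is a hypothetical class and is not taken from DI-engine's segment tree implementation.

```python
import random
from typing import List


class SumTree:
    """Array-backed binary tree; leaves hold priorities, parents hold sums."""

    def __init__(self, capacity: int) -> None:
        # Round capacity up to a power of two so the tree is complete.
        self.capacity = 1
        while self.capacity < capacity:
            self.capacity *= 2
        self.tree: List[float] = [0.0] * (2 * self.capacity)

    def set(self, idx: int, priority: float) -> None:
        pos = idx + self.capacity
        self.tree[pos] = priority
        pos //= 2
        while pos >= 1:  # refresh every ancestor's sum
            self.tree[pos] = self.tree[2 * pos] + self.tree[2 * pos + 1]
            pos //= 2

    def total(self) -> float:
        return self.tree[1]

    def find(self, mass: float) -> int:
        """Return the leaf whose cumulative-priority interval contains `mass`."""
        pos = 1
        while pos < self.capacity:
            left = 2 * pos
            if mass < self.tree[left]:
                pos = left
            else:
                mass -= self.tree[left]
                pos = left + 1
        return pos - self.capacity


tree = SumTree(4)
for i, p in enumerate([1.0, 0.0, 3.0, 0.0]):
    tree.set(i, p)
# Draw a mass in [0, total) and locate the matching leaf.
draws = [tree.find(random.random() * tree.total()) for _ in range(1000)]
```

Both updating a priority and drawing a sample touch one root-to-leaf path, so each costs O(log n). Items with zero priority are never drawn.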
- close() None [source]¶
- Overview:
Clear the buffer, and join the buffer’s used_data_remover thread if track_used_data is enabled. Join the periodic throughput monitor and flush the tensorboard logger.
- count() int [source]¶
- Overview:
Count the number of valid data items in the buffer.
- Returns:
count (int): Number of valid data items.
- classmethod default_config() EasyDict ¶
- Overview:
Default config of this buffer class.
- Returns:
default_config (EasyDict)
- load_state_dict(_state_dict: dict, deepcopy: bool = False) None [source]¶
- Overview:
Load state dict to reproduce the buffer.
- Arguments:
_state_dict (Dict[str, Any]): A dict containing all important values in the buffer.
- push(data: List[Any] | Any, cur_collector_envstep: int) None [source]¶
- Overview:
Push one or more data items into the buffer.
- Arguments:
data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or several items (of type List[Any]).
cur_collector_envstep (int): Collector’s current env step.
- sample(size: int, cur_learner_iter: int, sample_range: slice | None = None) list | None [source]¶
- Overview:
Sample data with length size.
- Arguments:
size (int): The number of data items to sample.
cur_learner_iter (int): Learner’s current iteration, used to calculate staleness.
sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means sampling only among the last 10 data items.
- Returns:
sample_data (list): A list of data with length size.
- ReturnsKeys:
necessary: original keys (e.g. obs, action, next_obs, reward, info), replay_unique_id, replay_buffer_idx
optional (if priority is used): IS, priority
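The `IS` key refers to importance sampling weights, which in the standard prioritized experience replay formulation correct for the bias of non-uniform sampling and are controlled by an exponent such as the `beta` property. The sketch below follows that textbook formulation; `importance_weights` is a hypothetical helper, and the exact normalisation used by AdvancedReplayBuffer is an assumption.

```python
from typing import List


def importance_weights(priorities: List[float], sampled: List[int], beta: float) -> List[float]:
    """Textbook PER weights; assumes every priority is strictly positive."""
    total = sum(priorities)
    n = len(priorities)
    probs = [p / total for p in priorities]
    # Weight each sampled item by (N * P(i)) ** -beta ...
    raw = [(n * probs[i]) ** (-beta) for i in sampled]
    # ... then divide by the largest possible weight (the least likely item),
    # so that every weight lies in (0, 1].
    max_weight = (n * min(probs)) ** (-beta)
    return [w / max_weight for w in raw]


weights = importance_weights([1.0, 1.0, 2.0], sampled=[0, 2], beta=1.0)
```

With `beta=0` every weight equals 1 (no correction); with `beta=1` the correction is applied in full.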
- start() None [source]¶
- Overview:
Start the buffer’s used_data_remover thread if track_used_data is enabled.
- state_dict() dict [source]¶
- Overview:
Provide a state dict to keep a record of current buffer.
- Returns:
state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.
- update(info: dict) None [source]¶
- Overview:
Update the priority of data. Use replay_buffer_idx to locate each item, and replay_unique_id to verify it.
- Arguments:
info (dict): Info dict containing all necessary keys for priority update.
- ArgumentsKeys:
necessary: replay_unique_id, replay_buffer_idx, priority. All values are lists with the same length.
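The locate-then-verify step matters because a slot may have been overwritten by newer data between sampling and the priority update. A minimal sketch of that check, assuming the buffer is a plain list of dicts (`update_priorities` and this storage layout are hypothetical, not DI-engine's internals):

```python
from typing import Any, Dict, List, Optional


def update_priorities(slots: List[Optional[Dict[str, Any]]], info: Dict[str, list]) -> int:
    """Apply priority updates; return how many were accepted."""
    accepted = 0
    for uid, idx, priority in zip(
        info['replay_unique_id'], info['replay_buffer_idx'], info['priority']
    ):
        item = slots[idx]  # locate by buffer index
        # Verify by unique id: skip the update if the slot now holds other data.
        if item is not None and item['replay_unique_id'] == uid:
            item['priority'] = priority
            accepted += 1
    return accepted


slots = [
    {'replay_unique_id': 'a_0', 'priority': 1.0},
    {'replay_unique_id': 'a_5', 'priority': 1.0},  # slot 1 was overwritten
]
info = {
    'replay_unique_id': ['a_0', 'a_1'],
    'replay_buffer_idx': [0, 1],
    'priority': [2.5, 9.0],
}
accepted = update_priorities(slots, info)
```

Here only the first update is applied; the second refers to an item that no longer occupies slot 1, so it is ignored.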
EpisodeReplayBuffer¶
- class ding.worker.replay_buffer.episode_buffer.EpisodeReplayBuffer(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer')[source]¶
- Overview:
Episode replay buffer stores complete episodes, i.e. each element in the buffer is an episode. Some algorithms do not want to sample batch_size complete episodes; instead, they want transitions of some fixed length. For those requirements, sample should be overridden.
- Interface:
start, close, push, update, sample, clear, count, state_dict, load_state_dict, default_config
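As an illustration of the kind of override described above, the sketch below draws fixed-length runs of transitions from stored episodes instead of whole episodes. `sample_segments` is a hypothetical helper, and the uniform choice of episode and start position is an assumption.

```python
import random
from typing import Any, List


def sample_segments(episodes: List[List[Any]], size: int, seg_len: int) -> List[List[Any]]:
    """Draw `size` contiguous segments of `seg_len` transitions each."""
    eligible = [ep for ep in episodes if len(ep) >= seg_len]
    if not eligible:
        raise ValueError('no episode is long enough')
    segments = []
    for _ in range(size):
        episode = random.choice(eligible)
        # randint bounds are inclusive, so the last valid start is allowed.
        start = random.randint(0, len(episode) - seg_len)
        segments.append(episode[start:start + seg_len])
    return segments


# Integers stand in for transitions; the 3-step episode is too short to use.
episodes = [list(range(10)), list(range(100, 103)), list(range(200, 220))]
segments = sample_segments(episodes, size=8, seg_len=5)
```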
- __init__(cfg: EasyDict, tb_logger: SummaryWriter | None = None, exp_name: str | None = 'default_experiment', instance_name: str | None = 'buffer') None ¶
- Overview:
Initialize the buffer
- Arguments:
cfg (dict): Config dict.
tb_logger (Optional['SummaryWriter']): Outer tb logger. Usually get this argument in serial mode.
exp_name (Optional[str]): Name of this experiment.
instance_name (Optional[str]): Name of this instance.
- clear() None ¶
- Overview:
Clear all the data and reset the related variables.
- close() None ¶
- Overview:
Clear the buffer, and join the buffer’s used_data_remover thread if track_used_data is enabled.
- count() int ¶
- Overview:
Count the number of valid data items in the buffer.
- Returns:
count (int): Number of valid data items.
- classmethod default_config() EasyDict ¶
- Overview:
Default config of this buffer class.
- Returns:
default_config (EasyDict)
- load_state_dict(_state_dict: dict) None ¶
- Overview:
Load state dict to reproduce the buffer.
- Arguments:
_state_dict (Dict[str, Any]): A dict containing all important values in the buffer.
- push(data: List[Any] | Any, cur_collector_envstep: int) None ¶
- Overview:
Push one or more data items into the buffer.
- Arguments:
data (Union[List[Any], Any]): The data to push into the buffer. Can be a single item (of type Any) or several items (of type List[Any]).
cur_collector_envstep (int): Collector’s current env step. Not used in naive buffer, but preserved for compatibility.
- sample(size: int, cur_learner_iter: int, sample_range: slice | None = None, replace: bool = False) list | None ¶
- Overview:
Sample data with length size.
- Arguments:
size (int): The number of data items to sample.
cur_learner_iter (int): Learner’s current iteration. Not used in naive buffer, but preserved for compatibility.
sample_range (slice): Buffer slice for sampling, such as slice(-10, None), which means sampling only among the last 10 data items.
replace (bool): Whether to sample with replacement.
- Returns:
sample_data (list): A list of data with length size.
- start() None ¶
- Overview:
Start the buffer’s used_data_remover thread if track_used_data is enabled.
- state_dict() dict ¶
- Overview:
Provide a state dict to keep a record of current buffer.
- Returns:
state_dict (Dict[str, Any]): A dict containing all important values in the buffer. With the dict, one can easily reproduce the buffer.
- update(info: dict) None ¶
- Overview:
Naive Buffer does not need to update any info, but this method is preserved for compatibility.
create_buffer¶
- Overview:
Create a buffer according to cfg and other arguments.
- Arguments:
cfg (EasyDict): Buffer config.
- ArgumentsKeys:
necessary: type
get_buffer_cls¶
- Overview:
Get a buffer class according to cfg.
- Arguments:
cfg (EasyDict): Buffer config.
- ArgumentsKeys:
necessary: type
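A config-driven factory of this kind is often built on a registry keyed by the `type` field. The sketch below shows that pattern with plain dicts; every name in it (`TOY_BUFFER_REGISTRY`, `register_buffer`, `ToyNaiveBuffer`, `toy_get_buffer_cls`, `toy_create_buffer`, and the `'toy_naive'` type string) is hypothetical and does not reflect DI-engine's actual registry or signatures.

```python
from typing import Any, Dict

# Hypothetical registry mapping a type string to a buffer class.
TOY_BUFFER_REGISTRY: Dict[str, type] = {}


def register_buffer(name: str):
    """Class decorator that records the class under `name`."""
    def decorator(cls: type) -> type:
        TOY_BUFFER_REGISTRY[name] = cls
        return cls
    return decorator


@register_buffer('toy_naive')
class ToyNaiveBuffer:
    def __init__(self, cfg: Dict[str, Any], instance_name: str = 'buffer') -> None:
        self.cfg = cfg
        self.instance_name = instance_name


def toy_get_buffer_cls(cfg: Dict[str, Any]) -> type:
    # `type` is the only necessary key, as in the ArgumentsKeys above.
    return TOY_BUFFER_REGISTRY[cfg['type']]


def toy_create_buffer(cfg: Dict[str, Any], *args: Any, **kwargs: Any) -> Any:
    # Look up the class, then forward cfg and any other arguments to it.
    return toy_get_buffer_cls(cfg)(cfg, *args, **kwargs)


buffer = toy_create_buffer({'type': 'toy_naive'}, instance_name='demo')
```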
utils¶
UsedDataRemover¶
- class ding.worker.replay_buffer.utils.UsedDataRemover[source]¶
- Overview:
UsedDataRemover is a tool to remove data files that will no longer be used.
- Interface:
start, close, add_used_data
- add_used_data(data: Any) None [source]¶
- Overview:
Add a used data item into self._used_data for later removal.
- Arguments:
data (Any): The used data item to add.
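The start / add_used_data / close interface suggests a background worker that deletes queued items. A minimal sketch of that pattern follows, assuming each used data item is simply a file path; `ToyUsedDataRemover` is a hypothetical class, and the real UsedDataRemover may represent and delete data differently.

```python
import os
import queue
import tempfile
import threading


class ToyUsedDataRemover:
    """Hypothetical remover: a worker thread deletes queued file paths."""

    def __init__(self) -> None:
        self._used_data: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def add_used_data(self, path: str) -> None:
        self._used_data.put(path)

    def close(self) -> None:
        # The sentinel is queued last, so earlier items are deleted first.
        self._used_data.put(None)
        self._thread.join()

    def _run(self) -> None:
        while True:
            path = self._used_data.get()
            if path is None:
                break
            if os.path.exists(path):
                os.remove(path)


# Create a throwaway file, mark it as used, and let the remover delete it.
fd, path = tempfile.mkstemp()
os.close(fd)
remover = ToyUsedDataRemover()
remover.start()
remover.add_used_data(path)
remover.close()
```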
SampledDataAttrMonitor¶
- class ding.worker.replay_buffer.utils.SampledDataAttrMonitor(time_: BaseTime, expire: int | float)[source]¶
- Overview:
SampledDataAttrMonitor monitors read-out indicators over the most recent expire read-outs. Indicators include: read-out time; average and max of the read-out data items’ use count; average, max and min of the read-out data items’ priority; average and max of staleness.
- Interface:
__init__, fixed_time, current_time, freeze, unfreeze, register_attribute_value, __getattr__
- Property:
time, expire
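The windowed statistics described above can be pictured with a bounded deque that keeps only the latest values. This sketch assumes the window is counted in read-outs; `ToyWindowMonitor` is a hypothetical class and does not reproduce the real monitor's interface (`freeze`, `register_attribute_value`, and so on).

```python
from collections import deque
from typing import Deque, Dict


class ToyWindowMonitor:
    """Keeps the last `expire` values and reports simple statistics."""

    def __init__(self, expire: int) -> None:
        self._values: Deque[float] = deque(maxlen=expire)

    def record(self, value: float) -> None:
        # Once the deque is full, the oldest value drops out automatically.
        self._values.append(value)

    def stats(self) -> Dict[str, float]:
        values = list(self._values)
        if not values:
            return {}
        return {'avg': sum(values) / len(values), 'max': max(values), 'min': min(values)}


monitor = ToyWindowMonitor(expire=3)
for staleness in [10, 2, 4, 6]:
    monitor.record(staleness)
summary = monitor.stats()  # computed over the last three values: 2, 4, 6
```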
PeriodicThruputMonitor¶
- class ding.worker.replay_buffer.utils.PeriodicThruputMonitor(name, cfg, logger, tb_logger)[source]¶
- Overview:
PeriodicThruputMonitor is a tool to record and print logs (text & tensorboard) of how many data items are pushed/sampled/removed/valid in a period of time. For tensorboard, you can view it in ‘buffer_{$NAME}_sec’.
- Interface:
close
- Property:
push_data_count, sample_data_count, remove_data_count, valid_count
Note
The thruput_log thread is initialized and started in the __init__ method, so PeriodicThruputMonitor provides only a single interface: close.
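The "started in __init__, stopped by close" lifecycle can be sketched with a thread that wakes up once per period until an event is set. `ToyThruputMonitor`, its `interval_sec` parameter, and its `history` list are hypothetical; the real class logs to text and tensorboard rather than to a list, and the unsynchronised counter here is a simplification.

```python
import threading
from typing import List


class ToyThruputMonitor:
    """Hypothetical monitor: snapshots and resets a counter once per period."""

    def __init__(self, interval_sec: float) -> None:
        self.push_data_count = 0
        self.history: List[int] = []
        self._interval = interval_sec
        self._end = threading.Event()
        self._thread = threading.Thread(target=self._log_loop, daemon=True)
        self._thread.start()  # started here, so no separate start() is needed

    def _log_loop(self) -> None:
        # wait() returns False on timeout (log one period) and True once
        # close() has set the event (leave the loop).
        while not self._end.wait(self._interval):
            self.history.append(self.push_data_count)
            self.push_data_count = 0

    def close(self) -> None:
        self._end.set()
        self._thread.join()


monitor = ToyThruputMonitor(interval_sec=3600)
monitor.push_data_count += 5
monitor.close()
```

Using `Event.wait` as the sleep means `close` returns promptly instead of waiting for the current period to elapse.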