Source code for ding.policy.plan_diffuser
from typing import List, Dict, Any, Optional, Tuple, Union
from collections import namedtuple, defaultdict
import copy
import numpy as np
import torch
import torch.nn.functional as F
from torch.distributions import Normal, Independent
from ding.torch_utils import Adam, to_device
from ding.rl_utils import v_1step_td_data, v_1step_td_error, get_train_sample, \
qrdqn_nstep_td_data, qrdqn_nstep_td_error, get_nstep_return_data
from ding.policy import Policy
from ding.model import model_wrap
from ding.utils import POLICY_REGISTRY, DatasetNormalizer
from ding.utils.data import default_collate, default_decollate
from .common_utils import default_preprocess_learn
@POLICY_REGISTRY.register('pd')
class PDPolicy(Policy):
r"""
Overview:
Implicit Plan Diffuser
https://arxiv.org/pdf/2205.09991.pdf
"""
config = dict(
type='pd',
# (bool) Whether to use cuda for network.
cuda=False,
        # (bool) priority: Whether to use priority sampling in the replay buffer. Default to False.
        priority=False,
# (bool) Whether use Importance Sampling Weight to correct biased update. If True, priority must be True.
priority_IS_weight=False,
        # (int) Number of training samples (randomly collected) in the replay buffer when training starts.
        # Default to 10000.
        random_collect_size=10000,
nstep=1,
# normalizer type
normalizer='GaussianNormalizer',
model=dict(
diffuser_model='GaussianDiffusion',
diffuser_model_cfg=dict(
# the type of model
model='TemporalUnet',
# config of model
model_cfg=dict(
                    # model dim: obs_dim in GaussianInvDynDiffusion, obs_dim + action_dim in other diffusion models
transition_dim=23,
dim=32,
dim_mults=[1, 2, 4, 8],
                    # whether to use the return as a condition
                    returns_condition=False,
                    condition_dropout=0.1,
                    # whether to calculate energy
                    calc_energy=False,
                    kernel_size=5,
                    # whether to use attention
                    attention=False,
),
                # horizon of the trajectory generated by the model
horizon=80,
# timesteps of diffusion
n_timesteps=1000,
                # whether to predict epsilon
                predict_epsilon=True,
# discount of loss
loss_discount=1.0,
                # whether to clip the denoised output
                clip_denoised=False,
action_weight=10,
),
value_model='ValueDiffusion',
value_model_cfg=dict(
# the type of model
model='TemporalValue',
# config of model
model_cfg=dict(
horizon=4,
                    # model dim: obs_dim in GaussianInvDynDiffusion, obs_dim + action_dim in other diffusion models
transition_dim=23,
dim=32,
dim_mults=[1, 2, 4, 8],
                    kernel_size=5,
),
                # horizon of the trajectory generated by the model
horizon=80,
# timesteps of diffusion
n_timesteps=1000,
                # whether to predict epsilon
                predict_epsilon=True,
# discount of loss
loss_discount=1.0,
                # whether to clip the denoised output
                clip_denoised=False,
action_weight=1.0,
),
            # number of guide steps in p_sample
            n_guide_steps=2,
            # gradient scale for guided p_sample
            scale=0.1,
            # diffusion timestep below which the guidance gradient is stopped
            t_stopgrad=2,
            # whether to scale the gradient by the posterior std
            scale_grad_by_std=True,
),
learn=dict(
# How many updates(iterations) to train after collector's one collection.
# Bigger "update_per_collect" means bigger off-policy.
# collect data -> update policy-> collect data -> ...
update_per_collect=1,
# (int) Minibatch size for gradient descent.
batch_size=100,
            # (float) learning_rate: Learning rate of the optimizers. Default to 3e-4.
            learning_rate=3e-4,
            # (bool) Whether to ignore done (usually for max-step-termination envs, e.g. Pendulum).
            # Note: Gym wraps the MuJoCo envs with TimeLimit wrappers by default, which limit
            # HalfCheetah and several other MuJoCo envs to a max length of 1000 steps.
            # However, HalfCheetah itself never terminates (done is always False), so when the
            # episode step exceeds the max episode step we replace done==True with done==False
            # to keep the TD-error computation (``gamma * (1 - done) * next_v + reward``) accurate.
            ignore_done=False,
# (float type) target_theta: Used for soft update of the target network,
# aka. Interpolation factor in polyak averaging for target networks.
# Default to 0.005.
target_theta=0.005,
# (float) discount factor for the discounted sum of rewards, aka. gamma.
discount_factor=0.99,
            # (int) Number of batches over which gradients are accumulated before each optimizer step.
            gradient_accumulate_every=2,
            # (int) Number of training iterations; each optimizer step consumes ``gradient_accumulate_every`` iterations.
            train_epoch=60000,
            # (int) Batch size used when generating plans during evaluation.
            plan_batch_size=64,
            # training step at which the target model starts to be soft-updated, and the update frequency
            step_start_update_target=2000,
            update_target_freq=10,
            # weight kept by the target network in each moving-average update
            target_weight=0.995,
            # number of training steps during which the value model is updated
            value_step=200e3,
            # whether the dataset samples include returns
            include_returns=True,
# (float) Weight uniform initialization range in the last output layer
init_w=3e-3,
),
)
def default_model(self) -> Tuple[str, List[str]]:
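        """
        Overview:
            Return this policy's default model name and the import path of its module, i.e.
            ``'pd'`` implemented in ``ding.model.template.diffusion``.
        """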
return 'pd', ['ding.model.template.diffusion']
def _init_learn(self) -> None:
r"""
Overview:
Learn mode init method. Called by ``self.__init__``.
Init q, value and policy's optimizers, algorithm config, main and target models.
"""
# Init
self._priority = self._cfg.priority
self._priority_IS_weight = self._cfg.priority_IS_weight
self.action_dim = self._cfg.model.diffuser_model_cfg.action_dim
self.obs_dim = self._cfg.model.diffuser_model_cfg.obs_dim
self.n_timesteps = self._cfg.model.diffuser_model_cfg.n_timesteps
self.gradient_accumulate_every = self._cfg.learn.gradient_accumulate_every
self.plan_batch_size = self._cfg.learn.plan_batch_size
self.gradient_steps = 1
self.update_target_freq = self._cfg.learn.update_target_freq
self.step_start_update_target = self._cfg.learn.step_start_update_target
self.target_weight = self._cfg.learn.target_weight
self.value_step = self._cfg.learn.value_step
self.use_target = False
self.horizon = self._cfg.model.diffuser_model_cfg.horizon
self.include_returns = self._cfg.learn.include_returns
# Optimizers
self._plan_optimizer = Adam(
self._model.diffuser.model.parameters(),
lr=self._cfg.learn.learning_rate,
)
if self._model.value:
self._value_optimizer = Adam(
self._model.value.model.parameters(),
lr=self._cfg.learn.learning_rate,
)
# Algorithm config
self._gamma = self._cfg.learn.discount_factor
# Main and target models
self._target_model = copy.deepcopy(self._model)
# self._target_model = model_wrap(
# self._target_model,
# wrapper_name='target',
# update_type='momentum',
# update_kwargs={'theta': self._cfg.learn.target_theta}
# )
self._learn_model = model_wrap(self._model, wrapper_name='base')
self._learn_model.reset()
# self._target_model.reset()
self._forward_learn_cnt = 0
def _forward_learn(self, data: dict) -> Dict[str, Any]:
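        r"""
        Overview:
            Forward and backward function of learn mode. Preprocess the batch, build the diffusion
            conditions, accumulate gradients of the diffuser loss (and of the value loss while
            ``_forward_learn_cnt < value_step``), step the optimizers every
            ``gradient_accumulate_every`` iterations and periodically update the target model.
        Arguments:
            - data (:obj:`dict`): Dict type data, including at least
              ['trajectories', 'condition_val', 'condition_id'], and optionally ['returns'].
        Returns:
            - loss_dict (:obj:`Dict[str, Any]`): Losses and trajectory/return statistics for logging.
        """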
loss_dict = {}
data = default_preprocess_learn(
data,
use_priority=self._priority,
use_priority_IS_weight=self._cfg.priority_IS_weight,
ignore_done=self._cfg.learn.ignore_done,
use_nstep=False
)
conds = {}
vals = data['condition_val']
ids = data['condition_id']
for i in range(len(ids)):
conds[ids[i][0].item()] = vals[i]
if len(ids) > 1:
self.use_target = True
data['conditions'] = conds
if 'returns' in data.keys():
data['returns'] = data['returns'].unsqueeze(-1)
if self._cuda:
data = to_device(data, self._device)
self._learn_model.train()
# self._target_model.train()
x = data['trajectories']
batch_size = len(x)
t = torch.randint(0, self.n_timesteps, (batch_size, ), device=x.device).long()
cond = data['conditions']
if 'returns' in data.keys():
target = data['returns']
loss_dict['diffuse_loss'], loss_dict['a0_loss'] = self._model.diffuser_loss(x, cond, t)
loss_dict['diffuse_loss'] = loss_dict['diffuse_loss'] / self.gradient_accumulate_every
loss_dict['diffuse_loss'].backward()
if self._forward_learn_cnt < self.value_step and self._model.value:
loss_dict['value_loss'], logs = self._model.value_loss(x, cond, target, t)
loss_dict['value_loss'] = loss_dict['value_loss'] / self.gradient_accumulate_every
loss_dict['value_loss'].backward()
loss_dict.update(logs)
if self.gradient_steps >= self.gradient_accumulate_every:
self._plan_optimizer.step()
self._plan_optimizer.zero_grad()
if self._forward_learn_cnt < self.value_step and self._model.value:
self._value_optimizer.step()
self._value_optimizer.zero_grad()
self.gradient_steps = 1
else:
self.gradient_steps += 1
self._forward_learn_cnt += 1
if self._forward_learn_cnt % self.update_target_freq == 0:
if self._forward_learn_cnt < self.step_start_update_target:
self._target_model.load_state_dict(self._model.state_dict())
else:
self.update_model_average(self._target_model, self._learn_model)
if 'returns' in data.keys():
loss_dict['max_return'] = target.max().item()
loss_dict['min_return'] = target.min().item()
loss_dict['mean_return'] = target.mean().item()
loss_dict['max_traj'] = x.max().item()
loss_dict['min_traj'] = x.min().item()
loss_dict['mean_traj'] = x.mean().item()
return loss_dict
def update_model_average(self, ma_model, current_model):
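        """
        Overview:
            Update the target model ``ma_model`` towards ``current_model`` by an exponential moving
            average, keeping ``self.target_weight`` of the old parameters at every call.
        """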
for current_params, ma_params in zip(current_model.parameters(), ma_model.parameters()):
old_weight, up_weight = ma_params.data, current_params.data
if old_weight is None:
ma_params.data = up_weight
else:
                ma_params.data = old_weight * self.target_weight + (1 - self.target_weight) * up_weight
def _monitor_vars_learn(self) -> List[str]:
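        r"""
        Overview:
            Return the list of variable names in the training loss dict that will be monitored by the logger.
        """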
return [
'diffuse_loss',
'value_loss',
'max_return',
'min_return',
'mean_return',
'max_traj',
'min_traj',
'mean_traj',
'mean_pred',
'max_pred',
'min_pred',
'a0_loss',
]
def _state_dict_learn(self) -> Dict[str, Any]:
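        r"""
        Overview:
            Return the state_dict of learn mode, containing the learn model, the target model and the
            optimizer(s); the value optimizer is included only when a value model is configured.
        """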
if self._model.value:
return {
'model': self._learn_model.state_dict(),
'target_model': self._target_model.state_dict(),
'plan_optimizer': self._plan_optimizer.state_dict(),
'value_optimizer': self._value_optimizer.state_dict(),
}
else:
return {
'model': self._learn_model.state_dict(),
'target_model': self._target_model.state_dict(),
'plan_optimizer': self._plan_optimizer.state_dict(),
}
def _init_eval(self):
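        r"""
        Overview:
            Evaluate mode init method. Called by ``self.__init__``. Wrap the target model for evaluation
            and, when a target (goal) observation is used as an extra condition, prepare the cached plan sequence.
        """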
self._eval_model = model_wrap(self._target_model, wrapper_name='base')
self._eval_model.reset()
if self.use_target:
self._plan_seq = []
def init_data_normalizer(self, normalizer: DatasetNormalizer = None):
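        r"""
        Overview:
            Set the dataset normalizer used to normalize observations in eval mode and to unnormalize
            the predicted actions.
        Arguments:
            - normalizer (:obj:`DatasetNormalizer`): normalizer built from the offline dataset statistics.
        """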
self.normalizer = normalizer
def _forward_eval(self, data: dict) -> Dict[str, Any]:
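        r"""
        Overview:
            Forward function of eval mode. Normalize the observations, build the diffusion conditions
            (the current observation, plus the target observation at the end of the horizon when goal
            conditions are used), sample from the diffusion model and return the unnormalized actions;
            with goal conditions, actions are derived from consecutive waypoints of the cached plan.
        Arguments:
            - data (:obj:`dict`): Dict of env_id -> observation.
        Returns:
            - output (:obj:`Dict[int, Any]`): Dict of env_id -> {'action': action}.
        """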
data_id = list(data.keys())
data = default_collate(list(data.values()))
self._eval_model.eval()
if self.use_target:
cur_obs = self.normalizer.normalize(data[:, :self.obs_dim], 'observations')
target_obs = self.normalizer.normalize(data[:, self.obs_dim:], 'observations')
else:
obs = self.normalizer.normalize(data, 'observations')
with torch.no_grad():
if self.use_target:
cur_obs = torch.tensor(cur_obs)
target_obs = torch.tensor(target_obs)
if self._cuda:
cur_obs = to_device(cur_obs, self._device)
target_obs = to_device(target_obs, self._device)
conditions = {0: cur_obs, self.horizon - 1: target_obs}
else:
obs = torch.tensor(obs)
if self._cuda:
obs = to_device(obs, self._device)
conditions = {0: obs}
if self.use_target:
if self._plan_seq == [] or 0 in self._eval_t:
plan_traj = self._eval_model.get_eval(conditions, self.plan_batch_size)
plan_traj = to_device(plan_traj, 'cpu').numpy()
if self._plan_seq == []:
self._plan_seq = plan_traj
self._eval_t = [0] * len(data_id)
else:
for id in data_id:
if self._eval_t[id] == 0:
self._plan_seq[id] = plan_traj[id]
action = []
for id in data_id:
if self._eval_t[id] < len(self._plan_seq[id]) - 1:
next_waypoint = self._plan_seq[id][self._eval_t[id] + 1]
else:
next_waypoint = self._plan_seq[id][-1].copy()
next_waypoint[2:] = 0
cur_ob = cur_obs[id]
cur_ob = to_device(cur_ob, 'cpu').numpy()
act = next_waypoint[:2] - cur_ob[:2] + (next_waypoint[2:] - cur_ob[2:])
action.append(act)
self._eval_t[id] += 1
else:
action = self._eval_model.get_eval(conditions, self.plan_batch_size)
if self._cuda:
action = to_device(action, 'cpu')
action = self.normalizer.unnormalize(action, 'actions')
action = torch.tensor(action).to('cpu')
output = {'action': action}
output = default_decollate(output)
return {i: d for i, d in zip(data_id, output)}
def _reset_eval(self, data_id: Optional[List[int]] = None) -> None:
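        r"""
        Overview:
            Reset the per-environment plan step counters of eval mode when goal conditions are used.
        Arguments:
            - data_id (:obj:`Optional[List[int]]`): ids of the envs to be reset.
        """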
if self.use_target and data_id:
for id in data_id:
self._eval_t[id] = 0
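    # Note: this policy is trained from an offline dataset, so the collect-mode interfaces below are left unimplemented.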
def _init_collect(self) -> None:
pass
def _forward_collect(self, data: dict, **kwargs) -> dict:
pass
def _process_transition(self, obs: Any, model_output: dict, timestep: namedtuple) -> dict:
pass
def _get_train_sample(self, data: list) -> Union[None, List[Any]]:
pass