Source code for ding.reward_model.guided_cost_reward_model
from typing import List, Dict, Any
from easydict import EasyDict
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Independent, Normal
from ding.utils import REWARD_MODEL_REGISTRY
from ding.utils.data import default_collate
from .base_reward_model import BaseRewardModel
class GuidedCostNN(nn.Module):
def __init__(
self,
input_size,
hidden_size=128,
output_size=1,
):
super(GuidedCostNN, self).__init__()
self.net = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, output_size),
)
def forward(self, x):
return self.net(x)
[docs]@REWARD_MODEL_REGISTRY.register('guided_cost')
class GuidedCostRewardModel(BaseRewardModel):
"""
Overview:
Policy class of Guided cost algorithm. (https://arxiv.org/pdf/1603.00448.pdf)
Interface:
``estimate``, ``train``, ``collect_data``, ``clear_date``, \
``__init__``, ``state_dict``, ``load_state_dict``, ``learn``\
``state_dict_reward_model``, ``load_state_dict_reward_model``
Config:
== ==================== ======== ============= ======================================== ================
ID Symbol Type Default Value Description Other(Shape)
== ==================== ======== ============= ======================================== ================
1 ``type`` str guided_cost | Reward model register name, refer |
| to registry ``REWARD_MODEL_REGISTRY`` |
2 | ``continuous`` bool True | Whether action is continuous |
3 | ``learning_rate`` float 0.001 | learning rate for optimizer |
4 | ``update_per_`` int 100 | Number of updates per collect |
| ``collect`` | |
5 | ``batch_size`` int 64 | Training batch size |
6 | ``hidden_size`` int 128 | Linear model hidden size |
7 | ``action_shape`` int 1 | Action space shape |
8 | ``log_every_n`` int 50 | add loss to log every n iteration |
| ``_train`` | |
9 | ``store_model_`` int 100 | save model every n iteration |
| ``every_n_train`` |
== ==================== ======== ============= ======================================== ================
"""
config = dict(
# (str) Reward model register name, refer to registry ``REWARD_MODEL_REGISTRY``.
type='guided_cost',
# (float) The step size of gradient descent.
learning_rate=1e-3,
# (int) Action space shape, such as 1.
action_shape=1,
# (bool) Whether action is continuous.
continuous=True,
# (int) How many samples in a training batch.
batch_size=64,
# (int) Linear model hidden size.
hidden_size=128,
# (int) How many updates(iterations) to train after collector's one collection.
# Bigger "update_per_collect" means bigger off-policy.
# collect data -> update policy-> collect data -> ...
update_per_collect=100,
# (int) Add loss to log every n iteration.
log_every_n_train=50,
# (int) Save model every n iteration.
store_model_every_n_train=100,
)
def __init__(self, config: EasyDict, device: str, tb_logger: 'SummaryWriter') -> None: # noqa
super(GuidedCostRewardModel, self).__init__()
self.cfg = config
self.action_shape = self.cfg.action_shape
assert device == "cpu" or device.startswith("cuda")
self.device = device
self.tb_logger = tb_logger
self.reward_model = GuidedCostNN(config.input_size, config.hidden_size)
self.reward_model.to(self.device)
self.opt = optim.Adam(self.reward_model.parameters(), lr=config.learning_rate)
[docs] def train(self, expert_demo: torch.Tensor, samp: torch.Tensor, iter, step):
device_0 = expert_demo[0]['obs'].device
device_1 = samp[0]['obs'].device
for i in range(len(expert_demo)):
expert_demo[i]['prob'] = torch.FloatTensor([1]).to(device_0)
if self.cfg.continuous:
for i in range(len(samp)):
(mu, sigma) = samp[i]['logit']
dist = Independent(Normal(mu, sigma), 1)
next_action = samp[i]['action']
log_prob = dist.log_prob(next_action)
samp[i]['prob'] = torch.exp(log_prob).unsqueeze(0).to(device_1)
else:
for i in range(len(samp)):
probs = F.softmax(samp[i]['logit'], dim=-1)
prob = probs[samp[i]['action']]
samp[i]['prob'] = prob.to(device_1)
# Mix the expert data and sample data to train the reward model.
samp.extend(expert_demo)
expert_demo = default_collate(expert_demo)
samp = default_collate(samp)
cost_demo = self.reward_model(
torch.cat([expert_demo['obs'], expert_demo['action'].float().reshape(-1, self.action_shape)], dim=-1)
)
cost_samp = self.reward_model(
torch.cat([samp['obs'], samp['action'].float().reshape(-1, self.action_shape)], dim=-1)
)
prob = samp['prob'].unsqueeze(-1)
loss_IOC = torch.mean(cost_demo) + \
torch.log(torch.mean(torch.exp(-cost_samp)/(prob+1e-7)))
# UPDATING THE COST FUNCTION
self.opt.zero_grad()
loss_IOC.backward()
self.opt.step()
if iter % self.cfg.log_every_n_train == 0:
self.tb_logger.add_scalar('reward_model/loss_iter', loss_IOC, iter)
self.tb_logger.add_scalar('reward_model/loss_step', loss_IOC, step)
[docs] def estimate(self, data: list) -> List[Dict]:
# NOTE: this estimate method of gcl alg. is a little different from the one in other irl alg.,
# because its deepcopy is operated before learner train loop.
train_data_augmented = data
for i in range(len(train_data_augmented)):
with torch.no_grad():
reward = self.reward_model(
torch.cat([train_data_augmented[i]['obs'], train_data_augmented[i]['action'].float()]).unsqueeze(0)
).squeeze(0)
train_data_augmented[i]['reward'] = -reward
return train_data_augmented
def collect_data(self, data) -> None:
"""
Overview:
Collecting training data, not implemented if reward model (i.e. online_net) is only trained ones, \
if online_net is trained continuously, there should be some implementations in collect_data method
"""
# if online_net is trained continuously, there should be some implementations in collect_data method
pass
def clear_data(self):
"""
Overview:
Collecting clearing data, not implemented if reward model (i.e. online_net) is only trained ones, \
if online_net is trained continuously, there should be some implementations in clear_data method
"""
# if online_net is trained continuously, there should be some implementations in clear_data method
pass
def state_dict_reward_model(self) -> Dict[str, Any]:
return {
'model': self.reward_model.state_dict(),
'optimizer': self.opt.state_dict(),
}
def load_state_dict_reward_model(self, state_dict: Dict[str, Any]) -> None:
self.reward_model.load_state_dict(state_dict['model'])
self.opt.load_state_dict(state_dict['optimizer'])