Source code for grl.datasets.qgpo

#############################################################
# This QGPO dataset module is a modified implementation based on https://github.com/ChenDRAG/CEP-energy-guided-diffusion
#############################################################

from abc import abstractmethod
from typing import List

import gym
import numpy as np
import torch
from tensordict import TensorDict

from torchrl.data import LazyTensorStorage, LazyMemmapStorage
from grl.utils.log import log


class QGPODataset(torch.utils.data.Dataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(self):
        """
        Overview:
            Initialization method of QGPODataset class
        """
        pass

    def __getitem__(self, index):
        """
        Overview:
            Get data by index
        Arguments:
            index (:obj:`int`): Index of data
        Returns:
            data (:obj:`dict`): Data dict

        .. note::
            The data dict contains the following keys:

            s (:obj:`torch.Tensor`): State
            a (:obj:`torch.Tensor`): Action
            r (:obj:`torch.Tensor`): Reward
            s_ (:obj:`torch.Tensor`): Next state
            d (:obj:`torch.Tensor`): Is finished
            fake_a (:obj:`torch.Tensor`): Fake action for contrastive energy prediction and qgpo training \
                (fake action is sampled from the action support generated by the behaviour policy)
            fake_a_ (:obj:`torch.Tensor`): Fake next action for contrastive energy prediction and qgpo training \
                (fake action is sampled from the action support generated by the behaviour policy)
        """

        data = {
            "s": self.states[index % self.len],
            "a": self.actions[index % self.len],
            "r": self.rewards[index % self.len],
            "s_": self.next_states[index % self.len],
            "d": self.is_finished[index % self.len],
            "fake_a": (
                self.fake_actions[index % self.len]
                if hasattr(self, "fake_actions")
                else 0.0
            ),  # self.fake_actions <D, 16, A>
            "fake_a_": (
                self.fake_next_actions[index % self.len]
                if hasattr(self, "fake_next_actions")
                else 0.0
            ),  # self.fake_next_actions <D, 16, A>
        }
        return data

    def __len__(self):
        return self.len

    def load_fake_actions(self, fake_actions, fake_next_actions):
        self.fake_actions = fake_actions
        self.fake_next_actions = fake_next_actions

    @abstractmethod
    def return_range(self, dataset, max_episode_steps):
        raise NotImplementedError

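# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): how a
# QGPODataset subclass such as QGPOD4RLDataset (defined below) is typically
# consumed. The environment id "halfcheetah-medium-v2" and the support size
# of 16 are assumed values for illustration only.
#
#   dataset = QGPOD4RLDataset(env_id="halfcheetah-medium-v2")
#   sample = dataset[0]
#   # sample["s"], sample["a"], sample["r"], sample["s_"], sample["d"] are
#   # tensors; "fake_a"/"fake_a_" fall back to 0.0 until load_fake_actions()
#   # is called with behaviour-policy samples of shape (len, 16, action_dim):
#   fake_a = torch.zeros(len(dataset), 16, dataset.actions.shape[-1])
#   dataset.load_fake_actions(fake_a, torch.zeros_like(fake_a))
#   sample = dataset[0]  # sample["fake_a"] now has shape (16, action_dim)
# ---------------------------------------------------------------------------
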
class QGPOTensorDictDataset(torch.utils.data.Dataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(self):
        """
        Overview:
            Initialization method of QGPOTensorDictDataset class
        """
        pass

    def __getitem__(self, index):
        """
        Overview:
            Get data by index
        Arguments:
            index (:obj:`int`): Index of data
        Returns:
            data (:obj:`dict`): Data dict

        .. note::
            The data dict contains the following keys:

            s (:obj:`torch.Tensor`): State
            a (:obj:`torch.Tensor`): Action
            r (:obj:`torch.Tensor`): Reward
            s_ (:obj:`torch.Tensor`): Next state
            d (:obj:`torch.Tensor`): Is finished
            fake_a (:obj:`torch.Tensor`): Fake action for contrastive energy prediction and qgpo training \
                (fake action is sampled from the action support generated by the behaviour policy)
            fake_a_ (:obj:`torch.Tensor`): Fake next action for contrastive energy prediction and qgpo training \
                (fake action is sampled from the action support generated by the behaviour policy)
        """

        data = self.storage.get(index=index)
        return data

    def __len__(self):
        return self.len

    def load_fake_actions(self, fake_actions, fake_next_actions):
        self.fake_actions = fake_actions
        self.fake_next_actions = fake_next_actions
        self.storage.set(
            range(self.len),
            TensorDict(
                {
                    "s": self.states,
                    "a": self.actions,
                    "r": self.rewards,
                    "s_": self.next_states,
                    "d": self.is_finished,
                    "fake_a": self.fake_actions,
                    "fake_a_": self.fake_next_actions,
                },
                batch_size=[self.len],
            ),
        )

    @abstractmethod
    def return_range(self, dataset, max_episode_steps):
        raise NotImplementedError

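# ---------------------------------------------------------------------------
# Note (sketch based on the class above, with an assumed D4RL environment id):
# QGPOTensorDictDataset subclasses are expected to populate self.states,
# self.actions, self.rewards, self.next_states, self.is_finished, self.len and
# self.storage (a torchrl LazyTensorStorage or LazyMemmapStorage) in their
# __init__; load_fake_actions() then rewrites the whole storage with a
# TensorDict that also carries the fake actions, and __getitem__ reads from it.
#
#   dataset = QGPOD4RLTensorDictDataset(env_id="halfcheetah-medium-v2",
#                                       action_augment_num=16)
#   fake_a = torch.zeros(len(dataset), 16, dataset.actions.shape[-1])
#   dataset.load_fake_actions(fake_a, torch.zeros_like(fake_a))
#   td = dataset[0]  # TensorDict with keys "s", "a", "r", "s_", "d",
#                    # "fake_a", "fake_a_"
# ---------------------------------------------------------------------------
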
class QGPOD4RLDataset(QGPODataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        env_id: str,
    ):
        """
        Overview:
            Initialization method of QGPOD4RLDataset class
        Arguments:
            env_id (:obj:`str`): The environment id
        """

        super().__init__()
        import d4rl

        data = d4rl.qlearning_dataset(gym.make(env_id))
        self.states = torch.from_numpy(data["observations"]).float()
        self.actions = torch.from_numpy(data["actions"]).float()
        self.next_states = torch.from_numpy(data["next_observations"]).float()
        reward = torch.from_numpy(data["rewards"]).view(-1, 1).float()
        self.is_finished = torch.from_numpy(data["terminals"]).view(-1, 1).float()

        reward_tune = "iql_antmaze" if "antmaze" in env_id else "iql_locomotion"
        if reward_tune == "normalize":
            reward = (reward - reward.mean()) / reward.std()
        elif reward_tune == "iql_antmaze":
            reward = reward - 1.0
        elif reward_tune == "iql_locomotion":
            min_ret, max_ret = QGPOD4RLDataset.return_range(data, 1000)
            reward /= max_ret - min_ret
            reward *= 1000
        elif reward_tune == "cql_antmaze":
            reward = (reward - 0.5) * 4.0
        elif reward_tune == "antmaze":
            reward = (reward - 0.25) * 2.0
        self.rewards = reward
        self.len = self.states.shape[0]
        log.info(f"{self.len} data loaded in QGPOD4RLDataset")

    def return_range(dataset, max_episode_steps):
        returns, lengths = [], []
        ep_ret, ep_len = 0.0, 0
        for r, d in zip(dataset["rewards"], dataset["terminals"]):
            ep_ret += float(r)
            ep_len += 1
            if d or ep_len == max_episode_steps:
                returns.append(ep_ret)
                lengths.append(ep_len)
                ep_ret, ep_len = 0.0, 0
        # returns.append(ep_ret)  # incomplete trajectory
        lengths.append(ep_len)  # but still keep track of number of steps
        assert sum(lengths) == len(dataset["rewards"])
        return min(returns), max(returns)

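# ---------------------------------------------------------------------------
# Worked example (illustrative only): return_range() splits the reward stream
# at terminals (or every max_episode_steps) and returns the min/max episode
# return; the "iql_locomotion" branch above uses this range so that the spread
# of episode returns is rescaled to about 1000.
#
#   toy = {
#       "rewards": np.array([1.0, 1.0, 0.5, 2.0]),
#       "terminals": np.array([False, True, False, True]),
#   }
#   QGPOD4RLDataset.return_range(toy, 1000)  # -> (2.0, 2.5)
# ---------------------------------------------------------------------------
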
class QGPOOnlineDataset(QGPODataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        fake_action_shape: int = None,
        data: List = None,
    ):
        """
        Overview:
            Initialization method of QGPOOnlineDataset class
        Arguments:
            fake_action_shape (:obj:`int`): The number of fake actions sampled for each state
            data (:obj:`List`): The data list
        """

        super().__init__()
        self.fake_action_shape = fake_action_shape
        if data is not None:
            self.states = torch.from_numpy(data["observations"]).float()
            self.actions = torch.from_numpy(data["actions"]).float()
            self.next_states = torch.from_numpy(data["next_observations"]).float()
            reward = torch.from_numpy(data["rewards"]).view(-1, 1).float()
            self.is_finished = torch.from_numpy(data["terminals"]).view(-1, 1).float()
            self.rewards = reward
            # self.fake_actions = torch.zeros_like(self.actions.unsqueeze(1).expand(-1, fake_action_shape, -1))
            # self.fake_next_actions = torch.zeros_like(self.actions.unsqueeze(1).expand(-1, fake_action_shape, -1))
            self.len = self.states.shape[0]
        else:
            self.states = torch.tensor([])
            self.actions = torch.tensor([])
            self.next_states = torch.tensor([])
            self.is_finished = torch.tensor([])
            self.rewards = torch.tensor([])
            # self.fake_actions = torch.tensor([])
            # self.fake_next_actions = torch.tensor([])
            self.len = 0
        log.debug(f"{self.len} data loaded in QGPOOnlineDataset")

    def drop_data(self, drop_ratio: float, random: bool = True):
        # drop the data from the dataset
        drop_num = int(self.len * drop_ratio)
        # randomly drop the data if random is True
        if random:
            drop_indices = torch.randperm(self.len)[:drop_num]
        else:
            drop_indices = torch.arange(drop_num)
        keep_mask = torch.ones(self.len, dtype=torch.bool)
        keep_mask[drop_indices] = False
        self.states = self.states[keep_mask]
        self.actions = self.actions[keep_mask]
        self.next_states = self.next_states[keep_mask]
        self.is_finished = self.is_finished[keep_mask]
        self.rewards = self.rewards[keep_mask]
        # self.fake_actions = self.fake_actions[keep_mask]
        # self.fake_next_actions = self.fake_next_actions[keep_mask]
        self.len = self.states.shape[0]
        log.debug(f"{drop_num} data dropped in QGPOOnlineDataset")

    def load_data(self, data: List):
        # concatenate the new transitions into the dataset
        # collate the data by stacking each key across the list of transitions
        keys = ["obs", "action", "done", "next_obs", "reward"]
        collated_data = {
            k: torch.tensor(np.stack([item[k] for item in data])) for k in keys
        }
        self.states = torch.cat([self.states, collated_data["obs"].float()], dim=0)
        self.actions = torch.cat([self.actions, collated_data["action"].float()], dim=0)
        self.next_states = torch.cat(
            [self.next_states, collated_data["next_obs"].float()], dim=0
        )
        reward = collated_data["reward"].view(-1, 1).float()
        self.is_finished = torch.cat(
            [self.is_finished, collated_data["done"].view(-1, 1).float()], dim=0
        )
        self.rewards = torch.cat([self.rewards, reward], dim=0)
        # self.fake_actions = torch.cat([self.fake_actions, torch.zeros_like(collated_data['action'].unsqueeze(1).expand(-1, self.fake_action_shape, -1))], dim=0)
        # self.fake_next_actions = torch.cat([self.fake_next_actions, torch.zeros_like(collated_data['action'].unsqueeze(1).expand(-1, self.fake_action_shape, -1))], dim=0)
        self.len = self.states.shape[0]
        log.debug(f"{self.len} data loaded in QGPOOnlineDataset")

    def return_range(dataset, max_episode_steps):
        returns, lengths = [], []
        ep_ret, ep_len = 0.0, 0
        for r, d in zip(dataset["rewards"], dataset["terminals"]):
            ep_ret += float(r)
            ep_len += 1
            if d or ep_len == max_episode_steps:
                returns.append(ep_ret)
                lengths.append(ep_len)
                ep_ret, ep_len = 0.0, 0
        # returns.append(ep_ret)  # incomplete trajectory
        lengths.append(ep_len)  # but still keep track of number of steps
        assert sum(lengths) == len(dataset["rewards"])
        return min(returns), max(returns)


class QGPOD4RLOnlineDataset(QGPODataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        env_id: str,
        fake_action_shape: int = None,
    ):
        """
        Overview:
            Initialization method of QGPOD4RLOnlineDataset class
        Arguments:
            env_id (:obj:`str`): The environment id
            fake_action_shape (:obj:`int`): The number of fake actions sampled for each state
        """

        super().__init__()
        self.fake_action_shape = fake_action_shape
        import d4rl

        data = d4rl.qlearning_dataset(gym.make(env_id))
        self.states = torch.from_numpy(data["observations"]).float()
        self.actions = torch.from_numpy(data["actions"]).float()
        self.next_states = torch.from_numpy(data["next_observations"]).float()
        reward = torch.from_numpy(data["rewards"]).view(-1, 1).float()
        self.is_finished = torch.from_numpy(data["terminals"]).view(-1, 1).float()

        reward_tune = "iql_antmaze" if "antmaze" in env_id else "iql_locomotion"
        if reward_tune == "normalize":
            reward = (reward - reward.mean()) / reward.std()
        elif reward_tune == "iql_antmaze":
            reward = reward - 1.0
        elif reward_tune == "iql_locomotion":
            min_ret, max_ret = QGPOD4RLDataset.return_range(data, 1000)
            reward /= max_ret - min_ret
            reward *= 1000
        elif reward_tune == "cql_antmaze":
            reward = (reward - 0.5) * 4.0
        elif reward_tune == "antmaze":
            reward = (reward - 0.25) * 2.0
        self.rewards = reward
        self.len = self.states.shape[0]
        log.debug(f"{self.len} data loaded in QGPOD4RLOnlineDataset")

    def drop_data(self, drop_ratio: float, random: bool = True):
        # drop the data from the dataset
        drop_num = int(self.len * drop_ratio)
        # randomly drop the data if random is True
        if random:
            drop_indices = torch.randperm(self.len)[:drop_num]
        else:
            drop_indices = torch.arange(drop_num)
        keep_mask = torch.ones(self.len, dtype=torch.bool)
        keep_mask[drop_indices] = False
        self.states = self.states[keep_mask]
        self.actions = self.actions[keep_mask]
        self.next_states = self.next_states[keep_mask]
        self.is_finished = self.is_finished[keep_mask]
        self.rewards = self.rewards[keep_mask]
        # self.fake_actions = self.fake_actions[keep_mask]
        # self.fake_next_actions = self.fake_next_actions[keep_mask]
        self.len = self.states.shape[0]
        log.debug(f"{drop_num} data dropped in QGPOD4RLOnlineDataset")

    def load_data(self, data: List):
        # concatenate the new transitions into the dataset
        # collate the data by stacking each key across the list of transitions
        keys = ["obs", "action", "done", "next_obs", "reward"]
        collated_data = {
            k: torch.tensor(np.stack([item[k] for item in data])) for k in keys
        }
        self.states = torch.cat([self.states, collated_data["obs"].float()], dim=0)
        self.actions = torch.cat([self.actions, collated_data["action"].float()], dim=0)
        self.next_states = torch.cat(
            [self.next_states, collated_data["next_obs"].float()], dim=0
        )
        reward = collated_data["reward"].view(-1, 1).float()
        self.is_finished = torch.cat(
            [self.is_finished, collated_data["done"].view(-1, 1).float()], dim=0
        )
        self.rewards = torch.cat([self.rewards, reward], dim=0)
        # self.fake_actions = torch.cat([self.fake_actions, torch.zeros_like(collated_data['action'].unsqueeze(1).expand(-1, self.fake_action_shape, -1))], dim=0)
        # self.fake_next_actions = torch.cat([self.fake_next_actions, torch.zeros_like(collated_data['action'].unsqueeze(1).expand(-1, self.fake_action_shape, -1))], dim=0)
        self.len = self.states.shape[0]
        log.debug(f"{self.len} data loaded in QGPOD4RLOnlineDataset")

    def return_range(dataset, max_episode_steps):
        returns, lengths = [], []
        ep_ret, ep_len = 0.0, 0
        for r, d in zip(dataset["rewards"], dataset["terminals"]):
            ep_ret += float(r)
            ep_len += 1
            if d or ep_len == max_episode_steps:
                returns.append(ep_ret)
                lengths.append(ep_len)
                ep_ret, ep_len = 0.0, 0
        # returns.append(ep_ret)  # incomplete trajectory
        lengths.append(ep_len)  # but still keep track of number of steps
        assert sum(lengths) == len(dataset["rewards"])
        return min(returns), max(returns)


class QGPOCustomizedDataset(QGPODataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        env_id: str = None,
        numpy_data_path: str = None,
    ):
        """
        Overview:
            Initialization method of QGPOCustomizedDataset class
        Arguments:
            env_id (:obj:`str`): The environment id
            numpy_data_path (:obj:`str`): The path to the numpy data
        """

        super().__init__()
        data = np.load(numpy_data_path)
        self.states = torch.from_numpy(data["obs"]).float()
        self.actions = torch.from_numpy(data["action"]).float()
        self.next_states = torch.from_numpy(data["next_obs"]).float()
        reward = torch.from_numpy(data["reward"]).view(-1, 1).float()
        self.is_finished = torch.from_numpy(data["done"]).view(-1, 1).float()
        self.rewards = reward
        self.len = self.states.shape[0]
        log.info(f"{self.len} data loaded in QGPOCustomizedDataset")


class QGPOD4RLTensorDictDataset(QGPOTensorDictDataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        env_id: str,
        action_augment_num: int = 16,
    ):
        """
        Overview:
            Initialization method of QGPOD4RLTensorDictDataset class
        Arguments:
            env_id (:obj:`str`): The environment id
            action_augment_num (:obj:`int`): The number of fake actions per state
        """

        super().__init__()
        import d4rl

        data = d4rl.qlearning_dataset(gym.make(env_id))
        self.states = torch.from_numpy(data["observations"]).float()
        self.actions = torch.from_numpy(data["actions"]).float()
        self.next_states = torch.from_numpy(data["next_observations"]).float()
        reward = torch.from_numpy(data["rewards"]).view(-1, 1).float()
        self.is_finished = torch.from_numpy(data["terminals"]).view(-1, 1).float()

        reward_tune = "iql_antmaze" if "antmaze" in env_id else "iql_locomotion"
        if reward_tune == "normalize":
            reward = (reward - reward.mean()) / reward.std()
        elif reward_tune == "iql_antmaze":
            reward = reward - 1.0
        elif reward_tune == "iql_locomotion":
            min_ret, max_ret = QGPOD4RLDataset.return_range(data, 1000)
            reward /= max_ret - min_ret
            reward *= 1000
        elif reward_tune == "cql_antmaze":
            reward = (reward - 0.5) * 4.0
        elif reward_tune == "antmaze":
            reward = (reward - 0.25) * 2.0
        self.rewards = reward
        self.len = self.states.shape[0]
        log.info(f"{self.len} data loaded in QGPOD4RLTensorDictDataset")

        self.storage = LazyTensorStorage(max_size=self.len)
        self.storage.set(
            range(self.len),
            TensorDict(
                {
                    "s": self.states,
                    "a": self.actions,
                    "r": self.rewards,
                    "s_": self.next_states,
                    "d": self.is_finished,
                    # placeholder fake actions of shape <D, action_augment_num, A>,
                    # later overwritten by load_fake_actions()
                    "fake_a": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                    "fake_a_": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                },
                batch_size=[self.len],
            ),
        )

    def return_range(dataset, max_episode_steps):
        returns, lengths = [], []
        ep_ret, ep_len = 0.0, 0
        for r, d in zip(dataset["rewards"], dataset["terminals"]):
            ep_ret += float(r)
            ep_len += 1
            if d or ep_len == max_episode_steps:
                returns.append(ep_ret)
                lengths.append(ep_len)
                ep_ret, ep_len = 0.0, 0
        # returns.append(ep_ret)  # incomplete trajectory
        lengths.append(ep_len)  # but still keep track of number of steps
        assert sum(lengths) == len(dataset["rewards"])
        return min(returns), max(returns)


class QGPOCustomizedTensorDictDataset(QGPOTensorDictDataset):
    """
    Overview:
        Dataset for QGPO algorithm. The training of QGPO algorithm is based on contrastive energy prediction, \
        which needs true action and fake action. The true action is sampled from the dataset, and the fake action \
        is sampled from the action support generated by the behaviour policy.
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        env_id: str = None,
        action_augment_num: int = 16,
        numpy_data_path: str = None,
    ):
        """
        Overview:
            Initialization method of QGPOCustomizedTensorDictDataset class
        Arguments:
            env_id (:obj:`str`): The environment id
            action_augment_num (:obj:`int`): The number of fake actions per state
            numpy_data_path (:obj:`str`): The path to the numpy data
        """

        super().__init__()
        data = np.load(numpy_data_path)
        self.states = torch.from_numpy(data["obs"]).float()
        self.actions = torch.from_numpy(data["action"]).float()
        self.next_states = torch.from_numpy(data["next_obs"]).float()
        reward = torch.from_numpy(data["reward"]).view(-1, 1).float()
        self.is_finished = torch.from_numpy(data["done"]).view(-1, 1).float()
        self.rewards = reward
        self.len = self.states.shape[0]
        log.info(f"{self.len} data loaded in QGPOCustomizedTensorDictDataset")

        self.storage = LazyTensorStorage(max_size=self.len)
        self.storage.set(
            range(self.len),
            TensorDict(
                {
                    "s": self.states,
                    "a": self.actions,
                    "r": self.rewards,
                    "s_": self.next_states,
                    "d": self.is_finished,
                    # placeholder fake actions of shape <D, action_augment_num, A>,
                    # later overwritten by load_fake_actions()
                    "fake_a": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                    "fake_a_": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                },
                batch_size=[self.len],
            ),
        )


class QGPODMcontrolTensorDictDataset(QGPOTensorDictDataset):
    """
    Overview:
        TensorDict-based QGPO dataset built from DeepMind Control rollouts stored as .npy files, \
        where each transition carries dictionary observations under "s" and "s_".
    Interface:
        ``__init__``, ``__getitem__``, ``__len__``.
    """

    def __init__(
        self,
        directory: str,
        action_augment_num: int = 16,
    ):
        import os

        state_dicts = {}
        next_states_dicts = {}
        actions_list = []
        rewards_list = []

        # collect all .npy trajectory files under the given directory
        npy_files = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith(".npy"):
                    npy_files.append(os.path.join(root, file))

        for file_path in npy_files:
            data = np.load(file_path, allow_pickle=True)
            obs_keys = list(data[0]["s"].keys())
            for key in obs_keys:
                if key not in state_dicts:
                    state_dicts[key] = []
                    next_states_dicts[key] = []
                state_values = np.array(
                    [item["s"][key] for item in data], dtype=np.float32
                )
                next_state_values = np.array(
                    [item["s_"][key] for item in data], dtype=np.float32
                )
                state_dicts[key].append(torch.tensor(state_values))
                next_states_dicts[key].append(torch.tensor(next_state_values))
            actions_values = np.array([item["a"] for item in data], dtype=np.float32)
            rewards_values = np.array(
                [item["r"] for item in data], dtype=np.float32
            ).reshape(-1, 1)
            actions_list.append(torch.tensor(actions_values))
            rewards_list.append(torch.tensor(rewards_values))

        # Concatenate all tensors along the first dimension
        self.actions = torch.cat(actions_list, dim=0)
        self.rewards = torch.cat(rewards_list, dim=0)
        self.states = TensorDict(
            {key: torch.cat(state_dicts[key], dim=0) for key in obs_keys},
            batch_size=[self.actions.shape[0]],
        )
        self.next_states = TensorDict(
            {key: torch.cat(next_states_dicts[key], dim=0) for key in obs_keys},
            batch_size=[self.actions.shape[0]],
        )
        self.is_finished = torch.zeros_like(self.rewards, dtype=torch.bool)
        self.len = self.actions.shape[0]

        self.storage = LazyMemmapStorage(max_size=self.len)
        self.storage.set(
            range(self.len),
            TensorDict(
                {
                    "s": self.states,
                    "a": self.actions,
                    "r": self.rewards,
                    "s_": self.next_states,
                    # placeholder fake actions of shape <D, action_augment_num, A>,
                    # later overwritten by load_fake_actions()
                    "fake_a": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                    "fake_a_": torch.zeros_like(self.actions)
                    .unsqueeze(1)
                    .repeat_interleave(action_augment_num, dim=1),
                    "d": self.is_finished,
                },
                batch_size=[self.len],
            ),
        )