We have now released v0.3.0! Please use the latest version for the best experience.

Source code for omni.isaac.orbit_tasks.utils.wrappers.rl_games

# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Wrapper to configure an :class:`RLTaskEnv` instance to RL-Games vectorized environment.

The following example shows how to wrap an environment for RL-Games and register the environment construction
for RL-Games :class:`Runner` class:

.. code-block:: python

    from rl_games.common import env_configurations, vecenv

    from omni.isaac.orbit_tasks.utils.wrappers.rl_games import RlGamesGpuEnv, RlGamesVecEnvWrapper

    # configuration parameters
    rl_device = "cuda:0"
    clip_obs = 10.0
    clip_actions = 1.0

    # wrap around environment for rl-games
    env = RlGamesVecEnvWrapper(env, rl_device, clip_obs, clip_actions)

    # register the environment to rl-games registry
    # note: in agents configuration: environment name must be "rlgpu"
    vecenv.register(
        "IsaacRlgWrapper", lambda config_name, num_actors, **kwargs: RlGamesGpuEnv(config_name, num_actors, **kwargs)
    )
    env_configurations.register("rlgpu", {"vecenv_type": "IsaacRlgWrapper", "env_creator": lambda **kwargs: env})

"""

# needed to import for allowing type-hinting:gym.spaces.Box | None
from __future__ import annotations

import gym.spaces  # needed for rl-games incompatibility: https://github.com/Denys88/rl_games/issues/261
import gymnasium
import torch

from rl_games.common import env_configurations
from rl_games.common.vecenv import IVecEnv

from omni.isaac.orbit.envs import RLTaskEnv, VecEnvObs

"""
Vectorized environment wrapper.
"""


[docs]class RlGamesVecEnvWrapper(IVecEnv): """Wraps around Orbit environment for RL-Games. This class wraps around the Orbit environment. Since RL-Games works directly on GPU buffers, the wrapper handles moving of buffers from the simulation environment to the same device as the learning agent. Additionally, it performs clipping of observations and actions. For algorithms like asymmetric actor-critic, RL-Games expects a dictionary for observations. This dictionary contains "obs" and "states" which typically correspond to the actor and critic observations respectively. To use asymmetric actor-critic, the environment observations from :class:`RLTaskEnv` must have the key or group name "critic". The observation group is used to set the :attr:`num_states` (int) and :attr:`state_space` (:obj:`gym.spaces.Box`). These are used by the learning agent in RL-Games to allocate buffers in the trajectory memory. Since this is optional for some environments, the wrapper checks if these attributes exist. If they don't then the wrapper defaults to zero as number of privileged observations. .. caution:: This class must be the last wrapper in the wrapper chain. This is because the wrapper does not follow the :class:`gym.Wrapper` interface. Any subsequent wrappers will need to be modified to work with this wrapper. Reference: https://github.com/Denys88/rl_games/blob/master/rl_games/common/ivecenv.py https://github.com/NVIDIA-Omniverse/IsaacGymEnvs """
[docs] def __init__(self, env: RLTaskEnv, rl_device: str, clip_obs: float, clip_actions: float): """Initializes the wrapper instance. Args: env: The environment to wrap around. rl_device: The device on which agent computations are performed. clip_obs: The clipping value for observations. clip_actions: The clipping value for actions. Raises: ValueError: The environment is not inherited from :class:`RLTaskEnv`. ValueError: If specified, the privileged observations (critic) are not of type :obj:`gym.spaces.Box`. """ # check that input is valid if not isinstance(env.unwrapped, RLTaskEnv): raise ValueError(f"The environment must be inherited from RLTaskEnv. Environment type: {type(env)}") # initialize the wrapper self.env = env # store provided arguments self._rl_device = rl_device self._clip_obs = clip_obs self._clip_actions = clip_actions self._sim_device = env.unwrapped.device # information for privileged observations if self.state_space is None: self.rlg_num_states = 0 else: self.rlg_num_states = self.state_space.shape[0]
def __str__(self): """Returns the wrapper name and the :attr:`env` representation string.""" return ( f"<{type(self).__name__}{self.env}>" f"\n\tObservations clipping: {self._clip_obs}" f"\n\tActions clipping : {self._clip_actions}" f"\n\tAgent device : {self._rl_device}" f"\n\tAsymmetric-learning : {self.rlg_num_states != 0}" ) def __repr__(self): """Returns the string representation of the wrapper.""" return str(self) """ Properties -- Gym.Wrapper """ @property def render_mode(self) -> str | None: """Returns the :attr:`Env` :attr:`render_mode`.""" return self.env.render_mode @property def observation_space(self) -> gym.spaces.Box: """Returns the :attr:`Env` :attr:`observation_space`.""" # note: rl-games only wants single observation space policy_obs_space = self.unwrapped.single_observation_space["policy"] if not isinstance(policy_obs_space, gymnasium.spaces.Box): raise NotImplementedError( f"The RL-Games wrapper does not currently support observation space: '{type(policy_obs_space)}'." f" If you need to support this, please modify the wrapper: {self.__class__.__name__}," " and if you are nice, please send a merge-request." ) # note: maybe should check if we are a sub-set of the actual space. don't do it right now since # in RLTaskEnv we are setting action space as (-inf, inf). return gym.spaces.Box(-self._clip_obs, self._clip_obs, policy_obs_space.shape) @property def action_space(self) -> gym.Space: """Returns the :attr:`Env` :attr:`action_space`.""" # note: rl-games only wants single action space action_space = self.unwrapped.single_action_space if not isinstance(action_space, gymnasium.spaces.Box): raise NotImplementedError( f"The RL-Games wrapper does not currently support action space: '{type(action_space)}'." f" If you need to support this, please modify the wrapper: {self.__class__.__name__}," " and if you are nice, please send a merge-request." ) # return casted space in gym.spaces.Box (OpenAI Gym) # note: maybe should check if we are a sub-set of the actual space. don't do it right now since # in RLTaskEnv we are setting action space as (-inf, inf). return gym.spaces.Box(-self._clip_actions, self._clip_actions, action_space.shape)
[docs] @classmethod def class_name(cls) -> str: """Returns the class name of the wrapper.""" return cls.__name__
@property def unwrapped(self) -> RLTaskEnv: """Returns the base environment of the wrapper. This will be the bare :class:`gymnasium.Env` environment, underneath all layers of wrappers. """ return self.env.unwrapped """ Properties """ @property def num_envs(self) -> int: """Returns the number of sub-environment instances.""" return self.unwrapped.num_envs @property def device(self) -> str: """Returns the base environment simulation device.""" return self.unwrapped.device @property def state_space(self) -> gym.spaces.Box | None: """Returns the :attr:`Env` :attr:`observation_space`.""" # note: rl-games only wants single observation space critic_obs_space = self.unwrapped.single_observation_space.get("critic") # check if we even have a critic obs if critic_obs_space is None: return None elif not isinstance(critic_obs_space, gymnasium.spaces.Box): raise NotImplementedError( f"The RL-Games wrapper does not currently support state space: '{type(critic_obs_space)}'." f" If you need to support this, please modify the wrapper: {self.__class__.__name__}," " and if you are nice, please send a merge-request." ) # return casted space in gym.spaces.Box (OpenAI Gym) # note: maybe should check if we are a sub-set of the actual space. don't do it right now since # in RLTaskEnv we are setting action space as (-inf, inf). return gym.spaces.Box(-self._clip_obs, self._clip_obs, critic_obs_space.shape)
[docs] def get_number_of_agents(self) -> int: """Returns number of actors in the environment.""" return getattr(self, "num_agents", 1)
[docs] def get_env_info(self) -> dict: """Returns the Gym spaces for the environment.""" return { "observation_space": self.observation_space, "action_space": self.action_space, "state_space": self.state_space, }
""" Operations - MDP """ def seed(self, seed: int = -1) -> int: # noqa: D102 return self.unwrapped.seed(seed) def reset(self): # noqa: D102 obs_dict, _ = self.env.reset() # process observations and states return self._process_obs(obs_dict) def step(self, actions): # noqa: D102 # move actions to sim-device actions = actions.detach().clone().to(device=self._sim_device) # clip the actions actions = torch.clamp(actions, -self._clip_actions, self._clip_actions) # perform environment step obs_dict, rew, terminated, truncated, extras = self.env.step(actions) # move time out information to the extras dict # this is only needed for infinite horizon tasks # note: only useful when `value_bootstrap` is True in the agent configuration if not self.unwrapped.cfg.is_finite_horizon: extras["time_outs"] = truncated.to(device=self._rl_device) # process observations and states obs_and_states = self._process_obs(obs_dict) # move buffers to rl-device # note: we perform clone to prevent issues when rl-device and sim-device are the same. rew = rew.to(device=self._rl_device) dones = (terminated | truncated).to(device=self._rl_device) extras = { k: v.to(device=self._rl_device, non_blocking=True) if hasattr(v, "to") else v for k, v in extras.items() } # remap extras from "log" to "episode" if "log" in extras: extras["episode"] = extras.pop("log") return obs_and_states, rew, dones, extras def close(self): # noqa: D102 return self.env.close() """ Helper functions """ def _process_obs(self, obs_dict: VecEnvObs) -> torch.Tensor | dict[str, torch.Tensor]: """Processing of the observations and states from the environment. Note: States typically refers to privileged observations for the critic function. It is typically used in asymmetric actor-critic algorithms. Args: obs_dict: The current observations from environment. Returns: If environment provides states, then a dictionary containing the observations and states is returned. Otherwise just the observations tensor is returned. """ # process policy obs obs = obs_dict["policy"] # clip the observations obs = torch.clamp(obs, -self._clip_obs, self._clip_obs) # move the buffer to rl-device obs = obs.to(device=self._rl_device).clone() # check if asymmetric actor-critic or not if self.rlg_num_states > 0: # acquire states from the environment if it exists try: states = obs_dict["critic"] except AttributeError: raise NotImplementedError("Environment does not define key 'critic' for privileged observations.") # clip the states states = torch.clamp(states, -self._clip_obs, self._clip_obs) # move buffers to rl-device states = states.to(self._rl_device).clone() # convert to dictionary return {"obs": obs, "states": states} else: return obs
""" Environment Handler. """
[docs]class RlGamesGpuEnv(IVecEnv): """Thin wrapper to create instance of the environment to fit RL-Games runner.""" # TODO: Adding this for now but do we really need this?
[docs] def __init__(self, config_name: str, num_actors: int, **kwargs): """Initialize the environment. Args: config_name: The name of the environment configuration. num_actors: The number of actors in the environment. This is not used in this wrapper. """ self.env: RlGamesVecEnvWrapper = env_configurations.configurations[config_name]["env_creator"](**kwargs)
def step(self, action): # noqa: D102 return self.env.step(action) def reset(self): # noqa: D102 return self.env.reset()
[docs] def get_number_of_agents(self) -> int: """Get number of agents in the environment. Returns: The number of agents in the environment. """ return self.env.get_number_of_agents()
[docs] def get_env_info(self) -> dict: """Get the Gym spaces for the environment. Returns: The Gym spaces for the environment. """ return self.env.get_env_info()