We have now released v0.3.0! Please use the latest version for the best experience.

Source code for omni.isaac.orbit_tasks.utils.wrappers.sb3

# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Wrapper to configure an :class:`RLTaskEnv` instance to Stable-Baselines3 vectorized environment.

The following example shows how to wrap an environment for Stable-Baselines3:

.. code-block:: python

    from omni.isaac.orbit_tasks.utils.wrappers.sb3 import Sb3VecEnvWrapper

    env = Sb3VecEnvWrapper(env)

"""

# needed to import for allowing type-hinting: torch.Tensor | dict[str, torch.Tensor]
from __future__ import annotations

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn  # noqa: F401
from typing import Any

from stable_baselines3.common.utils import constant_fn
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvObs, VecEnvStepReturn

from omni.isaac.orbit.envs import RLTaskEnv

"""
Configuration Parser.
"""


[docs]def process_sb3_cfg(cfg: dict) -> dict: """Convert simple YAML types to Stable-Baselines classes/components. Args: cfg: A configuration dictionary. Returns: A dictionary containing the converted configuration. Reference: https://github.com/DLR-RM/rl-baselines3-zoo/blob/0e5eb145faefa33e7d79c7f8c179788574b20da5/utils/exp_manager.py#L358 """ def update_dict(hyperparams: dict[str, Any]) -> dict[str, Any]: for key, value in hyperparams.items(): if isinstance(value, dict): update_dict(value) else: if key in ["policy_kwargs", "replay_buffer_class", "replay_buffer_kwargs"]: hyperparams[key] = eval(value) elif key in ["learning_rate", "clip_range", "clip_range_vf", "delta_std"]: if isinstance(value, str): _, initial_value = value.split("_") initial_value = float(initial_value) hyperparams[key] = lambda progress_remaining: progress_remaining * initial_value elif isinstance(value, (float, int)): # Negative value: ignore (ex: for clipping) if value < 0: continue hyperparams[key] = constant_fn(float(value)) else: raise ValueError(f"Invalid value for {key}: {hyperparams[key]}") return hyperparams # parse agent configuration and convert to classes return update_dict(cfg)
""" Vectorized environment wrapper. """
[docs]class Sb3VecEnvWrapper(VecEnv): """Wraps around Orbit environment for Stable Baselines3. Isaac Sim internally implements a vectorized environment. However, since it is still considered a single environment instance, Stable Baselines tries to wrap around it using the :class:`DummyVecEnv`. This is only done if the environment is not inheriting from their :class:`VecEnv`. Thus, this class thinly wraps over the environment from :class:`RLTaskEnv`. Note: While Stable-Baselines3 supports Gym 0.26+ API, their vectorized environment still uses the old API (i.e. it is closer to Gym 0.21). Thus, we implement the old API for the vectorized environment. We also add monitoring functionality that computes the un-discounted episode return and length. This information is added to the info dicts under key `episode`. In contrast to the Orbit environment, stable-baselines expect the following: 1. numpy datatype for MDP signals 2. a list of info dicts for each sub-environment (instead of a dict) 3. when environment has terminated, the observations from the environment should correspond to the one after reset. The "real" final observation is passed using the info dicts under the key ``terminal_observation``. .. warning:: By the nature of physics stepping in Isaac Sim, it is not possible to forward the simulation buffers without performing a physics step. Thus, reset is performed inside the :meth:`step()` function after the actual physics step is taken. Thus, the returned observations for terminated environments is the one after the reset. .. caution:: This class must be the last wrapper in the wrapper chain. This is because the wrapper does not follow the :class:`gym.Wrapper` interface. Any subsequent wrappers will need to be modified to work with this wrapper. Reference: 1. https://stable-baselines3.readthedocs.io/en/master/guide/vec_envs.html 2. https://stable-baselines3.readthedocs.io/en/master/common/monitor.html """
[docs] def __init__(self, env: RLTaskEnv): """Initialize the wrapper. Args: env: The environment to wrap around. Raises: ValueError: When the environment is not an instance of :class:`RLTaskEnv`. """ # check that input is valid if not isinstance(env.unwrapped, RLTaskEnv): raise ValueError(f"The environment must be inherited from RLTaskEnv. Environment type: {type(env)}") # initialize the wrapper self.env = env # collect common information self.num_envs = self.unwrapped.num_envs self.sim_device = self.unwrapped.device self.render_mode = self.unwrapped.render_mode # obtain gym spaces # note: stable-baselines3 does not like when we have unbounded action space so # we set it to some high value here. Maybe this is not general but something to think about. observation_space = self.unwrapped.single_observation_space["policy"] action_space = self.unwrapped.single_action_space if isinstance(action_space, gym.spaces.Box) and action_space.is_bounded() != "both": action_space = gym.spaces.Box(low=-100, high=100, shape=action_space.shape) # initialize vec-env VecEnv.__init__(self, self.num_envs, observation_space, action_space) # add buffer for logging episodic information self._ep_rew_buf = torch.zeros(self.num_envs, device=self.sim_device) self._ep_len_buf = torch.zeros(self.num_envs, device=self.sim_device)
def __str__(self): """Returns the wrapper name and the :attr:`env` representation string.""" return f"<{type(self).__name__}{self.env}>" def __repr__(self): """Returns the string representation of the wrapper.""" return str(self) """ Properties -- Gym.Wrapper """
[docs] @classmethod def class_name(cls) -> str: """Returns the class name of the wrapper.""" return cls.__name__
@property def unwrapped(self) -> RLTaskEnv: """Returns the base environment of the wrapper. This will be the bare :class:`gymnasium.Env` environment, underneath all layers of wrappers. """ return self.env.unwrapped """ Properties """
[docs] def get_episode_rewards(self) -> list[float]: """Returns the rewards of all the episodes.""" return self._ep_rew_buf.cpu().tolist()
[docs] def get_episode_lengths(self) -> list[int]: """Returns the number of time-steps of all the episodes.""" return self._ep_len_buf.cpu().tolist()
""" Operations - MDP """ def seed(self, seed: int | None = None) -> list[int | None]: # noqa: D102 return [self.unwrapped.seed(seed)] * self.unwrapped.num_envs def reset(self) -> VecEnvObs: # noqa: D102 obs_dict, _ = self.env.reset() # convert data types to numpy depending on backend return self._process_obs(obs_dict) def step_async(self, actions): # noqa: D102 # convert input to numpy array if not isinstance(actions, torch.Tensor): actions = np.asarray(actions) actions = torch.from_numpy(actions).to(device=self.sim_device, dtype=torch.float32) else: actions = actions.to(device=self.sim_device, dtype=torch.float32) # convert to tensor self._async_actions = actions def step_wait(self) -> VecEnvStepReturn: # noqa: D102 # record step information obs_dict, rew, terminated, truncated, extras = self.env.step(self._async_actions) # update episode un-discounted return and length self._ep_rew_buf += rew self._ep_len_buf += 1 # compute reset ids dones = terminated | truncated reset_ids = (dones > 0).nonzero(as_tuple=False) # convert data types to numpy depending on backend # note: RLTaskEnv uses torch backend (by default). obs = self._process_obs(obs_dict) rew = rew.detach().cpu().numpy() terminated = terminated.detach().cpu().numpy() truncated = truncated.detach().cpu().numpy() dones = dones.detach().cpu().numpy() # convert extra information to list of dicts infos = self._process_extras(obs, terminated, truncated, extras, reset_ids) # reset info for terminated environments self._ep_rew_buf[reset_ids] = 0 self._ep_len_buf[reset_ids] = 0 return obs, rew, dones, infos def close(self): # noqa: D102 self.env.close() def get_attr(self, attr_name, indices=None): # noqa: D102 # resolve indices if indices is None: indices = slice(None) num_indices = self.num_envs else: num_indices = len(indices) # obtain attribute value attr_val = getattr(self.env, attr_name) # return the value if not isinstance(attr_val, torch.Tensor): return [attr_val] * num_indices else: return attr_val[indices].detach().cpu().numpy() def set_attr(self, attr_name, value, indices=None): # noqa: D102 raise NotImplementedError("Setting attributes is not supported.") def env_method(self, method_name: str, *method_args, indices=None, **method_kwargs): # noqa: D102 if method_name == "render": # gymnasium does not support changing render mode at runtime return self.env.render() else: # this isn't properly implemented but it is not necessary. # mostly done for completeness. env_method = getattr(self.env, method_name) return env_method(*method_args, indices=indices, **method_kwargs) def env_is_wrapped(self, wrapper_class, indices=None): # noqa: D102 raise NotImplementedError("Checking if environment is wrapped is not supported.") def get_images(self): # noqa: D102 raise NotImplementedError("Getting images is not supported.") """ Helper functions. """ def _process_obs(self, obs_dict: torch.Tensor | dict[str, torch.Tensor]) -> np.ndarray | dict[str, np.ndarray]: """Convert observations into NumPy data type.""" # Sb3 doesn't support asymmetric observation spaces, so we only use "policy" obs = obs_dict["policy"] # note: RLTaskEnv uses torch backend (by default). if isinstance(obs, dict): for key, value in obs.items(): obs[key] = value.detach().cpu().numpy() elif isinstance(obs, torch.Tensor): obs = obs.detach().cpu().numpy() else: raise NotImplementedError(f"Unsupported data type: {type(obs)}") return obs def _process_extras( self, obs: np.ndarray, terminated: np.ndarray, truncated: np.ndarray, extras: dict, reset_ids: np.ndarray ) -> list[dict[str, Any]]: """Convert miscellaneous information into dictionary for each sub-environment.""" # create empty list of dictionaries to fill infos: list[dict[str, Any]] = [dict.fromkeys(extras.keys()) for _ in range(self.num_envs)] # fill-in information for each sub-environment # note: This loop becomes slow when number of environments is large. for idx in range(self.num_envs): # fill-in episode monitoring info if idx in reset_ids: infos[idx]["episode"] = dict() infos[idx]["episode"]["r"] = float(self._ep_rew_buf[idx]) infos[idx]["episode"]["l"] = float(self._ep_len_buf[idx]) else: infos[idx]["episode"] = None # fill-in bootstrap information infos[idx]["TimeLimit.truncated"] = truncated[idx] and not terminated[idx] # fill-in information from extras for key, value in extras.items(): # 1. remap extra episodes information safely # 2. for others just store their values if key == "log": # only log this data for episodes that are terminated if infos[idx]["episode"] is not None: for sub_key, sub_value in value.items(): infos[idx]["episode"][sub_key] = sub_value else: infos[idx][key] = value[idx] # add information about terminal observation separately if idx in reset_ids: # extract terminal observations if isinstance(obs, dict): terminal_obs = dict.fromkeys(obs.keys()) for key, value in obs.items(): terminal_obs[key] = value[idx] else: terminal_obs = obs[idx] # add info to dict infos[idx]["terminal_observation"] = terminal_obs else: infos[idx]["terminal_observation"] = None # return list of dictionaries return infos