Creating an RL Environment#

Having learnt how to create a base environment in Creating a Base Environment, we will now look at how to create a task environment for reinforcement learning.

The base environment is designed as a sense-act environment where the agent can send commands to the environment and receive observations from it. This minimal interface is sufficient for many applications, such as traditional motion planning and control. However, many applications require a task specification, which often serves as the learning objective for the agent. For instance, in a navigation task, the agent may be required to reach a goal location. To this end, we use the envs.RLTaskEnv class, which extends the base environment to include a task specification.

Similar to other components in Orbit, instead of directly modifying the base class RLTaskEnv, we encourage users to implement the task specification as a configuration class, RLTaskEnvCfg, for their task environment. This practice allows us to separate the task specification from the environment implementation, making it easier to reuse components of the same environment for different tasks.

In this tutorial, we will configure the cartpole environment using the RLTaskEnvCfg to create a task for balancing the pole upright. We will learn how to specify the task using reward terms, termination criteria, curriculum and commands.

The Code#

For this tutorial, we use the cartpole environment defined in the omni.isaac.orbit_tasks.classic.cartpole module.

Code for cartpole_env_cfg.py
# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import math

import omni.isaac.orbit.sim as sim_utils
from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg
from omni.isaac.orbit.envs import RLTaskEnvCfg
from omni.isaac.orbit.managers import EventTermCfg as EventTerm
from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup
from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm
from omni.isaac.orbit.managers import RewardTermCfg as RewTerm
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.managers import TerminationTermCfg as DoneTerm
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.utils import configclass

import omni.isaac.orbit_tasks.classic.cartpole.mdp as mdp

##
# Pre-defined configs
##
from omni.isaac.orbit_assets.cartpole import CARTPOLE_CFG  # isort:skip


##
# Scene definition
##


@configclass
class CartpoleSceneCfg(InteractiveSceneCfg):
    """Configuration for a cart-pole scene."""

    # ground plane
    ground = AssetBaseCfg(
        prim_path="/World/ground",
        spawn=sim_utils.GroundPlaneCfg(size=(100.0, 100.0)),
    )

    # cartpole
    robot: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")

    # lights
    dome_light = AssetBaseCfg(
        prim_path="/World/DomeLight",
        spawn=sim_utils.DomeLightCfg(color=(0.9, 0.9, 0.9), intensity=500.0),
    )
    distant_light = AssetBaseCfg(
        prim_path="/World/DistantLight",
        spawn=sim_utils.DistantLightCfg(color=(0.9, 0.9, 0.9), intensity=2500.0),
        init_state=AssetBaseCfg.InitialStateCfg(rot=(0.738, 0.477, 0.477, 0.0)),
    )


##
# MDP settings
##


@configclass
class CommandsCfg:
    """Command terms for the MDP."""

    # no commands for this MDP
    null = mdp.NullCommandCfg()


@configclass
class ActionsCfg:
    """Action specifications for the MDP."""

    joint_effort = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=100.0)


@configclass
class ObservationsCfg:
    """Observation specifications for the MDP."""

    @configclass
    class PolicyCfg(ObsGroup):
        """Observations for policy group."""

        # observation terms (order preserved)
        joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
        joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)

        def __post_init__(self) -> None:
            self.enable_corruption = False
            self.concatenate_terms = True

    # observation groups
    policy: PolicyCfg = PolicyCfg()


@configclass
class EventCfg:
    """Configuration for events."""

    # reset
    reset_cart_position = EventTerm(
        func=mdp.reset_joints_by_offset,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
            "position_range": (-1.0, 1.0),
            "velocity_range": (-0.5, 0.5),
        },
    )

    reset_pole_position = EventTerm(
        func=mdp.reset_joints_by_offset,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
            "position_range": (-0.25 * math.pi, 0.25 * math.pi),
            "velocity_range": (-0.25 * math.pi, 0.25 * math.pi),
        },
    )


@configclass
class RewardsCfg:
    """Reward terms for the MDP."""

    # (1) Constant running reward
    alive = RewTerm(func=mdp.is_alive, weight=1.0)
    # (2) Failure penalty
    terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
    # (3) Primary task: keep pole upright
    pole_pos = RewTerm(
        func=mdp.joint_pos_target_l2,
        weight=-1.0,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
    )
    # (4) Shaping tasks: lower cart velocity
    cart_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.01,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
    )
    # (5) Shaping tasks: lower pole angular velocity
    pole_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.005,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
    )


@configclass
class TerminationsCfg:
    """Termination terms for the MDP."""

    # (1) Time out
    time_out = DoneTerm(func=mdp.time_out, time_out=True)
    # (2) Cart out of bounds
    cart_out_of_bounds = DoneTerm(
        func=mdp.joint_pos_out_of_manual_limit,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
    )


@configclass
class CurriculumCfg:
    """Configuration for the curriculum."""

    pass


##
# Environment configuration
##


@configclass
class CartpoleEnvCfg(RLTaskEnvCfg):
179    """Configuration for the locomotion velocity-tracking environment."""

    # Scene settings
    scene: CartpoleSceneCfg = CartpoleSceneCfg(num_envs=4096, env_spacing=4.0)
    # Basic settings
    observations: ObservationsCfg = ObservationsCfg()
    actions: ActionsCfg = ActionsCfg()
    events: EventCfg = EventCfg()
    # MDP settings
    curriculum: CurriculumCfg = CurriculumCfg()
    rewards: RewardsCfg = RewardsCfg()
    terminations: TerminationsCfg = TerminationsCfg()
    # No command generator
    commands: CommandsCfg = CommandsCfg()

    # Post initialization
    def __post_init__(self) -> None:
        """Post initialization."""
        # general settings
        self.decimation = 2
        self.episode_length_s = 5
        # viewer settings
        self.viewer.eye = (8.0, 0.0, 5.0)
        # simulation settings
        self.sim.dt = 1 / 120

The script for running the environment, run_cartpole_rl_env.py, is present in the orbit/source/standalone/tutorials/03_envs directory. The script is similar to the cartpole_base_env.py script in the previous tutorial, except that it uses envs.RLTaskEnv instead of envs.BaseEnv.

Code for run_cartpole_rl_env.py
# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""This script demonstrates how to run the RL environment for the cartpole balancing task."""

"""Launch Isaac Sim Simulator first."""

import argparse

from omni.isaac.orbit.app import AppLauncher

# add argparse arguments
parser = argparse.ArgumentParser(description="Tutorial on running the cartpole RL environment.")
parser.add_argument("--num_envs", type=int, default=16, help="Number of environments to spawn.")

# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()

# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app

"""Rest everything follows."""

import torch

from omni.isaac.orbit.envs import RLTaskEnv

from omni.isaac.orbit_tasks.classic.cartpole.cartpole_env_cfg import CartpoleEnvCfg


def main():
    """Main function."""
    # create environment configuration
    env_cfg = CartpoleEnvCfg()
    env_cfg.scene.num_envs = args_cli.num_envs
    # setup RL environment
    env = RLTaskEnv(cfg=env_cfg)

    # simulate physics
    count = 0
    while simulation_app.is_running():
        with torch.inference_mode():
            # reset
            if count % 300 == 0:
                count = 0
                env.reset()
                print("-" * 80)
                print("[INFO]: Resetting environment...")
            # sample random actions
            joint_efforts = torch.randn_like(env.action_manager.action)
            # step the environment
            obs, rew, terminated, truncated, info = env.step(joint_efforts)
            # print current orientation of pole
            print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
            # update counter
            count += 1

    # close the environment
    env.close()


if __name__ == "__main__":
    # run the main function
    main()
    # close sim app
    simulation_app.close()

The Code Explained#

We already went through parts of the above in the Creating a Base Environment tutorial, where we learnt how to specify the scene, observations, actions and events. Thus, in this tutorial, we will focus only on the RL components of the environment.

In Orbit, we provide various implementations of different terms in the envs.mdp module. We will use some of these terms in this tutorial, but users are free to define their own terms as well. These are usually placed in their task-specific sub-package (for instance, in omni.isaac.orbit_tasks.classic.cartpole.mdp).

Defining rewards#

The managers.RewardManager is used to compute the reward terms for the agent. Similar to the other managers, its terms are configured using the managers.RewardTermCfg class. The managers.RewardTermCfg class specifies the function or callable class that computes the reward, as well as the weighting associated with it. It also takes in a dictionary of arguments, "params", that are passed to the reward function when it is called.

For the cartpole task, we will use the following reward terms:

  • Alive Reward: Encourage the agent to stay alive for as long as possible.

  • Terminating Penalty: Conversely, penalize the agent for terminating the episode early.

  • Pole Angle Reward: Encourage the agent to keep the pole at the desired upright position.

  • Cart Velocity Reward: Encourage the agent to keep the cart velocity as small as possible.

  • Pole Velocity Reward: Encourage the agent to keep the pole velocity as small as possible.

@configclass
class RewardsCfg:
    """Reward terms for the MDP."""

    # (1) Constant running reward
    alive = RewTerm(func=mdp.is_alive, weight=1.0)
    # (2) Failure penalty
    terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
    # (3) Primary task: keep pole upright
    pole_pos = RewTerm(
        func=mdp.joint_pos_target_l2,
        weight=-1.0,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
    )
    # (4) Shaping tasks: lower cart velocity
    cart_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.01,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
    )
    # (5) Shaping tasks: lower pole angular velocity
    pole_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.005,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
    )
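
Each reward term's func is a plain Python function that receives the environment, followed by the entries of params as keyword arguments, and returns a tensor of shape (num_envs,). Below is a simplified sketch of how such a term might be implemented; the actual mdp.joint_pos_target_l2 may differ in details (e.g., it may additionally wrap joint angles):

import torch

from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.envs import RLTaskEnv
from omni.isaac.orbit.managers import SceneEntityCfg


def joint_pos_target_l2(env: RLTaskEnv, target: float, asset_cfg: SceneEntityCfg) -> torch.Tensor:
    """Penalize the squared deviation of the selected joints from a target position."""
    # resolve the articulation referenced by the scene entity configuration
    asset: Articulation = env.scene[asset_cfg.name]
    # positions of the configured joints; shape: (num_envs, num_joints)
    joint_pos = asset.data.joint_pos[:, asset_cfg.joint_ids]
    # sum of squared errors per environment; shape: (num_envs,)
    return torch.sum(torch.square(joint_pos - target), dim=1)

Since the pole_pos term is configured with a negative weight, this squared error acts as a penalty on deviations from the upright position.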

Defining termination criteria#

Most learning tasks happen over a finite number of steps that we call an episode. For instance, in the cartpole task, we want the agent to balance the pole for as long as possible. However, if the agent reaches an unstable or unsafe state, we want to terminate the episode. On the other hand, if the agent is able to balance the pole for a long time, we want to terminate the episode and start a new one so that the agent can learn to balance the pole from a different starting configuration.

The TerminationsCfg configures the conditions under which an episode terminates. In this example, we want the episode to terminate when either of the following conditions is met:

  • Episode Length: The episode length is greater than the defined max_episode_length.

  • Cart out of bounds: The cart goes outside of the bounds [-3, 3].

The flag managers.TerminationTermCfg.time_out specifies whether the term is a time-out (truncation) term or a terminated term. These are used to indicate the two types of terminations as described in Gymnasium's documentation.

@configclass
class TerminationsCfg:
    """Termination terms for the MDP."""

    # (1) Time out
    time_out = DoneTerm(func=mdp.time_out, time_out=True)
    # (2) Cart out of bounds
    cart_out_of_bounds = DoneTerm(
        func=mdp.joint_pos_out_of_manual_limit,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
    )
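
Termination functions follow the same pattern as reward functions, except that they return a boolean tensor of shape (num_envs,) marking the environments whose episode should end. As a minimal sketch of what a bounds check like mdp.joint_pos_out_of_manual_limit might look like (the actual implementation may differ):

import torch

from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.envs import RLTaskEnv
from omni.isaac.orbit.managers import SceneEntityCfg


def joint_pos_out_of_manual_limit(
    env: RLTaskEnv, bounds: tuple[float, float], asset_cfg: SceneEntityCfg
) -> torch.Tensor:
    """Terminate when any selected joint position leaves the given bounds."""
    asset: Articulation = env.scene[asset_cfg.name]
    joint_pos = asset.data.joint_pos[:, asset_cfg.joint_ids]
    # True wherever the episode should terminate; shape: (num_envs,)
    return torch.any((joint_pos < bounds[0]) | (joint_pos > bounds[1]), dim=1)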

Defining commands#

For various goal-conditioned tasks, it is useful to specify the goals or commands for the agent. These are handled through the managers.CommandManager, which takes care of resampling and updating the commands at each step. It can also be used to provide the commands as an observation to the agent.

For this simple task, we do not use any commands. This is specified by using a command term with the envs.mdp.NullCommandCfg configuration. However, you can see examples of command definitions in the locomotion or manipulation tasks; a rough sketch follows the snippet below.

@configclass
class CommandsCfg:
    """Command terms for the MDP."""

    # no commands for this MDP
    null = mdp.NullCommandCfg()
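
For illustration, a non-trivial command term, loosely modeled on the velocity commands used in the locomotion tasks, might look roughly like this. The field names follow mdp.UniformVelocityCommandCfg as used there and may differ across Orbit versions:

import math

# sketch of a velocity command term, modeled on the locomotion tasks
base_velocity = mdp.UniformVelocityCommandCfg(
    asset_name="robot",
    resampling_time_range=(10.0, 10.0),  # resample the command every 10 s
    heading_command=True,
    heading_control_stiffness=0.5,
    debug_vis=True,
    ranges=mdp.UniformVelocityCommandCfg.Ranges(
        lin_vel_x=(-1.0, 1.0), lin_vel_y=(-1.0, 1.0), ang_vel_z=(-1.0, 1.0), heading=(-math.pi, math.pi)
    ),
)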

Defining curriculum#

Oftentimes when training a learning agent, it helps to start with a simple task and gradually increase the task's difficulty as training progresses. This is the idea behind curriculum learning. In Orbit, we provide a managers.CurriculumManager class that can be used to define a curriculum for your environment.

In this tutorial we don’t implement a curriculum for simplicity, but you can see an example of a curriculum definition in the other locomotion or manipulation tasks. We use a simple pass-through curriculum to define a curriculum manager that does not modify the environment.

@configclass
class CurriculumCfg:
    """Configuration for the curriculum."""

    pass
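
For illustration, a curriculum term that anneals one of the reward weights during training could look roughly like the sketch below. It assumes the mdp.modify_reward_weight helper and the managers.CurriculumTermCfg class; treat the exact names and signatures as version-dependent:

from omni.isaac.orbit.managers import CurriculumTermCfg as CurrTerm


@configclass
class CurriculumCfg:
    """Example curriculum: relax the cart-velocity penalty later in training."""

    # hypothetical term that overwrites the 'cart_vel' reward weight once the
    # configured number of environment steps has elapsed
    cart_vel = CurrTerm(
        func=mdp.modify_reward_weight,
        params={"term_name": "cart_vel", "weight": -0.005, "num_steps": 10000},
    )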

Tying it all together#

With all the above components defined, we can now create the CartpoleEnvCfg configuration for the cartpole environment, which derives from RLTaskEnvCfg. This is similar to the BaseEnvCfg defined in Creating a Base Environment, only with the added RL components explained in the above sections.

@configclass
class CartpoleEnvCfg(RLTaskEnvCfg):
    """Configuration for the locomotion velocity-tracking environment."""

    # Scene settings
    scene: CartpoleSceneCfg = CartpoleSceneCfg(num_envs=4096, env_spacing=4.0)
    # Basic settings
    observations: ObservationsCfg = ObservationsCfg()
    actions: ActionsCfg = ActionsCfg()
    events: EventCfg = EventCfg()
    # MDP settings
    curriculum: CurriculumCfg = CurriculumCfg()
    rewards: RewardsCfg = RewardsCfg()
    terminations: TerminationsCfg = TerminationsCfg()
    # No command generator
    commands: CommandsCfg = CommandsCfg()

    # Post initialization
    def __post_init__(self) -> None:
        """Post initialization."""
        # general settings
        self.decimation = 2
        self.episode_length_s = 5
        # viewer settings
        self.viewer.eye = (8.0, 0.0, 5.0)
        # simulation settings
        self.sim.dt = 1 / 120

Running the simulation loop#

Coming back to the run_cartpole_rl_env.py script, the simulation loop is similar to the previous tutorial. The only difference is that we create an instance of envs.RLTaskEnv instead of envs.BaseEnv. Consequently, the envs.RLTaskEnv.step() method now returns additional signals such as the reward and the termination status. The information dictionary also logs quantities such as the reward contribution from individual terms, the termination status of each term, the episode length, etc.

def main():
    """Main function."""
    # create environment configuration
    env_cfg = CartpoleEnvCfg()
    env_cfg.scene.num_envs = args_cli.num_envs
    # setup RL environment
    env = RLTaskEnv(cfg=env_cfg)

    # simulate physics
    count = 0
    while simulation_app.is_running():
        with torch.inference_mode():
            # reset
            if count % 300 == 0:
                count = 0
                env.reset()
                print("-" * 80)
                print("[INFO]: Resetting environment...")
            # sample random actions
            joint_efforts = torch.randn_like(env.action_manager.action)
            # step the environment
            obs, rew, terminated, truncated, info = env.step(joint_efforts)
            # print current orientation of pole
            print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
            # update counter
            count += 1

    # close the environment
    env.close()
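
The per-term statistics mentioned above are surfaced through the info dictionary returned by env.step(). As a rough illustration (the exact key layout is version-dependent), they could be inspected inside the loop like this:

# print the logging information accumulated by the managers, if present
# (the "log" key and its contents are version-dependent)
if "log" in info:
    for name, value in info["log"].items():
        print(f"  {name}: {value}")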

The Code Execution#

Similar to the previous tutorial, we can run the environment by executing the run_cartpole_rl_env.py script.

./orbit.sh -p source/standalone/tutorials/03_envs/run_cartpole_rl_env.py --num_envs 32

This should open a simulation similar to the one in the previous tutorial. However, this time the environment returns additional signals that specify the reward and the termination status. Additionally, individual environments reset themselves when they terminate, based on the termination criteria specified in the configuration.

To stop the simulation, you can either close the window, or press Ctrl+C in the terminal where you started the simulation.

In this tutorial, we learnt how to create a task environment for reinforcement learning. We did this by extending the base environment to include the rewards, terminations, commands and curriculum terms. We also learnt how to use the envs.RLTaskEnv class to run the environment and receive various signals from it.

While it is possible to manually create an instance of envs.RLTaskEnv class for a desired task, this is not scalable as it requires specialized scripts for each task. Thus, we exploit the gymnasium.make() function to create the environment with the gym interface. We will learn how to do this in the next tutorial.
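
As a preview of that workflow, and assuming the task has been registered under an ID such as "Isaac-Cartpole-v0" (an assumed name here), creating the environment reduces to a few lines:

import gymnasium as gym

# hypothetical task ID; the registration itself is covered in the next tutorial
env = gym.make("Isaac-Cartpole-v0", cfg=CartpoleEnvCfg())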