We have now released v0.3.0! Please use the latest version for the best experience.

Source code for omni.isaac.orbit.sensors.sensor_base

# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Base class for sensors.

This class defines an interface for sensors similar to how the :class:`omni.isaac.orbit.assets.AssetBase` class works.
Each sensor class should inherit from this class and implement the abstract methods.
"""

from __future__ import annotations

import inspect
import torch
import weakref
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

import omni.kit.app
import omni.timeline

import omni.isaac.orbit.sim as sim_utils

if TYPE_CHECKING:
    from .sensor_base_cfg import SensorBaseCfg


[docs]class SensorBase(ABC): """The base class for implementing a sensor. The implementation is based on lazy evaluation. The sensor data is only updated when the user tries accessing the data through the :attr:`data` property or sets ``force_compute=True`` in the :meth:`update` method. This is done to avoid unnecessary computation when the sensor data is not used. The sensor is updated at the specified update period. If the update period is zero, then the sensor is updated at every simulation step. """
[docs] def __init__(self, cfg: SensorBaseCfg): """Initialize the sensor class. Args: cfg: The configuration parameters for the sensor. """ # check that config is valid if cfg.history_length < 0: raise ValueError(f"History length must be greater than 0! Received: {cfg.history_length}") # store inputs self.cfg = cfg # flag for whether the sensor is initialized self._is_initialized = False # flag for whether the sensor is in visualization mode self._is_visualizing = False # note: Use weakref on callbacks to ensure that this object can be deleted when its destructor is called. # add callbacks for stage play/stop # The order is set to 10 which is arbitrary but should be lower priority than the default order of 0 timeline_event_stream = omni.timeline.get_timeline_interface().get_timeline_event_stream() self._initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type( int(omni.timeline.TimelineEventType.PLAY), lambda event, obj=weakref.proxy(self): obj._initialize_callback(event), order=10, ) self._invalidate_initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type( int(omni.timeline.TimelineEventType.STOP), lambda event, obj=weakref.proxy(self): obj._invalidate_initialize_callback(event), order=10, ) # add handle for debug visualization (this is set to a valid handle inside set_debug_vis) self._debug_vis_handle = None # set initial state of debug visualization self.set_debug_vis(self.cfg.debug_vis)
def __del__(self): """Unsubscribe from the callbacks.""" # clear physics events handles if self._initialize_handle: self._initialize_handle.unsubscribe() self._initialize_handle = None if self._invalidate_initialize_handle: self._invalidate_initialize_handle.unsubscribe() self._invalidate_initialize_handle = None # clear debug visualization if self._debug_vis_handle: self._debug_vis_handle.unsubscribe() self._debug_vis_handle = None """ Properties """ @property def num_instances(self) -> int: """Number of instances of the sensor. This is equal to the number of sensors per environment multiplied by the number of environments. """ return self._num_envs @property def device(self) -> str: """Memory device for computation.""" return self._device @property @abstractmethod def data(self) -> Any: """Data from the sensor. This property is only updated when the user tries to access the data. This is done to avoid unnecessary computation when the sensor data is not used. For updating the sensor when this property is accessed, you can use the following code snippet in your sensor implementation: .. code-block:: python # update sensors if needed self._update_outdated_buffers() # return the data (where `_data` is the data for the sensor) return self._data """ raise NotImplementedError @property def has_debug_vis_implementation(self) -> bool: """Whether the sensor has a debug visualization implemented.""" # check if function raises NotImplementedError source_code = inspect.getsource(self._set_debug_vis_impl) return "NotImplementedError" not in source_code """ Operations """
[docs] def set_debug_vis(self, debug_vis: bool) -> bool: """Sets whether to visualize the sensor data. Args: debug_vis: Whether to visualize the sensor data. Returns: Whether the debug visualization was successfully set. False if the sensor does not support debug visualization. """ # check if debug visualization is supported if not self.has_debug_vis_implementation: return False # toggle debug visualization objects self._set_debug_vis_impl(debug_vis) # toggle debug visualization flag self._is_visualizing = debug_vis # toggle debug visualization handles if debug_vis: # create a subscriber for the post update event if it doesn't exist if self._debug_vis_handle is None: app_interface = omni.kit.app.get_app_interface() self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop( lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event) ) else: # remove the subscriber if it exists if self._debug_vis_handle is not None: self._debug_vis_handle.unsubscribe() self._debug_vis_handle = None # return success return True
[docs] def reset(self, env_ids: Sequence[int] | None = None): """Resets the sensor internals. Args: env_ids: The sensor ids to reset. Defaults to None. """ # Resolve sensor ids if env_ids is None: env_ids = slice(None) # Reset the timestamp for the sensors self._timestamp[env_ids] = 0.0 self._timestamp_last_update[env_ids] = 0.0 # Set all reset sensors to outdated so that they are updated when data is called the next time. self._is_outdated[env_ids] = True
def update(self, dt: float, force_recompute: bool = False): # Update the timestamp for the sensors self._timestamp += dt self._is_outdated |= self._timestamp - self._timestamp_last_update + 1e-6 >= self.cfg.update_period # Update the buffers # TODO (from @mayank): Why is there a history length here when it doesn't mean anything in the sensor base?!? # It is only for the contact sensor but there we should redefine the update function IMO. if force_recompute or self._is_visualizing or (self.cfg.history_length > 0): self._update_outdated_buffers() """ Implementation specific. """ @abstractmethod def _initialize_impl(self): """Initializes the sensor-related handles and internal buffers.""" # Obtain Simulation Context sim = sim_utils.SimulationContext.instance() if sim is None: raise RuntimeError("Simulation Context is not initialized!") # Obtain device and backend self._device = sim.device self._backend = sim.backend self._sim_physics_dt = sim.get_physics_dt() # Count number of environments env_prim_path_expr = self.cfg.prim_path.rsplit("/", 1)[0] self._parent_prims = sim_utils.find_matching_prims(env_prim_path_expr) self._num_envs = len(self._parent_prims) # Boolean tensor indicating whether the sensor data has to be refreshed self._is_outdated = torch.ones(self._num_envs, dtype=torch.bool, device=self._device) # Current timestamp (in seconds) self._timestamp = torch.zeros(self._num_envs, device=self._device) # Timestamp from last update self._timestamp_last_update = torch.zeros_like(self._timestamp) @abstractmethod def _update_buffers_impl(self, env_ids: Sequence[int]): """Fills the sensor data for provided environment ids. This function does not perform any time-based checks and directly fills the data into the data container. Args: env_ids: The indices of the sensors that are ready to capture. """ raise NotImplementedError def _set_debug_vis_impl(self, debug_vis: bool): """Set debug visualization into visualization objects. This function is responsible for creating the visualization objects if they don't exist and input ``debug_vis`` is True. If the visualization objects exist, the function should set their visibility into the stage. """ raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.") def _debug_vis_callback(self, event): """Callback for debug visualization. This function calls the visualization objects and sets the data to visualize into them. """ raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.") """ Internal simulation callbacks. """ def _initialize_callback(self, event): """Initializes the scene elements. Note: PhysX handles are only enabled once the simulator starts playing. Hence, this function needs to be called whenever the simulator "plays" from a "stop" state. """ if not self._is_initialized: self._initialize_impl() self._is_initialized = True def _invalidate_initialize_callback(self, event): """Invalidates the scene elements.""" self._is_initialized = False """ Helper functions. """ def _update_outdated_buffers(self): """Fills the sensor data for the outdated sensors.""" outdated_env_ids = self._is_outdated.nonzero().squeeze(-1) if len(outdated_env_ids) > 0: # obtain new data self._update_buffers_impl(outdated_env_ids) # update the timestamp from last update self._timestamp_last_update[outdated_env_ids] = self._timestamp[outdated_env_ids] # set outdated flag to false for the updated sensors self._is_outdated[outdated_env_ids] = False