We have now released v0.3.0! Please use the latest version for the best experience.

Source code for omni.isaac.orbit.utils.assets

# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Sub-module that defines the host-server where assets and resources are stored.

By default, we use the Isaac Sim Nucleus Server for hosting assets and resources. This makes
distribution of the assets easier and makes the repository smaller in size code-wise.

For more information, please check information on `Omniverse Nucleus`_.

.. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html
"""

import io
import os
import tempfile
from typing import Literal

import carb
import omni.client
import omni.isaac.core.utils.nucleus as nucleus_utils

# get assets root path
# note: we check only once at the start of the module to prevent multiple checks on the Nucleus Server
NUCLEUS_ASSET_ROOT_DIR = nucleus_utils.get_assets_root_path()
"""Path to the root directory on the Nucleus Server.

This is resolved using Isaac Sim's Nucleus API. If the Nucleus Server is not running, then this
will be set to None. The path is resolved using the following steps:

1. Based on simulation parameter: ``/persistent/isaac/asset_root/default``.
2. Iterating over all the connected Nucleus Servers and checking for the first server that has the
   the connected status.
3. Based on simulation parameter: ``/persistent/isaac/asset_root/cloud``.
"""

# check nucleus connection
if NUCLEUS_ASSET_ROOT_DIR is None:
    msg = (
        "Unable to perform Nucleus login on Omniverse. Assets root path is not set.\n"
        "\tPlease check: https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/overview.html#omniverse-nucleus"
    )
    carb.log_error(msg)
    raise RuntimeError(msg)

NVIDIA_NUCLEUS_DIR = f"{NUCLEUS_ASSET_ROOT_DIR}/NVIDIA"
"""Path to the root directory on the NVIDIA Nucleus Server."""

ISAAC_NUCLEUS_DIR = f"{NUCLEUS_ASSET_ROOT_DIR}/Isaac"
"""Path to the ``Isaac`` directory on the NVIDIA Nucleus Server."""

ISAAC_ORBIT_NUCLEUS_DIR = f"{ISAAC_NUCLEUS_DIR}/Samples/Orbit"
"""Path to the ``Isaac/Samples/Orbit`` directory on the NVIDIA Nucleus Server."""


[docs]def check_file_path(path: str) -> Literal[0, 1, 2]: """Checks if a file exists on the Nucleus Server or locally. Args: path: The path to the file. Returns: The status of the file. Possible values are listed below. * :obj:`0` if the file does not exist * :obj:`1` if the file exists locally * :obj:`2` if the file exists on the Nucleus Server """ if os.path.isfile(path): return 1 elif omni.client.stat(path)[0] == omni.client.Result.OK: return 2 else: return 0
[docs]def retrieve_file_path(path: str, download_dir: str | None = None, force_download: bool = True) -> str: """Retrieves the path to a file on the Nucleus Server or locally. If the file exists locally, then the absolute path to the file is returned. If the file exists on the Nucleus Server, then the file is downloaded to the local machine and the absolute path to the file is returned. Args: path: The path to the file. download_dir: The directory where the file should be downloaded. Defaults to None, in which case the file is downloaded to the system's temporary directory. force_download: Whether to force download the file from the Nucleus Server. This will overwrite the local file if it exists. Defaults to True. Returns: The path to the file on the local machine. Raises: FileNotFoundError: When the file not found locally or on Nucleus Server. RuntimeError: When the file cannot be copied from the Nucleus Server to the local machine. This can happen when the file already exists locally and :attr:`force_download` is set to False. """ # check file status file_status = check_file_path(path) if file_status == 1: return os.path.abspath(path) elif file_status == 2: # resolve download directory if download_dir is None: download_dir = tempfile.gettempdir() else: download_dir = os.path.abspath(download_dir) # create download directory if it does not exist if not os.path.exists(download_dir): os.makedirs(download_dir) # download file in temp directory using os file_name = os.path.basename(omni.client.break_url(path).path) target_path = os.path.join(download_dir, file_name) # check if file already exists locally if not os.path.isfile(target_path) or force_download: # copy file to local machine result = omni.client.copy(path, target_path) if result != omni.client.Result.OK and force_download: raise RuntimeError(f"Unable to copy file: '{path}'. Is the Nucleus Server running?") return os.path.abspath(target_path) else: raise FileNotFoundError(f"Unable to find the file: {path}")
[docs]def read_file(path: str) -> io.BytesIO: """Reads a file from the Nucleus Server or locally. Args: path: The path to the file. Raises: FileNotFoundError: When the file not found locally or on Nucleus Server. Returns: The content of the file. """ # check file status file_status = check_file_path(path) if file_status == 1: with open(path, "rb") as f: return io.BytesIO(f.read()) elif file_status == 2: file_content = omni.client.read_file(path)[2] return io.BytesIO(memoryview(file_content).tobytes()) else: raise FileNotFoundError(f"Unable to find the file: {path}")