diff --git a/source/extensions/omni.isaac.orbit/config/extension.toml b/source/extensions/omni.isaac.orbit/config/extension.toml
index 58bfe4343d..9cd0c873a0 100644
--- a/source/extensions/omni.isaac.orbit/config/extension.toml
+++ b/source/extensions/omni.isaac.orbit/config/extension.toml
@@ -1,7 +1,7 @@
 [package]
 
 # Note: Semantic Versioning is used: https://semver.org/
-version = "0.9.43"
+version = "0.9.44"
 
 # Description
 title = "ORBIT framework for Robot Learning"
diff --git a/source/extensions/omni.isaac.orbit/docs/CHANGELOG.rst b/source/extensions/omni.isaac.orbit/docs/CHANGELOG.rst
index 37ee69f33c..6b0adad876 100644
--- a/source/extensions/omni.isaac.orbit/docs/CHANGELOG.rst
+++ b/source/extensions/omni.isaac.orbit/docs/CHANGELOG.rst
@@ -1,6 +1,16 @@
 Changelog
 ---------
 
+0.9.44 (2023-11-16)
+~~~~~~~~~~~~~~~~~~~
+
+Added
+^^^^^
+
+* Added methods :meth:`reset` and :meth:`step` to the :class:`omni.isaac.orbit.envs.BaseEnv`. This unifies
+  the environment interface so that simple standalone applications can use the class directly.
+
+
 0.9.43 (2023-11-16)
 ~~~~~~~~~~~~~~~~~~~
 
diff --git a/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/base_env.py b/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/base_env.py
index 40fd8973b0..d7c65bef0a 100644
--- a/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/base_env.py
+++ b/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/base_env.py
@@ -6,6 +6,8 @@
 from __future__ import annotations
 
 import builtins
+import torch
+from typing import Any, Dict, Sequence, Union
 
 import omni.isaac.core.utils.torch as torch_utils
 
@@ -16,6 +18,29 @@
 from .base_env_cfg import BaseEnvCfg
 
 
+VecEnvObs = Dict[str, Union[torch.Tensor, Dict[str, torch.Tensor]]]
+"""Observation returned by the environment.
+
+The observations are stored in a dictionary. The keys are the group to which the observations belong.
+This is useful for various setups such as reinforcement learning with asymmetric actor-critic or
+multi-agent learning. For non-learning paradigms, this may include observations for different components
+of a system.
+
+Within each group, the observations can be stored either as a dictionary with keys as the names of each
+observation term in the group, or a single tensor obtained from concatenating all the observation terms.
+For example, for asymmetric actor-critic, the observations for the actor and the critic can be accessed
+using the keys ``"policy"`` and ``"critic"`` respectively.
+
+Note:
+    By default, most learning frameworks deal with default and privileged observations in different ways.
+    This handling must be taken care of by the wrapper around the :class:`RLTaskEnv` instance.
+
+    For included frameworks (RSL-RL, RL-Games, skrl), the observations must have the key "policy". If the
+    key "critic" is also present, then the critic observations are taken from the "critic" group.
+    Otherwise, they are the same as the "policy" group.
+
+"""
+
 
 class BaseEnv:
     """The base environment encapsulates the simulation scene and the environment managers.
@@ -112,6 +137,9 @@ def __init__(self, cfg: BaseEnvCfg):
             # if no window, then we don't need to store the window
             self._window = None
 
+        # allocate dictionary to store metrics
+        self.extras = {}
+
     def __del__(self):
         """Cleanup for the environment."""
         self.close()
@@ -171,6 +199,66 @@ def load_managers(self):
     Operations - MDP.
     """
 
+    def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[VecEnvObs, dict]:
+        """Resets all the environments and returns observations.
+
+        Args:
+            seed: The seed to use for randomization. Defaults to None, in which case the seed is not set.
+            options: Additional information to specify how the environment is reset. Defaults to None.
+
+                Note:
+                    This argument is used for compatibility with the Gymnasium environment definition.
+
+        Returns:
+            A tuple containing the observations and extras.
+        """
+        # set the seed
+        if seed is not None:
+            self.seed(seed)
+        # reset state of scene
+        indices = torch.arange(self.num_envs, dtype=torch.int64, device=self.device)
+        self._reset_idx(indices)
+        # return observations
+        return self.observation_manager.compute(), self.extras
+
+    def step(self, action: torch.Tensor) -> tuple[VecEnvObs, dict]:
+        """Execute one time-step of the environment's dynamics.
+
+        The environment steps forward at a fixed time-step, while the physics simulation runs at a
+        smaller time-step, i.e. each environment step is decimated into several simulation steps.
+        This helps keep the simulation stable. The two time-steps can be configured independently
+        using :attr:`BaseEnvCfg.decimation` (number of simulation steps per environment step) and
+        :attr:`BaseEnvCfg.sim.dt` (physics time-step). The environment time-step is the product of the two.
+
+        Args:
+            action: The actions to apply on the environment. Shape is ``(num_envs, action_dim)``.
+
+        Returns:
+            A tuple containing the observations and extras.
+        """
+        # process actions
+        self.action_manager.process_action(action)
+        # perform physics stepping
+        for _ in range(self.cfg.decimation):
+            # set actions into buffers
+            self.action_manager.apply_action()
+            # set actions into simulator
+            self.scene.write_data_to_sim()
+            # simulate
+            self.sim.step(render=False)
+            # update buffers at sim dt
+            self.scene.update(dt=self.physics_dt)
+        # perform rendering if GUI is enabled
+        if self.sim.has_gui():
+            self.sim.render()
+
+        # post-step: step interval randomization
+        if "interval" in self.randomization_manager.available_modes:
+            self.randomization_manager.randomize(mode="interval", dt=self.step_dt)
+
+        # return observations and extras
+        return self.observation_manager.compute(), self.extras
+
     @staticmethod
     def seed(seed: int = -1) -> int:
         """Set the seed for the environment.
@@ -202,3 +290,33 @@ def close(self):
                 self._window = None
             # update closing status
             self._is_closed = True
+
+    """
+    Helper functions.
+    """
+
+    def _reset_idx(self, env_ids: Sequence[int]):
+        """Reset environments based on specified indices.
+
+        Args:
+            env_ids: List of environment ids which must be reset.
+        """
+        # reset the internal buffers of the scene elements
+        self.scene.reset(env_ids)
+        # randomize the MDP for environments that need a reset
+        if "reset" in self.randomization_manager.available_modes:
+            self.randomization_manager.randomize(env_ids=env_ids, mode="reset")
+
+        # iterate over all managers and reset them
+        # this returns a dictionary of information which is stored in the extras
+        # note: This is order-sensitive! Certain things need to be reset before others.
+ self.extras["log"] = dict() + # -- observation manager + info = self.observation_manager.reset(env_ids) + self.extras["log"].update(info) + # -- action manager + info = self.action_manager.reset(env_ids) + self.extras["log"].update(info) + # -- randomization manager + info = self.randomization_manager.reset(env_ids) + self.extras["log"].update(info) diff --git a/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/rl_task_env.py b/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/rl_task_env.py index d1f99bdc6e..6cd0fa9ec8 100644 --- a/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/rl_task_env.py +++ b/source/extensions/omni.isaac.orbit/omni/isaac/orbit/envs/rl_task_env.py @@ -9,40 +9,16 @@ import math import numpy as np import torch -from typing import Any, ClassVar, Dict, Sequence, Tuple, Union +from typing import Any, ClassVar, Dict, Sequence, Tuple from omni.isaac.version import get_version from omni.isaac.orbit.command_generators import CommandGeneratorBase from omni.isaac.orbit.managers import CurriculumManager, RewardManager, TerminationManager -from .base_env import BaseEnv +from .base_env import BaseEnv, VecEnvObs from .rl_task_env_cfg import RLTaskEnvCfg -VecEnvObs = Dict[str, Union[torch.Tensor, Dict[str, torch.Tensor]]] -"""Observation returned by the environment. - -The observations are stored in a dictionary. The keys are the group to which the observations belong. -This is useful for various learning setups beyond vanilla reinforcement learning, such as asymmetric -actor-critic, multi-agent, or hierarchical reinforcement learning. - -For example, for asymmetric actor-critic, the observation for the actor and the critic can be accessed -using the keys ``"policy"`` and ``"critic"`` respectively. - -Within each group, the observations can be stored either as a dictionary with keys as the names of each -observation term in the group, or a single tensor obtained from concatenating all the observation terms. - -Note: - By default, most learning frameworks deal with default and privileged observations in different ways. - This handling must be taken care of by the wrapper around the :class:`RLTaskEnv` instance. - - For included frameworks (RSL-RL, RL-Games, skrl), the observations must have the key "policy". In case, - the key "critic" is also present, then the critic observations are taken from the "critic" group. - Otherwise, they are the same as the "policy" group. - -""" - - VecEnvStepReturn = Tuple[VecEnvObs, torch.Tensor, torch.Tensor, torch.Tensor, Dict] """The environment signals processed at the end of each step. @@ -76,6 +52,14 @@ class RLTaskEnv(BaseEnv, gym.Env): environment. Thus, to reduce complexity, we directly use the :class:`gym.Env` over here and leave it up to library-defined wrappers to take care of wrapping this environment for their agents. + + Note: + For vectorized environments, it is recommended to **only** call the :meth:`reset` + method once before the first call to :meth:`step`, i.e. after the environment is created. + After that, the :meth:`step` function handles the reset of terminated sub-environments. + This is because the simulator does not support resetting individual sub-environments + in a vectorized environment. 
+
     """
 
     is_vector_env: ClassVar[bool] = True
@@ -107,8 +91,6 @@ def __init__(self, cfg: RLTaskEnvCfg, render_mode: str | None = None, **kwargs):
         self.common_step_counter = 0
         # -- init buffers
         self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
-        # -- allocate dictionary to store metrics
-        self.extras = {}
 
         # setup the action and observation spaces for Gym
         self._configure_gym_env_spaces()
@@ -158,48 +140,18 @@ def load_managers(self):
     Operations - MDP
     """
 
-    def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[VecEnvObs, dict]:
-        """Resets all the environments and returns observations and extras.
-
-        Note:
-            This function (if called) must **only** be called before the first call to :meth:`step`, i.e.
-            after the environment is created. After that, the :meth:`step` function handles the reset
-            of terminated sub-environments.
-
-        Args:
-            seed: The seed to use for randomization. Defaults to None, in which case the seed is not set.
-            options: Additional information to specify how the environment is reset. Defaults to None.
-
-        Note:
-            This is not used in the current implementation. It is mostly there for compatibility with
-            Gymnasium environment definition.
-
-        Returns:
-            A tuple containing the observations and extras.
-        """
-        # set the seed
-        if seed is not None:
-            gym.Env.reset(self, seed=seed)
-            self.seed(seed)
-        # reset state of scene
-        indices = torch.arange(self.num_envs, dtype=torch.int64, device=self.device)
-        self._reset_idx(indices)
-        # return observations
-        return self.observation_manager.compute(), self.extras
-
     def step(self, action: torch.Tensor) -> VecEnvStepReturn:
-        """Run one timestep of the environment's dynamics and reset terminated environments.
+        """Execute one time-step of the environment's dynamics and reset terminated environments.
 
-        The environment dynamics may comprise of many steps of the physics engine. The number of steps
-        is controlled by the :attr:`RLTaskEnvCfg.decimation` parameter in the configuration. This means
-        that the agent control can happen at a slower rate than the physics simulation. This is useful
-        for real-time control of the robot, where the control loop may be slower than the frequency of
-        the actual dynamics.
+        Unlike the :meth:`BaseEnv.step` method, this function performs the following operations:
 
-        The function also handles resetting of the terminated environments, at the end of the physics
-        stepping and computation of the reward and terminated signals. This is because it is not
-        possible to reset the sub-environments individually due to the vectorized implementation
-        of sub-environments in the simulator.
+        1. Process the actions.
+        2. Perform physics stepping.
+        3. Perform rendering if the GUI is enabled.
+        4. Update the environment counters and compute the rewards and terminations.
+        5. Reset the environments that terminated.
+        6. Compute the observations.
+        7. Return the observations, rewards, resets and extras.
 
         Args:
             action: The actions to apply on the environment. Shape is ``(num_envs, action_dim)``.
@@ -255,12 +207,12 @@ def render(self) -> np.ndarray | None:
 
         By convention, if mode is:
 
-        - **human**: render to the current display and return nothing. Usually for human consumption.
+        - **human**: Render to the current display and return nothing. Usually for human consumption.
         - **rgb_array**: Return an numpy.ndarray with shape (x, y, 3), representing RGB values for an
           x-by-y pixel image, suitable for turning into a video.
Returns: - The rendered image as a numpy array if mode is "rgb_array". + The rendered image as a numpy array if mode is "rgb_array". Otherwise, returns None. Raises: RuntimeError: If mode is set to "rgb_data" and simulation render mode does not support it. diff --git a/source/extensions/omni.isaac.orbit/test/envs/check_base_env_anymal_locomotion.py b/source/extensions/omni.isaac.orbit/test/envs/check_base_env_anymal_locomotion.py new file mode 100644 index 0000000000..b1942f19b1 --- /dev/null +++ b/source/extensions/omni.isaac.orbit/test/envs/check_base_env_anymal_locomotion.py @@ -0,0 +1,255 @@ +# Copyright (c) 2022-2023, The ORBIT Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +""" +This script demonstrates the environment concept that combines a scene with an action, +observation and randomization manager for a quadruped robot. + +A locomotion policy is loaded and used to control the robot. This shows how to use the +environment with a policy. +""" + +from __future__ import annotations + +"""Launch Isaac Sim Simulator first.""" + + +import argparse + +from omni.isaac.orbit.app import AppLauncher + +# add argparse arguments +parser = argparse.ArgumentParser(description="This script demonstrates how to use the concept of an Environment.") +parser.add_argument("--num_envs", type=int, default=64, help="Number of environments to spawn.") + +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +# parse the arguments +args_cli = parser.parse_args() + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" +import os +import torch +import traceback + +import carb +from omni.isaac.orbit_assets import ORBIT_ASSETS_DATA_DIR + +import omni.isaac.orbit.envs.mdp as mdp +import omni.isaac.orbit.sim as sim_utils +from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg +from omni.isaac.orbit.assets.config.anymal import ANYMAL_C_CFG +from omni.isaac.orbit.envs import BaseEnv, BaseEnvCfg +from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup +from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm +from omni.isaac.orbit.managers import RandomizationTermCfg as RandTerm +from omni.isaac.orbit.managers import SceneEntityCfg +from omni.isaac.orbit.scene import InteractiveSceneCfg +from omni.isaac.orbit.sensors import RayCasterCfg, patterns +from omni.isaac.orbit.terrains import TerrainImporterCfg +from omni.isaac.orbit.utils import configclass +from omni.isaac.orbit.utils.assets import check_file_path +from omni.isaac.orbit.utils.noise import AdditiveUniformNoiseCfg as Unoise + +## +# Pre-defined configs +## +from omni.isaac.orbit.terrains.config.rough import ROUGH_TERRAINS_CFG # isort: skip + + +## +# Scene definition +## + + +@configclass +class MySceneCfg(InteractiveSceneCfg): + """Example scene configuration.""" + + # add terrain + terrain = TerrainImporterCfg( + prim_path="/World/ground", + terrain_type="generator", + terrain_generator=ROUGH_TERRAINS_CFG, + physics_material=sim_utils.RigidBodyMaterialCfg( + friction_combine_mode="multiply", + restitution_combine_mode="multiply", + static_friction=1.0, + dynamic_friction=1.0, + ), + debug_vis=False, + ) + + # add robot + robot: ArticulationCfg = ANYMAL_C_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot") + + # sensors + height_scanner = RayCasterCfg( + prim_path="{ENV_REGEX_NS}/Robot/base", + offset=RayCasterCfg.OffsetCfg(pos=(0.0, 0.0, 20.0)), + attach_yaw_only=True, + 
pattern_cfg=patterns.GridPatternCfg(resolution=0.1, size=[1.6, 1.0]), + debug_vis=True, + mesh_prim_paths=["/World/ground"], + ) + + # lights + light = AssetBaseCfg( + prim_path="/World/light", + spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0), + ) + + +## +# MDP settings +## + + +def constant_commands(env: BaseEnv) -> torch.Tensor: + """The generated command from the command generator.""" + return torch.tensor([[1, 0, 0]], device=env.device).repeat(env.num_envs, 1) + + +@configclass +class ActionsCfg: + """Action specifications for the MDP.""" + + joint_pos = mdp.JointPositionActionCfg(asset_name="robot", joint_names=[".*"], scale=0.5, use_default_offset=True) + + +@configclass +class ObservationsCfg: + """Observation specifications for the MDP.""" + + @configclass + class PolicyCfg(ObsGroup): + """Observations for policy group.""" + + # observation terms (order preserved) + base_lin_vel = ObsTerm(func=mdp.base_lin_vel, noise=Unoise(n_min=-0.1, n_max=0.1)) + base_ang_vel = ObsTerm(func=mdp.base_ang_vel, noise=Unoise(n_min=-0.2, n_max=0.2)) + projected_gravity = ObsTerm( + func=mdp.projected_gravity, + noise=Unoise(n_min=-0.05, n_max=0.05), + ) + velocity_commands = ObsTerm(func=constant_commands) + joint_pos = ObsTerm(func=mdp.joint_pos_rel, noise=Unoise(n_min=-0.01, n_max=0.01)) + joint_vel = ObsTerm(func=mdp.joint_vel_rel, noise=Unoise(n_min=-1.5, n_max=1.5)) + actions = ObsTerm(func=mdp.last_action) + height_scan = ObsTerm( + func=mdp.height_scan, + params={"sensor_cfg": SceneEntityCfg("height_scanner")}, + noise=Unoise(n_min=-0.1, n_max=0.1), + clip=(-1.0, 1.0), + ) + + def __post_init__(self): + self.enable_corruption = True + self.concatenate_terms = True + + # observation groups + policy: PolicyCfg = PolicyCfg() + + +@configclass +class RandomizationCfg: + """Configuration for randomization.""" + + reset_base = RandTerm( + func=mdp.reset_root_state_uniform, + mode="reset", + params={ + "pose_range": {"x": (-0.5, 0.5), "y": (-0.5, 0.5), "yaw": (-3.14, 3.14)}, + "velocity_range": { + "x": (-0.5, 0.5), + "y": (-0.5, 0.5), + "z": (-0.5, 0.5), + "roll": (-0.5, 0.5), + "pitch": (-0.5, 0.5), + "yaw": (-0.5, 0.5), + }, + }, + ) + + +## +# Environment configuration +## + + +@configclass +class QuadrupedEnvCfg(BaseEnvCfg): + """Configuration for the locomotion velocity-tracking environment.""" + + # Scene settings + scene: MySceneCfg = MySceneCfg(num_envs=args_cli.num_envs, env_spacing=2.5, replicate_physics=True) + # Basic settings + observations: ObservationsCfg = ObservationsCfg() + actions: ActionsCfg = ActionsCfg() + randomization: RandomizationCfg = RandomizationCfg() + + def __post_init__(self): + """Post initialization.""" + # general settings + self.decimation = 4 + self.episode_length_s = 20.0 + # simulation settings + self.sim.dt = 0.005 + # update sensor update periods + # we tick all the sensors based on the smallest update period (physics update period) + if self.scene.height_scanner is not None: + self.scene.height_scanner.update_period = self.decimation * self.sim.dt + + +def main(): + """Main function.""" + + # setup base environment + env = BaseEnv(cfg=QuadrupedEnvCfg()) + obs, _ = env.reset() + + # load level policy + policy_path = os.path.join(ORBIT_ASSETS_DATA_DIR, "Policies", "ANYmal-C", "policy.pt") + # check if policy file exists + if not check_file_path(policy_path): + raise FileNotFoundError(f"Policy file '{policy_path}' does not exist.") + # jit load the policy + locomotion_policy = torch.jit.load(policy_path) + 
locomotion_policy.to(env.device) + locomotion_policy.eval() + + # simulate physics + count = 0 + while simulation_app.is_running(): + with torch.inference_mode(): + # reset + if count % 1000 == 0: + obs, _ = env.reset() + count = 0 + print("[INFO]: Resetting robots state...") + + # infer action + action = locomotion_policy(obs["policy"]) + # step env + obs, _ = env.step(action) + # update counter + count += 1 + + +if __name__ == "__main__": + try: + # run the main execution + main() + except Exception as err: + carb.log_error(err) + carb.log_error(traceback.format_exc()) + raise + finally: + # close sim app + simulation_app.close() diff --git a/source/extensions/omni.isaac.orbit/test/envs/check_base_env_floating_cube.py b/source/extensions/omni.isaac.orbit/test/envs/check_base_env_floating_cube.py new file mode 100644 index 0000000000..19cce4bed4 --- /dev/null +++ b/source/extensions/omni.isaac.orbit/test/envs/check_base_env_floating_cube.py @@ -0,0 +1,276 @@ +# Copyright (c) 2022-2023, The ORBIT Project Developers. +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +""" +This script demonstrates the base environment concept that combines a scene with an action, +observation and randomization manager for a floating cube. +""" + +from __future__ import annotations + +"""Launch Isaac Sim Simulator first.""" + + +import argparse + +from omni.isaac.orbit.app import AppLauncher + +# add argparse arguments +parser = argparse.ArgumentParser(description="This script demonstrates how to use the concept of an Environment.") +parser.add_argument("--num_envs", type=int, default=64, help="Number of environments to spawn.") + +# append AppLauncher cli args +AppLauncher.add_app_launcher_args(parser) +# parse the arguments +args_cli = parser.parse_args() + +# launch omniverse app +app_launcher = AppLauncher(args_cli) +simulation_app = app_launcher.app + +"""Rest everything follows.""" +import torch +import traceback + +import carb + +import omni.isaac.orbit.envs.mdp as mdp +import omni.isaac.orbit.sim as sim_utils +from omni.isaac.orbit.assets import AssetBaseCfg, RigidObject, RigidObjectCfg +from omni.isaac.orbit.envs import BaseEnv, BaseEnvCfg +from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup +from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm +from omni.isaac.orbit.managers import RandomizationTermCfg as RandTerm +from omni.isaac.orbit.managers import SceneEntityCfg +from omni.isaac.orbit.managers.action_manager import ActionTerm, ActionTermCfg +from omni.isaac.orbit.scene import InteractiveSceneCfg +from omni.isaac.orbit.terrains import TerrainImporterCfg +from omni.isaac.orbit.utils import configclass + +## +# Scene definition +## + + +@configclass +class MySceneCfg(InteractiveSceneCfg): + """Example scene configuration.""" + + # add terrain + terrain = TerrainImporterCfg(prim_path="/World/ground", terrain_type="plane", debug_vis=False) + + # add cube + cube: RigidObjectCfg = RigidObjectCfg( + prim_path="{ENV_REGEX_NS}/cube", + spawn=sim_utils.CuboidCfg( + size=(0.2, 0.2, 0.2), + rigid_props=sim_utils.RigidBodyPropertiesCfg(max_depenetration_velocity=1.0), + mass_props=sim_utils.MassPropertiesCfg(mass=1.0), + physics_material=sim_utils.RigidBodyMaterialCfg(), + visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.5, 0.0, 0.0)), + ), + init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 5)), + ) + + # lights + light = AssetBaseCfg( + prim_path="/World/light", + spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), 
intensity=3000.0),
+    )
+
+
+##
+# Action Term
+##
+
+
+class CubeActionTerm(ActionTerm):
+    """Simple action term that implements a PD controller to track a target position."""
+
+    _asset: RigidObject
+    """The rigid object asset on which the action term is applied."""
+
+    def __init__(self, cfg: ActionTermCfg, env: BaseEnv):
+        # call super constructor
+        super().__init__(cfg, env)
+        # create buffers
+        self._raw_actions = torch.zeros(env.num_envs, 3, device=self.device)
+        self._processed_actions = torch.zeros(env.num_envs, 3, device=self.device)
+        self._vel_command = torch.zeros(self.num_envs, 6, device=self.device)
+        # gains of controller
+        self.p_gain = 5.0
+        self.d_gain = 0.5
+
+    """
+    Properties.
+    """
+
+    @property
+    def action_dim(self) -> int:
+        return self._raw_actions.shape[1]
+
+    @property
+    def raw_actions(self) -> torch.Tensor:
+        # desired: (x, y, z)
+        return self._raw_actions
+
+    @property
+    def processed_actions(self) -> torch.Tensor:
+        return self._processed_actions
+
+    """
+    Operations
+    """
+
+    def process_actions(self, actions: torch.Tensor):
+        # store the raw actions
+        self._raw_actions[:] = actions
+        # no processing of actions
+        self._processed_actions[:] = self._raw_actions[:]
+
+    def apply_actions(self):
+        # implement a PD controller to track the target position
+        pos_error = self._processed_actions - (self._asset.data.root_pos_w - self._env.scene.env_origins)
+        vel_error = -self._asset.data.root_lin_vel_w
+        # set velocity targets
+        self._vel_command[:, :3] = self.p_gain * pos_error + self.d_gain * vel_error
+        self._asset.write_root_velocity_to_sim(self._vel_command)
+
+
+@configclass
+class CubeActionTermCfg(ActionTermCfg):
+    """Configuration for the cube action term."""
+
+    class_type: type = CubeActionTerm
+
+
+##
+# Observation Term
+##
+
+
+def base_position(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
+    """Root position of the asset in the environment frame."""
+    # extract the used quantities (to enable type-hinting)
+    asset: RigidObject = env.scene[asset_cfg.name]
+    return asset.data.root_pos_w - env.scene.env_origins
+
+
+##
+# Environment settings
+##
+
+
+@configclass
+class ActionsCfg:
+    """Action specifications for the MDP."""
+
+    joint_pos = CubeActionTermCfg(asset_name="cube")
+
+
+@configclass
+class ObservationsCfg:
+    """Observation specifications for the MDP."""
+
+    @configclass
+    class PolicyCfg(ObsGroup):
+        """Observations for policy group."""
+
+        # cube position
+        position = ObsTerm(func=base_position, params={"asset_cfg": SceneEntityCfg("cube")})
+
+        def __post_init__(self):
+            self.enable_corruption = True
+            self.concatenate_terms = True
+
+    # observation groups
+    policy: PolicyCfg = PolicyCfg()
+
+
+@configclass
+class RandomizationCfg:
+    """Configuration for randomization."""
+
+    reset_base = RandTerm(
+        func=mdp.reset_root_state_uniform,
+        mode="reset",
+        params={
+            "pose_range": {"x": (-0.5, 0.5), "y": (-0.5, 0.5), "yaw": (-3.14, 3.14)},
+            "velocity_range": {
+                "x": (-0.5, 0.5),
+                "y": (-0.5, 0.5),
+                "z": (-0.5, 0.5),
+            },
+            "asset_cfg": SceneEntityCfg("cube"),
+        },
+    )
+
+
+##
+# Environment configuration
+##
+
+
+@configclass
+class CubeEnvCfg(BaseEnvCfg):
+    """Configuration for the floating cube environment."""
+
+    # Scene settings
+    scene: MySceneCfg = MySceneCfg(num_envs=args_cli.num_envs, env_spacing=2.5, replicate_physics=True)
+    # Basic settings
+    observations: ObservationsCfg = ObservationsCfg()
+    actions: ActionsCfg = ActionsCfg()
+    randomization: RandomizationCfg = RandomizationCfg()
+
+    def __post_init__(self):
+        """Post initialization."""
+        # general settings
+        self.decimation = 2
+        # simulation settings
+        self.sim.dt = 0.01
+        self.sim.physics_material = self.scene.terrain.physics_material
+
+
+def main():
+    """Main function."""
+
+    # setup base environment
+    env = BaseEnv(cfg=CubeEnvCfg())
+
+    # setup target position commands
+    target_position = torch.rand(env.num_envs, 3, device=env.device) * 2
+    target_position[:, 2] += 2.0
+    # offset all targets so that they move to the world origin
+    target_position -= env.scene.env_origins
+
+    # simulate physics
+    count = 0
+    while simulation_app.is_running():
+        with torch.inference_mode():
+            # reset
+            if count % 300 == 0:
+                env.reset()
+                count = 0
+
+            # step env
+            obs, _ = env.step(target_position)
+            # print the mean position error between the target and current cube positions
+            error = torch.norm(obs["policy"] - target_position, dim=-1).mean().item()
+            print(f"[Step: {count:04d}]: Mean position error: {error:.4f}")
+            # update counter
+            count += 1
+
+
+if __name__ == "__main__":
+    try:
+        # run the main execution
+        main()
+    except Exception as err:
+        carb.log_error(err)
+        carb.log_error(traceback.format_exc())
+        raise
+    finally:
+        # close sim app
+        simulation_app.close()
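The two check scripts above exercise exactly the standalone usage pattern enabled by the new ``BaseEnv.reset`` and ``BaseEnv.step`` methods. The following is a minimal sketch of that pattern, not part of the patch: it assumes a user-defined configuration class (``MyEnvCfg`` is a hypothetical name, built like the configs in the scripts), assumes the simulation app has already been launched via ``AppLauncher`` as in the scripts, and reads the concatenated ``"policy"`` group from the returned ``VecEnvObs`` dictionary:

    import torch

    from omni.isaac.orbit.envs import BaseEnv

    # create the environment from a user-defined configuration (MyEnvCfg is a placeholder name)
    env = BaseEnv(cfg=MyEnvCfg())
    # reset once after creation; returns a tuple of (observations, extras)
    obs, extras = env.reset()

    for _ in range(1000):
        with torch.inference_mode():
            # zero actions as a stand-in for a policy; shape is (num_envs, action_dim)
            action = torch.zeros(env.num_envs, env.action_manager.total_action_dim, device=env.device)
            # step the environment; physics is sub-stepped cfg.decimation times internally
            obs, extras = env.step(action)
            # concatenated observations of the "policy" group
            policy_obs = obs["policy"]

    # cleanup the environment before closing the simulation app
    env.close()

Because ``BaseEnv`` has no termination or reward managers, the loop above never resets automatically; as in the check scripts, the caller decides when to call ``reset`` again, whereas ``RLTaskEnv.step`` handles resets of terminated sub-environments itself.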