isaac sim获取深度图

作者:iohannes 发布时间: 2025-09-28 阅读量:15
from omni.isaac.kit import SimulationApp
_kit = SimulationApp({
    "headless": True,
    "width": 1280,
    "height": 720,
    "window_width": 1920,
    "window_height": 1080,
    "renderer": "RTXRealTime"
})


from mylogger import logger
from omni.isaac.core.utils.stage import open_stage
from omni.isaac.core import World
from omni.isaac.core.utils.stage import open_stage
from omni.isaac.core.utils.extensions import enable_extension
from omni.isaac.wheeled_robots.robots import WheeledRobot
from isaacsim.core.api.robots import Robot
import numpy as np
import isaacsim.core.utils.numpy.rotations as rot_utils
from omni.isaac.core.utils.prims import define_prim
from pxr import UsdGeom
from utils import get_cur_dir
import cv2, os
from omni.isaac.sensor import Camera
from concurrent.futures import ThreadPoolExecutor
import omni.replicator.core as rep


class AstribotIsaacEnv:
    """
    与 AstribotMujocoEnv 保持接口兼容,方便上层算法零改动切换。
    参数尽量沿用旧签名,只把 mujoco 特有参数去掉了。
    """
    def __init__(self,
                 robot_name: str,
                 usd_path: str,
                 robot_list: str,
                 joint_names_list: list,
                 gravity_compensation: bool = True,
                 frame_skip=1,
                 mode='window',
                 width=640,
                 height=480,
                 object_names=[],
                 camera_names=[]):

        # ---------------- 加载 Isaac Sim 功能-----------------        
        global _kit
        _kit.set_setting("/app/window/drawMouse", True)
        enable_extension("omni.kit.widget.viewport")   # 视口控件
        enable_extension("omni.services.livestream.nvcf")
        enable_extension("omni.isaac.sensor") 

        # ---------------- 创建 Isaac Sim 环境-----------------
        self.world = World(stage_units_in_meters=1.0,
                           physics_dt=1/60,
                           rendering_dt=1/60)
        self.world.scene.add_default_ground_plane()
        self.add_robot(robot_name, usd_path)

        self.head_camera = self.get_camera(robot_name, width, height)

        self.executor = ThreadPoolExecutor(max_workers=4)
        logger.info('has create world')

    def add_robot(self, robot_name, usd_path):
        usd_path = get_cur_dir() + '/usd/' + usd_path
        define_prim(f"/World/{robot_name}", "Xform")
        UsdGeom.Xform.Define(self.world.stage, f"/World/{robot_name}").GetPrim().GetReferences().AddReference(usd_path)
        robot = Robot(prim_path=f"/World/{robot_name}", name=robot_name)
        self.world.scene.add(robot)
        logger.info('has load robot')

        dx_path = get_cur_dir() + '/usd/dianxiang.usd'
        define_prim(f"/World/{robot_name}/dianxiang", "Xform")
        UsdGeom.Xform.Define(self.world.stage, f"/World/{robot_name}/dianxiang").GetPrim().GetReferences().AddReference(dx_path)
        dianxiang = Robot(prim_path=f"/World/{robot_name}/dianxiang", name=f"{robot_name}_dianxiang")        
        self.world.scene.add(dianxiang)
        logger.info('has load dianxiang')

    def get_camera(self, robot_name, width=1280, height=720):
        head_camera_path = f"/World/{robot_name}/head_link/camera_link/camera"

        # 直接创建 Camera 对象
        head_camera = Camera(
            prim_path=head_camera_path,
            resolution=(width, height),
            translation=np.array([0, 0, 0]),
            orientation=np.array([1, 0, 0, 0])
        )

        # 如果要拿深度
        head_camera.initialize()
        head_camera.add_distance_to_image_plane_to_frame()  # 深度 annotator
        # head_camera.set_clipping_range(0.1, 1000.0)
        return head_camera

    def save_camera_image(self, name, camera, frame_cnt, save_path="img"):
        frame = camera.get_current_frame()
        if frame is None or "distance_to_image_plane" not in frame:
            logger.warning(f"{name} camera depth not ready")
            return

        depth = frame["distance_to_image_plane"]
        depth = np.asarray(depth, dtype=np.float32)  # 确保是 CPU 上的普通数组
        d_max = depth.max()
        if d_max == 0.0 or np.isnan(d_max):
            logger.warning(f"{name} depth all zero/nan, skip")
            return

        vis = cv2.convertScaleAbs(depth, alpha=255.0 / d_max)

        full_save_path = os.path.join(get_cur_dir(), save_path)
        if not os.path.exists(full_save_path):
            os.makedirs(full_save_path)

        # 把图片数据拷出来,不要在里面做耗时 IO
        fname = os.path.join(full_save_path, f"{name}_frame_{frame_cnt:06d}.png")

        self.executor.submit(cv2.imwrite, fname, vis)


    def physics_step(self, step_size):
        pass

    def step_forward(self):
        global _kit

        # self.world.reset()
        # self.world.play()
        frame_cnt = 0
        while _kit.is_running():
            self.world.step(render=True)
            frame_cnt += 1
            if frame_cnt % 30 == 0:
                self.save_camera_image("head", self.head_camera, frame_cnt)

        _kit.close()

    def close(self):
        self.head_camera = None
        self.left_camera = None
        self.right_camera = None
        
        self.world.clear_all_callbacks()
        self.world.scene.clear()
        self.world = None

        self.executor.shutdown(wait=True)
        
        _kit.close()