from omni.isaac.kit import SimulationApp
_kit = SimulationApp({
"headless": True,
"width": 1280,
"height": 720,
"window_width": 1920,
"window_height": 1080,
"renderer": "RTXRealTime"
})
from mylogger import logger
from omni.isaac.core.utils.stage import open_stage
from omni.isaac.core import World
from omni.isaac.core.utils.stage import open_stage
from omni.isaac.core.utils.extensions import enable_extension
from omni.isaac.wheeled_robots.robots import WheeledRobot
from isaacsim.core.api.robots import Robot
import numpy as np
import isaacsim.core.utils.numpy.rotations as rot_utils
from omni.isaac.core.utils.prims import define_prim
from pxr import UsdGeom
from utils import get_cur_dir
import cv2, os
from omni.isaac.sensor import Camera
from concurrent.futures import ThreadPoolExecutor
import omni.replicator.core as rep
class AstribotIsaacEnv:
"""
与 AstribotMujocoEnv 保持接口兼容,方便上层算法零改动切换。
参数尽量沿用旧签名,只把 mujoco 特有参数去掉了。
"""
def __init__(self,
robot_name: str,
usd_path: str,
robot_list: str,
joint_names_list: list,
gravity_compensation: bool = True,
frame_skip=1,
mode='window',
width=640,
height=480,
object_names=[],
camera_names=[]):
# ---------------- 加载 Isaac Sim 功能-----------------
global _kit
_kit.set_setting("/app/window/drawMouse", True)
enable_extension("omni.kit.widget.viewport") # 视口控件
enable_extension("omni.services.livestream.nvcf")
enable_extension("omni.isaac.sensor")
# ---------------- 创建 Isaac Sim 环境-----------------
self.world = World(stage_units_in_meters=1.0,
physics_dt=1/60,
rendering_dt=1/60)
self.world.scene.add_default_ground_plane()
self.add_robot(robot_name, usd_path)
self.head_camera = self.get_camera(robot_name, width, height)
self.executor = ThreadPoolExecutor(max_workers=4)
logger.info('has create world')
def add_robot(self, robot_name, usd_path):
usd_path = get_cur_dir() + '/usd/' + usd_path
define_prim(f"/World/{robot_name}", "Xform")
UsdGeom.Xform.Define(self.world.stage, f"/World/{robot_name}").GetPrim().GetReferences().AddReference(usd_path)
robot = Robot(prim_path=f"/World/{robot_name}", name=robot_name)
self.world.scene.add(robot)
logger.info('has load robot')
dx_path = get_cur_dir() + '/usd/dianxiang.usd'
define_prim(f"/World/{robot_name}/dianxiang", "Xform")
UsdGeom.Xform.Define(self.world.stage, f"/World/{robot_name}/dianxiang").GetPrim().GetReferences().AddReference(dx_path)
dianxiang = Robot(prim_path=f"/World/{robot_name}/dianxiang", name=f"{robot_name}_dianxiang")
self.world.scene.add(dianxiang)
logger.info('has load dianxiang')
def get_camera(self, robot_name, width=1280, height=720):
head_camera_path = f"/World/{robot_name}/head_link/camera_link/camera"
# 直接创建 Camera 对象
head_camera = Camera(
prim_path=head_camera_path,
resolution=(width, height),
translation=np.array([0, 0, 0]),
orientation=np.array([1, 0, 0, 0])
)
# 如果要拿深度
head_camera.initialize()
head_camera.add_distance_to_image_plane_to_frame() # 深度 annotator
# head_camera.set_clipping_range(0.1, 1000.0)
return head_camera
def save_camera_image(self, name, camera, frame_cnt, save_path="img"):
frame = camera.get_current_frame()
if frame is None or "distance_to_image_plane" not in frame:
logger.warning(f"{name} camera depth not ready")
return
depth = frame["distance_to_image_plane"]
depth = np.asarray(depth, dtype=np.float32) # 确保是 CPU 上的普通数组
d_max = depth.max()
if d_max == 0.0 or np.isnan(d_max):
logger.warning(f"{name} depth all zero/nan, skip")
return
vis = cv2.convertScaleAbs(depth, alpha=255.0 / d_max)
full_save_path = os.path.join(get_cur_dir(), save_path)
if not os.path.exists(full_save_path):
os.makedirs(full_save_path)
# 把图片数据拷出来,不要在里面做耗时 IO
fname = os.path.join(full_save_path, f"{name}_frame_{frame_cnt:06d}.png")
self.executor.submit(cv2.imwrite, fname, vis)
def physics_step(self, step_size):
pass
def step_forward(self):
global _kit
# self.world.reset()
# self.world.play()
frame_cnt = 0
while _kit.is_running():
self.world.step(render=True)
frame_cnt += 1
if frame_cnt % 30 == 0:
self.save_camera_image("head", self.head_camera, frame_cnt)
_kit.close()
def close(self):
self.head_camera = None
self.left_camera = None
self.right_camera = None
self.world.clear_all_callbacks()
self.world.scene.clear()
self.world = None
self.executor.shutdown(wait=True)
_kit.close()