import cv2
import numpy as np
import h5py
import time


def display_camera_grid(image_dict, grid_shape=None, window_name="MindRobot-V1 Data Collection", scale=1.0):
    """
    Display multiple camera feeds in one window (original aspect ratios preserved, with optional overall scaling).

    Args:
        image_dict: {camera name: image as a numpy array}
        grid_shape: (rows, cols) layout; None means it is computed automatically
        window_name: window title
        scale: overall display scale (0.5 shows the grid at 50% of the original size)
    """
    # Input validation and preprocessing
    if not isinstance(image_dict, dict):
        raise TypeError("image_dict must be a dict")

    valid_data = []
    for name, img in image_dict.items():
        if not isinstance(img, np.ndarray):
            continue
        if img.dtype != np.uint8:
            img = img.astype(np.uint8)
        if img.ndim == 2:
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        elif img.shape[2] == 4:
            img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
        elif img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        valid_data.append((name, img))

    if not valid_data:
        print("Error: no valid images to display!")
        return None

    # Compute the grid layout automatically
    num_valid = len(valid_data)
    if grid_shape is None:
        grid_shape = (1, num_valid) if num_valid <= 3 else (2, int(np.ceil(num_valid / 2)))

    rows, cols = grid_shape

    # Maximum height per row and maximum width per column
    row_heights = [0] * rows
    col_widths = [0] * cols

    for i, (_, img) in enumerate(valid_data[:rows * cols]):
        r, c = i // cols, i % cols
        row_heights[r] = max(row_heights[r], img.shape[0])
        col_widths[c] = max(col_widths[c], img.shape[1])

    # Total canvas size (with the overall scale applied)
    canvas_h = int(sum(row_heights) * scale)
    canvas_w = int(sum(col_widths) * scale)

    # Create the canvas
    canvas = np.zeros((canvas_h, canvas_w, 3), dtype=np.uint8)

    # Pixel offsets of each grid row/column on the canvas
    row_pos = [0] + [int(sum(row_heights[:i + 1]) * scale) for i in range(rows)]
    col_pos = [0] + [int(sum(col_widths[:i + 1]) * scale) for i in range(cols)]

    # Place the images
    for i, (name, img) in enumerate(valid_data[:rows * cols]):
        r, c = i // cols, i % cols

        # Display region of the current image
        x1, x2 = col_pos[c], col_pos[c + 1]
        y1, y2 = row_pos[r], row_pos[r + 1]

        # Scaled size of the current image
        display_h = int(img.shape[0] * scale)
        display_w = int(img.shape[1] * scale)

        # Resize the image (aspect ratio preserved)
        resized_img = cv2.resize(img, (display_w, display_h))

        # Copy it onto the canvas
        canvas[y1:y1 + display_h, x1:x1 + display_w] = resized_img

        # Add the camera label (font scaled with the display)
        font_scale = 0.8 * scale
        thickness = max(2, int(2 * scale))
        cv2.putText(canvas, name, (x1 + 10, y1 + 30),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), thickness)

    # Show the window (resizable, sized to the canvas)
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    cv2.imshow(window_name, canvas)
    cv2.resizeWindow(window_name, canvas_w, canvas_h)
    cv2.waitKey(1)

    return canvas
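

# Example usage (a minimal sketch): the camera names and synthetic frames below are
# hypothetical placeholders, not the real streams used during data collection.
def _demo_display_camera_grid():
    frames = {
        "cam_high": np.zeros((480, 640, 3), dtype=np.uint8),
        "cam_left_wrist": np.zeros((480, 640, 3), dtype=np.uint8),
        "cam_right_wrist": np.zeros((480, 640, 3), dtype=np.uint8),
    }
    # Three feeds fit on a single row; scale=0.5 shows the grid at half size.
    display_camera_grid(frames, scale=0.5)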


# Save one episode of collected data to an HDF5 file
def save_data(args, timesteps, actions, dataset_path):
    # Data dictionary
    data_size = len(actions)
    data_dict = {
        # qpos/qvel/effort come from the observations; '/action' is the command actually sent
        '/observations/qpos': [],
        '/observations/qvel': [],
        '/observations/effort': [],
        '/action': [],
        '/base_action': [],
        # '/base_action_t265': [],
    }

    # Camera entries: one list of observed images per camera
    for cam_name in args.camera_names:
        data_dict[f'/observations/images/{cam_name}'] = []
        if args.use_depth_image:
            data_dict[f'/observations/images_depth/{cam_name}'] = []

    # len(action): max_timesteps, len(time_steps): max_timesteps + 1
    # Walk through the recorded actions
    while actions:
        # Pop one item from each queue per iteration
        action = actions.pop(0)   # current action
        ts = timesteps.pop(0)     # observation of the previous frame

        # Append the values to the dictionary
        # qpos, qvel, effort returned by the timestep
        data_dict['/observations/qpos'].append(ts.observation['qpos'])
        data_dict['/observations/qvel'].append(ts.observation['qvel'])
        data_dict['/observations/effort'].append(ts.observation['effort'])

        # Action actually sent
        data_dict['/action'].append(action)
        data_dict['/base_action'].append(ts.observation['base_vel'])

        # Camera data
        # data_dict['/base_action_t265'].append(ts.observation['base_vel_t265'])
        for cam_name in args.camera_names:
            data_dict[f'/observations/images/{cam_name}'].append(ts.observation['images'][cam_name])
            if args.use_depth_image:
                data_dict[f'/observations/images_depth/{cam_name}'].append(ts.observation['images_depth'][cam_name])

    t0 = time.time()
    with h5py.File(dataset_path + '.hdf5', 'w', rdcc_nbytes=1024**2 * 2) as root:
        # File attributes:
        # 1. whether the data comes from simulation
        # 2. whether the images are compressed
        root.attrs['sim'] = False
        root.attrs['compress'] = False

        # Create the 'observations' group for observation states,
        # with an 'images' subgroup holding one dataset per camera
        obs = root.create_group('observations')
        image = obs.create_group('images')
        for cam_name in args.camera_names:
            _ = image.create_dataset(cam_name, (data_size, 480, 640, 3), dtype='uint8',
                                     chunks=(1, 480, 640, 3), )
        if args.use_depth_image:
            image_depth = obs.create_group('images_depth')
            for cam_name in args.camera_names:
                _ = image_depth.create_dataset(cam_name, (data_size, 480, 640), dtype='uint16',
                                               chunks=(1, 480, 640), )

        _ = obs.create_dataset('qpos', (data_size, 14))
        _ = obs.create_dataset('qvel', (data_size, 14))
        _ = obs.create_dataset('effort', (data_size, 14))
        _ = root.create_dataset('action', (data_size, 14))
        _ = root.create_dataset('base_action', (data_size, 2))

        # Write data_dict into the h5py.File
        for name, array in data_dict.items():
            root[name][...] = array
    print(f'\033[32m\nSaving: {time.time() - t0:.1f} secs. {dataset_path} \033[0m\n')
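

# Read-back sketch for the HDF5 layout written by save_data above (the leading
# underscore marks it as an illustrative helper, not part of the collection pipeline).
def _demo_inspect_episode(dataset_path):
    with h5py.File(dataset_path + '.hdf5', 'r') as root:
        print('sim:', root.attrs['sim'], 'compress:', root.attrs['compress'])
        print('qpos:', root['/observations/qpos'].shape)   # (data_size, 14)
        print('action:', root['/action'].shape)             # (data_size, 14)
        for cam_name in root['/observations/images'].keys():
            print(cam_name, root[f'/observations/images/{cam_name}'].shape)  # (data_size, 480, 640, 3)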


def is_headless():
    """
    Check if the environment is headless (no display available).

    Returns:
        bool: True if the environment is headless, False otherwise.
    """
    try:
        import tkinter as tk
        root = tk.Tk()
        root.withdraw()
        root.update()
        root.destroy()
        return False
    except Exception:
        return True

def init_keyboard_listener():
    """
    Initialize keyboard listener for control events with new key mappings:
    - Left arrow: Start data recording
    - Right arrow: Save current data
    - Down arrow: Discard current data
    - Up arrow: Replay current data
    - ESC: Early termination

    Returns:
        tuple: (listener, events) - Keyboard listener and events dictionary
    """
    events = {
        "exit_early": False,
        "record_start": False,
        "save_data": False,
        "discard_data": False,
        "replay_data": False
    }

    if is_headless():
        print(
            "Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
        )
        return None, events

    # Only import pynput if not in a headless environment
    from pynput import keyboard

    def on_press(key):
        try:
            if key == keyboard.Key.left:
                print("← Left arrow: STARTING data recording...")
                events.update({
                    "record_start": True,
                    "exit_early": False,
                    "save_data": False,
                    "discard_data": False
                })

            elif key == keyboard.Key.right:
                print("→ Right arrow: SAVING current data...")
                events.update({
                    "save_data": True,
                    "exit_early": False,
                    "record_start": False
                })

            elif key == keyboard.Key.down:
                print("↓ Down arrow: DISCARDING current data...")
                events.update({
                    "discard_data": True,
                    "exit_early": False,
                    "record_start": False
                })

            elif key == keyboard.Key.up:
                print("↑ Up arrow: REPLAYING current data...")
                events.update({
                    "replay_data": True,
                    "exit_early": False
                })

            elif key == keyboard.Key.esc:
                print("ESC: EARLY TERMINATION requested")
                events.update({
                    "exit_early": True,
                    "record_start": False
                })

        except Exception as e:
            print(f"Error handling key press: {e}")

    listener = keyboard.Listener(on_press=on_press)
    listener.start()

    return listener, events
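

# A minimal sketch of how the events dictionary might be polled by a recording loop;
# the loop body and sleep interval are illustrative, not part of this module.
def _demo_keyboard_control_loop():
    listener, events = init_keyboard_listener()
    try:
        while not events["exit_early"]:
            if events["record_start"]:
                pass  # buffer timesteps/actions here
            if events["save_data"] or events["discard_data"]:
                break
            time.sleep(0.05)
    finally:
        if listener is not None:
            listener.stop()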


import yaml
from argparse import Namespace


def load_config(yaml_path):
    """Load configuration from YAML file and return as Namespace object"""
    with open(yaml_path, 'r') as f:
        config_dict = yaml.safe_load(f)

    # Convert dict to Namespace (similar to argparse.Namespace)
    return Namespace(**config_dict)
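
# A hypothetical YAML config matching the fields this module reads from `args`
# (camera_names, use_depth_image); the file name and layout are illustrative:
#
#     camera_names: [cam_high, cam_left_wrist, cam_right_wrist]
#     use_depth_image: false
#
# args = load_config("collect_config.yaml")
# print(args.camera_names, args.use_depth_image)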


import platform
import subprocess
# import pyttsx3


def say(text, blocking=False):
    system = platform.system()

    if system == "Darwin":
        cmd = ["say", text]

    elif system == "Linux":
        # cmd = ["spd-say", text]
        # if blocking:
        #     cmd.append("--wait")
        cmd = ["edge-playback", "--text", text]

    elif system == "Windows":
        cmd = [
            "PowerShell",
            "-Command",
            "Add-Type -AssemblyName System.Speech; "
            f"(New-Object System.Speech.Synthesis.SpeechSynthesizer).Speak('{text}')",
        ]

    else:
        raise RuntimeError("Unsupported operating system for text-to-speech.")

    if blocking:
        subprocess.run(cmd, check=True)
    else:
        subprocess.Popen(cmd, creationflags=subprocess.CREATE_NO_WINDOW if system == "Windows" else 0)


def log_say(text, play_sounds, blocking=False):
    print(text)

    if play_sounds:
        say(text, blocking)
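

# Example usage (illustrative): announce a collection event on the console and,
# when sound is enabled, through the platform TTS command above.
#
#     log_say("Episode saved", play_sounds=True, blocking=False)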