forked from tangger/lerobot
Remove deprecated scripts & tests
This commit is contained in:
@@ -65,11 +65,6 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
|
||||
raise ValueError(config.type)
|
||||
|
||||
|
||||
def make_robot(robot_type: str, **kwargs) -> Robot:
|
||||
config = make_robot_config(robot_type, **kwargs)
|
||||
return make_robot_from_config(config)
|
||||
|
||||
|
||||
def ensure_safe_goal_position(
|
||||
goal_present_pos: dict[str, tuple[float, float]], max_relative_target: float | dict[float]
|
||||
) -> dict[str, float]:
|
||||
|
||||
@@ -1,437 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Utilities to control a robot.
|
||||
|
||||
Useful to record a dataset, replay a recorded episode, run the policy on your robot
|
||||
and record an evaluation dataset, and to recalibrate your robot if needed.
|
||||
|
||||
Examples of usage:
|
||||
|
||||
- Recalibrate your robot:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--control.type=calibrate
|
||||
```
|
||||
|
||||
- Unlimited teleoperation at highest frequency (~200 Hz is expected), to exit with CTRL+C:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--robot.cameras='{}' \
|
||||
--control.type=teleoperate
|
||||
|
||||
# Add the cameras from the robot definition to visualize them:
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--control.type=teleoperate
|
||||
```
|
||||
|
||||
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--control.type=teleoperate \
|
||||
--control.fps=30
|
||||
```
|
||||
|
||||
- Record one episode in order to test replay:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--control.type=record \
|
||||
--control.fps=30 \
|
||||
--control.single_task="Grasp a lego block and put it in the bin." \
|
||||
--control.repo_id=$USER/koch_test \
|
||||
--control.num_episodes=1 \
|
||||
--control.push_to_hub=True
|
||||
```
|
||||
|
||||
- Visualize dataset:
|
||||
```bash
|
||||
python lerobot/scripts/visualize_dataset.py \
|
||||
--repo-id $USER/koch_test \
|
||||
--episode-index 0
|
||||
```
|
||||
|
||||
- Replay this test episode:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py replay \
|
||||
--robot.type=so100 \
|
||||
--control.type=replay \
|
||||
--control.fps=30 \
|
||||
--control.repo_id=$USER/koch_test \
|
||||
--control.episode=0
|
||||
```
|
||||
|
||||
- Record a full dataset in order to train a policy, with 2 seconds of warmup,
|
||||
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py record \
|
||||
--robot.type=so100 \
|
||||
--control.type=record \
|
||||
--control.fps 30 \
|
||||
--control.repo_id=$USER/koch_pick_place_lego \
|
||||
--control.num_episodes=50 \
|
||||
--control.warmup_time_s=2 \
|
||||
--control.episode_time_s=30 \
|
||||
--control.reset_time_s=10
|
||||
```
|
||||
|
||||
- For remote controlled robots like LeKiwi, run this script on the robot edge device (e.g. RaspBerryPi):
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=lekiwi \
|
||||
--control.type=remote_robot
|
||||
```
|
||||
|
||||
**NOTE**: You can use your keyboard to control data recording flow.
|
||||
- Tap right arrow key '->' to early exit while recording an episode and go to resseting the environment.
|
||||
- Tap right arrow key '->' to early exit while resetting the environment and got to recording the next episode.
|
||||
- Tap left arrow key '<-' to early exit and re-record the current episode.
|
||||
- Tap escape key 'esc' to stop the data recording.
|
||||
This might require a sudo permission to allow your terminal to monitor keyboard events.
|
||||
|
||||
**NOTE**: You can resume/continue data recording by running the same data recording command and adding `--control.resume=true`.
|
||||
|
||||
- Train on this dataset with the ACT policy:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--dataset.repo_id=${HF_USER}/koch_pick_place_lego \
|
||||
--policy.type=act \
|
||||
--output_dir=outputs/train/act_koch_pick_place_lego \
|
||||
--job_name=act_koch_pick_place_lego \
|
||||
--device=cuda \
|
||||
--wandb.enable=true
|
||||
```
|
||||
|
||||
- Run the pretrained policy on the robot:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py \
|
||||
--robot.type=so100 \
|
||||
--control.type=record \
|
||||
--control.fps=30 \
|
||||
--control.single_task="Grasp a lego block and put it in the bin." \
|
||||
--control.repo_id=$USER/eval_act_koch_pick_place_lego \
|
||||
--control.num_episodes=10 \
|
||||
--control.warmup_time_s=2 \
|
||||
--control.episode_time_s=30 \
|
||||
--control.reset_time_s=10 \
|
||||
--control.push_to_hub=true \
|
||||
--control.policy.path=outputs/train/act_koch_pick_place_lego/checkpoints/080000/pretrained_model
|
||||
```
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from dataclasses import asdict
|
||||
from pprint import pformat
|
||||
|
||||
import rerun as rr
|
||||
|
||||
# from safetensors.torch import load_file, save_file
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.common.policies.factory import make_policy
|
||||
from lerobot.common.robots.utils import Robot, make_robot_from_config
|
||||
from lerobot.common.utils.control_utils import (
|
||||
control_loop,
|
||||
init_keyboard_listener,
|
||||
is_headless,
|
||||
log_control_info,
|
||||
record_episode,
|
||||
reset_environment,
|
||||
sanity_check_dataset_name,
|
||||
sanity_check_dataset_robot_compatibility,
|
||||
stop_recording,
|
||||
warmup_record,
|
||||
)
|
||||
from lerobot.common.utils.robot_utils import busy_wait, safe_disconnect
|
||||
from lerobot.common.utils.utils import has_method, init_logging, log_say
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.control import (
|
||||
CalibrateControlConfig,
|
||||
ControlConfig,
|
||||
ControlPipelineConfig,
|
||||
RecordControlConfig,
|
||||
RemoteRobotConfig,
|
||||
ReplayControlConfig,
|
||||
TeleoperateControlConfig,
|
||||
)
|
||||
|
||||
########################################################################################
|
||||
# Control modes
|
||||
########################################################################################
|
||||
|
||||
|
||||
@safe_disconnect
|
||||
def calibrate(robot: Robot, cfg: CalibrateControlConfig):
|
||||
# TODO(aliberts): move this code in robots' classes
|
||||
if robot.robot_type.startswith("stretch"):
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
if not robot.is_homed():
|
||||
robot.home()
|
||||
return
|
||||
|
||||
arms = robot.available_arms if cfg.arms is None else cfg.arms
|
||||
unknown_arms = [arm_id for arm_id in arms if arm_id not in robot.available_arms]
|
||||
available_arms_str = " ".join(robot.available_arms)
|
||||
unknown_arms_str = " ".join(unknown_arms)
|
||||
|
||||
if arms is None or len(arms) == 0:
|
||||
raise ValueError(
|
||||
"No arm provided. Use `--arms` as argument with one or more available arms.\n"
|
||||
f"For instance, to recalibrate all arms add: `--arms {available_arms_str}`"
|
||||
)
|
||||
|
||||
if len(unknown_arms) > 0:
|
||||
raise ValueError(
|
||||
f"Unknown arms provided ('{unknown_arms_str}'). Available arms are `{available_arms_str}`."
|
||||
)
|
||||
|
||||
for arm_id in arms:
|
||||
arm_calib_path = robot.calibration_dir / f"{arm_id}.json"
|
||||
if arm_calib_path.exists():
|
||||
print(f"Removing '{arm_calib_path}'")
|
||||
arm_calib_path.unlink()
|
||||
else:
|
||||
print(f"Calibration file not found '{arm_calib_path}'")
|
||||
|
||||
if robot.is_connected:
|
||||
robot.disconnect()
|
||||
|
||||
if robot.robot_type.startswith("lekiwi") and "main_follower" in arms:
|
||||
print("Calibrating only the lekiwi follower arm 'main_follower'...")
|
||||
robot.calibrate_follower()
|
||||
return
|
||||
|
||||
if robot.robot_type.startswith("lekiwi") and "main_leader" in arms:
|
||||
print("Calibrating only the lekiwi leader arm 'main_leader'...")
|
||||
robot.calibrate_leader()
|
||||
return
|
||||
|
||||
# Calling `connect` automatically runs calibration
|
||||
# when the calibration file is missing
|
||||
robot.connect()
|
||||
robot.disconnect()
|
||||
print("Calibration is done! You can now teleoperate and record datasets!")
|
||||
|
||||
|
||||
@safe_disconnect
|
||||
def teleoperate(robot: Robot, cfg: TeleoperateControlConfig):
|
||||
control_loop(
|
||||
robot,
|
||||
control_time_s=cfg.teleop_time_s,
|
||||
fps=cfg.fps,
|
||||
teleoperate=True,
|
||||
display_data=cfg.display_data,
|
||||
)
|
||||
|
||||
|
||||
@safe_disconnect
|
||||
def record(
|
||||
robot: Robot,
|
||||
cfg: RecordControlConfig,
|
||||
) -> LeRobotDataset:
|
||||
# TODO(rcadene): Add option to record logs
|
||||
if cfg.resume:
|
||||
dataset = LeRobotDataset(
|
||||
cfg.repo_id,
|
||||
root=cfg.root,
|
||||
)
|
||||
if len(robot.cameras) > 0:
|
||||
dataset.start_image_writer(
|
||||
num_processes=cfg.num_image_writer_processes,
|
||||
num_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),
|
||||
)
|
||||
sanity_check_dataset_robot_compatibility(dataset, robot, cfg.fps, cfg.video)
|
||||
else:
|
||||
# Create empty dataset or load existing saved episodes
|
||||
sanity_check_dataset_name(cfg.repo_id, cfg.policy)
|
||||
dataset = LeRobotDataset.create(
|
||||
cfg.repo_id,
|
||||
cfg.fps,
|
||||
root=cfg.root,
|
||||
robot=robot,
|
||||
use_videos=cfg.video,
|
||||
image_writer_processes=cfg.num_image_writer_processes,
|
||||
image_writer_threads=cfg.num_image_writer_threads_per_camera * len(robot.cameras),
|
||||
)
|
||||
|
||||
# Load pretrained policy
|
||||
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
|
||||
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
listener, events = init_keyboard_listener()
|
||||
|
||||
# Execute a few seconds without recording to:
|
||||
# 1. teleoperate the robot to move it in starting position if no policy provided,
|
||||
# 2. give times to the robot devices to connect and start synchronizing,
|
||||
# 3. place the cameras windows on screen
|
||||
enable_teleoperation = policy is None
|
||||
log_say("Warmup record", cfg.play_sounds)
|
||||
warmup_record(robot, events, enable_teleoperation, cfg.warmup_time_s, cfg.display_data, cfg.fps)
|
||||
|
||||
if has_method(robot, "teleop_safety_stop"):
|
||||
robot.teleop_safety_stop()
|
||||
|
||||
recorded_episodes = 0
|
||||
while True:
|
||||
if recorded_episodes >= cfg.num_episodes:
|
||||
break
|
||||
|
||||
log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds)
|
||||
record_episode(
|
||||
robot=robot,
|
||||
dataset=dataset,
|
||||
events=events,
|
||||
episode_time_s=cfg.episode_time_s,
|
||||
display_data=cfg.display_data,
|
||||
policy=policy,
|
||||
fps=cfg.fps,
|
||||
single_task=cfg.single_task,
|
||||
)
|
||||
|
||||
# Execute a few seconds without recording to give time to manually reset the environment
|
||||
# Current code logic doesn't allow to teleoperate during this time.
|
||||
# TODO(rcadene): add an option to enable teleoperation during reset
|
||||
# Skip reset for the last episode to be recorded
|
||||
if not events["stop_recording"] and (
|
||||
(recorded_episodes < cfg.num_episodes - 1) or events["rerecord_episode"]
|
||||
):
|
||||
log_say("Reset the environment", cfg.play_sounds)
|
||||
reset_environment(robot, events, cfg.reset_time_s, cfg.fps)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
log_say("Re-record episode", cfg.play_sounds)
|
||||
events["rerecord_episode"] = False
|
||||
events["exit_early"] = False
|
||||
dataset.clear_episode_buffer()
|
||||
continue
|
||||
|
||||
dataset.save_episode()
|
||||
recorded_episodes += 1
|
||||
|
||||
if events["stop_recording"]:
|
||||
break
|
||||
|
||||
log_say("Stop recording", cfg.play_sounds, blocking=True)
|
||||
stop_recording(robot, listener, cfg.display_data)
|
||||
|
||||
if cfg.push_to_hub:
|
||||
dataset.push_to_hub(tags=cfg.tags, private=cfg.private)
|
||||
|
||||
log_say("Exiting", cfg.play_sounds)
|
||||
return dataset
|
||||
|
||||
|
||||
@safe_disconnect
|
||||
def replay(
|
||||
robot: Robot,
|
||||
cfg: ReplayControlConfig,
|
||||
):
|
||||
# TODO(rcadene, aliberts): refactor with control_loop, once `dataset` is an instance of LeRobotDataset
|
||||
# TODO(rcadene): Add option to record logs
|
||||
|
||||
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root, episodes=[cfg.episode])
|
||||
actions = dataset.hf_dataset.select_columns("action")
|
||||
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
log_say("Replaying episode", cfg.play_sounds, blocking=True)
|
||||
for idx in range(dataset.num_frames):
|
||||
start_episode_t = time.perf_counter()
|
||||
|
||||
action = actions[idx]["action"]
|
||||
robot.send_action(action)
|
||||
|
||||
dt_s = time.perf_counter() - start_episode_t
|
||||
busy_wait(1 / cfg.fps - dt_s)
|
||||
|
||||
dt_s = time.perf_counter() - start_episode_t
|
||||
log_control_info(robot, dt_s, fps=cfg.fps)
|
||||
|
||||
|
||||
def _init_rerun(control_config: ControlConfig, session_name: str = "lerobot_control_loop") -> None:
|
||||
"""Initializes the Rerun SDK for visualizing the control loop.
|
||||
|
||||
Args:
|
||||
control_config: Configuration determining data display and robot type.
|
||||
session_name: Rerun session name. Defaults to "lerobot_control_loop".
|
||||
|
||||
Raises:
|
||||
ValueError: If viewer IP is missing for non-remote configurations with display enabled.
|
||||
"""
|
||||
if (control_config.display_data and not is_headless()) or (
|
||||
control_config.display_data and isinstance(control_config, RemoteRobotConfig)
|
||||
):
|
||||
# Configure Rerun flush batch size default to 8KB if not set
|
||||
batch_size = os.getenv("RERUN_FLUSH_NUM_BYTES", "8000")
|
||||
os.environ["RERUN_FLUSH_NUM_BYTES"] = batch_size
|
||||
|
||||
# Initialize Rerun based on configuration
|
||||
rr.init(session_name)
|
||||
if isinstance(control_config, RemoteRobotConfig):
|
||||
viewer_ip = control_config.viewer_ip
|
||||
viewer_port = control_config.viewer_port
|
||||
if not viewer_ip or not viewer_port:
|
||||
raise ValueError(
|
||||
"Viewer IP & Port are required for remote config. Set via config file/CLI or disable control_config.display_data."
|
||||
)
|
||||
logging.info(f"Connecting to viewer at {viewer_ip}:{viewer_port}")
|
||||
rr.connect_tcp(f"{viewer_ip}:{viewer_port}")
|
||||
else:
|
||||
# Get memory limit for rerun viewer parameters
|
||||
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
|
||||
rr.spawn(memory_limit=memory_limit)
|
||||
|
||||
|
||||
@parser.wrap()
|
||||
def control_robot(cfg: ControlPipelineConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
|
||||
# TODO(Steven): Blueprint for fixed window size
|
||||
|
||||
if isinstance(cfg.control, CalibrateControlConfig):
|
||||
calibrate(robot, cfg.control)
|
||||
elif isinstance(cfg.control, TeleoperateControlConfig):
|
||||
_init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_teleop")
|
||||
teleoperate(robot, cfg.control)
|
||||
elif isinstance(cfg.control, RecordControlConfig):
|
||||
_init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_record")
|
||||
record(robot, cfg.control)
|
||||
elif isinstance(cfg.control, ReplayControlConfig):
|
||||
replay(robot, cfg.control)
|
||||
elif isinstance(cfg.control, RemoteRobotConfig):
|
||||
from lerobot.common.robots.lekiwi.old_lekiwi_remote import run_lekiwi
|
||||
|
||||
_init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_remote")
|
||||
run_lekiwi(cfg.robot)
|
||||
|
||||
if robot.is_connected:
|
||||
# Disconnect manually to avoid a "Core dump" during process
|
||||
# termination due to camera threads not properly exiting.
|
||||
robot.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
control_robot()
|
||||
@@ -1,561 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Utilities to control a robot in simulation.
|
||||
|
||||
Useful to record a dataset, replay a recorded episode and record an evaluation dataset.
|
||||
|
||||
Examples of usage:
|
||||
|
||||
|
||||
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency.
|
||||
You can modify this value depending on how fast your simulation can run:
|
||||
```bash
|
||||
python lerobot/scripts/control_robot.py teleoperate \
|
||||
--fps 30 \
|
||||
--robot-path lerobot/configs/robot/your_robot_config.yaml \
|
||||
--sim-config lerobot/configs/env/your_sim_config.yaml
|
||||
```
|
||||
|
||||
- Record one episode in order to test replay:
|
||||
```bash
|
||||
python lerobot/scripts/control_sim_robot.py record \
|
||||
--robot-path lerobot/configs/robot/your_robot_config.yaml \
|
||||
--sim-config lerobot/configs/env/your_sim_config.yaml \
|
||||
--fps 30 \
|
||||
--repo-id $USER/robot_sim_test \
|
||||
--num-episodes 1 \
|
||||
--run-compute-stats 0
|
||||
```
|
||||
|
||||
Enable the --push-to-hub 1 to push the recorded dataset to the huggingface hub.
|
||||
|
||||
- Visualize dataset:
|
||||
```bash
|
||||
python lerobot/scripts/visualize_dataset.py \
|
||||
--repo-id $USER/robot_sim_test \
|
||||
--episode-index 0
|
||||
```
|
||||
|
||||
- Replay a sequence of test episodes:
|
||||
```bash
|
||||
python lerobot/scripts/control_sim_robot.py replay \
|
||||
--robot-path lerobot/configs/robot/your_robot_config.yaml \
|
||||
--sim-config lerobot/configs/env/your_sim_config.yaml \
|
||||
--fps 30 \
|
||||
--repo-id $USER/robot_sim_test \
|
||||
--episode 0
|
||||
```
|
||||
Note: The seed is saved, therefore, during replay we can load the same environment state as the one during collection.
|
||||
|
||||
- Record a full dataset in order to train a policy,
|
||||
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
|
||||
```bash
|
||||
python lerobot/scripts/control_sim_robot.py record \
|
||||
--robot-path lerobot/configs/robot/your_robot_config.yaml \
|
||||
--sim-config lerobot/configs/env/your_sim_config.yaml \
|
||||
--fps 30 \
|
||||
--repo-id $USER/robot_sim_test \
|
||||
--num-episodes 50 \
|
||||
--episode-time-s 30 \
|
||||
```
|
||||
|
||||
**NOTE**: You can use your keyboard to control data recording flow.
|
||||
- Tap right arrow key '->' to early exit while recording an episode and go to resetting the environment.
|
||||
- Tap right arrow key '->' to early exit while resetting the environment and got to recording the next episode.
|
||||
- Tap left arrow key '<-' to early exit and re-record the current episode.
|
||||
- Tap escape key 'esc' to stop the data recording.
|
||||
This might require a sudo permission to allow your terminal to monitor keyboard events.
|
||||
|
||||
**NOTE**: You can resume/continue data recording by running the same data recording command twice.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import importlib
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import gymnasium as gym
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.common.robots.utils import Robot, make_robot
|
||||
from lerobot.common.utils.control_utils import (
|
||||
init_keyboard_listener,
|
||||
init_policy,
|
||||
is_headless,
|
||||
log_control_info,
|
||||
predict_action,
|
||||
sanity_check_dataset_name,
|
||||
sanity_check_dataset_robot_compatibility,
|
||||
stop_recording,
|
||||
)
|
||||
from lerobot.common.utils.robot_utils import busy_wait
|
||||
from lerobot.common.utils.utils import init_hydra_config, init_logging, log_say
|
||||
|
||||
raise NotImplementedError("This script is currently deactivated")
|
||||
|
||||
DEFAULT_FEATURES = {
|
||||
"next.reward": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"next.success": {
|
||||
"dtype": "bool",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"seed": {
|
||||
"dtype": "int64",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"timestamp": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
########################################################################################
|
||||
# Utilities
|
||||
########################################################################################
|
||||
def none_or_int(value):
|
||||
if value == "None":
|
||||
return None
|
||||
return int(value)
|
||||
|
||||
|
||||
def init_sim_calibration(robot, cfg):
|
||||
# Constants necessary for transforming the joint pos of the real robot to the sim
|
||||
# depending on the robot description used in that sim.
|
||||
start_pos = np.array(robot.leader_arms.main.calibration["start_pos"])
|
||||
axis_directions = np.array(cfg.get("axis_directions", [1]))
|
||||
offsets = np.array(cfg.get("offsets", [0])) * np.pi
|
||||
|
||||
return {"start_pos": start_pos, "axis_directions": axis_directions, "offsets": offsets}
|
||||
|
||||
|
||||
def real_positions_to_sim(real_positions, axis_directions, start_pos, offsets):
|
||||
"""Counts - starting position -> radians -> align axes -> offset"""
|
||||
return axis_directions * (real_positions - start_pos) * 2.0 * np.pi / 4096 + offsets
|
||||
|
||||
|
||||
########################################################################################
|
||||
# Control modes
|
||||
########################################################################################
|
||||
|
||||
|
||||
def teleoperate(env, robot: Robot, process_action_fn, teleop_time_s=None):
|
||||
env = env()
|
||||
env.reset()
|
||||
start_teleop_t = time.perf_counter()
|
||||
while True:
|
||||
leader_pos = robot.leader_arms.main.read("Present_Position")
|
||||
action = process_action_fn(leader_pos)
|
||||
env.step(np.expand_dims(action, 0))
|
||||
if teleop_time_s is not None and time.perf_counter() - start_teleop_t > teleop_time_s:
|
||||
print("Teleoperation processes finished.")
|
||||
break
|
||||
|
||||
|
||||
def record(
|
||||
env,
|
||||
robot: Robot,
|
||||
process_action_from_leader,
|
||||
root: Path,
|
||||
repo_id: str,
|
||||
task: str,
|
||||
fps: int | None = None,
|
||||
tags: list[str] | None = None,
|
||||
pretrained_policy_name_or_path: str = None,
|
||||
policy_overrides: bool | None = None,
|
||||
episode_time_s: int = 30,
|
||||
num_episodes: int = 50,
|
||||
video: bool = True,
|
||||
push_to_hub: bool = True,
|
||||
num_image_writer_processes: int = 0,
|
||||
num_image_writer_threads_per_camera: int = 4,
|
||||
display_cameras: bool = False,
|
||||
play_sounds: bool = True,
|
||||
resume: bool = False,
|
||||
local_files_only: bool = False,
|
||||
run_compute_stats: bool = True,
|
||||
) -> LeRobotDataset:
|
||||
# Load pretrained policy
|
||||
policy = None
|
||||
if pretrained_policy_name_or_path is not None:
|
||||
policy, policy_fps, device, use_amp = init_policy(pretrained_policy_name_or_path, policy_overrides)
|
||||
|
||||
if fps is None:
|
||||
fps = policy_fps
|
||||
logging.warning(f"No fps provided, so using the fps from policy config ({policy_fps}).")
|
||||
|
||||
if policy is None and process_action_from_leader is None:
|
||||
raise ValueError("Either policy or process_action_fn has to be set to enable control in sim.")
|
||||
|
||||
# initialize listener before sim env
|
||||
listener, events = init_keyboard_listener()
|
||||
|
||||
# create sim env
|
||||
env = env()
|
||||
|
||||
# Create empty dataset or load existing saved episodes
|
||||
num_cameras = sum([1 if "image" in key else 0 for key in env.observation_space])
|
||||
|
||||
# get image keys
|
||||
image_keys = [key for key in env.observation_space if "image" in key]
|
||||
state_keys_dict = env_cfg.state_keys
|
||||
|
||||
if resume:
|
||||
dataset = LeRobotDataset(
|
||||
repo_id,
|
||||
root=root,
|
||||
local_files_only=local_files_only,
|
||||
)
|
||||
dataset.start_image_writer(
|
||||
num_processes=num_image_writer_processes,
|
||||
num_threads=num_image_writer_threads_per_camera * num_cameras,
|
||||
)
|
||||
sanity_check_dataset_robot_compatibility(dataset, robot, fps, video)
|
||||
else:
|
||||
features = DEFAULT_FEATURES
|
||||
# add image keys to features
|
||||
for key in image_keys:
|
||||
shape = env.observation_space[key].shape
|
||||
if not key.startswith("observation.image."):
|
||||
key = "observation.image." + key
|
||||
features[key] = {"dtype": "video", "names": ["channels", "height", "width"], "shape": shape}
|
||||
|
||||
for key, obs_key in state_keys_dict.items():
|
||||
features[key] = {
|
||||
"dtype": "float32",
|
||||
"names": None,
|
||||
"shape": env.observation_space[obs_key].shape,
|
||||
}
|
||||
|
||||
features["action"] = {"dtype": "float32", "shape": env.action_space.shape, "names": None}
|
||||
|
||||
# Create empty dataset or load existing saved episodes
|
||||
sanity_check_dataset_name(repo_id, policy)
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id,
|
||||
fps,
|
||||
root=root,
|
||||
features=features,
|
||||
use_videos=video,
|
||||
image_writer_processes=num_image_writer_processes,
|
||||
image_writer_threads=num_image_writer_threads_per_camera * num_cameras,
|
||||
)
|
||||
|
||||
recorded_episodes = 0
|
||||
while True:
|
||||
log_say(f"Recording episode {dataset.num_episodes}", play_sounds)
|
||||
|
||||
if events is None:
|
||||
events = {"exit_early": False}
|
||||
|
||||
if episode_time_s is None:
|
||||
episode_time_s = float("inf")
|
||||
|
||||
timestamp = 0
|
||||
start_episode_t = time.perf_counter()
|
||||
|
||||
seed = np.random.randint(0, 1e5)
|
||||
observation, info = env.reset(seed=seed)
|
||||
|
||||
while timestamp < episode_time_s:
|
||||
start_loop_t = time.perf_counter()
|
||||
|
||||
if policy is not None:
|
||||
action = predict_action(observation, policy, device, use_amp)
|
||||
else:
|
||||
leader_pos = robot.leader_arms.main.read("Present_Position")
|
||||
action = process_action_from_leader(leader_pos)
|
||||
|
||||
observation, reward, terminated, _, info = env.step(action)
|
||||
|
||||
success = info.get("is_success", False)
|
||||
env_timestamp = info.get("timestamp", dataset.episode_buffer["size"] / fps)
|
||||
|
||||
frame = {
|
||||
"action": torch.from_numpy(action),
|
||||
"next.reward": reward,
|
||||
"next.success": success,
|
||||
"seed": seed,
|
||||
"timestamp": env_timestamp,
|
||||
}
|
||||
|
||||
for key in image_keys:
|
||||
if not key.startswith("observation.image"):
|
||||
frame["observation.image." + key] = observation[key]
|
||||
else:
|
||||
frame[key] = observation[key]
|
||||
|
||||
for key, obs_key in state_keys_dict.items():
|
||||
frame[key] = torch.from_numpy(observation[obs_key])
|
||||
|
||||
dataset.add_frame(frame)
|
||||
|
||||
if display_cameras and not is_headless():
|
||||
for key in image_keys:
|
||||
cv2.imshow(key, cv2.cvtColor(observation[key], cv2.COLOR_RGB2BGR))
|
||||
cv2.waitKey(1)
|
||||
|
||||
if fps is not None:
|
||||
dt_s = time.perf_counter() - start_loop_t
|
||||
busy_wait(1 / fps - dt_s)
|
||||
|
||||
dt_s = time.perf_counter() - start_loop_t
|
||||
log_control_info(robot, dt_s, fps=fps)
|
||||
|
||||
timestamp = time.perf_counter() - start_episode_t
|
||||
if events["exit_early"] or terminated:
|
||||
events["exit_early"] = False
|
||||
break
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
log_say("Re-record episode", play_sounds)
|
||||
events["rerecord_episode"] = False
|
||||
events["exit_early"] = False
|
||||
dataset.clear_episode_buffer()
|
||||
continue
|
||||
|
||||
dataset.save_episode(task=task)
|
||||
recorded_episodes += 1
|
||||
|
||||
if events["stop_recording"] or recorded_episodes >= num_episodes:
|
||||
break
|
||||
else:
|
||||
logging.info("Waiting for a few seconds before starting next episode recording...")
|
||||
busy_wait(3)
|
||||
|
||||
log_say("Stop recording", play_sounds, blocking=True)
|
||||
stop_recording(robot, listener, display_cameras)
|
||||
|
||||
if run_compute_stats:
|
||||
logging.info("Computing dataset statistics")
|
||||
dataset.consolidate(run_compute_stats)
|
||||
|
||||
if push_to_hub:
|
||||
dataset.push_to_hub(tags=tags)
|
||||
|
||||
log_say("Exiting", play_sounds)
|
||||
return dataset
|
||||
|
||||
|
||||
def replay(
|
||||
env, root: Path, repo_id: str, episode: int, fps: int | None = None, local_files_only: bool = True
|
||||
):
|
||||
env = env()
|
||||
|
||||
local_dir = Path(root) / repo_id
|
||||
if not local_dir.exists():
|
||||
raise ValueError(local_dir)
|
||||
|
||||
dataset = LeRobotDataset(repo_id, root=root, local_files_only=local_files_only)
|
||||
items = dataset.hf_dataset.select_columns("action")
|
||||
seeds = dataset.hf_dataset.select_columns("seed")["seed"]
|
||||
|
||||
from_idx = dataset.episode_data_index["from"][episode].item()
|
||||
to_idx = dataset.episode_data_index["to"][episode].item()
|
||||
env.reset(seed=seeds[from_idx].item())
|
||||
logging.info("Replaying episode")
|
||||
log_say("Replaying episode", play_sounds=True)
|
||||
for idx in range(from_idx, to_idx):
|
||||
start_episode_t = time.perf_counter()
|
||||
action = items[idx]["action"]
|
||||
env.step(action.unsqueeze(0).numpy())
|
||||
dt_s = time.perf_counter() - start_episode_t
|
||||
busy_wait(1 / fps - dt_s)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest="mode", required=True)
|
||||
|
||||
# Set common options for all the subparsers
|
||||
base_parser = argparse.ArgumentParser(add_help=False)
|
||||
base_parser.add_argument(
|
||||
"--robot-path",
|
||||
type=str,
|
||||
default="lerobot/configs/robot/koch.yaml",
|
||||
help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.",
|
||||
)
|
||||
|
||||
base_parser.add_argument(
|
||||
"--sim-config",
|
||||
help="Path to a yaml config you want to use for initializing a sim environment based on gym ",
|
||||
)
|
||||
|
||||
parser_record = subparsers.add_parser("teleoperate", parents=[base_parser])
|
||||
|
||||
parser_record = subparsers.add_parser("record", parents=[base_parser])
|
||||
parser_record.add_argument(
|
||||
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--root",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
default="lerobot/test",
|
||||
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--episode-time-s",
|
||||
type=int,
|
||||
default=60,
|
||||
help="Number of seconds for data recording for each episode.",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--task",
|
||||
type=str,
|
||||
required=True,
|
||||
help="A description of the task preformed during recording that can be used as a language instruction.",
|
||||
)
|
||||
parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.")
|
||||
parser_record.add_argument(
|
||||
"--run-compute-stats",
|
||||
type=int,
|
||||
default=1,
|
||||
help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--push-to-hub",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Upload dataset to Hugging Face hub.",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--tags",
|
||||
type=str,
|
||||
nargs="*",
|
||||
help="Add tags to your dataset on the hub.",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--num-image-writer-processes",
|
||||
type=int,
|
||||
default=0,
|
||||
help=(
|
||||
"Number of subprocesses handling the saving of frames as PNG. Set to 0 to use threads only; "
|
||||
"set to ≥1 to use subprocesses, each using threads to write images. The best number of processes "
|
||||
"and threads depends on your system. We recommend 4 threads per camera with 0 processes. "
|
||||
"If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses."
|
||||
),
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--num-image-writer-threads-per-camera",
|
||||
type=int,
|
||||
default=4,
|
||||
help=(
|
||||
"Number of threads writing the frames as png images on disk, per camera. "
|
||||
"Too much threads might cause unstable teleoperation fps due to main thread being blocked. "
|
||||
"Not enough threads might cause low camera fps."
|
||||
),
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--display-cameras",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Visualize image observations with opencv.",
|
||||
)
|
||||
parser_record.add_argument(
|
||||
"--resume",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Resume recording on an existing dataset.",
|
||||
)
|
||||
parser_replay = subparsers.add_parser("replay", parents=[base_parser])
|
||||
parser_replay.add_argument(
|
||||
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
|
||||
)
|
||||
parser_replay.add_argument(
|
||||
"--root",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Root directory where the dataset will be stored locally (e.g. 'data/hf_username/dataset_name'). By default, stored in cache folder.",
|
||||
)
|
||||
parser_replay.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
default="lerobot/test",
|
||||
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
|
||||
)
|
||||
parser_replay.add_argument("--episode", type=int, default=0, help="Index of the episodes to replay.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
init_logging()
|
||||
|
||||
control_mode = args.mode
|
||||
robot_path = args.robot_path
|
||||
env_config_path = args.sim_config
|
||||
kwargs = vars(args)
|
||||
del kwargs["mode"]
|
||||
del kwargs["robot_path"]
|
||||
del kwargs["sim_config"]
|
||||
|
||||
# make gym env
|
||||
env_cfg = init_hydra_config(env_config_path)
|
||||
importlib.import_module(f"gym_{env_cfg.env.type}")
|
||||
|
||||
def env_constructor():
|
||||
return gym.make(env_cfg.env.handle, disable_env_checker=True, **env_cfg.env.gym)
|
||||
|
||||
robot = None
|
||||
process_leader_actions_fn = None
|
||||
|
||||
if control_mode in ["teleoperate", "record"]:
|
||||
# make robot
|
||||
robot_overrides = ["~cameras", "~follower_arms"]
|
||||
# TODO(rcadene): remove
|
||||
robot_cfg = init_hydra_config(robot_path, robot_overrides)
|
||||
robot = make_robot(robot_cfg)
|
||||
robot.connect()
|
||||
|
||||
calib_kwgs = init_sim_calibration(robot, env_cfg.calibration)
|
||||
|
||||
def process_leader_actions_fn(action):
|
||||
return real_positions_to_sim(action, **calib_kwgs)
|
||||
|
||||
robot.leader_arms.main.calibration = None
|
||||
|
||||
if control_mode == "teleoperate":
|
||||
teleoperate(env_constructor, robot, process_leader_actions_fn)
|
||||
|
||||
elif control_mode == "record":
|
||||
record(env_constructor, robot, process_leader_actions_fn, **kwargs)
|
||||
|
||||
elif control_mode == "replay":
|
||||
replay(env_constructor, **kwargs)
|
||||
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid control mode: '{control_mode}', only valid modes are teleoperate, record and replay."
|
||||
)
|
||||
|
||||
if robot and robot.is_connected:
|
||||
# Disconnect manually to avoid a "Core dump" during process
|
||||
# termination due to camera threads not properly exiting.
|
||||
robot.disconnect()
|
||||
@@ -19,9 +19,8 @@ import traceback
|
||||
import pytest
|
||||
from serial import SerialException
|
||||
|
||||
from lerobot import available_cameras, available_motors, available_robots
|
||||
from lerobot.common.robots.utils import make_robot
|
||||
from tests.utils import DEVICE, make_camera, make_motors_bus
|
||||
from lerobot import available_cameras
|
||||
from tests.utils import DEVICE, make_camera
|
||||
|
||||
# Import fixture modules as plugins
|
||||
pytest_plugins = [
|
||||
@@ -64,21 +63,11 @@ def _check_component_availability(component_type, available_components, make_com
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def is_robot_available(robot_type):
|
||||
return _check_component_availability(robot_type, available_robots, make_robot)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def is_camera_available(camera_type):
|
||||
return _check_component_availability(camera_type, available_cameras, make_camera)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def is_motor_available(motor_type):
|
||||
return _check_component_availability(motor_type, available_motors, make_motors_bus)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_builtins_input(monkeypatch):
|
||||
def print_text(text=None):
|
||||
|
||||
@@ -1,443 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Tests for physical robots and their mocked versions.
|
||||
If the physical robots are not connected to the computer, or not working,
|
||||
the test will be skipped.
|
||||
|
||||
Example of running a specific test:
|
||||
```bash
|
||||
pytest -sx tests/test_control_robot.py::test_teleoperate
|
||||
```
|
||||
|
||||
Example of running test on real robots connected to the computer:
|
||||
```bash
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[koch-False]'
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[koch_bimanual-False]'
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[aloha-False]'
|
||||
```
|
||||
|
||||
Example of running test on a mocked version of robots:
|
||||
```bash
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[koch-True]'
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[koch_bimanual-True]'
|
||||
pytest -sx 'tests/test_control_robot.py::test_teleoperate[aloha-True]'
|
||||
```
|
||||
"""
|
||||
|
||||
import multiprocessing
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from lerobot.common.policies.act.configuration_act import ACTConfig
|
||||
from lerobot.common.policies.factory import make_policy
|
||||
from lerobot.configs.control import (
|
||||
CalibrateControlConfig,
|
||||
RecordControlConfig,
|
||||
ReplayControlConfig,
|
||||
TeleoperateControlConfig,
|
||||
)
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.scripts.control_robot import calibrate, record, replay, teleoperate
|
||||
from tests.robots.test_robots import make_robot
|
||||
from tests.utils import TEST_ROBOT_TYPES, mock_calibration_dir, require_robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_teleoperate(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock and robot_type != "aloha":
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
teleoperate(robot, TeleoperateControlConfig(teleop_time_s=1))
|
||||
teleoperate(robot, TeleoperateControlConfig(fps=30, teleop_time_s=1))
|
||||
teleoperate(robot, TeleoperateControlConfig(fps=60, teleop_time_s=1))
|
||||
del robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_calibrate(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
calibration_dir = tmp_path / robot_type
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
calib_cfg = CalibrateControlConfig(arms=robot.available_arms)
|
||||
calibrate(robot, calib_cfg)
|
||||
del robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_record_without_cameras(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
# Avoid using cameras
|
||||
robot_kwargs["cameras"] = {}
|
||||
|
||||
if mock and robot_type != "aloha":
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
repo_id = "lerobot/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
single_task=single_task,
|
||||
root=root,
|
||||
fps=30,
|
||||
warmup_time_s=0.1,
|
||||
episode_time_s=1,
|
||||
reset_time_s=0.1,
|
||||
num_episodes=2,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
play_sounds=False,
|
||||
)
|
||||
record(robot, rec_cfg)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_record_and_replay_and_policy(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock and robot_type != "aloha":
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
repo_id = "lerobot_test/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
single_task=single_task,
|
||||
root=root,
|
||||
fps=1,
|
||||
warmup_time_s=0.1,
|
||||
episode_time_s=1,
|
||||
reset_time_s=0.1,
|
||||
num_episodes=2,
|
||||
push_to_hub=False,
|
||||
# TODO(rcadene, aliberts): test video=True
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
)
|
||||
dataset = record(robot, rec_cfg)
|
||||
assert dataset.meta.total_episodes == 2
|
||||
assert len(dataset) == 2
|
||||
|
||||
replay_cfg = ReplayControlConfig(episode=0, fps=1, root=root, repo_id=repo_id, play_sounds=False)
|
||||
replay(robot, replay_cfg)
|
||||
|
||||
policy_cfg = ACTConfig()
|
||||
policy = make_policy(policy_cfg, ds_meta=dataset.meta)
|
||||
|
||||
out_dir = tmp_path / "logger"
|
||||
|
||||
pretrained_policy_path = out_dir / "checkpoints/last/pretrained_model"
|
||||
policy.save_pretrained(pretrained_policy_path)
|
||||
|
||||
# In `examples/9_use_aloha.md`, we advise using `num_image_writer_processes=1`
|
||||
# during inference, to reach constant fps, so we test this here.
|
||||
if robot_type == "aloha":
|
||||
num_image_writer_processes = 1
|
||||
|
||||
# `multiprocessing.set_start_method("spawn", force=True)` avoids a hanging issue
|
||||
# before exiting pytest. However, it outputs the following error in the log:
|
||||
# Traceback (most recent call last):
|
||||
# File "<string>", line 1, in <module>
|
||||
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
|
||||
# exitcode = _main(fd, parent_sentinel)
|
||||
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
|
||||
# self = reduction.pickle.load(from_parent)
|
||||
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
|
||||
# self._semlock = _multiprocessing.SemLock._rebuild(*state)
|
||||
# FileNotFoundError: [Errno 2] No such file or directory
|
||||
# TODO(rcadene, aliberts): fix FileNotFoundError in multiprocessing
|
||||
multiprocessing.set_start_method("spawn", force=True)
|
||||
else:
|
||||
num_image_writer_processes = 0
|
||||
|
||||
eval_repo_id = "lerobot/eval_debug"
|
||||
eval_root = tmp_path / "data" / eval_repo_id
|
||||
|
||||
rec_eval_cfg = RecordControlConfig(
|
||||
repo_id=eval_repo_id,
|
||||
root=eval_root,
|
||||
single_task=single_task,
|
||||
fps=1,
|
||||
warmup_time_s=0.1,
|
||||
episode_time_s=1,
|
||||
reset_time_s=0.1,
|
||||
num_episodes=2,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
num_image_writer_processes=num_image_writer_processes,
|
||||
)
|
||||
|
||||
rec_eval_cfg.policy = PreTrainedConfig.from_pretrained(pretrained_policy_path)
|
||||
rec_eval_cfg.policy.pretrained_path = pretrained_policy_path
|
||||
|
||||
dataset = record(robot, rec_eval_cfg)
|
||||
assert dataset.num_episodes == 2
|
||||
assert len(dataset) == 2
|
||||
|
||||
del robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", [("koch", True)])
|
||||
@require_robot
|
||||
def test_resume_record(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock and robot_type != "aloha":
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
|
||||
repo_id = "lerobot/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
root=root,
|
||||
single_task=single_task,
|
||||
fps=1,
|
||||
warmup_time_s=0,
|
||||
episode_time_s=1,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
num_episodes=1,
|
||||
)
|
||||
|
||||
dataset = record(robot, rec_cfg)
|
||||
assert len(dataset) == 1, f"`dataset` should contain 1 frame, not {len(dataset)}"
|
||||
|
||||
with pytest.raises(FileExistsError):
|
||||
# Dataset already exists, but resume=False by default
|
||||
record(robot, rec_cfg)
|
||||
|
||||
rec_cfg.resume = True
|
||||
dataset = record(robot, rec_cfg)
|
||||
assert len(dataset) == 2, f"`dataset` should contain 2 frames, not {len(dataset)}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", [("koch", True)])
|
||||
@require_robot
|
||||
def test_record_with_event_rerecord_episode(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock and robot_type != "aloha":
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
|
||||
with patch("lerobot.scripts.control_robot.init_keyboard_listener") as mock_listener:
|
||||
mock_events = {}
|
||||
mock_events["exit_early"] = True
|
||||
mock_events["rerecord_episode"] = True
|
||||
mock_events["stop_recording"] = False
|
||||
mock_listener.return_value = (None, mock_events)
|
||||
|
||||
repo_id = "lerobot/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
root=root,
|
||||
single_task=single_task,
|
||||
fps=1,
|
||||
warmup_time_s=0,
|
||||
episode_time_s=1,
|
||||
num_episodes=1,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
)
|
||||
dataset = record(robot, rec_cfg)
|
||||
|
||||
assert not mock_events["rerecord_episode"], "`rerecord_episode` wasn't properly reset to False"
|
||||
assert not mock_events["exit_early"], "`exit_early` wasn't properly reset to False"
|
||||
assert len(dataset) == 1, "`dataset` should contain only 1 frame"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", [("koch", True)])
|
||||
@require_robot
|
||||
def test_record_with_event_exit_early(tmp_path, request, robot_type, mock):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
|
||||
with patch("lerobot.scripts.control_robot.init_keyboard_listener") as mock_listener:
|
||||
mock_events = {}
|
||||
mock_events["exit_early"] = True
|
||||
mock_events["rerecord_episode"] = False
|
||||
mock_events["stop_recording"] = False
|
||||
mock_listener.return_value = (None, mock_events)
|
||||
|
||||
repo_id = "lerobot/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
root=root,
|
||||
single_task=single_task,
|
||||
fps=2,
|
||||
warmup_time_s=0,
|
||||
episode_time_s=1,
|
||||
num_episodes=1,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
)
|
||||
|
||||
dataset = record(robot, rec_cfg)
|
||||
|
||||
assert not mock_events["exit_early"], "`exit_early` wasn't properly reset to False"
|
||||
assert len(dataset) == 1, "`dataset` should contain only 1 frame"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"robot_type, mock, num_image_writer_processes", [("koch", True, 0), ("koch", True, 1)]
|
||||
)
|
||||
@require_robot
|
||||
def test_record_with_event_stop_recording(tmp_path, request, robot_type, mock, num_image_writer_processes):
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
# and avoid writing calibration files in user .cache/calibration folder
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
else:
|
||||
# Use the default .cache/calibration folder when mock=False
|
||||
pass
|
||||
|
||||
robot = make_robot(**robot_kwargs)
|
||||
|
||||
with patch("lerobot.scripts.control_robot.init_keyboard_listener") as mock_listener:
|
||||
mock_events = {}
|
||||
mock_events["exit_early"] = True
|
||||
mock_events["rerecord_episode"] = False
|
||||
mock_events["stop_recording"] = True
|
||||
mock_listener.return_value = (None, mock_events)
|
||||
|
||||
repo_id = "lerobot/debug"
|
||||
root = tmp_path / "data" / repo_id
|
||||
single_task = "Do something."
|
||||
|
||||
rec_cfg = RecordControlConfig(
|
||||
repo_id=repo_id,
|
||||
root=root,
|
||||
single_task=single_task,
|
||||
fps=1,
|
||||
warmup_time_s=0,
|
||||
episode_time_s=1,
|
||||
reset_time_s=0.1,
|
||||
num_episodes=2,
|
||||
push_to_hub=False,
|
||||
video=False,
|
||||
display_data=False,
|
||||
play_sounds=False,
|
||||
num_image_writer_processes=num_image_writer_processes,
|
||||
)
|
||||
|
||||
dataset = record(robot, rec_cfg)
|
||||
|
||||
assert not mock_events["exit_early"], "`exit_early` wasn't properly reset to False"
|
||||
assert len(dataset) == 1, "`dataset` should contain only 1 frame"
|
||||
@@ -1,144 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Tests for physical robots and their mocked versions.
|
||||
If the physical robots are not connected to the computer, or not working,
|
||||
the test will be skipped.
|
||||
|
||||
Example of running a specific test:
|
||||
```bash
|
||||
pytest -sx tests/test_robots.py::test_robot
|
||||
```
|
||||
|
||||
Example of running test on real robots connected to the computer:
|
||||
```bash
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch-False]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch_bimanual-False]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[aloha-False]'
|
||||
```
|
||||
|
||||
Example of running test on a mocked version of robots:
|
||||
```bash
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch-True]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[koch_bimanual-True]'
|
||||
pytest -sx 'tests/test_robots.py::test_robot[aloha-True]'
|
||||
```
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.robots.utils import make_robot
|
||||
from tests.utils import TEST_ROBOT_TYPES, mock_calibration_dir, require_robot
|
||||
|
||||
|
||||
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
|
||||
@require_robot
|
||||
def test_robot(tmp_path, request, robot_type, mock):
|
||||
# TODO(rcadene): measure fps in nightly?
|
||||
# TODO(rcadene): test logs
|
||||
# TODO(rcadene): add compatibility with other robots
|
||||
robot_kwargs = {"robot_type": robot_type, "mock": mock}
|
||||
|
||||
if robot_type == "aloha" and mock:
|
||||
# To simplify unit test, we do not rerun manual calibration for Aloha mock=True.
|
||||
# Instead, we use the files from '.cache/calibration/aloha_default'
|
||||
pass
|
||||
else:
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
# Create an empty calibration directory to trigger manual calibration
|
||||
calibration_dir = tmp_path / robot_type
|
||||
mock_calibration_dir(calibration_dir)
|
||||
robot_kwargs["calibration_dir"] = calibration_dir
|
||||
|
||||
# Test using robot before connecting raises an error
|
||||
robot = make_robot(**robot_kwargs)
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
robot.teleop_step()
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
robot.teleop_step(record_data=True)
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
robot.capture_observation()
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
robot.send_action(None)
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
robot.disconnect()
|
||||
|
||||
# Test deleting the object without connecting first
|
||||
del robot
|
||||
|
||||
# Test connecting (triggers manual calibration)
|
||||
robot = make_robot(**robot_kwargs)
|
||||
robot.connect()
|
||||
assert robot.is_connected
|
||||
|
||||
# Test connecting twice raises an error
|
||||
with pytest.raises(DeviceAlreadyConnectedError):
|
||||
robot.connect()
|
||||
|
||||
# TODO(rcadene, aliberts): Test disconnecting with `__del__` instead of `disconnect`
|
||||
# del robot
|
||||
robot.disconnect()
|
||||
|
||||
# Test teleop can run
|
||||
robot = make_robot(**robot_kwargs)
|
||||
robot.connect()
|
||||
robot.teleop_step()
|
||||
|
||||
# Test data recorded during teleop are well formatted
|
||||
observation, action = robot.teleop_step(record_data=True)
|
||||
# State
|
||||
assert "observation.state" in observation
|
||||
assert isinstance(observation["observation.state"], torch.Tensor)
|
||||
assert observation["observation.state"].ndim == 1
|
||||
dim_state = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||||
assert observation["observation.state"].shape[0] == dim_state
|
||||
# Cameras
|
||||
for name in robot.cameras:
|
||||
assert f"observation.images.{name}" in observation
|
||||
assert isinstance(observation[f"observation.images.{name}"], torch.Tensor)
|
||||
assert observation[f"observation.images.{name}"].ndim == 3
|
||||
# Action
|
||||
assert "action" in action
|
||||
assert isinstance(action["action"], torch.Tensor)
|
||||
assert action["action"].ndim == 1
|
||||
dim_action = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||||
assert action["action"].shape[0] == dim_action
|
||||
# TODO(rcadene): test if observation and action data are returned as expected
|
||||
|
||||
# Test capture_observation can run and observation returned are the same (since the arm didnt move)
|
||||
captured_observation = robot.capture_observation()
|
||||
assert set(captured_observation.keys()) == set(observation.keys())
|
||||
for name in captured_observation:
|
||||
if "image" in name:
|
||||
# TODO(rcadene): skipping image for now as it's challenging to assess equality between two consecutive frames
|
||||
continue
|
||||
torch.testing.assert_close(captured_observation[name], observation[name], rtol=1e-4, atol=1)
|
||||
assert captured_observation[name].shape == observation[name].shape
|
||||
|
||||
# Test send_action can run
|
||||
robot.send_action(action["action"])
|
||||
|
||||
# Test disconnecting
|
||||
robot.disconnect()
|
||||
assert not robot.is_connected
|
||||
for name in robot.follower_arms:
|
||||
assert not robot.follower_arms[name].is_connected
|
||||
for name in robot.leader_arms:
|
||||
assert not robot.leader_arms[name].is_connected
|
||||
for name in robot.cameras:
|
||||
assert not robot.cameras[name].is_connected
|
||||
@@ -13,11 +13,9 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
from functools import wraps
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
@@ -189,46 +187,6 @@ def require_package(package_name):
|
||||
return decorator
|
||||
|
||||
|
||||
def require_robot(func):
|
||||
"""
|
||||
Decorator that skips the test if a robot is not available
|
||||
|
||||
The decorated function must have two arguments `request` and `robot_type`.
|
||||
|
||||
Example of usage:
|
||||
```python
|
||||
@pytest.mark.parametrize(
|
||||
"robot_type", ["koch", "aloha"]
|
||||
)
|
||||
@require_robot
|
||||
def test_require_robot(request, robot_type):
|
||||
pass
|
||||
```
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
# Access the pytest request context to get the is_robot_available fixture
|
||||
request = kwargs.get("request")
|
||||
robot_type = kwargs.get("robot_type")
|
||||
mock = kwargs.get("mock")
|
||||
|
||||
if robot_type is None:
|
||||
raise ValueError("The 'robot_type' must be an argument of the test function.")
|
||||
if request is None:
|
||||
raise ValueError("The 'request' fixture must be an argument of the test function.")
|
||||
if mock is None:
|
||||
raise ValueError("The 'mock' variable must be an argument of the test function.")
|
||||
|
||||
# Run test with a real robot. Skip test if robot connection fails.
|
||||
if not mock and not request.getfixturevalue("is_robot_available"):
|
||||
pytest.skip(f"A {robot_type} robot is not available.")
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def require_camera(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
@@ -252,55 +210,6 @@ def require_camera(func):
|
||||
return wrapper
|
||||
|
||||
|
||||
def require_motor(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
# Access the pytest request context to get the is_motor_available fixture
|
||||
request = kwargs.get("request")
|
||||
motor_type = kwargs.get("motor_type")
|
||||
mock = kwargs.get("mock")
|
||||
|
||||
if request is None:
|
||||
raise ValueError("The 'request' fixture must be an argument of the test function.")
|
||||
if motor_type is None:
|
||||
raise ValueError("The 'motor_type' must be an argument of the test function.")
|
||||
if mock is None:
|
||||
raise ValueError("The 'mock' variable must be an argument of the test function.")
|
||||
|
||||
if not mock and not request.getfixturevalue("is_motor_available"):
|
||||
pytest.skip(f"A {motor_type} motor is not available.")
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def mock_calibration_dir(calibration_dir):
|
||||
# TODO(rcadene): remove this hack
|
||||
# calibration file produced with Moss v1, but works with Koch, Koch bimanual and SO-100
|
||||
example_calib = {
|
||||
"homing_offset": [-1416, -845, 2130, 2872, 1950, -2211],
|
||||
"drive_mode": [0, 0, 1, 1, 1, 0],
|
||||
"start_pos": [1442, 843, 2166, 2849, 1988, 1835],
|
||||
"end_pos": [2440, 1869, -1106, -1848, -926, 3235],
|
||||
"calib_mode": ["DEGREE", "DEGREE", "DEGREE", "DEGREE", "DEGREE", "LINEAR"],
|
||||
"motor_names": ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"],
|
||||
}
|
||||
Path(str(calibration_dir)).mkdir(parents=True, exist_ok=True)
|
||||
with open(calibration_dir / "main_follower.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
with open(calibration_dir / "main_leader.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
with open(calibration_dir / "left_follower.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
with open(calibration_dir / "left_leader.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
with open(calibration_dir / "right_follower.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
with open(calibration_dir / "right_leader.json", "w") as f:
|
||||
json.dump(example_calib, f)
|
||||
|
||||
|
||||
# TODO(rcadene, aliberts): remove this dark pattern that overrides
|
||||
def make_camera(camera_type: str, **kwargs) -> Camera:
|
||||
if camera_type == "opencv":
|
||||
|
||||
Reference in New Issue
Block a user