forked from tangger/lerobot
Add new test_control_robot
This commit is contained in:
@@ -195,7 +195,7 @@ def record_loop(
|
||||
|
||||
|
||||
@draccus.wrap()
|
||||
def record(cfg: RecordConfig):
|
||||
def record(cfg: RecordConfig) -> LeRobotDataset:
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
if cfg.display_data:
|
||||
|
||||
@@ -51,10 +51,13 @@ class TeleoperateConfig:
|
||||
display_data: bool = False
|
||||
|
||||
|
||||
def teleop_loop(teleop: Teleoperator, robot: Robot, display_data: bool = False):
|
||||
def teleop_loop(
|
||||
teleop: Teleoperator, robot: Robot, display_data: bool = False, duration: float | None = None
|
||||
):
|
||||
display_len = max(len(key) for key in robot.action_features)
|
||||
start = time.perf_counter()
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
loop_start = time.perf_counter()
|
||||
action = teleop.get_action()
|
||||
if display_data:
|
||||
observation = robot.get_observation()
|
||||
@@ -68,18 +71,22 @@ def teleop_loop(teleop: Teleoperator, robot: Robot, display_data: bool = False):
|
||||
rr.log(f"action_{act}", rr.Scalar(val))
|
||||
|
||||
robot.send_action(action)
|
||||
loop_s = time.perf_counter() - start
|
||||
loop_s = time.perf_counter() - loop_start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
|
||||
for motor, value in action.items():
|
||||
print(f"{motor:<{display_len}} | {value:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
|
||||
if duration is not None and time.perf_counter() - start >= duration:
|
||||
return
|
||||
|
||||
move_cursor_up(len(action) + 5)
|
||||
|
||||
|
||||
@draccus.wrap()
|
||||
def control_robot(cfg: TeleoperateConfig):
|
||||
def teleoperate(cfg: TeleoperateConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
if cfg.display_data:
|
||||
@@ -92,7 +99,7 @@ def control_robot(cfg: TeleoperateConfig):
|
||||
robot.connect()
|
||||
|
||||
try:
|
||||
teleop_loop(teleop, robot, display_data=cfg.display_data)
|
||||
teleop_loop(teleop, robot, display_data=cfg.display_data, duration=cfg.teleop_time_s)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
@@ -103,4 +110,4 @@ def control_robot(cfg: TeleoperateConfig):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
control_robot()
|
||||
teleoperate()
|
||||
|
||||
Reference in New Issue
Block a user