diff --git a/examples/tutorial/act/act_training_example.py b/examples/tutorial/act/act_training_example.py new file mode 100644 index 000000000..6db495ca2 --- /dev/null +++ b/examples/tutorial/act/act_training_example.py @@ -0,0 +1,98 @@ +"""This script demonstrates how to train ACT Policy on a real-world dataset.""" + +from pathlib import Path + +import torch + +from lerobot.configs.types import FeatureType +from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata +from lerobot.datasets.utils import dataset_to_policy_features +from lerobot.policies.act.configuration_act import ACTConfig +from lerobot.policies.act.modeling_act import ACTPolicy +from lerobot.policies.factory import make_pre_post_processors + + +def make_delta_timestamps(delta_indices: list[int] | None, fps: int) -> list[float]: + if delta_indices is None: + return [0] + + return [i / fps for i in delta_indices] + + +output_directory = Path("outputs/robot_learning_tutorial/act") +output_directory.mkdir(parents=True, exist_ok=True) + +# Select your device +device = torch.device("mps") # or "cuda" or "cpu" + +dataset_id = "lerobot/svla_so101_pickplace" + +# This specifies the inputs the model will be expecting and the outputs it will produce +dataset_metadata = LeRobotDatasetMetadata(dataset_id) +features = dataset_to_policy_features(dataset_metadata.features) + +output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION} +input_features = {key: ft for key, ft in features.items() if key not in output_features} + +cfg = ACTConfig(input_features=input_features, output_features=output_features) +policy = ACTPolicy(cfg) +preprocessor, postprocessor = make_pre_post_processors(cfg, dataset_stats=dataset_metadata.stats) + +policy.train() +policy.to(device) + +# To perform action chunking, ACT expects a given number of actions as targets +delta_timestamps = { + "action": make_delta_timestamps(cfg.action_delta_indices, dataset_metadata.fps), +} + +# add image features if they are present +delta_timestamps |= { + k: make_delta_timestamps(cfg.observation_delta_indices, dataset_metadata.fps) for k in cfg.image_features +} + +# Instantiate the dataset +dataset = LeRobotDataset(dataset_id, delta_timestamps=delta_timestamps) + +# Create the optimizer and dataloader for offline training +optimizer = cfg.get_optimizer_preset().build(policy.parameters()) +batch_size = 32 +dataloader = torch.utils.data.DataLoader( + dataset, + batch_size=batch_size, + shuffle=True, + pin_memory=device.type != "cpu", + drop_last=True, +) + +# Number of training steps and logging frequency +training_steps = 1 +log_freq = 1 + +# Run training loop +step = 0 +done = False +while not done: + for batch in dataloader: + batch = preprocessor(batch) + loss, _ = policy.forward(batch) + loss.backward() + optimizer.step() + optimizer.zero_grad() + + if step % log_freq == 0: + print(f"step: {step} loss: {loss.item():.3f}") + step += 1 + if step >= training_steps: + done = True + break + +# Save the policy checkpoint, alongside the pre/post processors +policy.save_pretrained(output_directory) +preprocessor.save_pretrained(output_directory) +postprocessor.save_pretrained(output_directory) + +# Save all assets to the Hub +policy.push_to_hub("fracapuano/robot_learning_tutorial_act") +preprocessor.push_to_hub("fracapuano/robot_learning_tutorial_act") +postprocessor.push_to_hub("fracapuano/robot_learning_tutorial_act") diff --git a/examples/tutorial/act/act_using_example.py b/examples/tutorial/act/act_using_example.py new 
file mode 100644
index 000000000..8beef7654
--- /dev/null
+++ b/examples/tutorial/act/act_using_example.py
@@ -0,0 +1,57 @@
+import torch
+
+from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
+from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
+from lerobot.policies.act.modeling_act import ACTPolicy
+from lerobot.policies.factory import make_pre_post_processors
+from lerobot.policies.utils import build_inference_frame, make_robot_action
+from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
+from lerobot.robots.so100_follower.so100_follower import SO100Follower
+
+device = torch.device("mps")  # or "cuda" or "cpu"
+model_id = "fracapuano/robot_learning_tutorial_act"
+model = ACTPolicy.from_pretrained(model_id)
+
+dataset_id = "lerobot/svla_so101_pickplace"
+# This only downloads the dataset metadata (a few tens of MB, even for large-scale datasets)
+dataset_metadata = LeRobotDatasetMetadata(dataset_id)
+preprocess, postprocess = make_pre_post_processors(model.config, dataset_stats=dataset_metadata.stats)
+
+# find ports using lerobot-find-port
+follower_port = ...  # something like "/dev/tty.usbmodem58760431631"
+
+# the robot ids are used to load the right calibration files
+follower_id = ...  # something like "follower_so100"
+
+MAX_EPISODES = 5
+MAX_STEPS_PER_EPISODE = 20
+
+# Robot and environment configuration
+# Camera keys must match the names and resolutions of the cameras used for training!
+# You can check the camera keys a model expects in the info.json file on its model card on the Hub
+camera_config = {
+    "side": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
+    "up": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
+}
+
+robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id, cameras=camera_config)
+robot = SO100Follower(robot_cfg)
+robot.connect()
+
+for _ in range(MAX_EPISODES):
+    for _ in range(MAX_STEPS_PER_EPISODE):
+        obs = robot.get_observation()
+        obs_frame = build_inference_frame(
+            observation=obs, ds_features=dataset_metadata.features, device=device
+        )
+
+        obs = preprocess(obs_frame)
+
+        action = model.select_action(obs)
+        action = postprocess(action)
+
+        action = make_robot_action(action, dataset_metadata.features)
+
+        robot.send_action(action)
+
+    print("Episode finished! Starting new episode...")
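+
+# Once all episodes are done, release the motor bus and cameras; `disconnect()` is the
+# counterpart of the `connect()` call above
+robot.disconnect()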
diff --git a/examples/tutorial/async-inf/policy_server.py b/examples/tutorial/async-inf/policy_server.py
new file mode 100644
index 000000000..cd2b1f04c
--- /dev/null
+++ b/examples/tutorial/async-inf/policy_server.py
@@ -0,0 +1,11 @@
+from lerobot.async_inference.configs import PolicyServerConfig
+from lerobot.async_inference.policy_server import serve
+
+host = ...  # something like "127.0.0.1" if you're serving on localhost
+port = ...  # something like 8080
+
+config = PolicyServerConfig(
+    host=host,
+    port=port,
+)
+serve(config)
diff --git a/examples/tutorial/async-inf/robot_client.py b/examples/tutorial/async-inf/robot_client.py
new file mode 100644
index 000000000..3e46e5e31
--- /dev/null
+++ b/examples/tutorial/async-inf/robot_client.py
@@ -0,0 +1,55 @@
+import threading
+
+from lerobot.async_inference.configs import RobotClientConfig
+from lerobot.async_inference.helpers import visualize_action_queue_size
+from lerobot.async_inference.robot_client import RobotClient
+from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
+from lerobot.robots.so100_follower import SO100FollowerConfig
+
+# these cameras must match the ones expected by the policy - find your cameras with lerobot-find-cameras
+# check the config.json on the Hub for the policy you are using to see the expected camera specs
+camera_cfg = {
+    "up": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
+    "side": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
+}
+
+# find ports using lerobot-find-port
+follower_port = ...  # something like "/dev/tty.usbmodem58760431631"
+
+# the robot ids are used to load the right calibration files
+follower_id = ...  # something like "follower_so100"
+
+robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id, cameras=camera_cfg)
+
+server_address = ...  # something like "127.0.0.1:8080" if using localhost
+
+# Create the client configuration
+client_cfg = RobotClientConfig(
+    robot=robot_cfg,
+    server_address=server_address,
+    policy_device="mps",
+    policy_type="act",
+    pretrained_name_or_path="fracapuano/robot_learning_tutorial_act",
+    chunk_size_threshold=0.5,  # the "g" parameter: fraction of the action queue left before a new chunk is requested
+    actions_per_chunk=50,  # make sure this is less than the max actions of the policy
+)
+
+# Create and start the client
+client = RobotClient(client_cfg)
+
+# Provide a textual description of the task
+task = ...  # something like "pick up the cube and place it in the bin"
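+
+# Rough sketch of what follows: `client.start()` connects the robot and the policy server;
+# `receive_actions` runs in a background thread and keeps filling a local action queue with the
+# chunks returned by the server, while `control_loop(task)` consumes that queue at the robot's
+# control frequency and sends a fresh observation whenever the queue runs low
+# (see `chunk_size_threshold` above).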
+
+if client.start():
+    # Start action receiver thread
+    action_receiver_thread = threading.Thread(target=client.receive_actions, daemon=True)
+    action_receiver_thread.start()
+
+    try:
+        # Run the control loop
+        client.control_loop(task)
+    except KeyboardInterrupt:
+        client.stop()
+        action_receiver_thread.join()
+        # (Optionally) plot the action queue size
+        visualize_action_queue_size(client.action_queue_size)
diff --git a/examples/tutorial/diffusion/diffusion_training_example.py b/examples/tutorial/diffusion/diffusion_training_example.py
new file mode 100644
index 000000000..4c3fac09e
--- /dev/null
+++ b/examples/tutorial/diffusion/diffusion_training_example.py
@@ -0,0 +1,99 @@
+"""This script demonstrates how to train Diffusion Policy on a real-world dataset."""
+
+from pathlib import Path
+
+import torch
+
+from lerobot.configs.types import FeatureType
+from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
+from lerobot.datasets.utils import dataset_to_policy_features
+from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
+from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
+from lerobot.policies.factory import make_pre_post_processors
+
+
+def make_delta_timestamps(delta_indices: list[int] | None, fps: int) -> list[float]:
+    if delta_indices is None:
+        return [0]
+
+    return [i / fps for i in delta_indices]
+
+
+output_directory = Path("outputs/robot_learning_tutorial/diffusion")
+output_directory.mkdir(parents=True, exist_ok=True)
+
+# Select your device
+device = torch.device("mps")  # or "cuda" or "cpu"
+
+dataset_id = "lerobot/svla_so101_pickplace"
+
+# This specifies the inputs the model will be expecting and the outputs it will produce
+dataset_metadata = LeRobotDatasetMetadata(dataset_id)
+features = dataset_to_policy_features(dataset_metadata.features)
+
+output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION}
+input_features = {key: ft for key, ft in features.items() if key not in output_features}
+
+cfg = DiffusionConfig(input_features=input_features, output_features=output_features)
+policy = DiffusionPolicy(cfg)
+preprocessor, postprocessor = make_pre_post_processors(cfg, dataset_stats=dataset_metadata.stats)
+
+policy.train()
+policy.to(device)
+
+# To perform action chunking, the diffusion policy expects a given number of future actions as
+# targets, plus a short observation history as input
+delta_timestamps = {
+    "observation.state": make_delta_timestamps(cfg.observation_delta_indices, dataset_metadata.fps),
+    "action": make_delta_timestamps(cfg.action_delta_indices, dataset_metadata.fps),
+}
+
+# add image features if they are present
+delta_timestamps |= {
+    k: make_delta_timestamps(cfg.observation_delta_indices, dataset_metadata.fps) for k in cfg.image_features
+}
+
+# Instantiate the dataset
+dataset = LeRobotDataset(dataset_id, delta_timestamps=delta_timestamps)
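+
+# For example, at fps=30 every index i in the delta indices maps to an offset of i / 30 seconds,
+# so each dataset item carries the frames and actions resampled at those offsets (the chunk the
+# policy is trained to predict). Optional sanity check (commented out, prints only metadata):
+# print(dataset_metadata.fps, delta_timestamps["action"])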
{loss.item():.3f}") + step += 1 + if step >= training_steps: + done = True + break + +# Save the policy checkpoint, alongside the pre/post processors +policy.save_pretrained(output_directory) +preprocessor.save_pretrained(output_directory) +postprocessor.save_pretrained(output_directory) + +# Save all assets to the Hub +policy.push_to_hub("fracapuano/robot_learning_tutorial_diffusion") +preprocessor.push_to_hub("fracapuano/robot_learning_tutorial_diffusion") +postprocessor.push_to_hub("fracapuano/robot_learning_tutorial_diffusion") diff --git a/examples/tutorial/diffusion/diffusion_using_example.py b/examples/tutorial/diffusion/diffusion_using_example.py new file mode 100644 index 000000000..fe516383f --- /dev/null +++ b/examples/tutorial/diffusion/diffusion_using_example.py @@ -0,0 +1,60 @@ +import torch + +from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig +from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata +from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy +from lerobot.policies.factory import make_pre_post_processors +from lerobot.policies.utils import build_inference_frame, make_robot_action +from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig +from lerobot.robots.so100_follower.so100_follower import SO100Follower + +device = torch.device("mps") # or "cuda" or "cpu" +model_id = "fracapuano/robot_learning_tutorial_diffusion" + +model = DiffusionPolicy.from_pretrained(model_id) + +dataset_id = "lerobot/svla_so101_pickplace" +# This only downloads the metadata for the dataset, ~10s of MB even for large-scale datasets +dataset_metadata = LeRobotDatasetMetadata(dataset_id) +preprocess, postprocess = make_pre_post_processors( + model.config, model_id, dataset_stats=dataset_metadata.stats +) + +MAX_EPISODES = 5 +MAX_STEPS_PER_EPISODE = 20 + + +# # find ports using lerobot-find-port +follower_port = ... # something like "/dev/tty.usbmodem58760431631" + +# # the robot ids are used the load the right calibration files +follower_id = ... # something like "follower_so100" + +# Robot and environment configuration +# Camera keys must match the name and resolutions of the ones used for training! +# You can check the camera keys expected by a model in the info.json card on the model card on the Hub +camera_config = { + "side": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30), + "up": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30), +} + +robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id, cameras=camera_config) +robot = SO100Follower(robot_cfg) +robot.connect() + + +for _ in range(MAX_EPISODES): + for _ in range(MAX_STEPS_PER_EPISODE): + obs = robot.get_observation() + obs_frame = build_inference_frame( + observation=obs, ds_features=dataset_metadata.features, device=device + ) + + obs = preprocess(obs_frame) + + action = model.select_action(obs) + action = postprocess(action) + action = make_robot_action(action, dataset_metadata.features) + robot.send_action(action) + + print("Episode finished! 
+
+for _ in range(MAX_EPISODES):
+    for _ in range(MAX_STEPS_PER_EPISODE):
+        obs = robot.get_observation()
+        obs_frame = build_inference_frame(
+            observation=obs, ds_features=dataset_metadata.features, device=device
+        )
+
+        obs = preprocess(obs_frame)
+
+        action = model.select_action(obs)
+        action = postprocess(action)
+        action = make_robot_action(action, dataset_metadata.features)
+        robot.send_action(action)
+
+    print("Episode finished! Starting new episode...")
diff --git a/examples/tutorial/pi0/using_pi0_example.py b/examples/tutorial/pi0/using_pi0_example.py
new file mode 100644
index 000000000..98fc4b4be
--- /dev/null
+++ b/examples/tutorial/pi0/using_pi0_example.py
@@ -0,0 +1,67 @@
+import torch
+
+from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
+from lerobot.datasets.utils import hw_to_dataset_features
+from lerobot.policies.factory import make_pre_post_processors
+from lerobot.policies.pi0.modeling_pi0 import PI0Policy
+from lerobot.policies.utils import build_inference_frame, make_robot_action
+from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
+from lerobot.robots.so100_follower.so100_follower import SO100Follower
+
+MAX_EPISODES = 5
+MAX_STEPS_PER_EPISODE = 20
+
+device = torch.device("mps")  # or "cuda" or "cpu"
+model_id = "lerobot/pi0_base"
+
+model = PI0Policy.from_pretrained(model_id)
+
+preprocess, postprocess = make_pre_post_processors(
+    model.config,
+    model_id,
+    # This override allows running on MPS; otherwise it defaults to CUDA (if available)
+    preprocessor_overrides={"device_processor": {"device": str(device)}},
+)
+
+# find ports using lerobot-find-port
+follower_port = ...  # something like "/dev/tty.usbmodem58760431631"
+
+# the robot ids are used to load the right calibration files
+follower_id = ...  # something like "follower_so100"
+
+# Robot and environment configuration
+# Camera keys must match the names and resolutions of the cameras used for training!
+# You can check the camera keys a model expects in the info.json file on its model card on the Hub
+camera_config = {
+    "base_0_rgb": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
+    "left_wrist_0_rgb": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
+    "right_wrist_0_rgb": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=30),
+}
+
+robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id, cameras=camera_config)
+robot = SO100Follower(robot_cfg)
+robot.connect()
+
+task = ""  # something like "pick the red block"
+robot_type = ""  # something like "so100_follower" for multi-embodiment datasets
+
+# This is used to match the raw observation keys to the keys expected by the policy
+action_features = hw_to_dataset_features(robot.action_features, "action")
+obs_features = hw_to_dataset_features(robot.observation_features, "observation")
+dataset_features = {**action_features, **obs_features}
+
+for _ in range(MAX_EPISODES):
+    for _ in range(MAX_STEPS_PER_EPISODE):
+        obs = robot.get_observation()
+        obs_frame = build_inference_frame(
+            observation=obs, ds_features=dataset_features, device=device, task=task, robot_type=robot_type
+        )
+
+        obs = preprocess(obs_frame)
+
+        action = model.select_action(obs)
+        action = postprocess(action)
+        action = make_robot_action(action, dataset_features)
+        robot.send_action(action)
+
+    print("Episode finished!
Starting new episode...") diff --git a/examples/tutorial/rl/hilserl_example.py b/examples/tutorial/rl/hilserl_example.py new file mode 100644 index 000000000..865053d36 --- /dev/null +++ b/examples/tutorial/rl/hilserl_example.py @@ -0,0 +1,345 @@ +import multiprocessing as mp +import signal +from pathlib import Path +from queue import Empty, Full + +import torch +import torch.optim as optim + +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.utils import hw_to_dataset_features +from lerobot.envs.configs import HILSerlProcessorConfig, HILSerlRobotEnvConfig +from lerobot.policies.sac.configuration_sac import SACConfig +from lerobot.policies.sac.modeling_sac import SACPolicy +from lerobot.policies.sac.reward_model.modeling_classifier import Classifier +from lerobot.rl.buffer import ReplayBuffer +from lerobot.rl.gym_manipulator import make_robot_env +from lerobot.robots.so100_follower import SO100FollowerConfig +from lerobot.teleoperators.so100_leader import SO100LeaderConfig +from lerobot.teleoperators.utils import TeleopEvents + +LOG_EVERY = 10 +SEND_EVERY = 10 + + +def run_learner( + transitions_queue: mp.Queue, + parameters_queue: mp.Queue, + shutdown_event: mp.Event, + policy_learner: SACPolicy, + online_buffer: ReplayBuffer, + offline_buffer: ReplayBuffer, + lr: float = 3e-4, + batch_size: int = 32, + device: torch.device = "mps", +): + """The learner process - trains SAC policy on transitions streamed from the actor, updating parameters + for the actor to adopt.""" + policy_learner.train() + policy_learner.to(device) + + # Create Adam optimizer from scratch - simple and clean + optimizer = optim.Adam(policy_learner.parameters(), lr=lr) + + print(f"[LEARNER] Online buffer capacity: {online_buffer.capacity}") + print(f"[LEARNER] Offline buffer capacity: {offline_buffer.capacity}") + + training_step = 0 + + while not shutdown_event.is_set(): + # retrieve incoming transitions from the actor process + try: + transitions = transitions_queue.get(timeout=0.1) + for transition in transitions: + # HIL-SERL: Add ALL transitions to online buffer + online_buffer.add(**transition) + + # HIL-SERL: Add ONLY human intervention transitions to offline buffer + is_intervention = transition.get("complementary_info", {}).get("is_intervention", False) + if is_intervention: + offline_buffer.add(**transition) + print( + f"[LEARNER] Human intervention detected! Added to offline buffer (now {len(offline_buffer)} transitions)" + ) + + except Empty: + pass # No transitions available, continue + + # Train if we have enough data + if len(online_buffer) >= policy_learner.config.online_step_before_learning: + # Sample from online buffer (autonomous + human data) + online_batch = online_buffer.sample(batch_size // 2) + + # Sample from offline buffer (human demonstrations only, either precollected or at runtime) + offline_batch = offline_buffer.sample(batch_size // 2) + + # Combine batches - this is the key HIL-SERL mechanism! 
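+            # Sampling half the batch from the online buffer and half from the intervention-only
+            # offline buffer keeps the (rare) human corrections from being drowned out by
+            # autonomous experience as the online buffer grows.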
+ batch = {} + for key in online_batch: + if key in offline_batch: + batch[key] = torch.cat([online_batch[key], offline_batch[key]], dim=0) + else: + batch[key] = online_batch[key] + + loss, _ = policy_learner.forward(batch) + + optimizer.zero_grad() + loss.backward() + optimizer.step() + training_step += 1 + + if training_step % LOG_EVERY == 0: + print( + f"[LEARNER] Training step {training_step}, Loss: {loss.item():.4f}, " + f"Buffers: Online={len(online_buffer)}, Offline={len(offline_buffer)}" + ) + + # Send updated parameters to actor every 10 training steps + if training_step % SEND_EVERY == 0: + try: + state_dict = {k: v.cpu() for k, v in policy_learner.state_dict().items()} + parameters_queue.put_nowait(state_dict) + print("[LEARNER] Sent updated parameters to actor") + except Full: + # Missing write due to queue not being consumed (should happen rarely) + pass + + print("[LEARNER] Learner process finished") + + +def run_actor( + transitions_queue: mp.Queue, + parameters_queue: mp.Queue, + shutdown_event: mp.Event, + policy_actor: SACPolicy, + reward_classifier: Classifier, + env_cfg: HILSerlRobotEnvConfig, + device: torch.device = "mps", + output_directory: Path | None = None, +): + """The actor process - interacts with environment and collects data. + The policy is frozen and only the parameters are updated, popping the most recent ones from a queue.""" + policy_actor.eval() + policy_actor.to(device) + + reward_classifier.eval() + reward_classifier.to(device) + + # Create robot environment inside the actor process + env, teleop_device = make_robot_env(env_cfg) + + try: + for episode in range(MAX_EPISODES): + if shutdown_event.is_set(): + break + + obs, _info = env.reset() + episode_reward = 0.0 + step = 0 + episode_transitions = [] + + print(f"[ACTOR] Starting episode {episode + 1}") + + while step < MAX_STEPS_PER_EPISODE and not shutdown_event.is_set(): + try: + new_params = parameters_queue.get_nowait() + policy_actor.load_state_dict(new_params) + print("[ACTOR] Updated policy parameters from learner") + except Empty: # No new updated parameters available from learner, waiting + pass + + # Get action from policy + policy_obs = make_policy_obs(obs, device=device) + action_tensor = policy_actor.select_action(policy_obs) # predicts a single action + action = action_tensor.squeeze(0).cpu().numpy() + + # Step environment + next_obs, _env_reward, terminated, truncated, _info = env.step(action) + done = terminated or truncated + + # Predict reward + policy_next_obs = make_policy_obs(next_obs, device=device) + reward = reward_classifier.predict_reward(policy_next_obs) + + if reward >= 1.0 and not done: # success detected! 
halt episode + terminated = True + done = True + + # In HIL-SERL, human interventions come from the teleop device + is_intervention = False + if hasattr(teleop_device, "get_teleop_events"): + # Real intervention detection from teleop device + teleop_events = teleop_device.get_teleop_events() + is_intervention = teleop_events.get(TeleopEvents.IS_INTERVENTION, False) + + # Store transition with intervention metadata + transition = { + "state": policy_obs, + "action": action, + "reward": float(reward) if hasattr(reward, "item") else reward, + "next_state": policy_next_obs, + "done": done, + "truncated": truncated, + "complementary_info": { + "is_intervention": is_intervention, + }, + } + + episode_transitions.append(transition) + + episode_reward += reward + step += 1 + + obs = next_obs + + if done: + break + + # Send episode transitions to learner + transitions_queue.put_nowait(episode_transitions) + + except KeyboardInterrupt: + print("[ACTOR] Interrupted by user") + finally: + # Clean up + if hasattr(env, "robot") and env.robot.is_connected: + env.robot.disconnect() + if teleop_device and hasattr(teleop_device, "disconnect"): + teleop_device.disconnect() + if output_directory is not None: + policy_actor.save_pretrained(output_directory) + print(f"[ACTOR] Latest actor policy saved at: {output_directory}") + + print("[ACTOR] Actor process finished") + + +def make_policy_obs(obs, device: torch.device = "cpu"): + return { + "observation.state": torch.from_numpy(obs["agent_pos"]).float().unsqueeze(0).to(device), + **{ + f"observation.image.{k}": torch.from_numpy(obs["pixels"][k]).float().unsqueeze(0).to(device) + for k in obs["pixels"] + }, + } + + +"""Main function - coordinates actor and learner processes.""" + +device = "mps" # or "cuda" or "cpu" +output_directory = Path("outputs/robot_learning_tutorial/hil_serl") +output_directory.mkdir(parents=True, exist_ok=True) + +# find ports using lerobot-find-port +follower_port = ... +leader_port = ... + +# the robot ids are used the load the right calibration files +follower_id = ... +leader_id = ... + +# A pretrained model (to be used in-distribution!) 
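+# The classifier is a small image-based success detector (see reward_classifier_example.py):
+# it was trained on frames from this same setup, so it should only be trusted on observations
+# that look like its training data. At runtime it provides the sparse reward used by SAC.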
+reward_classifier_id = "fracapuano/reward_classifier_hil_serl_example"
+reward_classifier = Classifier.from_pretrained(reward_classifier_id)
+
+reward_classifier.to(device)
+reward_classifier.eval()
+
+MAX_EPISODES = 5
+MAX_STEPS_PER_EPISODE = 20
+
+# Robot and environment configuration
+robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id)
+teleop_cfg = SO100LeaderConfig(port=leader_port, id=leader_id)
+processor_cfg = HILSerlProcessorConfig(control_mode="leader")
+
+env_cfg = HILSerlRobotEnvConfig(robot=robot_cfg, teleop=teleop_cfg, processor=processor_cfg)
+
+# Create robot environment
+env, teleop_device = make_robot_env(env_cfg)
+
+obs_features = hw_to_dataset_features(env.robot.observation_features, "observation")
+action_features = hw_to_dataset_features(env.robot.action_features, "action")
+
+# Create SAC policy for action selection
+policy_cfg = SACConfig(
+    device=device,
+    input_features=obs_features,
+    output_features=action_features,
+)
+
+policy_actor = SACPolicy(policy_cfg)
+policy_learner = SACPolicy(policy_cfg)
+
+demonstrations_repo_id = "lerobot/example_hil_serl_dataset"
+offline_dataset = LeRobotDataset(repo_id=demonstrations_repo_id)
+
+# Online buffer: initialized from scratch
+online_replay_buffer = ReplayBuffer(device=device, state_keys=list(obs_features.keys()))
+# Offline buffer: created from the dataset (pre-populated with demonstrations)
+offline_replay_buffer = ReplayBuffer.from_lerobot_dataset(
+    lerobot_dataset=offline_dataset, device=device, state_keys=list(obs_features.keys())
+)
+
+# Create communication channels between learner and actor processes
+transitions_queue = mp.Queue(maxsize=10)
+parameters_queue = mp.Queue(maxsize=2)
+shutdown_event = mp.Event()
+
+
+# Signal handler for graceful shutdown
+def signal_handler(sig, frame):
+    print(f"\nSignal {sig} received, shutting down...")
+    shutdown_event.set()
+
+
+signal.signal(signal.SIGINT, signal_handler)
+signal.signal(signal.SIGTERM, signal_handler)
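+
+# Note: with the "spawn" start method (the default on macOS and Windows), the process creation
+# below should ideally live under an `if __name__ == "__main__":` guard so that child processes
+# do not re-execute this whole script when they import it.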
+
+# Create processes
+learner_process = mp.Process(
+    target=run_learner,
+    args=(
+        transitions_queue,
+        parameters_queue,
+        shutdown_event,
+        policy_learner,
+        online_replay_buffer,
+        offline_replay_buffer,
+    ),
+    kwargs={"device": device},  # the learner can run on accelerated hardware for training
+)
+
+actor_process = mp.Process(
+    target=run_actor,
+    args=(
+        transitions_queue,
+        parameters_queue,
+        shutdown_event,
+        policy_actor,
+        reward_classifier,
+        env_cfg,
+    ),
+    # the actor is frozen, so it can run on CPU (or on an accelerator for faster inference)
+    kwargs={"device": "cpu", "output_directory": output_directory},
+)
+
+learner_process.start()
+actor_process.start()
+
+try:
+    # Wait for the actor to finish (it controls the episode loop)
+    actor_process.join()
+    shutdown_event.set()
+    learner_process.join(timeout=10)
+
+except KeyboardInterrupt:
+    print("Main process interrupted")
+    shutdown_event.set()
+    actor_process.join(timeout=5)
+    learner_process.join(timeout=10)
+
+finally:
+    if learner_process.is_alive():
+        learner_process.terminate()
+    if actor_process.is_alive():
+        actor_process.terminate()
diff --git a/examples/tutorial/rl/reward_classifier_example.py b/examples/tutorial/rl/reward_classifier_example.py
new file mode 100644
index 000000000..a3d852e30
--- /dev/null
+++ b/examples/tutorial/rl/reward_classifier_example.py
@@ -0,0 +1,62 @@
+import torch
+
+from lerobot.datasets.lerobot_dataset import LeRobotDataset
+from lerobot.policies.factory import make_policy, make_pre_post_processors
+from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
+
+# Device to use for training
+device = "mps"  # or "cuda", or "cpu"
+
+# Load the dataset used for training
+repo_id = "lerobot/example_hil_serl_dataset"
+dataset = LeRobotDataset(repo_id)
+
+# Configure the policy to extract features from the image frames
+camera_keys = dataset.meta.camera_keys
+
+config = RewardClassifierConfig(
+    num_cameras=len(camera_keys),
+    device=device,
+    # backbone model to extract features from the image frames
+    model_name="microsoft/resnet-18",
+)
+
+# Make policy, preprocessor, and optimizer
+policy = make_policy(config, ds_meta=dataset.meta)
+optimizer = config.get_optimizer_preset().build(policy.parameters())
+preprocessor, _ = make_pre_post_processors(policy_cfg=config, dataset_stats=dataset.meta.stats)
+
+classifier_id = "fracapuano/reward_classifier_hil_serl_example"
+
+# Instantiate a dataloader
+dataloader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True)
+
+# Training loop
+num_epochs = 5
+for epoch in range(num_epochs):
+    total_loss = 0
+    total_accuracy = 0
+    for batch in dataloader:
+        # Preprocess the batch and move it to the correct device.
+        batch = preprocessor(batch)
+
+        # Forward pass
+        loss, output_dict = policy.forward(batch)
+
+        # Backward pass and optimization
+        optimizer.zero_grad()
+        loss.backward()
+        optimizer.step()
+
+        total_loss += loss.item()
+        total_accuracy += output_dict["accuracy"]
+
+    avg_loss = total_loss / len(dataloader)
+    avg_accuracy = total_accuracy / len(dataloader)
+    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.2f}%")
+
+print("Training finished!")
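+
+# At runtime (see hilserl_example.py) the trained classifier is queried on every new observation,
+# roughly `reward = classifier.predict_reward(processed_observation)`, and a reward >= 1.0 is
+# treated as a success signal that ends the episode. The call above is only a sketch; adapt the
+# variable names to your setup.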
+
+# You can now push the trained classifier to the Hub.
+policy.push_to_hub(classifier_id)
diff --git a/examples/tutorial/smolvla/using_smolvla_example.py b/examples/tutorial/smolvla/using_smolvla_example.py
new file mode 100644
index 000000000..04c327833
--- /dev/null
+++ b/examples/tutorial/smolvla/using_smolvla_example.py
@@ -0,0 +1,66 @@
+import torch
+
+from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
+from lerobot.datasets.utils import hw_to_dataset_features
+from lerobot.policies.factory import make_pre_post_processors
+from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
+from lerobot.policies.utils import build_inference_frame, make_robot_action
+from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
+from lerobot.robots.so100_follower.so100_follower import SO100Follower
+
+MAX_EPISODES = 5
+MAX_STEPS_PER_EPISODE = 20
+
+device = torch.device("mps")  # or "cuda" or "cpu"
+model_id = "lerobot/smolvla_base"
+
+model = SmolVLAPolicy.from_pretrained(model_id)
+
+preprocess, postprocess = make_pre_post_processors(
+    model.config,
+    model_id,
+    # This override allows running on MPS; otherwise it defaults to CUDA (if available)
+    preprocessor_overrides={"device_processor": {"device": str(device)}},
+)
+
+# find ports using lerobot-find-port
+follower_port = ...  # something like "/dev/tty.usbmodem58760431631"
+
+# the robot ids are used to load the right calibration files
+follower_id = ...  # something like "follower_so100"
+
+# Robot and environment configuration
+# Camera keys must match the names and resolutions of the cameras used for training!
+# You can check the camera keys a model expects in the info.json file on its model card on the Hub
+camera_config = {
+    "camera1": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=30),
+    "camera2": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=30),
+}
+
+robot_cfg = SO100FollowerConfig(port=follower_port, id=follower_id, cameras=camera_config)
+robot = SO100Follower(robot_cfg)
+robot.connect()
+
+task = ""  # something like "pick the red block"
+robot_type = ""  # something like "so100_follower" for multi-embodiment datasets
+
+# This is used to match the raw observation keys to the keys expected by the policy
+action_features = hw_to_dataset_features(robot.action_features, "action")
+obs_features = hw_to_dataset_features(robot.observation_features, "observation")
+dataset_features = {**action_features, **obs_features}
+
+for _ in range(MAX_EPISODES):
+    for _ in range(MAX_STEPS_PER_EPISODE):
+        obs = robot.get_observation()
+        obs_frame = build_inference_frame(
+            observation=obs, ds_features=dataset_features, device=device, task=task, robot_type=robot_type
+        )
+
+        obs = preprocess(obs_frame)
+
+        action = model.select_action(obs)
+        action = postprocess(action)
+        action = make_robot_action(action, dataset_features)
+        robot.send_action(action)
+
+    print("Episode finished! Starting new episode...")
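+
+# Once all episodes are done, release the motor bus and cameras
+robot.disconnect()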