Removed configuration and port finding code from classes no longer necessary.

This commit is contained in:
jess-moss
2024-09-17 16:31:08 -05:00
parent 1c1882e5eb
commit aa22946a7f
5 changed files with 6 additions and 298 deletions

View File

@@ -4,7 +4,6 @@ import math
import time import time
import traceback import traceback
from copy import deepcopy from copy import deepcopy
from pathlib import Path
import numpy as np import numpy as np
import tqdm import tqdm
@@ -235,35 +234,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
) )
def find_available_ports():
ports = []
for path in Path("/dev").glob("tty*"):
ports.append(str(path))
return ports
def find_port():
print("Finding all available ports for the DynamixelMotorsBus.")
ports_before = find_available_ports()
print(ports_before)
print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.")
input()
time.sleep(0.5)
ports_after = find_available_ports()
ports_diff = list(set(ports_before) - set(ports_after))
if len(ports_diff) == 1:
port = ports_diff[0]
print(f"The port of this DynamixelMotorsBus is '{port}'")
print("Reconnect the usb cable.")
elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
else:
raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).")
class TorqueMode(enum.Enum): class TorqueMode(enum.Enum):
ENABLED = 1 ENABLED = 1
DISABLED = 0 DISABLED = 0
@@ -377,11 +347,6 @@ class DynamixelMotorsBus:
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
if not self.are_motors_configured():
raise OSError(
"Motors are not configured. Please use configure_motors.py to configure motors before continuing."
)
def reconnect(self): def reconnect(self):
self.port_handler = PortHandler(self.port) self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION) self.packet_handler = PacketHandler(PROTOCOL_VERSION)
@@ -398,112 +363,6 @@ class DynamixelMotorsBus:
print(e) print(e)
return False return False
def configure_motors(self):
# TODO(rcadene): This script assumes motors follow the X_SERIES baudrates
# TODO(rcadene): Refactor this function with intermediate high-level functions
print("Scanning all baudrates and motor indices")
all_baudrates = set(X_SERIES_BAUDRATE_TABLE.values())
ids_per_baudrate = {}
for baudrate in all_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices()
if len(present_ids) > 0:
ids_per_baudrate[baudrate] = present_ids
print(f"Motor indices detected: {ids_per_baudrate}")
print()
possible_baudrates = list(ids_per_baudrate.keys())
possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist})
untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices))
# Connect successively one motor to the chain and write a unique random index for each
for i in range(len(self.motors)):
self.disconnect()
input(
"1. Unplug the power cord\n"
"2. Plug/unplug minimal number of cables to only have the first "
f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n"
"3. Re-plug the power cord\n"
"Press Enter to continue..."
)
print()
self.reconnect()
if i > 0:
try:
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
print()
self.reconnect()
print("Scanning possible baudrates and motor indices")
motor_found = False
for baudrate in possible_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices(possible_ids)
if len(present_ids) == 1:
present_idx = present_ids[0]
print(f"Detected motor with index {present_idx}")
if baudrate != BAUDRATE:
print(f"Setting its baudrate to {BAUDRATE}")
baudrate_idx = list(X_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE)
# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
self.set_bus_baudrate(baudrate)
continue
break
else:
raise
if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")
motor_found = True
break
elif len(present_ids) > 1:
raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.")
if not motor_found:
raise OSError(
"No motor found, but one new motor expected. Verify power cord is plugged in and retry."
)
print()
print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()
if (self.read("ID") != self.motor_indices).any():
raise OSError("Failed to write motors indices.")
print("Configuration is done!")
def find_motor_indices(self, possible_ids=None): def find_motor_indices(self, possible_ids=None):
if possible_ids is None: if possible_ids is None:
possible_ids = range(MAX_ID_RANGE) possible_ids = range(MAX_ID_RANGE)
@@ -968,8 +827,3 @@ class DynamixelMotorsBus:
def __del__(self): def __del__(self):
if getattr(self, "is_connected", False): if getattr(self, "is_connected", False):
self.disconnect() self.disconnect()
if __name__ == "__main__":
# Helper to find the usb port associated to all your DynamixelMotorsBus.
find_port()

View File

@@ -4,7 +4,6 @@ import math
import time import time
import traceback import traceback
from copy import deepcopy from copy import deepcopy
from pathlib import Path
import numpy as np import numpy as np
import tqdm import tqdm
@@ -208,35 +207,6 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
) )
def find_available_ports():
ports = []
for path in Path("/dev").glob("tty*"):
ports.append(str(path))
return ports
def find_port():
print("Finding all available ports for the FeetechMotorsBus.")
ports_before = find_available_ports()
print(ports_before)
print("Remove the usb cable from your FeetechMotorsBus and press Enter when done.")
input()
time.sleep(0.5)
ports_after = find_available_ports()
ports_diff = list(set(ports_before) - set(ports_after))
if len(ports_diff) == 1:
port = ports_diff[0]
print(f"The port of this FeetechMotorsBus is '{port}'")
print("Reconnect the usb cable.")
elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
else:
raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).")
class TorqueMode(enum.Enum): class TorqueMode(enum.Enum):
ENABLED = 1 ENABLED = 1
DISABLED = 0 DISABLED = 0
@@ -350,11 +320,6 @@ class FeetechMotorsBus:
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
if not self.are_motors_configured():
raise OSError(
"Motors are not configured. Please use configure_motors.py to configure motors before continuing."
)
def reconnect(self): def reconnect(self):
self.port_handler = PortHandler(self.port) self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION) self.packet_handler = PacketHandler(PROTOCOL_VERSION)
@@ -371,112 +336,6 @@ class FeetechMotorsBus:
print(e) print(e)
return False return False
def configure_motors(self):
# TODO(rcadene): This script assumes motors follow the X_SERIES baudrates
# TODO(rcadene): Refactor this function with intermediate high-level functions
print("Scanning all baudrates and motor indices")
all_baudrates = set(SCS_SERIES_BAUDRATE_TABLE.values())
ids_per_baudrate = {}
for baudrate in all_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices()
if len(present_ids) > 0:
ids_per_baudrate[baudrate] = present_ids
print(f"Motor indices detected: {ids_per_baudrate}")
print()
possible_baudrates = list(ids_per_baudrate.keys())
possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist})
untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices))
# Connect successively one motor to the chain and write a unique random index for each
for i in range(len(self.motors)):
self.disconnect()
input(
"1. Unplug the power cord\n"
"2. Plug/unplug minimal number of cables to only have the first "
f"{i+1} motor(s) ({self.motor_names[:i+1]}) connected.\n"
"3. Re-plug the power cord\n"
"Press Enter to continue..."
)
print()
self.reconnect()
if i > 0:
try:
self.read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
print()
self.reconnect()
print("Scanning possible baudrates and motor indices")
motor_found = False
for baudrate in possible_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices(possible_ids)
if len(present_ids) == 1:
present_idx = present_ids[0]
print(f"Detected motor with index {present_idx}")
if baudrate != BAUDRATE:
print(f"Setting its baudrate to {BAUDRATE}")
baudrate_idx = list(SCS_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE)
# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self.write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self.read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
self.set_bus_baudrate(baudrate)
continue
break
else:
raise
if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self.write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
present_idx = self.read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")
motor_found = True
break
elif len(present_ids) > 1:
raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.")
if not motor_found:
raise OSError(
"No motor found, but one new motor expected. Verify power cord is plugged in and retry."
)
print()
print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self.write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()
if (self.read("ID") != self.motor_indices).any():
raise OSError("Failed to write motors indices.")
print("Configuration is done!")
def find_motor_indices(self, possible_ids=None): def find_motor_indices(self, possible_ids=None):
if possible_ids is None: if possible_ids is None:
possible_ids = range(MAX_ID_RANGE) possible_ids = range(MAX_ID_RANGE)
@@ -941,8 +800,3 @@ class FeetechMotorsBus:
def __del__(self): def __del__(self):
if getattr(self, "is_connected", False): if getattr(self, "is_connected", False):
self.disconnect() self.disconnect()
if __name__ == "__main__":
# Helper to find the usb port associated to all your FeetechMotorsBus.
find_port()

View File

@@ -55,7 +55,7 @@ def configure_motor(brand, model, motor_idx_des, baudrate_des):
try: try:
motor_bus.connect() motor_bus.connect()
print(f"Connected on port {motor_bus.port}") print(f"Connected on port {motor_bus.port}")
except Exception as e: except OSError as e:
print(f"Error occurred when connecting to the motor bus: {e}") print(f"Error occurred when connecting to the motor bus: {e}")
return return

View File

@@ -10,11 +10,11 @@ def find_available_ports():
def find_port(): def find_port():
print("Finding all available ports for the DynamixelMotorsBus.") print("Finding all available ports for the MotorsBus.")
ports_before = find_available_ports() ports_before = find_available_ports()
print(ports_before) print(ports_before)
print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.") print("Remove the usb cable from your MotorsBus and press Enter when done.")
input() input()
time.sleep(0.5) time.sleep(0.5)
@@ -23,7 +23,7 @@ def find_port():
if len(ports_diff) == 1: if len(ports_diff) == 1:
port = ports_diff[0] port = ports_diff[0]
print(f"The port of this DynamixelMotorsBus is '{port}'") print(f"The port of this MotorsBus is '{port}'")
print("Reconnect the usb cable.") print("Reconnect the usb cable.")
elif len(ports_diff) == 0: elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).") raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
@@ -32,5 +32,5 @@ def find_port():
if __name__ == "__main__": if __name__ == "__main__":
# Helper to find the usb port associated to all your DynamixelMotorsBus. # Helper to find the usb port associated to all your MotorsBus.
find_port() find_port()

View File

@@ -241,7 +241,7 @@
if(!canPlayVideos){ if(!canPlayVideos){
this.videoCodecError = true; this.videoCodecError = true;
} }
// process CSV data // process CSV data
this.videos = document.querySelectorAll('video'); this.videos = document.querySelectorAll('video');
this.video = this.videos[0]; this.video = this.videos[0];