chore: enable simplify in ruff lint (#2085)

This commit is contained in:
Steven Palma
2025-09-29 15:06:56 +02:00
committed by GitHub
parent c378a325f0
commit bbcf66bd82
13 changed files with 32 additions and 43 deletions

View File

@@ -201,7 +201,7 @@ exclude = ["tests/artifacts/**/*.safetensors", "*_pb2.py", "*_pb2_grpc.py"]
# N: pep8-naming
# TODO: Uncomment rules when ready to use
select = [
"E", "W", "F", "I", "B", "C4", "T20", "N", "UP" # "SIM", "A", "S", "D", "RUF"
"E", "W", "F", "I", "B", "C4", "T20", "N", "UP", "SIM" #, "A", "S", "D", "RUF"
]
ignore = [
"E501", # Line too long

View File

@@ -437,7 +437,9 @@ def concatenate_video_files(
tmp_concatenate_path, mode="r", format="concat", options={"safe": "0"}
) # safe = 0 allows absolute paths as well as relative paths
tmp_output_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file:
tmp_output_video_path = tmp_named_file.name
output_container = av.open(
tmp_output_video_path, mode="w", options={"movflags": "faststart"}
) # faststart is to move the metadata to the beginning of the file to speed up loading

View File

@@ -398,10 +398,7 @@ class ACT(nn.Module):
"actions must be provided when using the variational objective in training mode."
)
if OBS_IMAGES in batch:
batch_size = batch[OBS_IMAGES][0].shape[0]
else:
batch_size = batch[OBS_ENV_STATE].shape[0]
batch_size = batch[OBS_IMAGES][0].shape[0] if OBS_IMAGES in batch else batch[OBS_ENV_STATE].shape[0]
# Prepare the latent for input to the transformer encoder.
if self.config.use_vae and ACTION in batch and self.training:

View File

@@ -340,7 +340,7 @@ class GripperPenaltyProcessorStep(ComplementaryDataProcessorStep):
"""
action = self.transition.get(TransitionKey.ACTION)
raw_joint_positions = complementary_data.get("raw_joint_positions", None)
raw_joint_positions = complementary_data.get("raw_joint_positions")
if raw_joint_positions is None:
return complementary_data

View File

@@ -119,13 +119,12 @@ class _NormalizationMixin:
)
self.features = reconstructed
if self.norm_map:
# if keys are strings (JSON), rebuild enum map
if all(isinstance(k, str) for k in self.norm_map.keys()):
reconstructed = {}
for ft_type_str, norm_mode_str in self.norm_map.items():
reconstructed[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
self.norm_map = reconstructed
# if keys are strings (JSON), rebuild enum map
if self.norm_map and all(isinstance(k, str) for k in self.norm_map):
reconstructed = {}
for ft_type_str, norm_mode_str in self.norm_map.items():
reconstructed[FeatureType(ft_type_str)] = NormalizationMode(norm_mode_str)
self.norm_map = reconstructed
# Convert stats to tensors and move to the target device once during initialization.
self.stats = self.stats or {}

View File

@@ -152,7 +152,7 @@ class VanillaObservationProcessorStep(ObservationProcessorStep):
"""
# Build a new features mapping keyed by the same FeatureType buckets
# We assume callers already placed features in the correct FeatureType.
new_features: dict[PipelineFeatureType, dict[str, PolicyFeature]] = {ft: {} for ft in features.keys()}
new_features: dict[PipelineFeatureType, dict[str, PolicyFeature]] = {ft: {} for ft in features}
exact_pairs = {
"pixels": OBS_IMAGE,

View File

@@ -32,11 +32,8 @@ def init_rerun(session_name: str = "lerobot_control_loop") -> None:
def _is_scalar(x):
return (
isinstance(x, float)
or isinstance(x, numbers.Real)
or isinstance(x, (np.integer | np.floating))
or (isinstance(x, np.ndarray) and x.ndim == 0)
return isinstance(x, (float | numbers.Real | np.integer | np.floating)) or (
isinstance(x, np.ndarray) and x.ndim == 0
)

View File

@@ -85,7 +85,7 @@ def policy_feature_factory():
def assert_contract_is_typed(features: dict[PipelineFeatureType, dict[str, PolicyFeature]]) -> None:
assert isinstance(features, dict)
assert all(isinstance(k, PipelineFeatureType) for k in features.keys())
assert all(isinstance(k, PipelineFeatureType) for k in features)
assert all(isinstance(v, dict) for v in features.values())
assert all(all(isinstance(nk, str) for nk in v.keys()) for v in features.values())
assert all(all(isinstance(nk, str) for nk in v) for v in features.values())
assert all(all(isinstance(nv, PolicyFeature) for nv in v.values()) for v in features.values())

View File

@@ -949,7 +949,7 @@ def test_statistics_metadata_validation(tmp_path, empty_lerobot_dataset_factory)
# Check that statistics exist for all features
assert loaded_dataset.meta.stats is not None, "No statistics found"
for feature_name in features.keys():
for feature_name in features:
assert feature_name in loaded_dataset.meta.stats, f"No statistics for feature '{feature_name}'"
feature_stats = loaded_dataset.meta.stats[feature_name]

View File

@@ -246,7 +246,7 @@ def test_step_through():
# Ensure all results are dicts (same format as input)
for result in results:
assert isinstance(result, dict)
assert all(isinstance(k, TransitionKey) for k in result.keys())
assert all(isinstance(k, TransitionKey) for k in result)
def test_step_through_with_dict():
@@ -1623,9 +1623,7 @@ def test_override_with_callables():
# Define a transform function
def double_values(x):
if isinstance(x, (int | float)):
return x * 2
elif isinstance(x, torch.Tensor):
if isinstance(x, (int | float | torch.Tensor)):
return x * 2
return x
@@ -1797,10 +1795,9 @@ def test_from_pretrained_nonexistent_path():
)
# Test with a local directory that exists but has no config files
with tempfile.TemporaryDirectory() as tmp_dir:
with tempfile.TemporaryDirectory() as tmp_dir, pytest.raises(FileNotFoundError):
# Since the directory exists but has no config, it will raise FileNotFoundError
with pytest.raises(FileNotFoundError):
DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="processor.json")
DataProcessorPipeline.from_pretrained(tmp_dir, config_filename="processor.json")
def test_save_load_with_custom_converter_functions():

View File

@@ -32,10 +32,7 @@ class MockTokenizer:
**kwargs,
) -> dict[str, torch.Tensor]:
"""Mock tokenization that returns deterministic tokens based on text."""
if isinstance(text, str):
texts = [text]
else:
texts = text
texts = [text] if isinstance(text, str) else text
batch_size = len(texts)

View File

@@ -245,14 +245,14 @@ def test_get_observation(reachy2):
obs = reachy2.get_observation()
expected_keys = set(reachy2.joints_dict)
expected_keys.update(f"{v}" for v in REACHY2_VEL.keys() if reachy2.config.with_mobile_base)
expected_keys.update(f"{v}" for v in REACHY2_VEL if reachy2.config.with_mobile_base)
expected_keys.update(reachy2.cameras.keys())
assert set(obs.keys()) == expected_keys
for motor in reachy2.joints_dict.keys():
for motor in reachy2.joints_dict:
assert obs[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].present_position
if reachy2.config.with_mobile_base:
for vel in REACHY2_VEL.keys():
for vel in REACHY2_VEL:
assert obs[vel] == reachy2.reachy.mobile_base.odometry[REACHY2_VEL[vel]]
if reachy2.config.with_left_teleop_camera:
assert obs["teleop_left"].shape == (
@@ -282,7 +282,7 @@ def test_send_action(reachy2):
action.update({k: i * 0.1 for i, k in enumerate(REACHY2_VEL.keys(), start=1)})
previous_present_position = {
k: reachy2.reachy.joints[REACHY2_JOINTS[k]].present_position for k in reachy2.joints_dict.keys()
k: reachy2.reachy.joints[REACHY2_JOINTS[k]].present_position for k in reachy2.joints_dict
}
returned = reachy2.send_action(action)
@@ -290,7 +290,7 @@ def test_send_action(reachy2):
assert returned == action
assert reachy2.reachy._goal_position_set_total == len(reachy2.joints_dict)
for motor in reachy2.joints_dict.keys():
for motor in reachy2.joints_dict:
expected_pos = action[motor]
real_pos = reachy2.reachy.joints[REACHY2_JOINTS[motor]].goal_position
if reachy2.config.max_relative_target is None:

View File

@@ -121,20 +121,20 @@ def test_get_action(reachy2):
action = reachy2.get_action()
expected_keys = set(reachy2.joints_dict)
expected_keys.update(f"{v}" for v in REACHY2_VEL.keys() if reachy2.config.with_mobile_base)
expected_keys.update(f"{v}" for v in REACHY2_VEL if reachy2.config.with_mobile_base)
assert set(action.keys()) == expected_keys
for motor in reachy2.joints_dict.keys():
for motor in reachy2.joints_dict:
if reachy2.config.use_present_position:
assert action[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].present_position
else:
assert action[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].goal_position
if reachy2.config.with_mobile_base:
if reachy2.config.use_present_position:
for vel in REACHY2_VEL.keys():
for vel in REACHY2_VEL:
assert action[vel] == reachy2.reachy.mobile_base.odometry[REACHY2_VEL[vel]]
else:
for vel in REACHY2_VEL.keys():
for vel in REACHY2_VEL:
assert action[vel] == reachy2.reachy.mobile_base.last_cmd_vel[REACHY2_VEL[vel]]