From c6a61e3ba295a9da40e081324f7d02b884136cc7 Mon Sep 17 00:00:00 2001
From: Simon Alibert
Date: Tue, 21 May 2024 16:31:48 +0200
Subject: [PATCH] WIP

---
 lerobot/scripts/compare_policies.py | 250 ++++++++++++++++------------
 1 file changed, 144 insertions(+), 106 deletions(-)

diff --git a/lerobot/scripts/compare_policies.py b/lerobot/scripts/compare_policies.py
index 11e49f95..3343b070 100644
--- a/lerobot/scripts/compare_policies.py
+++ b/lerobot/scripts/compare_policies.py
@@ -34,6 +34,7 @@ import scipy.stats as stats
 from scipy.stats import anderson, kstest, mannwhitneyu, normaltest, shapiro, ttest_ind, ttest_rel, wilcoxon
 from statsmodels.stats.contingency_tables import mcnemar
 from termcolor import colored
+from terminaltables import AsciiTable
 
 
 def init_logging() -> None:
@@ -75,11 +76,19 @@ def get_eval_info_episodes(eval_info_path: Path) -> dict:
     }
 
 
-def describe_samples(ref_sample: dict, new_sample: dict, metric_name: str):
-    ref_mean, ref_std = np.mean(ref_sample[metric_name]), np.std(ref_sample[metric_name])
-    new_mean, new_std = np.mean(new_sample[metric_name]), np.std(new_sample[metric_name])
-    logging.info(f"{metric_name} - Ref sample: mean = {ref_mean:.3f}, std = {ref_std:.3f}")
-    logging.info(f"{metric_name} - New sample: mean = {new_mean:.3f}, std = {new_std:.3f}")
+def append_table_metric(table: list, metric: str, ref_sample: dict, new_sample: dict, mean_std: bool = False):
+    if mean_std:
+        ref_metric = f"{np.mean(ref_sample[metric]):.3f} ({np.std(ref_sample[metric]):.3f})"
+        new_metric = f"{np.mean(new_sample[metric]):.3f} ({np.std(new_sample[metric]):.3f})"
+        row_header = f"{metric} - mean (std)"
+    else:
+        ref_metric = ref_sample[metric]
+        new_metric = new_sample[metric]
+        row_header = metric
+
+    row = [row_header, ref_metric, new_metric]
+    table.append(row)
+    return table
 
 
 def cohens_d(x, y):
@@ -103,114 +112,22 @@ def normality_tests(array: np.ndarray, name: str):
     return sw_p > 0.05 and ks_p > 0.05
 
 
-def plot_boxplot(data_a: np.ndarray, data_b: np.ndarray, labels: list[str], title: str, filename: str):
-    plt.boxplot([data_a, data_b], labels=labels)
-    plt.title(title)
-    plt.savefig(filename)
-    plt.close()
-
-
-def plot_histogram(data_a: np.ndarray, data_b: np.ndarray, labels: list[str], title: str, filename: str):
-    plt.hist(data_a, bins=30, alpha=0.7, label=labels[0])
-    plt.hist(data_b, bins=30, alpha=0.7, label=labels[1])
-    plt.title(title)
-    plt.legend()
-    plt.savefig(filename)
-    plt.close()
-
-
-def plot_qqplot(data: np.ndarray, title: str, filename: str):
-    stats.probplot(data, dist="norm", plot=plt)
-    plt.title(title)
-    plt.savefig(filename)
-    plt.close()
-
-
-def paired_sample_tests(ref_sample: dict, new_sample: dict):
-    log_section("Normality tests")
-    max_reward_diff = ref_sample["max_rewards"] - new_sample["max_rewards"]
-    sum_reward_diff = ref_sample["sum_rewards"] - new_sample["sum_rewards"]
-    normal_max_reward_diff = normality_tests(max_reward_diff, "Max Reward Difference")
-    normal_sum_reward_diff = normality_tests(sum_reward_diff, "Sum Reward Difference")
-
-    log_section("Paired-sample tests")
-    if normal_max_reward_diff:
-        t_stat_max_reward, p_val_max_reward = ttest_rel(ref_sample["max_rewards"], new_sample["max_rewards"])
-        log_test(f"Paired t-test for Max Reward: t-statistic = {t_stat_max_reward:.3f}", p_val_max_reward)
-    else:
-        w_stat_max_reward, p_wilcox_max_reward = wilcoxon(
-            ref_sample["max_rewards"], new_sample["max_rewards"]
-        )
-        log_test(f"Wilcoxon test for Max Reward: statistic = {w_stat_max_reward:.3f}", p_wilcox_max_reward)
-
-    if normal_sum_reward_diff:
-        t_stat_sum_reward, p_val_sum_reward = ttest_rel(ref_sample["sum_rewards"], new_sample["sum_rewards"])
-        log_test(f"Paired t-test for Sum Reward: t-statistic = {t_stat_sum_reward:.3f}", p_val_sum_reward)
-    else:
-        w_stat_sum_reward, p_wilcox_sum_reward = wilcoxon(
-            ref_sample["sum_rewards"], new_sample["sum_rewards"]
-        )
-        log_test(f"Wilcoxon test for Sum Reward: statistic = {w_stat_sum_reward:.3f}", p_wilcox_sum_reward)
-
-    table = np.array(
-        [
-            [
-                np.sum((ref_sample["successes"] == 1) & (new_sample["successes"] == 1)),
-                np.sum((ref_sample["successes"] == 1) & (new_sample["successes"] == 0)),
-            ],
-            [
-                np.sum((ref_sample["successes"] == 0) & (new_sample["successes"] == 1)),
-                np.sum((ref_sample["successes"] == 0) & (new_sample["successes"] == 0)),
-            ],
-        ]
-    )
-    mcnemar_result = mcnemar(table, exact=True)
-    log_test(f"McNemar's test for Success: statistic = {mcnemar_result.statistic:.3f}", mcnemar_result.pvalue)
-
-
-def independent_sample_tests(ref_sample: dict, new_sample: dict):
-    log_section("Normality tests")
-    normal_max_rewards_a = normality_tests(ref_sample["max_rewards"], "Max Rewards Ref Sample")
-    normal_max_rewards_b = normality_tests(new_sample["max_rewards"], "Max Rewards New Sample")
-    normal_sum_rewards_a = normality_tests(ref_sample["sum_rewards"], "Sum Rewards Ref Sample")
-    normal_sum_rewards_b = normality_tests(new_sample["sum_rewards"], "Sum Rewards New Sample")
-
-    log_section("Independent samples tests")
-    if normal_max_rewards_a and normal_max_rewards_b:
-        t_stat_max_reward, p_val_max_reward = ttest_ind(
-            ref_sample["max_rewards"], new_sample["max_rewards"], equal_var=False
-        )
-        log_test(f"Two-Sample t-test for Max Reward: t-statistic = {t_stat_max_reward:.3f}", p_val_max_reward)
-    else:
-        u_stat_max_reward, p_u_max_reward = mannwhitneyu(ref_sample["max_rewards"], new_sample["max_rewards"])
-        log_test(f"Mann-Whitney U test for Max Reward: U-statistic = {u_stat_max_reward:.3f}", p_u_max_reward)
-
-    if normal_sum_rewards_a and normal_sum_rewards_b:
-        t_stat_sum_reward, p_val_sum_reward = ttest_ind(
-            ref_sample["sum_rewards"], new_sample["sum_rewards"], equal_var=False
-        )
-        log_test(f"Two-Sample t-test for Sum Reward: t-statistic = {t_stat_sum_reward:.3f}", p_val_sum_reward)
-    else:
-        u_stat_sum_reward, p_u_sum_reward = mannwhitneyu(ref_sample["sum_rewards"], new_sample["sum_rewards"])
-        log_test(f"Mann-Whitney U test for Sum Reward: U-statistic = {u_stat_sum_reward:.3f}", p_u_sum_reward)
-
-
 def perform_tests(ref_sample: dict, new_sample: dict, output_dir: Path, independent: bool = False):
-    log_section("Descriptive Stats")
-    logging.info(f"Number of episode - Ref Sample: {ref_sample['num_episodes']}")
-    logging.info(f"Number of episode - New Sample: {new_sample['num_episodes']}")
-
     seeds_a, seeds_b = ref_sample["seeds"], new_sample["seeds"]
     if (seeds_a == seeds_b) and not independent:
-        logging.info("Samples are paired (identical seeds).")
+        logging.info("\nSamples are paired (identical seeds).")
         paired = True
     else:
-        logging.info("Samples are considered independent (seeds are different).")
+        logging.info("\nSamples are considered independent (seeds are different).")
         paired = False
 
-    describe_samples(ref_sample, new_sample, "successes")
-    describe_samples(ref_sample, new_sample, "max_rewards")
-    describe_samples(ref_sample, new_sample, "sum_rewards")
+    table_data = [["Metric", "Ref.", "New"]]
+    table_data = append_table_metric(table_data, "num_episodes", ref_sample, new_sample)
+    table_data = append_table_metric(table_data, "successes", ref_sample, new_sample, mean_std=True)
+    table_data = append_table_metric(table_data, "max_rewards", ref_sample, new_sample, mean_std=True)
+    table_data = append_table_metric(table_data, "sum_rewards", ref_sample, new_sample, mean_std=True)
+    table = AsciiTable(table_data)
+    print(table.table)
 
     log_section("Effect Size")
     d_max_reward = cohens_d(ref_sample["max_rewards"], new_sample["max_rewards"])
@@ -277,6 +194,127 @@ def perform_tests(ref_sample: dict, new_sample: dict, output_dir: Path, independ
     )
 
 
+def paired_sample_tests(ref_sample: dict, new_sample: dict):
+    log_section("Normality tests")
+    max_reward_diff = ref_sample["max_rewards"] - new_sample["max_rewards"]
+    sum_reward_diff = ref_sample["sum_rewards"] - new_sample["sum_rewards"]
+
+    normal_max_reward_diff = normality_tests(max_reward_diff, "Max Reward Difference")
+    normal_sum_reward_diff = normality_tests(sum_reward_diff, "Sum Reward Difference")
+
+    log_section("Paired-sample tests")
+    if normal_max_reward_diff:
+        t_stat_max_reward, p_val_max_reward = ttest_rel(ref_sample["max_rewards"], new_sample["max_rewards"])
+        log_test(f"Paired t-test for Max Reward: t-statistic = {t_stat_max_reward:.3f}", p_val_max_reward)
+    else:
+        w_stat_max_reward, p_wilcox_max_reward = wilcoxon(
+            ref_sample["max_rewards"], new_sample["max_rewards"]
+        )
+        log_test(f"Wilcoxon test for Max Reward: statistic = {w_stat_max_reward:.3f}", p_wilcox_max_reward)
+
+    if normal_sum_reward_diff:
+        t_stat_sum_reward, p_val_sum_reward = ttest_rel(ref_sample["sum_rewards"], new_sample["sum_rewards"])
+        log_test(f"Paired t-test for Sum Reward: t-statistic = {t_stat_sum_reward:.3f}", p_val_sum_reward)
+    else:
+        w_stat_sum_reward, p_wilcox_sum_reward = wilcoxon(
+            ref_sample["sum_rewards"], new_sample["sum_rewards"]
+        )
+        log_test(f"Wilcoxon test for Sum Reward: statistic = {w_stat_sum_reward:.3f}", p_wilcox_sum_reward)
+
+    table = np.array(
+        [
+            [
+                np.sum((ref_sample["successes"] == 1) & (new_sample["successes"] == 1)),
+                np.sum((ref_sample["successes"] == 1) & (new_sample["successes"] == 0)),
+            ],
+            [
+                np.sum((ref_sample["successes"] == 0) & (new_sample["successes"] == 1)),
+                np.sum((ref_sample["successes"] == 0) & (new_sample["successes"] == 0)),
+            ],
+        ]
+    )
+    mcnemar_result = mcnemar(table, exact=True)
+    log_test(f"McNemar's test for Success: statistic = {mcnemar_result.statistic:.3f}", mcnemar_result.pvalue)
+
+
+def independent_sample_tests(ref_sample: dict, new_sample: dict):
+    log_section("Normality tests")
+    normal_max_rewards_a = normality_tests(ref_sample["max_rewards"], "Max Rewards Ref Sample")
+    normal_max_rewards_b = normality_tests(new_sample["max_rewards"], "Max Rewards New Sample")
+    normal_sum_rewards_a = normality_tests(ref_sample["sum_rewards"], "Sum Rewards Ref Sample")
+    normal_sum_rewards_b = normality_tests(new_sample["sum_rewards"], "Sum Rewards New Sample")
+
+    log_section("Independent samples tests")
+    table = [["Test", "max_rewards", "sum_rewards"]]
+    if normal_max_rewards_a and normal_max_rewards_b:
+        table = append_independent_test(
+            table, ref_sample, new_sample, ttest_ind, "Two-Sample t-test", kwargs={"equal_var": False}
+        )
+        t_stat_max_reward, p_val_max_reward = ttest_ind(
+            ref_sample["max_rewards"], new_sample["max_rewards"], equal_var=False
+        )
+        log_test(f"Two-Sample t-test for Max Reward: t-statistic = {t_stat_max_reward:.3f}", p_val_max_reward)
+    else:
+        table = append_independent_test(table, ref_sample, new_sample, mannwhitneyu, "Mann-Whitney U")
+        u_stat_max_reward, p_u_max_reward = mannwhitneyu(ref_sample["max_rewards"], new_sample["max_rewards"])
+        log_test(f"Mann-Whitney U test for Max Reward: U-statistic = {u_stat_max_reward:.3f}", p_u_max_reward)
+
+    if normal_sum_rewards_a and normal_sum_rewards_b:
+        t_stat_sum_reward, p_val_sum_reward = ttest_ind(
+            ref_sample["sum_rewards"], new_sample["sum_rewards"], equal_var=False
+        )
+        log_test(f"Two-Sample t-test for Sum Reward: t-statistic = {t_stat_sum_reward:.3f}", p_val_sum_reward)
+    else:
+        u_stat_sum_reward, p_u_sum_reward = mannwhitneyu(ref_sample["sum_rewards"], new_sample["sum_rewards"])
+        log_test(f"Mann-Whitney U test for Sum Reward: U-statistic = {u_stat_sum_reward:.3f}", p_u_sum_reward)
+
+    table = AsciiTable(table)
+    print(table.table)
+
+
+def append_independent_test(
+    table: list,
+    ref_sample: dict,
+    new_sample: dict,
+    test: callable,
+    test_name: str,
+    kwargs: dict | None = None,
+) -> list:
+    kwargs = {} if kwargs is None else kwargs
+    row = [f"{test_name}: p-value ≥ alpha"]
+    for metric in table[0][1:]:
+        _, p_val = test(ref_sample[metric], new_sample[metric], **kwargs)
+        alpha = 0.05
+        status = "✅" if p_val >= alpha else "❌"
+        row.append(f"{status} {p_val:.3f} ≥ {alpha}")
+
+    table.append(row)
+    return table
+
+
+def plot_boxplot(data_a: np.ndarray, data_b: np.ndarray, labels: list[str], title: str, filename: str):
+    plt.boxplot([data_a, data_b], labels=labels)
+    plt.title(title)
+    plt.savefig(filename)
+    plt.close()
+
+
+def plot_histogram(data_a: np.ndarray, data_b: np.ndarray, labels: list[str], title: str, filename: str):
+    plt.hist(data_a, bins=30, alpha=0.7, label=labels[0])
+    plt.hist(data_b, bins=30, alpha=0.7, label=labels[1])
+    plt.title(title)
+    plt.legend()
+    plt.savefig(filename)
+    plt.close()
+
+
+def plot_qqplot(data: np.ndarray, title: str, filename: str):
+    stats.probplot(data, dist="norm", plot=plt)
+    plt.title(title)
+    plt.savefig(filename)
+    plt.close()
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
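
Note (not part of the patch): a minimal, self-contained sketch of how the append_table_metric helper introduced above composes with terminaltables' AsciiTable. The toy ref_sample/new_sample dicts and their numbers are invented purely for illustration; the real dicts come from get_eval_info_episodes() and also carry "seeds", "successes", etc. Unlike the patch, the sketch casts raw cell values to str, since AsciiTable's documented table_data is a list of lists of strings.

import numpy as np
from terminaltables import AsciiTable

# Toy stand-ins for the dicts built by get_eval_info_episodes(); the values
# are invented purely for illustration.
rng = np.random.default_rng(seed=0)
ref_sample = {"num_episodes": 100, "max_rewards": rng.normal(4.0, 1.0, size=100)}
new_sample = {"num_episodes": 100, "max_rewards": rng.normal(4.2, 1.0, size=100)}


def append_table_metric(table: list, metric: str, ref_sample: dict, new_sample: dict, mean_std: bool = False):
    # Mirrors the helper added in the patch: one row per metric, reported
    # either as the raw value or as a "mean (std)" summary. Raw values are
    # cast to str here because AsciiTable expects string cells.
    if mean_std:
        ref_metric = f"{np.mean(ref_sample[metric]):.3f} ({np.std(ref_sample[metric]):.3f})"
        new_metric = f"{np.mean(new_sample[metric]):.3f} ({np.std(new_sample[metric]):.3f})"
        row_header = f"{metric} - mean (std)"
    else:
        ref_metric, new_metric = str(ref_sample[metric]), str(new_sample[metric])
        row_header = metric
    table.append([row_header, ref_metric, new_metric])
    return table


table_data = [["Metric", "Ref.", "New"]]
table_data = append_table_metric(table_data, "num_episodes", ref_sample, new_sample)
table_data = append_table_metric(table_data, "max_rewards", ref_sample, new_sample, mean_std=True)
print(AsciiTable(table_data).table)

Running this prints a boxed ASCII grid with one row per metric and one column per sample, which is what perform_tests() now emits in place of the old describe_samples() log lines.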