From 8d3f039bba50003f217942db2cb7331d29a5b3c5 Mon Sep 17 00:00:00 2001 From: Guilherme Schardong Date: Tue, 5 May 2026 15:19:21 +0100 Subject: [PATCH] Adjusting the metrics for age bracket and gender. More below. Added metrics for age bracket (`n_off_accuracy`), as well as binary/multiclass false-{positive,negative} rates (`binary_fpr_fnr` and `multiclass_fpr_fnr`). Also, added mutual agreement between models `agreement_fraction` and `agreement_elements`. --- src/facebias/metrics.py | 468 +++++++++++++++++++++------------------- 1 file changed, 251 insertions(+), 217 deletions(-) diff --git a/src/facebias/metrics.py b/src/facebias/metrics.py index 2e50f1c..bf1e68f 100644 --- a/src/facebias/metrics.py +++ b/src/facebias/metrics.py @@ -1,225 +1,224 @@ # -*- coding: utf-8 -*- import logging +from itertools import product from pathlib import Path from typing import Any import numpy as np import pandas as pd -from sklearn.metrics import (accuracy_score, cohen_kappa_score, - confusion_matrix, max_error, mean_absolute_error, - multilabel_confusion_matrix, balanced_accuracy_score) +from sklearn.metrics import (accuracy_score, balanced_accuracy_score, + cohen_kappa_score, confusion_matrix, max_error, + mean_absolute_error, multilabel_confusion_matrix, + precision_score) -from facebias.estimators import Capability +from facebias.estimators import BaseEstimator, Capability logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(f"facebias:{__name__}") +logger = logging.getLogger("facebias:metrics.py") -def calc_model_performance( - gt: pd.DataFrame, - preds: pd.DataFrame, - keys: list[str] = [], -) -> pd.DataFrame: +# Needed for the n_off_accuracy family of functions +_agegroup_int_map = { + "00-02": 0, + "03-09": 1, + "10-19": 2, + "20-29": 3, + "30-39": 4, + "40-49": 5, + "50-59": 6, + "60-69": 7, + "70+": 8, +} + + +def n_off_accuracy(gt: list[int], pred: list[int], n: int=1) -> float: + """Returns the n-off accuracy for ordinal class labels encoded as consecutive integers. + + A prediction is counted as correct if it is exact or off by at most `n`. """ - We assume that both `gt` and `preds` have the same structure. They should - be indexed by individual ID, such as the image name, and each value is a - dictionary with model prediction capabilities as keys (e.g., "age_group", - "sex", "skin-color", etc.), and the values are the predictions, or ground-truth - values for each ID/capability. + if len(gt) != len(pred): + raise ValueError("Both arrays must have the same length.") - if `keys` is empty, then we infer from common keys present in `preds` and `gt`. + if len(gt) == 0 or len(pred) == 0: + raise ValueError("The arrays must be populated.") + + arr_gt = np.asarray(gt) + arr_pred = np.asarray(pred) + + return float(np.mean(np.abs(arr_gt - arr_pred) <= n)) + + +def one_off_accuracy(gt: list[int], pred: list[int]) -> float: + """1-off accuracy for ordinal class labels encoded as consecutive integers.""" + return n_off_accuracy(gt, pred, n=1) + + +def two_off_accuracy(gt: list[int], pred: list[int]) -> float: + """2-off accuracy for ordinal class labels encoded as consecutive integers.""" + return n_off_accuracy(gt, pred, n=2) + + +def binary_fpr_fnr(cm: np.ndarray) -> dict[str, np.number]: + """Given a confusion matrix, calculates the false-positive and negative rates. Parameters ---------- - gt: dict[str, dict[str, Any]] - preds: dict[str, dict[str, Any]] - keys: list[str] | None + cm: np.ndarray + The 2x2 confusion matrix returned by `scikit-learn`. Returns ------- - metrics: dict[str, dict[str, float]] - """ - common_caps = keys - if not keys: - common_caps = set(gt.columns) & set(preds.columns) - if not common_caps: - logger.error( - f'No common capabilities found. Predictions has "{preds.columns}",' - f' ground-truth has "{gt.columns}".' - ) - return None + metrics: dict[str, np.number] + The calculated metrics keyed under "FPR", "FNR", "TP", "TN", "FP", "FN". - # Finding common images between predictions and ground-truth. - common_inds = set(preds.index) & set(gt.index) - if not common_inds: - logger.error("No common images found between predictions and ground-truth.") + See Also + -------- + sklearn.metrics.multilabel_confusion_matrix + sklearn.metrics.confusion_matrix + """ + tn, fp, fn, tp = cm.ravel() + return { + "FPR": fp / (fp + tn) if (fp + tn) != 0 else np.nan, + "FNR": fn / (fn + tp) if (fn + tp) != 0 else np.nan, + "TN": int(tn), + "FP": int(fp), + "FN": int(fn), + "TP": int(tp), + } + + +def multiclass_fpr_fnr(gt: pd.Series, preds: pd.Series, labels: list[Any] | None = None): + """Calculates one-vs-rest false-positive and negative rates for each class. + + Also returns the counts of false-positives, false-negatives, true-positives + and true-negatives, i.e., the confusion matrix for each class. + + Parameters + ---------- + gt: pd.Series + Indexed ground-truth data. + + preds: pd.Series + Indexed predictions. + + labels: list[Any], optional + List of labels in the data. If left empty, then we infer from the + union of unique elements in `gt` and `preds`. + + Returns + ------- + results: dict[str, dict[str, number]] + The false-{positive,negative}-rates, and true/false positive/negative + values for each label in `labels`. + + labels: list[str] + The labels used when calculating the confusion matrix. If labels was + passed as an argument, then it is returned unchanged, else, we return + the labels inferred from the data. + """ + if labels is None or not labels: + logging.info("Labels not provided. Inferring from the data.") + labels = sorted(list(set(gt.unique()) | set(preds.unique()))) + + mcm = multilabel_confusion_matrix(gt, preds, labels=labels) + + results = {} + for label, m in zip(labels, mcm): + results[label] = binary_fpr_fnr(m) + + return results, labels + + +def _agreement_sanity_checks(x_pred: pd.Series, y_pred: pd.Series): + if len(x_pred) != len(y_pred): + raise ValueError( + f"Predictions have different lengths. len(x_pred) = {len(x_pred)}" + f" len(y_pred) = {len(y_pred)}" + ) + if not all(x_pred.index == y_pred.index): + raise ValueError("Index mismatch between series") + + return True + + +def agreement_fraction(x_pred: pd.Series, y_pred: pd.Series): + """Calculates the fraction of agreement between predictions by two models. + + Note that the predictions must both have the same indices and lengths. + + Parameters + ---------- + x_pred: pd.Series + Predictions of the first model. + + y_pred: pd.Series + Second model predictions. + + Returns + ------- + fraction: float + The fraction of agreement between the results. + """ + try: + _agreement_sanity_checks(x_pred, y_pred) + except ValueError as e: + logger.error(f"Cannot calculate agreement fraction -- {str(e)}.") return None - metric_vals = dict() - for cap in common_caps: - if isinstance(preds[cap].iloc[0], (float, int)): - metric_vals[cap] = { - "mean_absolute_error": mean_absolute_error(gt[cap], preds[cap]), - "max_error": max_error(gt[cap], preds[cap]), - } - else: - labels = sorted(list(set(preds[cap].unique()) | set(gt[cap].unique()))) - metric_vals[cap] = { - "accuracy": accuracy_score(gt[cap], preds[cap]), - "balanced_accuracy": balanced_accuracy_score(gt[cap], preds[cap]), - "cohen-kappa": cohen_kappa_score(gt[cap], preds[cap], labels=labels), - } - - return pd.DataFrame.from_dict(metric_vals) + return (x_pred == y_pred).sum() / len(x_pred) -def _find_unique_values_per_capability( - class_output: dict[str, dict[Capability, Any]], caps: list[Capability] | None = None -) -> dict[Capability, str]: - """Returns the set of values per capability in `class_output`. +def agreement_elements(x_pred: pd.Series, y_pred: pd.Series, return_disagreement: bool=True): + """Returns the elements of agreement, and optionally, disagreement between models. + + Note that, as in `agreement_fraction`, the predictions must have the same + lengths and matching indices. Parameters ---------- - class_output: dict[str, dict[Capability, Any]] - The classification results, or ground-truth data indexed by element. + x_pred: pd.Series + Predictions of the first model. - caps: list[Capability] | None - The list of capabilities to find unique values for. If left as `None`, - we will find unique values for all of them. + y_pred: pd.Series + Second model predictions. - Results - ------- - unique_vals: dict[Capability, str] - The unique values indexed by capability. - """ - if caps is None: - caps = list(next(iter(class_output.values())).keys()) - elif not isinstance(caps, (list, tuple)): - caps = [caps] - - unique_vals = dict() - for cap in caps: - unique_vals[cap] = set() - for res in class_output.values(): - unique_vals[cap].add(res[cap]) - - return unique_vals - - -def _get_capability_data( - class_outputs: dict[str, dict[Capability, Any]], cap: Capability -) -> dict[str, Any]: - """Returns data for all individuals regarding a capability. - - Parameters - ---------- - class_outputs: dict[str, dict[Capability, Any]] - The estimator outputs indexed by individual. - - cap: Capability - The desired capability. + return_disagreement: bool, optional + Returns the disagreement as well if set (default behavior), otherwise, + returns `None` for the `disagreement_idx`. Returns ------- - data: dict[str, Any] - The capability data indexed by individual. + agreement_idx: pd.Series + The index of elements where models have the same results. + + disagreement_idx: pd.Series | None + The index of elements where models diverge on their results. Only + returned when `return_disagreement` is True, else it returns `None. """ - data_per_id = dict() + try: + _agreement_sanity_checks(x_pred, y_pred) + except ValueError as e: + logger.error(f"Cannot get the (dis)agreement elements -- {str(e)}.") + return None - for ind, data in class_outputs.items(): - if cap not in data: - logger.warning( - f'Entry for capability "{cap.value}" not found for individual "{ind}". Skipping.' - ) - continue - data_per_id[ind] = data[cap] - - return data_per_id - - -def _filter_by_index(data: dict[str, Any], indx: Any): - return dict((k, v) for k, v in data.items() if k in indx) - - -def calc_metrics_per_subgroup( - gt: dict[str, dict[str, Any]], preds: dict[str, dict[str, Any]] -) -> dict[Capability, dict[Any, dict]]: - """Calculate performance metrics per sub-group for each capability. - - Parameters - ---------- - gt: dict[str, dict[str, Any]] - - preds: dict[str, dict[str, Any]] - - Returns - ------- - metrics: dict[Capability, dict[Any, dict]] - """ - common_caps = set(_find_common_capabilities(gt, preds)) - - metrics = {} - for cap in common_caps: - if cap == Capability.AGE: - continue - - other_caps = common_caps - set([cap]) - unique_values_cap = _find_unique_values_per_capability(gt, cap)[cap] - - metrics[cap] = {} - for val in unique_values_cap: - ids = [k for k, v in gt.items() if v[cap] == val] - - metrics[cap][val] = {"number_of_elements": len(ids)} - for ocap in other_caps: - metrics[cap][val][ocap] = {} - filtered_pred = _filter_by_index(_get_capability_data(preds, ocap), ids) - filtered_gt = _filter_by_index(_get_capability_data(gt, ocap), ids) - - filtered_pred_data = np.array([filtered_pred[i] for i in ids]) - filtered_gt_data = np.array([filtered_gt[i] for i in ids]) - - if isinstance(filtered_pred_data[0], float): - # If data is numeric, we calculate regression-based metrics - metrics[cap][val][ocap] = { - "mean_absolute_error": mean_absolute_error( - filtered_gt_data, filtered_pred_data - ), - "max_error": max_error(filtered_gt_data, filtered_pred_data), - } - else: - if len(np.unique(filtered_gt_data)) == 2: - print(cap, val, ocap) - cm = confusion_matrix(filtered_gt_data, filtered_pred_data, labels=get_unique_labels(filtered_gt_data, ocap)) - else: - print(cap, val, ocap) - cm = multilabel_confusion_matrix(filtered_gt_data, filtered_pred_data, labels=get_unique_labels(filtered_gt, ocap)) - return cm - - metrics[cap][val][ocap] = { - "accuracy": accuracy_score( - filtered_gt_data, filtered_pred_data - ), - } - - return metrics - - -def get_unique_labels(data, cap: Capability): - caps = set() - for id, vals in data.items(): - if cap not in vals: - logger.warning(f'Capability "{cap.value}" not registered for individual "{id}". Skipping') - continue - - caps.add(vals[cap]) - - return caps + idx = x_pred.index + if return_disagreement: + return idx[x_pred == y_pred], idx[x_pred != y_pred] + else: + return idx[x_pred == y_pred], None +# TODO(gschardong): Move to the same file as `load_dataset` def _to_age_bracket(row): iage = int(row["age"]) + if iage < 3: + return "00-02" + elif iage < 10: + return "03-09" + elif iage > 69: + return "70+" + d = iage // 10 * 10 return "{}-{}".format(d, d + 9) @@ -237,7 +236,6 @@ if __name__ == '__main__': DATASET_PATH = Path("../../data/facing2-train/") METADATA_PATH = DATASET_PATH / "meta-w-age.csv" DETECTOR_PATH = Path("../../models/blaze_face_full_range.tflite") - # TEST_IMS: list[str] = ["10", "12", "14", "9"] detector = MediapipeDetector(str(DETECTOR_PATH)) imdict, _ = load_dataset( @@ -249,41 +247,77 @@ if __name__ == '__main__': face_bboxes = get_face_boxes(imdict, detector) - # for t in TEST_IMS: - # logger.info("-- {} - {}".format(t, meta[str(t)])) - print(FairFace.capabilities()) - model_ff = FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu") - preds_ff = model_ff.predict(imdict) - preds_ff = pd.DataFrame.from_dict(preds_ff).T - preds_ff.index.rename("image", inplace=True) - preds_ff.index = preds_ff.index.astype(meta.index.dtype) - preds_ff = preds_ff.sort_index() - - metrics_ff = calc_model_performance(meta, preds_ff) - - # logger.info("FairFace -- Test Images") - # for t in TEST_IMS: - # logger.info("--{} - {}".format(t, preds_ff[str(t)])) - - metrics_ff_groups = calc_metrics_per_subgroup(meta, preds_ff) - for k, v in metrics_ff_groups.items(): - for kv, vv in v.items(): - print(k, kv, vv) - - print(MiVOLOv1.capabilities()) - model_mv = MiVOLOv1( - Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu" - ) - preds_mv = model_mv.predict(imdict) - preds_mv = pd.DataFrame.from_dict(preds_mv).T - preds_mv.index.rename("image", inplace=True) - preds_mv.index = preds_mv.index.astype(meta.index.dtype) - preds_mv = preds_mv.sort_index() - # logger.info("MiVOLOv1(Face Only) -- Test Images") - # for t in TEST_IMS: - # logger.info("{} - {}".format(t, preds_mv[str(t)])) + models = { + "fairface": FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu"), + "mivolo": MiVOLOv1(Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu") + } - metrics_mv = calc_model_performance(meta, preds_mv) + preds_per_model = dict((k, None) for k in models.keys()) + for model_name, model in models.items(): + preds = model.predict(imdict) + preds = pd.DataFrame.from_dict(preds).T + preds.index.rename("image", inplace=True) + preds.index = preds.index.astype(meta.index.dtype) + preds = preds.sort_index() + preds_per_model[model_name] = preds + + for model_name, preds in preds_per_model.items(): + gt_age_group_ord = meta["age_group"].apply(lambda x: _agegroup_int_map[x]) + preds_age_group_ord = preds["age_group"].apply(lambda x: _agegroup_int_map[x]) + + acc_one_off = one_off_accuracy(gt_age_group_ord, preds_age_group_ord) + acc_two_off = two_off_accuracy(gt_age_group_ord, preds_age_group_ord) + + agegroup_subclass, labels = multiclass_fpr_fnr( + meta["age_group"], + preds["age_group"] + ) + + print("==== Age group metrics by class ====") + # Print as table. + for k, v in agegroup_subclass.items(): + print(f"Class {k}") + for m, vv in v.items(): + print(f"\t{m} -- {vv}") + + agegroup_subclass = pd.DataFrame.from_dict(agegroup_subclass) + print(agegroup_subclass) + + model_cls = type(model) + ordered_labels = model_cls.possible_capability_values(Capability.SEX) + metrics_sex = binary_fpr_fnr( + confusion_matrix( + meta["sex"], + preds["sex"], + labels=ordered_labels + )) + + print( + "==== Sex metrics ====" + f"\nPositive class -- {ordered_labels[0]}," + f" Negative class -- {ordered_labels[1]}" + ) + for k, v in metrics_sex.items(): + print(f"\t{k} -- {v}") + + # Agreement tests + model_list = list(models.keys()) + for i in range(len(model_list)): + for j in range(i+1, len(model_list)): + first, second = model_list[i], model_list[j] + print(f"{first} -- {second}") + + for cap in model_cls.capabilities(): + if cap == Capability.AGE: + continue + frac = agreement_fraction(preds_per_model[first][cap], preds_per_model[second][cap]) + print(f'Agreement fraction for capability: "{cap}" - {frac}') + agreement, disagreement = agreement_elements( + meta[cap], + preds[cap], + return_disagreement=True + ) + print(disagreement)