From a3ee18748b402c7d6142effe9db0b21675f06f08 Mon Sep 17 00:00:00 2001 From: Guilherme Schardong Date: Mon, 20 Apr 2026 16:50:29 +0100 Subject: [PATCH] Added pandas as dependency. More below. The rationale for using `csvfile` instead of `pandas` directly was to avoid a fairly heavy dependency, since we were only reading the CSV data. Since we now need to do some fairly convoluted filtering to calculate the subgroup metrics, it's better to use pandas. --- requirements.txt | 1 + src/facebias/metrics.py | 213 +++++++++++++++++++++++----------------- 2 files changed, 126 insertions(+), 88 deletions(-) diff --git a/requirements.txt b/requirements.txt index b77d065..3fc66e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ timm==1.0.26 torch>=2.2.2 torchvision>=0.17.2 gdown==6.0.0 +pandas>=3.0.2 diff --git a/src/facebias/metrics.py b/src/facebias/metrics.py index 21bb9a4..2e50f1c 100644 --- a/src/facebias/metrics.py +++ b/src/facebias/metrics.py @@ -1,74 +1,26 @@ # -*- coding: utf-8 -*- import logging +from pathlib import Path from typing import Any import numpy as np -from sklearn.metrics import ( - accuracy_score, - cohen_kappa_score, - hamming_loss, - max_error, - mean_absolute_error, - mean_squared_error, - precision_score, -) +import pandas as pd +from sklearn.metrics import (accuracy_score, cohen_kappa_score, + confusion_matrix, max_error, mean_absolute_error, + multilabel_confusion_matrix, balanced_accuracy_score) from facebias.estimators import Capability -logger = logging.getLogger("facebias:metrics") - - -def find_common_capabilities( - gt: dict[str, dict[Capability, Any]], preds: dict[str, dict[Capability, Any]] -) -> list[str]: - """Iterates on `preds` and `gt`, finding common model capabilities. - - Some models predict different features of face images. Some predict sex, - age and skin color, while others may predict only one of these features, or - others beyond them. 
This function finds the common capabilities, returning - them as a list. - - Parameters - ---------- - gt: dict[str, dict[Capability, Any]] - Ground-truth data indexed by element ID, and values are a - feature -> prediction dictionary. - - preds: dict[str, dict[Capability, Any]] - Predictions data in the same format as `gt`. - - Returns - ------- - common_keys: list[Capability] - The common features between `gt` and `preds`. If no common features are - found, returns an empty list. - """ - # Find the first common element between the `gt` and `preds`. - it = iter(gt) - common_elem = "" - while True: - try: - common_elem = next(it) - except StopIteration: - break - else: - if common_elem in preds: - break - - if not common_elem: - return [] - - gt_keys = set(gt[common_elem].keys()) - preds_keys = set(preds[common_elem].keys()) - return list(gt_keys & preds_keys) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(f"facebias:{__name__}") def calc_model_performance( - gt: dict[str, dict[str, Any]], - preds: dict[str, dict[str, Any]], - keys: list[str] = [], -) -> dict[str, dict[str, float]]: + gt: pd.DataFrame, + preds: pd.DataFrame, + keys: list[str] = [], +) -> pd.DataFrame: """ We assume that both `gt` and `preds` have the same structure. They should be indexed by individual ID, such as the image name, and each value is a @@ -90,46 +42,39 @@ def calc_model_performance( """ common_caps = keys if not keys: - common_caps = find_common_capabilities(gt, preds) + common_caps = set(gt.columns) & set(preds.columns) if not common_caps: - kgt = next(iter(gt)) - kpd = next(iter(preds)) logger.error( - f'No common capabilities found. Predictions has "{preds[kpd].keys()}",' - f' ground-truth has "{gt[kgt].keys()}".' + f'No common capabilities found. Predictions has "{preds.columns}",' + f' ground-truth has "{gt.columns}".' ) return None # Finding common images between predictions and ground-truth. 
- common_inds = set(preds.keys()) & set(gt.keys()) + common_inds = set(preds.index) & set(gt.index) if not common_inds: logger.error("No common images found between predictions and ground-truth.") return None metric_vals = dict() - for cat in common_caps: - pred_data = [None for _ in common_inds] - gt_data = [None for _ in common_inds] - for i, ix in enumerate(common_inds): - pred_data[i] = preds[ix][cat] - gt_data[i] = gt[ix][cat] - - if isinstance(pred_data[0], float): - pred_data = np.array(pred_data) - gt_data = np.array(gt_data) - metric_vals[cat] = { - "mean_absolute_error": mean_absolute_error(gt_data, pred_data), + for cap in common_caps: + if isinstance(preds[cap].iloc[0], (float, int)): + metric_vals[cap] = { + "mean_absolute_error": mean_absolute_error(gt[cap], preds[cap]), + "max_error": max_error(gt[cap], preds[cap]), } else: - metric_vals[cat] = { - "accuracy": accuracy_score(gt_data, pred_data), - "cohen-kappa": cohen_kappa_score(gt_data, pred_data), + labels = sorted(list(set(preds[cap].unique()) | set(gt[cap].unique()))) + metric_vals[cap] = { + "accuracy": accuracy_score(gt[cap], preds[cap]), + "balanced_accuracy": balanced_accuracy_score(gt[cap], preds[cap]), + "cohen-kappa": cohen_kappa_score(gt[cap], preds[cap], labels=labels), } - return metric_vals + return pd.DataFrame.from_dict(metric_vals) -def find_unique_values_per_capability( +def _find_unique_values_per_capability( class_output: dict[str, dict[Capability, Any]], caps: list[Capability] | None = None ) -> dict[Capability, str]: """Returns the set of values per capability in `class_output`. @@ -162,7 +107,7 @@ def find_unique_values_per_capability( return unique_vals -def get_capability_data( +def _get_capability_data( class_outputs: dict[str, dict[Capability, Any]], cap: Capability ) -> dict[str, Any]: """Returns data for all individuals regarding a capability. 
@@ -193,7 +138,7 @@ def get_capability_data( return data_per_id -def filter_by_index(data: dict[str, Any], indx: Any): +def _filter_by_index(data: dict[str, Any], indx: Any): return dict((k, v) for k, v in data.items() if k in indx) @@ -212,7 +157,7 @@ def calc_metrics_per_subgroup( ------- metrics: dict[Capability, dict[Any, dict]] """ - common_caps = set(find_common_capabilities(gt, preds)) + common_caps = set(_find_common_capabilities(gt, preds)) metrics = {} for cap in common_caps: @@ -220,7 +165,7 @@ def calc_metrics_per_subgroup( continue other_caps = common_caps - set([cap]) - unique_values_cap = find_unique_values_per_capability(gt, cap)[cap] + unique_values_cap = _find_unique_values_per_capability(gt, cap)[cap] metrics[cap] = {} for val in unique_values_cap: @@ -229,13 +174,14 @@ def calc_metrics_per_subgroup( metrics[cap][val] = {"number_of_elements": len(ids)} for ocap in other_caps: metrics[cap][val][ocap] = {} - filtered_pred = filter_by_index(get_capability_data(preds, ocap), ids) - filtered_gt = filter_by_index(get_capability_data(gt, ocap), ids) + filtered_pred = _filter_by_index(_get_capability_data(preds, ocap), ids) + filtered_gt = _filter_by_index(_get_capability_data(gt, ocap), ids) filtered_pred_data = np.array([filtered_pred[i] for i in ids]) filtered_gt_data = np.array([filtered_gt[i] for i in ids]) if isinstance(filtered_pred_data[0], float): + # If data is numeric, we calculate regression-based metrics metrics[cap][val][ocap] = { "mean_absolute_error": mean_absolute_error( filtered_gt_data, filtered_pred_data @@ -243,6 +189,14 @@ def calc_metrics_per_subgroup( "max_error": max_error(filtered_gt_data, filtered_pred_data), } else: + if len(np.unique(filtered_gt_data)) == 2: + print(cap, val, ocap) + cm = confusion_matrix(filtered_gt_data, filtered_pred_data, labels=get_unique_labels(filtered_gt_data, ocap)) + else: + print(cap, val, ocap) + cm = multilabel_confusion_matrix(filtered_gt_data, filtered_pred_data, 
labels=get_unique_labels(filtered_gt, ocap)) + return cm + metrics[cap][val][ocap] = { "accuracy": accuracy_score( filtered_gt_data, filtered_pred_data @@ -250,3 +204,86 @@ def calc_metrics_per_subgroup( } return metrics + + +def get_unique_labels(data, cap: Capability): + caps = set() + for id, vals in data.items(): + if cap not in vals: + logger.warning(f'Capability "{cap.value}" not registered for individual "{id}". Skipping') + continue + + caps.add(vals[cap]) + + return caps + + +def _to_age_bracket(row): + iage = int(row["age"]) + d = iage // 10 * 10 + return "{}-{}".format(d, d + 9) + + +if __name__ == '__main__': + import os + logger.info(os.getcwd()) + + from facebias import load_dataset + from facebias.detectors import get_face_boxes + from facebias.detectors.mediapipe import MediapipeDetector + from facebias.estimators.fairface import FairFace + from facebias.estimators.mivolov1 import MiVOLOv1 + + DATASET_PATH = Path("../../data/facing2-train/") + METADATA_PATH = DATASET_PATH / "meta-w-age.csv" + DETECTOR_PATH = Path("../../models/blaze_face_full_range.tflite") + # TEST_IMS: list[str] = ["10", "12", "14", "9"] + + detector = MediapipeDetector(str(DETECTOR_PATH)) + imdict, _ = load_dataset( + DATASET_PATH, meta_path=None, imname_proc_fn=lambda x: x.split("_")[0] + ) + meta = pd.read_csv(METADATA_PATH, sep=',', index_col="image") + meta[Capability.AGEGROUP.value] = meta.apply(_to_age_bracket, axis=1) + meta = meta.sort_index() + + face_bboxes = get_face_boxes(imdict, detector) + + # for t in TEST_IMS: + # logger.info("-- {} - {}".format(t, meta[str(t)])) + + print(FairFace.capabilities()) + model_ff = FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu") + preds_ff = model_ff.predict(imdict) + preds_ff = pd.DataFrame.from_dict(preds_ff).T + preds_ff.index.rename("image", inplace=True) + preds_ff.index = preds_ff.index.astype(meta.index.dtype) + preds_ff = preds_ff.sort_index() + + metrics_ff = calc_model_performance(meta, 
preds_ff) + + # logger.info("FairFace -- Test Images") + # for t in TEST_IMS: + # logger.info("--{} - {}".format(t, preds_ff[str(t)])) + + metrics_ff_groups = calc_metrics_per_subgroup(meta, preds_ff) + for k, v in metrics_ff_groups.items(): + for kv, vv in v.items(): + print(k, kv, vv) + + + print(MiVOLOv1.capabilities()) + model_mv = MiVOLOv1( + Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu" + ) + preds_mv = model_mv.predict(imdict) + preds_mv = pd.DataFrame.from_dict(preds_mv).T + preds_mv.index.rename("image", inplace=True) + preds_mv.index = preds_mv.index.astype(meta.index.dtype) + preds_mv = preds_mv.sort_index() + + # logger.info("MiVOLOv1(Face Only) -- Test Images") + # for t in TEST_IMS: + # logger.info("{} - {}".format(t, preds_mv[str(t)])) + + metrics_mv = calc_model_performance(meta, preds_mv)