Adjusting the metrics for age bracket and gender. More below.

Added metrics for age bracket (`n_off_accuracy`), as well as
binary/multiclass false-{positive,negative} rates (`binary_fpr_fnr`
and `multiclass_fpr_fnr`). Also added mutual-agreement measures between
models (`agreement_fraction` and `agreement_elements`).
This commit is contained in:
2026-05-05 15:19:21 +01:00
parent af43c00aa0
commit 8d3f039bba

View File

@@ -1,225 +1,224 @@
# -*- coding: utf-8 -*-
import logging
from itertools import product
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from sklearn.metrics import (accuracy_score, cohen_kappa_score,
confusion_matrix, max_error, mean_absolute_error,
multilabel_confusion_matrix, balanced_accuracy_score)
from sklearn.metrics import (accuracy_score, balanced_accuracy_score,
cohen_kappa_score, confusion_matrix, max_error,
mean_absolute_error, multilabel_confusion_matrix,
precision_score)
from facebias.estimators import Capability
from facebias.estimators import BaseEstimator, Capability
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(f"facebias:{__name__}")
logger = logging.getLogger("facebias:metrics.py")
def calc_model_performance(
gt: pd.DataFrame,
preds: pd.DataFrame,
keys: list[str] = [],
) -> pd.DataFrame:
# Needed for the n_off_accuracy family of functions
_agegroup_int_map = {
"00-02": 0,
"03-09": 1,
"10-19": 2,
"20-29": 3,
"30-39": 4,
"40-49": 5,
"50-59": 6,
"60-69": 7,
"70+": 8,
}
def n_off_accuracy(gt: list[int], pred: list[int], n: int=1) -> float:
"""Returns the n-off accuracy for ordinal class labels encoded as consecutive integers.
A prediction is counted as correct if it is exact or off by at most `n`.
"""
We assume that both `gt` and `preds` have the same structure. They should
be indexed by individual ID, such as the image name, and each value is a
dictionary with model prediction capabilities as keys (e.g., "age_group",
"sex", "skin-color", etc.), and the values are the predictions, or ground-truth
values for each ID/capability.
if len(gt) != len(pred):
raise ValueError("Both arrays must have the same length.")
if `keys` is empty, then we infer from common keys present in `preds` and `gt`.
if len(gt) == 0 or len(pred) == 0:
raise ValueError("The arrays must be populated.")
arr_gt = np.asarray(gt)
arr_pred = np.asarray(pred)
return float(np.mean(np.abs(arr_gt - arr_pred) <= n))
def one_off_accuracy(gt: list[int], pred: list[int]) -> float:
    """Return the 1-off accuracy of `pred` against `gt`.

    Thin wrapper that delegates to `n_off_accuracy` with a tolerance of 1.
    Labels are expected to be ordinal classes encoded as consecutive integers.
    """
    tolerance = 1
    return n_off_accuracy(gt, pred, n=tolerance)
def two_off_accuracy(gt: list[int], pred: list[int]) -> float:
    """Return the 2-off accuracy of `pred` against `gt`.

    Thin wrapper that delegates to `n_off_accuracy` with a tolerance of 2.
    Labels are expected to be ordinal classes encoded as consecutive integers.
    """
    tolerance = 2
    return n_off_accuracy(gt, pred, n=tolerance)
def binary_fpr_fnr(cm: np.ndarray) -> dict[str, np.number]:
"""Given a confusion matrix, calculates the false-positive and negative rates.
Parameters
----------
gt: dict[str, dict[str, Any]]
preds: dict[str, dict[str, Any]]
keys: list[str] | None
cm: np.ndarray
The 2x2 confusion matrix returned by `scikit-learn`.
Returns
-------
metrics: dict[str, dict[str, float]]
"""
common_caps = keys
if not keys:
common_caps = set(gt.columns) & set(preds.columns)
if not common_caps:
logger.error(
f'No common capabilities found. Predictions has "{preds.columns}",'
f' ground-truth has "{gt.columns}".'
)
return None
metrics: dict[str, np.number]
The calculated metrics keyed under "FPR", "FNR", "TP", "TN", "FP", "FN".
# Finding common images between predictions and ground-truth.
common_inds = set(preds.index) & set(gt.index)
if not common_inds:
logger.error("No common images found between predictions and ground-truth.")
See Also
--------
sklearn.metrics.multilabel_confusion_matrix
sklearn.metrics.confusion_matrix
"""
tn, fp, fn, tp = cm.ravel()
return {
"FPR": fp / (fp + tn) if (fp + tn) != 0 else np.nan,
"FNR": fn / (fn + tp) if (fn + tp) != 0 else np.nan,
"TN": int(tn),
"FP": int(fp),
"FN": int(fn),
"TP": int(tp),
}
def multiclass_fpr_fnr(
    gt: pd.Series, preds: pd.Series, labels: list[Any] | None = None
) -> tuple[dict[Any, dict[str, Any]], list[Any]]:
    """Calculates one-vs-rest false-positive and negative rates for each class.

    Also returns the counts of false-positives, false-negatives, true-positives
    and true-negatives, i.e., the confusion matrix for each class.

    Parameters
    ----------
    gt: pd.Series
        Indexed ground-truth data.

    preds: pd.Series
        Indexed predictions.

    labels: list[Any], optional
        List of labels in the data. If left as `None` or empty, then we infer
        it from the sorted union of unique elements in `gt` and `preds`.

    Returns
    -------
    results: dict[Any, dict[str, number]]
        For each label in `labels`, the output of `binary_fpr_fnr` applied to
        that label's one-vs-rest confusion matrix.

    labels: list[Any]
        The labels used when calculating the confusion matrices. If `labels`
        was passed as a non-empty argument, it is returned unchanged;
        otherwise, the labels inferred from the data are returned.

    See Also
    --------
    sklearn.metrics.multilabel_confusion_matrix
    """
    # `len(...) == 0` rather than `not labels`: the truth value of a
    # multi-element array-like (e.g. a numpy array) is ambiguous and raises.
    if labels is None or len(labels) == 0:
        # Use the module logger, like the rest of this file, not the root one.
        logger.info("Labels not provided. Inferring from the data.")
        labels = sorted(set(gt.unique()) | set(preds.unique()))

    # One 2x2 matrix per entry of `labels`, in the same order.
    mcm = multilabel_confusion_matrix(gt, preds, labels=labels)
    results = {label: binary_fpr_fnr(cm) for label, cm in zip(labels, mcm)}
    return results, labels
def _agreement_sanity_checks(x_pred: pd.Series, y_pred: pd.Series):
if len(x_pred) != len(y_pred):
raise ValueError(
f"Predictions have different lengths. len(x_pred) = {len(x_pred)}"
f" len(y_pred) = {len(y_pred)}"
)
if not all(x_pred.index == y_pred.index):
raise ValueError("Index mismatch between series")
return True
def agreement_fraction(x_pred: pd.Series, y_pred: pd.Series):
"""Calculates the fraction of agreement between predictions by two models.
Note that the predictions must both have the same indices and lengths.
Parameters
----------
x_pred: pd.Series
Predictions of the first model.
y_pred: pd.Series
Second model predictions.
Returns
-------
fraction: float
The fraction of agreement between the results.
"""
try:
_agreement_sanity_checks(x_pred, y_pred)
except ValueError as e:
logger.error(f"Cannot calculate agreement fraction -- {str(e)}.")
return None
metric_vals = dict()
for cap in common_caps:
if isinstance(preds[cap].iloc[0], (float, int)):
metric_vals[cap] = {
"mean_absolute_error": mean_absolute_error(gt[cap], preds[cap]),
"max_error": max_error(gt[cap], preds[cap]),
}
else:
labels = sorted(list(set(preds[cap].unique()) | set(gt[cap].unique())))
metric_vals[cap] = {
"accuracy": accuracy_score(gt[cap], preds[cap]),
"balanced_accuracy": balanced_accuracy_score(gt[cap], preds[cap]),
"cohen-kappa": cohen_kappa_score(gt[cap], preds[cap], labels=labels),
}
return pd.DataFrame.from_dict(metric_vals)
return (x_pred == y_pred).sum() / len(x_pred)
def _find_unique_values_per_capability(
class_output: dict[str, dict[Capability, Any]], caps: list[Capability] | None = None
) -> dict[Capability, str]:
"""Returns the set of values per capability in `class_output`.
def agreement_elements(x_pred: pd.Series, y_pred: pd.Series, return_disagreement: bool=True):
"""Returns the elements of agreement, and optionally, disagreement between models.
Note that, as in `agreement_fraction`, the predictions must have the same
lengths and matching indices.
Parameters
----------
class_output: dict[str, dict[Capability, Any]]
The classification results, or ground-truth data indexed by element.
x_pred: pd.Series
Predictions of the first model.
caps: list[Capability] | None
The list of capabilities to find unique values for. If left as `None`,
we will find unique values for all of them.
y_pred: pd.Series
Second model predictions.
Results
-------
unique_vals: dict[Capability, str]
The unique values indexed by capability.
"""
if caps is None:
caps = list(next(iter(class_output.values())).keys())
elif not isinstance(caps, (list, tuple)):
caps = [caps]
unique_vals = dict()
for cap in caps:
unique_vals[cap] = set()
for res in class_output.values():
unique_vals[cap].add(res[cap])
return unique_vals
def _get_capability_data(
class_outputs: dict[str, dict[Capability, Any]], cap: Capability
) -> dict[str, Any]:
"""Returns data for all individuals regarding a capability.
Parameters
----------
class_outputs: dict[str, dict[Capability, Any]]
The estimator outputs indexed by individual.
cap: Capability
The desired capability.
return_disagreement: bool, optional
Returns the disagreement as well if set (default behavior), otherwise,
returns `None` for the `disagreement_idx`.
Returns
-------
data: dict[str, Any]
The capability data indexed by individual.
agreement_idx: pd.Series
The index of elements where models have the same results.
disagreement_idx: pd.Series | None
The index of elements where models diverge on their results. Only
returned when `return_disagreement` is True, else it returns `None.
"""
data_per_id = dict()
try:
_agreement_sanity_checks(x_pred, y_pred)
except ValueError as e:
logger.error(f"Cannot get the (dis)agreement elements -- {str(e)}.")
return None
for ind, data in class_outputs.items():
if cap not in data:
logger.warning(
f'Entry for capability "{cap.value}" not found for individual "{ind}". Skipping.'
)
continue
data_per_id[ind] = data[cap]
return data_per_id
def _filter_by_index(data: dict[str, Any], indx: Any):
return dict((k, v) for k, v in data.items() if k in indx)
def calc_metrics_per_subgroup(
    gt: dict[str, dict[str, Any]], preds: dict[str, dict[str, Any]]
) -> dict[Capability, dict[Any, dict]]:
    """Calculate performance metrics per sub-group for each capability.

    Every capability common to `gt` and `preds`, except `Capability.AGE`, is
    used in turn to split the individuals into sub-groups, one per unique
    ground-truth value. For each sub-group, the predictions of every *other*
    common capability are scored against the ground-truth.

    Parameters
    ----------
    gt: dict[str, dict[str, Any]]
        Ground-truth data indexed by individual, then by capability.

    preds: dict[str, dict[str, Any]]
        Model predictions, with the same structure as `gt`.

    Returns
    -------
    metrics: dict[Capability, dict[Any, dict]]
        `metrics[cap][val]` holds "number_of_elements" (size of the sub-group
        where `gt[...][cap] == val`) plus one entry per other capability with
        either "mean_absolute_error" and "max_error" (float predictions) or
        "accuracy" (any other prediction type).
    """
    common_caps = set(_find_common_capabilities(gt, preds))
    metrics = {}
    for cap in common_caps:
        # AGE is never used to define sub-groups
        # (presumably because it is continuous -- TODO confirm).
        if cap == Capability.AGE:
            continue

        other_caps = common_caps - {cap}
        unique_values_cap = _find_unique_values_per_capability(gt, cap)[cap]
        metrics[cap] = {}
        for val in unique_values_cap:
            ids = [k for k, v in gt.items() if v[cap] == val]
            id_set = set(ids)  # O(1) membership tests while filtering
            metrics[cap][val] = {"number_of_elements": len(ids)}

            for ocap in other_caps:
                filtered_pred = _filter_by_index(_get_capability_data(preds, ocap), id_set)
                filtered_gt = _filter_by_index(_get_capability_data(gt, ocap), id_set)

                # Indexing through `ids` keeps both arrays aligned element-wise.
                filtered_pred_data = np.array([filtered_pred[i] for i in ids])
                filtered_gt_data = np.array([filtered_gt[i] for i in ids])

                if isinstance(filtered_pred_data[0], float):
                    # If data is numeric, we calculate regression-based metrics
                    metrics[cap][val][ocap] = {
                        "mean_absolute_error": mean_absolute_error(
                            filtered_gt_data, filtered_pred_data
                        ),
                        "max_error": max_error(filtered_gt_data, filtered_pred_data),
                    }
                else:
                    # The leftover debug prints and the early `return cm` were
                    # removed: they aborted the loops on the first categorical
                    # capability and returned a raw confusion matrix instead
                    # of the metrics dictionary.
                    metrics[cap][val][ocap] = {
                        "accuracy": accuracy_score(
                            filtered_gt_data, filtered_pred_data
                        ),
                    }

    return metrics
def get_unique_labels(data, cap: Capability):
    """Collect the distinct values stored under capability `cap` in `data`.

    Parameters
    ----------
    data: mapping
        Per-individual records; each value must support `cap in record` and
        `record[cap]`.

    cap: Capability
        The capability whose values are collected.

    Returns
    -------
    labels: set
        The set of values found. Individuals with no entry for `cap` are
        skipped, and a warning is logged for each of them.
    """
    labels = set()
    for ident, record in data.items():
        if cap in record:
            labels.add(record[cap])
            continue
        logger.warning(f'Capability "{cap.value}" not registered for individual "{ident}". Skipping')
    return labels
idx = x_pred.index
if return_disagreement:
return idx[x_pred == y_pred], idx[x_pred != y_pred]
else:
return idx[x_pred == y_pred], None
# TODO(gschardong): Move to the same file as `load_dataset`
def _to_age_bracket(row):
iage = int(row["age"])
if iage < 3:
return "00-02"
elif iage < 10:
return "03-09"
elif iage > 69:
return "70+"
d = iage // 10 * 10
return "{}-{}".format(d, d + 9)
@@ -237,7 +236,6 @@ if __name__ == '__main__':
DATASET_PATH = Path("../../data/facing2-train/")
METADATA_PATH = DATASET_PATH / "meta-w-age.csv"
DETECTOR_PATH = Path("../../models/blaze_face_full_range.tflite")
# TEST_IMS: list[str] = ["10", "12", "14", "9"]
detector = MediapipeDetector(str(DETECTOR_PATH))
imdict, _ = load_dataset(
@@ -249,41 +247,77 @@ if __name__ == '__main__':
face_bboxes = get_face_boxes(imdict, detector)
# for t in TEST_IMS:
# logger.info("-- {} - {}".format(t, meta[str(t)]))
print(FairFace.capabilities())
model_ff = FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu")
preds_ff = model_ff.predict(imdict)
preds_ff = pd.DataFrame.from_dict(preds_ff).T
preds_ff.index.rename("image", inplace=True)
preds_ff.index = preds_ff.index.astype(meta.index.dtype)
preds_ff = preds_ff.sort_index()
metrics_ff = calc_model_performance(meta, preds_ff)
# logger.info("FairFace -- Test Images")
# for t in TEST_IMS:
# logger.info("--{} - {}".format(t, preds_ff[str(t)]))
metrics_ff_groups = calc_metrics_per_subgroup(meta, preds_ff)
for k, v in metrics_ff_groups.items():
for kv, vv in v.items():
print(k, kv, vv)
print(MiVOLOv1.capabilities())
model_mv = MiVOLOv1(
Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu"
)
preds_mv = model_mv.predict(imdict)
preds_mv = pd.DataFrame.from_dict(preds_mv).T
preds_mv.index.rename("image", inplace=True)
preds_mv.index = preds_mv.index.astype(meta.index.dtype)
preds_mv = preds_mv.sort_index()
# logger.info("MiVOLOv1(Face Only) -- Test Images")
# for t in TEST_IMS:
# logger.info("{} - {}".format(t, preds_mv[str(t)]))
models = {
"fairface": FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu"),
"mivolo": MiVOLOv1(Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu")
}
metrics_mv = calc_model_performance(meta, preds_mv)
preds_per_model = dict((k, None) for k in models.keys())
for model_name, model in models.items():
preds = model.predict(imdict)
preds = pd.DataFrame.from_dict(preds).T
preds.index.rename("image", inplace=True)
preds.index = preds.index.astype(meta.index.dtype)
preds = preds.sort_index()
preds_per_model[model_name] = preds
for model_name, preds in preds_per_model.items():
gt_age_group_ord = meta["age_group"].apply(lambda x: _agegroup_int_map[x])
preds_age_group_ord = preds["age_group"].apply(lambda x: _agegroup_int_map[x])
acc_one_off = one_off_accuracy(gt_age_group_ord, preds_age_group_ord)
acc_two_off = two_off_accuracy(gt_age_group_ord, preds_age_group_ord)
agegroup_subclass, labels = multiclass_fpr_fnr(
meta["age_group"],
preds["age_group"]
)
print("==== Age group metrics by class ====")
# Print as table.
for k, v in agegroup_subclass.items():
print(f"Class {k}")
for m, vv in v.items():
print(f"\t{m} -- {vv}")
agegroup_subclass = pd.DataFrame.from_dict(agegroup_subclass)
print(agegroup_subclass)
model_cls = type(model)
ordered_labels = model_cls.possible_capability_values(Capability.SEX)
metrics_sex = binary_fpr_fnr(
confusion_matrix(
meta["sex"],
preds["sex"],
labels=ordered_labels
))
print(
"==== Sex metrics ===="
f"\nPositive class -- {ordered_labels[0]},"
f" Negative class -- {ordered_labels[1]}"
)
for k, v in metrics_sex.items():
print(f"\t{k} -- {v}")
# Agreement tests
model_list = list(models.keys())
for i in range(len(model_list)):
for j in range(i+1, len(model_list)):
first, second = model_list[i], model_list[j]
print(f"{first} -- {second}")
for cap in model_cls.capabilities():
if cap == Capability.AGE:
continue
frac = agreement_fraction(preds_per_model[first][cap], preds_per_model[second][cap])
print(f'Agreement fraction for capability: "{cap}" - {frac}')
agreement, disagreement = agreement_elements(
meta[cap],
preds[cap],
return_disagreement=True
)
print(disagreement)