Added pandas as a dependency. More below.
The rationale for using `csvfile` instead of `pandas` directly was to avoid a fairly heavy dependency, since we were only reading the CSV data. Now that we need to do some fairly convoluted filtering to calculate the subgroup metrics, it's better to use pandas.
This commit is contained in:
@@ -7,3 +7,4 @@ timm==1.0.26
|
||||
torch>=2.2.2
|
||||
torchvision>=0.17.2
|
||||
gdown==6.0.0
|
||||
pandas>=3.0.2
|
||||
|
||||
@@ -1,74 +1,26 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from sklearn.metrics import (
|
||||
accuracy_score,
|
||||
cohen_kappa_score,
|
||||
hamming_loss,
|
||||
max_error,
|
||||
mean_absolute_error,
|
||||
mean_squared_error,
|
||||
precision_score,
|
||||
)
|
||||
import pandas as pd
|
||||
from sklearn.metrics import (accuracy_score, cohen_kappa_score,
|
||||
confusion_matrix, max_error, mean_absolute_error,
|
||||
multilabel_confusion_matrix, balanced_accuracy_score)
|
||||
|
||||
from facebias.estimators import Capability
|
||||
|
||||
logger = logging.getLogger("facebias:metrics")
|
||||
|
||||
|
||||
def find_common_capabilities(
    gt: "dict[str, dict[Capability, Any]]",
    preds: "dict[str, dict[Capability, Any]]",
) -> list[str]:
    """Iterates on `preds` and `gt`, finding common model capabilities.

    Some models predict different features of face images. Some predict sex,
    age and skin color, while others may predict only one of these features, or
    others beyond them. This function finds the common capabilities, returning
    them as a list.

    Only the first element ID present in both `gt` and `preds` is inspected;
    its feature keys are taken as representative of the whole collection.

    Parameters
    ----------
    gt: dict[str, dict[Capability, Any]]
        Ground-truth data indexed by element ID, and values are a
        feature -> prediction dictionary.

    preds: dict[str, dict[Capability, Any]]
        Predictions data in the same format as `gt`.

    Returns
    -------
    common_keys: list[Capability]
        The common features between `gt` and `preds`. If `gt` and `preds`
        share no element ID, or the shared element has no feature in common,
        returns an empty list. The order of the list is not defined.
    """
    # Find the first common element between `gt` and `preds`. A `None`
    # sentinel (instead of an empty string) distinguishes "nothing shared"
    # from both an exhausted iterator and a legitimately empty ID.
    common_elem = next((elem for elem in gt if elem in preds), None)
    if common_elem is None:
        return []

    gt_keys = set(gt[common_elem].keys())
    preds_keys = set(preds[common_elem].keys())
    return list(gt_keys & preds_keys)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(f"facebias:{__name__}")
|
||||
|
||||
|
||||
def calc_model_performance(
|
||||
gt: dict[str, dict[str, Any]],
|
||||
preds: dict[str, dict[str, Any]],
|
||||
keys: list[str] = [],
|
||||
) -> dict[str, dict[str, float]]:
|
||||
gt: pd.DataFrame,
|
||||
preds: pd.DataFrame,
|
||||
keys: list[str] = [],
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
We assume that both `gt` and `preds` have the same structure. They should
|
||||
be indexed by individual ID, such as the image name, and each value is a
|
||||
@@ -90,46 +42,39 @@ def calc_model_performance(
|
||||
"""
|
||||
common_caps = keys
|
||||
if not keys:
|
||||
common_caps = find_common_capabilities(gt, preds)
|
||||
common_caps = set(gt.columns) & set(preds.columns)
|
||||
if not common_caps:
|
||||
kgt = next(iter(gt))
|
||||
kpd = next(iter(preds))
|
||||
logger.error(
|
||||
f'No common capabilities found. Predictions has "{preds[kpd].keys()}",'
|
||||
f' ground-truth has "{gt[kgt].keys()}".'
|
||||
f'No common capabilities found. Predictions has "{preds.columns}",'
|
||||
f' ground-truth has "{gt.columns}".'
|
||||
)
|
||||
return None
|
||||
|
||||
# Finding common images between predictions and ground-truth.
|
||||
common_inds = set(preds.keys()) & set(gt.keys())
|
||||
common_inds = set(preds.index) & set(gt.index)
|
||||
if not common_inds:
|
||||
logger.error("No common images found between predictions and ground-truth.")
|
||||
return None
|
||||
|
||||
metric_vals = dict()
|
||||
for cat in common_caps:
|
||||
pred_data = [None for _ in common_inds]
|
||||
gt_data = [None for _ in common_inds]
|
||||
for i, ix in enumerate(common_inds):
|
||||
pred_data[i] = preds[ix][cat]
|
||||
gt_data[i] = gt[ix][cat]
|
||||
|
||||
if isinstance(pred_data[0], float):
|
||||
pred_data = np.array(pred_data)
|
||||
gt_data = np.array(gt_data)
|
||||
metric_vals[cat] = {
|
||||
"mean_absolute_error": mean_absolute_error(gt_data, pred_data),
|
||||
for cap in common_caps:
|
||||
if isinstance(preds[cap].iloc[0], (float, int)):
|
||||
metric_vals[cap] = {
|
||||
"mean_absolute_error": mean_absolute_error(gt[cap], preds[cap]),
|
||||
"max_error": max_error(gt[cap], preds[cap]),
|
||||
}
|
||||
else:
|
||||
metric_vals[cat] = {
|
||||
"accuracy": accuracy_score(gt_data, pred_data),
|
||||
"cohen-kappa": cohen_kappa_score(gt_data, pred_data),
|
||||
labels = sorted(list(set(preds[cap].unique()) | set(gt[cap].unique())))
|
||||
metric_vals[cap] = {
|
||||
"accuracy": accuracy_score(gt[cap], preds[cap]),
|
||||
"balanced_accuracy": balanced_accuracy_score(gt[cap], preds[cap]),
|
||||
"cohen-kappa": cohen_kappa_score(gt[cap], preds[cap], labels=labels),
|
||||
}
|
||||
|
||||
return metric_vals
|
||||
return pd.DataFrame.from_dict(metric_vals)
|
||||
|
||||
|
||||
def find_unique_values_per_capability(
|
||||
def _find_unique_values_per_capability(
|
||||
class_output: dict[str, dict[Capability, Any]], caps: list[Capability] | None = None
|
||||
) -> dict[Capability, str]:
|
||||
"""Returns the set of values per capability in `class_output`.
|
||||
@@ -162,7 +107,7 @@ def find_unique_values_per_capability(
|
||||
return unique_vals
|
||||
|
||||
|
||||
def get_capability_data(
|
||||
def _get_capability_data(
|
||||
class_outputs: dict[str, dict[Capability, Any]], cap: Capability
|
||||
) -> dict[str, Any]:
|
||||
"""Returns data for all individuals regarding a capability.
|
||||
@@ -193,7 +138,7 @@ def get_capability_data(
|
||||
return data_per_id
|
||||
|
||||
|
||||
def filter_by_index(data: dict[str, Any], indx: Any):
|
||||
def _filter_by_index(data: dict[str, Any], indx: Any):
|
||||
return dict((k, v) for k, v in data.items() if k in indx)
|
||||
|
||||
|
||||
@@ -212,7 +157,7 @@ def calc_metrics_per_subgroup(
|
||||
-------
|
||||
metrics: dict[Capability, dict[Any, dict]]
|
||||
"""
|
||||
common_caps = set(find_common_capabilities(gt, preds))
|
||||
common_caps = set(_find_common_capabilities(gt, preds))
|
||||
|
||||
metrics = {}
|
||||
for cap in common_caps:
|
||||
@@ -220,7 +165,7 @@ def calc_metrics_per_subgroup(
|
||||
continue
|
||||
|
||||
other_caps = common_caps - set([cap])
|
||||
unique_values_cap = find_unique_values_per_capability(gt, cap)[cap]
|
||||
unique_values_cap = _find_unique_values_per_capability(gt, cap)[cap]
|
||||
|
||||
metrics[cap] = {}
|
||||
for val in unique_values_cap:
|
||||
@@ -229,13 +174,14 @@ def calc_metrics_per_subgroup(
|
||||
metrics[cap][val] = {"number_of_elements": len(ids)}
|
||||
for ocap in other_caps:
|
||||
metrics[cap][val][ocap] = {}
|
||||
filtered_pred = filter_by_index(get_capability_data(preds, ocap), ids)
|
||||
filtered_gt = filter_by_index(get_capability_data(gt, ocap), ids)
|
||||
filtered_pred = _filter_by_index(_get_capability_data(preds, ocap), ids)
|
||||
filtered_gt = _filter_by_index(_get_capability_data(gt, ocap), ids)
|
||||
|
||||
filtered_pred_data = np.array([filtered_pred[i] for i in ids])
|
||||
filtered_gt_data = np.array([filtered_gt[i] for i in ids])
|
||||
|
||||
if isinstance(filtered_pred_data[0], float):
|
||||
# If data is numeric, we calculate regression-based metrics
|
||||
metrics[cap][val][ocap] = {
|
||||
"mean_absolute_error": mean_absolute_error(
|
||||
filtered_gt_data, filtered_pred_data
|
||||
@@ -243,6 +189,14 @@ def calc_metrics_per_subgroup(
|
||||
"max_error": max_error(filtered_gt_data, filtered_pred_data),
|
||||
}
|
||||
else:
|
||||
if len(np.unique(filtered_gt_data)) == 2:
|
||||
print(cap, val, ocap)
|
||||
cm = confusion_matrix(filtered_gt_data, filtered_pred_data, labels=get_unique_labels(filtered_gt_data, ocap))
|
||||
else:
|
||||
print(cap, val, ocap)
|
||||
cm = multilabel_confusion_matrix(filtered_gt_data, filtered_pred_data, labels=get_unique_labels(filtered_gt, ocap))
|
||||
return cm
|
||||
|
||||
metrics[cap][val][ocap] = {
|
||||
"accuracy": accuracy_score(
|
||||
filtered_gt_data, filtered_pred_data
|
||||
@@ -250,3 +204,86 @@ def calc_metrics_per_subgroup(
|
||||
}
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
def get_unique_labels(data, cap: "Capability"):
    """Returns the set of distinct values registered for `cap` in `data`.

    Parameters
    ----------
    data:
        Mapping of individual ID -> (capability -> value) mapping. It must
        provide `.items()`.

    cap: Capability
        Capability whose values are collected. It is used as the lookup key
        of each inner mapping.

    Returns
    -------
    labels: set
        The distinct values found for `cap`. Individuals without an entry for
        `cap` are skipped, with a warning logged for each.
    """
    # NOTE(review): the result is an unordered set; callers that need a stable
    # label order (e.g. as a `labels=` argument) should sort it first.

    # Capability members expose a printable `.value`; fall back to the key
    # itself so a plain key does not crash the warning path.
    cap_name = getattr(cap, "value", cap)

    labels = set()
    for ident, vals in data.items():
        if cap not in vals:
            logger.warning(
                'Capability "%s" not registered for individual "%s". Skipping',
                cap_name,
                ident,
            )
            continue

        labels.add(vals[cap])

    return labels
|
||||
|
||||
|
||||
def _to_age_bracket(row):
|
||||
iage = int(row["age"])
|
||||
d = iage // 10 * 10
|
||||
return "{}-{}".format(d, d + 9)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Ad-hoc driver: loads a dataset and its metadata, runs the FairFace and
    # MiVOLOv1 estimators on it and computes their metrics against the
    # metadata.
    import os
    # The paths below are relative, so log the working directory they are
    # resolved against.
    logger.info(os.getcwd())

    from facebias import load_dataset
    from facebias.detectors import get_face_boxes
    from facebias.detectors.mediapipe import MediapipeDetector
    from facebias.estimators.fairface import FairFace
    from facebias.estimators.mivolov1 import MiVOLOv1

    DATASET_PATH = Path("../../data/facing2-train/")
    METADATA_PATH = DATASET_PATH / "meta-w-age.csv"
    DETECTOR_PATH = Path("../../models/blaze_face_full_range.tflite")
    # TEST_IMS: list[str] = ["10", "12", "14", "9"]

    detector = MediapipeDetector(str(DETECTOR_PATH))
    # `meta_path=None`: the metadata is read separately with pandas below.
    # The name-processing callback keeps the part of the name before the
    # first underscore -- presumably the image ID; verify against
    # `load_dataset`.
    imdict, _ = load_dataset(
        DATASET_PATH, meta_path=None, imname_proc_fn=lambda x: x.split("_")[0]
    )
    meta = pd.read_csv(METADATA_PATH, sep=',', index_col="image")
    # Derive the age-group column row by row from the "age" column.
    meta[Capability.AGEGROUP.value] = meta.apply(_to_age_bracket, axis=1)
    meta = meta.sort_index()

    # NOTE(review): `face_bboxes` is not read again in this block.
    face_bboxes = get_face_boxes(imdict, detector)

    # for t in TEST_IMS:
    # logger.info("-- {} - {}".format(t, meta[str(t)]))

    print(FairFace.capabilities())
    model_ff = FairFace(Path("../../models/fairface_alldata_4race_20191111.pt"), device="cpu")
    preds_ff = model_ff.predict(imdict)
    # Transpose so that the outer keys of the prediction mapping become the
    # row index -- presumably one row per image; confirm against `predict`.
    preds_ff = pd.DataFrame.from_dict(preds_ff).T
    preds_ff.index.rename("image", inplace=True)
    # Cast the index to the metadata's index dtype so that both indexes can
    # be matched against each other.
    preds_ff.index = preds_ff.index.astype(meta.index.dtype)
    preds_ff = preds_ff.sort_index()

    # NOTE(review): `metrics_ff` is not read again in this block.
    metrics_ff = calc_model_performance(meta, preds_ff)

    # logger.info("FairFace -- Test Images")
    # for t in TEST_IMS:
    # logger.info("--{} - {}".format(t, preds_ff[str(t)]))

    metrics_ff_groups = calc_metrics_per_subgroup(meta, preds_ff)
    # Print one line per (capability, subgroup value) pair.
    for k, v in metrics_ff_groups.items():
        for kv, vv in v.items():
            print(k, kv, vv)

    print(MiVOLOv1.capabilities())
    model_mv = MiVOLOv1(
        Path("../../models/volo-v1_model_imdb_age_gender_4.22.pth.tar"), device="cpu"
    )
    preds_mv = model_mv.predict(imdict)
    # Same normalisation as applied to the FairFace predictions above.
    preds_mv = pd.DataFrame.from_dict(preds_mv).T
    preds_mv.index.rename("image", inplace=True)
    preds_mv.index = preds_mv.index.astype(meta.index.dtype)
    preds_mv = preds_mv.sort_index()

    # logger.info("MiVOLOv1(Face Only) -- Test Images")
    # for t in TEST_IMS:
    # logger.info("{} - {}".format(t, preds_mv[str(t)]))

    # NOTE(review): `metrics_mv` is not read again in this block.
    metrics_mv = calc_model_performance(meta, preds_mv)
|
||||
|
||||
Reference in New Issue
Block a user