import copy
import io
from contextlib import redirect_stdout
import numpy as np
import pycocotools.mask as mask_util
import torch
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from .utils import all_gather
[docs]
class COCOevalIoMean(COCOeval):
"""Replace IoU by IoMean."""
[docs]
def computeIoU(self, imgId, catId):
return self.computeIoMean(imgId, catId)
[docs]
def computeIoMean(self, imgId, catId):
p = self.params
if p.useCats:
gt = self._gts[imgId, catId]
dt = self._dts[imgId, catId]
else:
gt = [_ for cId in p.catIds for _ in self._gts[imgId, cId]]
dt = [_ for cId in p.catIds for _ in self._dts[imgId, cId]]
if len(gt) == 0 and len(dt) == 0:
return []
inds = np.argsort([-d["score"] for d in dt], kind="mergesort")
dt = [dt[i] for i in inds]
if len(dt) > p.maxDets[-1]:
dt = dt[0 : p.maxDets[-1]]
if p.iouType == "segm":
gt = [g["segmentation"] for g in gt]
dt = [d["segmentation"] for d in dt]
elif p.iouType == "bbox":
gt = [g["bbox"] for g in gt]
dt = [d["bbox"] for d in dt]
else:
raise ValueError("Unknown iouType for iou computation")
# areas for each mask
gt_areas = np.array([mask_util.area(g) for g in gt])
dt_areas = np.array([mask_util.area(d) for d in dt])
iscrowd = [int(o["iscrowd"]) for o in gt]
# compute IoMean = intersection / mean(area_gt, area_dt)
inter_area = mask_util.iou(dt, gt, iscrowd) * (
dt_areas[:, None]
+ gt_areas[None, :]
- mask_util.iou(dt, gt, iscrowd)
)
mean_area = 0.5 * (dt_areas[:, None] + gt_areas[None, :])
return inter_area / np.maximum(mean_area, 1e-10)
[docs]
class CocoEvaluator:
def __init__(self, coco_gt, iou_types):
if not isinstance(iou_types, (list, tuple)):
msg = f"This constructor expects iou_types of type list or tuple, instead got {type(iou_types)}"
raise TypeError(msg)
coco_gt = copy.deepcopy(coco_gt)
self.coco_gt = coco_gt
self.iou_types = iou_types
self.coco_eval = {}
for iou_type in iou_types:
coco_eval = COCOevalIoMean(coco_gt, iouType=iou_type)
num_keypoints = 3
coco_eval.params.kpt_oks_sigmas = np.full(
num_keypoints, fill_value=0.05
)
self.coco_eval[iou_type] = coco_eval
self.img_ids = []
self.eval_imgs = {k: [] for k in iou_types}
[docs]
def update(self, predictions):
img_ids = list(np.unique(list(predictions.keys())))
self.img_ids.extend(img_ids)
for iou_type in self.iou_types:
results = self.prepare(predictions, iou_type)
with redirect_stdout(io.StringIO()):
if "info" not in self.coco_gt.dataset:
self.coco_gt.dataset["info"] = {}
coco_dt = (
COCO.loadRes(self.coco_gt, results) if results else COCO()
)
coco_eval = self.coco_eval[iou_type]
coco_eval.cocoDt = coco_dt
coco_eval.params.imgIds = list(img_ids)
img_ids, eval_imgs = evaluate(coco_eval)
self.eval_imgs[iou_type].append(eval_imgs)
[docs]
def synchronize_between_processes(self):
for iou_type in self.iou_types:
self.eval_imgs[iou_type] = np.concatenate(
self.eval_imgs[iou_type], 2
)
create_common_coco_eval(
self.coco_eval[iou_type], self.img_ids, self.eval_imgs[iou_type]
)
[docs]
def accumulate(self):
for coco_eval in self.coco_eval.values():
coco_eval.accumulate()
[docs]
def summarize(self):
for iou_type, coco_eval in self.coco_eval.items():
print(f"IoU metric: {iou_type}")
coco_eval.summarize()
[docs]
def prepare(self, predictions, iou_type):
if iou_type == "bbox":
return self.prepare_for_coco_detection(predictions)
if iou_type == "segm":
return self.prepare_for_coco_segmentation(predictions)
if iou_type == "keypoints":
return self.prepare_for_coco_keypoint(predictions)
raise ValueError(f"Unknown iou type {iou_type}")
[docs]
def prepare_for_coco_detection(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
boxes = prediction["boxes"]
boxes = convert_to_xywh(boxes).tolist()
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"bbox": box,
"score": scores[k],
}
for k, box in enumerate(boxes)
]
)
return coco_results
[docs]
def prepare_for_coco_segmentation(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
scores = prediction["scores"]
labels = prediction["labels"]
masks = prediction["masks"]
masks = masks > 0.5
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
rles = [
mask_util.encode(
np.array(
mask[0, :, :, np.newaxis], dtype=np.uint8, order="F"
)
)[0]
for mask in masks
]
for rle in rles:
rle["counts"] = rle["counts"].decode("utf-8")
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"segmentation": rle,
"score": scores[k],
}
for k, rle in enumerate(rles)
]
)
return coco_results
[docs]
def prepare_for_coco_keypoint(self, predictions):
coco_results = []
for original_id, prediction in predictions.items():
if len(prediction) == 0:
continue
boxes = prediction["boxes"]
boxes = convert_to_xywh(boxes).tolist()
scores = prediction["scores"].tolist()
labels = prediction["labels"].tolist()
keypoints = prediction["keypoints"]
keypoints = keypoints.flatten(start_dim=1).tolist()
coco_results.extend(
[
{
"image_id": original_id,
"category_id": labels[k],
"keypoints": keypoint,
"score": scores[k],
}
for k, keypoint in enumerate(keypoints)
]
)
return coco_results
[docs]
def convert_to_xywh(boxes):
xmin, ymin, xmax, ymax = boxes.unbind(1)
return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)
[docs]
def merge(img_ids, eval_imgs):
all_img_ids = all_gather(img_ids)
all_eval_imgs = all_gather(eval_imgs)
merged_img_ids = []
for p in all_img_ids:
merged_img_ids.extend(p)
merged_eval_imgs = []
for p in all_eval_imgs:
merged_eval_imgs.append(p)
merged_img_ids = np.array(merged_img_ids)
merged_eval_imgs = np.concatenate(merged_eval_imgs, 2)
# keep only unique (and in sorted order) images
merged_img_ids, idx = np.unique(merged_img_ids, return_index=True)
merged_eval_imgs = merged_eval_imgs[..., idx]
return merged_img_ids, merged_eval_imgs
[docs]
def create_common_coco_eval(coco_eval, img_ids, eval_imgs):
img_ids, eval_imgs = merge(img_ids, eval_imgs)
img_ids = list(img_ids)
eval_imgs = list(eval_imgs.flatten())
coco_eval.evalImgs = eval_imgs
coco_eval.params.imgIds = img_ids
coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
[docs]
def evaluate(imgs):
with redirect_stdout(io.StringIO()):
imgs.evaluate()
return imgs.params.imgIds, np.asarray(imgs.evalImgs).reshape(
-1, len(imgs.params.areaRng), len(imgs.params.imgIds)
)