Source code for discopat.nn_training.torch_detection_utils.coco_eval

import copy
import io
from contextlib import redirect_stdout

import numpy as np
import pycocotools.mask as mask_util
import torch
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

from .utils import all_gather


class COCOevalIoMean(COCOeval):
    """COCO evaluator that scores overlaps with IoMean instead of IoU.

    IoMean is the intersection area divided by the mean of the detection
    area and the ground-truth area::

        IoMean = I / (0.5 * (A_dt + A_gt))
    """

    def computeIoU(self, imgId, catId):
        """Return the IoMean matrix in place of the parent's IoU matrix.

        Overrides ``COCOeval.computeIoU`` by delegating to
        :meth:`computeIoMean`.
        """
        return self.computeIoMean(imgId, catId)

    def computeIoMean(self, imgId, catId):
        """Compute IoMean between the detections and ground truths of an image.

        Args:
            imgId: Image identifier used to index ``self._gts``/``self._dts``.
            catId: Category identifier. Only used when ``params.useCats`` is
                truthy; otherwise the annotations of every category in
                ``params.catIds`` are pooled.

        Returns:
            An empty list when there are no detections or no ground truths
            (the same value ``pycocotools.mask.iou`` returns in that case).
            Otherwise a float array of shape ``(len(dt), len(gt))`` whose
            rows follow the detections sorted by decreasing score and
            truncated to ``params.maxDets[-1]``.

        Raises:
            ValueError: If ``params.iouType`` is neither ``"segm"`` nor
                ``"bbox"``.
        """
        p = self.params
        if p.useCats:
            gt = self._gts[imgId, catId]
            dt = self._dts[imgId, catId]
        else:
            gt = [ann for cId in p.catIds for ann in self._gts[imgId, cId]]
            dt = [ann for cId in p.catIds for ann in self._dts[imgId, cId]]

        # Nothing to compare as soon as one side is empty. mask_util.iou
        # returns [] in that situation, so return it before any arithmetic.
        if not gt or not dt:
            return []

        # Stable sort by decreasing score, then keep the best maxDets[-1].
        inds = np.argsort([-d["score"] for d in dt], kind="mergesort")
        dt = [dt[i] for i in inds][: p.maxDets[-1]]

        if p.iouType == "segm":
            gt = [g["segmentation"] for g in gt]
            dt = [d["segmentation"] for d in dt]
        elif p.iouType == "bbox":
            gt = [g["bbox"] for g in gt]
            dt = [d["bbox"] for d in dt]
        else:
            raise ValueError("Unknown iouType for iou computation")

        # Crowd flags are deliberately not forwarded: with iscrowd=1
        # pycocotools returns intersection / area(dt) rather than a true IoU,
        # which would invalidate the conversion below.
        iou = np.asarray(mask_util.iou(dt, gt, [0] * len(gt)), dtype=float)

        # IoU = I / (A_dt + A_gt - I)  =>  A_dt + A_gt = I * (1 + IoU) / IoU
        # hence I / (0.5 * (A_dt + A_gt)) = 2 * IoU / (1 + IoU).
        # This needs no explicit areas, so it works for boxes and masks alike,
        # and the denominator is always >= 1.
        return 2.0 * iou / (1.0 + iou)
class CocoEvaluator:
    """Collect per-image COCO evaluation results for several IoU types.

    One ``COCOevalIoMean`` instance is kept per requested IoU type. Call
    :meth:`update` with each batch of predictions, then
    :meth:`synchronize_between_processes`, :meth:`accumulate` and
    :meth:`summarize`.
    """

    def __init__(self, coco_gt, iou_types):
        """Create one evaluator per IoU type.

        Args:
            coco_gt: Ground-truth COCO object. It is deep-copied, so the
                caller's instance is never modified.
            iou_types: List or tuple of IoU type names (the types handled by
                :meth:`prepare` are ``"bbox"``, ``"segm"`` and
                ``"keypoints"``).

        Raises:
            TypeError: If ``iou_types`` is not a list or a tuple.
        """
        if not isinstance(iou_types, (list, tuple)):
            msg = f"This constructor expects iou_types of type list or tuple, instead got {type(iou_types)}"
            raise TypeError(msg)
        coco_gt = copy.deepcopy(coco_gt)
        self.coco_gt = coco_gt

        self.iou_types = iou_types
        self.coco_eval = {}
        for iou_type in iou_types:
            coco_eval = COCOevalIoMean(coco_gt, iouType=iou_type)
            # Keypoint sigmas are overridden for a fixed 3-keypoint layout.
            num_keypoints = 3
            coco_eval.params.kpt_oks_sigmas = np.full(
                num_keypoints, fill_value=0.05
            )
            self.coco_eval[iou_type] = coco_eval

        self.img_ids = []
        self.eval_imgs = {k: [] for k in iou_types}

    def update(self, predictions):
        """Evaluate a batch of predictions and store the per-image results.

        Args:
            predictions: Mapping from image id to a prediction dict (see the
                ``prepare_for_coco_*`` methods for the expected keys).
        """
        img_ids = list(np.unique(list(predictions.keys())))
        self.img_ids.extend(img_ids)

        for iou_type in self.iou_types:
            results = self.prepare(predictions, iou_type)
            with redirect_stdout(io.StringIO()):
                # Guarantee an "info" entry before loading results
                # (presumably COCO.loadRes reads it from the ground truth).
                self.coco_gt.dataset.setdefault("info", {})
                coco_dt = (
                    COCO.loadRes(self.coco_gt, results) if results else COCO()
                )
            coco_eval = self.coco_eval[iou_type]

            coco_eval.cocoDt = coco_dt
            coco_eval.params.imgIds = list(img_ids)
            # The ids handed back by evaluate() are reused for the next type.
            img_ids, eval_imgs = evaluate(coco_eval)

            self.eval_imgs[iou_type].append(eval_imgs)

    def synchronize_between_processes(self):
        """Concatenate the stored batches and install the merged results."""
        for iou_type in self.iou_types:
            # Batches are stacked along axis 2, the image axis.
            self.eval_imgs[iou_type] = np.concatenate(
                self.eval_imgs[iou_type], 2
            )
            create_common_coco_eval(
                self.coco_eval[iou_type], self.img_ids, self.eval_imgs[iou_type]
            )

    def accumulate(self):
        """Call ``accumulate()`` on every underlying evaluator."""
        for coco_eval in self.coco_eval.values():
            coco_eval.accumulate()

    def summarize(self):
        """Print a header and the summary of every underlying evaluator."""
        for iou_type, coco_eval in self.coco_eval.items():
            print(f"IoU metric: {iou_type}")
            coco_eval.summarize()

    def prepare(self, predictions, iou_type):
        """Convert predictions to COCO result dicts for ``iou_type``.

        Raises:
            ValueError: If ``iou_type`` is not ``"bbox"``, ``"segm"`` or
                ``"keypoints"``.
        """
        if iou_type == "bbox":
            return self.prepare_for_coco_detection(predictions)
        if iou_type == "segm":
            return self.prepare_for_coco_segmentation(predictions)
        if iou_type == "keypoints":
            return self.prepare_for_coco_keypoint(predictions)
        raise ValueError(f"Unknown iou type {iou_type}")

    def prepare_for_coco_detection(self, predictions):
        """Build COCO ``bbox`` results from ``boxes``/``scores``/``labels``.

        Boxes are converted with ``convert_to_xywh``; empty predictions are
        skipped.
        """
        coco_results = []
        for original_id, prediction in predictions.items():
            if not prediction:
                continue

            boxes = convert_to_xywh(prediction["boxes"]).tolist()
            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()

            coco_results.extend(
                [
                    {
                        "image_id": original_id,
                        "category_id": labels[k],
                        "bbox": box,
                        "score": scores[k],
                    }
                    for k, box in enumerate(boxes)
                ]
            )
        return coco_results

    def prepare_for_coco_segmentation(self, predictions):
        """Build COCO ``segmentation`` results from ``masks``/``scores``/``labels``.

        Masks are binarised at 0.5 and RLE-encoded; empty predictions are
        skipped.
        """
        coco_results = []
        for original_id, prediction in predictions.items():
            if not prediction:
                continue

            masks = prediction["masks"] > 0.5
            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()

            # Each mask is indexed as mask[0, :, :] and encoded from a
            # Fortran-ordered uint8 array with a trailing singleton axis.
            rles = [
                mask_util.encode(
                    np.array(
                        mask[0, :, :, np.newaxis], dtype=np.uint8, order="F"
                    )
                )[0]
                for mask in masks
            ]
            for rle in rles:
                # Decode the byte counts to str.
                rle["counts"] = rle["counts"].decode("utf-8")

            coco_results.extend(
                [
                    {
                        "image_id": original_id,
                        "category_id": labels[k],
                        "segmentation": rle,
                        "score": scores[k],
                    }
                    for k, rle in enumerate(rles)
                ]
            )
        return coco_results

    def prepare_for_coco_keypoint(self, predictions):
        """Build COCO ``keypoints`` results from ``keypoints``/``scores``/``labels``.

        Every instance's keypoints are flattened into a single list; empty
        predictions are skipped. Boxes are not part of the emitted results,
        so no ``"boxes"`` entry is required.
        """
        coco_results = []
        for original_id, prediction in predictions.items():
            if not prediction:
                continue

            scores = prediction["scores"].tolist()
            labels = prediction["labels"].tolist()
            keypoints = prediction["keypoints"].flatten(start_dim=1).tolist()

            coco_results.extend(
                [
                    {
                        "image_id": original_id,
                        "category_id": labels[k],
                        "keypoints": keypoint,
                        "score": scores[k],
                    }
                    for k, keypoint in enumerate(keypoints)
                ]
            )
        return coco_results
def convert_to_xywh(boxes):
    """Convert ``(xmin, ymin, xmax, ymax)`` boxes to ``(x, y, width, height)``.

    Args:
        boxes: Tensor whose dimension 1 holds the four corner coordinates.

    Returns:
        A tensor of the same shape with the last two columns replaced by the
        box width and height.
    """
    left, top, right, bottom = boxes.unbind(1)
    widths = right - left
    heights = bottom - top
    return torch.stack((left, top, widths, heights), dim=1)
def merge(img_ids, eval_imgs):
    """Gather image ids and evaluation arrays, then drop duplicate images.

    Args:
        img_ids: Image ids evaluated locally.
        eval_imgs: Evaluation array whose axis 2 indexes images.

    Returns:
        A ``(img_ids, eval_imgs)`` pair: the sorted unique image ids and the
        evaluation array restricted, along its last axis, to the first
        occurrence of each id.
    """
    # all_gather is project-local; presumably it returns one entry per
    # process -- confirm against .utils.
    gathered_ids = all_gather(img_ids)
    gathered_evals = all_gather(eval_imgs)

    flat_ids = np.array(
        [img_id for chunk in gathered_ids for img_id in chunk]
    )
    stacked_evals = np.concatenate(list(gathered_evals), 2)

    # np.unique yields the ids in sorted order together with the index of
    # each id's first occurrence, which selects the matching columns.
    unique_ids, first_seen = np.unique(flat_ids, return_index=True)
    return unique_ids, stacked_evals[..., first_seen]
def create_common_coco_eval(coco_eval, img_ids, eval_imgs):
    """Install merged evaluation results on ``coco_eval``.

    Args:
        coco_eval: Evaluator object to update in place.
        img_ids: Image ids to pass to ``merge``.
        eval_imgs: Evaluation array to pass to ``merge``.

    The merged results are written to ``coco_eval.evalImgs`` (flattened into
    a list) and ``coco_eval.params.imgIds``; ``coco_eval._paramsEval`` then
    receives a deep copy of the updated params.
    """
    merged_ids, merged_evals = merge(img_ids, eval_imgs)

    coco_eval.evalImgs = list(merged_evals.flatten())
    coco_eval.params.imgIds = list(merged_ids)
    # Snapshot taken after imgIds is set, so it includes the merged ids.
    coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
def evaluate(imgs):
    """Run ``imgs.evaluate()`` silently and reshape its per-image results.

    Args:
        imgs: Evaluator object exposing ``evaluate()``, ``params`` and
            ``evalImgs`` (despite the name, not a collection of images).

    Returns:
        A ``(img_ids, eval_imgs)`` pair where ``img_ids`` is
        ``imgs.params.imgIds`` and ``eval_imgs`` is ``imgs.evalImgs`` reshaped
        to ``(-1, len(params.areaRng), len(params.imgIds))``.
    """
    # Anything the evaluator prints is discarded.
    with redirect_stdout(io.StringIO()):
        imgs.evaluate()

    params = imgs.params
    per_image = np.asarray(imgs.evalImgs)
    grid = per_image.reshape(-1, len(params.areaRng), len(params.imgIds))
    return params.imgIds, grid