Source code for geowatch.tasks.fusion.evaluate

#!/usr/bin/env python3
"""
Compute semantic segmentation evaluation metrics

TODO::
- RRMSE (relative root mean squared error): RMSE normalized by the root mean square value, where each residual is scaled against the actual value
  sqrt((1 / n) * sum((y - y_hat) ** 2) / sum(y ** 2))

TODO:
    - [ ] Move to kwcoco proper
"""
import json
import kwarray
import kwcoco
import kwimage
import numpy as np
import os
import pandas as pd
import sklearn.metrics as skm
import ubelt as ub
import warnings
from kwcoco.coco_evaluator import CocoSingleResult
from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
from kwcoco.metrics.confusion_measures import OneVersusRestMeasureCombiner
from kwcoco.metrics.confusion_vectors import OneVsRestConfusionVectors
from kwcoco.metrics.confusion_measures import MeasureCombiner
# from kwcoco.metrics.confusion_measures import PerClass_Measures
from kwcoco.metrics.confusion_measures import Measures
from typing import Dict
import scriptconfig as scfg
from shapely.ops import unary_union

from geowatch.utils import kwcoco_extensions
from geowatch import heuristics

# Prefer the ``profile`` decorator from line_profiler when it can be imported.
# Otherwise bind ``profile`` to ``ub.identity`` so the ``@profile`` decorators
# used on the functions below still resolve to a callable.
try:
    from line_profiler import profile
except Exception:
    profile = ub.identity


# The colors I traditionally use for truth and predictions
# TRUE_GREEN = 'limegreen'
# PRED_BLUE = 'dodgerblue'

# If we have a recent kwimage we can use kitware colors, which look pretty good
# in these roles too.
# These names are passed as the ``color`` of the text labels drawn on the
# truth / prediction panels in ``dump_chunked_confusion``.
TRUE_GREEN = 'kitware_green'
PRED_BLUE = 'kitware_blue'


class SegmentationEvalConfig(scfg.DataConfig):
    """
    Evaluation script for change/segmentation task
    """
    # Inputs: the truth and prediction kwcoco datasets to compare.
    true_dataset = scfg.Value(None, help='path to the groundtruth dataset')
    pred_dataset = scfg.Value(None, help='path to the predicted dataset')

    # Outputs.
    eval_dpath = scfg.Value(None, help='directory to dump results')
    eval_fpath = scfg.Value(None, help='path to dump result summary')

    # options
    draw_curves = scfg.Value('auto', help='flag to draw curves or not')
    draw_heatmaps = scfg.Value('auto', help='flag to draw heatmaps or not')
    # The next two options are read by ``dump_chunked_confusion`` when the
    # heatmap visualizations are drawn.
    draw_legend = scfg.Value(True, help='if True, attach a color legend to the drawn heatmap visualizations')
    draw_weights = scfg.Value(False, help='if True, also draw the per-pixel scoring weights in the heatmap visualizations')

    score_space = scfg.Value('auto', help='can score in image or video space. If auto, chooses video if there are any, otherwise image')
    resolution = scfg.Value(None, help='if specified, override the default resolution to score at')

    workers = scfg.Value('auto', help='number of parallel scoring workers')
    draw_workers = scfg.Value('auto', help='number of parallel drawing workers')

    viz_thresh = scfg.Value('auto', help='visualization threshold')

    balance_area = scfg.Value(False, isflag=True, help='upweight small instances, downweight large instances')

    # thresh_bins = scfg.Value(128 * 128, help='threshold resolution, default is high, generally ok to lower')
    thresh_bins = scfg.Value(32 * 32, help='threshold resolution.')
def main(cmdline=True, **kwargs):
    """
    Command line entry point.

    Parses a :class:`SegmentationEvalConfig`, loads the truth and predicted
    kwcoco datasets it names, and forwards everything else to
    :func:`evaluate_segmentations`.

    Args:
        cmdline (bool): forwarded to ``SegmentationEvalConfig.cli``.
        **kwargs: forwarded to ``SegmentationEvalConfig.cli`` as ``data``.

    TODO:
        doctest and CLI structure
    """
    parsed = SegmentationEvalConfig.cli(
        cmdline=cmdline, data=kwargs, strict=True)
    import rich
    rich.print('full_config = {}'.format(ub.urepr(parsed, nl=1)))

    options = ub.udict(parsed)

    # Load the two datasets (truth first, then predictions).
    true_coco = kwcoco.CocoDataset.coerce(options['true_dataset'])
    pred_coco = kwcoco.CocoDataset.coerce(options['pred_dataset'])

    out_fpath = options['eval_fpath']
    out_dpath = options['eval_dpath']

    # Everything that is not an input / output path is a scoring option.
    path_keys = {'true_dataset', 'pred_dataset', 'eval_dpath', 'eval_fpath'}
    scoring_config = options - path_keys

    evaluate_segmentations(true_coco, pred_coco, out_dpath, out_fpath,
                           scoring_config)
def _snap_scores_to_bin_edges(pred_score, left_bin_edges):
    """
    Quantize scores onto a fixed, sorted set of threshold values.

    Each score is replaced by the first edge that is greater than or equal to
    it (``np.searchsorted`` with its default ``side='left'``). Scores larger
    than the last edge are clamped to the last edge instead of producing an
    out-of-range index.

    Args:
        pred_score (ndarray): 1D array of scores.
        left_bin_edges (ArrayLike): sorted 1D array of bin edges.

    Returns:
        ndarray: array shaped like ``pred_score`` containing only values from
            ``left_bin_edges``.
    """
    edges = np.asarray(left_bin_edges)
    edge_idx = np.searchsorted(edges, pred_score)
    # searchsorted returns len(edges) for any score above the final edge,
    # which would be an IndexError on lookup; clamp to the last valid edge.
    edge_idx = np.minimum(edge_idx, len(edges) - 1)
    return edges[edge_idx]


@profile
def single_image_segmentation_metrics(pred_coco_img, true_coco_img,
                                      true_classes, true_dets, video1=None,
                                      thresh_bins=None, config=None,
                                      salient_channel='salient'):
    """
    Score the predicted heatmaps of one image against its truth polygons.

    Truth polygons are rasterized into per-task truth masks and weight
    canvases, and pixelwise binary confusion vectors are built for the
    saliency channel and / or each predicted class channel.

    Args:
        pred_coco_img (kwcoco.CocoImage): detatched predicted coco image

        true_coco_img (kwcoco.CocoImage): detatched true coco image

        true_classes: the truth categories. Intersected with each predicted
            channel stream to find the class channels that can be scored.

        true_dets: truth detections for this image. Must provide
            ``data['class_idxs']``, ``data['segmentations']`` and ``classes``.

        video1 (dict | None): video dictionary of the true image. Its
            ``width`` / ``height`` are required when scoring in video space,
            and its ``name`` is recorded in the output row.

        thresh_bins (int | ArrayLike | None): if specified rounds scores into
            this many bins to make calculating metrics more efficient. An
            integer requests that many evenly spaced edges in [0, 1];
            anything else is used directly as the sorted bin edges.

        config (dict | None): optional overrides for ``viz_thresh``,
            ``score_space``, ``resolution`` and ``balance_area``.

        salient_channel (str): name of the predicted channel to score as
            saliency.

    Returns:
        dict: always contains ``row``, ``shape`` and ``true_dets``. When class
            channels are scored it also has ``class_weights``,
            ``class_measures``, ``catname_to_true`` and ``catname_to_prob``.
            When saliency is scored it also has ``salient_measures``,
            ``salient_prob``, ``true_saliency``, ``mat``, ``pred_saliency``,
            ``saliency_thresh`` and ``saliency_weights``.

    CommandLine:
        xdoctest -m geowatch.tasks.fusion.evaluate single_image_segmentation_metrics

    Example:
        >>> from geowatch.tasks.fusion.evaluate import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> # TODO: kwcoco demodata with easy dummy heatmap channels
        >>> true_coco = kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64))
        >>> # Score an image against itself
        >>> true_coco_img = true_coco.images()[0:1].coco_images[0]
        >>> pred_coco_img = true_coco.images()[0:1].coco_images[0]
        >>> config = {}
        >>> true_dets = true_coco_img.annots().detections
        >>> video1 = true_coco_img.video
        >>> true_classes = true_coco.object_categories()
        >>> salient_channel = 'r'  # pretend red is the salient channel
        >>> thresh_bins = np.linspace(0, 255, 1024)
        >>> info = single_image_segmentation_metrics(
        >>>     pred_coco_img, true_coco_img, true_classes, true_dets,
        >>>     thresh_bins=thresh_bins, config=config, video1=video1, salient_channel=salient_channel)
    """
    if config is None:
        config = {}
    viz_thresh = config.get('viz_thresh', 'auto')
    score_space = config.get('score_space', 'auto')
    resolution = config.get('resolution', None)
    balance_area = config.get('balance_area', False)

    if score_space == 'auto':
        pred_vidid = pred_coco_img.img.get('video_id', None)
        true_vidid = true_coco_img.img.get('video_id', None)
        if true_vidid is not None or pred_vidid is not None:
            score_space = 'video'
        else:
            score_space = 'image'

    true_gid = true_coco_img.img['id']
    pred_gid = pred_coco_img.img['id']

    if thresh_bins is None:
        left_bin_edges = None
    elif isinstance(thresh_bins, (int, np.integer)):
        # Also accept numpy integers as a bin count.
        left_bin_edges = np.linspace(0, 1, thresh_bins)
    else:
        left_bin_edges = thresh_bins

    img1 = true_coco_img.img

    if score_space == 'image':
        dsize = np.array((img1['width'], img1['height']))
    elif score_space == 'video':
        dsize = np.array((video1['width'], video1['height']))
    else:
        raise KeyError(score_space)

    if resolution is None:
        scale = None
    else:
        try:
            scale = true_coco_img._scalefactor_for_resolution(
                resolution=resolution, space=score_space)
        except Exception as ex:
            # Best effort: fall back to the native resolution.
            print(f'warning: ex={ex}')
            scale = None

    if scale is not None:
        dsize = np.ceil(np.array(dsize) * np.array(scale)).astype(int)

    row = {
        'true_gid': true_gid,
        'pred_gid': pred_gid,
    }
    if video1 is not None:
        row['video'] = video1['name']

    # dsize is (width, height); arrays are indexed (height, width)
    shape = dsize[::-1]
    info = {
        'row': row,
        'shape': shape,
    }

    # TODO: parametarize these class categories
    # TODO: remove and generalize before porting to kwcoco
    ignore_classes = heuristics.IGNORE_CLASSNAMES
    background_classes = heuristics.BACKGROUND_CLASSES
    undistinguished_classes = heuristics.UNDISTINGUISHED_CLASSES
    context_classes = heuristics.CONTEXT_CLASSES
    negative_classes = heuristics.NEGATIVE_CLASSES

    # HACK! FIXME: There needs to be a clear definition of what classes are
    # scored and which are not.
    background_classes = background_classes | negative_classes

    # The above heuristics should roughly be:
    #
    #     * ignore_classes - ignore, Unknown
    #     * background_classes - background, negative
    #     * undistinguished_classes - positive
    #     * context_classes - No Activity Post Construction
    #
    #     inferred:
    #     * class_scored_classes - Site Preperation, Active Construction
    #     * salient_scored_classes - positive, Site Preperation, Active Construction

    # Determine what true/predicted categories are in common
    predicted_classes = []
    for stream in pred_coco_img.channels.streams():
        have = stream.intersection(true_classes)
        predicted_classes.extend(have.parsed)
    classes_of_interest = ub.oset(predicted_classes) - (
        negative_classes | background_classes | ignore_classes | undistinguished_classes)

    # Determine if saliency has been predicted
    salient_class = salient_channel
    has_saliency = salient_class in pred_coco_img.channels

    # Load ground truth annotations
    if score_space == 'video':
        warp_img_to_vid = kwimage.Affine.coerce(
            true_coco_img.img.get('warp_img_to_vid', {'type': 'affine'}))
        true_dets = true_dets.warp(warp_img_to_vid)

    if scale is not None:
        true_dets = true_dets.scale(scale)

    info['true_dets'] = true_dets

    true_cidxs = true_dets.data['class_idxs']
    true_ssegs = true_dets.data['segmentations']
    true_catnames = list(ub.take(true_dets.classes.idx_to_node, true_cidxs))

    # NOTE: The exact definition of how we build the "truth" segmentation mask
    # is up for debate. I think this is a reasonable definition, but this needs
    # to be reviewed. It also likely needs updating to become general and
    # remove the need for heuristics.

    # We might need to:
    # * add in a per-category weight canvas. This lets us say we can ignore
    # clas A when scoring class B. Is there an example where this is
    # relevant?

    # Does negative get moved to the background or scored?
    # Currently I'm just moving it to the background
    # How do we distinguish that

    # TODO:
    # Use the "valid_polygon" to zero out evaluations in invalid regions
    # Also use nan values in the predictions to do the same.
    # Combine these two measures.

    # Create a truth "panoptic segmentation" style mask for each task
    if has_saliency:
        # Truth for saliency-task
        true_saliency = np.zeros(shape, dtype=np.uint8)
        saliency_weights = np.ones(shape, dtype=np.float32)

        sseg_groups = {
            'ignore': [],
            'context': [],
            'foreground': [],
            'background': [],
        }
        for true_sseg, true_catname in zip(true_ssegs, true_catnames):
            if true_catname in background_classes:
                key = 'background'
            elif true_catname in ignore_classes:
                key = 'ignore'
            elif true_catname in context_classes:
                key = 'context'
            else:
                key = 'foreground'
            sseg_groups[key].append(true_sseg)

        if balance_area:
            # The average foreground area each instance would have if the
            # total (unioned) foreground area was shared equally.
            if len(sseg_groups['foreground']):
                fg_poly = unary_union([p.to_shapely() for p in sseg_groups['foreground']])
                unit_sseg_share = fg_poly.area / len(sseg_groups['foreground'])
            else:
                unit_sseg_share = 1

        # background should be background, do nothing with it
        # (i.e. sseg_groups['background'] is intentionally unused)

        # Ignore context classes in saliency
        # Ignore no-activity and post-construction, ignore, and Unknown
        for true_sseg in sseg_groups['ignore']:
            saliency_weights = true_sseg.fill(saliency_weights, value=0)

        # Context polygons are currently left at their default weight.
        # for true_sseg in sseg_groups['context']:
        #     saliency_weights = true_sseg.fill(saliency_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg in sseg_groups['foreground']:
            true_saliency = true_sseg.fill(true_saliency, value=1)
            if balance_area:
                # Fill in the weights to upweight smaller areas.
                instance_weight = unit_sseg_share / true_sseg.area
                saliency_weights = true_sseg.fill(saliency_weights, value=instance_weight)
        # saliency_weights = saliency_weights / saliency_weights.max()

    if classes_of_interest:
        # Truth for class-task
        catname_to_true: Dict[str, np.ndarray] = {
            catname: np.zeros(shape, dtype=np.float32)
            for catname in classes_of_interest
        }
        class_weights = np.ones(shape, dtype=np.float32)

        sseg_groups = {
            'background': [],
            'ignore': [],
            'undistinguished': [],
            'foreground': [],
        }
        for true_sseg, true_catname in zip(true_ssegs, true_catnames):
            if true_catname in background_classes:
                key = 'background'
            elif true_catname in ignore_classes:
                key = 'ignore'
            elif true_catname in undistinguished_classes:
                key = 'undistinguished'
            else:
                key = 'foreground'
            true_sseg.meta['true_catname'] = true_catname
            sseg_groups[key].append(true_sseg)

        if balance_area:
            if len(sseg_groups['foreground']):
                fg_poly = unary_union([p.to_shapely() for p in sseg_groups['foreground']])
                unit_sseg_share = fg_poly.area / len(sseg_groups['foreground'])
            else:
                unit_sseg_share = 1
            # A discarded ``true_sseg.area / initial_total_weight`` expression
            # used to live here. It computed nothing and raised when the image
            # had no truth polygons (``true_sseg`` was never bound).

        # background should be background, do nothing with it
        # (i.e. sseg_groups['background'] is intentionally unused)

        # Ignore context classes in saliency
        # Ignore no-activity and post-construction, ignore, and Unknown
        for true_sseg in sseg_groups['ignore']:
            class_weights = true_sseg.fill(class_weights, value=0)
        for true_sseg in sseg_groups['undistinguished']:
            class_weights = true_sseg.fill(class_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg in sseg_groups['foreground']:
            true_catname = true_sseg.meta['true_catname']
            if balance_area:
                # Fill in the weights to upweight smaller areas.
                instance_weight = unit_sseg_share / true_sseg.area
                class_weights = true_sseg.fill(class_weights, value=instance_weight)
            # NOTE(review): this lookup assumes every foreground truth
            # category is also a predicted class of interest - confirm.
            catname_to_true[true_catname] = true_sseg.fill(catname_to_true[true_catname], value=1)

        # Hack:
        # normalize to 0-1, this downweights the background too much, but
        # I think fixes a upstream issue. Remove (or justify?) if possible.
        # class_weights = class_weights / class_weights.max()

    if classes_of_interest:
        # handle multiclass case
        pred_chan_of_interest = '|'.join(classes_of_interest)
        delayed_probs = pred_coco_img.imdelay(
            pred_chan_of_interest, space=score_space,
            resolution=resolution, nodata_method='float').as_xarray()

        # Do we need xarray anymore?
        class_probs = delayed_probs.finalize()
        # Pixels where every class channel is nan get zero weight.
        invalid_mask = np.isnan(class_probs).all(axis=2)
        # import xdev
        # with xdev.embed_on_exception_context(before_embed=util_progress.ProgressManager.stopall):
        class_weights[invalid_mask] = 0

        catname_to_prob = {}
        cx_to_binvecs = {}
        for cx, cname in enumerate(classes_of_interest):
            is_true = catname_to_true[cname]
            score = class_probs.loc[:, :, cname].data.copy()
            # Per-channel nan pixels are zeroed and given zero weight.
            invalid_mask = np.isnan(score)
            weights = class_weights.copy()
            weights[invalid_mask] = 0
            score[invalid_mask] = 0

            pred_score = score.ravel()
            if left_bin_edges is not None:
                # snap scores onto the bin edges
                pred_score = _snap_scores_to_bin_edges(pred_score, left_bin_edges)

            catname_to_prob[cname] = score
            bin_data = {
                # is_true denotes if the true class of the item is the
                # category of interest.
                'is_true': is_true.ravel(),
                'pred_score': pred_score,
                'weight': weights.ravel(),
            }
            bin_data = kwarray.DataFrameArray(bin_data)
            bin_cfsn = BinaryConfusionVectors(bin_data, cx, classes_of_interest)
            # TODO: use me?
            # bin_measures = bin_cfsn.measures()
            # bin_measures.summary()
            cx_to_binvecs[cname] = bin_cfsn
        ovr_cfns = OneVsRestConfusionVectors(cx_to_binvecs, classes_of_interest)
        class_measures = ovr_cfns.measures()
        row['mAP'] = class_measures['mAP']
        row['mAUC'] = class_measures['mAUC']
        info.update({
            'class_weights': class_weights,
            'class_measures': class_measures,
            'catname_to_true': catname_to_true,
            'catname_to_prob': catname_to_prob,
        })

    if has_saliency:
        # TODO: consolidate this with above class-specific code
        salient_delay = pred_coco_img.imdelay(
            salient_class, space=score_space, resolution=resolution,
            nodata_method='float')
        salient_prob = salient_delay.finalize(nodata_method='float')[..., 0]
        # Keep a copy that still contains the nans for visualization.
        salient_prob_orig = salient_prob.copy()
        invalid_mask = np.isnan(salient_prob)

        salient_prob[invalid_mask] = 0
        try:
            saliency_weights[invalid_mask] = 0
        except Exception:
            # Print the shapes to help debug the failure, then re-raise.
            print(f'invalid_mask.shape={invalid_mask.shape}')
            print(f'saliency_weights.shape={saliency_weights.shape}')
            raise

        pred_score = salient_prob.ravel()
        if left_bin_edges is not None:
            pred_score = _snap_scores_to_bin_edges(pred_score, left_bin_edges)

        bin_cfns = BinaryConfusionVectors(kwarray.DataFrameArray({
            'is_true': true_saliency.ravel(),
            'pred_score': pred_score,
            'weight': saliency_weights.ravel().astype(np.float32),
        }))
        salient_measures = bin_cfns.measures()
        salient_summary = salient_measures.summary()

        salient_metrics = {
            'salient_' + k: v
            for k, v in ub.dict_isect(salient_summary, {
                'ap', 'auc', 'max_f1'}).items()
        }
        try:
            # Requires kwcoco 0.8.3
            salient_metrics['realpos_total'] = salient_measures['realpos_total']
            salient_metrics['realneg_total'] = salient_measures['realneg_total']
            submeasures = salient_measures['max_f1_submeasures']
            salient_metrics['salient_max_f1_thresh'] = submeasures['thresh']
            salient_metrics['salient_max_f1_ppv'] = submeasures['ppv']
            salient_metrics['salient_max_f1_tpr'] = submeasures['tpr']
            salient_metrics['salient_max_f1_fpr'] = submeasures['fpr']
            salient_metrics['salient_max_f1_tnr'] = submeasures['tnr']
        except Exception:
            # Deliberate best effort: these extra metrics are optional.
            ...
        row.update(salient_metrics)

        info.update({
            'salient_measures': salient_measures,
            'salient_prob': salient_prob_orig,
            'true_saliency': true_saliency,
        })

        maximized_info = salient_measures.maximized_thresholds()

        # This cherry-picks a threshold per image!
        if viz_thresh == 'auto':
            cherry_picked_thresh = maximized_info['f1']['thresh']
            saliency_thresh = cherry_picked_thresh
        else:
            saliency_thresh = viz_thresh
        pred_saliency = salient_prob > saliency_thresh

        y_true = true_saliency.ravel()
        y_pred = pred_saliency.ravel()
        sample_weight = saliency_weights.ravel()
        mat = skm.confusion_matrix(y_true, y_pred, labels=np.array([0, 1]),
                                   sample_weight=sample_weight)
        info.update({
            'mat': mat,
            'pred_saliency': pred_saliency,
            'saliency_thresh': saliency_thresh,
            'saliency_weights': saliency_weights,
        })

    # TODO: look at the category ranking at each pixel by score.
    # Is there a generalization of a confusion matrix to a ranking tensor?
    # if 0:
    #     # TODO: Reintroduce hard-polygon segmentation scoring?
    #     # Score hard-threshold predicted annotations
    #     # SCORE PREDICTED ANNOTATIONS
    #     # Create a pred "panoptic segmentation" style mask
    #     pred_saliency = np.zeros(shape, dtype=np.uint8)
    #     pred_dets = pred_coco.annots(gid=gid2).detections
    #     for pred_sseg in pred_dets.data['segmentations']:
    #         pred_saliency = pred_sseg.fill(pred_saliency, value=1)
    return info
@ub.memoize
def _memo_legend(label_to_color):
    """
    Memoized wrapper around ``kwplot.make_legend_img``.

    Args:
        label_to_color (dict): mapping from a legend label to its color.

    Returns:
        the legend image produced by ``kwplot.make_legend_img``.
    """
    # kwplot is imported lazily, only when a legend is actually requested.
    import kwplot
    return kwplot.make_legend_img(label_to_color)
def draw_confusion_image(pred, target):
    """
    Label every pixel with its binary confusion category.

    Args:
        pred (ndarray): binary predicted mask (values 0 / 1 or bool).
        target (ndarray): binary truth mask with the same shape as ``pred``.

    Returns:
        ndarray: integer index image shaped like ``pred`` where
            0 = true-negative, 1 = true-positive,
            2 = false-negative, 3 = false-positive.
            Pixels where either input is neither 0 nor 1 are left as 0.
            The dtype is that of ``pred`` when it is an integer type and
            ``uint8`` otherwise.
    """
    pred = np.asarray(pred)
    target = np.asarray(target)

    # ``np.zeros_like(pred)`` would inherit a bool / float dtype from the
    # prediction. A bool canvas cannot hold the labels 2 and 3 (they collapse
    # to True), and neither can be used to index a color lookup table. Only
    # keep the dtype of ``pred`` when it is already an integer type.
    if np.issubdtype(pred.dtype, np.integer):
        canvas_dtype = pred.dtype
    else:
        canvas_dtype = np.uint8
    canvas = np.zeros(pred.shape, dtype=canvas_dtype)

    np.putmask(canvas, (target == 0) & (pred == 0), 0)  # true-neg
    np.putmask(canvas, (target == 1) & (pred == 1), 1)  # true-pos
    np.putmask(canvas, (target == 1) & (pred == 0), 2)  # false-neg
    np.putmask(canvas, (target == 0) & (pred == 1), 3)  # false-pos
    return canvas
@profile
def colorize_class_probs(probs, classes):
    """
    Blend per-class probability maps into a single color image.

    Each class is drawn as a layer in its own color, with the class
    probability used as the alpha of that layer. The layers are composited
    over an opaque all-zero background.

    Args:
        probs (ndarray): stack of per-class maps; iterating over the first
            axis yields one 2D map per class (e.g. ``pred_cat_ohe``).
        classes: category tree (e.g. ``pred_classes``). ``classes[cidx]``
            gives the node for a class index and ``classes.graph.nodes[node]``
            may hold a ``'color'``.

    Returns:
        ndarray: the result of ``kwimage.overlay_alpha_layers`` on the layers.
    """
    def _known_color(cidx):
        # Color stored on the category node, or None if it has none.
        node = classes[cidx]
        raw = classes.graph.nodes[node].get('color', None)
        if raw is None:
            return None
        return kwimage.Color(raw).as01()

    palette = [_known_color(cidx) for cidx in range(len(probs))]

    import distinctipy
    known = [c for c in palette if c is not None]
    num_missing = len(palette) - len(known)
    if num_missing:
        # Generate distinct colors for classes without one, avoiding the
        # colors already in use. The seed keeps the choice reproducible.
        fresh = iter(distinctipy.get_colors(
            num_missing, exclude_colors=known, rng=569944))
        palette = [next(fresh) if c is None else c for c in palette]

    canvas_dtype = np.float32
    height, width = probs.shape[-2:]
    rgba_shape = (height, width, 4)

    # Each class gets its own color, and modulates the alpha
    layers = []
    for chan, rgb in zip(probs, palette):
        layer = np.empty(rgba_shape, dtype=canvas_dtype)
        layer[..., 3] = chan
        layer[..., 0:3] = rgb
        layers.append(layer)

    # Fully opaque backdrop goes at the bottom of the stack.
    backdrop = np.zeros(rgba_shape, dtype=canvas_dtype)
    backdrop[..., 3] = 1.0
    layers.append(backdrop)

    return kwimage.overlay_alpha_layers(
        layers, keepalpha=False, dtype=canvas_dtype)
@profile
def draw_truth_borders(true_dets, canvas, alpha=1.0, color=None):
    """
    Draw the outlines of the truth segmentations onto a canvas.

    Args:
        true_dets: detections whose ``data`` holds ``'segmentations'``,
            ``'class_idxs'`` and ``'classes'``.
        canvas (ndarray): image to draw on; it is converted with
            ``kwimage.ensure_float01`` first.
        alpha (float): opacity of the drawn borders.
        color: border color. If None, each segmentation uses the ``'color'``
            stored on its category node.

    Returns:
        ndarray: the canvas with borders drawn.
    """
    det_data = true_dets.data
    polys = det_data['segmentations']
    cidxs = det_data['class_idxs']
    classes = det_data['classes']

    if color is None:
        # Look up one color per annotation from its category node.
        node_names = ub.take(classes.idx_to_node, cidxs)
        node_attrs = ub.take(classes.graph.nodes, node_names)
        color = [attrs['color'] for attrs in node_attrs]

    canvas = kwimage.ensure_float01(canvas)

    if not (alpha < 1.0):
        return polys.draw_on(canvas, fill=False, border=True, color=color,
                             alpha=alpha)

    # remove this condition when kwimage 0.8.3 is released always take else
    # Draw opaque borders on an empty RGBA layer, scale that layer's alpha,
    # then composite it over the canvas.
    rgba_shape = canvas.shape[0:2] + (4,)
    blank = np.zeros_like(canvas, shape=rgba_shape)
    border_layer = polys.draw_on(blank, fill=False, border=True, color=color,
                                 alpha=1.0)
    border_layer[..., 3] *= alpha
    return kwimage.overlay_alpha_images(border_layer, canvas)
def _draw_weight_panel(weights):
    """
    Render a weight canvas as a labeled uint8 image panel.

    Weights whose maximum exceeds 1 are normalized before drawing, and the
    panel title records whether that happened.
    """
    if weights.max() > 1:
        weight_image = kwarray.normalize(weights, min_val=0)
        weight_title = 'weights (normed)'
    else:
        weight_image = weights
        weight_title = 'weights'
    weight_image = kwimage.ensure_uint255(weight_image)
    weight_image = kwimage.draw_text_on_image(
        weight_image, weight_title, org=(1, 1), valign='top',
        color='pink', border=1)
    return weight_image


def _choose_viz_channels(coco_img):
    """
    Pick the channel spec used to show the underlying image of a frame.

    Prefers ``red|green|blue``, then ``r|g|b``, then ``pan``, and otherwise
    falls back to the channels of the primary asset.
    """
    avali_chans = {p2 for p1 in coco_img.channels.spec.split(',')
                   for p2 in p1.split('|')}
    if len(avali_chans & {'red', 'green', 'blue'}) == 3:
        return 'red|green|blue'
    if len(avali_chans & {'r', 'g', 'b'}) == 3:
        return 'r|g|b'
    if 'pan' in avali_chans:
        # A one-element set can never intersect to size 3, so the previous
        # ``len(avali_chans & {'pan'}) == 3`` test made this unreachable.
        return 'pan'
    return coco_img.primary_asset()['channels']


@profile
def dump_chunked_confusion(full_classes, true_coco_imgs, chunk_info,
                           heatmap_dpath, title=None, config=None):
    """
    Draw a a sequence of true/pred image predictions

    One vertical strip of panels is drawn per frame (header, class truth /
    prediction overlays, saliency confusion, optional weights, predicted
    saliency heatmap and the underlying image). The strips are stacked
    horizontally with an optional legend and a title header, and the result is
    written as a jpg under ``heatmap_dpath``.

    Args:
        full_classes (kwcoco.CategoryTree): truth categories; every node must
            have a ``'color'``.
        true_coco_imgs (List[kwcoco.CocoImage]): the true image for each item
            of ``chunk_info``.
        chunk_info (List[dict]): per-image info dictionaries as returned by
            :func:`single_image_segmentation_metrics`.
        heatmap_dpath (str | PathLike): directory to write into. A
            subdirectory named after the video(s) is created.
        title (str | None): optional first line of the canvas title.
        config (dict | None): optional ``score_space``, ``resolution``,
            ``draw_legend`` and ``draw_weights`` settings.
    """
    # This guard must run before any ``config.get``. It used to come after
    # the first lookup, so the default ``config=None`` raised AttributeError.
    if config is None:
        config = {}
    score_space = config.get('score_space', 'video')
    resolution = config.get('resolution', None)
    draw_legend = config.get('draw_legend', True)
    draw_weights = config.get('draw_weights', False)

    color_labels = ['TN', 'TP', 'FN', 'FP']
    colors = list(ub.take(heuristics.CONFUSION_COLOR_SCHEME, color_labels))
    # colors = ['blue', 'green', 'yellow', 'red']
    # colors = ['black', 'white', 'yellow', 'red']
    # Row i is the color for confusion index i (see draw_confusion_image)
    color_lut = np.array([kwimage.Color(c).as255() for c in colors])
    # full_classes: kwcoco.CategoryTree = true_coco.object_categories()

    # Make a legend
    color01_lut = color_lut / 255.0
    legend_images = []

    if 'catname_to_prob' in chunk_info[0]:
        # Class Legend
        label_to_color = {
            node: kwimage.Color(data['color']).as01()
            for node, data in full_classes.graph.nodes.items()}
        label_to_color = ub.sorted_keys(label_to_color)
        if draw_legend:
            legend_img_class = _memo_legend(label_to_color)
            legend_images.append(legend_img_class)

    if 'pred_saliency' in chunk_info[0]:
        # Confusion Legend
        label_to_color = ub.dzip(color_labels, color01_lut)
        if draw_legend:
            legend_img_saliency_cfsn = _memo_legend(label_to_color)
            legend_img_saliency_cfsn = kwimage.ensure_uint255(legend_img_saliency_cfsn)
            legend_images.append(legend_img_saliency_cfsn)

    if len(legend_images):
        legend_img = kwimage.stack_images(legend_images, axis=0, pad=5)
    else:
        legend_img = None

    # Draw predictions on each frame
    parts = []
    frame_nums = []
    true_gids = []
    unique_vidnames = set()
    for info, true_coco_img in zip(chunk_info, true_coco_imgs):
        row = info['row']
        if row.get('video', ''):
            unique_vidnames.add(row['video'])

        # true_gid = row['true_gid']
        # true_coco_img = true_coco.coco_image(true_gid)
        true_gid = true_coco_img.img['id']

        true_img = true_coco_img.img
        frame_index = true_img.get('frame_index', None)
        if frame_index is not None:
            frame_nums.append(frame_index)
        true_gids.append(true_gid)

        header_lines = heuristics.build_image_header_text(
            img=true_img,
            name=None,
            _header_extra=None,
        )
        image_header_text = '\n'.join(header_lines)

        imgw = info['shape'][1]
        header = kwimage.draw_header_text(
            {'width': imgw},
            text=image_header_text, color='red', stack=False)

        vert_parts = [
            header,
        ]

        if 'catname_to_prob' in info:
            true_dets = info['true_dets']
            true_dets.data['classes'] = full_classes

            pred_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_prob'].keys()))
            true_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_true'].keys()))
            # todo: ensure colors are robust and consistent
            for node in pred_classes.graph.nodes():
                pred_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color']
            for node in true_classes.graph.nodes():
                true_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color']

            pred_cat_ohe = np.stack(list(info['catname_to_prob'].values()))
            true_cat_ohe = np.stack(list(info['catname_to_true'].values()))

            true_overlay = colorize_class_probs(true_cat_ohe, true_classes)[..., 0:3]
            true_overlay = draw_truth_borders(true_dets, true_overlay, alpha=1.0)
            true_overlay = kwimage.ensure_uint255(true_overlay)
            true_overlay = kwimage.draw_text_on_image(
                true_overlay, 'true class', org=(1, 1), valign='top',
                color=TRUE_GREEN, border=1)
            vert_parts.append(true_overlay)

            if draw_weights:
                vert_parts.append(_draw_weight_panel(info['class_weights']))

            pred_overlay = colorize_class_probs(pred_cat_ohe, pred_classes)[..., 0:3]
            pred_overlay = draw_truth_borders(true_dets, pred_overlay, alpha=0.05, color='white')
            pred_overlay = kwimage.ensure_uint255(pred_overlay)
            pred_overlay = kwimage.draw_text_on_image(
                pred_overlay, 'pred class', org=(1, 1), valign='top',
                color=PRED_BLUE, border=1)
            vert_parts.append(pred_overlay)

        if 'pred_saliency' in info:
            pred_saliency = info['pred_saliency'].astype(np.uint8)
            true_saliency = info['true_saliency']
            saliency_thresh = info['saliency_thresh']
            confusion_idxs = draw_confusion_image(pred_saliency, true_saliency)
            confusion_image = color_lut[confusion_idxs]
            confusion_image = kwimage.ensure_uint255(confusion_image)
            confusion_image = kwimage.draw_text_on_image(
                confusion_image,
                f'confusion saliency: thresh={saliency_thresh:0.3f}',
                org=(1, 1), valign='top',
                color='white', border=1)
            vert_parts.append(confusion_image)

            if draw_weights:
                vert_parts.append(_draw_weight_panel(info['saliency_weights']))
        elif 'true_saliency' in info:
            true_saliency = info['true_saliency']
            true_saliency = true_saliency.astype(np.float32)
            heatmap = kwimage.make_heatmask(
                true_saliency, with_alpha=0.5, cmap='plasma')
            heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3])
            heatmap_int = kwimage.draw_text_on_image(
                heatmap_int, 'true saliency', org=(1, 1), valign='top',
                color=TRUE_GREEN, border=1)
            vert_parts.append(heatmap_int)

        # TODO:
        # Can we show the reference image?
        # TODO:
        # Show the datetime on the top of the image (and the display band?)
        real_image_int = None
        chosen_viz_channs = _choose_viz_channels(true_coco_img)
        try:
            real_image = true_coco_img.imdelay(
                chosen_viz_channs, space=score_space,
                nodata_method='float', resolution=resolution).finalize()[:]
            real_image_norm = kwimage.normalize_intensity(real_image)
            real_image_norm = kwimage.fill_nans_with_checkers(real_image_norm)
            real_image_int = kwimage.ensure_uint255(real_image_norm)
        except Exception as ex:
            # Best effort: skip the underlying image panel if it cannot be
            # loaded.
            print('ex = {!r}'.format(ex))

        salient_prob = info.get('salient_prob', None)
        if salient_prob is not None:
            # Show nan (invalid) predictions as a checkerboard.
            invalid_mask = np.isnan(salient_prob)
            heatmap = kwimage.make_heatmask(
                salient_prob, with_alpha=0.5, cmap='plasma')
            heatmap[invalid_mask] = np.nan
            heatmap = kwimage.fill_nans_with_checkers(heatmap)
            heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3])
            heatmap_int = kwimage.draw_text_on_image(
                heatmap_int, 'pred saliency', org=(1, 1), valign='top',
                color=PRED_BLUE, border=1)
            vert_parts.append(heatmap_int)

        if real_image_int is not None:
            vert_parts.append(real_image_int)

        vert_parts = [kwimage.ensure_uint255(c) for c in vert_parts]
        vert_stack = kwimage.stack_images(vert_parts, axis=0)
        parts.append(vert_stack)

    max_frame = None if len(frame_nums) == 0 else max(frame_nums)
    min_frame = None if len(frame_nums) == 0 else min(frame_nums)
    max_gid = max(true_gids)
    min_gid = min(true_gids)

    # The ``:04d`` format raises TypeError for non-integer values (e.g. None
    # when no frame has a frame_index); fall back to plain formatting.
    try:
        # num_digits = _max_digits(max_num)  # TODO
        if max_frame == min_frame:
            frame_part = f'{min_frame:04d}'
        else:
            frame_part = f'{min_frame:04d}-{max_frame:04d}'
    except TypeError:
        frame_part = f'{min_frame}'

    try:
        if max_gid == min_gid:
            gid_part = f'{min_gid:04d}'
        else:
            gid_part = f'{min_gid:04d}-{max_gid:04d}'
    except TypeError:
        gid_part = f'{min_gid}'

    # Sort so the output path is deterministic when a chunk spans videos
    # (set iteration order of strings can differ between runs).
    vidname_part = '_'.join(sorted(unique_vidnames))
    if not vidname_part:
        vidname_part = '_loose_images'

    plot_fstem = f'{vidname_part}-{frame_part}-{gid_part}'

    canvas_title_parts = []
    if title:
        canvas_title_parts.append(title)
    canvas_title_parts.append(plot_fstem)
    canvas_title = '\n'.join(canvas_title_parts)

    plot_canvas = kwimage.stack_images(parts, axis=1, overlap=-10)

    if draw_legend and legend_img is not None:
        plot_canvas = kwimage.stack_images(
            [plot_canvas, legend_img], axis=1, overlap=-10)

    header = kwimage.draw_header_text(
        {'width': plot_canvas.shape[1]}, canvas_title)
    plot_canvas = kwimage.stack_images([header, plot_canvas], axis=0)

    heatmap_dpath = ub.Path(str(heatmap_dpath))
    vid_plot_dpath = (heatmap_dpath / vidname_part).ensuredir()
    plot_fpath = vid_plot_dpath / (plot_fstem + '.jpg')
    kwimage.imwrite(str(plot_fpath), plot_canvas)
@profile
def evaluate_segmentations(true_coco, pred_coco, eval_dpath=None,
                           eval_fpath=None, config=None):
    """
    Score predicted segmentations against truth, one image pair at a time.

    True and predicted images are associated with
    ``kwcoco_extensions.associate_images``. Each matched pair is scored by
    ``single_image_segmentation_metrics`` in a worker pool, the per-image
    measures are accumulated into salient and per-class combiners, and
    (optionally) heatmaps and summary curves are written to disk.

    Args:
        true_coco (kwcoco.CocoDataset): dataset holding the truth.

        pred_coco (kwcoco.CocoDataset): dataset holding the predictions.
            If any image has a truthy ``has_predictions`` property, only
            images with that property are scored.

        eval_dpath (str | PathLike | None): output directory. When given,
            this function writes ``curves/invocation.sh``,
            ``perimage_table.json``, ``perimage_summary.json`` and the
            result summary; heatmaps go to ``heatmaps/`` and curves to
            ``curves/`` when drawing is enabled. When None, nothing is
            written and the "auto" drawing options resolve to False.

        eval_fpath (str | PathLike | None): where to dump the result
            summary. Defaults to ``<eval_dpath>/curves/measures2.json``.
            Only used when ``eval_dpath`` is given.

        config (dict | None): options. Keys read here are ``draw_curves``,
            ``draw_heatmaps``, ``score_space``, ``draw_workers``,
            ``workers``, ``resolution``, ``balance_area`` and
            ``thresh_bins``. The whole mapping is also forwarded to the
            scoring and drawing jobs. Note that when ``score_space`` is
            "auto" the resolved value is written back into ``config``.

    Returns:
        pandas.DataFrame: one row per scored image pair (the ``row`` entry
        of each scoring job result).

    TODO:
        - [ ] Fold non-critical options into the config

    CommandLine:
        XDEV_PROFILE=1 xdoctest -m geowatch.tasks.fusion.evaluate evaluate_segmentations

    Example:
        >>> from geowatch.tasks.fusion.evaluate import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> true_coco1 = kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64))
        >>> true_coco2 = kwcoco.CocoDataset.demo('shapes2', image_size=(64, 64))
        >>> #true_coco1 = kwcoco.CocoDataset.demo('vidshapes9')
        >>> #true_coco2 = kwcoco.CocoDataset.demo('shapes128')
        >>> true_coco = kwcoco.CocoDataset.union(true_coco1, true_coco2)
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>>     'with_probs': True,
        >>>     'with_heatmaps': True,
        >>>     'verbose': 1,
        >>> }
        >>> # TODO: it would be nice to demo the soft metrics
        >>> # functionality by adding "salient_prob" or "class_prob"
        >>> # auxiliary channels to this demodata.
        >>> print('perterbing')
        >>> pred_coco = perterb_coco(true_coco, **kwargs)
        >>> eval_dpath = ub.Path.appdir('geowatch/tests/fusion_eval').ensuredir()
        >>> print('eval_dpath = {!r}'.format(eval_dpath))
        >>> config = {}
        >>> config['score_space'] = 'image'
        >>> draw_curves = 'auto'
        >>> draw_heatmaps = 'auto'
        >>> #draw_heatmaps = False
        >>> config['workers'] = 'min(avail-2,6)'
        >>> #workers = 0
        >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config)

    Example:
        >>> # xdoctest: +REQUIRES(env:SLOW_DOCTEST)
        >>> from geowatch.tasks.fusion.evaluate import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> true_coco = kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64))
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>>     'with_probs': True,
        >>>     'with_heatmaps': True,
        >>>     'verbose': 1,
        >>> }
        >>> # TODO: it would be nice to demo the soft metrics
        >>> # functionality by adding "salient_prob" or "class_prob"
        >>> # auxiliary channels to this demodata.
        >>> print('perterbing')
        >>> pred_coco = perterb_coco(true_coco, **kwargs)
        >>> eval_dpath = ub.Path.appdir('geowatch/tests/fusion_eval-video').ensuredir()
        >>> print('eval_dpath = {!r}'.format(eval_dpath))
        >>> config = {}
        >>> config['score_space'] = 'video'
        >>> config['balance_area'] = True
        >>> draw_curves = 'auto'
        >>> draw_heatmaps = 'auto'
        >>> #draw_heatmaps = False
        >>> config['workers'] = 'min(avail-2,6)'
        >>> #workers = 0
        >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config)
    """
    import rich
    from kwutil import process_context
    from kwutil import util_progress
    from kwutil import util_parallel

    if config is None:
        config = {}

    draw_curves = config.get('draw_curves', 'auto')
    draw_heatmaps = config.get('draw_heatmaps', 'auto')
    score_space = config.get('score_space', 'auto')
    draw_workers = config.get('draw_workers', 'auto')

    if score_space == 'auto':
        if true_coco.n_videos:
            score_space = 'video'
        else:
            score_space = 'image'
        # The scoring jobs receive the config, so persist the resolved value.
        config['score_space'] = score_space

    # Ensure each class has colors.
    heuristics.ensure_heuristic_coco_colors(true_coco)
    true_classes = list(true_coco.object_categories())
    full_classes: kwcoco.CategoryTree = true_coco.object_categories()

    # Sometimes supercategories dont get colors, this fixes that.
    heuristics.ensure_heuristic_category_tree_colors(full_classes)

    workers = util_parallel.coerce_num_workers(config.get('workers', 0))
    if draw_workers == 'auto':
        draw_workers = min(2, workers)
    else:
        draw_workers = util_parallel.coerce_num_workers(draw_workers)

    # Extract metadata about the predictions to persist
    meta = {}
    meta['info'] = info = []

    if pred_coco.fpath is not None:
        pred_fpath = ub.Path(pred_coco.fpath)
        meta['pred_name'] = '_'.join((list(pred_fpath.parts[-2:-1]) + [pred_fpath.stem]))

    predicted_info = pred_coco.dataset.get('info', [])
    for item in predicted_info:
        if item.get('type', None) == 'measure':
            info.append(item)
        if item.get('type', None) == 'process':
            proc_name = item.get('properties', {}).get('name', None)
            if proc_name == 'geowatch.tasks.fusion.predict':
                package_fpath = item['properties']['config'].get('package_fpath')
                if 'title' not in item:
                    item['title'] = ub.Path(package_fpath).stem
                if 'package_name' not in item:
                    item['package_name'] = ub.Path(package_fpath).stem
                # FIXME: title should also include pred-config info
                meta['title'] = item['title']
                meta['package_name'] = item['package_name']
                info.append(item)

    # Title contains the model package name if we can infer it
    package_name = meta.get('package_name', '')
    pred_name = meta.get('pred_name', '')
    title_parts = [p for p in [package_name, pred_name] if p]

    resolution = config.get('resolution', None)
    balance_area = config.get('balance_area', False)

    if resolution is not None:
        title_parts.append(f'space={score_space} @ {resolution}, balance_area={balance_area}')
    else:
        title_parts.append(f'space={score_space} balance_area={balance_area}')

    meta['title_parts'] = title_parts
    title = meta['title'] = ' - '.join(title_parts)

    required_marked = 'auto'  # TODO: parameterize
    if required_marked == 'auto':
        # In "auto" mode dont require marks if all images are unmarked,
        # otherwise assume that we should restrict to marked images
        required_marked = any(pred_coco.images().lookup('has_predictions', False))

    # In every match, "gids1" are image ids in true_coco and "gids2" are the
    # corresponding image ids in pred_coco.
    matches = kwcoco_extensions.associate_images(
        true_coco, pred_coco, key_fallback='id')

    video_matches = matches['video']
    image_matches = matches['image']

    n_vid_matches = len(video_matches)
    n_img_per_vid_matches = [len(d['match_gids1']) for d in video_matches]
    n_img_matches = len(image_matches['match_gids1'])
    print('n_img_per_vid_matches = {}'.format(ub.urepr(n_img_per_vid_matches, nl=1)))
    print('n_vid_matches = {}'.format(ub.urepr(n_vid_matches, nl=1)))
    print('n_img_matches = {!r}'.format(n_img_matches))
    rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]')

    chunk_size = 5
    num_thresh_bins = config.get('thresh_bins', 32 * 32)
    thresh_bins = np.linspace(0, 1, num_thresh_bins)  # this is more stable using an ndarray

    if draw_curves == 'auto':
        draw_curves = bool(eval_dpath is not None)

    if draw_heatmaps == 'auto':
        draw_heatmaps = bool(eval_dpath is not None)

    pcontext = process_context.ProcessContext(
        name='geowatch.tasks.fusion.evaluate',
        config=config,
    )
    pcontext.start()

    if eval_dpath is None:
        heatmap_dpath = None
    else:
        eval_dpath = ub.Path(eval_dpath)
        curve_dpath = (eval_dpath / 'curves').ensuredir()
        pcontext.write_invocation(curve_dpath / 'invocation.sh')

    # Objects that will aggregate confusion across multiple images
    salient_measure_combiner = MeasureCombiner(thresh_bins=thresh_bins)
    class_measure_combiner = OneVersusRestMeasureCombiner(thresh_bins=thresh_bins)

    # Gather the true and predicted image pairs to be scored
    total_images = 0
    if required_marked:
        for video_match in video_matches:
            gids1 = video_match['match_gids1']
            gids2 = video_match['match_gids2']
            flags = pred_coco.images(gids2).lookup('has_predictions', False)
            video_match['match_gids1'] = list(ub.compress(gids1, flags))
            video_match['match_gids2'] = list(ub.compress(gids2, flags))
            total_images += len(gids1)
        gids1 = image_matches['match_gids1']
        gids2 = image_matches['match_gids2']
        flags = pred_coco.images(gids2).lookup('has_predictions', False)
        image_matches['match_gids1'] = list(ub.compress(gids1, flags))
        image_matches['match_gids2'] = list(ub.compress(gids2, flags))
        total_images += len(gids1)
    else:
        total_images = None

    # Prepare job pools
    print('workers = {!r}'.format(workers))
    print('draw_workers = {!r}'.format(draw_workers))
    # draw_executor = ub.Executor(mode='process', max_workers=draw_workers)
    # metrics_executor = ub.Executor(mode='process', max_workers=workers)

    # We want to prevent too many evaluate jobs from piling up results to draw,
    # as it takes longer to draw than it does to score. For this reason, block
    # if the draw queue gets too big.
    metrics_executor = _DelayedBlockingJobQueue(max_unhandled_jobs=max(1, workers), mode='process', max_workers=workers)
    draw_executor = MaxQueuePool(mode='process', max_workers=draw_workers, max_queue_size=draw_workers * 4)

    prog = ub.ProgIter(total=total_images, desc='submit scoring jobs', adjust=False, freq=1)
    prog.begin()

    job_chunks = []
    draw_jobs = []

    # Submit scoring jobs over pairs of true-predicted images in videos
    for video_match in video_matches:
        prog.set_postfix_str('comparing ' + video_match['vidname'])
        gids1 = video_match['match_gids1']
        gids2 = video_match['match_gids2']
        if required_marked:
            flags = pred_coco.images(gids2).lookup('has_predictions', False)
            gids1 = list(ub.compress(gids1, flags))
            gids2 = list(ub.compress(gids2, flags))

        current_chunk = []
        for gid1, gid2 in zip(gids1, gids2):
            # gid1 indexes the truth and gid2 indexes the predictions. The
            # ids only coincide when both datasets share image ids, so each
            # dataset must be queried with its own id.
            pred_coco_img = pred_coco.coco_image(gid2).detach()
            true_coco_img = true_coco.coco_image(gid1).detach()
            true_dets = true_coco.annots(gid=gid1).detections
            vidid1 = true_coco.imgs[gid1]['video_id']
            video1 = true_coco.index.videos[vidid1]

            job = metrics_executor.submit(
                single_image_segmentation_metrics, pred_coco_img,
                true_coco_img, true_classes, true_dets, video1,
                thresh_bins=thresh_bins, config=config)

            if len(current_chunk) >= chunk_size:
                job_chunks.append(current_chunk)
                current_chunk = []
            current_chunk.append(job)
            prog.update()

        if len(current_chunk) > 0:
            job_chunks.append(current_chunk)

    # Submit scoring jobs over pairs of true-predicted images without videos
    if score_space == 'image':
        gids1 = image_matches['match_gids1']
        gids2 = image_matches['match_gids2']
        gid_pairs = list(zip(gids1, gids2))
        # Might want to vary the order (or shuffle) depending on user input
        gid_pairs = sorted(gid_pairs, key=lambda x: x[0])
        # TODO: modify to prevent too many unhandled jobs from building up and
        # causing memory issues. Maybe with kwutil.BlockingJobQueue
        for gid1, gid2 in gid_pairs:
            # See the note in the video loop: gid1 is a truth id and gid2 is
            # a prediction id.
            pred_coco_img = pred_coco.coco_image(gid2).detach()
            true_coco_img = true_coco.coco_image(gid1).detach()
            true_dets = true_coco.annots(gid=gid1).detections
            video1 = None
            job = metrics_executor.submit(
                single_image_segmentation_metrics, pred_coco_img,
                true_coco_img, true_classes, true_dets, video1,
                thresh_bins=thresh_bins, config=config)
            prog.update()
            job_chunks.append([job])
    else:
        if len(image_matches['match_gids1']) > 0:
            warnings.warn(ub.paragraph(
                f'''
                Scoring was requested in video mode, but there are
                {len(image_matches['match_gids1'])} true/pred image pairs that
                are unassociated with a video. These pairs will not be included
                in video space scoring.
                '''))
    prog.end()

    num_jobs = sum(map(len, job_chunks))

    RICH_PROG = 'auto'
    if RICH_PROG == 'auto':
        # Use rich outside of slurm
        RICH_PROG = not os.environ.get('SLURM_JOBID', '')
    pman = util_progress.ProgressManager(backend='rich' if RICH_PROG else 'progiter')

    # Developer switches; both are off in normal operation.
    DEBUG = 0
    if DEBUG:
        orig_infos = []

    VERBOSE_DEBUG = 0

    rows = []
    with pman:
        score_prog = pman.progiter(desc="[cyan] Scoring...", total=num_jobs)
        score_prog.start()
        if draw_heatmaps:
            draw_prog = pman.progiter(desc="[green] Drawing...", total=len(job_chunks))
            draw_prog.start()

        for job_chunk in job_chunks:
            chunk_info = []
            for job in job_chunk:
                # NOTE: this rebinds the local name ``info``; the metadata
                # list created above stays reachable as ``meta['info']``.
                info = job.result()
                if VERBOSE_DEBUG:
                    print('Gather job result')
                if DEBUG:
                    orig_infos.append(info)
                score_prog.update(1)
                rows.append(info['row'])
                if VERBOSE_DEBUG:
                    print(f'Add new row: {info["row"]}')
                    print(f'Table size: {len(rows)}')

                class_measures = info.get('class_measures', None)
                salient_measures = info.get('salient_measures', None)
                if salient_measures is not None:
                    salient_measure_combiner.submit(salient_measures)
                if class_measures is not None:
                    class_measure_combiner.submit(class_measures)
                if draw_heatmaps:
                    chunk_info.append(info)

            # Once a job chunk is done, clear its memory
            if VERBOSE_DEBUG:
                print(f'Clear job chunk of len {len(job_chunk)}')
            job = None
            job_chunk.clear()

            # Reduce measures over the chunk
            if salient_measure_combiner.queue_size > chunk_size:
                salient_measure_combiner.combine()
            if class_measure_combiner.queue_size > chunk_size:
                class_measure_combiner.combine()

            if draw_heatmaps:
                heatmap_dpath = (ub.Path(eval_dpath) / 'heatmaps').ensuredir()
                # Let the draw executor release any memory it can
                remaining_draw_jobs = []
                if VERBOSE_DEBUG:
                    print(f'Handle {len(draw_jobs)} draw jobs')
                for draw_job in draw_jobs:
                    if draw_job.done():
                        draw_job.result()
                        draw_prog.update(1)
                    else:
                        remaining_draw_jobs.append(draw_job)
                draw_job = None
                draw_jobs = remaining_draw_jobs
                if VERBOSE_DEBUG:
                    print(f'Remaining draw jobs: {len(draw_jobs)}')

                # As chunks of evaluation jobs complete, submit background jobs to
                # draw results to disk if requested.
                true_gids = [info['row']['true_gid'] for info in chunk_info]
                true_coco_imgs = true_coco.images(true_gids).coco_images
                true_coco_imgs = [g.detach() for g in true_coco_imgs]
                if VERBOSE_DEBUG:
                    print(f'Submit {len(true_gids)} new draw jobs')
                draw_job = draw_executor.submit(
                    dump_chunked_confusion, full_classes, true_coco_imgs,
                    chunk_info, heatmap_dpath, title=title, config=config)
                draw_jobs.append(draw_job)

        if VERBOSE_DEBUG:
            print('Finished metric jobs')
        metrics_executor.shutdown()

        if draw_heatmaps:
            # Allow all drawing jobs to finalize
            if VERBOSE_DEBUG:
                print(f'Finalize {len(draw_jobs)} draw jobs')
            while draw_jobs:
                job = draw_jobs.pop()
                job.result()
                draw_prog.update(1)
        # Release the drawing pool even when nothing was drawn.
        draw_executor.shutdown()

    df = pd.DataFrame(rows)
    df_summary = df.describe().T
    print('Per Image Pixel Measures')
    rich.print(df)
    rich.print(df_summary.to_string())

    if eval_dpath is not None:
        perimage_table_fpath = eval_dpath / 'perimage_table.json'
        perimage_summary_fpath = eval_dpath / 'perimage_summary.json'
        perimage_table_fpath.write_text(df.to_json(orient='table', indent=4))
        perimage_summary_fpath.write_text(df_summary.to_json(orient='table', indent=4))

    # Finalize all of the aggregated measures
    print('Finalize salient measures')
    # Note: this will return False if there are no salient measures
    salient_combo_measures = salient_measure_combiner.finalize()
    if salient_combo_measures is False or salient_combo_measures is None:
        # Use nan measures from empty binary confusion vectors
        salient_combo_measures = BinaryConfusionVectors(None).measures()
    # print('salient_combo_measures = {!r}'.format(salient_combo_measures))

    if DEBUG:
        # Redo salient combine
        tocombine = []
        for p in tocombine:
            z = ub.dict_isect(p, {'fp_count', 'tp_count', 'fn_count', 'tn_count', 'thresholds', 'nsupport'})
            print(ub.urepr(ub.map_vals(list, z), nl=0))

        salient_measure_combiner = MeasureCombiner(thresh_bins=thresh_bins)
        print('salient_combo_measures.__dict__ = {!r}'.format(salient_combo_measures.__dict__))
        # precision = None
        # growth = None
        from kwcoco.metrics.confusion_measures import Measures
        for info in orig_infos:
            class_measures = info.get('class_measures', None)
            salient_measures = info.get('salient_measures', None)
            if salient_measures is not None:
                tocombine.append(salient_measures)
                salient_measure_combiner.submit(salient_measures)

        combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct()
        print('combo = {!r}'.format(combo))
        combo = Measures.combine(tocombine, precision=2)
        combo.reconstruct()
        print('combo = {!r}'.format(combo))
        combo = Measures.combine(tocombine, growth='max')
        combo.reconstruct()
        print('combo = {!r}'.format(combo))
        salient_combo_measures = salient_measure_combiner.finalize()
        print('salient_combo_measures = {!r}'.format(salient_combo_measures))

    print('Finalize class measures')
    class_combo_measure_dict = class_measure_combiner.finalize()
    ovr_combo_measures = class_combo_measure_dict['perclass']

    # Combine class + salient measures using the "SingleResult" container
    # (TODO: better API)
    result = CocoSingleResult(
        salient_combo_measures, ovr_combo_measures, None, meta)
    rich.print('result = {}'.format(result))

    meta['info'].append(pcontext.stop())

    if salient_combo_measures is not None:
        if eval_dpath is not None:
            if isinstance(salient_combo_measures, dict):
                salient_combo_measures['meta'] = meta

            # Figures use one title part per line.
            title = '\n'.join(meta.get('title_parts', [meta.get('title', '')]))

            if eval_fpath is None:
                eval_fpath = curve_dpath / 'measures2.json'
            print('Dump eval_fpath={}'.format(eval_fpath))
            result.dump(os.fspath(eval_fpath))

            if draw_curves:
                import kwplot
                # kwplot.autompl()
                with kwplot.BackendContext('agg'):
                    fig = kwplot.figure(doclf=True)
                    print('Dump salient figures')
                    salient_combo_measures.summary_plot(fnum=1, title=title)
                    fig = kwplot.autoplt().gcf()
                    fig.savefig(str(curve_dpath / 'salient_summary.png'))

                    print('Dump class figures')
                    result.dump_figures(curve_dpath, expt_title=title)

    summary = {}
    if class_combo_measure_dict is not None:
        summary['class_mAP'] = class_combo_measure_dict['mAP']
        summary['class_mAUC'] = class_combo_measure_dict['mAUC']

    if salient_combo_measures is not None:
        summary['salient_ap'] = salient_combo_measures['ap']
        summary['salient_auc'] = salient_combo_measures['auc']
        summary['salient_max_f1'] = salient_combo_measures['max_f1']

    rich.print('summary = {}'.format(ub.urepr(
        summary, nl=1, precision=4, align=':', sort=0)))
    rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]')
    print(f'eval_fpath={eval_fpath}')
    return df
class _DelayedFuture: """ Wraps a future object so we can execute logic when its result has been accessed. """ def __init__(self, func, args, kwargs, parent): self.func = func self.args = args self.kwargs = kwargs self.task = (func, args, kwargs) self.parent = parent self.future = None def result(self, timeout=None): if self.future is None: raise Exception('The task has not been submitted yet') result = self.future.result(timeout) self.parent._job_result_accessed_callback(self) return result class _DelayedBlockingJobQueue: """ References: .. [GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e Ignore: >>> self = _DelayedBlockingJobQueue(max_unhandled_jobs=5) >>> futures = [ >>> self.submit(print, i) >>> for i in range(10) >>> ][::-1] >>> import time >>> time.sleep(0.5) >>> print(self._num_submitted_jobs) >>> print(self._num_handled_results) >>> print('--- First 5 should have printed ---') >>> for _ in range(3): >>> f = futures.pop() >>> f.result() >>> time.sleep(0.5) >>> print(self._num_submitted_jobs) >>> print(self._num_handled_results) >>> print('--- 3 Results were haneld, so 3 more can join the queue') >>> for _ in range(3): >>> f = futures.pop() >>> f.result() >>> time.sleep(0.5) >>> print(self._num_submitted_jobs) >>> print(self._num_handled_results) >>> print('--- Handling the rest, but everything should have already been submitted') >>> for _ in range(4): >>> f = futures.pop() >>> f.result() """ def __init__(self, max_unhandled_jobs, mode='thread', max_workers=None): from collections import deque self._unsubmitted = deque() self.pool = ub.Executor(mode=mode, max_workers=max_workers) self.max_unhandled_jobs = max_unhandled_jobs self._num_handled_results = 0 self._num_submitted_jobs = 0 self._num_unhandled = 0 def submit(self, func, *args, **kwargs): """ Queues a new job, but wont execute until some conditions are met """ delayed = _DelayedFuture(func, args, kwargs, parent=self) self._unsubmitted.append(delayed) 
self._submit_if_room() return delayed def _submit_if_room(self): while self._num_unhandled < self.max_unhandled_jobs and self._unsubmitted: delayed = self._unsubmitted.popleft() self._num_submitted_jobs += 1 self._num_unhandled += 1 delayed.future = self.pool.submit(delayed.func, *delayed.args, **delayed.kwargs) def _job_result_accessed_callback(self, _): """Called when the user handles a result """ self._num_handled_results += 1 self._num_unhandled -= 1 self._submit_if_room() def shutdown(self): """ Calls the shutdown function of the underlying backend. """ return self.pool.shutdown()
class MaxQueuePool:
    """
    This Class wraps a concurrent.futures.Executor limiting the size of its
    task queue.

    If `max_queue_size` tasks are submitted, the next call to submit will
    block until a previously submitted one is completed.

    Args:
        max_queue_size (int | None): maximum number of submitted but
            unfinished tasks. Defaults to ``max_workers``. Values below 1
            are treated as 1 when a parallel backend is used.
        mode (str): backend mode forwarded to ``ub.Executor``.
        max_workers (int): forwarded to ``ub.Executor``.

    References:
        .. [GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e

    Ignore:
        import sys, ubelt
        sys.path.append(ubelt.expandpath('~/code/geowatch'))
        from geowatch.tasks.fusion.evaluate import *  # NOQA
        from geowatch.tasks.fusion.evaluate import _memo_legend, _redraw_measures
        self = MaxQueuePool(max_queue_size=0)

        dpath = ub.Path.appdir('kwutil/doctests/maxpoolqueue')
        dpath.delete().ensuredir()
        signal_fpath = dpath / 'signal'

        def waiting_worker():
            counter = 0
            while not signal_fpath.exists():
                counter += 1
            return counter

        future = self.submit(waiting_worker)

        try:
            future.result(timeout=0.001)
        except TimeoutError:
            ...
        signal_fpath.touch()
        result = future.result()
    """

    def __init__(self, max_queue_size=None, mode='thread', max_workers=0):
        if max_queue_size is None:
            max_queue_size = max_workers
        self.pool = ub.Executor(mode=mode, max_workers=max_workers)
        if 'serial' in self.pool.backend.__class__.__name__.lower():
            # No throttling is set up for the serial backend.
            self.pool_queue = None
        else:
            from threading import BoundedSemaphore  # NOQA
            # A semaphore with zero slots would make the very first submit
            # block forever, so always allow at least one in-flight task.
            self.pool_queue = BoundedSemaphore(max(1, max_queue_size))

    def submit(self, function, *args, **kwargs):
        """
        Submits a new task to the pool, blocks if Pool queue is full.

        Args:
            function (Callable): the function to run.
            *args: positional arguments for ``function``.
            **kwargs: keyword arguments for ``function``.

        Returns:
            the future returned by the underlying pool.
        """
        if self.pool_queue is not None:
            self.pool_queue.acquire()
        try:
            future = self.pool.submit(function, *args, **kwargs)
        except BaseException:
            # The task never entered the pool, so no done-callback will ever
            # give the slot back. Return it here before propagating.
            if self.pool_queue is not None:
                self.pool_queue.release()
            raise
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """
        Called once task is done, releases one queue slot.
        """
        if self.pool_queue is not None:
            self.pool_queue.release()

    def shutdown(self):
        """
        Calls the shutdown function of the underlying backend.
        """
        return self.pool.shutdown()
def _redraw_measures(eval_dpath): """ hack helper for developer, not critical """ curve_dpath = ub.Path(eval_dpath) / 'curves' measures_fpath = curve_dpath / 'measures.json' with open(measures_fpath, 'r') as file: state = json.load(file) salient_combo_measures = Measures.from_json(state) meta = salient_combo_measures.get('meta', []) title = '' if meta is not None: if isinstance(meta, list): # Old for item in meta: title = item.get('title', title) else: # title = meta.get('title', title) title = '\n'.join(meta.get('title_parts', [meta.get('title', '')])) import kwplot with kwplot.BackendContext('agg'): salient_combo_measures.summary_plot(fnum=1, title=title) fig = kwplot.autoplt().gcf() fig.savefig(str(curve_dpath / 'summary_redo.png')) def _max_digits(max_num): """ Use like this: your_var = 231 max_num = 9180 num_digits = _max_digits(max_num) f'{your_var:0{num_digits}d}' # or f'{your_var:0{_max_digits(max_num)}d}' """ import math if max_num is None: num_digits = 8 else: num_digits = int(math.log10(max(max_num, 1))) + 1 return num_digits if __name__ == '__main__': # import xdev # xdev.make_warnings_print_tracebacks() main()