geowatch.tasks.tracking.utils module

geowatch.tasks.tracking.utils.trackid_is_default(trackid)[source]

Hack to decide if a trackid is really a site_id or if it was randomly assigned

geowatch.tasks.tracking.utils.check_only_bg(category_sequence, bg_name=['No Activity'])[source]
geowatch.tasks.tracking.utils.gpd_sort_by_gid(gdf, sorted_gids)[source]
geowatch.tasks.tracking.utils.gpd_len(gdf)[source]
geowatch.tasks.tracking.utils.gpd_compute_scores(gdf, sub_dset, thrs: Iterable, ks: Dict, USE_DASK=False, resolution=None, modulate=None)[source]

TODO: This needs docs and examples for the BAS and SC/AC cases.

Parameters:
  • gdf (gdf.GeoDataFrame) – input data frame tracks dataframe containing track_idx, gid, and poly columns.

  • sub_dset (kwcoco.CocoDataset) – dataset with reference to images

  • thrs (List[float]) – thresholds (-1) means take the average response, other values is the fraction of pixels with responses above that value.

  • ks (Dict[str, List[str]]) – mapping from “fg” to a list of “foreground classes” optionally also mapping from “bg” to a list of “background classes”

  • resolution (str | None) – resolution spec to compute scores at (e.g. “2GSD”).

Calls _compute_group_scores() on each dataframe row, which will execute the read for the image prediction scores for polygons with score_poly().

Returns:

gdf.GeoDataFrame - a scored variant of the input data frame

returns the per-channels scores as well as summed groups of channels. (not sure if that last one is necessary, might need to refactor to simplify)

Example

>>> import kwcoco
>>> from geowatch.tasks.tracking.utils import *  # NOQA
>>> from geowatch.tasks.tracking.utils import _compute_group_scores, _build_annot_gdf
>>> sub_dset = kwcoco.CocoDataset.demo('vidshapes1')
>>> gdf, _ = _build_annot_gdf(sub_dset)
>>> thrs = [-1, 'median']
>>> ks = {'r|g': ['r', 'g'], 'bg': ['b']}
>>> USE_DASK = 0
>>> resolution = None
>>> gdf2 = gpd_compute_scores(gdf, sub_dset, thrs, ks, USE_DASK, resolution)
geowatch.tasks.tracking.utils.score_track_polys(coco_dset, video_id, cnames=None, score_chan=None, resolution: str | None = None)[source]

Score the polygons in a kwcoco dataset based on heatmaps without modifying the polygon boundaries.

Parameters:
  • coco_dset (kwcoco.CocoDataset)

  • video_id (int) – video to score tracks for

  • cnames (Iterable[str] | None) – category names. Only annotations with these names will be considered.

  • score_chan (kwcoco.ChannelSpec | None) – score the track polygons by image overlap with this channel

Note

This function needs a rename because we don’t want this to mutate the kwcoco dataset ever.

Returns:

With columns:

gid: the image id poly: the polygon in video space track_idx: the annotation trackid

And then for each score chan: c you get a column:

(c, -1) where the -1 indicates that no threshold was applied, and that this is the mean of that channel intensity under the polygon.

Then there is another column where all channels are fused: f and you get a column: (f, -1)

Return type:

gpd.GeoDataFrame

Note

The returned unerlying GDF should return polygons in video space as it will be consumed by _add_tracks_to_dset().

Example

>>> import kwcoco
>>> coco_dset = kwcoco.CocoDataset.demo('vidshapes8-msi')
>>> video_id = list(coco_dset.videos())[0]
>>> cnames = None
>>> resolution = None
>>> score_chan = kwcoco.ChannelSpec.coerce('B1|B8|B8a|B10|B11')
>>> gdf = score_track_polys(coco_dset, video_id, cnames, score_chan, resolution)
>>> print(gdf)
geowatch.tasks.tracking.utils.score_poly(poly, probs, threshold=-1, use_rasterio=True)[source]

Compute the average heatmap response of a heatmap inside of a polygon.

Parameters:
  • poly (kwimage.Polygon | MultiPolygon) – in pixel coords

  • probs (ndarray) – heatmap to compare poly against in […, c, h, w] format. The last two dimensions should be height, and width. Any leading batch dimensions will be preserved in output, e.g. (gid, chan, h, w) -> (gid, chan)

  • use_rasterio (bool) – use rasterio.features module instead of kwimage

  • threshold (float | List[float | str]) – Return fraction of poly with probs > threshold. If -1, return average value of probs in poly. Can be a list of values, in which case returns all of them.

Returns:

When thresholds is a list, returns a corresponding list of ndarrays with an entry keeping the leading dimensions of probs and marginalizing over the last two.

Return type:

List[ndarray] | ndarray

Example

>>> import numpy as np
>>> import kwimage
>>> from geowatch.tasks.tracking.utils import score_poly
>>> h = w = 64
>>> poly = kwimage.Polygon.random().scale((w, h))
>>> probs = np.random.rand(1, 3, h, w)
>>> # Test with one threshold
>>> threshold = [0.1, 0.2]
>>> result = score_poly(poly, probs, threshold=threshold, use_rasterio=True)
>>> print('result = {}'.format(ub.urepr(result, nl=1)))
>>> # Test with multiple thresholds
>>> threshold = 0.1
>>> result = score_poly(poly, probs, threshold=threshold, use_rasterio=True)
>>> print('result = {}'.format(ub.urepr(result, nl=1)))
>>> # Test with -1 threshold
>>> threshold = [-1, 'min', 'median']
>>> result = score_poly(poly, probs, threshold=threshold, use_rasterio=True)
>>> print('result = {}'.format(ub.urepr(result, nl=1)))

Example

### Grid of cases

basis = {

‘threshold’:

}

geowatch.tasks.tracking.utils.mask_to_polygons(probs, thresh, bounds=None, use_rasterio=True, thresh_hysteresis=None)[source]

Extract a polygon from a 2D heatmap. Optionally within the bounds of another mask or polygon.

Parameters:
  • probs (ndarray) – aka heatmap, image of probability values

  • thresh (float) – to turn probs into a hard mask

  • bounds (kwimage.Polygon) – a kwimage or shapely polygon to crop the results to

  • use_rasterio (bool) – use rasterio.features module instead of kwimage

  • thresh_hysteresis – if not None, only keep polygons with at least one pixel of score >= thresh_hysteresis

Yields:

kwcoco.Polygon – extracted polygons.

Example

>>> from geowatch.tasks.tracking.utils import mask_to_polygons
>>> import kwimage
>>> probs = kwimage.Heatmap.random(dims=(64, 64),
>>>                                rng=0).data['class_probs'][0]
>>> thresh = 0.5
>>> polys = mask_to_polygons(probs, thresh)
>>> poly1 = list(polys)[0]
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(probs > 0.5)

Example

>>> from geowatch.tasks.tracking.utils import mask_to_polygons
>>> import kwimage
>>> import kwarray
>>> rng = kwarray.ensure_rng(1043462368)
>>> probs = kwimage.Heatmap.random(dims=(256, 256), rng=rng,
>>>                                 ).data['class_probs'][0]
>>> thresh = 0.5
>>> polys1 = list(mask_to_polygons(
>>>             probs, thresh, use_rasterio=0))
>>> polys2 = list(mask_to_polygons(
>>>             probs, thresh, use_rasterio=1))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> plt = kwplot.autoplt()
>>> pnum_ = kwplot.PlotNums(nSubplots=4)
>>> kwplot.imshow(probs, pnum=pnum_(), title='pixels_are=points')
>>> for poly in polys1:
>>>     poly.draw(facecolor='none', edgecolor='kitware_blue', alpha=0.5, linewidth=8)
>>> kwplot.imshow(probs, pnum=pnum_(), title='pixels_are=areas')
>>> for poly in polys2:
>>>     poly.draw(facecolor='none', edgecolor='kitware_green', alpha=0.5, linewidth=8)