geowatch.tasks.tracking.phase module¶
- geowatch.tasks.tracking.phase.viterbi(input_sequence, transition_probs, emission_probs)[source]¶
Viterbi decoding function.
Obtain a MAP estimate for the most likely sequence of hidden states using a hidden Markov model.
- Parameters:
- input_sequence (ndarray[int]) – Input sequence of shape (T,) encoding the sequence we believe we observed. Items are integers ranging from 0 to (S - 1), where S is the number of possible states. These indicate the “observed” state.
- transition_probs (ndarray[float]) – Transition probabilities of shape (S, S), where transition_probs[i, j] indicates the probability that state i transitions to state j. Rows should sum to 1.
- emission_probs (ndarray[float]) – Emission probabilities of shape (S, S), where emission_probs[i, j] indicates the probability that when we observed state i the real state was actually j. This encodes how noisy we believe the observations are.
- Returns:
- best_path
The sequence of most likely true states
- Return type:
ndarray[int]
Example
>>> # Demo based loosely on a star's simplified life sequence
>>> import numpy as np
>>> import pandas as pd
>>> states = ['cloud', 'small', 'giant', 'dwarf', 'large',
>>>     'supergiant', 'supernova', 'neutron_star', 'black_hole']
>>> # How likely is it for a state to change at any given time?
>>> transitions = [
>>>     {'src': 'cloud', 'dst': 'cloud', 'prob': 0.9},
>>>     {'src': 'small', 'dst': 'small', 'prob': 0.9},
>>>     {'src': 'giant', 'dst': 'giant', 'prob': 0.9},
>>>     {'src': 'dwarf', 'dst': 'dwarf', 'prob': 0.9},
>>>     {'src': 'large', 'dst': 'large', 'prob': 0.9},
>>>     {'src': 'supergiant', 'dst': 'supergiant', 'prob': 0.9},
>>>     {'src': 'supernova', 'dst': 'supernova', 'prob': 0.9},
>>>     {'src': 'neutron_star', 'dst': 'neutron_star', 'prob': 0.9},
>>>     {'src': 'black_hole', 'dst': 'black_hole', 'prob': 0.9},
>>>     #
>>>     {'src': 'cloud', 'dst': 'small', 'prob': 0.8},
>>>     {'src': 'cloud', 'dst': 'large', 'prob': 0.2},
>>>     {'src': 'small', 'dst': 'giant', 'prob': 1.0},
>>>     {'src': 'giant', 'dst': 'dwarf', 'prob': 1.0},
>>>     {'src': 'large', 'dst': 'supergiant', 'prob': 1.0},
>>>     {'src': 'supergiant', 'dst': 'supernova', 'prob': 1.0},
>>>     {'src': 'supernova', 'dst': 'neutron_star', 'prob': 6.0},
>>>     {'src': 'supernova', 'dst': 'black_hole', 'prob': 4.0},
>>> ]
>>> # How likely is it that we made an error in observation?
>>> emissions = [
>>>     {'obs': 'cloud', 'real': 'cloud', 'prob': 0.5},
>>>     {'obs': 'small', 'real': 'small', 'prob': 0.5},
>>>     {'obs': 'giant', 'real': 'giant', 'prob': 0.5},
>>>     {'obs': 'dwarf', 'real': 'dwarf', 'prob': 0.5},
>>>     {'obs': 'large', 'real': 'large', 'prob': 0.5},
>>>     {'obs': 'supergiant', 'real': 'supergiant', 'prob': 0.5},
>>>     {'obs': 'supernova', 'real': 'supernova', 'prob': 0.5},
>>>     {'obs': 'neutron_star', 'real': 'neutron_star', 'prob': 0.5},
>>>     {'obs': 'black_hole', 'real': 'black_hole', 'prob': 0.5},
>>> ]
>>> emission_table = pd.DataFrame.from_dict(emissions)
>>> emission_df = emission_table.pivot(index=['obs'], columns=['real'], values=['prob'])
>>> # Fill unspecified values in pairwise probability tables
>>> import kwarray
>>> rng = kwarray.ensure_rng(42110)
>>> randfill = rng.rand(*emission_df.shape) * 0.01
>>> flags = emission_df.isnull().astype(int)
>>> emission_df = emission_df.fillna(0) + randfill * flags
>>> transition_table = pd.DataFrame.from_dict(transitions)
>>> transition_df = transition_table.pivot(
>>>     index=['src'], columns=['dst'], values=['prob']).fillna(0)
>>> # Normalize probs
>>> emission_df = emission_df.div(emission_df.groupby(
>>>     axis=1, level=0).sum(), level=0)
>>> transition_df = transition_df.div(transition_df.groupby(
>>>     axis=1, level=0).sum(), level=0)
>>> # Reorder indexes so we can use integer states
>>> transition_df2 = transition_df.droplevel(axis=1, level=0)
>>> emission_df2 = emission_df.droplevel(axis=1, level=0)
>>> transition_df2 = transition_df2[states].loc[states]
>>> emission_df2 = emission_df2[states].loc[states]
>>> #
>>> # Convert to ndarrays
>>> transition_probs = transition_df2.values
>>> emission_probs = emission_df2.values
>>> #
>>> observed_states = ['cloud', 'small', 'cloud', 'small', 'large',
>>>     'supergiant', 'black_hole', 'giant', 'dwarf', 'dwarf']
>>> input_sequence = np.array(
>>>     [states.index(s) for s in observed_states], dtype=int)
>>> from geowatch.tasks.tracking.phase import viterbi
>>> best_path = viterbi(
>>>     input_sequence, transition_probs, emission_probs)
>>> predicted_states = [states[idx] for idx in best_path]
>>> print('predicted_states = {!r}'.format(predicted_states))
predicted_states = ['cloud', 'small', 'small', 'small', 'small', 'small', 'giant', 'giant', 'dwarf', 'dwarf']
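A minimal sketch showing the expected array shapes directly, using a hypothetical 2-state system (all values here are illustrative and are not from the library):

>>> import numpy as np
>>> from geowatch.tasks.tracking.phase import viterbi
>>> # Hypothetical states: 0 = "off", 1 = "on"
>>> transition_probs = np.array([
>>>     [0.9, 0.1],   # from state 0: usually stays in state 0
>>>     [0.2, 0.8],   # from state 1: usually stays in state 1
>>> ])  # rows sum to 1
>>> emission_probs = np.array([
>>>     [0.8, 0.2],   # observed 0: probably really state 0
>>>     [0.3, 0.7],   # observed 1: probably really state 1
>>> ])
>>> input_sequence = np.array([0, 0, 1, 0, 1, 1], dtype=int)  # shape (T,)
>>> best_path = viterbi(input_sequence, transition_probs, emission_probs)
>>> # best_path is a length-T integer array of decoded state indices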
- geowatch.tasks.tracking.phase.class_label_smoothing(track_cats, transition_probs=None, emission_probs=None)[source]¶
- Parameters:
- track_cats – a list of scored SC phase names.
Ex. [‘Site Preparation’, ‘Active Construction’, ‘Site Preparation’]
- transition_probs, emission_probs – see viterbi(). Each can be an (n_classes x n_classes) == (4 x 4) ndarray, anything readable by np.loadtxt (a path-like or a list of strings), or None to use the default. A sketch passing explicit ndarrays follows the example below.
- Returns:
A smoothed list using Viterbi decoding. Ex. [‘Site Preparation’, ‘Active Construction’, ‘Active Construction’]
# TODO make this work for subsites
Example
>>> from geowatch.tasks.tracking.phase import *  # NOQA
>>> import ubelt as ub
>>> track_cats = (
>>>     ['No Activity'] * 2 +
>>>     ['Active Construction'] * 1 +
>>>     ['Site Preparation'] * 2 +
>>>     ['Active Construction'] * 3 +
>>>     ['Site Preparation'] * 1 +
>>>     ['Post Construction'] * 2
>>> )
>>> transition_probs = 'v1'
>>> emission_probs = 'v6'
>>> smoothed_cats = class_label_smoothing(track_cats, transition_probs, emission_probs)
>>> print('smoothed_cats = {}'.format(ub.urepr(smoothed_cats, nl=1)))
smoothed_cats = [
    'No Activity',
    'No Activity',
    'Site Preparation',
    'Site Preparation',
    'Site Preparation',
    'Active Construction',
    'Active Construction',
    'Active Construction',
    'Active Construction',
    'Post Construction',
    'Post Construction',
]
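Continuing from the example above, the probability arguments may also be explicit (4 x 4) ndarrays rather than named presets. A hedged sketch (the matrix values are illustrative, and the row/column ordering of the four phases is an assumption; consult the module for the canonical ordering):

>>> import numpy as np
>>> # Diagonal-heavy matrices: states tend to persist, observations are mostly trusted
>>> transition_probs = np.full((4, 4), 0.05) + np.eye(4) * 0.80  # rows sum to 1
>>> emission_probs = np.full((4, 4), 0.10) + np.eye(4) * 0.60    # rows sum to 1
>>> smoothed_cats = class_label_smoothing(track_cats, transition_probs, emission_probs)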
- geowatch.tasks.tracking.phase.interpolate(coco_dset, track_id, cnames_to_keep=['Site Preparation', 'Active Construction', 'Post Construction'])[source]¶
Replace the category of any annotation not in cnames_to_keep with the most recent category from cnames_to_keep.
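A sketch of the intended replacement rule on a plain list of category names (an illustration only; the real function operates on the annotations of a kwcoco track):

>>> cnames_to_keep = ['Site Preparation', 'Active Construction', 'Post Construction']
>>> cats = ['Site Preparation', 'No Activity', 'Active Construction', 'Unknown', 'Post Construction']
>>> fixed, recent = [], None
>>> for c in cats:
>>>     recent = c if c in cnames_to_keep else recent
>>>     fixed.append(recent)
>>> print(fixed)
['Site Preparation', 'Site Preparation', 'Active Construction', 'Active Construction', 'Post Construction']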
- geowatch.tasks.tracking.phase.baseline(coco_dset, track_id, cnames_to_insert=['Site Preparation', 'Active Construction', 'Post Construction'])[source]¶
Predict Site Preparation for the first half of the track, Active Construction for the second half, and Post Construction on the last frame.
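A sketch of the resulting label pattern for a hypothetical 7-frame track (the exact rounding of the halfway point is an assumption):

>>> n = 7  # hypothetical number of frames in the track
>>> labels = ['Site Preparation'] * (n // 2) + ['Active Construction'] * (n - n // 2)
>>> labels[-1] = 'Post Construction'
>>> print(labels)
['Site Preparation', 'Site Preparation', 'Site Preparation', 'Active Construction', 'Active Construction', 'Active Construction', 'Post Construction']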
- geowatch.tasks.tracking.phase.sort_by_gid(coco_dset, track_id, prune=True)[source]¶
Group annots by image and return in sorted order by frame_index.
- Parameters:
prune – if True, remove image ids (gids) with no annotations; otherwise return the whole video
- Returns:
(Images, AnnotGroups)
- Return type:
Tuple
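A hedged usage sketch (the dataset path and track id are placeholders):

>>> import kwcoco
>>> from geowatch.tasks.tracking.phase import sort_by_gid
>>> coco_dset = kwcoco.CocoDataset('data.kwcoco.json')  # hypothetical dataset
>>> images, annot_groups = sort_by_gid(coco_dset, track_id=1, prune=True)
>>> # images: the track's frames sorted by frame_index
>>> # annot_groups: the corresponding per-image groups of the track's annotations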
- geowatch.tasks.tracking.phase.ensure_post(coco_dset, track_id, post_cname='Post Construction', neg_cnames=['No Activity'])[source]¶
If the track ends before the end of the video and the last frame is not Post Construction, add another frame of Post Construction.
TODO this is not a perfect approach, since we don’t have per-subsite tracking across frames. We can run into a case like:

frame  1   2   3
ss1    AC  AC  PC
ss2    AC  AC

where it is ambiguous whether ss2 ends on AC or merges with ss1.
- geowatch.tasks.tracking.phase.dedupe_background_anns(coco_dset, track_id, post_cname='Post Construction', neg_cnames=['No Activity'])[source]¶
Chop off extra Post Construction and No Activity annots from the end of the track so they don’t count as FPs.
TODO same edge case as ensure_post() for lack of subsite tracking