geowatch.tasks.tracking.phase module

geowatch.tasks.tracking.phase.viterbi(input_sequence, transition_probs, emission_probs)[source]

Viterbi decoding function.

Obtain a MAP estimate for the most likely sequence of hidden states using a hidden Markov model.

Parameters:
  • input_sequence (ndarray[int]) – Input sequence of shape (T,) encoding the sequence we believe we observed. Items are integers ranging from 0 to (S - 1), where S is the number of possible states. These indicate the “observed” state.

  • transition_probs (ndarray[float]) – Transition probabilities of shape (S, S), where transition_probs[i, j] indicates the probability that state i transitions to state j. Rows should sum to 1.

  • emission_probs (ndarray[float]) – Emission probabilities of shape (S, S), where emission_probs[i, j] indicates the probability that when we observed state i the real state was actually j. This encodes how noisy we believe the observations are.

Returns:

best_path

The sequence of most likely true states

Return type:

ndarray[int]
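
The dynamic program behind Viterbi decoding can be sketched in a few lines of NumPy. This is a minimal illustration and not the geowatch implementation: the name viterbi_sketch, the log-space arithmetic, and the uniform prior over the initial state are assumptions. Only the argument shapes and the emission_probs[observed, real] indexing follow the parameter descriptions above.

```python
import numpy as np


def viterbi_sketch(input_sequence, transition_probs, emission_probs):
    """Hypothetical helper: most likely hidden path for a non-empty sequence."""
    obs = np.asarray(input_sequence, dtype=int)
    T = len(obs)
    S = transition_probs.shape[0]
    # Work in log space so long sequences do not underflow.
    with np.errstate(divide='ignore'):
        log_trans = np.log(np.asarray(transition_probs, dtype=float))
        log_emit = np.log(np.asarray(emission_probs, dtype=float))
    # score[t, s]: best log-probability of any path ending in state s at time t.
    score = np.full((T, S), -np.inf)
    backptr = np.zeros((T, S), dtype=int)
    # Assumption: uniform prior over the initial hidden state.
    score[0] = log_emit[obs[0]]
    for t in range(1, T):
        # cand[i, j]: best path ending in i at t - 1, followed by a move i -> j.
        cand = score[t - 1][:, None] + log_trans
        backptr[t] = cand.argmax(axis=0)
        score[t] = cand.max(axis=0) + log_emit[obs[t]]
    # Trace the best path backwards from the best final state.
    path = np.zeros(T, dtype=int)
    path[-1] = score[-1].argmax()
    for t in range(T - 1, 0, -1):
        path[t - 1] = backptr[t, path[t]]
    return path


# Two sticky states with noisy observations: the lone "1" is smoothed away.
transition_probs = np.array([[0.9, 0.1], [0.1, 0.9]])
emission_probs = np.array([[0.8, 0.2], [0.2, 0.8]])
observed = np.array([0, 0, 1, 0, 0])
best_path = viterbi_sketch(observed, transition_probs, emission_probs)
print(best_path.tolist())  # [0, 0, 0, 0, 0]
```

Because staying in a state is nine times more likely than switching, a single contradictory observation is cheaper to explain as noise than as two transitions.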

Example

>>> # Demo based loosely on a star's simplified life sequence
>>> import numpy as np
>>> import pandas as pd
>>> states = ['cloud', 'small', 'giant', 'dwarf', 'large',
>>>           'supergiant', 'supernova', 'neutron_star', 'black_hole']
>>> # How likely is it for a state to change at any given time?
>>> transitions = [
>>>     {'src': 'cloud',        'dst': 'cloud',        'prob': 0.9},
>>>     {'src': 'small',        'dst': 'small',        'prob': 0.9},
>>>     {'src': 'giant',        'dst': 'giant',        'prob': 0.9},
>>>     {'src': 'dwarf',        'dst': 'dwarf',        'prob': 0.9},
>>>     {'src': 'large',        'dst': 'large',        'prob': 0.9},
>>>     {'src': 'supergiant',   'dst': 'supergiant',   'prob': 0.9},
>>>     {'src': 'supernova',    'dst': 'supernova',    'prob': 0.9},
>>>     {'src': 'neutron_star', 'dst': 'neutron_star', 'prob': 0.9},
>>>     {'src': 'black_hole',   'dst': 'black_hole',   'prob': 0.9},
>>>     #
>>>     {'src': 'cloud',      'dst': 'small',        'prob': 0.8},
>>>     {'src': 'cloud',      'dst': 'large',        'prob': 0.2},
>>>     {'src': 'small',      'dst': 'giant',        'prob': 1.0},
>>>     {'src': 'giant',      'dst': 'dwarf',        'prob': 1.0},
>>>     {'src': 'large',      'dst': 'supergiant',   'prob': 1.0},
>>>     {'src': 'supergiant', 'dst': 'supernova',    'prob': 1.0},
>>>     {'src': 'supernova',  'dst': 'neutron_star', 'prob': 6.0},
>>>     {'src': 'supernova',  'dst': 'black_hole',   'prob': 4.0},
>>> ]
>>> # How likely is it that we made an error in observation?
>>> emissions = [
>>>     {'obs': 'cloud',        'real': 'cloud',        'prob': 0.5},
>>>     {'obs': 'small',        'real': 'small',        'prob': 0.5},
>>>     {'obs': 'giant',        'real': 'giant',        'prob': 0.5},
>>>     {'obs': 'dwarf',        'real': 'dwarf',        'prob': 0.5},
>>>     {'obs': 'large',        'real': 'large',        'prob': 0.5},
>>>     {'obs': 'supergiant',   'real': 'supergiant',   'prob': 0.5},
>>>     {'obs': 'supernova',    'real': 'supernova',    'prob': 0.5},
>>>     {'obs': 'neutron_star', 'real': 'neutron_star', 'prob': 0.5},
>>>     {'obs': 'black_hole',   'real': 'black_hole',   'prob': 0.5},
>>> ]
>>> emission_table = pd.DataFrame.from_dict(emissions)
>>> emission_df = emission_table.pivot(index=['obs'], columns=['real'], values=['prob'])
>>> # Fill unspecified values in pairwise probability tables
>>> import kwarray
>>> rng = kwarray.ensure_rng(42110)
>>> randfill = rng.rand(*emission_df.shape) * 0.01
>>> flags = emission_df.isnull().astype(int)
>>> emission_df = emission_df.fillna(0) + randfill * flags
>>> transition_table = pd.DataFrame.from_dict(transitions)
>>> transition_df = transition_table.pivot(
>>>     index=['src'], columns=['dst'], values=['prob']).fillna(0)
>>> # Normalize probs
>>> emission_df = emission_df.div(emission_df.groupby(
>>>     axis=1, level=0).sum(), level=0)
>>> transition_df = transition_df.div(transition_df.groupby(
>>>     axis=1, level=0).sum(), level=0)
>>> # Reorder indexes so we can use integer states
>>> transition_df2 = transition_df.droplevel(axis=1, level=0)
>>> emission_df2 = emission_df.droplevel(axis=1, level=0)
>>> transition_df2 = transition_df2[states].loc[states]
>>> emission_df2 = emission_df2[states].loc[states]
>>> #
>>> # Convert to ndarrays
>>> transition_probs = transition_df2.values
>>> emission_probs = emission_df2.values
>>> #
>>> observed_states = ['cloud', 'small', 'cloud', 'small', 'large',
>>>     'supergiant', 'black_hole', 'giant', 'dwarf', 'dwarf']
>>> input_sequence = np.array(
>>>     [states.index(s) for s in observed_states], dtype=int)
>>> from geowatch.tasks.tracking.phase import viterbi
>>> best_path = viterbi(
>>>     input_sequence, transition_probs, emission_probs)
>>> predicted_states = [states[idx] for idx in best_path]
>>> print('predicted_states = {!r}'.format(predicted_states))
predicted_states = ['cloud', 'small', 'small', 'small', 'small',
                    'small', 'giant', 'giant', 'dwarf', 'dwarf']
geowatch.tasks.tracking.phase.class_label_smoothing(track_cats, transition_probs=None, emission_probs=None)[source]
Parameters:
  • track_cats – A list of scored SC phase names, e.g. [‘Site Preparation’, ‘Active Construction’, ‘Site Preparation’].

  • transition_probs, emission_probs – See viterbi(). Each can be an (n_classes, n_classes) ndarray (here 4 x 4), any input that np.loadtxt can read (a path-like or a list of strings), or None to use the default.

Returns:

The list of phase names smoothed with Viterbi decoding, e.g. [‘Site Preparation’, ‘Active Construction’, ‘Active Construction’].
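
As a rough illustration of the "list of strings" form that np.loadtxt accepts, the sketch below builds a 4 x 4 matrix and encodes phase names as integer states. The class order and every probability value here are hypothetical; they are not the defaults shipped with geowatch.

```python
import numpy as np

# Assumed class order, for illustration only.
classes = ['No Activity', 'Site Preparation',
           'Active Construction', 'Post Construction']

# Hypothetical values: one whitespace-separated row per source state.
transition_rows = [
    '0.70 0.10 0.10 0.10',
    '0.05 0.80 0.10 0.05',
    '0.05 0.05 0.80 0.10',
    '0.05 0.05 0.05 0.85',
]
# np.loadtxt reads a list of strings the same way it reads lines of a file.
transition_probs = np.loadtxt(transition_rows)
print(transition_probs.shape)  # (4, 4)

# Phase names become integer states before decoding.
track_cats = ['Site Preparation', 'Active Construction', 'Site Preparation']
input_sequence = np.array([classes.index(c) for c in track_cats], dtype=int)
print(input_sequence.tolist())  # [1, 2, 1]
```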

# TODO make this work for subsites

Example

>>> from geowatch.tasks.tracking.phase import *  # NOQA
>>> import ubelt as ub
>>> track_cats = (
>>>     ['No Activity'] * 2 +
>>>     ['Active Construction'] * 1 +
>>>     ['Site Preparation'] * 2 +
>>>     ['Active Construction'] * 3 +
>>>     ['Site Preparation'] * 1 +
>>>     ['Post Construction'] * 2
>>> )
>>> transition_probs = 'v1'
>>> emission_probs = 'v6'
>>> smoothed_cats = class_label_smoothing(track_cats, transition_probs, emission_probs)
>>> print('smoothed_cats = {}'.format(ub.urepr(smoothed_cats, nl=1)))
smoothed_cats = [
    'No Activity',
    'No Activity',
    'Site Preparation',
    'Site Preparation',
    'Site Preparation',
    'Active Construction',
    'Active Construction',
    'Active Construction',
    'Active Construction',
    'Post Construction',
    'Post Construction',
]
geowatch.tasks.tracking.phase.interpolate(coco_dset, track_id, cnames_to_keep=['Site Preparation', 'Active Construction', 'Post Construction'])[source]

Replace the category of any annotation that is not in cnames_to_keep with the most recent category that is in cnames_to_keep.
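
The forward-fill idea can be sketched on a plain list of category names. The helper forward_fill_categories is hypothetical and operates on strings rather than on a coco_dset; leaving labels unchanged before the first kept category is an assumption, since the behaviour in that case is not described here.

```python
def forward_fill_categories(track_cats, cnames_to_keep):
    """Hypothetical helper: carry the last kept category forward."""
    keep = set(cnames_to_keep)
    filled = []
    most_recent = None
    for cat in track_cats:
        if cat in keep:
            most_recent = cat
        # Assumption: labels before the first kept category stay unchanged.
        filled.append(cat if most_recent is None else most_recent)
    return filled


cnames_to_keep = ['Site Preparation', 'Active Construction', 'Post Construction']
track_cats = ['No Activity', 'Site Preparation', 'No Activity',
              'Active Construction', 'Unknown', 'Post Construction']
filled = forward_fill_categories(track_cats, cnames_to_keep)
print(filled)
```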

geowatch.tasks.tracking.phase.baseline(coco_dset, track_id, cnames_to_insert=['Site Preparation', 'Active Construction', 'Post Construction'])[source]

Predict Site Preparation for the first half of the track and Active Construction for the second half, with Post Construction on the last frame.
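
A minimal sketch of this labelling rule on a track of n frames follows. The helper baseline_labels is hypothetical, and rounding the midpoint down for odd-length tracks is an assumption.

```python
def baseline_labels(n_frames, cnames_to_insert=('Site Preparation',
                                                'Active Construction',
                                                'Post Construction')):
    """Hypothetical helper: one label per frame of the track."""
    site_prep, active, post = cnames_to_insert
    # Assumption: the midpoint is rounded down for odd-length tracks.
    half = n_frames // 2
    labels = [site_prep] * half + [active] * (n_frames - half)
    if labels:
        # The last frame is always Post Construction.
        labels[-1] = post
    return labels


labels = baseline_labels(5)
print(labels)
```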

geowatch.tasks.tracking.phase.sort_by_gid(coco_dset, track_id, prune=True)[source]

Group annots by image and return in sorted order by frame_index.

Parameters:

prune – If True, drop images (gids) that have no annotations; otherwise return every image in the video.

Returns:

(Images, AnnotGroups)

Return type:

Tuple
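
The grouping and the effect of prune can be sketched with plain dictionaries in place of a coco_dset. Everything here (the helper name sort_by_frame and the 'id', 'image_id' and 'frame_index' keys on plain dicts) is a stand-in for illustration and is not the geowatch or kwcoco API.

```python
def sort_by_frame(images, annots, prune=True):
    """Hypothetical helper: group annotations per image, ordered by frame."""
    # Assumption: every annotation refers to an image in `images`.
    groups = {img['id']: [] for img in images}
    for ann in annots:
        groups[ann['image_id']].append(ann)
    ordered = sorted(images, key=lambda img: img['frame_index'])
    if prune:
        # Drop images that carry no annotations.
        ordered = [img for img in ordered if groups[img['id']]]
    return ordered, [groups[img['id']] for img in ordered]


images = [
    {'id': 10, 'frame_index': 2},
    {'id': 11, 'frame_index': 0},
    {'id': 12, 'frame_index': 1},
]
annots = [
    {'id': 1, 'image_id': 10},
    {'id': 2, 'image_id': 11},
]
pruned_images, pruned_groups = sort_by_frame(images, annots, prune=True)
all_images, all_groups = sort_by_frame(images, annots, prune=False)
print([img['id'] for img in pruned_images])  # [11, 10]
print([img['id'] for img in all_images])     # [11, 12, 10]
```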

geowatch.tasks.tracking.phase.ensure_post(coco_dset, track_id, post_cname='Post Construction', neg_cnames=['No Activity'])[source]

If the track ends before the end of the video, and the last frame is not post construction, add another frame of post construction

TODO: This is not a perfect approach, since we don’t have per-subsite tracking across frames. We can run into a case like:

    frame  1   2   3
    ss1    AC  AC  PC
    ss2    AC  AC

where it is ambiguous whether ss2 ends on AC or merges with ss1.
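
The core rule can be sketched on a list of per-frame labels. The helper ensure_post_sketch is hypothetical; it assumes the track starts on the first video frame and covers consecutive frames, so comparing lengths tells us whether the track ends early. The neg_cnames argument is not modelled.

```python
def ensure_post_sketch(track_cats, n_video_frames,
                       post_cname='Post Construction'):
    """Hypothetical helper: append a post frame to a track that ends early."""
    cats = list(track_cats)
    # Assumption: the track occupies the first len(cats) frames of the video.
    ends_early = len(cats) < n_video_frames
    if cats and ends_early and cats[-1] != post_cname:
        cats.append(post_cname)
    return cats


track_cats = ['Site Preparation', 'Active Construction']
extended = ensure_post_sketch(track_cats, n_video_frames=4)
unchanged = ensure_post_sketch(track_cats, n_video_frames=2)
print(extended)
print(unchanged)
```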

geowatch.tasks.tracking.phase.dedupe_background_anns(coco_dset, track_id, post_cname='Post Construction', neg_cnames=['No Activity'])[source]

Remove redundant Post Construction and No Activity annotations from the end of the track so they are not counted as false positives (FPs).

TODO same edge case as ensure_post() for lack of subsite tracking
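
One possible reading of this trimming step, sketched on a list of labels: strip the trailing run of background labels and keep a single Post Construction label if that run contained one. The helper trim_trailing_background and this exact keep-one rule are assumptions; the description above does not say precisely which trailing annotations are retained.

```python
def trim_trailing_background(track_cats, post_cname='Post Construction',
                             neg_cnames=('No Activity',)):
    """Hypothetical helper: drop the redundant background tail of a track."""
    cats = list(track_cats)
    background = {post_cname, *neg_cnames}
    # Walk back from the end to find where the background tail starts.
    start = len(cats)
    while start > 0 and cats[start - 1] in background:
        start -= 1
    head, tail = cats[:start], cats[start:]
    # Assumption: keep exactly one post label if the tail contained any.
    if post_cname in tail:
        head.append(post_cname)
    return head


track_cats = ['Site Preparation', 'Active Construction',
              'Post Construction', 'Post Construction', 'No Activity']
trimmed = trim_trailing_background(track_cats)
print(trimmed)
```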

geowatch.tasks.tracking.phase.current_date(annots)[source]

geowatch.tasks.tracking.phase.phase_prediction_baseline(annots) → List[float][source]

Number of days until the next expected activity phase transition.

Baseline: (average days in current_phase - elapsed days in current_phase)

Returns:

number of days in the future

Return type:

float
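
The baseline formula is simple arithmetic, shown here with made-up numbers. The average phase durations and the helper days_until_transition are hypothetical; the real averages used by geowatch are not given in this section.

```python
# Hypothetical average phase durations in days, for illustration only.
average_days = {
    'Site Preparation': 90.0,
    'Active Construction': 365.0,
}


def days_until_transition(current_phase, elapsed_days, average_days):
    """Hypothetical helper: average duration minus time already spent."""
    return average_days[current_phase] - elapsed_days


remaining = days_until_transition('Site Preparation', 30.0, average_days)
print(remaining)  # 60.0
```

Note that the result goes negative once the elapsed time exceeds the average; whether the real function clips such values is not stated here.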