#!/usr/bin/env python3
"""
KWCoco video visualization script
TODO:
- [ ] Option to interpret a channel as a heatmap and overlay it on top of
another set of channels interpreted as a grayscale image.
- [ ] Migrate to kwcoco proper
- [ ] Show valid image regions
CommandLine:
# A demo of this script on toydata is as follows
# TEMP_DPATH=$(mktemp -d)
TEMP_DPATH=$HOME/.cache/kwcoco/demo/viz
mkdir -p $TEMP_DPATH
echo "TEMP_DPATH = $TEMP_DPATH"
cd $TEMP_DPATH
KWCOCO_BUNDLE_DPATH=$TEMP_DPATH/toy_bundle
KWCOCO_FPATH=$KWCOCO_BUNDLE_DPATH/data.kwcoco.json
VIZ_DPATH=$KWCOCO_BUNDLE_DPATH/_viz
python -m kwcoco toydata --key=vidshapes3-msi-multisensor-frames7 --dst=$KWCOCO_FPATH
python -m geowatch.cli.coco_visualize_videos --src=$KWCOCO_FPATH --viz_dpath=$VIZ_DPATH --animate=True --workers=0 --any3=only --max_dim=128
python -m geowatch.cli.coco_visualize_videos --src=$KWCOCO_FPATH --viz_dpath=$VIZ_DPATH --zoom_to_tracks=True --start_frame=1 --num_frames=5 --animate=True
"""
import scriptconfig as scfg
import ubelt as ub
[docs]
class CocoVisualizeConfig(scfg.DataConfig):
"""
Visualizes annotations on kwcoco video frames on each band
CommandLine:
# Point to your kwcoco file
DVC_DPATH=$HOME/data/dvc-repos/smart_watch_dvc
COCO_FPATH=$DVC_DPATH/drop1-S2-L8-aligned/data.kwcoco.json
python -m geowatch.cli.coco_visualize_videos --src $COCO_FPATH --viz_dpath ./viz_out --channels="red|green|blue" --space="video"
COCO_FPATH=/home/joncrall/data/dvc-repos/smart_watch_dvc/drop1-S2-L8-WV-aligned/KR_R001/subdata.kwcoco.json
COCO_FPATH=/home/joncrall/data/dvc-repos/smart_watch_dvc/drop1-S2-L8-WV-aligned/data.kwcoco.json
python -m geowatch.cli.coco_visualize_videos --src $COCO_FPATH --space="image"
# Also note you can make an animated gif
python -m kwplot.cli.gifify -i "./viz_out/US_Jacksonville_R01/_anns/red|green|blue/" -o US_Jacksonville_R01_anns.gif
# NEW: as of 2021-11-04 : helper animation script
python -m geowatch.cli.animate_visualizations --viz_dpath ./viz_out
"""
__epilog__ = '''
Examples:
# Draw some channels of interest quickly and then animate them, and
# dump them into an autogenerated directory in the source kwcoco bundle
KWCOCO_FPATH=<path-to-kwcoco>
geowatch visualize "$KWCOCO_FPATH" --workers=avail --animate=True --channels="salient,red|green|blue"
'''
__default__ = {
'src': scfg.Value('data.kwcoco.json', help='input dataset', position=1),
'viz_dpath': scfg.Value(None, help=ub.paragraph(
'''
Where to save the visualizations. If unspecified,
writes them adjacent to the input kwcoco file
''')),
'workers': scfg.Value('auto', help='number of parallel procs'),
'max_workers': scfg.Value(None, help='DEPRECATED USE workers'),
'space': scfg.Value('video', help='can be image or video space'),
'max_dim': scfg.Value(None, help='if specified, the visualization will resize if it has a dimension larger than this'),
'min_dim': scfg.Value(384, help='if specified, the visualization will resize if it has a dimension smaller than this'),
'resolution': scfg.Value(None, help=ub.paragraph(
'''
the resolution to make the output at.
If unspecified use the dataset default
''')),
'channels': scfg.Value(None, type=str, help='only viz these channels'),
'any3': scfg.Value(False, help=ub.paragraph(
'''
if True, ensure the "any3" channels are drawn. If set to "only",
then other per-channel visualizations are supressed. TODO: better
name?
TODO: deprecate?
''')),
'draw_imgs': scfg.Value(True, isflag=True),
'draw_anns': scfg.Value('auto', isflag=True, help='auto means only draw anns if they exist'),
'draw_track_trails': scfg.Value(False, isflag=True, help=ub.paragraph(
'''
draw history of track locations (experimental and inefficient).
Other contexts might refer to this as a motion path, trace, trail,
tail, track history, or trajectory.
''')),
'draw_valid_region': scfg.Value(False, help='if True, draw the valid region if it exists'),
'cmap': scfg.Value('viridis', help='colormap for single channel data'),
'animate': scfg.Value('auto', isflag=True, help='if True, make an animated gif from the output. Defaults to False.'),
'num_frames': scfg.Value(None, type=str, help='show the first N frames from each video, if None, all are shown'),
'start_frame': scfg.Value(0, type=str, help='If specified each video will start on this frame'),
'ann_score_thresh': scfg.Value(0, help='If annotations have a score, remove any under this threshold'),
'skip_missing': scfg.Value(True, isflag=True, help=ub.paragraph(
'''
If true, skip any image that does not have the requested channels. Otherwise a nan image will be shown
''')),
'skip_aggressive': scfg.Value(False, isflag=True, help=ub.paragraph(
'''
Aggresively skip frames based on heuristics of badness.
''')),
'only_boxes': scfg.Value(False, isflag=True, help=ub.paragraph(
'''
If false, draws full annotation - which can be time consuming if
there are a lot. DEPRECATED. Set draw_labels=0 and
draw_segmentations=0
''')),
'draw_segmentations': scfg.Value(True, help='if True draw annotation segmentation polygons'),
'draw_labels': scfg.Value(True, help='if True draw text labels on annotations'),
'draw_boxes': scfg.Value(True, help='if True draw bounding boxes around annotations'),
'alpha': scfg.Value(None, help='transparency / opacity of annotations'),
# TODO: better support for this
# TODO: use the kwcoco_video_data, has good logic for this
'zoom_to_tracks': scfg.Value(False, isflag=True, type=str, help='if True, zoom to tracked annotations. Experimental, might not work perfectly yet.'),
'norm_over_time': scfg.Value(False, isflag=True, help='if True, normalize data over time'),
'fixed_normalization_scheme': scfg.Value(
None, type=str, help='Use a fixed normalization scheme for visualization; e.g. "scaled_25percentile"'),
'extra_header': scfg.Value(None, help='extra text to include in the header'),
'draw_header': scfg.Value(True, help='If false disables drawing the header'),
'draw_chancode': scfg.Value(True, help='If false disables drawing the channel code'),
'include_sensors': scfg.Value(None, help='if specified can be comma separated valid sensors'),
'exclude_sensors': scfg.Value(None, help='if specified can be comma separated invalid sensors'),
'select_images': scfg.Value(
None, type=str, help=ub.paragraph(
'''
A json query (via the jq spec) that specifies which images
belong in the subset. Note, this is a passed as the body of
the following jq query format string to filter valid ids
'.images[] | select({select_images}) | .id'.
Examples for this argument are as follows:
'.id < 3' will select all image ids less than 3.
'.file_name | test(".*png")' will select only images with
file names that end with png.
'.file_name | test(".*png") | not' will select only images
with file names that do not end with png.
'.myattr == "foo"' will select only image dictionaries
where the value of myattr is "foo".
'.id < 3 and (.file_name | test(".*png"))' will select only
images with id less than 3 that are also pngs.
.myattr | in({"val1": 1, "val4": 1}) will take images
where myattr is either val1 or val4.
Requries the "jq" python library is installed.
''')),
'select_videos': scfg.Value(
None, help=ub.paragraph(
'''
A json query (via the jq spec) that specifies which videos
belong in the subset. Note, this is a passed as the body of
the following jq query format string to filter valid ids
'.videos[] | select({select_images}) | .id'.
Examples for this argument are as follows:
'.name | startswith("foo")' will select only videos
where the name starts with foo.
Only applicable for dataset that contain videos.
Requries the "jq" python library is installed.
''')),
'verbose': scfg.Value(0, isflag=True, help='verbosity level'),
'stack': scfg.Value('auto', isflag=True, help='if True stack late fused channels in the same image'),
'role_order': scfg.Value(None, help=ub.paragraph(
'''
if specified, annotations are grouped by roles and drawn on different items in a channels stack in the given order
''')),
'smart': scfg.Value(False, isflag=True, help=ub.paragraph(
'''
if True, override params based on "smart" settings. This defaults
to going fast and using more resources, and stacked data.
'''), alias=['fast']),
}
[docs]
def main(cmdline=True, **kwargs):
"""
CommandLine:
xdoctest -m geowatch.cli.coco_visualize_videos main:2
Example:
>>> import kwcoco
>>> from geowatch.utils import kwcoco_extensions
>>> from geowatch.cli.coco_visualize_videos import * # NOQA
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch/tests/viz_video1').delete().ensuredir()
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', num_frames=2, image_size=(64, 64), num_videos=2)
>>> img = dset.dataset['images'][0]
>>> coco_img = dset.coco_image(img['id'])
>>> kwargs = {
>>> 'src': dset.fpath,
>>> 'viz_dpath': dpath,
>>> 'space': 'video',
>>> 'channels': None,
>>> 'zoom_to_tracks': True,
>>> }
>>> cmdline = False
>>> main(cmdline=cmdline, **kwargs)
Example:
>>> import kwcoco
>>> from geowatch.utils import kwcoco_extensions
>>> from geowatch.cli.coco_visualize_videos import * # NOQA
>>> import geowatch
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch/tests/viz_video2').delete().ensuredir()
>>> dset = geowatch.coerce_kwcoco('geowatch-msi', num_frames=5, image_size=(64, 64), num_videos=1)
>>> img = dset.dataset['images'][0]
>>> coco_img = dset.coco_image(img['id'])
>>> kwargs = {
>>> 'src': dset.fpath,
>>> 'viz_dpath': dpath,
>>> 'space': 'video',
>>> 'channels': None,
>>> 'zoom_to_tracks': False,
>>> 'stack': 'only',
>>> }
>>> cmdline = False
>>> main(cmdline=cmdline, **kwargs)
Example:
>>> # Test ability to draw track trails
>>> from geowatch.cli.coco_visualize_videos import * # NOQA
>>> import kwcoco
>>> import numpy as np
>>> from kwcoco.demo.perterb import perterb_coco
>>> import kwimage
>>> anchors = np.array([[0.05, 0.05]])
>>> size = (512, 512)
>>> dset = kwcoco.CocoDataset.demo('vidshapes1', num_frames=20, num_tracks=20, anchors=anchors, image_size=size)
>>> # Make 3 annots not have tracks
>>> dset.remove_tracks(dset.tracks()[0:3], keep_annots=True)
>>> # Give half of the tracks a random color
>>> for track in dset.dataset['tracks'][::2]:
>>> track['color'] = kwimage.Color.random().ashex()
>>> # Give tracks roles
>>> for track in dset.dataset['tracks'][2::2]:
... track['role'] = 'role1'
>>> for track in dset.dataset['tracks'][3::2]:
... track['role'] = 'role2'
>>> dpath = ub.Path.appdir('geowatch/tests/viz_video3').ensuredir()
>>> for r, ds, fs in dpath.walk():
... for f in fs:
... (r / f).delete()
>>> kwargs = {
>>> 'src': dset,
>>> 'viz_dpath': dpath,
>>> 'channels': 'r,b',
>>> 'draw_track_trails': True,
>>> 'stack': 'only',
>>> 'role_order': ['role1', 'role2'],
>>> 'cmap': 'gray',
>>> 'workers': 0,
>>> 'verbose': 101,
>>> }
>>> cmdline = False
>>> main(cmdline=cmdline, **kwargs)
"""
config = CocoVisualizeConfig.cli(data=kwargs, cmdline=cmdline and
{'strict': True}, strict=True)
from kwutil import util_parallel
from kwutil import util_resources
from geowatch.utils import kwcoco_extensions
import kwcoco
import kwarray
import rich
from rich.markup import escape
import numpy as np
rich.print('config = {}'.format(escape(ub.urepr(dict(config), nl=2))))
space = config['space']
channels = config['channels']
if config['smart']:
if config['workers'] == 'auto':
config['workers'] = 'avail'
if config['animate'] == 'auto':
config['animate'] = True
if config['stack'] == 'auto':
config['stack'] = 'only'
if config['channels'] is None:
channels = config['channels'] = 'auto'
# if config['draw_valid_region'] is None:
# config['draw_valid_region'] = False
if config['animate'] == 'auto':
config['animate'] = False
if config['stack'] == 'auto':
config['stack'] = False
if config['max_workers'] is not None:
ub.schedule_deprecation(
modname='geowatch', name='max_workers', type='argument to coco_visualize_videos',
migration='use workers instead',
deprecate='now', error='later', remove='later')
max_workers = util_parallel.coerce_num_workers(config['max_workers'])
else:
max_workers = util_parallel.coerce_num_workers(config['workers'])
rich.print('max_workers = {!r}'.format(max_workers))
coco_dset = kwcoco.CocoDataset.coerce(config['src'])
rich.print('coco_dset.fpath = {!r}'.format(coco_dset.fpath))
rich.print('coco_dset = {!r}'.format(coco_dset))
from geowatch import heuristics
heuristics.ensure_heuristic_coco_colors(coco_dset)
if channels == 'auto':
from delayed_image import FusedChannelSpec
auto_channels = [
FusedChannelSpec.coerce('r|g|b'),
FusedChannelSpec.coerce('red|green|blue'),
FusedChannelSpec.coerce('ir'),
FusedChannelSpec.coerce('No Activity|Site Preparation|Active Construction|Post Construction'),
FusedChannelSpec.coerce('salient'),
FusedChannelSpec.coerce('ac_salient'),
FusedChannelSpec.coerce('pan'),
]
from collections import defaultdict, Counter
channel_stats = kwcoco_extensions.coco_channel_stats(coco_dset)
all_sensorchan = channel_stats['all_sensorchan']
sensor_to_single_channels = defaultdict(Counter)
for spec in all_sensorchan.streams():
sensor_to_single_channels[spec.sensor.spec].update(spec.chans.as_list())
chosen = []
for sensor, chanhist in sensor_to_single_channels.items():
has_chans = set(chanhist.keys())
for ac in auto_channels:
if ac.to_set().issubset(has_chans):
chosen.append(ac.spec)
chosen = ub.oset(sorted(set(chosen)))
if 'red|green|blue' in chosen:
# force RGB first
chosen = ub.oset(['red|green|blue']) | (chosen - {'red|green|blue'})
channels = ','.join(chosen)
rich.print(f'AUTO channels={escape(channels)}')
if len(channels) == 0:
print('Auto channel selection did not find any known heuristic, attempt to fallback to to all sensorchan')
channels = None
if channels is None:
channel_stats = kwcoco_extensions.coco_channel_stats(coco_dset)
all_sensorchan = channel_stats['all_sensorchan']
channels = all_sensorchan
# Expand certain channels
requested_sensorchan = kwcoco.SensorChanSpec.coerce(channels)
requested_sensorchan.streams()
expanded_streams = []
chan_alias = {
'rgb': 'red|green|blue',
'sc': 'No Activity|Site Preparation|Active Construction|Post Construction',
'bas': 'salient',
}
for fused_sensorchan in requested_sensorchan.streams():
chan = fused_sensorchan.chans.spec
chan = chan_alias.get(chan, chan)
# TODO: handle the sensor part
# expanded_streams.append(fused_sensorchan.sensor.spec + ':' + chan)
expanded_streams.append(chan)
channels = (','.join(expanded_streams))
print(f'channels = {ub.urepr(channels, nl=1)}')
if config['draw_anns'] == 'auto':
config['draw_anns'] = coco_dset.n_annots > 0
bundle_dpath = ub.Path(coco_dset.bundle_dpath)
dset_idstr = coco_dset._dataset_id()
if config['viz_dpath'] is not None:
viz_dpath = ub.Path(config['viz_dpath'])
else:
viz_dpath = bundle_dpath / '_viz_{}'.format(dset_idstr)
rich.print('viz_dpath = {!r}'.format(viz_dpath))
from kwutil import util_progress
pman = util_progress.ProgressManager()
pman.__enter__()
# prog = ub.ProgIter(
# coco_dset.index.videos.items(), total=len(coco_dset.index.videos),
# desc='viz videos', verbose=3)
import itertools as it
# Add a fake video for loose images
video_items = it.chain(coco_dset.index.videos.items(), [(None, None)])
prog = pman.progiter(
video_items, total=len(coco_dset.index.videos) + 1,
desc='viz videos', verbose=3)
util_resources.request_nofile_limits()
pool = ub.JobPool(mode='thread', max_workers=max_workers)
from scriptconfig.smartcast import smartcast
num_frames = smartcast(config['num_frames'])
start_frame = smartcast(config['start_frame'])
end_frame = None if num_frames is None else start_frame + num_frames
selected_gids = None
selected_gids = kwcoco_extensions.filter_image_ids(
coco_dset,
gids=selected_gids,
include_sensors=config['include_sensors'],
exclude_sensors=config['exclude_sensors'],
select_images=config['select_images'],
select_videos=config['select_videos'],
)
if config['skip_missing'] and channels is not None:
requested_channels = kwcoco.ChannelSpec.coerce(channels).fuse().as_set()
print(f'requested_channels={requested_channels}')
coco_images = coco_dset.images(selected_gids).coco_images
keep = []
for coco_img in coco_images:
img_channels = coco_img.channels
if img_channels is None:
if not config['skip_aggressive']:
keep.append(coco_img.img['id'])
else:
code = img_channels.fuse().as_set()
if config['skip_aggressive']:
if len(requested_channels & code) == len(requested_channels):
keep.append(coco_img.img['id'])
else:
if requested_channels & code:
keep.append(coco_img.img['id'])
rich.print(f'Filtered {len(coco_images) - len(keep)} images without requested channels. Keeping {len(keep)}')
selected_gids = keep
viz_dpath_abs = viz_dpath.absolute()
rich.print(f'Will write to: [link={viz_dpath_abs}]{viz_dpath_abs}[/link]')
video_names = []
for vidid, video in prog:
if video is None:
video = {
'name': 'loose-images',
}
video_name = str(video['name'])
sub_dpath = viz_dpath / video_name
if vidid is None:
loose_gids = [
gid for gid, v in coco_dset.images().lookup('video_id', None, keepid=1).items()
if v is None
]
gids = loose_gids
else:
gids = coco_dset.index.vidid_to_gids[vidid]
if selected_gids is not None:
gids = list(ub.oset(gids) & set(selected_gids))
if len(gids) == 0:
rich.print(f'Skip {video["name"]=!r} with no selected images')
continue
sub_dpath.ensuredir()
video_names.append(video_name)
if config['animate'] == 'oops':
rich.print('Got animate=oops. '
'Assuming images already exists and you forgot to animate'
'Skipping video draw')
continue
norm_over_time = config['norm_over_time']
if not norm_over_time:
chan_to_normalizer = None
else:
coco_images = [coco_dset.coco_image(gid) for gid in gids]
# quick and dirty:
# Find the first image for each visualization channel
# to use as the normalizer.
# Probably better to use multiple images from the sequence
# to do normalization
if channels is not None:
requested_channels = kwcoco.ChannelSpec.coerce(channels).fuse().as_set()
else:
requested_channels = set()
for coco_img in coco_images:
code = coco_img.channels.fuse().as_set()
requested_channels.update(code)
chan_to_ref_imgs = {}
for code in requested_channels:
chan_to_ref_imgs[code] = []
_remain = requested_channels.copy()
for coco_img in coco_images:
imghas = coco_img.channels.fuse().as_set()
common = imghas & _remain
for c in common:
chan_to_ref_imgs[c].append(coco_img)
chan_to_normalizer = {}
for chan, coco_imgs in chan_to_ref_imgs.items():
s = max(1, len(coco_imgs) // 10)
obs = []
for coco_img in coco_imgs[::s]:
rawdata = coco_img.imdelay(channels=chan).prepare().optimize().finalize()
mask = rawdata != 0
obs.append(rawdata[mask].ravel())
allobs = np.hstack(obs)
normalizer = kwarray.find_robust_normalizers(allobs, params={
'high': 0.90,
'mid': 0.5,
'low': 0.01,
'mode': 'linear',
# 'mode': 'sigmoid',
})
chan_to_normalizer[chan] = normalizer
rich.print('chan_to_normalizer = {}'.format(ub.urepr(chan_to_normalizer, nl=1)))
if config['draw_valid_region']:
valid_vidspace_region = video.get('valid_region', None)
else:
valid_vidspace_region = None
common_kw = ub.udict(config) & {
'resolution', 'draw_header', 'draw_chancode', 'skip_aggressive',
'stack', 'min_dim', 'max_dim', 'verbose', 'only_boxes',
'draw_boxes', 'draw_labels', 'fixed_normalization_scheme', 'any3',
'cmap', 'role_order', 'smart', 'ann_score_thresh', 'alpha',
'draw_track_trails',
}
if config['zoom_to_tracks']:
assert space == 'video'
tid_to_info = video_track_info(coco_dset, vidid)
for tid, track_info in tid_to_info.items():
track_dpath = sub_dpath / '_tracks' / 'tid_{}'.format(tid)
track_dpath.ensuredir()
vid_crop_box = track_info['full_vid_box']
# Add context (todo: parameterize how much)
vid_crop_box = vid_crop_box.scale(1.5, about='center')
vid_crop_box = vid_crop_box.clip(
0, 0, video['width'] - 2, video['height'] - 2)
vid_crop_box = vid_crop_box.to_xywh()
vid_crop_box = vid_crop_box.quantize()
gid_subset = gids[start_frame:end_frame]
local_max_frame = len(gid_subset)
for local_frame_index, gid in enumerate(gid_subset):
img = coco_dset.index.imgs[gid]
anns = coco_dset.annots(gid=gid).objs
if config['extra_header']:
_header_extra = f'tid={tid}' + config['extra_header']
else:
_header_extra = f'tid={tid}'
pool.submit(_write_ann_visualizations2,
coco_dset, img, anns, track_dpath, space=space,
channels=channels, vid_crop_box=vid_crop_box,
_header_extra=_header_extra,
chan_to_normalizer=chan_to_normalizer,
local_frame_index=local_frame_index,
local_max_frame=local_max_frame,
dset_idstr=dset_idstr,
**common_kw
)
else:
gid_subset = gids[start_frame:end_frame]
local_max_frame = len(gid_subset)
for local_frame_index, gid in enumerate(gid_subset):
img = coco_dset.index.imgs[gid]
anns = coco_dset.annots(gid=gid).objs
if config['extra_header']:
_header_extra = config['extra_header']
else:
_header_extra = ''
pool.submit(_write_ann_visualizations2,
coco_dset, img, anns, sub_dpath, space=space,
channels=channels,
draw_imgs=config['draw_imgs'],
draw_anns=config['draw_anns'],
_header_extra=_header_extra,
chan_to_normalizer=chan_to_normalizer,
dset_idstr=dset_idstr,
local_frame_index=local_frame_index,
local_max_frame=local_max_frame,
valid_vidspace_region=valid_vidspace_region,
skip_missing=config['skip_missing'],
**common_kw
)
# for job in ub.ProgIter(pool.as_completed(), total=len(pool), desc='write imgs'):
for job in pman.progiter(pool.as_completed(), total=len(pool), desc='write imgs'):
try:
job.result()
except SkipFrame:
...
pool.jobs.clear()
pman.__exit__(None, None, None)
rich.print(f'Wrote images to: [link={viz_dpath_abs}]{viz_dpath_abs}[/link]')
if config['animate']:
# TODO: develop this idea more
# Try to parse out an animation config
import scriptconfig as scfg
from kwutil import util_yaml
class AnimateConfig(scfg.DataConfig):
# TODO: should be able to load from an alias
frames_per_second = scfg.Value(0.7, alias=['fps'])
animate_config = dict(AnimateConfig())
if isinstance(config['animate'], str) and config['animate'] not in {'oops'}:
try:
user_config = util_yaml.Yaml.loads(config['animate'])
assert isinstance(user_config, dict), 'animate subconfig should be coercable into a dict'
# hack
if 'fps' in user_config:
user_config['frames_per_second'] = user_config.pop('fps')
animate_config = AnimateConfig(**user_config)
except Exception:
print('Tried to pass animate as a yaml config but loading failed')
raise
rich.print('animate_config = {}'.format(ub.urepr(animate_config, nl=1)))
from geowatch.cli import animate_visualizations
# Hack: pretend that stack is a channel even though it is not.
if config['stack']:
if not channels:
channels = 'stack'
else:
channels = channels + ',stack'
outputs = animate_visualizations.animate_visualizations(
viz_dpath=viz_dpath,
channels=channels,
video_names=video_names,
draw_imgs=config['draw_imgs'],
draw_anns=config['draw_anns'],
workers=max_workers,
zoom_to_tracks=config['zoom_to_tracks'],
**animate_config,
)
# Links for summaries
type_to_anis = ub.group_items(outputs, lambda x: x['type'])
for ani_type, items in type_to_anis.items():
summary_fpath = (viz_dpath / ('_' + ani_type)).ensuredir()
for item in items:
fpath = ub.Path(item['fpath'])
dst = summary_fpath / fpath.name
ub.symlink(fpath, dst)
# Terminal fixup
import sys
if sys.stdout.isatty():
# FFmpeg seems to mess up terminal output. I'm not sure why.
# Also running this "fixup" seems to break things when people run
# multiple commands in a copy-paste fashion, so we should remove
# this.
ub.cmd('stty sane', verbose=3)
[docs]
class SkipFrame(Exception):
pass
[docs]
class SkipChanGroup(Exception):
pass
[docs]
def video_track_info(coco_dset, vidid):
import kwimage
vid_annots = coco_dset.images(video_id=vidid).annots
track_ids = set(ub.flatten(vid_annots.lookup('track_id')))
tid_to_info = {}
for tid in track_ids:
track_aids = coco_dset.index.trackid_to_aids[tid]
vidspace_boxes = []
track_gids = []
for aid in track_aids:
ann = coco_dset.index.anns[aid]
gid = ann['image_id']
img = coco_dset.index.imgs[gid]
bbox = ann['bbox']
vid_from_img = kwimage.Affine.coerce(img.get('warp_img_to_vid', None))
imgspace_box = kwimage.Boxes([bbox], 'xywh')
vidspace_box = imgspace_box.warp(vid_from_img)
vidspace_boxes.append(vidspace_box)
track_gids.append(gid)
all_vidspace_boxes = kwimage.Boxes.concatenate(vidspace_boxes)
full_vid_box = all_vidspace_boxes.bounding_box().to_xywh()
tid_to_info[tid] = {
'tid': tid,
'full_vid_box': full_vid_box,
'track_gids': track_gids,
'track_aids': track_aids,
}
return tid_to_info
[docs]
class TrackInfoLookup:
"""
Prototype implementation for something that kwcoco can use to make track
information more accessible. TODO: cache information to make subsequent
queries slightly faster.
"""
def __init__(self, dset):
self.dset = dset
[docs]
def get_track_trail_by_video_id(self, video_id, image_id=None):
import kwimage
vid_images = self.dset.images(video_id=video_id)
if image_id is not None:
vid_gids = list(vid_images)
# Only get part of the tracks
max_index = vid_gids.index(image_id)
vid_gids = vid_gids[:max_index + 1]
vid_images = self.dset.images(vid_gids)
# TODO: nicer way to get images "before" a point.
max_frame_index = self.dset.imgs[image_id]['frame_index']
else:
max_frame_index = None
vid_annots = vid_images.annots
track_ids = set(ub.flatten(vid_annots.lookup('track_id', None)))
track_ids -= {None}
trails = []
for track_id in track_ids:
track_aids = self.dset.index.trackid_to_aids[track_id]
vidspace_boxes = []
track_colors = []
track = self.dset.index.tracks[track_id]
track_color = track.get('color', None)
track_gids = []
default_color = kwimage.Color.random(rng=track_id)
for aid in track_aids:
ann = self.dset.index.anns[aid]
gid = ann['image_id']
img = self.dset.index.imgs[gid]
if max_frame_index is not None:
if img['frame_index'] >= max_frame_index:
continue
bbox = ann['bbox']
vid_from_img = kwimage.Affine.coerce(img.get('warp_img_to_vid', None))
imgspace_box = kwimage.Boxes([bbox], 'xywh')
vidspace_box = imgspace_box.warp(vid_from_img)
vidspace_boxes.append(vidspace_box)
if track_color is None:
color = ann.get('color', default_color)
else:
color = track_color
# Hack: this colors assigns colors to polyon endpoints, not edges.
track_colors.append(color)
track_gids.append(gid)
if len(vidspace_boxes):
vidspace_boxes = kwimage.Boxes.concatenate(vidspace_boxes)
motion_path = {
'vidspace_boxes': vidspace_boxes,
'track_gids': track_gids,
'track_aids': track_aids,
'track_colors': track_colors,
}
trail = {
'track_id': track_id,
'motion_path': motion_path
}
for k in ['role', 'color', 'thickness']:
if k in track:
trail[k] = track[k]
trails.append(trail)
return trails
__cli__ = CocoVisualizeConfig
[docs]
def select_fixed_normalization(fixed_normalization_scheme, sensor_coarse):
chan_to_normalizer = {}
if fixed_normalization_scheme == 'scaled':
if sensor_coarse in {'L8', 'S2'}:
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 0, 'max_val': 10_000}
elif fixed_normalization_scheme == 'scaled_50percentile':
if sensor_coarse in {'L8', 'S2'}:
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 0, 'max_val': 5_000}
elif fixed_normalization_scheme == 'scaled_25percentile':
if sensor_coarse in {'L8', 'S2'}:
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 0, 'max_val': 2_500}
elif fixed_normalization_scheme == 'scaled_raw':
if sensor_coarse == 'L8':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 7_272, 'max_val': 36_363}
if sensor_coarse == 'S2':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 1, 'max_val': 10_000}
elif fixed_normalization_scheme == 'scaled_raw_50percentile':
if sensor_coarse == 'L8':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 7_272, 'max_val': 21_818}
if sensor_coarse == 'S2':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 1, 'max_val': 5_000}
elif fixed_normalization_scheme == 'scaled_raw_25percentile':
if sensor_coarse == 'L8':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 7_272, 'max_val': 14_544}
if sensor_coarse == 'S2':
for c in ['blue', 'green', 'red', 'nir', 'swir16', 'swir22']:
chan_to_normalizer[c] = {'type': 'normalize', 'mode': 'linear',
'min_val': 1, 'max_val': 2_500}
else:
raise NotImplementedError('Unsupported fixed normalization scheme')
return chan_to_normalizer
def _resolve_channel_groups(coco_img, channels, verbose, request_grouped_bands,
any3, smart):
"""
Resolve which channel groups should be requested.
"""
from delayed_image import channel_spec
import kwcoco
import rich
if channels is not None:
if isinstance(channels, list):
channels = ','.join(channels) # hack
channels = channel_spec.ChannelSpec.coerce(channels)
chan_groups = [
{'chan': chan_obj} for chan_obj in channels.streams()
]
else:
if verbose > 0:
rich.print('Choosing channels')
rich.print(f'request_grouped_bands={request_grouped_bands}')
channels = coco_img.channels
if channels is None:
# Image does not have channel metadata, the best we can do is
# assume RGB
chan_groups = [{
'pname': 'null',
'chan': None,
}]
return chan_groups
if request_grouped_bands == 'default':
# Use false color for special groups
request_grouped_bands = ['red|green|blue', 'r|g|b', 'ir']
for cand in request_grouped_bands:
cand = kwcoco.FusedChannelSpec.coerce(cand)
has_cand = (channels & cand).numel() == cand.numel()
if has_cand:
channels = channels - cand
# todo: nicer way to join streams
# channels = kwcoco.ChannelSpec.coerce(channels.spec + ',' + cand.spec)
channels = channels + cand
# kwcoco.ChannelSpec.coerce(channels.spec + ',' + cand.spec)
initial_groups = channels.streams()
chan_groups = []
group : kwcoco.FusedChannelSpec
for group in initial_groups:
if group.numel() > 3:
# For large group, just take the first 3 channels
if group.numel() > 8:
group = group.normalize()[0:3]
chan_groups.append({
'chan': group,
})
else:
# For smaller groups split them into singles
for part in group:
chan_groups.append({
'chan': kwcoco.FusedChannelSpec.coerce(part)
})
else:
chan_groups.append({
'chan': group,
})
for row in chan_groups:
row['pname'] = row['chan'].path_sanitize()
if any3:
if any3 == 'only':
# Kick everything else out
chan_groups = []
# Try to visualize any3 channels to get a nice viewable sequence
avail_channels = channels.fuse()
common_visualizers = list(map(kwcoco.FusedChannelSpec.coerce, [
'red|green|blue', 'r|g|b', 'pan', 'panchromatic']))
found = None
for cand in common_visualizers:
flag = (cand & avail_channels).spec == cand.spec
if flag:
found = cand
break
# Just show false color from the first few channels
if found is None:
first3 = avail_channels.as_list()[0:3]
found = kwcoco.FusedChannelSpec.coerce('|'.join(first3))
chan_groups.append({
'pname': 'any3',
'chan': found,
})
return chan_groups
def _write_ann_visualizations2(coco_dset,
img : dict,
anns : list,
sub_dpath : str,
space : str,
channels=None,
vid_crop_box=None,
request_grouped_bands='default',
draw_imgs=True,
draw_anns=True, _header_extra=None,
chan_to_normalizer=None,
fixed_normalization_scheme=None,
any3=True, dset_idstr='',
skip_missing=False,
only_boxes=1,
draw_boxes=True,
draw_labels=True,
draw_segmentations=True,
cmap='viridis',
max_dim=None,
min_dim=None,
local_frame_index=None,
local_max_frame=None,
valid_vidspace_region=None,
stack=False,
draw_valid_region=True,
verbose=0,
skip_aggressive=False,
draw_header=True,
draw_chancode=True,
resolution=None,
role_order=None,
smart=None,
ann_score_thresh=0,
alpha=None,
draw_track_trails=False,
):
"""
Dumps an intensity normalized "space-aligned" kwcoco image visualization
(with or without annotation overlays) for specific bands to disk.
"""
# See if we can look at what we made
sensor_coarse = img.get('sensor_coarse', 'unknown')
# align_method = img.get('align_method', 'unknown')
name = img.get('name', 'unnamed')
name = name.replace('/', '_')
# Ensure names are differentiated between frames.
import math
import rich
import kwimage
import numpy as np
if local_max_frame is None:
num_digits = 8
else:
num_digits = int(math.log10(max(local_max_frame, 1))) + 1
if local_frame_index is None:
local_frame_index = -1
frame_id = f'{local_frame_index:0{num_digits}d}'
if verbose > 2:
_body = f'--- Render frame {frame_id} ---'
rich.print('=' * len(_body))
rich.print(_body)
rich.print('=' * len(_body))
from geowatch import heuristics
header_lines = heuristics.build_image_header_text(
img=img,
name=None,
_header_extra=_header_extra,
coco_dset=coco_dset,
)
if verbose > 2:
rich.print('header_lines = {}'.format(ub.urepr(header_lines, nl=1)))
coco_img = coco_dset.coco_image(img['id'])
finalize_opts = {
'interpolation': 'linear',
'nodata_method': 'float',
}
if 1:
if resolution is None:
factor = 1
else:
factor = coco_img._scalefactor_for_resolution(
space=space, resolution=resolution)
warp_viz_from_space = kwimage.Affine.scale(factor)
delayed = coco_img.imdelay(space=space, resolution=resolution,
# This fix doesn't actually work correctly
# channels=channels,
**finalize_opts)
warp_vid_from_img = coco_img.warp_vid_from_img
if space == 'video':
warp_viz_from_img = warp_viz_from_space @ warp_vid_from_img
else:
warp_viz_from_img = warp_viz_from_space
if fixed_normalization_scheme is not None:
chan_to_normalizer = select_fixed_normalization(
fixed_normalization_scheme, sensor_coarse)
# Hacks for common "heatmap" channels
chan_to_normalizer['depth'] = {'type': 'normalize', 'mode': 'linear',
'min_val': 0, 'max_val': 255}
if verbose > 0:
rich.print(f'fixed_normalization_scheme={fixed_normalization_scheme}')
rich.print(f'chan_to_normalizer={chan_to_normalizer}')
rich.print(f'channels={channels}')
chan_groups = _resolve_channel_groups(coco_img, channels, verbose,
request_grouped_bands, any3, smart)
img_view_dpath = sub_dpath / '_imgs'
ann_view_dpath = sub_dpath / '_anns'
anns_ = [ub.dict_diff(ann, ['keypoints']) for ann in anns] # Ignore keypoints
if draw_track_trails:
# Experimental and inefficient. Need to get all previous track
# positions. Could be more efficient if we are able to reuse
# information between frames, but we may need this logic for drawing
# single frames independently.
tilut = TrackInfoLookup(coco_dset)
# TODO: do we need to restrict history? How do we expose that option to
# a user?
trails = tilut.get_track_trail_by_video_id(
video_id=coco_img.img['video_id'],
# image_id=coco_img['id']
)
else:
trails = None
role_to_anns = ub.group_items(anns_, lambda ann: ann.get('role', None))
role_to_anns = {'none' if k is None else k.lower(): v for k, v in role_to_anns.items()}
role_to_drawables = ub.udict()
for role, role_anns in role_to_anns.items():
colors = []
# Determine the color for each annotation
for ann in role_anns:
# color = 'kitware_red'
cid = ann.get('category_id', None)
color = ann.get('color', None)
if color is None and cid is not None:
cat = coco_dset.cats[cid]
color = cat.get('color', color)
if 0:
# todo better role support
# temporary hack
if 'misc_info' in ann:
misc_info = ann['misc_info']
if isinstance(misc_info, dict):
color = misc_info.get('confusion_color', color)
if color is None:
# color = 'kitware_red'
# color = 'kitware_lightgray'
color = 'white'
color = kwimage.Color.coerce(color).as01()
colors.append(color)
role_dets = kwimage.Detections.from_coco_annots(role_anns, dset=coco_dset)
role_dets.data['colors'] = np.array(colors)
if ann_score_thresh:
flags = [float(ann.get('score', 1)) > ann_score_thresh for ann in role_anns]
role_dets = role_dets.compress(flags)
role_dets = role_dets.warp(warp_viz_from_img)
role_to_drawables[role] = {
'dets': role_dets,
}
if trails is not None:
role_to_trails = ub.group_items(trails, lambda trail: trail.get('role', None))
role_to_trails = {'none' if k is None else k.lower(): v for k, v in role_to_trails.items()}
for role, role_trails in role_to_trails.items():
# TODO: proper warping?
for trail in role_trails:
trail['motion_path']['vizspace_boxes'] = trail['motion_path']['vidspace_boxes']
if role not in role_to_drawables:
role_to_drawables[role] = {}
role_to_drawables[role]['trails'] = role_trails
# TODO: asset space
if vid_crop_box is not None:
# Ensure the crop box is in the proper space
if space == 'image':
warp_viz_from_vid = warp_viz_from_space @ warp_vid_from_img.inv()
elif space == 'video':
warp_viz_from_vid = warp_viz_from_space
crop_box = vid_crop_box
else:
raise KeyError(space)
crop_box = vid_crop_box.warp(warp_viz_from_vid).quantize()
ann_shift = (-crop_box.tl_x.ravel()[0], -crop_box.tl_y.ravel()[0])
for role_drawables in role_to_drawables.values():
role_dets = role_drawables.get('dets', None)
role_trails = role_drawables.get('trails', None)
if role_trails is not None:
raise NotImplementedError('Cant use trails and vid-crop-box yet. Need to implement proper warps')
if role_dets is not None:
role_dets.translate(ann_shift, inplace=True)
delayed = delayed.crop(crop_box.to_slices()[0])
valid_image_poly = None
valid_video_poly = None
if draw_valid_region:
valid_region = img.get('valid_region', None)
else:
valid_region = None
if valid_region:
valid_image_poly: kwimage.MultiPolygon = kwimage.MultiPolygon.coerce(valid_region)
valid_image_poly = valid_image_poly.warp(warp_viz_from_img)
if valid_vidspace_region is not None:
valid_vidspace_region = kwimage.MultiPolygon.coerce(valid_vidspace_region)
if space == 'image':
warp_img_from_vid = warp_vid_from_img.inv()
warp_viz_from_vid = warp_viz_from_space @ warp_img_from_vid
valid_video_poly = valid_vidspace_region.warp(warp_viz_from_vid)
else:
valid_video_poly = valid_vidspace_region.warp(warp_viz_from_space)
# Determine if we need to scale the image for visualization
viz_scale_factor = 1.0
if min_dim is not None:
chan_min_dim = min(delayed.dsize) * viz_scale_factor
if chan_min_dim < min_dim:
viz_scale_factor *= min_dim / chan_min_dim
if max_dim is not None:
chan_max_dim = max(delayed.dsize) * viz_scale_factor
if chan_max_dim > max_dim:
viz_scale_factor *= max_dim / chan_max_dim
if viz_scale_factor != 1:
viz_warp = kwimage.Affine.scale(viz_scale_factor)
delayed = delayed.warp(viz_warp)
for role_drawables in role_to_drawables.values():
role_dets = role_drawables.get('dets', None)
role_trails = role_drawables.get('trails', None)
if role_trails is not None:
raise NotImplementedError('Cant use trails and vid-crop-box yet. Need to implement proper warps')
if role_dets is not None:
role_dets.warp(viz_warp, inplace=True)
if valid_image_poly is not None:
valid_image_poly = valid_image_poly.warp(viz_warp)
if valid_video_poly is not None:
valid_video_poly = valid_video_poly.warp(viz_warp)
if stack:
ann_stack = []
img_stack = []
# Add in custom crop / zoom?
# h, w = delayed.shape[0:2]
# delayed = delayed[0:h // 2, w // 4: w - w // 4]
# TODO: user should need to specify this.
max_stacks = len(chan_groups)
if role_order is not None:
# role_order = ['truth', 'none']
requested_slots = {}
for role in role_to_drawables.keys():
try:
idx = role_order.index(role)
except ValueError:
idx = 0
requested_slots[role] = min(idx, max_stacks)
stack_idx_to_roles = ub.invert_dict(requested_slots, unique_vals=False)
else:
# If unspecified draw all roles on the first part
stack_idx_to_roles = {0: list(role_to_drawables.keys())}
if 1 and verbose > 100:
unique_roles = (role_to_drawables.keys())
rich.print(f'unique_roles = {ub.urepr(unique_roles, nl=1)}')
for role, drawables in role_to_drawables.items():
rich.print(f'role={role}, num_drawables={ub.udict(drawables).map_values(len)}')
rich.print(f'role_order={role_order}')
rich.print(f'stack_idx_to_roles={stack_idx_to_roles}')
stack_idx = 0
handled_specs = set()
for chan_row in chan_groups:
request_roles = stack_idx_to_roles.get(stack_idx, [])
# TODO: only request selected trails as well.
if smart:
# not sure how to encode this with sensible cli args. could use help
# with this API. The idea is we only need to visualize one set of
# visualizable channels, we only need rgb or pan, not both
if 'red|green|blue' in handled_specs and chan_row['chan'].spec == 'pan':
# already visualized rgb, dont need pan
if verbose > 2:
rich.print(f'... smart skip {chan_row=}')
continue
if verbose > 2:
rich.print(f'... render {chan_row=}')
try:
stack_img_item, stack_ann_item = draw_chan_group(
coco_dset, frame_id, name, ann_view_dpath, img_view_dpath,
delayed, chan_row, finalize_opts, verbose, skip_missing,
skip_aggressive, chan_to_normalizer, cmap, header_lines,
valid_image_poly, draw_imgs, draw_anns, only_boxes, draw_boxes,
draw_labels, draw_segmentations, role_to_drawables, valid_video_poly, stack,
draw_header, stack_idx, request_roles, ann_score_thresh, alpha,
)
if stack:
img_stack.append(stack_img_item)
ann_stack.append(stack_ann_item)
except SkipChanGroup:
if verbose > 2:
rich.print(f'... skipped render {chan_row=}')
else:
stack_idx += 1
if chan_row['chan'] is not None:
handled_specs.add(chan_row['chan'].spec)
if verbose > 2:
rich.print(f'... success render {chan_row=}')
if stack:
if verbose > 2:
rich.print('... stacking')
img_stacked_dpath = (img_view_dpath / 'stack')
ann_stacked_dpath = (ann_view_dpath / 'stack')
view_img_fpath = img_stacked_dpath / (frame_id + '_' + name + '_stack' + '.view_img.jpg')
view_ann_fpath = ann_stacked_dpath / (frame_id + '_' + name + '_stack' + '.view_ann.jpg')
stack_header_lines = header_lines.copy()
header_text = '\n'.join(stack_header_lines)
def stack_infos(_stack):
tostack = []
for item in _stack:
if item is None:
...
# print('warning: None stack item')
else:
canvas = item['im']
chan = item['chan']
# canvas = kwimage.ensure_float01(canvas, copy=True)
canvas = kwimage.ensure_uint255(canvas)
if draw_chancode:
canvas = kwimage.draw_text_on_image(
canvas, chan, (1, 2),
valign='top', color='lime', border=3)
tostack.append(canvas)
if len(tostack) > 0:
canvas = kwimage.stack_images(tostack)
else:
canvas = kwimage.draw_text_on_image(None, text='X')
canvas = kwimage.imresize(canvas, dsize=(512, 512))
canvas = kwimage.ensure_uint255(canvas)
return canvas
if ann_stack:
ann_stack_canvas = stack_infos(ann_stack)
if draw_header:
ann_header = kwimage.draw_header_text(image=ann_stack_canvas,
text=header_text,
stack=False,
fit='shrink')
ann_header = kwimage.imresize(
# ann_header, dsize=(None, 100), letterbox=True)
ann_header, dsize=(ann_header.shape[1], 100), letterbox=True)
ann_canvas = kwimage.stack_images([ann_header, ann_stack_canvas])
else:
ann_canvas = ann_stack_canvas
view_ann_fpath.parent.ensuredir()
kwimage.imwrite(view_ann_fpath, ann_canvas)
if img_stack:
img_stack_canvas = stack_infos(img_stack)
if draw_header:
img_header = kwimage.draw_header_text(image=img_stack_canvas,
text=header_text,
fit='shrink', stack=False)
img_header = kwimage.imresize(
img_header, dsize=(img_header.shape[1], 100), letterbox=True)
img_canvas = kwimage.stack_images([img_header, img_stack_canvas])
else:
img_canvas = img_stack_canvas
view_img_fpath.parent.ensuredir()
kwimage.imwrite(view_img_fpath, img_canvas)
if verbose > 2:
rich.print(f'--- End frame {frame_id}')
[docs]
def draw_chan_group(coco_dset, frame_id, name, ann_view_dpath, img_view_dpath,
delayed, chan_row, finalize_opts, verbose, skip_missing,
skip_aggressive, chan_to_normalizer, cmap, header_lines,
valid_image_poly, draw_imgs, draw_anns, only_boxes,
draw_boxes, draw_labels, draw_segmentations, role_to_drawables,
valid_video_poly, stack, draw_header, stack_idx,
request_roles, ann_score_thresh, alpha):
"""
This draws a single image using selected set of channelse for an intensity,
color, or false color visualization with annotations optionally drawn.
"""
from geowatch.utils import util_kwimage
import kwimage
import kwarray
import kwcoco
import numpy as np
import rich
chan_pname = chan_row['pname']
chan_group_obj = chan_row['chan']
img_chan_dpath = img_view_dpath / chan_pname
ann_chan_dpath = ann_view_dpath / chan_pname
# Prevent long names for docker (limit is 242 chars)
if chan_group_obj is not None:
chan_list = chan_group_obj.parsed
chan_group = chan_group_obj.spec
chan_pname2 = kwcoco.FusedChannelSpec.coerce(chan_group).path_sanitize(maxlen=10)
prefix = '_'.join([frame_id, chan_pname2])
else:
chan_group = None
chan_list = None
prefix = '_'.join([frame_id, 'null'])
view_img_fpath = img_chan_dpath / prefix + '_' + name + '.view_img.jpg'
view_ann_fpath = ann_chan_dpath / prefix + '_' + name + '.view_ann.jpg'
if chan_group_obj is not None:
chan = delayed.take_channels(chan_group)
else:
chan = delayed
chan = chan.prepare().optimize()
# When util_delayed_poc is removed, remove **delayed_ops
# as they should be given in the constructor.
raw_canvas = canvas = chan.finalize(**finalize_opts)
# foo = kwimage.fill_nans_with_checkers(raw_canvas)
if verbose > 1:
rich.print('raw_canvas.shape = {!r}'.format(raw_canvas.shape))
rich.print('chan_list = {!r}'.format(chan_list))
try:
# chan_stats = kwarray.stats_dict(raw_canvas, axis=2, nan=True)
chan_stats = kwarray.stats_dict(raw_canvas, axis=(0, 1), nan=True, quantile=False)
rich.print('chan_stats = {}'.format(ub.urepr(chan_stats, nl=1)))
except Exception as ex:
rich.print(f'ex={ex}')
import warnings
warnings.warn('Error printing chan stats, probably need kwarray >= 0.6.1')
if skip_missing and np.all(np.isnan(raw_canvas)):
if skip_aggressive:
print('Skip because all is nan')
raise SkipFrame
raise SkipChanGroup
# if skip_aggressive:
# is_bad = np.isnan(raw_canvas).ravel()
# percent_bad = is_bad.sum() / len(is_bad)
# if percent_bad > 0.5:
# print('Skip because some is nan')
# print('skip')
# raise SkipFrame
if 0 and str(chan_group) == 'salient':
# TEST CODE
# blur1 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=1.6), n=3)
# blur2 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=3.2), n=3)
blur1 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=0.8), n=3)
blur2 = kwarray.atleast_nd(kwimage.gaussian_blur(raw_canvas, sigma=1.6), n=3)
dog = blur1 - blur2
shift_dog = dog - min(0, np.nanmin(dog))
canvas = blur2
canvas = shift_dog
# orig_max = np.nanmax(raw_canvas)
# canvas = kwarray.normalize(shift_dog) * orig_max
# canvas = canvas * raw_canvas
canvas = raw_canvas
# median = np.nanmedian(canvas)
canvas = kwarray.atleast_nd(kwimage.gaussian_blur(canvas, sigma=3.6), n=3)
median, = np.quantile(canvas.ravel()[~np.isnan(canvas.ravel())], q=[0.8])
# median = np.nanmean(canvas) + np.nanstd(canvas)
canvas = (canvas - median).clip(0, None)
canvas = np.sqrt(canvas)
# raw_canvas = kwarray.normalize(dog)
# raw_canvas = kwarray.atleast_nd(raw_canvas, n=3)
# raw_canvas = raw_canvas[:, :, None]
if verbose > 100:
print('chan normalizer part')
if chan_to_normalizer is None:
dmax = np.nanmax(raw_canvas)
# dmin = canvas.min()
needs_norm = dmax > 1.0
# if canvas.max() <= 0 or canvas.min() >= 255:
# Hack to only do noramlization on "non-standard" data ranges
if needs_norm:
mask = ~np.isnan(raw_canvas)
norm_canvas = kwimage.normalize_intensity(raw_canvas, mask=mask, params={
'high': 0.90,
'mid': 0.5,
'low': 0.01,
'mode': 'linear',
})
canvas = norm_canvas
canvas = np.clip(canvas, 0, None)
else:
new_parts = []
for cx, c in enumerate(chan_list):
normalizer = chan_to_normalizer.get(c, None)
data = canvas[..., cx]
mask = ~np.isnan(data)
if normalizer is None:
p = kwimage.normalize_intensity(data, params={
'high': 0.90,
'mid': 0.5,
'low': 0.01,
'mode': 'linear',
})
else:
p = kwarray.apply_normalizer(data, normalizer, mask=mask,
set_value_at_mask=0.)
new_parts.append(p)
canvas = np.stack(new_parts, axis=2)
if verbose > 100:
print('after normalizer part')
canvas = kwimage.nodata_checkerboard(canvas, on_value=0.3)
if verbose > 100:
print('after checkers part')
# Do the channels correspond to classes with known colors?
if chan_group_obj is not None:
chan_names = chan_row['chan'].to_list()
else:
chan_names = []
channel_colors = []
from geowatch import heuristics
# For some reason predict is not preserving categories
for cat in heuristics.CATEGORIES:
coco_dset.ensure_category(**cat)
heuristics.ensure_heuristic_coco_colors(coco_dset)
for cname in chan_names:
if cname in coco_dset.index.name_to_cat:
cat = coco_dset.index.name_to_cat[cname]
if 'color' in cat:
channel_colors.append(cat['color'])
else:
channel_colors.append(None)
else:
channel_colors.append(None)
if verbose > 100:
print('after channel colors part')
if any(c is not None for c in channel_colors):
# This flag makes it so 1 channel outputs always use cmap.
# not sure if I like that or not, probably needs to be configurable
_flag = kwimage.num_channels(canvas) != 1
if _flag:
if verbose > 100:
print('do perchannel_colorize')
print(f'channel_colors={channel_colors}')
canvas = util_kwimage.perchannel_colorize(canvas, channel_colors=channel_colors)
if verbose > 100:
print('finished perchannel_colorize')
canvas = canvas[..., 0:3]
if verbose > 100:
print('finished channel color part')
if cmap is not None:
if kwimage.num_channels(canvas) == 1:
if verbose > 100:
print('doing 1 channel cmap')
import matplotlib as mpl
if chan_group == 'pan':
# Use grayscale for certain 1 band images
canvas = np.nan_to_num(canvas)
canvas = kwimage.atleast_3channels(canvas)
if len(canvas) == 3:
canvas = canvas[..., 0]
# canvas = kwimage.ensure_float01(canvas)
else:
try:
import matplotlib.cm # NOQA
cmap_ = mpl.cm.get_cmap(cmap)
except AttributeError:
# https://github.com/matplotlib/matplotlib/issues/20853
cmap_ = mpl.colormaps[cmap]
canvas = np.nan_to_num(canvas)
if len(canvas.shape) == 3:
canvas = canvas[..., 0]
canvas = cmap_(canvas)[..., 0:3].astype(np.float32)
if verbose > 100:
print('after cmap part')
canvas = util_kwimage.ensure_false_color(canvas)
canvas = kwimage.ensure_uint255(canvas)
if len(canvas.shape) > 2 and canvas.shape[2] > 4:
# hack for wv
canvas = canvas[..., 0]
chan_header_lines = header_lines.copy()
chan_header_lines.append(str(chan_group))
header_text = '\n'.join(chan_header_lines)
if valid_image_poly is not None:
# Draw the valid region specified at the image level
if any([p.data['exterior'].data.size for p in valid_image_poly.data]):
canvas = valid_image_poly.draw_on(canvas, color='kitware_green',
fill=False, border=True, alpha=alpha)
if valid_video_poly is not None:
# Draw the valid region specified at the video level
if any([p.data['exterior'].data.size for p in valid_video_poly.data]):
canvas = valid_video_poly.draw_on(canvas, color='lawngreen',
fill=False, border=True, alpha=alpha)
stack_imgs = draw_imgs and stack
stack_anns = draw_anns and stack
draw_anns_alone = draw_anns and stack != 'only'
draw_imgs_alone = draw_imgs and stack != 'only'
if draw_anns_alone:
ann_chan_dpath.ensuredir()
if draw_imgs_alone:
img_chan_dpath.ensuredir()
img_canvas = None
ann_canvas = None
img_stack_item = None
ann_stack_item = None
if verbose > 100:
print('before canvas parts')
rich.print(f'draw_imgs_alone={draw_imgs_alone}')
rich.print(f'draw_anns_alone={draw_anns_alone}')
rich.print(f'stack_imgs={stack_imgs}')
rich.print(f'stack_anns={stack_anns}')
if draw_imgs_alone or stack_imgs:
img_canvas = kwimage.ensure_uint255(canvas, copy=True)
if stack_imgs:
img_stack_item = {
'im': img_canvas,
'chan': chan_group,
}
if draw_imgs_alone:
img_header = kwimage.draw_header_text(image=img_canvas,
text=header_text,
stack=False,
fit='shrink')
img_header = kwimage.stack_images([img_header, img_canvas])
kwimage.imwrite(view_img_fpath, img_canvas)
if draw_anns_alone or stack_anns:
ONLY_BOXES = only_boxes
if ONLY_BOXES:
ub.schedule_deprecation(
'geowatch', 'only_boxes', 'argument',
deprecate='now', error='1.0.0', remove='1.1.0',
)
draw_on_kwargs = dict(sseg=False, labels=False)
else:
draw_on_kwargs = {}
draw_on_kwargs['labels'] = bool(draw_labels)
draw_on_kwargs['sseg'] = bool(draw_segmentations)
draw_on_kwargs['boxes'] = bool(draw_boxes)
requested_role_to_drawables = role_to_drawables.intersection(request_roles)
need_ann_canvas = (
bool(requested_role_to_drawables) or
img_canvas is None or
draw_anns_alone
)
# FIXME; respect if the user does not want to draw anns
need_ann_canvas = True
if need_ann_canvas:
ann_canvas = kwimage.ensure_float01(canvas, copy=True)
if draw_anns_alone and not requested_role_to_drawables:
# fallback to drawing all anns in this weird case
requested_role_to_drawables = role_to_drawables
for role_drawables in requested_role_to_drawables.values():
# TODO: better role handling
role_dets = role_drawables.get('dets', None)
role_trails = role_drawables.get('trails', None)
if role_trails is not None:
for trail in role_trails:
thickness = trail.get('thickness', 2)
trail_cxy = trail['motion_path']['vizspace_boxes'].xy_center
trail_colors = trail['motion_path']['track_colors'][1:]
ann_canvas = draw_polyline_on_image(
ann_canvas, trail_cxy, color=trail_colors,
thickness=thickness)
if role_dets is not None:
colors = [kwimage.Color.coerce(c).as01() for c in role_dets.data['colors']]
if verbose > 100:
print('About to draw dets on a canvas')
ann_canvas = role_dets.draw_on(
ann_canvas,
color=colors,
ssegkw={'fill': False, 'border': True, 'edgecolor': colors},
# color='classes',
**draw_on_kwargs, alpha=alpha)
if verbose > 100:
print('That seemed to work')
if stack_anns:
if ann_canvas is None:
ann_canvas = img_canvas.copy()
ann_stack_item = {
'im': ann_canvas,
'chan': chan_group,
}
if draw_anns_alone:
assert ann_canvas is not None
ann_canvas = kwimage.ensure_uint255(ann_canvas)
if draw_header:
ann_header = kwimage.draw_header_text(image=ann_canvas,
text=header_text,
stack=False,
fit='shrink')
ann_header = kwimage.imresize(
ann_header, dsize=(ann_header.shape[1], 100), letterbox=True)
ann_canvas = kwimage.stack_images([ann_header, ann_canvas])
kwimage.imwrite(view_ann_fpath, ann_canvas)
if verbose > 100:
print('returning canvases')
return img_stack_item, ann_stack_item
[docs]
def draw_polyline_on_image(image, xy_pts, color=None, thickness=1):
"""
TODO: port to kwimage
"""
import kwimage
if len(xy_pts) > 1:
pts1 = xy_pts[0:-1]
pts2 = xy_pts[1:]
image = kwimage.draw_line_segments_on_image(image, pts1, pts2, color=color, thickness=thickness)
return image
if __name__ == '__main__':
main(cmdline=True)