"""
Functions to perterb or "jitter" truth that gradually degrade the site models
in specified ways. This can be used to check the scoring metrics at various
levels of accuracy / inaccuracy.
"""
import numpy as np
import geojson
import kwarray
import kwimage
import ubelt as ub
import geopandas as gpd
from geowatch.demo.metrics_demo import demo_truth
from geowatch.demo.metrics_demo import demo_utils
[docs]
def perterb_site_model(sites, rng=None, **kwargs):
"""
Given a true site models from a region, perterb them to make "demo"
predicted site models.
Args:
sites (List[geojson.FeatureCollection]): geojson site observations
rng : random seed or generator
**kwargs: factors to control per-site perterbations. See
:func:`perterb_single_site_model` for available options.
Returns:
List[geojson.FeatureCollection]: modified site models
Example:
>>> from geowatch.demo.metrics_demo.site_perterbing import * # NOQA
>>> _, sites, _ = demo_truth.random_region_model(rng=12345)
>>> pred_sites1 = perterb_site_model(sites, noise=1, rng=34567)
>>> pred_sites2 = perterb_site_model(sites, noise=0, rng=34567)
"""
rng = kwarray.ensure_rng(rng)
# Shuffle our site models so the indices don't correspond.
templates = sites.copy()
rng.shuffle(templates)
# TODO: drop sites to simulate false negatives and add new random sites to
# generate false positives.
pred_sites = []
for idx, site in enumerate(templates):
try:
pred_site = perterb_single_site_model(site, idx=idx, rng=rng, **kwargs)
except IndexError:
continue # drop the site
pred_sites.append(pred_site)
return pred_sites
[docs]
class PerterbModel:
"""
TODO:
consume the functionality of perterb_single_site_model
Example:
>>> from geowatch.demo.metrics_demo.site_perterbing import * # NOQA
>>> self = PerterbModel()
Example:
>>> from geowatch.demo.metrics_demo.site_perterbing import * # NOQA
>>> from geowatch.demo.metrics_demo import demo_truth
>>> region_id = 'DR_0042'
>>> site_id = 'DR_0042_9001'
>>> rng = kwarray.ensure_rng(4222)
>>> region_corners = kwimage.Boxes([[5, 7, 11, 13]], 'xywh').to_ltrb().corners()
>>> observables = demo_truth.random_observables(15, rng=rng)
>>> p_observe = 0.5
>>> site_summary, site = demo_truth.random_site_model(
>>> region_id, site_id, region_corners, observables,
>>> p_observe=p_observe, p_transition=0.4, rng=rng)
>>> kwargs = dict(noise=10, drop_limit=0.1)
>>> self = PerterbModel(**kwargs)
>>> pred_site = self.perterb_single_site(site)
>>> # xdoctest: +REQUIRES(--show)
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> import kwplot
>>> kwplot.autompl()
>>> import geopandas as gpd
>>> # Draw our true and perterbed site model
>>> orig_site_gdf = gpd.GeoDataFrame.from_features(site)
>>> pred_site_gdf = gpd.GeoDataFrame.from_features(pred_site)
>>> total_poly1 = orig_site_gdf['geometry'].unary_union
>>> total_poly2 = pred_site_gdf['geometry'].unary_union
>>> total_poly = total_poly1.union(total_poly2)
>>> minx, miny, maxx, maxy = total_poly.bounds
>>> fig = kwplot.figure(doclf=True, fnum=1)
>>> phases = demo_truth.SiteModelGenerator.SITE_PHASES
>>> phase_to_color = ub.udict(ub.dzip(phases, kwimage.Color.distinct(len(phases))))
>>> pnum_ = kwplot.PlotNums(nRows=4, nSubplots=len(observables) + 1)
>>> boundary_colors = ub.udict({'true': 'limegreen', 'pred': 'dodgerblue'}).map_values(lambda c: kwimage.Color(c).as01())
>>> for obs in observables:
>>> title = obs['datetime'].isoformat()
>>> trues = orig_site_gdf[orig_site_gdf['observation_date'] == obs['datetime'].date().isoformat()]
>>> preds = pred_site_gdf[pred_site_gdf['observation_date'] == obs['datetime'].date().isoformat()]
>>> fig = kwplot.figure(pnum=pnum_(), title=title)
>>> ax = fig.gca()
>>> if len(trues):
>>> colors = list(phase_to_color.take(trues['current_phase']))
>>> trues.plot(ax=ax, alpha=0.5, color=colors)
>>> trues.boundary.plot(ax=ax, alpha=0.5, color=boundary_colors['true'], linewidth=3)
>>> trues.centroid.plot(ax=ax, alpha=0.5, color=boundary_colors['true'])
>>> if len(preds):
>>> colors = list(phase_to_color.take(preds['current_phase']))
>>> preds.plot(ax=ax, alpha=0.5, color=colors)
>>> preds.boundary.plot(ax=ax, alpha=0.5, color=boundary_colors['pred'], linewidth=3)
>>> preds.centroid.plot(ax=ax, alpha=0.5, color=boundary_colors['pred'])
>>> ax.set_xlim(minx, maxx)
>>> ax.set_ylim(miny, maxy)
>>> legend1 = kwimage.draw_header_text(kwplot.make_legend_img(phase_to_color), 'Face Key', fit=True)
>>> legend2 = kwimage.draw_header_text(kwplot.make_legend_img(boundary_colors), 'Border Key', fit=True)
>>> legend = kwimage.stack_images([legend1, legend2], axis=0, resize='smaller')
>>> kwplot.imshow(legend, pnum=pnum_())
>>> kwplot.show_if_requested()
"""
def __init__(self, noise=0.0, performer_id='alice', rng=None, **kwargs):
self.rng = rng
self.performer_id = performer_id
default_noise = ub.udict({
'warp_noise': 1.0,
'label_noise': 1.0,
'drop_noise': 0.5,
})
default_limits = ub.udict({
'drop_limit': 0.5,
'scale_limit': 0.3,
'theta_limit': np.pi / 16,
})
kwargs = ub.udict(kwargs)
limits = default_limits | (kwargs & default_limits)
kwargs -= limits
limits = limits.map_keys(lambda k: k.split('_limit')[0])
noises = default_noise | (kwargs & default_noise)
kwargs -= noises
noises = noises.map_keys(lambda k: k.split('_noise')[0])
self.limits = limits
self.noises = noises
self.noise = noise
if len(kwargs) > 0:
raise ValueError('unknown kwargs={}'.format(kwargs))
self.distris = None
self._setup_distributions()
def _setup_distributions(self):
from kwarray import distributions as distri
rng = self.rng
noises = self.noises
limits = self.limits
max_scale = 1 + limits['scale'] * (noises['warp'] * self.noise)
max_theta = 0 + limits['theta'] * (noises['warp'] * self.noise)
p_randomize = max(0, np.tanh(noises['label'] * self.noise))
p_drop = min(max(0, np.tanh(noises['drop'] * self.noise)), limits['drop'])
distris = {}
distris['scale'] = distri.Uniform(1, max_scale, rng=rng) ** distri.CategoryUniform([-1, 1])
distris['theta'] = distri.Uniform(0, max_theta, rng=rng) * distri.CategoryUniform([-1, 1])
distris['phase'] = distri.CategoryUniform(demo_truth.SiteModelGenerator.SITE_PHASES, rng=rng)
distris['randomize_phase'] = distri.Bernoulli(p_randomize, rng=rng)
distris['drop_frame'] = distri.Bernoulli(p_drop, rng=rng)
self.distris = distris
[docs]
def perterb_single_site(self, site, idx=None):
true_site_summary = site["features"][0]["properties"]
mgrs_code = true_site_summary["mgrs"]
# TODO: add the ability to pass in observables to add temporal noise
# TODO: could be more complex here.
valid_status_codes = [
"system_proposed",
"system_confirmed",
"system_rejected",
]
status = valid_status_codes[1]
# Find the true site header
orig_site_header = None
orig_observations = []
for feat in site["features"]:
if feat['properties']['type'] == 'site':
if orig_site_header is not None:
raise ValueError('More than one site header')
orig_site_header = feat
elif feat['properties']['type'] == 'observation':
orig_observations.append(feat)
else:
raise TypeError('Unknown feature type, should be site or observation')
if orig_site_header is None:
raise ValueError(
'A site feature collection does not have a summary header '
'with type=site in its properties')
region_id = orig_site_header['properties']['region_id']
if idx is None:
site_id = orig_site_header['properties']['site_id']
else:
site_id = f"{region_id}_{idx:03d}"
orig_obs_crs84 = gpd.GeoDataFrame.from_features(
orig_observations, crs=demo_utils.get_crs84())
# Do any shape warping in the local UTM space.
orig_obs_utm = demo_utils.project_gdf_to_local_utm(orig_obs_crs84)
orig_geoms_utm = orig_obs_utm['geometry']
new_geoms_utm = []
new_properties = []
for feat, geom_utm in zip(orig_observations, orig_geoms_utm):
if self.distris['drop_frame'].sample():
continue
orig_props = ub.udict(feat["properties"])
poly = kwimage.MultiPolygon.coerce(geom_utm)
xy = [d[0] for d in poly.to_shapely().centroid.xy]
rand_aff_params = {
'scale': self.distris['scale'].sample(),
'theta': self.distris['theta'].sample(),
}
aff = kwimage.Affine.affine(**rand_aff_params, about=xy)
new_poly = poly.warp(aff)
current_phase = orig_props["current_phase"]
if self.distris['randomize_phase'].sample():
current_phase = self.distris['phase'].sample()
new_props = orig_props | {
"id": site_id,
"current_phase": current_phase,
"is_site_boundary": True,
"originator": self.performer_id,
"validated": "False",
}
new_properties.append(new_props)
new_geoms_utm.append(new_poly.to_shapely())
# Convert back to CRS84
new_geoms_utm = gpd.GeoDataFrame(geometry=new_geoms_utm, crs=orig_obs_utm.crs)
new_summary_utm = gpd.GeoDataFrame(geometry=[new_geoms_utm.unary_union], crs=orig_obs_utm.crs)
new_summary_crs84 = new_summary_utm.to_crs(orig_obs_crs84.crs)['geometry'].iloc[0]
new_geoms_crs84 = new_geoms_utm.to_crs(orig_obs_crs84.crs)['geometry']
# Build the geojson features
new_observations = []
for new_props, new_geom in zip(new_properties, new_geoms_crs84):
new_poly = kwimage.MultiPolygon.coerce(new_geom)
new = geojson.Feature(
geometry=new_poly.to_geojson(),
properties=new_props,
)
new_observations.append(new)
new_site_header = demo_truth.make_site_summary(
new_observations, mgrs_code, site_id, status,
summary_geom=new_summary_crs84)
new_site_header["properties"].update(
{
"type": "site",
"site_id": site_id,
"region_id": region_id,
}
)
pred_site = geojson.FeatureCollection([new_site_header] + new_observations)
return pred_site
[docs]
def perterb_single_site_model(site, noise=0.0, warp_noise=1.0,
performer_id='alice', idx=None, rng=None, **kwargs):
"""
Given a true site model, perterb it to make a "demo" predicted site model.
Args:
site (geojson.FeatureCollection): geojson site observations
noise (float):
Magnitude of all noise. This factor modulates all other noise
values. Setting to 0 disables all perterbation. Typically 1
is the largest "reasonable" value. Setting any noise magnitude
higher than 1 may result in pathological perterbations.
warp_noise (float):
magnitude of random warp that we will use to perterb the boundary
of the true site polygons.
performer_id (str):
used in the originator property
idx (int): the index of this new site model.
If unspecified, the same site id is used in truth and pred.
rng : random seed or generator
**kwargs : other PerterbModel params
Returns:
geojson.FeatureCollection: modified site model
Example:
>>> # Demo case with a lot of noise
>>> from geowatch.demo.metrics_demo.site_perterbing import * # NOQA
>>> rng = kwarray.ensure_rng(43240830)
>>> _, sites, _ = demo_truth.random_region_model(num_observations=20, rng=rng)
>>> site = sites[0]
>>> pred_site = perterb_single_site_model(site, noise=1.0, rng=rng)
>>> # xdoctest: +REQUIRES(--show)
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> import kwplot
>>> kwplot.autompl()
>>> # Draw our true and perterbed site model
>>> orig_site_gdf = gpd.GeoDataFrame.from_features(site)
>>> pred_site_gdf = gpd.GeoDataFrame.from_features(pred_site)
>>> fig = kwplot.figure(doclf=True, fnum=1)
>>> ax = fig.gca()
>>> total_poly = pred_site_gdf['geometry'].unary_union.union(orig_site_gdf['geometry'].unary_union)
>>> minx, miny, maxx, maxy = total_poly.bounds
>>> ax.set_xlim(minx, maxx)
>>> ax.set_ylim(miny, maxy)
>>> pred_site_gdf.plot(ax=ax, alpha=0.5, color='orange', edgecolor='purple')
>>> orig_site_gdf.plot(ax=ax, alpha=0.5, color='limegreen', edgecolor='black')
>>> orig_site_gdf.centroid.plot(ax=ax, alpha=0.5, color='green')
>>> pred_site_gdf.centroid.plot(ax=ax, alpha=0.5, color='red')
>>> kwplot.show_if_requested()
Example:
>>> # Demo case with almost zero noise
>>> from geowatch.demo.metrics_demo.site_perterbing import * # NOQA
>>> rng = kwarray.ensure_rng(43240830)
>>> _, sites, _ = demo_truth.random_region_model(num_observations=20, rng=rng)
>>> site = sites[0]
>>> pred_site = perterb_single_site_model(site, noise=1e-2, rng=rng)
>>> # xdoctest: +REQUIRES(--show)
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> import kwplot
>>> kwplot.autompl()
>>> # Draw our true and perterbed site model
>>> orig_site_gdf = gpd.GeoDataFrame.from_features(site)
>>> pred_site_gdf = gpd.GeoDataFrame.from_features(pred_site)
>>> fig = kwplot.figure(doclf=True, fnum=2)
>>> ax = fig.gca()
>>> total_poly = pred_site_gdf['geometry'].unary_union.union(orig_site_gdf['geometry'].unary_union)
>>> minx, miny, maxx, maxy = total_poly.bounds
>>> ax.set_xlim(minx, maxx)
>>> ax.set_ylim(miny, maxy)
>>> pred_site_gdf.plot(ax=ax, alpha=0.5, color='orange', edgecolor='purple')
>>> orig_site_gdf.plot(ax=ax, alpha=0.5, color='limegreen', edgecolor='black')
>>> orig_site_gdf.centroid.plot(ax=ax, alpha=0.5, color='green')
>>> pred_site_gdf.centroid.plot(ax=ax, alpha=0.5, color='red')
>>> kwplot.show_if_requested()
"""
perterber = PerterbModel(noise=noise, warp_noise=warp_noise,
performer_id=performer_id, rng=rng, **kwargs)
pred_site = perterber.perterb_single_site(site, idx=idx)
return pred_site