import json
import geojson
import ubelt as ub
import dateutil
import datetime
from collections import defaultdict
# import pandas as pd
# import geopandas as gpd
[docs]
def default_feat():
return {
'is_occluded': 'False',
'is_site_boundary': 'True',
'type': 'observation',
'source': 'DUMMY',
'sensor_name': 'Sentinel-2',
'score': 1.0
}
[docs]
def to_date(date_str):
return dateutil.parser.parse(date_str).date()
# TODO should phase_transition_days be in a list []? It is in the delivered
# system.
[docs]
def extend(site_summary, site):
sstart_date = site_summary['properties']['start_date']
send_date = site_summary['properties']['end_date']
ssd, sed = to_date(sstart_date), to_date(send_date)
start_date = site['features'][0]['properties']['start_date']
end_date = site['features'][0]['properties']['end_date']
sd, ed = to_date(start_date), to_date(end_date)
site_id = site_summary['properties']['site_id']
if ssd < sd:
print(f'extending start of {site_id} from {sd} to {ssd}')
geom = site['features'][1]['geometry']
days = site['features'][1]['properties']['misc_info'][
'phase_transition_days']
days[0] += (sd - ssd).days
obs0_feat = {}
obs0_feat['geometry'] = geom
obs0_feat['properties'] = default_feat()
obs0_feat['properties']['current_phase'] = 'Site Preparation'
obs0_feat['properties']['observation_date'] = start_date
obs0_feat['properties']['misc_info'] = {
'phase_transition_days': [days]
}
site['features'].insert(1, obs0_feat)
site['features'][0]['properties']['start_date'] = sstart_date
if ed < sed:
print(f'extending end of {site_id} from {ed} to {sed}')
try:
phase = site['features'][-2]['properties']
except IndexError:
phase = 'Active Construction'
site['features'][-1]['properties']['current_phase'] = phase
if phase == 'Post Construction':
days = 1.0
else:
days = (sed - ed).days
site['features'][-1]['properties']['misc_info'][
'phase_transition_days'] = [days]
obsn_feat = {}
obsn_feat['geometry'] = site['features'][-1]['geometry']
obsn_feat['properties'] = default_feat()
obsn_feat['properties']['current_phase'] = 'Post Construction'
obsn_feat['properties']['observation_date'] = send_date
obsn_feat['properties']['misc_info'] = {'phase_transition_days': [1.0]}
site['features'].append(obsn_feat)
site['features'][0]['properties']['end_date'] = send_date
return site
[docs]
def extend_clone(bas_site, site):
'''
TODO does not handle phase_transition_days or enforce temporal consistency
in phases.
'''
sstart_date = bas_site['features'][0]['properties']['start_date']
send_date = bas_site['features'][0]['properties']['end_date']
ssd, sed = to_date(sstart_date), to_date(send_date)
start_date = site['features'][0]['properties']['start_date']
end_date = site['features'][0]['properties']['end_date']
sd, ed = to_date(start_date), to_date(end_date)
site_id = bas_site['features'][0]['properties']['site_id']
n_existing_feats = len(site['features']) - 1
site_feats = defaultdict(list)
for f in site['features'][1:]:
k = to_date(f['properties']['observation_date'])
site_feats[k].append(f)
bas_site_feats = defaultdict(list)
for f in bas_site['features'][1:]:
k = to_date(f['properties']['observation_date'])
bas_site_feats[k].append(f)
if ssd < sd:
print(f'extending start of {site_id} from {sd} to {ssd}')
site['features'][0]['properties']['start_date'] = sstart_date
if ed < sed:
print(f'extending end of {site_id} from {ed} to {sed}')
site['features'][0]['properties']['end_date'] = send_date
for k in bas_site_feats:
if k not in site_feats:
site_feats[k] = bas_site_feats[k]
site['features'] = (site['features'][:1] +
list(ub.flatten(ub.sorted_keys(site_feats).values())))
n_after_feats = len(site['features']) - 1
print(f'added {n_after_feats - n_existing_feats} features to {site_id}')
return site
[docs]
def create(site_summary):
geom = site_summary['geometry']
start_date = site_summary['properties']['start_date']
end_date = site_summary['properties']['end_date']
site_id = site_summary['properties']['site_id']
sd, ed = to_date(start_date), to_date(end_date)
tot_days = (ed - sd).days
mid_days = int(tot_days / 2)
end_days = tot_days - mid_days
mid_date = (sd + datetime.timedelta(days=mid_days)).isoformat()
print(f'creating {site_id} from {sd} to {ed}')
site = []
site_feat = site_summary.copy()
site_feat['properties']['type'] = 'site'
site_feat['properties']['region_id'] = '_'.join(site_id.split('_')[:-1])
site_feat['properties']['misc_info'] = {}
site.append(site_feat)
obs0_feat = {}
obs0_feat['geometry'] = geom
obs0_feat['properties'] = default_feat()
obs0_feat['properties']['current_phase'] = 'Site Preparation'
obs0_feat['properties']['observation_date'] = start_date
obs0_feat['properties']['misc_info'] = {
'phase_transition_days': [mid_days]
}
site.append(obs0_feat)
obs1_feat = {}
obs1_feat['geometry'] = geom
obs1_feat['properties'] = default_feat()
obs1_feat['properties']['current_phase'] = 'Active Construction'
obs0_feat['properties']['observation_date'] = mid_date
obs1_feat['properties']['misc_info'] = {
'phase_transition_days': [end_days]
}
site.append(obs1_feat)
obs2_feat = {}
obs2_feat['geometry'] = geom
obs2_feat['properties'] = default_feat()
obs2_feat['properties']['current_phase'] = 'Post Construction'
obs0_feat['properties']['observation_date'] = end_date
obs2_feat['properties']['misc_info'] = {'phase_transition_days': [1.0]}
site.append(obs2_feat)
return {'type': 'FeatureCollection', 'features': site}
[docs]
def main(region_models_pth,
site_models_pth,
out_region_models_pth,
out_site_models_pth,
modify_region_models_pth,
modify_site_models_pth=None,
create_missing=True,
extend_existing=True):
'''
Modifies site models based on site summaries, and saves new site models and
site summaries.
Args:
modify_site_models_pth: If given, create/extend from site models
instead of site summaries
create_missing: Create a new site model for any site summary that
doesn't have one
extend_existing: Extend the temporal duration of any site model smaller
than its site summary
Ignore:
>>> from geowatch.cli.extend_sc_sites import main
>>> main(
>>> 'sc_out_region_models',
>>> 'sc_out_site_models',
>>> 'extended_sc_out_region_models',
>>> 'extended_sc_out_site_models',
>>> 'cropped_region_models_bas',
>>> )
>>> import jsonschema
>>> import json
>>> import geowatch
>>> import ubelt as ub
>>> REGION_SCHEMA = geowatch.rc.load_region_model_schema()
>>> region_pths = ub.Path('extended_sc_out_region_models').glob('*.geojson')
>>> regions = [json.load(open(p)) for p in region_pths]
>>> for region in regions:
>>> # jsonschema.validate(region, schema=REGION_SCHEMA)
>>> pass
>>> SITE_SCHEMA = geowatch.rc.load_site_model_schema()
>>> site_pths = ub.Path('extended_sc_out_site_models').glob('*.geojson')
>>> sites = [json.load(open(p)) for p in site_pths]
>>> for site in sites:
>>> # jsonschema.validate(site, schema=SITE_SCHEMA)
>>> pass
>>> # new way to validate sites
>>> from iarpa_smart_metrics.evaluation import SiteStack
>>> for p in site_pths:
>>> s = SiteStack(p)
'''
from kwgis.utils import util_gis
mr_pths = util_gis.coerce_geojson_paths(
modify_region_models_pth)
rm_pths = util_gis.coerce_geojson_paths(region_models_pth)
ms_pths = util_gis.coerce_geojson_paths(modify_site_models_pth)
sm_pths = util_gis.coerce_geojson_paths(site_models_pth)
assert len(mr_pths) == 1, 'need 1 region -> n site models'
assert len(rm_pths) == 1, 'need 1 region -> n site models'
if len(sm_pths) < 1:
print('warning: no existing site models found')
msite_summaries = {}
for pth in mr_pths:
rm = json.load(open(pth))
for f in rm['features']:
if (f['properties']['type'] == 'site_summary'
and f['properties']['model_content'] == 'proposed'):
msite_summaries[f['properties']['site_id']] = f
site_summaries = {}
others = []
for pth in rm_pths:
rm = json.load(open(pth))
for f in rm['features']:
if (f['properties']['type'] == 'site_summary'
and f['properties']['model_content'] == 'proposed'):
site_summaries[f['properties']['site_id']] = f
else:
others.append(f)
msite_models = {}
for pth in ms_pths:
sm = json.load(open(pth))
f = sm['features'][0]
if (f['properties']['type'] == 'site'
and f['properties']['model_content'] == 'proposed'):
msite_models[f['properties']['site_id']] = sm
site_models = {}
for pth in sm_pths:
sm = json.load(open(pth))
f = sm['features'][0]
if (f['properties']['type'] == 'site'
and f['properties']['model_content'] == 'proposed'):
site_models[f['properties']['site_id']] = sm
default_summary_settings = next(iter(site_summaries.values())).copy()
for k in ['start_date', 'end_date', 'site_id']:
default_summary_settings['properties'].pop(k)
rid = others[0]['properties']['region_id']
if extend_existing:
for k in site_models:
if k in msite_models:
sm = extend_clone(msite_models[k], site_models[k])
else:
sm = extend(msite_summaries[k], site_models[k])
site_models[k] = sm
ss = site_summaries[k]
ss['start_date'] = sm['features'][0]['properties']['start_date']
ss['end_date'] = sm['features'][0]['properties']['end_date']
site_summaries[k] = ss
if create_missing:
for k in msite_summaries:
if k not in site_models:
if k in msite_models:
site_models[k] = msite_models[k]
else:
sm = create(msite_summaries[k])
ss = default_summary_settings.copy()
ss['site_id'] = k
ss['start_date'] = sm['features'][0]['properties'][
'start_date']
ss['end_date'] = sm['features'][0]['properties']['end_date']
site_models[k] = sm
site_summaries[k] = ss
new_rm = geojson.FeatureCollection(others + list(site_summaries.values()))
with open(
ub.Path(out_region_models_pth).ensuredir() / f'{rid}.geojson',
'w') as f:
json.dump(new_rm, f, indent=2)
for sm in site_models.values():
sid = sm['features'][0]['properties']['site_id']
with open(
ub.Path(out_site_models_pth).ensuredir() / f'{sid}.geojson',
'w') as f:
json.dump(sm, f, indent=2)