"""
Utilities for geopandas and other geographic information system tools
"""
import ubelt as ub
import numpy as np
import json
import os
[docs]
def plot_geo_background():
import kwplot
import geopandas as gpd
kwplot.autompl()
# Broken in geopandas 1.x
wld_map_gdf = gpd.read_file(
gpd.datasets.get_path('naturalearth_lowres')
).to_crs('OGC:CRS84')
ax = wld_map_gdf.plot()
return ax
[docs]
def geopandas_pairwise_overlaps(gdf1, gdf2, locator='iloc', predicate='intersects'):
"""
Find pairwise relationships between each geometries
Args:
gdf1 (GeoDataFrame): query geo data
gdf2 (GeoDataFrame): database geo data (builds spatial index)
locator (str):
either iloc for positional indexes or loc for pandas indexes.
predicate (str, default='intersects'): a DE-9IM [1] predicate.
(e.g. if intersection finds intersections between geometries)
References:
.. [DE_9IM] https://en.wikipedia.org/wiki/DE-9IM
TODO:
- [ ] This can move to geowatch.utils
Returns:
dict:
mapping from integer-iloc-indexes in gdf1 to
overlapping integer-iloc-indexes in gdf2
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import geopandas as gpd
>>> import geodatasets
>>> geodatasets.get_path('GeoDa AirBnB')
>>> gdf1 = gpd.read_file(geodatasets.get_path('GeoDa milwaukee1'))
>>> gdf2 = gpd.read_file(geodatasets.get_path('GeoDa milwaukee2'))
>>> #gdf1 = gpd.read_file(
>>> # gpd.datasets.get_path('naturalearth_lowres')
>>> #)
>>> #gdf2 = gpd.read_file(
>>> # gpd.datasets.get_path('naturalearth_cities')
>>> #)
>>> #gdf1 = gdf1.set_index('name')
>>> #gdf2 = gdf2.set_index('name')
>>> idx_mapping = geopandas_pairwise_overlaps(gdf1, gdf2, locator='iloc')
>>> id_mapping = geopandas_pairwise_overlaps(gdf1, gdf2, locator='loc')
Benchmark:
import timerit
ti = timerit.Timerit(10, bestof=3, verbose=2)
for timer in ti.reset('with sindex O(N * log(M))'):
with timer:
fast_mapping = geopandas_pairwise_overlaps(gdf1, gdf2)
for timer in ti.reset('without sindex O(N * M)'):
with timer:
from collections import defaultdict
slow_mapping = defaultdict(list)
for idx1, geom1 in enumerate(gdf1.geometry):
slow_mapping[idx1] = []
for idx2, geom2 in enumerate(gdf2.geometry):
if geom1.intersects(geom2):
slow_mapping[idx1].append(idx2)
# check they are the same
assert set(slow_mapping) == set(fast_mapping)
for idx1 in slow_mapping:
slow_idx2s = slow_mapping[idx1]
fast_idx2s = fast_mapping[idx1]
assert sorted(fast_idx2s) == sorted(slow_idx2s)
"""
assert gdf1.index.is_unique, 'GeoDataFrame indexes must be unique'
assert gdf2.index.is_unique, 'GeoDataFrame indexes must be unique'
# Construct the spatial index (requires pygeos and/or rtree)
sindex2 = gdf2.sindex
# For each query polygon, lookup intersecting polygons in the spatial index
idx1_to_idxs2 = {}
for idx1, (rowid, row1) in enumerate(gdf1.iterrows()):
idxs2 = sindex2.query(row1.geometry, predicate=predicate)
# Record result indexes that "match" given the geometric predicate
idx1_to_idxs2[idx1] = idxs2
if locator == 'iloc':
return idx1_to_idxs2
elif locator == 'loc':
_idxs1 = list(idx1_to_idxs2.keys())
_nest_idxs2 = list(idx1_to_idxs2.values())
id1_to_ids2 = {}
for idx1, idxs2 in idx1_to_idxs2.items():
id1 = gdf1.index[idx1]
ids2 = gdf2.index[idxs2]
id1_to_ids2[id1] = ids2
if 0:
import kwarray
_flat_idxs2 = np.concatenate(_nest_idxs2, axis=0)
_flat_ids2 = gdf2.index[_flat_idxs2]
indexer = kwarray.FlatIndexer.fromlist(_nest_idxs2)
_a = np.arange(len(indexer))
outer, inner = indexer.unravel(_a)
_ids1 = gdf1.index[_idxs1]
_ids2 = list(kwarray.group_items(_flat_ids2.values, outer).values())
list(map(len, _ids2))[0:10], list(map(len, _nest_idxs2))[0:10]
list(map(len, _ids2))[-10:], list(map(len, _nest_idxs2))[-10:]
list(map(len, _ids2))[0:10] == list(map(len, _nest_idxs2))[0:10]
list(map(len, _ids2))[-10:] == list(map(len, _nest_idxs2))[-10:]
np.vstack([[list(map(len, _ids2))], [list(map(len, _nest_idxs2))]])
list(map(len, _ids2)) == list(map(len, _nest_idxs2))
_id1_to_ids2 = ub.dzip(_ids1, _ids2)
assert _id1_to_ids2 == id1_to_ids2
return id1_to_ids2
else:
raise KeyError(locator)
return idx1_to_idxs2
[docs]
def latlon_text(lat, lon, precision=6):
"""
Make a lat,lon string suitable for a filename.
Pads with leading zeros so file names will align nicely at the same level
of prcision.
Args:
lat (float): degrees latitude
lon (float): degrees longitude
precision (float, default=6):
Number of trailing decimal places. As rule of thumb set this to:
6 - for ~10cm accuracy,
5 - for ~1m accuracy,
2 - for ~1km accuracy,
TODO:
- [ ] This can move to geowatch.utils
Notes:
1 degree of latitude is *very* roughly the order of 100km, so the
default precision of 6 localizes down to ~0.1 meters, which will
usually be sufficient for satellite applications, but be mindful of
using this text in applications that require more precision. Note 1
degree of longitude will vary, but will always be at least as precise
as 1 degree of latitude.
Example:
>>> lat = 90
>>> lon = 180
>>> print(latlon_text(lat, lon))
N90.000000E180.000000
>>> lat = 0
>>> lon = 0
>>> print(latlon_text(lat, lon))
N00.000000E000.000000
Example:
>>> print(latlon_text(80.123, 170.123))
>>> print(latlon_text(10.123, 80.123))
>>> print(latlon_text(0.123, 0.123))
N80.123000E170.123000
N10.123000E080.123000
N00.123000E000.123000
>>> print(latlon_text(80.123, 170.123, precision=2))
>>> print(latlon_text(10.123, 80.123, precision=2))
>>> print(latlon_text(0.123, 0.123, precision=2))
N80.12E170.12
N10.12E080.12
N00.12E000.12
>>> print(latlon_text(80.123, 170.123, precision=5))
>>> print(latlon_text(10.123, 80.123, precision=5))
>>> print(latlon_text(0.123, 0.123, precision=5))
N80.12300E170.12300
N10.12300E080.12300
N00.12300E000.12300
"""
def _build_float_precision_fmt(num_leading, num_trailing):
num2 = num_trailing
# 2 extra for radix and leading sign
num1 = num_leading + num_trailing + 2
fmtparts = ['{:+0', str(num1), '.', str(num2), 'F}']
fmtstr = ''.join(fmtparts)
return fmtstr
assert -90 <= lat <= 90, 'invalid lat'
assert -180 <= lon <= 180, 'invalid lon'
# Ensure latitude had 2 leading places and longitude has 3
latfmt = _build_float_precision_fmt(2, precision)
lonfmt = _build_float_precision_fmt(3, precision)
lat_str = latfmt.format(lat).replace('+', 'N').replace('-', 'S')
lon_str = lonfmt.format(lon).replace('+', 'E').replace('-', 'W')
text = lat_str + lon_str
return text
[docs]
def demo_regions_geojson_text():
import ubelt as ub
geojson_text = ub.codeblock(
'''
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": {"type": "region", "region_model_id": "US_Jacksonville_R01", "version": "1.0.1", "mgrs": "17RMP", "start_date": "2009-05-09", "end_date": "2020-01-26" },
"geometry": {"type": "Polygon", "coordinates": [[[-81.6953, 30.3652], [-81.6942, 30.2984], [-81.5975, 30.2992], [-81.5968, 30.3667], [-81.6953, 30.3652]]]}
},
{
"type": "Feature",
"properties": {"type": "site", "site_id": "17RMP_US_Jacksonville_R01_0000", "start_date": "2016-02-14", "end_date": "2017-11-01"},
"geometry": {"type": "Polygon", "coordinates": [[[-81.6364, 30.3209], [-81.6364, 30.3236], [-81.6397, 30.3236], [-81.6397, 30.3209], [-81.6364, 30.3209]]]}
},
{
"type": "Feature",
"properties": {"type": "site", "site_id": "17RMP_US_Jacksonville_R01_0001", "start_date": "2016-07-13", "end_date": "2020-01-26" },
"geometry": {"type": "Polygon", "coordinates": [[[-81.6085, 30.3568], [-81.6085, 30.3600], [-81.6120, 30.3600], [-81.6120, 30.3568], [-81.6085, 30.3568]]]}
}
]
}
''')
return geojson_text
[docs]
def load_geojson(file, default_axis_mapping='OAMS_TRADITIONAL_GIS_ORDER'):
"""
Args:
file (str | file): path or file object containing geojson data.
axis_mapping (str, default='OAMS_TRADITIONAL_GIS_ORDER'):
The axis-ordering of the geojson file on disk. This is assumed to
be traditional ordering by default according to the geojson spec.
Returns:
GeoDataFrame : a dataframe with geo info.
Note: geopandas always stores data in traditional XY, although its
CRS does seem to hold axis order info?
# OLD: This will ALWAYS return
# with an OAMS_AUTHORITY_COMPLIANT wgs84 crs (i.e. lat,lon) even
# though the on disk order is should be OAMS_TRADITIONAL_GIS_ORDER.
References:
https://geopandas.org/docs/user_guide/projections.html#the-axis-order-of-a-crs
Example:
>>> import io
>>> from geowatch.utils.util_gis import * # NOQA
>>> geojson_text = demo_regions_geojson_text()
>>> file = io.StringIO()
>>> file.write(geojson_text)
>>> file.seek(0)
>>> region_df = load_geojson(file)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> import geopandas as gpd
>>> kwplot.autompl()
>>> wld_map_gdf = gpd.read_file(
>>> gpd.datasets.get_path('naturalearth_lowres')
>>> ).to_crs('OGC:CRS84')
>>> ax = wld_map_gdf.plot()
>>> region_df.plot(ax=ax, edgecolor='orange', alpha=0.8)
>>> # https://gis.stackexchange.com/questions/372564/userwarning-when-trying-to-get-centroid-from-a-polygon-geopandas
>>> centroid = region_df.to_crs('+proj=cea').centroid.to_crs(region_df.crs)
>>> centroid.plot(ax=ax, edgecolor='orange', alpha=0.8)
print('region_df.crs = {!r}'.format(region_df.crs))
print('wld_map_gdf.crs = {!r}'.format(wld_map_gdf.crs))
"""
import geopandas as gpd
valid_axis_mappings = {
'OAMS_TRADITIONAL_GIS_ORDER',
'OAMS_AUTHORITY_COMPLIANT',
}
if default_axis_mapping not in valid_axis_mappings:
raise Exception
# Read custom ROI regions
region_df = gpd.read_file(file)
# TODO: can we construct a pyproj.CRS from wgs84, but with traditional
# order?
# import pyproj
# wgs84 = pyproj.CRS.from_epsg(4326)
# z = pyproj.Transformer.from_crs(4326, 4326, always_xy=True)
# crs1 = region_df.crs
# pyproj.CRS.from_dict(crs1.to_json_dict())
# z = region_df.crs
# z.to_json_dict()
# Use a CRS that actually reflects the underlying data
if default_axis_mapping == 'OAMS_TRADITIONAL_GIS_ORDER':
crs84 = get_crs84()
# this is much faster and the only reason this is ok is because the
# input is xy-wgs84 so the transform (which is slow) would be a noop
region_df._crs = crs84
# region_df = region_df.to_crs(crs84)
else:
raise NotImplementedError('geopandas only deals with traditional lon/lat')
return region_df
read_geojson = load_geojson # backwards compat
[docs]
@ub.memoize
def get_crs84():
"""
Constructing the CRS84 is slow.
This function memoizes it so it only happens once.
Returns:
pyproj.crs.crs.CRS
Example:
>>> get_crs84()
"""
from pyproj import CRS
crs84 = CRS.from_user_input('OGC:CRS84')
return crs84
_get_crs84 = get_crs84
def _flip(x, y):
return (y, x)
[docs]
def shapely_flip_xy(geom):
from shapely import ops
return ops.transform(_flip, geom)
[docs]
def project_gdf_to_local_utm(gdf_crs84, max_utm_zones=1, mode=0,
tolerance=None):
"""
Find the local UTM zone for a geo data frame and project to it.
All geometry in the GDF must be in the same UTM zone.
Args:
gdf_crs84 (geopandas.GeoDataFrame):
The data with CRS-84 geometry to project into a local UTM
max_utm_zones (int):
If the data spans more than this many UTM zones, error.
Otherwise, we take the first one.
mode (int):
which version of the logic to use. Default is 0, which is the
original. Also have version 1 which is experimental and maybe
faster.
tolerance (int):
if the projected region is further than this many meters from the
center of the estimated UTM zone, raise a
:class:`pyproj.exceptions.CRSError` error.
Only used in mode 1.
Returns:
geopandas.GeoDataFrame:
The a copy of the input dataframe with projected geometry.
Notes:
Assumes input geometry is in CRS-84.
If an empty dataframe is given as input, it is returned as-is. No copy
is made.
Example:
>>> import geopandas as gpd
>>> from geowatch.utils.util_gis import * # NOQA
>>> import kwarray
>>> import kwimage
>>> rng = kwarray.ensure_rng(0)
>>> # Gen lat/lons between 0 and 1, which is in UTM zone 31N
>>> gdf_crs84 = gpd.GeoDataFrame({'geometry': [
>>> kwimage.Polygon.random(rng=rng).to_shapely(),
>>> kwimage.Polygon.random(rng=rng).to_shapely(),
>>> kwimage.Polygon.random(rng=rng).to_shapely(),
>>> ]}, crs='OGC:CRS84')
>>> gdf_utm = project_gdf_to_local_utm(gdf_crs84)
>>> assert gdf_utm.crs.name == 'WGS 84 / UTM zone 31N'
Example:
>>> import geopandas as gpd
>>> from geowatch.utils.util_gis import * # NOQA
>>> import kwarray
>>> import kwimage
>>> # If the data is too big for a single UTM zone,
>>> rng = kwarray.ensure_rng(0)
>>> gdf_crs84 = gpd.GeoDataFrame({'geometry': [
>>> kwimage.Polygon.random(rng=rng).scale(90).to_shapely(),
>>> kwimage.Polygon.random(rng=rng).scale(90).to_shapely(),
>>> kwimage.Polygon.random(rng=rng).scale(90).to_shapely(),
>>> ]}, crs='OGC:CRS84')
>>> import pytest
>>> with pytest.raises(ValueError):
>>> gdf_utm = project_gdf_to_local_utm(gdf_crs84)
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import geopandas as gpd
>>> import kwarray
>>> import kwimage
>>> # Data convers a lot of utm zones
>>> rng = kwarray.ensure_rng(0)
>>> gdf_crs84 = gpd.GeoDataFrame({'geometry': [
>>> kwimage.Polygon.random(rng=rng).scale(2).translate((-1, -1)).scale(0.001, about='centroid').scale((180, 90)).to_shapely()
>>> for _ in range(5)
>>> ]}, crs='OGC:CRS84')
>>> # Mode 1 uses a tolerance test instead
>>> gdf_utm = project_gdf_to_local_utm(gdf_crs84, mode=1, tolerance=float('inf'))
>>> import pytest
>>> import pyproj
>>> with pytest.raises(pyproj.exceptions.CRSError):
>>> gdf_utm = project_gdf_to_local_utm(gdf_crs84, mode=1, tolerance=10000)
>>> # Mode 1 uses the bounds of the entire region
# TODO: Gracefully handle cases where the UTM zones are different
# but all neighbors. Find a good example of this.
# Example:
# >>> import geopandas as gpd
# >>> from geowatch.utils.util_gis import * # NOQA
# >>> import kwarray
# >>> # If the data is too big for a single UTM zone,
# >>> rng = kwarray.ensure_rng(0)
# >>> # Corner case in Madagascar where the extent isn't too big, but it
# >>> # spans multiple UTM zones
# >>> poly = kwimage.Polygon.coerce({
# >>> "type": "Polygon",
# >>> "coordinates": [
# >>> [[-73.77200379967688, 42.864783745778894],
# >>> [-73.77177715301514, 42.86412514733195],
# >>> [-73.77110660076141, 42.8641654498268],
# >>> [-73.77105563879013, 42.86423720786224],
# >>> [-73.7710489332676, 42.864399400374786],
# >>> [-73.77134531736374, 42.8649134986743],
# >>> [ -73.77200379967688, 42.864783745778894]]]
# >>> })
# >>> gdf_crs84 = gpd.GeoDataFrame({
# >>> 'geometry': [poly.to_shapely()]}, crs='OGC:CRS84')
"""
# if gdf_crs84.crs.name != 'WGS 84 (CRS84)':
# raise AssertionError('expected CRS-84 input')
# try:
# gdf_crs84.estimate_utm_crs()
# except Exception:
if len(gdf_crs84) == 0:
return gdf_crs84
if mode == 0:
epsg_zones = []
for geom_crs84 in gdf_crs84.geometry:
epsg_zone = find_local_meter_epsg_crs(geom_crs84)
epsg_zones.append(epsg_zone)
if not ub.allsame(epsg_zones):
unique_utm = set(epsg_zones)
if len(unique_utm) > max_utm_zones:
raise ValueError(ub.paragraph(
'''
Input data spanned multiple UTM zones.
This is currently not allowed. {}
'''
).format(unique_utm))
# TODO: if there are more than one is there a way to get "the best one?"
epsg_zone = epsg_zones[0]
gdf_utm = gdf_crs84.to_crs(epsg_zone)
elif mode == 1:
utm_crs = gdf_crs84.estimate_utm_crs()
gdf_utm = gdf_crs84.to_crs(utm_crs)
if tolerance is not None:
# not sure what a good default is here.
import pyproj
# Quick check first, then expensive check
max_dist1 = np.abs(gdf_utm.bounds.values).max()
if max_dist1 > tolerance:
for geom in gdf_utm.geometry:
xys = np.array(geom.exterior.xy).T
max_dist = np.abs(xys).max()
if max_dist > tolerance:
if 1:
epsg_zones = []
for geom_crs84 in gdf_crs84.geometry:
epsg_zone = find_local_meter_epsg_crs(geom_crs84)
epsg_zones.append(epsg_zone)
print(f'epsg_zones={epsg_zones}')
raise pyproj.exceptions.CRSError(f'Projection distance={max_dist} is greater than tolerance={tolerance}')
else:
raise NotImplementedError(mode)
return gdf_utm
[docs]
def utm_epsg_from_latlon(lat, lon):
"""
Find a reasonable UTM CRS for a given lat / lon
The purpose of this function is to get a reasonable CRS for computing
distances in meters. If the region of interest is very large, this may not
be valid.
Args:
lat (float): degrees in latitude
lon (float): degrees in longitude
Returns:
int : the ESPG code of the UTM zone
References:
https://gis.stackexchange.com/questions/190198/how-to-get-appropriate-crs-for-a-position-specified-in-lat-lon-coordinates
https://gis.stackexchange.com/questions/365584/convert-utm-zone-into-epsg-code
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> epsg_code = utm_epsg_from_latlon(0, 0)
>>> print('epsg_code = {!r}'.format(epsg_code))
epsg_code = 32631
"""
import utm
# easting, northing, zone_num, zone_code = utm.from_latlon(min_lat, min_lon)
zone_num = utm.latlon_to_zone_number(lat, lon)
# Construction of EPSG code from UTM zone number
south = lat < 0
epsg_code = 32600
epsg_code += int(zone_num)
if south is True:
epsg_code += 100
return epsg_code
[docs]
class UTM_TransformContext:
"""
Helper to project into UTM space, perform some transform, and then project
back to the original CRS.
Currently only supports CRS84
Example:
>>> import kwimage
>>> from geowatch.utils.util_gis import * # NOQA
>>> data_crs84 = kwimage.Polygon.random()
>>> with UTM_TransformContext(data_crs84) as self:
>>> orig_utm_poly = kwimage.Polygon.coerce(self.geoms_utm.iloc[0])
>>> new_utm_poly = orig_utm_poly.scale(2, about='center')
>>> self.finalize(new_utm_poly)
>>> final_result = kwimage.Polygon.coerce(self.final_geoms_crs84.iloc[0])
>>> naive_result = data_crs84.scale(2, about='center')
>>> # Note the subtle difference in the naive vs context result
>>> print(f'final_result={final_result}')
>>> print(f'naive_result={naive_result}')
"""
def __init__(self, data_crs84):
"""
Args:
data_crs84 (Coercible[GeoSeries]):
something we know how to transform into a GeoSeries
"""
from geowatch.utils import util_gis
self.crs84 = util_gis.get_crs84()
self.geoms_crs84 = self._coerce_geo_series(data_crs84, self.crs84)
self.crs_utm = None
self.gdf_utm = None
self.final_geoms_utm = None
self.final_geoms_crs84 = None
def __enter__(self):
from geowatch.utils import util_gis
import geopandas as gpd
gdf_crs84 = gpd.GeoDataFrame(geometry=self.geoms_crs84)
gdf_utm = util_gis.project_gdf_to_local_utm(gdf_crs84)
self.geoms_utm = gdf_utm.geometry
self.crs_utm = self.geoms_utm.crs
return self
def _coerce_geo_series(self, data, default_crs=None):
import shapely
import geopandas as gpd
import kwimage
if isinstance(data, list):
geoms = gpd.GeoSeries(data, crs=default_crs)
else:
if isinstance(data, shapely.geometry.base.BaseGeometry):
geoms = gpd.GeoSeries([data], crs=default_crs)
elif isinstance(data, kwimage.Polygon):
geoms = gpd.GeoSeries([data.to_shapely()], crs=default_crs)
elif isinstance(data, gpd.GeoDataFrame):
geoms = data.geometry
elif isinstance(data, gpd.GeoSeries):
geoms = data
else:
raise TypeError(type(data))
return geoms
[docs]
def finalize(self, final_utm):
"""
Args:
final_utm (Coercible[GeoSeries]):
something coercable to geometry in UTM coordinates
"""
final_geoms_utm = self._coerce_geo_series(
final_utm, default_crs=self.crs_utm)
self.final_geoms_utm = final_geoms_utm
def __exit__(self, a, b, c):
if self.final_geoms_utm is None:
raise RuntimeError('Need to call finalize')
self.final_geoms_crs84 = self.final_geoms_utm.to_crs(self.crs84)
def _demo_convert_latlon_to_utm():
# Pretend we have these lon/lat (CRS84 corners)
lat_y = 40.060759
lon_x = 116.613095
lat_y_off = 0.0001
lat_x_off = 0.0001
# hard code so north is up
wld_corners = np.array([
[lon_x - lat_x_off, lat_y + lat_y_off],
[lon_x - lat_x_off, lat_y - lat_y_off],
[lon_x + lat_x_off, lat_y - lat_y_off],
[lon_x + lat_x_off, lat_y + lat_y_off],
])
import kwimage
wld_poly = kwimage.Polygon(exterior=wld_corners)
wld_poly_sh = wld_poly.to_shapely()
lon = wld_poly_sh.centroid.x
lat = wld_poly_sh.centroid.y
utm_code = utm_epsg_from_latlon(lat, lon)
import geopandas as gpd
gdf_crs84 = gpd.GeoDataFrame({'geometry': [wld_poly_sh]}, crs='OGC:CRS84')
gdf_utm = gdf_crs84.to_crs(utm_code)
utm_poly_sh = gdf_utm.iloc[0]['geometry']
utm_poly = kwimage.Polygon.from_shapely(utm_poly_sh)
utm_corners = utm_poly.data['exterior'].data
min_x = utm_corners.T[0].min()
max_x = utm_corners.T[0].max()
min_y = utm_corners.T[1].min()
max_y = utm_corners.T[1].max()
# Note: UTM bottom should be the min value, so be careful here
def _torch_meshgrid(*basis_dims):
"""
References:
https://zhaoyu.li/post/how-to-implement-meshgrid-in-pytorch/
"""
basis_lens = list(map(len, basis_dims))
new_dims = []
for i, basis in enumerate(basis_dims):
# Probably a more efficent way to do this, but its right
newshape = [1] * len(basis_dims)
reps = list(basis_lens)
newshape[i] = -1
reps[i] = 1
dd = basis.view(*newshape).repeat(*reps)
new_dims.append(dd)
return new_dims
import torch
image_width = 256
image_height = 256
utm_x_basis = torch.linspace(min_x, max_x, image_width)
utm_y_basis = torch.linspace(max_y, min_y, image_height)
xgrid, ygrid = _torch_meshgrid(utm_x_basis, utm_y_basis)
[docs]
def find_local_meter_epsg_crs(geom_crs84):
"""
Find the "best" meter based CRS for a smallish geographic region.
Currently this only returns UTM zones. Might be better to return an Albers
projection if the geometry spans more than one UTM zone.
Args:
geom_crs84 (shapely.geometry.base.BaseGeometry):
shapely geometry in CRS84 (lon/lat wgs84)
Returns:
int: epsg code
References:
[1] https://gis.stackexchange.com/questions/148181/choosing-projection-crs-for-short-distance-based-analysis/148187
[2] http://projfinder.com/
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import kwimage
>>> geom_crs84 = kwimage.Polygon.random().translate(-0.5).scale((180, 90)).to_shapely()
>>> epsg_zone = find_local_meter_epsg_crs(geom_crs84)
SeeAlso:
GeoDataFrame.estimate_utm_crs
TODO:
- [ ] Albers?
- [ ] Better UTM zone intersection
- [ ] Fix edge cases
"""
import geopandas as gpd
utm_crs = gpd.array.from_shapely([geom_crs84], 'OGC:CRS84').estimate_utm_crs()
epsg_zone = utm_crs.to_epsg()
return epsg_zone
[docs]
def check_latlons(lat, lon):
"""
Quick check to see if latitudes and longitudes are valid.
Longitude (x) is always between -180 and 180 (degrees east)
Latitude (y) is always between -90 and 90 (degrees north)
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import pytest
>>> assert not check_latlons(1000, 1000)
>>> assert check_latlons(0, 0)
"""
lat = np.asarray(lat)
lon = np.asarray(lon)
bad_lat = (lat < -90).any() or (lat > 90).any()
bad_lon = (lon < -180).any() or (lon > 180).any()
in_ranges = not (bad_lon or bad_lat)
return in_ranges
[docs]
def coerce_geojson_datas(arg, format='dataframe', allow_raw=False, workers=0,
mode='thread', verbose=1, desc=None, parse_float=None):
"""
Attempts to resolve an argument into multiple geojson datas.
Multiple threads / processes are used to load the specified information and
the function generates dictionaries of information containing the file path
and the loaded data as they become available.
The argument can be:
1. A path to a geojson file (or a list of them)
2. A glob string specifying multiple geojson files (or a list of them)
3. A path to a json manifest file.
4. If allow_raw is True, then the input can be a raw json string,
dict, or GeoDataFrame.
Args:
arg (str | PathLike | List[str | PathLike]):
an argument that is coerceable to one or more GeoDataFrames.
format (str):
Indicates the returned format of the data. Can be 'dataframe' where
the 'data' key will be a GeoDataFrame, or 'json' where the raw json
data will be returned.
allow_raw (bool):
if True, we will also check if the arguments are raw json /
geopandas data that can be loaded. In general try not to enable
this.
workers (int):
number of io workers
mode (str):
concurrent executor mode. Can be 'serial', 'thread', or 'process'.
desc (str):
custom message for progress bar.
Yields:
List[Dict[str, Any | GeoDataFrame | Dict]]:
A list of dictionaries formated with the keys:
* fpath (str): the file path the data was loaded from (
if applicable)
* data (GeoDataFrame | dict):
the data loaded in the requested format
SeeAlso:
* load_site_or_region_dataframes - the function that does the loading
after the arguments are coerced.
Example:
>>> # xdoctest: +SKIP("failing on CI. unsure why")
>>> from geowatch.utils.util_gis import * # NOQA
>>> from geowatch.demo.metrics_demo import generate_demodata
>>> info1 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R001')
>>> info2 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R002')
>>> info3 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R003')
>>> info4 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R012')
>>> info5 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R022')
>>> #
>>> region_fpaths = sorted(info1['true_region_dpath'].glob('*.geojson'))
>>> site_fpaths = sorted(info1['true_site_dpath'].glob('*.geojson'))
>>> #
>>> import json
>>> manifest_fpath1 = info1['output_dpath'] / 'demo_manifest1.json'
>>> manifest_fpath2 = info1['output_dpath'] / 'demo_manifest2.json'
>>> manifest_data1 = {
>>> 'files': [str(p) for p in region_fpaths[0:2]]
>>> }
>>> manifest_data2 = {
>>> 'files': [str(p) for p in region_fpaths[3:4]]
>>> }
>>> manifest_fpath1.write_text(json.dumps(manifest_data1))
>>> manifest_fpath2.write_text(json.dumps(manifest_data2))
>>> variants = []
>>> # ==========
>>> # Test Cases
>>> # ==========
>>> #
>>> # List of region files
>>> arg = region_fpaths
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 5
>>> #
>>> # Glob for region files
>>> arg = str(info1['true_region_dpath']) + '/*R*2*.geojson'
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 3
>>> #
>>> # Manifest file
>>> arg = manifest_fpath1
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 2
>>> #
>>> # Manifest file glob
>>> arg = str(manifest_fpath1.parent / '*.json')
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 3
>>> #
>>> # Manifest file glob and a region path
>>> arg = [manifest_fpath2, region_fpaths[0]]
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 2
>>> #
>>> # Site glob and a manifest glob
>>> arg = [str(info1['true_site_dpath']) + '/DR_R002_*.geojson',
... str(manifest_fpath1 + '*')]
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 9
>>> #
>>> # Site directory and manifest file.
>>> arg = [str(info1['true_site_dpath']),
... str(manifest_fpath1 + '*')]
>>> result = list(coerce_geojson_datas(arg))
>>> assert len(result) == 31
>>> # Test raw loading and format swapping
>>> from geowatch.utils import util_gis
>>> arg = util_gis.demo_regions_geojson_text()
>>> result1 = list(coerce_geojson_datas(arg, allow_raw=False))
>>> assert len(result1) == 0
>>> result2 = list(coerce_geojson_datas(arg, allow_raw=True))
>>> assert len(result2) == 1
>>> arg = result2
>>> result3 = list(coerce_geojson_datas(arg, format='dataframe', allow_raw=True))
>>> assert result3 == result2
>>> result4 = list(coerce_geojson_datas(arg, format='json', allow_raw=True))
>>> assert isinstance(result4[0]['data'], dict)
>>> result5 = list(coerce_geojson_datas(
>>> result4, format='dataframe', allow_raw=True))
>>> assert isinstance(result4[0]['data'], dict)
>>> #
>>> # Test nothing case
>>> assert len(list(coerce_geojson_datas([], allow_raw=True))) == 0
"""
if format not in {'json', 'dataframe'}:
raise KeyError(format)
def is_iterable_sequence(item, strok=False):
"""
Check if it is a non-string, non-mapping type of iterable.
"""
if ub.iterable(item, strok=strok):
if hasattr(item, 'keys'):
return False
else:
return True
else:
return False
if arg is None:
return
if allow_raw:
# Normally the function assumes we are only inputing things that are
# coercable to paths, and then to geojson. But sometimes we might want
# to pass around that data directly. In this case, grab those items
# first, and then resolve the rest of them.
if is_iterable_sequence(arg):
iterable_arg = arg
else:
iterable_arg = [arg]
raw_items = []
other_items = []
for item in iterable_arg:
try:
was_raw, item = _coerce_raw_geojson(item, format)
except TypeError:
print(f'iterable_arg={iterable_arg}')
print(f'arg={arg}')
raise
if was_raw:
raw_items.append(item)
else:
other_items.append(item)
path_coercable = other_items
else:
path_coercable = arg
# Handle the normal better-defined case of coercing arguments into paths
geojson_fpaths = coerce_geojson_paths(path_coercable)
if allow_raw:
if verbose:
if raw_items or len(geojson_fpaths) == 0:
print(f'Coerced {len(raw_items)} raw geojson item')
if raw_items:
if len(geojson_fpaths) == 0:
# Disable path verbosity if there were raw items, but no
# paths.
verbose = 0
# Now all of resolved accumulator items should be geojson files.
# Submit the data to be loaded.
geojson_fpaths = list(ub.unique(geojson_fpaths))
data_gen = load_geojson_datas(
geojson_fpaths, workers=workers, mode=mode, desc=desc,
format=format, verbose=verbose, yield_after_submit=True,
parse_float=parse_float)
# Start the background workers
next(data_gen)
if allow_raw:
# yield the raw data before the generated data
yield from raw_items
# Finish the main generator
yield from data_gen
[docs]
def coerce_geojson_paths(data, return_manifests=False):
"""
Resolves the argument to a list of geojson paths.
Args:
data (Any): argument to coerce.
The argument can be a full path, a glob string, a path to a
manifest file or any combination of the previous in a list.
If it is a manifest file. It must be a json/yaml file with a
top-level "files" key that points to a list of the actual file
paths.
return_manifests (bool): if True additionally returns paths to
any intermediate manifest files.
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import json
>>> from geowatch.demo.metrics_demo import generate_demodata
>>> # Setup a bunch of geojson files
>>> outdir = ub.Path.appdir("geowatch/tests/gis/coerce_geojson_v1")
>>> # outdir.delete().ensuredir()
>>> info1 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R001', outdir=outdir)
>>> info2 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R002', outdir=outdir)
>>> info3 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R003', outdir=outdir)
>>> info4 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R012', outdir=outdir)
>>> info5 = generate_demodata.generate_demo_metrics_framework_data(roi='DR_R022', outdir=outdir)
>>> region_fpaths = sorted(info1['true_region_dpath'].glob('*.geojson'))
>>> site_fpaths = sorted(info1['true_site_dpath'].glob('*.geojson'))
>>> manifest_fpath1 = info1['output_dpath'] / 'demo_manifest1.json'
>>> manifest_data1 = {'files': [str(p) for p in region_fpaths[0:2]]}
>>> geojson_dpath = info1['true_site_dpath']
>>> manifest_fpath1.write_text(json.dumps(manifest_data1))
>>> # Test manifest case
>>> geojson_fpaths = coerce_geojson_paths(manifest_fpath1)
>>> assert len(geojson_fpaths) == 2
>>> # Test directory case
>>> geojson_fpaths = coerce_geojson_paths(geojson_dpath)
>>> assert len(geojson_fpaths) == 15, 'maybe cache is bad?'
>>> # Test glob case
>>> geojson_fpaths = coerce_geojson_paths(geojson_dpath / '*R001_*')
>>> assert len(geojson_fpaths) == 3
>>> # Test list of files and globstr
>>> data = [geojson_dpath / '*R002_*'] + geojson_fpaths
>>> geojson_fpaths = coerce_geojson_paths(data)
>>> assert len(geojson_fpaths) == 6
>>> # Test manifest case2
>>> info = coerce_geojson_paths(manifest_fpath1, return_manifests=True)
>>> assert len(info['manifest_fpaths']) == 1
>>> assert len(info['geojson_fpaths']) == 2
"""
from kwutil import util_yaml
from kwutil import util_path
paths = util_path.coerce_patterned_paths(data, '.geojson')
geojson_fpaths = []
manifest_fpaths = []
for p in paths:
resolved = None
if isinstance(p, (str, os.PathLike)):
peeked = None
if str(p).lower().endswith('.json'):
# Check to see if this is a manifest file
peeked = json.loads(p.read_text())
elif str(p).lower().endswith(('.yaml', '.yml')):
# Check to see if this is a manifest file
peeked = util_yaml.Yaml.loads(p.read_text())
if peeked is not None:
if isinstance(peeked, dict) and 'files' in peeked:
manifest_fpaths.append(p)
resolved = list(map(ub.Path, peeked['files']))
if resolved is None:
resolved = [p]
if resolved is None:
raise ValueError('Unable to resolve p={p!r}')
geojson_fpaths.extend(resolved)
if return_manifests:
return {
'manifest_fpaths': manifest_fpaths,
'geojson_fpaths': geojson_fpaths,
}
else:
return geojson_fpaths
def _coerce_raw_geojson(item, format):
"""
Helper for the coerce method
"""
import geopandas as gpd
was_raw = False
if isinstance(item, dict):
# Allow the item to be a wrapped dict returned by this func
was_raw = True
if set(item.keys()) == {'fpath', 'data', 'format'}:
item = item['data']
if isinstance(item, str):
# Allow the item to be unparsed
try:
item = json.loads(item)
except json.JSONDecodeError:
... # not json data
else:
was_raw = True
if isinstance(item, (os.PathLike, str)):
... # not raw
elif isinstance(item, dict):
# Allow the item to be parsed json
was_raw = True
assert item.get('type', None) == 'FeatureCollection'
if format == 'dataframe':
item = gpd.GeoDataFrame.from_features(item['features'])
# Hack in CRS-84
crs84 = get_crs84()
# this is much faster and the only reason this is ok is because the
# input is xy-wgs84 so the transform (which is slow) would be a noop
item._crs = crs84
elif isinstance(item, gpd.GeoDataFrame):
# Allow the item to be a GeoDataFrame
was_raw = True
if format == 'json':
item = json.loads(item.to_json())
else:
raise TypeError(type(item))
if was_raw:
item = {
'fpath': None,
'data': item,
'format': format,
}
return was_raw, item
def _load_json_from_path(path, parse_float=None):
with open(path, 'r') as file:
return json.load(file, parse_float=parse_float)
[docs]
def load_geojson_datas(geojson_fpaths, format='dataframe', workers=0,
mode='thread', verbose=1, desc=None,
yield_after_submit=False,
parse_float=None):
"""
Generator that loads sites (and the path they loaded from) in parallel
Args:
geojson_fpaths (Iterable[PathLike]):
geojson paths to load
format (str):
Can be "dataframe" (for pandas) or "json" (for nested dict / list)
workers (int):
number of background loading workers
mode (str):
concurrent executor mode
desc (str): overwrite message for the progress bar
yield_after_submit (bool):
backend argument that will yield None after the data is submitted
to force the data loading to start processing in the background.
TODO:
pass something in to allow for better progress management when a lot of
functions are calling this.
Yields:
Dict:
containing keys, 'fpath' and 'gdf'.
SeeAlso:
* coerce_geojson_datas - the coercable version of this function.
* coerce_geojson_paths - only coerces paths
Example:
>>> from geowatch.utils.util_gis import * # NOQA
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch', 'tests', 'util_gis', 'load_geojson_data')
>>> dpath.ensuredir()
>>> fpath = dpath / 'data.geojson'
>>> fpath.write_text(demo_regions_geojson_text())
>>> # Test both format loaders work correctly.
>>> gdf = list(load_geojson_datas([fpath], format='dataframe'))[0]['data']
>>> dct = list(load_geojson_datas([fpath], format='json'))[0]['data']
>>> import geopandas as gpd
>>> assert isinstance(gdf, gpd.GeoDataFrame)
>>> assert isinstance(dct, dict)
"""
from geowatch.utils import util_gis
from kwutil import util_progress
# sites = []
if desc is None:
desc = 'load geojson datas'
jobs = ub.JobPool(mode=mode, max_workers=workers)
submit_progkw = {
'desc': 'submit ' + desc,
'verbose': (workers > 0) and verbose
}
# FIXME: kwutil.util_progress is not properly disabling progress when
# verbose=0, fix that and then reenable this.
# try:
# num_jobs = len(geojson_fpaths)
# except Exception:
# num_jobs = None
# if num_jobs is not None and num_jobs < 1000:
# # Don't bother with a progress bar for submit jobs
# # if there are not too many of them.
# submit_progkw['verbose'] = 0
# submit_progkw['enabled'] = False
kwargs = {}
if format == 'dataframe':
loader = util_gis.load_geojson
elif format == 'json':
loader = _load_json_from_path
kwargs['parse_float'] = parse_float
else:
raise KeyError(format)
pmankw = {}
if submit_progkw['verbose'] == 0:
# Hack because rich based progiters always cause a newline
# switching to progiter backend prevents this.
# TODO: fix this issue in util_progress.ProgressManager itself
pmankw['backend'] = 'progiter'
pman = util_progress.ProgressManager(**pmankw)
with pman:
for fpath in pman.progiter(geojson_fpaths, **submit_progkw):
job = jobs.submit(loader, fpath, **kwargs)
job.fpath = fpath
if yield_after_submit:
yield None
result_progkw = {
'verbose': verbose,
}
for job in pman.progiter(jobs.as_completed(), total=len(jobs),
desc=desc, **result_progkw):
data = job.result()
info = {
'fpath': job.fpath,
'data': data,
'format': format,
}
yield info
[docs]
def crs_geojson_to_gdf(geometry, crs_info=None):
"""
TODO: fix the name and scope of this function.
The idea is to convert one of our CRS aware geojson geometries to a
GeoDataFrame for manipulation.
Args:
geometry :
This is a geojson geometry object with that has a crs_info object
in its properties.
crs_info (Dict | None):
if the geojson does not have a crs_info in its properties you
can specify it here.
Returns:
gpd.GeoDataFrame
Example:
>>> # xdoctest: +REQUIRES(env:LARGE_DOWNLOAD)
>>> from geowatch.gis.geotiff import * # NOQA
>>> from geowatch.utils.util_gis import * # NOQA
>>> import kwimage
>>> from geowatch.demo.dummy_demodata import dummy_rpc_geotiff_fpath
>>> gpath_or_ref = gpath = dummy_rpc_geotiff_fpath()
>>> info = geotiff_crs_info(gpath)
>>> # Case 1: Traditional CRS84 Geojson
>>> geometry = info['geos_corners']
>>> gdf = crs_geojson_to_gdf(geometry)
>>> assert gdf.crs.axis_info[0].name == 'Geodetic longitude'
>>> assert gdf.crs.axis_info[1].name == 'Geodetic latitude'
>>> # Case 2: Authority Compliant WGS84 Geojson
>>> wgs84_poly = kwimage.Polygon.coerce(info['wgs84_corners'])
>>> wgs84_geometry = wgs84_poly.to_geojson()
>>> wgs84_geometry['properties'] = {'crs_info': info['wgs84_crs_info']}
>>> crs84_gdf = crs_geojson_to_gdf(wgs84_geometry)
>>> assert crs84_gdf.crs.axis_info[0].name == 'Geodetic longitude'
>>> assert crs84_gdf.crs.axis_info[1].name == 'Geodetic latitude'
>>> # Case 3: UTM
>>> utm_poly = kwimage.Polygon.coerce(info['utm_corners'])
>>> utm_geometry = utm_poly.to_geojson()
>>> utm_geometry['properties'] = {'crs_info': info['utm_crs_info']}
>>> utm_gdf = crs_geojson_to_gdf(utm_geometry)
>>> assert utm_gdf.crs.axis_info[0].name == 'Easting'
>>> assert utm_gdf.crs.axis_info[1].name == 'Northing'
"""
import kwimage
import geopandas as gpd
geos_crs_info = geometry['properties'].get('crs_info', None)
if geos_crs_info is None:
geos_crs_info = crs_info
if geos_crs_info is None:
raise Exception("we need crs_info")
# TODO: better way to get generic Polygon / Multipolygon from shaley in
# kwimage.
kw_geom = kwimage.structs.segmentation._coerce_coco_segmentation(geometry)
# sh_geom = kwimage.MultiPolygon.from_geojson(geometry).to_shapely()
auth = geos_crs_info['auth']
if isinstance(auth, tuple):
auth = list(auth)
if auth == ['EPSG', '4326']:
# TODO:
# - [ ] Handle general axis_mapping issues
if geos_crs_info['axis_mapping'] != 'OAMS_TRADITIONAL_GIS_ORDER':
if geos_crs_info['axis_mapping'] == 'OAMS_AUTHORITY_COMPLIANT':
kw_geom = kw_geom.swap_axes()
else:
raise NotImplementedError(
'geojson should be in traditional order. i.e. crs84'
)
crs = get_crs84()
else:
from pyproj import CRS
crs = CRS.from_user_input(geos_crs_info['auth'])
if geos_crs_info['axis_mapping'] != 'OAMS_AUTHORITY_COMPLIANT':
raise NotImplementedError(
f'Non-CRS84 should be authority compliant. Got: {geos_crs_info}'
)
sh_geom = kw_geom.to_shapely()
gdf = gpd.GeoDataFrame({'geometry': [sh_geom]}, crs=crs)
return gdf
[docs]
def coerce_crs(crs_info):
"""
Get a pyproj CRS from something they should be inferrable from
Example:
>>> from geowatch.gis.geotiff import * # NOQA
>>> from geowatch.utils.util_gis import * # NOQA
>>> coerce_crs('crs84')
>>> coerce_crs(['EPSG', '4326'])
"""
if isinstance(crs_info, str):
if crs_info == 'crs84':
crs = get_crs84()
else:
from pyproj import CRS
crs = CRS.from_user_input(crs_info['auth'])
elif isinstance(crs_info, dict):
auth = crs_info['auth']
if isinstance(auth, tuple):
auth = list(auth)
# if auth == ['EPSG', '4326']:
# if crs_info['axis_mapping'] != 'OAMS_TRADITIONAL_GIS_ORDER':
# raise ValueError(
# 'got real wgs crs. Should be in traditional order. i.e. crs84'
# )
# else:
from pyproj import CRS
crs = CRS.from_user_input(crs_info['auth'])
# if crs_info['axis_mapping'] != 'OAMS_AUTHORITY_COMPLIANT':
# raise NotImplementedError(
# f'Non-CRS84 should be authority compliant. Got: {crs_info}'
# )
else:
from pyproj import CRS
crs = CRS.from_user_input(crs_info)
# TODO: detect wgs84 and error, must assume crs84.
return crs