Source code for geowatch.gis.digital_globe

r"""
Helpers for reading data downloaded from digital globe


Notes:
    The data in the Core3D dataset is public and can be rehosted.

    https://spacenet.ai/core3d/

    AWS_PROFILE=iarpa aws s3 ls s3://spacenet-dataset/Hosted-Datasets/CORE3D-Public-Data/Satellite-Images/
    AWS_PROFILE=iarpa aws s3 ls s3://spacenet-dataset/Hosted-Datasets/CORE3D-Public-Data/Tiled-Examples-for-Urban-3D-Challenge-Comparisons/02_Master/challenge-inputs/

    AWS_PROFILE=iarpa aws s3 cp \
            s3://spacenet-dataset/Hosted-Datasets/CORE3D-Public-Data/Tiled-Examples-for-Urban-3D-Challenge-Comparisons/02_Master/challenge-inputs/RIC_Tile_000_DSM.tif RIC_Tile_000_DSM.tif
    AWS_PROFILE=iarpa aws s3 cp \
            s3://spacenet-dataset/Hosted-Datasets/CORE3D-Public-Data/Tiled-Examples-for-Urban-3D-Challenge-Comparisons/02_Master/challenge-inputs/RIC_Tile_000_DTM.tif RIC_Tile_000_DTM.tif
    AWS_PROFILE=iarpa aws s3 cp \
            s3://spacenet-dataset/Hosted-Datasets/CORE3D-Public-Data/Tiled-Examples-for-Urban-3D-Challenge-Comparisons/02_Master/challenge-inputs/RIC_Tile_000_RGB.tif RIC_Tile_000_RGB.tif

Requirements:
    pip install xmltodict
    pip install pyshp
    pip install cogeotiff
"""
from os.path import exists
from os.path import join
import ubelt as ub
import xmltodict
from os.path import dirname, abspath


[docs] class DigitalGlobeBundle(ub.NiceRepr): """ Data structure to organize information in digital globe bundles TODO: need public digital globe demodata for a doctest Maybe we can grab them from here? https://www.maxar.com/product-samples https://ard.maxar.com/samples#v5/ https://spacenet.ai/core3d/ Requirements: pip isntall pyshp Ignore: # This has a different format than our stuff... bleh.. sample_zip_fpath = ub.grabdata('https://maxar-marketing.s3.amazonaws.com/product-samples/Rome_Colosseum_2022-03-22_WV03_HD.zip', hash_prefix='2a99cea2b37bed9b5867fa21a1bd') from kwcoco.util import util_archive archive = util_archive.Archive(sample_zip_fpath) dpath = (ub.Path(sample_zip_fpath).parent / 'MaxarSample').ensuredir() metadata_fpath = list(dpath.glob('*.MAN'))[0] archive.extractall(dpath) delivery_metadata_fpath = dpath / '050012575010_01/050012575010_01_README.XML' self = DigitalGlobeBundle(delivery_metadata_fpath) """ def __init__(self, delivery_metadata_fpath, pointer=None, autobuild=True): self.data = { 'delivery_metadata_fpath': delivery_metadata_fpath, 'product_metas': None, 'pointer': pointer, } if autobuild: self.parse_delivery_metadata() def __nice__(self): return self.data['delivery_metadata_fpath']
[docs] def parse_delivery_metadata(self): import shapefile import kwimage delivery_metadata_fpath = self.data['delivery_metadata_fpath'] dpath = dirname(delivery_metadata_fpath) with open(delivery_metadata_fpath, 'r') as file: delivery_metadata = xmltodict.parse(file.read()) self.data['other'] = ub.dict_diff(delivery_metadata, {'DeliveryMetadata'}) self.data['non_product'] = ub.dict_diff(delivery_metadata, {'DeliveryMetadata'}) product_list = delivery_metadata['DeliveryMetadata']['product'] pointer = self.data['pointer'] product_metas = [] for product in product_list: product_meta = product.copy() prod_files = product_meta.pop('productFile') # Find the files associated with the order AOI aoi_fpaths = { 'shp': None, 'dbf': None, 'shx': None, 'prj': None, } misc_exts = { '_LAYOUT.JPG', 'NEXTVIEW.TXT', '_README.TXT', '_README.XML', '-BROWSE.JPG', } prod_types = [] for v in prod_files: product_type = None for ext in misc_exts: if v['filename'].lower().endswith(ext.lower()): product_type = 'misc' if product_type is None: for ext in aoi_fpaths.keys(): fname = 'ORDER_SHAPE.' + ext if v['filename'].lower().endswith(fname.lower()): fpath = join(dpath, v['relativeDirectory'], v['filename']) assert aoi_fpaths[ext] is None aoi_fpaths[ext] = fpath product_type = 'aoi' if product_type is None: if v['relativeDirectory'].lower().endswith('GIS_FILES'.lower()): product_type = 'misc-gis' if product_type is None: for ext in kwimage.im_io.IMAGE_EXTENSIONS: if v['filename'].lower().endswith(ext.lower()): product_type = 'image' if product_type is None: product_type = 'other' prod_types.append(product_type) type_to_prods = ub.group_items(prod_files, prod_types) type_to_prods['image'] type_to_prods['misc'] type_to_prods['gis'] type_to_prods['aoi'] type_to_prods['other'] # prod_type_hist = ub.map_vals(len, type_to_prods) # print('prod_type_hist = {}'.format(ub.urepr(prod_type_hist, nl=1))) aoi_files = {key: open(val, 'rb') for key, val in aoi_fpaths.items()} try: shp_wkt = ensure_unicode(aoi_files['prj'].read()) shp_reader = shapefile.Reader( shp=aoi_files['shp'], dbf=aoi_files['dbf'], shx=aoi_files['shx'] ) aoi_geojson = shp_reader.shape().__geo_interface__ product_meta['aoi_geojson'] = aoi_geojson product_meta['shp_wkt'] = shp_wkt finally: for val in aoi_files.values(): val.close() shp_reader.close() if pointer is not None: for v in type_to_prods['image']: prod_fname = v['filename'] flag = pointer.endswith(prod_fname) v['is_pointer'] = flag product_meta['images'] = type_to_prods['image'] product_metas.append(product_meta) self.data['product_metas'] = product_metas
[docs] @classmethod def from_pointer(cls, pointer, **kw): """ Args: pointer (str): a path to any file inside a digital globe bundle. We will search for the DeliveryMetadata.xml data. Ignore: pointer = '/home/joncrall/data/dvc-repos/smart_watch_dvc/drop0/KR-Pyeongchang-WV/_assets/20170907_a_KRP_011777481_10_0/011777481010_01_003/011777481010_01/011777481010_01_P001_MUL/17SEP07021826-M1BS-011777481010_01_P001.TIF' pointer = '/home/joncrall/data/dvc-repos/smart_watch_dvc/drop0/KR-Pyeongchang-WV/_assets/20170907_a_KRP_011777481_10_0/011777481010_01_003/011777481010_01/011777481010_01_P001_PAN/17SEP07021826-P1BS-011777481010_01_P001.TIF' cls = DigitalGlobeBundle self = DigitalGlobeBundle.from_pointer(pointer) for meta in self.data['product_metas']: meta['sensorVehicle'] dict_list = self.data['product_metas'] print(varried['sensorVehicle']) """ dpath = abspath(pointer) delivery_fpath = search_path_ancestors( path=dpath, fname='DeliveryMetadata.xml') if delivery_fpath is None: raise Exception('cannot find DG DeliveryMetadata.xml') self = cls(delivery_fpath, pointer=pointer, **kw) return self
[docs] @classmethod def coerce(cls, key, **kw): try: self = cls.pointer(key, **kw) except Exception: self = None return self
[docs] def search_path_ancestors(path, fname, stop_fname=None, max_steps=1000): """ Search path and all of its containing folders for a file name ``fname``. Args: path (str): directory to start the search fname (str): path to search for stop_fname (str): stop if we find a file with this name. """ import itertools as it dpath = path found = None for idx in it.count(): fpath = join(dpath, fname) if exists(fpath): found = fpath break if stop_fname is not None: stop_fpath = join(dpath, stop_fname) if exists(stop_fpath): raise Exception('found stop fname, cannot find {}'.format(fname)) dpath_next = dirname(dpath) if idx > max_steps: raise Exception('too many steps, cannot find {}'.format(fname)) if dpath_next == dpath: raise Exception('reached the root, cannot find {}'.format(fname)) dpath = dpath_next return found
[docs] def ensure_unicode(text): r""" Casts bytes into utf8 (mostly for python2 compatibility). Args: text (str | bytes): text to ensure is decoded as unicode Returns: str References: .. [SO_12561063] http://stackoverflow.com/questions/12561063/extract-data-from-file Example: >>> import codecs # NOQA >>> assert ensure_unicode('my ünicôdé strįng') == 'my ünicôdé strįng' >>> assert ensure_unicode('text1') == 'text1' >>> assert ensure_unicode('text1'.encode('utf8')) == 'text1' >>> assert ensure_unicode('text1'.encode('utf8')) == 'text1' >>> assert (codecs.BOM_UTF8 + 'text»¿'.encode('utf8')).decode('utf8') """ if isinstance(text, str): return text elif isinstance(text, bytes): return text.decode('utf8') else: # nocover raise ValueError('unknown input type {!r}'.format(text))