geowatch.utils.util_framework module¶
- geowatch.utils.util_framework.ingress_item(feature, outdir, aws_base_command, dryrun, relative=False, virtual=False)[source]¶
Originally from the baseline_framework_ingress code; could probably be cleaned up.
FIXME: Something is this is not concurrent-safe
- geowatch.utils.util_framework.download_mtd_msil1c(product_id, metadata_href, outdir, aws_base_command, dryrun)[source]¶
- class geowatch.utils.util_framework.CacheItemOutputS3Wrapper(item_map, outbucket, aws_profile=None)[source]¶
Bases:
object
- class geowatch.utils.util_framework.IngressProcessEgressWrapper(item_map, outbucket, aws_base_command, dryrun=False, stac_item_selector=<function _default_item_selector>, asset_selector=<function _default_asset_selector>, skip_egress=False)[source]¶
Bases:
object
- geowatch.utils.util_framework.download_region(input_region_path, output_region_path, aws_profile=None, strip_nonregions=False, ensure_comments=False)[source]¶
- geowatch.utils.util_framework.determine_region_id(region_fpath)[source]¶
- Parameters:
region_fpath (str | PathLike) – the path to a region model geojson file
- Returns:
the region id if we can find one
- Return type:
str | None
- class geowatch.utils.util_framework.AWS_S3_Command(command, *args, **options)[source]¶
Bases:
object
Helper to build and execute AWS S3 bash commands
Note
probably should use fsspec instead of this in most cases.
References
https://docs.aws.amazon.com/cli/latest/reference/s3/
Example
>>> from geowatch.utils.util_framework import * # NOQA >>> self = AWS_S3_Command('ls', 's3://foo/bar') >>> self.update(profile='myprofile') >>> print(self.finalize()) ['aws', 's3', '--profile', 'myprofile', 'ls', 's3://foo/bar'] >>> self = AWS_S3_Command('cp', 's3://foo/bar', '/foo/bar', quiet=True, no_progress=True, color='auto') >>> print(self.finalize()) ['aws', 's3', '--quiet', '--no-progress', '--color', 'auto', 'cp', 's3://foo/bar', '/foo/bar']
Example
>>> # Reuse the same command object with different positional args >>> aws_cmd = AWS_S3_Command('cp') >>> aws_cmd.update( >>> profile='myprof', >>> only_show_errors=True >>> ) >>> aws_cmd.args = ['s3://data1', '/local/data1'] >>> print(aws_cmd.finalize()) ['aws', 's3', '--only-show-errors', '--profile', 'myprof', 'cp', 's3://data1', '/local/data1'] >>> # Set the `args` attribute to get a new command while keeping >>> # existing options. >>> aws_cmd.update(recursive=True) >>> aws_cmd.args = ['s3://data2', '/local/data2'] >>> print(aws_cmd.finalize()) ['aws', 's3', '--only-show-errors', '--recursive', '--profile', 'myprof', 'cp', 's3://data2', '/local/data2']
Example
>>> # There is no need to specify the entire command. If you want >>> # to simply build a command prefix, then that works too. >>> aws_cmd = AWS_S3_Command('cp', profile='myprof', aws_storage_class='foobar') >>> print(aws_cmd.finalize()) ['aws', 's3', '--profile', 'myprof', '--aws-storage-class', 'foobar', 'cp']
- Parameters:
command (str) – can be: cp, ls, mv, rm, sync
*args – positional arguments
**options – key value options (e.g. profile)
- cmd_known_flags = {'cp': ['dryrun', 'quiet', 'follow-symlinks', 'no-follow-symlinks', 'no-guess-mime-type', 'only-show-errors', 'no-progress', 'ignore-glacier-warnings', 'force-glacier-transfer', 'recursive', 'debug', 'no-verify-ssl', 'no-paginate'], 'ls': ['recursive', 'human-readable', 'summarize', 'debug', 'no-verify-ssl', 'no-paginate', 'no-sign-request'], 'sync': ['dryrun', 'quiet', 'follow-symlinks', 'no-follow-symlinks', 'no-guess-mime-type', 'only-show-errors', 'no-progress', 'ignore-glacier-warnings', 'force-glacier-transfer', 'size-only', 'exact-timestamps', 'delete', 'debug', 'no-verify-ssl', 'no-paginate', 'no-sign-request']}¶
- cmd_known_keyvals = {'cp': ['include', 'exclude', 'acl', 'sse', 'sse-c', 'sse-c-key', 'sse-kms-key-id', 'sse-c-copy-source', 'sse-c-copy-source-key', 'storage-class', 'grants', 'website-redirect', 'content-type', 'cache-control', 'content-disposition', 'content-encoding', 'content-language', 'expires', 'source-region', 'page-size', 'request-payer', 'metadata', 'metadata-directive', 'expected-size', 'endpoint-url', 'output', 'query', 'profile', 'region', 'version', 'color', 'no-sign-request', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout'], 'ls': ['endpoint-url', 'page-size', 'request-payer', 'output', 'query', 'profile', 'region', 'version', 'color', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout'], 'sync': ['include', 'exclude', 'acl', 'sse', 'sse-c', 'sse-c-key', 'sse-kms-key-id', 'sse-c-copy-source', 'sse-c-copy-source-key', 'storage-class', 'grants', 'website-redirect', 'content-type', 'cache-control', 'content-disposition', 'content-encoding', 'content-language', 'expires', 'source-region', 'page-size', 'request-payer', 'metadata', 'metadata-directive', 'endpoint-url', 'output', 'query', 'profile', 'region', 'version', 'color', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout']}¶
- update(arg=None, /, **options)[source]¶
Update key / value options.
This function is aware of what options need to be flags versus key/values
So quiet=True will result in –quiet, quiet=False will have include no option. Likewise profile=foo will result in –profile foo and profile=None will include no option.
- geowatch.utils.util_framework.ta2_collate_output(aws_base_command, local_region_dir, local_sites_dir, destination_s3_bucket, performer_suffix='KIT')[source]¶
I think this is for putting the final system regions / sites into the place that T&E wants them.
- geowatch.utils.util_framework.fixup_and_validate_site_and_region_models(region_dpath, site_dpath)[source]¶
Read, fix, and validate all site and region models.
- geowatch.utils.util_framework.check_region_and_site_models_agree(region_models, site_models)[source]¶
- class geowatch.utils.util_framework.NodeStateDebugger[source]¶
Bases:
object
Prints information about the current node that is helpful for debugging.
Use in the smartflow CLI nodes.
Maintains some internal state to keep things organized.
Example
>>> from geowatch.utils.util_framework import * # NOQA >>> import ubelt as ub >>> watch_appdir_dpath = ub.Path.appdir('geowatch') >>> self = NodeStateDebugger() >>> self.print_environment() >>> self.print_current_state(watch_appdir_dpath) >>> config = {'foo': 'bar'} >>> self.print_local_invocation(config)