geowatch.utils.util_framework module

geowatch.utils.util_framework.egress_item(stac_item, outbucket, aws_base_command)[source]
geowatch.utils.util_framework.ingress_item(feature, outdir, aws_base_command, dryrun, relative=False, virtual=False)[source]

Originally from the baseline_framework_ingress code; could probably be cleaned up.

FIXME: Something in this is not concurrent-safe.

geowatch.utils.util_framework.download_mtd_msil1c(product_id, metadata_href, outdir, aws_base_command, dryrun)[source]
geowatch.utils.util_framework.download_file(href, outpath, aws_base_command, dryrun)[source]
geowatch.utils.util_framework.download_http_file(url, outpath)[source]
class geowatch.utils.util_framework.CacheItemOutputS3Wrapper(item_map, outbucket, aws_profile=None)[source]

Bases: object

class geowatch.utils.util_framework.IngressProcessEgressWrapper(item_map, outbucket, aws_base_command, dryrun=False, stac_item_selector=<function _default_item_selector>, asset_selector=<function _default_asset_selector>, skip_egress=False)[source]

Bases: object

geowatch.utils.util_framework.download_region(input_region_path, output_region_path, aws_profile=None, strip_nonregions=False, ensure_comments=False)[source]
geowatch.utils.util_framework.determine_region_id(region_fpath)[source]
Parameters:

region_fpath (str | PathLike) – the path to a region model geojson file

Returns:

the region id if we can find one

Return type:

str | None

class geowatch.utils.util_framework.AWS_S3_Command(command, *args, **options)[source]

Bases: object

Helper to build and execute AWS S3 bash commands

Note

In most cases you should probably use fsspec instead of this.

References

https://docs.aws.amazon.com/cli/latest/reference/s3/

Example

>>> from geowatch.utils.util_framework import *  # NOQA
>>> self = AWS_S3_Command('ls', 's3://foo/bar')
>>> self.update(profile='myprofile')
>>> print(self.finalize())
['aws', 's3', '--profile', 'myprofile', 'ls', 's3://foo/bar']
>>> self = AWS_S3_Command('cp', 's3://foo/bar', '/foo/bar', quiet=True, no_progress=True, color='auto')
>>> print(self.finalize())
['aws', 's3', '--quiet', '--no-progress', '--color', 'auto', 'cp', 's3://foo/bar', '/foo/bar']

Example

>>> # Reuse the same command object with different positional args
>>> aws_cmd = AWS_S3_Command('cp')
>>> aws_cmd.update(
>>>     profile='myprof',
>>>     only_show_errors=True
>>> )
>>> aws_cmd.args = ['s3://data1', '/local/data1']
>>> print(aws_cmd.finalize())
['aws', 's3', '--only-show-errors', '--profile', 'myprof', 'cp', 's3://data1', '/local/data1']
>>> # Set the `args` attribute to get a new command while keeping
>>> # existing options.
>>> aws_cmd.update(recursive=True)
>>> aws_cmd.args = ['s3://data2', '/local/data2']
>>> print(aws_cmd.finalize())
['aws', 's3', '--only-show-errors', '--recursive', '--profile', 'myprof', 'cp', 's3://data2', '/local/data2']

Example

>>> # There is no need to specify the entire command. If you want
>>> # to simply build a command prefix, then that works too.
>>> aws_cmd = AWS_S3_Command('cp', profile='myprof', aws_storage_class='foobar')
>>> print(aws_cmd.finalize())
['aws', 's3', '--profile', 'myprof', '--aws-storage-class', 'foobar', 'cp']
Parameters:
  • command (str) – can be: cp, ls, mv, rm, sync

  • *args – positional arguments

  • **options – key value options (e.g. profile)

cmd_known_flags = {'cp': ['dryrun', 'quiet', 'follow-symlinks', 'no-follow-symlinks', 'no-guess-mime-type', 'only-show-errors', 'no-progress', 'ignore-glacier-warnings', 'force-glacier-transfer', 'recursive', 'debug', 'no-verify-ssl', 'no-paginate'], 'ls': ['recursive', 'human-readable', 'summarize', 'debug', 'no-verify-ssl', 'no-paginate', 'no-sign-request'], 'sync': ['dryrun', 'quiet', 'follow-symlinks', 'no-follow-symlinks', 'no-guess-mime-type', 'only-show-errors', 'no-progress', 'ignore-glacier-warnings', 'force-glacier-transfer', 'size-only', 'exact-timestamps', 'delete', 'debug', 'no-verify-ssl', 'no-paginate', 'no-sign-request']}
cmd_known_keyvals = {'cp': ['include', 'exclude', 'acl', 'sse', 'sse-c', 'sse-c-key', 'sse-kms-key-id', 'sse-c-copy-source', 'sse-c-copy-source-key', 'storage-class', 'grants', 'website-redirect', 'content-type', 'cache-control', 'content-disposition', 'content-encoding', 'content-language', 'expires', 'source-region', 'page-size', 'request-payer', 'metadata', 'metadata-directive', 'expected-size', 'endpoint-url', 'output', 'query', 'profile', 'region', 'version', 'color', 'no-sign-request', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout'], 'ls': ['endpoint-url', 'page-size', 'request-payer', 'output', 'query', 'profile', 'region', 'version', 'color', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout'], 'sync': ['include', 'exclude', 'acl', 'sse', 'sse-c', 'sse-c-key', 'sse-kms-key-id', 'sse-c-copy-source', 'sse-c-copy-source-key', 'storage-class', 'grants', 'website-redirect', 'content-type', 'cache-control', 'content-disposition', 'content-encoding', 'content-language', 'expires', 'source-region', 'page-size', 'request-payer', 'metadata', 'metadata-directive', 'endpoint-url', 'output', 'query', 'profile', 'region', 'version', 'color', 'ca-bundle', 'cli-read-timeout', 'cli-connect-timeout']}
update(arg=None, /, **options)[source]

Update key / value options.

This function is aware of which options need to be flags and which need to be key/value pairs.

So quiet=True results in --quiet, while quiet=False adds no option. Likewise, profile=foo results in --profile foo, and profile=None adds no option.
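
The flag versus key/value rule described above can be sketched in a few lines. This is only an illustration, not the geowatch implementation: the helper name options_to_cli_args and its fallback for unknown options (booleans become flags, everything else becomes a key/value pair) are assumptions made for this example, and the ordering of the emitted options is not meant to match AWS_S3_Command.

```python
def options_to_cli_args(options, known_flags, known_keyvals):
    """
    Hypothetical helper (not part of geowatch) that turns keyword options
    into CLI arguments, distinguishing flags from key/value options.
    """
    cli_args = []
    for key, value in options.items():
        # Python identifiers use underscores; CLI options use dashes.
        name = key.replace('_', '-')
        if name in known_flags:
            is_flag = True
        elif name in known_keyvals:
            is_flag = False
        else:
            # Assumption for this sketch: guess from the value type.
            is_flag = isinstance(value, bool)
        if is_flag:
            # A truthy flag is emitted; a falsy flag adds nothing.
            if value:
                cli_args.append('--' + name)
        else:
            # A key/value option set to None adds nothing.
            if value is not None:
                cli_args.extend(['--' + name, str(value)])
    return cli_args


# Small subsets of the tables above, used only for demonstration.
demo_flags = ['quiet', 'no-progress', 'recursive']
demo_keyvals = ['profile', 'color']

enabled = options_to_cli_args(
    {'quiet': True, 'profile': 'foo'}, demo_flags, demo_keyvals)
disabled = options_to_cli_args(
    {'quiet': False, 'profile': None}, demo_flags, demo_keyvals)
print(enabled)
print(disabled)
```

Keeping the flag and key/value names in lookup tables, as cmd_known_flags and cmd_known_keyvals do, means the caller never has to say which kind of option it is passing.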

finalize()[source]
Returns:

the command as a list of arguments, suitable for passing to subprocess.Popen

Return type:

List[str]
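
As a minimal sketch of why a list of strings is convenient here: subprocess.Popen accepts an argument list directly, so no shell quoting is needed even when an argument contains spaces. To keep the example runnable without the AWS CLI, the list below is a stand-in that invokes the Python interpreter; in practice the list would come from finalize().

```python
import subprocess
import sys

# Stand-in for the List[str] returned by finalize(). It runs the current
# Python interpreter (not the AWS CLI) and echoes back its one argument.
command = [
    sys.executable, '-c', 'import sys; print(sys.argv[1])',
    's3://foo/bar baz',  # contains a space, but needs no quoting
]

# Each list element is passed to the child process as one argument.
proc = subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate()
echoed = stdout.strip()
print(echoed)
```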

run(check=True, shell=False, capture=False, verbose=3)[source]

Execute the S3 command

Returns:

ubelt cmd info dict

Return type:

Dict

geowatch.utils.util_framework.ta2_collate_output(aws_base_command, local_region_dir, local_sites_dir, destination_s3_bucket, performer_suffix='KIT')[source]

This appears to be for putting the final system region and site models into the location that T&E expects.

geowatch.utils.util_framework.fixup_and_validate_site_and_region_models(region_dpath, site_dpath)[source]

Read, fix, and validate all site and region models.

geowatch.utils.util_framework.check_region_and_site_models_agree(region_models, site_models)[source]
class geowatch.utils.util_framework.NodeStateDebugger[source]

Bases: object

Prints information about the current node that is helpful for debugging.

Use in the smartflow CLI nodes.

Maintains some internal state to keep things organized.

Example

>>> from geowatch.utils.util_framework import *  # NOQA
>>> import ubelt as ub
>>> watch_appdir_dpath = ub.Path.appdir('geowatch')
>>> self = NodeStateDebugger()
>>> self.print_environment()
>>> self.print_current_state(watch_appdir_dpath)
>>> config = {'foo': 'bar'}
>>> self.print_local_invocation(config)
print_environment()[source]

Print information about which version of the code is running.

print_local_invocation(config=None)[source]

Attempt to build a string that will allow the user to start stepping through a local run of this smartflow step in IPython.

Parameters:

config (scriptconfig.DataConfig) – the config used to invoke the script

print_current_state(dpath)[source]
print_directory_contents(dpath)[source]
class geowatch.utils.util_framework.PrintLogger[source]

Bases: object

Duck-typed stand-in for a logger.

info(msg, *args, **kwargs)[source]
debug(msg, *args, **kwargs)[source]
error(msg, *args, **kwargs)[source]
warning(msg, *args, **kwargs)[source]
critical(msg, *args, **kwargs)[source]
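
To illustrate what duck-typing a logger means, here is a minimal sketch. It does not reproduce PrintLogger itself: the class SketchPrintLogger and the function process are hypothetical names for this example, and the assumption that messages are formatted with printf-style arguments follows the standard logging module, not the geowatch source.

```python
import logging


class SketchPrintLogger:
    """
    Hypothetical stand-in (not the geowatch class) exposing the same
    method names as logging.Logger, but writing to stdout with print.
    """

    def _emit(self, level, msg, *args):
        # Mirror the printf-style formatting used by the logging module.
        text = msg % args if args else msg
        print(f'[{level}] {text}')

    def info(self, msg, *args, **kwargs):
        self._emit('INFO', msg, *args)

    def debug(self, msg, *args, **kwargs):
        self._emit('DEBUG', msg, *args)

    def error(self, msg, *args, **kwargs):
        self._emit('ERROR', msg, *args)

    def warning(self, msg, *args, **kwargs):
        self._emit('WARNING', msg, *args)

    def critical(self, msg, *args, **kwargs):
        self._emit('CRITICAL', msg, *args)


def process(items, logger):
    # Only relies on the logger having an ``info`` method.
    logger.info('processing %d items', len(items))
    return [item.upper() for item in items]


# Either object works, because only the method names matter.
with_print = process(['a', 'b'], SketchPrintLogger())
with_logging = process(['a', 'b'], logging.getLogger('sketch'))
```

Code written against the logger interface can then be handed either object without changes.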