Source code for geowatch.cli.smartflow.run_upload_to_rgd

#!/usr/bin/env python
import sys
import subprocess
import json
import traceback
import requests
import ubelt as ub
import scriptconfig as scfg


class UploadRGDConfig(scfg.DataConfig):
    """
    Upload site models stored in S3 to an RGD instance as a model run.

    The site models are copied from ``input_site_models_s3`` to local disk
    and each ``*.geojson`` file is POSTed to the RGD model-run identified by
    ``title``, ``region_id`` and ``performer_shortcode`` (the run is created
    when no matching one exists).
    """
    # S3 location the site models are recursively copied from (required).
    input_site_models_s3 = scfg.Value(
        None, type=str, position=1,
        help='Path to S3 directory of site models')

    # Passed as ``--region`` to the aws CLI when looking up the RGD load
    # balancer (required by ``main`` even when the endpoint is overridden).
    rgd_aws_region = scfg.Value(
        None, type=str,
        help='AWS region where RGD instance is running')

    # The load balancer looked up is named "<rgd_deployment_name>-internal-alb"
    # (required).
    rgd_deployment_name = scfg.Value(None, type=str, help=ub.paragraph(
        '''
        Name of RGD deployment (e.g. 'resonantgeodatablue')
        '''))

    # Optional; when given it is passed as ``--profile`` to the aws CLI.
    aws_profile = scfg.Value(None, type=str, help=ub.paragraph(
        '''
        AWS Profile to use for AWS S3 CLI commands
        '''))

    # Used together with region_id / performer_shortcode to find an existing
    # model run (required).
    title = scfg.Value(
        None, type=str,
        help='Title of the model run')

    # Compared against the region of existing model runs (required).
    region_id = scfg.Value(
        None, type=str,
        help='Region ID (e.g. "KR_R002")')

    # Upper-cased before use.
    performer_shortcode = scfg.Value(
        'KIT', type=str,
        help='Performer shortcode (e.g. "KIT")')

    # When set, the aws load balancer lookup is skipped entirely.
    rgd_endpoint_override = scfg.Value(None, type=str, help=ub.paragraph(
        '''
        Use this RGD URL instead of looking up via aws tools
        '''))

    # Number of worker processes used to POST site models.
    jobs = scfg.Value(
        8, type=int, short_alias=['j'],
        help='Number of jobs to run in parallel')

    # Only sent to RGD (as "expiration_time") when a new model run is created.
    expiration_time = scfg.Value(
        None, type=int, short_alias=['x'], help=ub.paragraph(
            '''
            Number of days to keep system run output in RGD
            '''))
def main(cmdline=1, **kwargs):
    """
    Command line entry point: parse the config, validate it, run the upload.

    Args:
        cmdline: forwarded unchanged to ``UploadRGDConfig.cli``.
        **kwargs: forwarded to ``UploadRGDConfig.cli`` as ``data``.

    Raises:
        AssertionError: if any required option is left unset. The message
            names every missing option.
    """
    config = UploadRGDConfig.cli(cmdline=cmdline, data=kwargs)
    print('config = {}'.format(ub.urepr(dict(config), nl=1, align=':')))

    required_options = (
        'rgd_aws_region',
        'input_site_models_s3',
        'rgd_deployment_name',
        'title',
        'region_id',
    )
    missing = [name for name in required_options
               if getattr(config, name) is None]
    if missing:
        # Explicit check instead of ``assert`` statements so validation still
        # runs under ``python -O``. AssertionError is kept so callers that
        # relied on the previous assert failures see the same exception type.
        raise AssertionError(
            'Missing required option(s): {}'.format(', '.join(missing)))

    upload_to_rgd(config)
def get_model_results(model_run_results_url, timeout=300):
    """
    Fetch the existing model runs from the RGD model-runs endpoint.

    Args:
        model_run_results_url (str): URL that is queried with a GET request.
        timeout (float | tuple | None): forwarded to ``requests.get`` so a
            stalled connection fails (and can be retried by the caller)
            instead of blocking forever. Pass ``None`` to wait indefinitely.

    Returns:
        The value stored under the ``"results"`` key of the JSON response,
        or an empty tuple when the key is absent.

    Raises:
        requests.HTTPError: if the server answers with a 4xx / 5xx status.
    """
    # NOTE(review): 'limit': '0' presumably disables pagination so that all
    # model runs are returned -- confirm against the RGD API.
    response = requests.get(
        model_run_results_url, params={'limit': '0'}, timeout=timeout)
    # Surface HTTP errors instead of treating an error payload without a
    # "results" key as "no model runs exist".
    response.raise_for_status()
    return response.json().get('results', ())
def _resolve_rgd_endpoint(rgd_aws_region, rgd_deployment_name, aws_profile):
    """Return the DNS name of the "<deployment>-internal-alb" load balancer."""
    profile_args = ('--profile', aws_profile) if aws_profile is not None else ()
    command = [
        'aws', *profile_args,
        '--region', rgd_aws_region,
        'elbv2', 'describe-load-balancers',
        '--names', "{}-internal-alb".format(rgd_deployment_name),
    ]
    try:
        endpoint_result = ub.cmd(command, check=True, verbose=3)
    except subprocess.CalledProcessError as e:
        print(e.stderr, file=sys.stderr)
        raise
    rgd_instance_details = json.loads(endpoint_result.stdout)
    return rgd_instance_details['LoadBalancers'][0]['DNSName']


def _find_existing_model_run(model_runs, title, performer_shortcode,
                             region_id):
    """Return the first model run matching title, performer and region, else None."""
    for model_run in model_runs:
        if (model_run['title'] != title or
                model_run['performer']['short_code'] != performer_shortcode):
            continue
        model_region = model_run.get('region')
        # The region is handled both as a dict with a "name" key and as a
        # plain string; any other value (including None) never matches.
        if isinstance(model_region, dict):
            region_name = model_region.get('name')
        elif isinstance(model_region, str):
            region_name = model_region
        else:
            continue
        if region_name == region_id:
            return model_run
    return None


def _create_model_run(rgd_endpoint, title, region_id, performer_shortcode,
                      expiration_time):
    """POST a new model run to RGD and return its id."""
    post_model_url = f"http://{rgd_endpoint}/api/model-runs/"
    post_model_data = {"performer": performer_shortcode,
                       "title": title,
                       "region": {"name": region_id},
                       "parameters": {}}
    if expiration_time is not None:
        post_model_data['expiration_time'] = expiration_time
    post_model_result = requests.post(
        post_model_url,
        json=post_model_data,
        headers={"Content-Type": "application/json"})
    # Fail with the HTTP error itself rather than an opaque KeyError on 'id'.
    post_model_result.raise_for_status()
    return post_model_result.json()['id']


def upload_to_rgd(config):
    """
    Download site models from S3 and upload them to an RGD model run.

    Steps:
        1. Recursively copy ``config.input_site_models_s3`` to
           ``/tmp/site_models`` with the aws CLI.
        2. Determine the RGD endpoint: ``config.rgd_endpoint_override`` when
           set, otherwise the DNS name of the load balancer named
           ``"<rgd_deployment_name>-internal-alb"``.
        3. Reuse the model run whose title, performer short code and region
           match the config, or create a new one.
        4. POST every ``*.geojson`` file in the local directory to that model
           run using ``config.jobs`` worker processes.

    Failures of individual site uploads are printed and do not abort the
    remaining uploads.

    Args:
        config (UploadRGDConfig): parsed configuration (any object exposing
            the same attributes works).

    Raises:
        subprocess.CalledProcessError: if an aws CLI command fails.
        requests.HTTPError: if creating the model run is rejected by RGD.
    """
    from geowatch.utils.util_framework import NodeStateDebugger
    node_state = NodeStateDebugger()
    node_state.print_environment()
    node_state.print_local_invocation(config)

    title = config.title
    region_id = config.region_id
    aws_profile = config.aws_profile

    # Ensure performer_shortcode is uppercase
    performer_shortcode = config.performer_shortcode.upper()

    if aws_profile is not None:
        aws_base_command = ['aws', 's3', '--profile', aws_profile, 'cp']
    else:
        aws_base_command = ['aws', 's3', 'cp']

    # NOTE(review): this fixed directory is not cleared first, so files left
    # over from a previous invocation would also be uploaded -- confirm this
    # only runs in a fresh environment.
    local_site_models_dir = ub.Path('/tmp/site_models')
    ub.cmd([*aws_base_command, '--recursive',
            config.input_site_models_s3, local_site_models_dir],
           check=True, verbose=3)

    rgd_endpoint = config.rgd_endpoint_override
    if rgd_endpoint is None:
        rgd_endpoint = _resolve_rgd_endpoint(
            config.rgd_aws_region, config.rgd_deployment_name, aws_profile)

    # Check that our run doesn't already exist
    model_run_results_url = f"http://{rgd_endpoint}/api/model-runs/"

    from geowatch.utils.util_retry import retry_call
    from geowatch.utils import util_framework
    logger = util_framework.PrintLogger()

    print('\nAttempt to post to RGD model results endpoint')
    print(f'model_run_results_url = {ub.urepr(model_run_results_url, nl=1)}')
    request_results = retry_call(
        get_model_results,
        fargs=[model_run_results_url],
        tries=3,
        exceptions=(Exception,),
        delay=3,
        logger=logger)
    print('Got Results')

    existing_model_run = _find_existing_model_run(
        request_results, title, performer_shortcode, region_id)
    if existing_model_run is not None:
        model_run_id = existing_model_run['id']
    else:
        model_run_id = _create_model_run(
            rgd_endpoint, title, region_id, performer_shortcode,
            config.expiration_time)

    post_site_url = (
        f"http://{rgd_endpoint}/api/model-runs/{model_run_id}/site-model/")

    pool = ub.JobPool(mode='process', max_workers=config.jobs)
    for site_filepath in local_site_models_dir.glob('*.geojson'):
        pool.submit(post_site, post_site_url, site_filepath)

    for site_post_job in ub.ProgIter(pool.as_completed(),
                                     total=len(pool),
                                     desc='Uploading sites..'):
        try:
            result = site_post_job.result()
        except Exception:
            # Best effort: report the failure and keep uploading the rest.
            print("Exception occurred (printed below)")
            traceback.print_exc()
            continue
        if result.status_code != 201:
            print(f"Error uploading site, status "
                  f"code: [{result.status_code}]")
            print(result.text)
def post_site(post_site_url, site_filepath):
    """
    POST the JSON content of one site model file to RGD.

    Args:
        post_site_url (str): URL the site model is POSTed to.
        site_filepath (str | os.PathLike): path to a JSON (GeoJSON) file.

    Returns:
        requests.Response: the response of the POST request. The status code
        is not checked here; that is left to the caller.
    """
    # GeoJSON is UTF-8 encoded (RFC 7946), so do not depend on the platform's
    # default encoding. The file is fully read and closed before the request
    # is made, so the handle is not held open during the upload.
    with open(site_filepath, 'r', encoding='utf-8') as f:
        site_model = json.load(f)
    return requests.post(
        post_site_url,
        json=site_model,
        headers={"Content-Type": "application/json"})
# Run the command line entry point when this module is executed as a script.
if __name__ == "__main__": main()