r"""
Helper for scheduling a set of prediction + evaluation jobs.
This is the main entrypoint for running a bunch of evaluation jobs over a grid
of parameters. We currently expect that pipelines are predefined in
smart_pipeline.py but in the future they will likely be an external resource
file.
TODO:
- [ ] Differentiate between pixel models for different tasks.
- [ ] Allow the output of tracking to feed into activity classification
Example:
# Dummy inputs, just for demonstration
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- ./my_bas_model1.pt
- ./my_bas_model2.pt
bas_pxl.test_dataset:
- ./my_test_dataset/bas_ready_data.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- 'auto'
bas_pxl.input_space_scale:
- '15GSD'
bas_poly_eval.true_site_dpath: null
bas_poly_eval.true_region_dpath: null
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
- 0.2
sc_pxl.test_dataset:
- crop.dst
sc_pxl.window_space_scale:
- auto
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- my_sc_model1.pt
- my_sc_model2.pt
sc_poly_viz.enabled:
- false
" \
--root_dpath=./my_dag_runs \
--devices="0,1" --tmux_workers=2 \
--backend=serial --skip_existing=0 \
--pipeline=joint_bas_sc \
--run=0
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- ./my_bas_model1.pt
- ./my_bas_model2.pt
bas_pxl.test_dataset:
- ./my_test_dataset/bas_ready_data.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- 'auto'
bas_pxl.input_space_scale:
- '15GSD'
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
- 0.2
bas_pxl.enabled: 1
bas_poly_eval.true_site_dpath: true-site
bas_poly_eval.true_region_dpath: true-region
" \
--root_dpath=./my_dag_runs \
--devices="0,1" \
--backend=serial --skip_existing=0 \
--pipeline=bas \
--run=0
# Real inputs, this actually will run something given the DVC repos
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
SC_MODEL=$DVC_EXPT_DPATH/models/fusion/Drop4-SC/packages/Drop4_tune_V30_8GSD_V3/Drop4_tune_V30_8GSD_V3_epoch=2-step=17334.pt.pt
BAS_MODEL=$DVC_EXPT_DPATH/models/fusion/Drop4-BAS/packages/Drop4_TuneV323_BAS_30GSD_BGRNSH_V2/package_epoch0_step41.pt.pt
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- $BAS_MODEL
bas_pxl.test_dataset:
- $DVC_DATA_DPATH/Drop4-BAS/KR_R001.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- "auto"
bas_pxl.input_space_scale:
- "15GSD"
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
sc_pxl.test_dataset:
- crop.dst
sc_pxl.window_space_scale:
- auto
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- $SC_MODEL
sc_poly_viz.enabled:
- false
" \
--root_dpath=./my_dag_runs \
--devices="0,1" --queue_size=2 \
--backend=serial --skip_existing=0 \
--pipeline=joint_bas_sc_nocrop \
--run=0
Example:
# Real data
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
# - $DVC_EXPT_DPATH/models/fusion/Drop4-BAS/packages/Drop4_TuneV323_BAS_30GSD_BGRNSH_V2/package_epoch0_step41.pt.pt
- $DVC_EXPT_DPATH/package_epoch10_step200000.pt
bas_pxl.test_dataset:
- $DVC_DATA_DPATH/Drop4-BAS/KR_R001.kwcoco.json
# - $DVC_DATA_DPATH/Drop4-BAS/KR_R002.kwcoco.json
bas_pxl.window_space_scale:
- auto
# - "15GSD"
# - "30GSD"
# bas_pxl.chip_dims:
# - "256,256"
bas_pxl.time_sampling:
- "auto"
# bas_pxl.input_space_scale:
# - "window"
bas_poly.moving_window_size:
- null
# - 100
# - 200
bas_poly.thresh:
- 0.1
# - 0.13
# - 0.2
sc_pxl.window_space_scale:
- auto
sc_pxl.input_space_scale:
- "window"
sc_pxl.chip_dims:
- "256,256"
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- $DVC_EXPT_DPATH/models/fusion/Drop4-SC/packages/Drop4_tune_V30_8GSD_V3/Drop4_tune_V30_8GSD_V3_epoch=2-step=17334.pt.pt
bas_poly_eval.enabled: 1
bas_pxl_eval.enabled: 1
bas_poly_viz.enabled: 1
sc_poly_eval.enabled: 1
sc_pxl_eval.enabled: 1
sc_poly_viz.enabled: 1
" \
--root_dpath=$DVC_EXPT_DPATH/_testpipe \
--enable_links=1 \
--devices="0,1" --queue_size=2 \
--backend=serial \
--pipeline=bas \
--cache=1 --rprint=1 --run=1
--pipeline=joint_bas_sc
xdev tree --dirblocklist "_*" my_expt_dir/_testpipe/ --max_files=1
"""
import ubelt as ub
import scriptconfig as scfg
from cmd_queue.cli_boilerplate import CMDQueueConfig
[docs]
class ScheduleEvaluationConfig(CMDQueueConfig):
"""
Driver for GeoWATCH mlops evaluation scheduling
Builds commands and optionally executes them via slurm, tmux, or serial
(i.e. one at a time). This is a [link=https://gitlab.kitware.com/computer-vision/cmd_queue]cmd_queue[/link] CLI.
"""
__command__ = 'schedule'
__alias__ = ['mlops_schedule']
params = scfg.Value(None, type=str, help='a yaml/json grid/matrix of prediction params')
devices = scfg.Value(None, help=(
'if using tmux or serial, indicate which gpus are available for use '
'as a comma separated list: e.g. 0,1'))
skip_existing = scfg.Value(False, help=(
'if True dont submit commands where the expected '
'products already exist'))
pred_workers = scfg.Value(4, help='number of prediction workers in each process')
root_dpath = scfg.Value('auto', help=(
'Where do dump all results. If "auto", uses <expt_dvc_dpath>/dag_runs'))
pipeline = scfg.Value('joint_bas_sc', help=ub.paragraph(
'''
The name of the pipeline to run. Can also specify this in the params.
There are special SMART-specific names that are available, and initial
experimental support for custom pipelines, which needs documentation.
'''))
enable_links = scfg.Value(True, isflag=True, help='if true enable symlink jobs')
cache = scfg.Value(True, isflag=True, help=(
'if true, each a test is appened to each job to skip itself if its output exists'))
draw_heatmaps = scfg.Value(1, isflag=True, help='if true draw heatmaps on pixel eval')
draw_curves = scfg.Value(1, isflag=True, help='if true draw curves on pixel eval')
max_configs = scfg.Value(None, help='if specified only run at most this many of the grid search configs')
queue_size = scfg.Value(None, help='if auto, defaults to number of GPUs')
print_varied = scfg.Value('auto', isflag=True, help='print the varied parameters')
def __post_init__(self):
super().__post_init__()
if self.queue_name is None:
self.queue_name = 'schedule-eval'
if self.queue_size is not None:
raise Exception('The queue_size argument to schedule evaluation has been removed. Use the tmux_workers argument instead')
# self.tmux_workers = self.queue_size
from cmd_queue.util.util_yaml import Yaml
self.slurm_options = Yaml.coerce(self.slurm_options) or {}
devices = self.devices
if devices == 'auto':
GPUS = _auto_gpus()
else:
GPUS = None if devices is None else ensure_iterable(devices)
self.devices = GPUS
[docs]
def main(cmdline=True, **kwargs):
config = ScheduleEvaluationConfig.cli(cmdline=cmdline, data=kwargs, strict=True)
import rich
rich.print('ScheduleEvaluationConfig config = {}'.format(ub.urepr(config, nl=1, sv=1)))
schedule_evaluation(config)
[docs]
def schedule_evaluation(config):
r"""
First ensure that models have been copied to the DVC repo in the
appropriate path. (as noted by model_dpath)
"""
import json
import pandas as pd
import rich
from kwutil import slugify_ext
from kwutil import util_progress
from kwutil.util_yaml import Yaml
from geowatch.mlops import smart_pipeline
from geowatch.utils.result_analysis import varied_values
from geowatch.utils.util_param_grid import expand_param_grid
# Dont put in post-init because it is called by the CLI!
if config['root_dpath'] in {None, 'auto'}:
import geowatch
expt_dvc_dpath = geowatch.find_dvc_dpath(tags='phase2_expt', hardware='auto')
config['root_dpath'] = expt_dvc_dpath / 'dag_runs'
root_dpath = ub.Path(config['root_dpath'])
pipeline = config.pipeline
if config['params'] is not None:
param_arg = Yaml.coerce(config['params']) or {}
pipeline = param_arg.pop('pipeline', config.pipeline)
# Associate BAS datasets and HighRes datasets
# Convinience to make it less tedious to specify datasets.
# Hard codes the DVC pattern to associate lowres and hires data.
# This is not a robust mechanism.
smart_highres_bundle = param_arg.pop('smart_highres_bundle', None)
if smart_highres_bundle is not None:
smart_highres_bundle = ub.Path(smart_highres_bundle)
assert smart_highres_bundle.exists()
if 'submatrices' in param_arg:
raise Exception('cant hack submatrices with submatrices on')
submatrices = []
from geowatch import heuristics
for bas_fpath in param_arg['matrix']['bas_pxl.test_dataset']:
region_id = heuristics.extract_region_id(ub.Path(bas_fpath).name)
region_dpath = (smart_highres_bundle / region_id)
hires_coco_candidates = [
region_dpath / f'imgonly-{region_id}.kwcoco.zip',
region_dpath / f'imgonly-{region_id}-rawbands.kwcoco.zip',
]
hires_coco_fpath = None
for cand_fpath in hires_coco_candidates:
if cand_fpath.exists():
hires_coco_fpath = cand_fpath
break
if hires_coco_fpath is None:
raise Exception(f'Expected hires path, but no candidates exist: {hires_coco_candidates}')
submatrices.append({
'bas_pxl.test_dataset': bas_fpath,
'sv_crop.crop_src_fpath': hires_coco_fpath,
'sc_crop.crop_src_fpath': hires_coco_fpath,
})
param_arg['submatrices'] = submatrices
# Load the requested pipeline
EXPERIMENTAL_CUSTOM_PIPELINES = True
try:
dag = smart_pipeline.make_smart_pipeline(pipeline)
except Exception:
if EXPERIMENTAL_CUSTOM_PIPELINES:
# New experimental pipelines
dag = _experimental_resolve_pipeline(pipeline)
else:
raise
...
dag.print_graphs(smart_colors=1)
dag.inspect_configurables()
if config.run:
mlops_meta = (root_dpath / '_mlops_schedule').ensuredir()
(root_dpath / '_cmd_queue_schedule').ensuredir()
# Write some metadata to help aggregate set its defaults automatically
most_recent_fpath = mlops_meta / 'most_recent_run.json'
data = {
'pipeline': str(pipeline),
}
most_recent_fpath.write_text(json.dumps(data, indent=' '))
queue = config.create_queue(gpus=config.devices)
# Expand paramater search grid
if config['params'] is not None:
# print('param_arg = {}'.format(ub.urepr(param_arg, nl=1)))
all_param_grid = list(expand_param_grid(
param_arg,
max_configs=config['max_configs'],
))
else:
all_param_grid = []
if len(all_param_grid) == 0:
print('WARNING: PARAM GRID IS EMPTY')
# Configure a DAG for each row.
pman = util_progress.ProgressManager()
configured_stats = []
with pman:
for row_config in pman.progiter(all_param_grid, desc='configure dags', verbose=3):
dag.configure(
config=row_config,
root_dpath=root_dpath,
cache=config['cache'])
summary = dag.submit_jobs(
queue=queue,
skip_existing=config['skip_existing'],
enable_links=config['enable_links'])
configured_stats.append(summary)
print(f'len(queue)={len(queue)}')
print_thresh = 30
if config['print_varied'] == 'auto':
if len(queue) < print_thresh:
config['print_varied'] = 1
else:
print(f'More than {print_thresh} jobs, skip print_varied. '
'If you want to see them explicitly specify print_varied=1')
config['print_varied'] = 0
if config['print_varied']:
# Print config info
longparams = pd.DataFrame(all_param_grid)
varied = varied_values(longparams, min_variations=2, dropna=False)
relevant = longparams[longparams.columns.intersection(varied)]
def pandas_preformat(item):
if isinstance(item, str):
return slugify_ext.smart_truncate(item, max_length=16, trunc_loc=0)
else:
return item
displayable = relevant.applymap(pandas_preformat)
rich.print(displayable.to_string())
for job in queue.jobs:
# TODO: should be able to set this as a queue param.
job.log = False
if config.run:
ub.ensuredir(dag.root_dpath)
print_kwargs = {
'with_status': 0,
'style': "colors",
'with_locks': 0,
'exclude_tags': ['boilerplate'],
}
rich.print(f'\n\ndag.root_dpath: [link={dag.root_dpath}]{dag.root_dpath}[/link]')
config.run_queue(queue, print_kwargs=print_kwargs)
if not config.run:
driver_fpath = queue.write()
print('Wrote script: to run execute:\n{}'.format(driver_fpath))
return dag, queue
[docs]
def ensure_iterable(inputs):
return inputs if ub.iterable(inputs) else [inputs]
def _auto_gpus():
from geowatch.utils.util_nvidia import nvidia_smi
# TODO: liberate the needed code from netharn
# Use all unused devices
GPUS = []
gpu_info = nvidia_smi()
for gpu_idx, gpu_info in gpu_info.items():
if len(gpu_info['procs']) == 0:
GPUS.append(gpu_idx)
return GPUS
def _experimental_resolve_pipeline(pipeline):
"""
Users need to be able to build and specify their own pipelines here
(similar to how kwiver pipelines work). This is initial support.
Ignore:
pipeline = 'user_module.pipelines.custom_pipeline_func()'
pipeline = 'geowatch.mlops.smart_pipeline.make_smart_pipeline("bas")'
pipeline = 'shitspotter.pipelines.heatmap_evaluation_pipeline()'
_experimental_resolve_pipeline(pipeline)
"""
# Case: given in the format `{module_name}.{attribute_expression}`
# Note the attribute_expression allows arbitrary code execution
print('Resolving user-specified pipeline')
if '.' in pipeline:
# Find which part is the module and which is the member
parts = pipeline.split('.')
found = None
for idx in reversed(range(1, len(parts) + 1)):
candidate = '.'.join(parts[:idx])
try:
modpath = _coerce_modpath(candidate)
except ValueError:
...
else:
lhs = candidate
rhs = '.'.join(parts[idx:])
module = ub.import_module_from_path(modpath)
found = (module, modpath, lhs, rhs)
print(f'found = {ub.urepr(found, nl=1)}')
break
if found is None:
raise ValueError('unable to resolve pipeline')
else:
# This initial specification allows arbitrary code execution.
# We should define a hardened variant which does not require this.
(module, modpath, lhs, rhs) = found
limited_namespace = {'pipeline_module': module}
statement = f'pipeline_module.{rhs}'
dag = eval(statement, limited_namespace)
return dag
else:
raise ValueError(pipeline)
def _coerce_modpath(modpath_or_name):
import types
import os
if isinstance(modpath_or_name, types.ModuleType):
raise TypeError('Expected a static module but got a dynamic one')
modpath = ub.modname_to_modpath(modpath_or_name)
if modpath is None:
if os.path.exists(modpath_or_name):
modpath = modpath_or_name
else:
raise ValueError('Cannot find module={}'.format(modpath_or_name))
return modpath
__notes__ = """
If this is going to be a real mlops framework, then we need to abstract the
pipeline. The user needs to define what the steps are, but then they need to
explicitly connect them. We can't make the assumptions we are currently using.
Ignore:
# We can use our CLIs as definitions of the pipeline as long as the config
# object has enough metadata. With scriptconfig+jsonargparse we should be
# able to do this.
import geowatch.cli.run_metrics_framework
import geowatch.cli.run_tracker
import geowatch.tasks.fusion.predict
geowatch.cli.run_tracker.__config__.__default__
list(geowatch.tasks.fusion.predict.make_predict_config().__dict__.keys())
from geowatch.tasks.tracking.from_heatmap import TimeAggregatedBAS
from geowatch.tasks.tracking.from_heatmap import TimeAggregatedSC
# from geowatch.tasks.tracking.from_heatmap import TimeAggregatedHybrid
import jsonargparse
parser = jsonargparse.ArgumentParser()
parser.add_class_arguments(TimeAggregatedBAS, nested_key='bas_poly')
parser.add_class_arguments(TimeAggregatedSC, nested_key='sc_poly')
parser.print_help()
parser.parse_known_args([])
import jsonargparse
parser = jsonargparse.ArgumentParser()
parser.add_argument('--foo')
args = parser.parse_args(args=[])
parser.save(args)
"""
__config__ = ScheduleEvaluationConfig
__config__.main = main
if __name__ == '__main__':
main()