#!/usr/bin/env python3
r"""
Helper for scheduling a set of prediction + evaluation jobs.
This is the main entrypoint for running a bunch of evaluation jobs over a grid
of parameters. We currently expect that pipelines are predefined in
smart_pipeline.py but in the future they will likely be an external resource
file.
TODO:
- [ ] Differentiate between pixel models for different tasks.
- [ ] Allow the output of tracking to feed into activity classification
- [ ] Rename to "schedule". The pipeline does not have to be an evaluation.
Example:
# Dummy inputs, just for demonstration
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- ./my_bas_model1.pt
- ./my_bas_model2.pt
bas_pxl.test_dataset:
- ./my_test_dataset/bas_ready_data.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- 'auto'
bas_pxl.input_space_scale:
- '15GSD'
bas_poly_eval.true_site_dpath: null
bas_poly_eval.true_region_dpath: null
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
- 0.2
sc_pxl.test_dataset:
- crop.dst
sc_pxl.window_space_scale:
- auto
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- my_sc_model1.pt
- my_sc_model2.pt
sc_poly_viz.enabled:
- false
" \
--root_dpath=./my_dag_runs \
--devices="0,1" --tmux_workers=2 \
--backend=serial --skip_existing=0 \
--pipeline=joint_bas_sc \
--run=0
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- ./my_bas_model1.pt
- ./my_bas_model2.pt
bas_pxl.test_dataset:
- ./my_test_dataset/bas_ready_data.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- 'auto'
bas_pxl.input_space_scale:
- '15GSD'
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
- 0.2
bas_pxl.enabled: 1
bas_poly_eval.true_site_dpath: true-site
bas_poly_eval.true_region_dpath: true-region
" \
--root_dpath=./my_dag_runs \
--devices="0,1" \
--backend=serial --skip_existing=0 \
--pipeline=bas \
--run=0
# Real inputs, this actually will run something given the DVC repos
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
SC_MODEL=$DVC_EXPT_DPATH/models/fusion/Drop4-SC/packages/Drop4_tune_V30_8GSD_V3/Drop4_tune_V30_8GSD_V3_epoch=2-step=17334.pt.pt
BAS_MODEL=$DVC_EXPT_DPATH/models/fusion/Drop4-BAS/packages/Drop4_TuneV323_BAS_30GSD_BGRNSH_V2/package_epoch0_step41.pt.pt
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
- $BAS_MODEL
bas_pxl.test_dataset:
- $DVC_DATA_DPATH/Drop4-BAS/KR_R001.kwcoco.json
bas_pxl.window_space_scale: 15GSD
bas_pxl.time_sampling:
- "auto"
bas_pxl.input_space_scale:
- "15GSD"
bas_poly.moving_window_size:
bas_poly.thresh:
- 0.1
sc_pxl.test_dataset:
- crop.dst
sc_pxl.window_space_scale:
- auto
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- $SC_MODEL
sc_poly_viz.enabled:
- false
" \
--root_dpath=./my_dag_runs \
--devices="0,1" --tmux_workers=2 \
--backend=serial --skip_existing=0 \
--pipeline=joint_bas_sc_nocrop \
--run=0
Example:
# Real data
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
DVC_EXPT_DPATH=$(geowatch_dvc --tags='phase2_expt' --hardware=auto)
python -m geowatch.mlops.schedule_evaluation \
--params="
matrix:
bas_pxl.package_fpath:
# - $DVC_EXPT_DPATH/models/fusion/Drop4-BAS/packages/Drop4_TuneV323_BAS_30GSD_BGRNSH_V2/package_epoch0_step41.pt.pt
- $DVC_EXPT_DPATH/package_epoch10_step200000.pt
bas_pxl.test_dataset:
- $DVC_DATA_DPATH/Drop4-BAS/KR_R001.kwcoco.json
# - $DVC_DATA_DPATH/Drop4-BAS/KR_R002.kwcoco.json
bas_pxl.window_space_scale:
- auto
# - "15GSD"
# - "30GSD"
# bas_pxl.chip_dims:
# - "256,256"
bas_pxl.time_sampling:
- "auto"
# bas_pxl.input_space_scale:
# - "window"
bas_poly.moving_window_size:
- null
# - 100
# - 200
bas_poly.thresh:
- 0.1
# - 0.13
# - 0.2
sc_pxl.window_space_scale:
- auto
sc_pxl.input_space_scale:
- "window"
sc_pxl.chip_dims:
- "256,256"
sc_poly.thresh:
- 0.1
sc_poly.use_viterbi:
- 0
sc_pxl.package_fpath:
- $DVC_EXPT_DPATH/models/fusion/Drop4-SC/packages/Drop4_tune_V30_8GSD_V3/Drop4_tune_V30_8GSD_V3_epoch=2-step=17334.pt.pt
bas_poly_eval.enabled: 1
bas_pxl_eval.enabled: 1
bas_poly_viz.enabled: 1
sc_poly_eval.enabled: 1
sc_pxl_eval.enabled: 1
sc_poly_viz.enabled: 1
" \
--root_dpath=$DVC_EXPT_DPATH/_testpipe \
--enable_links=1 \
--devices="0,1" --tmux_workers=2 \
--backend=serial \
--pipeline=bas \
--cache=1 --rprint=1 --run=1
--pipeline=joint_bas_sc
xdev tree --dirblocklist "_*" my_expt_dir/_testpipe/ --max_files=1
"""
import ubelt as ub
import scriptconfig as scfg
from cmd_queue.cli_boilerplate import CMDQueueConfig
# [docs]
class ScheduleEvaluationConfig(CMDQueueConfig):
    """
    Driver for GeoWATCH mlops evaluation scheduling
    Builds commands and optionally executes them via slurm, tmux, or serial
    (i.e. one at a time). This is a [link=https://gitlab.kitware.com/computer-vision/cmd_queue]cmd_queue[/link] CLI.
    """
    __command__ = 'schedule'
    __alias__ = ['mlops_schedule']

    # YAML/JSON text; schedule_evaluation coerces it and expands it into a
    # list of per-run configurations.
    params = scfg.Value(None, type=str, help='a yaml/json grid/matrix of prediction params')

    # Normalized in __post_init__: 'auto' -> result of _auto_gpus(), None is
    # kept as None, anything else is wrapped by ensure_iterable.
    devices = scfg.Value(None, help=(
        'if using tmux or serial, indicate which gpus are available for use '
        'as a comma separated list: e.g. 0,1'))

    # Forwarded to dag.submit_jobs by schedule_evaluation.
    skip_existing = scfg.Value(False, help=(
        'if True dont submit commands where the expected '
        'products already exist'))

    pred_workers = scfg.Value(4, help='number of prediction workers in each process')

    # None / 'auto' is resolved inside schedule_evaluation (not here).
    root_dpath = scfg.Value('auto', help=(
        'Where do dump all results. If "auto", uses <expt_dvc_dpath>/dag_runs'))

    # A 'pipeline' key inside ``params`` takes precedence over this value.
    pipeline = scfg.Value('joint_bas_sc', help=ub.paragraph(
        '''
        The name of the pipeline to run. Can also specify this in the params.
        This can be a name of an internally registered pipeline, or it can
        point to a function that defines a pipeline in a Python file. E.g.
        ``user_module.pipelines.custom_pipeline_func()`` or
        ``$HOME/my_code/my_pipeline.py::make_my_pipeline("arg")``.
        '''))

    # Forwarded to dag.submit_jobs by schedule_evaluation.
    enable_links = scfg.Value(True, isflag=True, help='if true enable symlink jobs')

    # Forwarded to dag.configure by schedule_evaluation.
    cache = scfg.Value(True, isflag=True, help=(
        'if true, each a test is appened to each job to skip itself if its output exists'))

    draw_heatmaps = scfg.Value(1, isflag=True, help='if true draw heatmaps on pixel eval')

    draw_curves = scfg.Value(1, isflag=True, help='if true draw curves on pixel eval')

    # Forwarded to expand_param_grid by schedule_evaluation.
    max_configs = scfg.Value(None, help='if specified only run at most this many of the grid search configs')

    # Kept only so that old invocations fail loudly: __post_init__ raises
    # whenever this is not None.
    queue_size = scfg.Value(None, help=(
        'REMOVED: specifying any value raises an error. '
        'Use the tmux_workers argument instead'))

    # 'auto' is resolved by schedule_evaluation based on the number of jobs.
    print_varied = scfg.Value('auto', isflag=True, help='print the varied parameters')

    def __post_init__(self):
        """
        Fill in defaults and normalize values after the config is populated.

        Sets a default ``queue_name``, rejects the removed ``queue_size``
        option, coerces ``slurm_options`` from YAML (falling back to an empty
        dict), and normalizes ``devices``.

        Raises:
            ValueError: if ``queue_size`` was specified.
        """
        super().__post_init__()
        if self.queue_name is None:
            self.queue_name = 'schedule-eval'

        if self.queue_size is not None:
            # ValueError is an Exception subclass, so callers that caught the
            # previous generic Exception still work.
            raise ValueError('The queue_size argument to schedule evaluation has been removed. Use the tmux_workers argument instead')

        from cmd_queue.util.util_yaml import Yaml
        self.slurm_options = Yaml.coerce(self.slurm_options) or {}

        devices = self.devices
        if devices is None:
            gpus = None
        elif devices == 'auto':
            gpus = _auto_gpus()
        else:
            gpus = ensure_iterable(devices)
        self.devices = gpus
# [docs]
def main(cmdline=True, **kwargs):
    """
    Command line entry point.

    Builds a :class:`ScheduleEvaluationConfig` from the command line and / or
    keyword overrides, prints it, and passes it to
    :func:`schedule_evaluation`.

    Args:
        cmdline (bool | object): forwarded as ``cmdline`` to
            ``ScheduleEvaluationConfig.cli``.
        **kwargs: forwarded as ``data`` to ``ScheduleEvaluationConfig.cli``.
    """
    config = ScheduleEvaluationConfig.cli(
        cmdline=cmdline,
        data=kwargs,
        strict=True,
    )
    import rich
    config_text = ub.urepr(config, nl=1, sv=1)
    rich.print(f'ScheduleEvaluationConfig config = {config_text}')
    schedule_evaluation(config)
# [docs]
def schedule_evaluation(config):
    r"""
    Build, and optionally run, a queue of pipeline jobs over a parameter grid.

    First ensure that models have been copied to the DVC repo in the
    appropriate path. (as noted by model_dpath)

    Steps performed:
        1. Resolve ``root_dpath`` (an 'auto' / None value is replaced in
           ``config`` with ``<phase2_expt dvc path>/dag_runs``).
        2. Coerce ``params`` from YAML; a ``pipeline`` key inside it overrides
           ``config.pipeline`` and a ``smart_highres_bundle`` key is expanded
           into ``submatrices``.
        3. Load the pipeline, expand the grid, and configure / submit the
           pipeline once per grid row into a single queue.
        4. Optionally print the varied parameters, then hand the queue to
           ``config.run_queue``. When ``config.run`` is false the queue is
           also written to disk as a script.

    Args:
        config (ScheduleEvaluationConfig): parsed configuration. Note that
            ``root_dpath`` and ``print_varied`` are modified in place when
            they are 'auto'.

    Returns:
        tuple: ``(dag, queue)``, the pipeline object (left configured for
        the last grid row) and the populated command queue.

    Raises:
        FileNotFoundError: if ``smart_highres_bundle`` is given but it, or
            the high resolution file for a region, does not exist.
        ValueError: if ``smart_highres_bundle`` and ``submatrices`` are both
            specified.
    """
    import json
    import rich
    from kwutil import util_progress
    from kwutil.util_yaml import Yaml
    from geowatch.mlops import pipeline_nodes
    from geowatch.utils.util_param_grid import expand_param_grid

    # Dont put in post-init because it is called by the CLI!
    if config['root_dpath'] in {None, 'auto'}:
        import geowatch
        expt_dvc_dpath = geowatch.find_dvc_dpath(tags='phase2_expt', hardware='auto')
        config['root_dpath'] = expt_dvc_dpath / 'dag_runs'

    root_dpath = ub.Path(config['root_dpath'])

    pipeline = config.pipeline
    param_arg = None

    if config['params'] is not None:
        param_arg = Yaml.coerce(config['params']) or {}
        pipeline = param_arg.pop('pipeline', config.pipeline)

        # Associate BAS datasets and HighRes datasets
        # Convenience to make it less tedious to specify datasets.
        # Hard codes the DVC pattern to associate lowres and hires data.
        # This is not a robust mechanism.
        smart_highres_bundle = param_arg.pop('smart_highres_bundle', None)
        if smart_highres_bundle is not None:
            param_arg['submatrices'] = _build_highres_submatrices(
                param_arg, smart_highres_bundle)

    # Load the requested pipeline
    dag = pipeline_nodes.coerce_pipeline(pipeline)
    dag.print_graphs()
    dag.inspect_configurables()

    if config.run:
        mlops_meta = (root_dpath / '_mlops_schedule').ensuredir()
        (root_dpath / '_cmd_queue_schedule').ensuredir()
        # Write some metadata to help aggregate set its defaults automatically
        most_recent_fpath = mlops_meta / 'most_recent_run.json'
        data = {
            'pipeline': str(pipeline),
        }
        most_recent_fpath.write_text(json.dumps(data, indent=' '))

    queue = config.create_queue(gpus=config.devices)

    # Expand parameter search grid
    if param_arg is not None:
        all_param_grid = list(expand_param_grid(
            param_arg,
            max_configs=config['max_configs'],
        ))
    else:
        all_param_grid = []

    if not all_param_grid:
        print('WARNING: PARAM GRID IS EMPTY')

    # Configure a DAG for each row. The same dag object is reconfigured for
    # every row; its jobs accumulate in the shared queue.
    pman = util_progress.ProgressManager()
    with pman:
        for row_config in pman.progiter(all_param_grid, desc='configure dags', verbose=3):
            dag.configure(
                config=row_config,
                root_dpath=root_dpath,
                cache=config['cache'])
            dag.submit_jobs(
                queue=queue,
                skip_existing=config['skip_existing'],
                enable_links=config['enable_links'])

    print(f'len(queue)={len(queue)}')

    print_thresh = 30
    if config['print_varied'] == 'auto':
        if len(queue) < print_thresh:
            config['print_varied'] = 1
        else:
            print(f'More than {print_thresh} jobs, skip print_varied. '
                  'If you want to see them explicitly specify print_varied=1')
            config['print_varied'] = 0

    if config['print_varied']:
        _print_varied_params(all_param_grid)

    for job in queue.jobs:
        # TODO: should be able to set this as a queue param.
        job.log = False

    if config.run:
        ub.ensuredir(dag.root_dpath)

    print_kwargs = {
        'with_status': 0,
        'style': "colors",
        'with_locks': 0,
        'exclude_tags': ['boilerplate'],
    }
    rich.print(f'\n\ndag.root_dpath: [link={dag.root_dpath}]{dag.root_dpath}[/link]')
    # NOTE(review): run_queue is called unconditionally; presumably it only
    # executes the jobs when config.run is set -- confirm in cmd_queue.
    config.run_queue(queue, print_kwargs=print_kwargs)

    if not config.run:
        driver_fpath = queue.write()
        print('Wrote script: to run execute:\n{}'.format(driver_fpath))

    return dag, queue


def _build_highres_submatrices(param_arg, smart_highres_bundle):
    """
    Pair every BAS test dataset in the matrix with a high resolution kwcoco.

    Args:
        param_arg (dict): coerced ``params``; must contain
            ``matrix['bas_pxl.test_dataset']`` and no ``submatrices`` key.
        smart_highres_bundle (str | PathLike): directory expected to contain
            one sub-directory per region id.

    Returns:
        List[dict]: one submatrix per BAS dataset, binding it to the crop
        source used by the ``sv_crop`` and ``sc_crop`` nodes.

    Raises:
        FileNotFoundError: if the bundle, or all candidate files for a
            region, are missing.
        ValueError: if ``param_arg`` already defines ``submatrices``.
    """
    smart_highres_bundle = ub.Path(smart_highres_bundle)
    # Explicit raise instead of assert so the check survives ``python -O``.
    if not smart_highres_bundle.exists():
        raise FileNotFoundError(
            f'smart_highres_bundle does not exist: {smart_highres_bundle}')
    if 'submatrices' in param_arg:
        raise ValueError('cant hack submatrices with submatrices on')

    from geowatch import heuristics

    # The matrix may hold a bare string instead of a list (the module
    # examples do this for other keys). Iterating a string directly would
    # yield one "dataset" per character, so wrap scalars first.
    bas_fpaths = ensure_iterable(param_arg['matrix']['bas_pxl.test_dataset'])

    submatrices = []
    for bas_fpath in bas_fpaths:
        region_id = heuristics.extract_region_id(ub.Path(bas_fpath).name)
        region_dpath = smart_highres_bundle / region_id
        hires_coco_candidates = [
            region_dpath / f'imgonly-{region_id}.kwcoco.zip',
            region_dpath / f'imgonly-{region_id}-rawbands.kwcoco.zip',
        ]
        # First candidate that exists wins.
        hires_coco_fpath = next(
            (cand for cand in hires_coco_candidates if cand.exists()), None)
        if hires_coco_fpath is None:
            raise FileNotFoundError(f'Expected hires path, but no candidates exist: {hires_coco_candidates}')
        submatrices.append({
            'bas_pxl.test_dataset': bas_fpath,
            'sv_crop.crop_src_fpath': hires_coco_fpath,
            'sc_crop.crop_src_fpath': hires_coco_fpath,
        })
    return submatrices


def _print_varied_params(all_param_grid):
    """
    Print a table of the grid parameters that take at least two values.

    Args:
        all_param_grid (List[dict]): the expanded grid, one dict per run.
    """
    import pandas as pd
    import rich
    from kwutil import slugify_ext
    from geowatch.utils.result_analysis import varied_values

    longparams = pd.DataFrame(all_param_grid)
    varied = varied_values(longparams, min_variations=2, dropna=False)
    relevant = longparams[longparams.columns.intersection(varied)]

    def pandas_preformat(item):
        # Long strings (typically paths) are shortened to keep the table
        # readable; everything else is shown unchanged.
        if isinstance(item, str):
            return slugify_ext.smart_truncate(item, max_length=16, trunc_loc=0)
        return item

    # DataFrame.applymap is deprecated since pandas 2.1 in favor of
    # DataFrame.map. Check the class (not the instance) so a column that
    # happens to be named "map" cannot be mistaken for the method.
    if hasattr(pd.DataFrame, 'map'):
        displayable = relevant.map(pandas_preformat)
    else:
        displayable = relevant.applymap(pandas_preformat)
    rich.print(displayable.to_string())
# [docs]
def ensure_iterable(inputs):
    """
    Return ``inputs`` unchanged if it is iterable, else wrap it in a list.

    Iterability is decided by ``ub.iterable``.
    NOTE(review): per the ubelt docs, ``ub.iterable`` reports strings as
    non-iterable by default, so a lone string would come back as a
    one-item list -- confirm against the installed ubelt version.

    Args:
        inputs (object): a single value or an iterable of values.

    Returns:
        object: ``inputs`` itself, or ``[inputs]``.
    """
    if ub.iterable(inputs):
        return inputs
    return [inputs]
def _auto_gpus():
    """
    List the GPUs that ``nvidia_smi`` reports as having no processes.

    Returns:
        list: the keys of the mapping returned by ``nvidia_smi()`` whose
        entry has an empty ``'procs'`` collection, in iteration order.
    """
    from geowatch.utils.util_nvidia import nvidia_smi
    # TODO: liberate the needed code from netharn
    # Use all unused devices.
    # The per-device record gets its own name: the previous loop rebound the
    # name of the mapping it was iterating over.
    all_gpu_info = nvidia_smi()
    return [
        gpu_idx
        for gpu_idx, info in all_gpu_info.items()
        if len(info['procs']) == 0
    ]
# Attach the entry point to the config class and publish that class as the
# module-level ``__cli__`` object.
ScheduleEvaluationConfig.main = main
__cli__ = ScheduleEvaluationConfig

# Allow running this file directly as a script.
if __name__ == '__main__':
    main()