"""
Logic for loading raw results from the MLops DAG root dir.
Used by ./aggregate.py
"""
import ubelt as ub
from kwutil import util_pattern
from kwutil import util_parallel
from geowatch.utils import util_dotdict
import parse
import json
#from geowatch.mlops import smart_pipeline
from geowatch.mlops import smart_result_parser
[docs]
def build_tables(root_dpath, dag, io_workers, eval_nodes,
                 cache_resolved_results):
    """
    Load the raw results of evaluation nodes into per-node tables.

    For each chosen evaluation node, every file that matches the node's
    result output template is loaded in parallel by
    :func:`load_result_worker`. The per-file rows are then grouped into one
    :class:`pandas.DataFrame` per column group.

    Args:
        root_dpath (str | PathLike):
            The MLops DAG root directory. Not used directly here; result
            locations come from the output templates of the DAG nodes.

        dag (Pipeline):
            The pipeline whose nodes produced the results.

        io_workers (int | str):
            Number of process workers used to load result files. Coerced
            with ``util_parallel.coerce_num_workers``.

        eval_nodes (None | List[str]):
            Names of the evaluation nodes to load. If None, the hard-coded
            default evaluation nodes are used.

        cache_resolved_results (bool):
            Forwarded to :func:`load_result_worker` as ``use_cache``.

    Returns:
        Dict[str, Dict[str, pandas.DataFrame]]:
            Maps each loaded evaluation node name to a dictionary with the
            tables "fpath", "index", "metrics", "requested_params",
            "specified_params", "resolved_params", and "other".

    Raises:
        Exception:
            If a requested, non-hard-coded evaluation node has neither a
            single output path nor a ``primary_out_key``, or if a requested
            node name cannot be mapped to a known evaluation node.
    """
    import pandas as pd
    from concurrent.futures import as_completed
    from kwutil import util_progress
    io_workers = util_parallel.coerce_num_workers(io_workers)
    print(f'io_workers={io_workers}')

    # Hard coded nodes of interest to gather. Should abstract later.
    # The user-defined pipelines should be responsible for providing the
    # methods needed to parse their outputs.
    node_eval_infos = [
        {'name': 'bas_pxl_eval', 'out_key': 'eval_pxl_fpath'},
        {'name': 'sc_poly_eval', 'out_key': 'eval_fpath'},
        {'name': 'bas_poly_eval', 'out_key': 'eval_fpath'},
        {'name': 'sv_poly_eval', 'out_key': 'eval_fpath'},
    ]
    lut = ub.udict({info['name']: info for info in node_eval_infos})

    DEVFLAG = 1
    # ``eval_nodes`` may be None (use the defaults above), so only register
    # extra nodes when an explicit list was requested.
    if DEVFLAG and eval_nodes is not None:
        for node_name in eval_nodes:
            if node_name in lut or node_name not in dag.nodes:
                # Either already known, or not part of this DAG. Names that
                # are neither are reported by the lookup below with a
                # helpful message instead of a bare KeyError here.
                continue
            node = dag.nodes[node_name]
            if len(node.out_paths) == 1:
                primary_out_key = list(node.out_paths)[0]
            elif getattr(node, 'primary_out_key', None) is not None:
                primary_out_key = node.primary_out_key
            else:
                raise Exception(ub.paragraph(
                    '''
                    evaluation nodes must have a single item in out_paths
                    or define a primary_out_key
                    '''))
            node_eval_infos.append({
                'name': node.name,
                'out_key': primary_out_key,
            })
        lut = ub.udict({info['name']: info for info in node_eval_infos})

    if eval_nodes is None:
        node_eval_infos_chosen = node_eval_infos
    else:
        try:
            node_eval_infos_chosen = list(lut.take(eval_nodes))
        except Exception as ex:
            from kwutil.util_exception import add_exception_note
            raise add_exception_note(ex, ub.paragraph(
                f'''
                Unknown evaluation node. Evaluation nodes need to be
                connected to a function that can parse their results.
                Requested evaluation nodes were: {eval_nodes}.
                But available nodes are {list(lut.keys())}.
                '''))

    pman = util_progress.ProgressManager(backend='rich')
    eval_type_to_results = {}
    with pman:
        eval_node_prog = pman.progiter(node_eval_infos_chosen, desc='Loading node results')
        for node_eval_info in eval_node_prog:
            node_name = node_eval_info['name']
            out_key = node_eval_info['out_key']
            if node_name not in dag.nodes:
                continue
            node = dag.nodes[node_name]
            out_node = node.outputs[out_key]
            # Pattern match existing files against the output path template
            fpaths = out_node_matching_fpaths(out_node)
            cols = {
                'index': [],
                'metrics': [],
                'requested_params': [],
                'resolved_params': [],
                'specified_params': [],
                'other': [],
                'fpath': [],
            }
            # Use the executor as a context manager so its worker pool is
            # shut down once this node's results have been collected.
            with ub.Executor(mode='process', max_workers=io_workers) as executor:
                jobs = []
                submit_prog = pman.progiter(
                    fpaths, desc=f' * submit load jobs: {node_name}',
                    transient=True)
                for fpath in submit_prog:
                    job = executor.submit(load_result_worker, fpath, node_name,
                                          node=node, dag=dag,
                                          use_cache=cache_resolved_results)
                    jobs.append(job)
                collect_prog = pman.progiter(
                    as_completed(jobs), total=len(fpaths),
                    desc=f' * loading node results: {node_name}')
                for job in collect_prog:
                    result = job.result()
                    # NOTE: every row is kept, even when its
                    # "requested_params" is empty or None.
                    assert set(result.keys()) == set(cols.keys())
                    for k, v in result.items():
                        cols[k].append(v)
            results = {
                'fpath': pd.DataFrame(cols['fpath'], columns=['fpath']),
                'index': pd.DataFrame(cols['index']),
                'metrics': pd.DataFrame(cols['metrics']),
                # dtype=object prevents Nones from being read as nan
                'requested_params': pd.DataFrame(cols['requested_params'], dtype=object),
                'specified_params': pd.DataFrame(cols['specified_params']),
                'resolved_params': pd.DataFrame(cols['resolved_params'], dtype=object),
                'other': pd.DataFrame(cols['other']),
            }
            eval_type_to_results[node_name] = results
    return eval_type_to_results
[docs]
def load_result_worker(fpath, node_name, node=None, dag=None, use_cache=True):
    """
    Main driver for loading a single evaluation result into a table row.

    This is the per-file job run by :func:`build_tables`. How the row is
    obtained:

    * If ``use_cache`` is True and a ``resolved_result_row_v012.json`` file
      exists next to ``fpath``, the row is read from that file.
    * Otherwise the row is rebuilt from the node directory, using its
      ``job_config.json`` (if present) and the lineage resolved by
      :func:`load_result_resolved`. The rebuilt row is then written to the
      cache file.

    Args:
        fpath (str | PathLike): a result file inside an evaluation node
            directory.
        node_name (str): name of the evaluation node that produced ``fpath``.
        node (None | ProcessNode): forwarded to :func:`load_result_resolved`.
        dag (None | Pipeline): forwarded to :func:`load_result_resolved`.
        use_cache (bool): if True, reuse an existing cached row.

    Returns:
        Dict[str, Any]:
            A row with keys "fpath", "index", "metrics",
            "requested_params", "resolved_params", "specified_params", and
            "other". For ``sv_poly_eval`` rows where
            ``params.sv_crop.crop_src_fpath`` is missing or null,
            "requested_params" is replaced with None.

    Raises:
        Exception:
            Errors while reading the cache or the job config are re-raised
            with an added note. Errors while resolving results are printed
            and then re-raised.

    Ignore:
        fpath = ub.Path('/home/joncrall/remote/toothbrush/data/dvc-repos/smart_expt_dvc/_testpipe/eval/flat/bas_poly_eval/bas_poly_eval_id_1ad531cc/poly_eval.json')
        node_name = 'bas_poly_eval'
    """
    import json
    import re
    import safer
    import rich
    from kwutil import util_json
    from kwutil.util_exception import add_exception_note

    fpath = ub.Path(fpath)
    node_dpath = ub.Path(fpath.parent)
    cache_fpath = node_dpath / 'resolved_result_row_v012.json'

    if use_cache and cache_fpath.exists():
        # Fast path: reuse the row written by a previous run.
        try:
            row = json.loads(cache_fpath.read_text())
        except Exception as ex:
            raise add_exception_note(ex, f'Failed to read {cache_fpath!r}')
    else:
        # The parameters that were requested for this job (if recorded).
        config_fpath = node_dpath / 'job_config.json'
        raw_requested = {}
        if config_fpath.exists():
            try:
                raw_requested = json.loads(config_fpath.read_text())
            except Exception as ex:
                raise add_exception_note(ex, f'Failed to parse json job config {config_fpath}')
        requested_params = util_dotdict.DotDict(raw_requested).add_prefix('params')
        specified_params = dict.fromkeys(
            ('specified.' + key for key in requested_params), 1)

        try:
            # Use the DAG to trace the result lineage into resolved params.
            flat = load_result_resolved(node_dpath, node=node, dag=dag)

            HACK_FOR_REGION_ID = True
            if HACK_FOR_REGION_ID:
                # Munge data to get the region ids we expect. When several
                # keys mention region ids, the last one found wins.
                region_keys = list(flat.query_keys('region_ids'))
                region_ids = flat[region_keys[-1]] if region_keys else None
                if region_ids is not None:
                    found_ids = re.findall(r'[A-Z][A-Za-z]*_[A-Z]\d\d\d', region_ids)
                    region_ids = ','.join(found_ids)
                else:
                    print(ub.paragraph(
                        '''
                        Warning: no region ids available, some assumptions may
                        be violated.
                        '''))
                    region_ids = 'unknown'

            resolved_keys = list(flat.query_keys('resolved_params'))
            metric_keys = list(flat.query_keys('metrics'))
            row = {
                'fpath': fpath,
                'index': {
                    'node': node_name,
                    'region_id': region_ids,
                },
                'metrics': flat & metric_keys,
                'requested_params': requested_params,
                'resolved_params': flat & resolved_keys,
                'specified_params': specified_params,
                'other': flat - (resolved_keys + metric_keys),
            }
            # Make the row JSON compatible so it can be cached below.
            row = util_json.ensure_json_serializable(row)
        except Exception as ex:
            rich.print(f'[red]Failed to load results for: {node_name}')
            rich.print(f'node_dpath={str(node_dpath)!r}')
            rich.print('ex = {}'.format(ub.urepr(ex, nl=1)))
            raise

        # Cache the resolved row data for the next run.
        with safer.open(cache_fpath, 'w') as file:
            json.dump(row, file, indent=4)

    HACK_FOR_BAD_SV_REQUEST = 1
    if HACK_FOR_BAD_SV_REQUEST and node_name == 'sv_poly_eval':
        import pandas as pd
        crop_src = row['requested_params'].get('params.sv_crop.crop_src_fpath', None)
        if pd.isnull(crop_src):
            row['requested_params'] = None
    return row
[docs]
def new_process_context_parser(proc_item):
    """
    Load parameters out of data saved by a ProcessContext object.

    A few process types get node-specific handling:

    * ``geowatch.cli.run_tracker``: the ``track_kwargs`` entry is removed
      and its contents are merged into the top-level params. It may be a
      JSON string or an already-decoded mapping.
    * ``geowatch.tasks.fusion.predict``: ``datamodule_defaults`` is dropped.
    * ``geowatch.tasks.fusion.evaluate``: if the recorded config is a list
      or lacks ``true_dataset``, the params are rebuilt by parsing the
      recorded command line ``args`` with ``SegmentationEvalConfig``.

    Args:
        proc_item (Dict): a process info item. It is normalized with
            ``smart_result_parser._handle_process_item`` before use.

    Returns:
        Dict: a nested dictionary with the keys "context" (task name, uuid,
        start / stop timestamps), "resolved_params", "resources", and
        "machine".

    Note:
        ``params`` is the ``props['config']`` mapping itself, so the
        tracker and predict hacks modify it in place.
    """
    tracker_name_pat = util_pattern.MultiPattern.coerce({
        'geowatch.cli.run_tracker',
    })
    heatmap_name_pat = util_pattern.MultiPattern.coerce({
        'geowatch.tasks.fusion.predict',
    })
    pxl_eval_pat = util_pattern.MultiPattern.coerce({
        'geowatch.tasks.fusion.evaluate',
    })
    proc_item = smart_result_parser._handle_process_item(proc_item)
    props = proc_item['properties']

    # Node-specific hacks
    params = props['config']
    if tracker_name_pat.match(props['name']):
        track_kwargs = params.pop('track_kwargs', '{}')
        # Usually recorded as a JSON string, but tolerate a value that was
        # already decoded (a mapping) or is missing / null.
        if isinstance(track_kwargs, (str, bytes, bytearray)):
            track_kwargs = json.loads(track_kwargs)
        if track_kwargs:
            params.update(track_kwargs)
    elif heatmap_name_pat.match(props['name']):
        params.pop('datamodule_defaults', None)
    elif pxl_eval_pat.match(props['name']):
        from geowatch.tasks.fusion import evaluate
        # We can resolve the params to a dictionary in this instance
        if isinstance(params, list) or 'true_dataset' not in params:
            args = props['args']
            params = evaluate.SegmentationEvalConfig().load(cmdline=args).to_dict()

    resources = smart_result_parser.parse_resource_item(proc_item, add_prefix=False)
    output = {
        # TODO: better name for this
        'context': {
            'task': props['name'],
            'uuid': props.get('uuid', None),
            'start_timestamp': props.get('start_timestamp', None),
            'stop_timestamp': props.get('stop_timestamp', props.get('end_timestamp', None)),
        },
        'resolved_params': params,
        'resources': resources,
        'machine': props.get('machine', {}),
    }
    return output
[docs]
def load_result_resolved(node_dpath, node=None, dag=None):
    """
    Recurse through the DAG filesystem structure and load resolved
    configurations from each step.

    The node type is the name of the parent directory of ``node_dpath``.
    Results are loaded in one of two ways:

    * If a ``node`` with a ``load_result`` method is given, or can be looked
      up in ``dag`` by node type, that method is used.
    * Otherwise one of the hard-coded node types handled below is parsed.

    Afterwards, every node directory found under ``node_dpath / '.pred'`` is
    loaded recursively and merged into the result.

    Args:
        node_dpath (str | PathLike):
            the path to the evaluation node directory. The specific type of
            evaluation node must have a known (currently hard-coded) condition
            in this function that knows how to parse it.

        node (None | ProcessNode):
            new experimental way to allow users to specify how results should
            be loaded. The node should have a "load_result" function that
            accepts node_dpath as a single argument and then returns a flat
            resolved dotdict of hyperparams, metrics, and context.

        dag (None | Pipeline):
            Used to lookup loading functions for predecessor nodes.

    Returns:
        Dict - flat_resolved - a flat dot-dictionary with resolved params

    Raises:
        KeyError:
            if ``dag`` is given, ``node`` is None, and the node type is not
            one of the DAG's nodes.

        AssertionError:
            if ``node.load_result`` returns None.

        FileNotFoundError:
            if the expected kwcoco file of a pixel or polygon prediction
            node is missing.

        NotImplementedError:
            if there is no known way to load results for the node type.

    TODO:
        Some mechanism to let the user specify how to parse an evaluation node
        of a given type.

    Ignore:
        >>> # To diagnose issues, construct a path to an evaluation node to get the
        >>> # relevant project-specific entrypoint data.
        >>> # TODO: need a demo pipeline that we can test for robustness here.
        >>> from geowatch.mlops.aggregate_loader import * # NOQA
        >>> from geowatch.mlops.aggregate_loader import load_result_resolved
        >>> import geowatch
        >>> import rich
        >>> expt_dpath = geowatch.find_dvc_dpath(tags='phase3_expt')
        >>> # choose the location mlops schedule dumped results to
        >>> mlops_dpath = expt_dpath / '_preeval20_bas_grid'
        >>> # Search and pick a poly eval node, the specific path
        >>> # will depend on the pipeline structure, which may be revised
        >>> # in the future. At the start of SMART phase3, we keep all
        >>> # eval nodes grouped in eval/flat, so enumerate those
        >>> node_type_dpaths = list(mlops_dpath.glob('eval/flat/*'))
        >>> node_type_dpaths += list(mlops_dpath.glob('pred/flat/*'))
        >>> # For each eval node_type, choose a node in it.
        >>> for node_type_dpath in node_type_dpaths:
        >>>     for node_dpath in node_type_dpath.ls():
        >>>         if len(node_dpath.ls()) > 2:
        >>>             print(f'Found node_dpath={node_dpath}')
        >>>             break
        >>>     print(f'node_dpath={node_dpath}')
        >>>     flat_resolved = load_result_resolved(node_dpath)
        >>>     rich.print(f'flat_resolved = {ub.urepr(flat_resolved, nl=1)}')

    Ignore:
        ## OR If you know the node_type of node you want
        from geowatch.mlops.aggregate_loader import * # NOQA
        import geowatch
        import rich
        expt_dpath = geowatch.find_dvc_dpath(tags='phase3_expt')
        mlops_dpath = expt_dpath / '_preeval20_bas_grid'
        node_group, node_type = 'eval', 'bas_poly_eval'
        node_group, node_type = 'pred', 'sc_pxl'
        node_group, node_type = 'pred', 'bas_pxl'
        node_group, node_type = 'eval', 'bas_pxl_eval'
        for node_dpath in mlops_dpath.glob(f'{node_group}/flat/{node_type}/*'):
            if len(node_dpath.ls()) > 2:
                print(f'Found node_dpath={node_dpath}')
                break
        flat_resolved = load_result_resolved(node_dpath)
        rich.print(f'flat_resolved = {ub.urepr(flat_resolved, nl=1)}')

    Ignore:
        from geowatch.mlops.aggregate_loader import * # NOQA
        node_dpath = ub.Path('/home/joncrall/remote/toothbrush/data/dvc-repos/smart_expt_dvc/_testpipe/eval/flat/bas_poly_eval/bas_poly_eval_id_1ad531cc')
        node_dpath = ub.Path('/home/joncrall/remote/toothbrush/data/dvc-repos/smart_expt_dvc/_testpipe/eval/flat/bas_pxl_eval/bas_pxl_eval_id_6028edfe/')
        node_dpath = ub.Path('/home/joncrall/remote/toothbrush/data/dvc-repos/smart_expt_dvc/_timekernel_test_drop4/eval/flat/bas_pxl_eval/bas_pxl_eval_id_5d38c6b3')
        got = load_result_resolved(node_dpath)
    """
    node_dpath = ub.Path(node_dpath)
    node_type_dpath = node_dpath.parent
    node_type = node_type_dpath.name

    if dag is not None:
        if node is None:
            try:
                node = dag.nodes[node_type]
            except KeyError:
                print(f'node_dpath = {ub.urepr(node_dpath, nl=1)}')
                print(f'node_type = {ub.urepr(node_type, nl=1)}')
                print(f'dag.nodes = {ub.urepr(dag.nodes, nl=1)}')
                raise

    if node is not None and hasattr(node, 'load_result'):
        flat_resolved = node.load_result(node_dpath)
        if flat_resolved is None:
            raise AssertionError('node.load_result should have returned a dict')

    elif node_type in {'sc_pxl', 'bas_pxl'}:
        pat = util_pattern.Pattern.coerce(node_dpath / 'pred.kwcoco.*')
        found = list(pat.paths())
        if len(found) == 0:
            raise FileNotFoundError(f'Unable to find expected kwcoco file in {node_type} node_dpath: {node_dpath}')
        fpath = found[0]
        bas_pxl_info = smart_result_parser.parse_json_header(fpath)
        proc_item = smart_result_parser.find_pred_pxl_item(bas_pxl_info)
        nest_resolved = new_process_context_parser(proc_item)
        flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved)
        flat_resolved = flat_resolved.insert_prefix(node_type, index=1)

        # Record the train-time parameters
        fit_node_type = node_type + '_fit'
        extra = proc_item['properties']['extra']
        fit_config = extra['fit_config']
        if 'data' not in fit_config:
            raise Exception(ub.paragraph(
                f'''
                A kwcoco has an old fit-config that did not contain all
                train-time params. To fix this run for a single file run:
                ``python -m geowatch.cli.experimental.fixup_predict_kwcoco_metadata {fpath}``
                ''') +
                '\n\n' +
                ub.paragraph(
                    '''
                    For more details see:
                    ``python -m geowatch.cli.experimental.fixup_predict_kwcoco_metadata --help``
                    '''))
        fit_nested = {
            'context': {'task': 'geowatch.tasks.fusion.fit'},
            'resolved_params': fit_config,
            'resources': {},
            'machine': {},
        }
        flat_fit_resolved = util_dotdict.DotDict.from_nested(fit_nested)
        flat_fit_resolved = flat_fit_resolved.insert_prefix(fit_node_type, index=1)
        flat_resolved |= flat_fit_resolved

    elif node_type in {'bas_poly', 'sc_poly'}:
        pat = util_pattern.Pattern.coerce(node_dpath / 'poly.kwcoco.*')
        found = list(pat.paths())
        if len(found) == 0:
            # Consistent with the pixel branch: report the missing file
            # instead of an opaque IndexError.
            raise FileNotFoundError(f'Unable to find expected kwcoco file in {node_type} node_dpath: {node_dpath}')
        fpath = found[0]
        try:
            bas_poly_info = smart_result_parser.parse_json_header(fpath)
        except Exception:
            # There are some cases where the kwcoco file was clobbered,
            # but we can work around by using a manifest file.
            # NOTE(review): the manifest header is parsed, but the bare
            # ``raise`` below still re-raises the original exception, so
            # this workaround is currently never used. Confirm that the
            # manifest header contains the tracker process item before
            # removing the ``raise``.
            pat = util_pattern.Pattern.coerce(node_dpath / '*_manifest.json')
            fpath = list(pat.paths())[0]
            bas_poly_info = smart_result_parser.parse_json_header(fpath)
            raise
        proc_item = smart_result_parser.find_track_item(bas_poly_info)
        nest_resolved = new_process_context_parser(proc_item)
        flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved)
        flat_resolved = flat_resolved.insert_prefix(node_type, index=1)

    elif node_type in {'bas_poly_eval', 'sc_poly_eval', 'sv_poly_eval'}:
        fpath = node_dpath / 'poly_eval.json'
        iarpa_result = smart_result_parser.load_iarpa_evaluation(fpath)
        proc_item = smart_result_parser.find_metrics_framework_item(
            iarpa_result['iarpa_json']['info'])
        nest_resolved = new_process_context_parser(proc_item)
        nest_resolved['metrics'] = iarpa_result['metrics']
        flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved)
        flat_resolved['context.region_ids'] = iarpa_result['iarpa_json']['region_ids']
        flat_resolved = flat_resolved.insert_prefix(node_type, 1)

    elif node_type in {'bas_pxl_eval', 'sc_pxl_eval'}:
        fpath = node_dpath / 'pxl_eval.json'
        info = smart_result_parser.load_pxl_eval(fpath, with_param_types=False)
        metrics = info['metrics']
        proc_item = smart_result_parser.find_pxl_eval_item(
            info['json_info']['meta']['info'])
        nest_resolved = new_process_context_parser(proc_item)
        # Hack for region ids: use the stem of the truth dataset filename
        nest_resolved['context']['region_ids'] = ub.Path(nest_resolved['resolved_params']['true_dataset']).name.split('.')[0]
        nest_resolved['metrics'] = metrics
        flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved)
        flat_resolved = flat_resolved.insert_prefix(node_type, 1)

    elif node_type in {'valicrop', 'sv_crop'}:
        # TODO: parse resolved params
        node_process_name = 'coco_align'
        fpath = node_dpath / 'sv_crop.kwcoco.zip'
        if not fpath.exists():
            fpath = node_dpath / 'valicrop.kwcoco.zip'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    elif node_type in {'sc_crop'}:
        # TODO: parse resolved params
        node_process_name = 'coco_align'
        fpath = node_dpath / 'sitecrop.kwcoco.zip'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    elif node_type in {'cluster_sites'}:
        # TODO: write out resolved params in cluster sites
        flat_resolved = {}

    elif node_type in {'sv_dino_boxes'}:
        node_process_name = 'box.predict'
        fpath = node_dpath / 'pred_boxes.kwcoco.zip'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    elif node_type in {'sv_dino_filter'}:
        node_process_name = 'geowatch.tasks.dino_detector.building_validator'
        fpath = node_dpath / 'out_site_manifest.json'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    # TODO: it would be nice if just declaring a node gave us this information.
    elif node_type in {'sv_depth_score'}:
        from geowatch.mlops import smart_pipeline
        fpath = node_dpath / smart_pipeline.SV_DepthPredict.out_paths['out_kwcoco']
        node_process_name = 'geowatch.tasks.depth_pcd.score_tracks'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    elif node_type in {'sv_depth_filter'}:
        from geowatch.mlops import smart_pipeline
        fpath = node_dpath / smart_pipeline.SV_DepthFilter.out_paths['output_site_manifest_fpath']
        node_process_name = 'geowatch.tasks.depth_pcd.filter_tracks'
        flat_resolved = _generalized_process_flat_resolved(fpath, node_process_name, node_type)

    else:
        raise NotImplementedError(ub.paragraph(
            f'''
            Attempted to load a result for {node_type} in {node_dpath}.
            But was unable to determine how to do so.
            To support this node type, define a method
            ``def load_result(self, node_dpath):`` on its process node
            class which returns a flat dot-dictionary of params and
            results from the node.
            '''))

    # Determine if this node has any predecessor computations and load results
    # from those as well to have a flat and complete picture of the process
    # lineage for this node.
    predecessor_dpath = node_dpath / '.pred'
    for predecessor_node_type_dpath in predecessor_dpath.glob('*'):
        for predecessor_node_dpath in predecessor_node_type_dpath.glob('*'):
            # ``exists`` follows links, so dangling entries are skipped.
            if predecessor_node_dpath.exists():
                predecessor_flat_resolved = load_result_resolved(predecessor_node_dpath, dag=dag)
                flat_resolved |= predecessor_flat_resolved
    return flat_resolved
def _generalized_process_flat_resolved(fpath, node_process_name, node_type):
    """
    Parses info out of json files where we conform to a pattern that the top
    level json object has an "info" section as list of ProcessContext info
    objects. We grab the context of a metric-less node there. We do need to
    know the filepath of this json file and what the name used in process
    context was.

    Args:
        fpath (str | PathLike):
            the json (or kwcoco) file whose header contains the info list.

        node_process_name (str):
            the process name recorded by the ProcessContext of the node.

        node_type (str):
            inserted as a prefix (at index 1) into every flat key.

    Returns:
        DotDict: the flat resolved context / params / resources of the node.

    Raises:
        AssertionError:
            if the header does not contain exactly one matching process item.
    """
    info = smart_result_parser.parse_json_header(fpath)
    proc_items = list(smart_result_parser.find_info_items(
        info, {'process'}, {node_process_name}))
    # Raise explicitly (instead of ``assert``) so the check survives
    # ``python -O`` and reports what was actually found.
    if len(proc_items) != 1:
        raise AssertionError(
            f'Expected exactly one {node_process_name!r} process item in '
            f'{fpath}, but found {len(proc_items)}')
    nest_resolved = new_process_context_parser(proc_items[0])
    flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved)
    flat_resolved = flat_resolved.insert_prefix(node_type, index=1)
    return flat_resolved
[docs]
def out_node_matching_fpaths(out_node):
    """
    Find existing files that match the path template of an output node.

    Every replacement field of the template (e.g. ``{hashid}``) is turned
    into a ``*`` wildcard, and the resulting pattern is expanded with
    ``util_pattern``.

    Args:
        out_node: an output node whose ``template_value`` is a
            :meth:`str.format` style path template (str or PathLike).

    Returns:
        List: the paths that match the template pattern.
    """
    import string
    template = str(out_node.template_value)
    # Use the same parser that ``str.format`` uses. Positional fields,
    # format specs (``{x:03d}``), conversions (``{x!r}``), and dotted field
    # names then all become a plain wildcard, and escaped braces (``{{``)
    # become literal braces exactly as ``str.format`` would produce.
    pieces = []
    for literal, field_name, _spec, _conversion in string.Formatter().parse(template):
        pieces.append(literal)
        if field_name is not None:
            pieces.append('*')
    pat = ''.join(pieces)
    mpat = util_pattern.Pattern.coerce(pat)
    fpaths = list(mpat.paths())
    return fpaths
import numpy as np

# Hack for a ubelt issue (presumably ``ub.hash_data`` does not handle
# ``np.bool_`` on its own — TODO confirm). Register a hasher for numpy
# booleans. The value is tagged with an ``INT`` prefix and serialized
# with ubelt's private ``_int_to_bytes`` helper after conversion to bool.
if np.bool_ is not bool:
    @ub.hash_data.register(np.bool_)
    def _hashnp_bool(data):
        from ubelt.util_hash import _int_to_bytes
        return b'INT', _int_to_bytes(bool(data))