"""
TODO:
- [ ] make a nicer DVC registry API and CLI
- [ ] rename
SeeAlso:
../cli/find_dvc.py
python -m geowatch find_dvc list --hardware=ssd --tags=phase2_data
python -m geowatch find_dvc list --hardware=hdd --tags=phase2_data
python -m geowatch find_dvc list --hardware=auto --tags=phase2_data
"""
import ubelt as ub
import warnings
import os
class DataRegistry:
    """
    Provide a quick way of storing and querying for machine specific paths.

    Each registry row is a dict with the keys ``name``, ``path``,
    ``priority``, ``hardware`` and ``tags``. Rows are persisted in a
    :mod:`shelve` database keyed by ``name``.

    Args:
        registry_fpath (str | PathLike | None):
            Location of the shelf database. If None, it is placed in a
            ``registry`` subdirectory of the per-user ``geowatch`` config
            directory (or of the legacy ``watch`` config directory if that
            still exists).

    Ignore:
        from geowatch.utils.util_data import * # NOQA
        self = DataRegistry()
        self.read()
        test_dpath = ub.Path.appdir('geowatch/tests/dvc_registry').ensuredir()
        repo1 = (test_dpath / 'repo1').ensuredir()
        repo2 = (test_dpath / 'repo2').ensuredir()
        repo_hdd2 = (test_dpath / 'repo2-hdd').ensuredir()
        repo_ssd2 = (test_dpath / 'repo2-ssd').ensuredir()
        repo_ffs2 = (test_dpath / 'repo2-ffs').ensuredir()
        self.add('repo1', path=repo1, tags='data_phase1')
        self.add('repo2', path=repo2, tags='expt_phase1')
        self.add('repo_hdd2', path=repo_hdd2, hardware='hdd', tags='expt_phase1')
        self.add('repo_ffs2', path=repo_ffs2, hardware='ffs', tags='expt_phase1', priority=10)
        self.add('repo_ssd2', path=repo_ssd2, hardware='ssd', tags='expt_phase1')
        print(self.pandas())
        print(ub.urepr(self.read()))
        self.query(tags='expt_phase1')
        self.query(tags='expt_phase1', max_results=1)
        self.query()
    """

    def __init__(self, registry_fpath=None):
        if registry_fpath is None:
            old_watch_config_dpath = ub.Path.appdir(type='config', appname='watch')
            new_watch_config_dpath = ub.Path.appdir(type='config', appname='geowatch')
            if old_watch_config_dpath.exists():
                # Keep honoring the legacy location until the user migrates.
                watch_config_dpath = old_watch_config_dpath
                warnings.warn(f'Using old watch config directory {old_watch_config_dpath}. Please move all contents to the new directory {new_watch_config_dpath}')
            else:
                watch_config_dpath = new_watch_config_dpath
            registry_dpath = (watch_config_dpath / 'registry').ensuredir()
            registry_fpath = registry_dpath / 'watch_dvc_registry.shelf'
        self.registry_fpath = registry_fpath
        # Optional per-row attributes and their default values.
        self._default_attributes = ub.udict({
            'priority': None,
            'hardware': None,
            'tags': None,
        })
        # Every attribute a query may filter on; NoParam marks the required
        # attributes that have no default.
        # TODO: just use default and filter NoParams
        self._expected_attrs = {
            'name': ub.NoParam,
            'path': ub.NoParam,
        } | self._default_attributes

    def pandas(self, **kwargs):
        """
        Return query results as a DataFrame with an extra ``exists`` column.

        Args:
            **kwargs: forwarded to :meth:`query`.
        """
        import pandas as pd
        df = pd.DataFrame(self.query(**kwargs))
        if len(df):
            df['exists'] = df['path'].apply(lambda p: ub.Path(p).exists())
        return df

    def list(self, **kwargs):
        """
        Print the :meth:`pandas` table for a query using ``rich``.

        Args:
            **kwargs: forwarded to :meth:`query`.
        """
        from rich import print
        print(self.pandas(**kwargs).to_string())

    def _open(self):
        """Open the backing shelf database (usable as a context manager)."""
        import shelve
        shelf = shelve.open(os.fspath(self.registry_fpath))
        return shelf

    @staticmethod
    def _sort_by_priority(rows):
        """Sort rows by descending priority; rows without a priority go last."""
        def _priority_key(row):
            priority = row.get('priority', None)
            return -float('inf') if priority is None else priority
        # Sorting ascending and then reversing (instead of reverse=True) keeps
        # the historical tie ordering: ties end up in reverse input order.
        return sorted(rows, key=_priority_key)[::-1]

    def add(self, name, path, **kwargs):
        """
        Register (or overwrite) a named path.

        Args:
            name (str): unique key of the row.
            path (str | PathLike): stored as an absolute path.
            **kwargs: any of ``priority``, ``hardware``, ``tags``. If
                ``hardware='auto'``, the hardware type is detected from the
                disk holding ``path`` (and dropped if detection fails).

        Raises:
            ValueError: if ``name`` or ``path`` is None, or an unknown
                attribute is given.
        """
        if name is None:
            raise ValueError('Must specify a name')
        if path is None:
            raise ValueError('Must specify a path')
        unknown = ub.udict(kwargs) - self._default_attributes
        if unknown:
            raise ValueError(f'Unknown kwargs={unknown}')
        path = ub.Path(path).absolute()
        if kwargs.get('hardware', None) == 'auto':
            from geowatch.utils import util_hardware
            info = util_hardware.disk_info_of_path(path)
            if 'hwtype' in info:
                kwargs['hardware'] = info['hwtype']
            else:
                print('unable to automatically determine hardware type')
                kwargs.pop('hardware')
        row = ub.udict({'name': name, 'path': os.fspath(path)}) | self._default_attributes
        row |= (kwargs & row)
        with self._open() as shelf:
            shelf[name] = row

    def set(self, name, path=None, **kwargs):
        """
        Set attributes of an existing row.

        Any attribute that is not given (or is given as None) keeps the value
        already stored for that row.

        Args:
            name (str): key of the row to update.
            path (str | PathLike | None): new path (stored as an absolute path).
            **kwargs: any of ``priority``, ``hardware``, ``tags``.

        Raises:
            ValueError: if ``name`` is None or an unknown attribute is given.
            KeyError: if no row named ``name`` is registered.
        """
        if name is None:
            raise ValueError('Must specify a name')
        unknown = ub.udict(kwargs) - self._default_attributes
        if unknown:
            raise ValueError(f'Unknown kwargs={unknown}')
        if path is not None:
            # Normalize the same way ``add`` does.
            path = os.fspath(ub.Path(path).absolute())
        row = ub.udict({'name': name, 'path': path}) | self._default_attributes
        row |= (kwargs & row)
        with self._open() as shelf:
            existing = shelf[name]
            # Fill every unspecified attribute from the stored row.
            row |= {k: existing.get(k, None) for k, v in row.items() if v is None}
            shelf[name] = row

    def remove(self, name):
        """
        Remove the row registered under ``name``.

        Failures (including an unknown name) are reported as warnings rather
        than raised.

        Ignore:
            name = 'test'
            path = 'foo/bar'
            hardware = 'fake'
        """
        if name is None:
            raise ValueError('Must specify a name')
        with self._open() as shelf:
            try:
                shelf.pop(name)
            except KeyError:
                warnings.warn(f'No registry entry named {name!r}')
            except Exception as ex:
                warnings.warn('Unable to access shelf: {}'.format(ex))

    def read(self):
        """
        Return all registered rows sorted by descending priority.

        Rows whose priority is None are sorted last. Errors while reading the
        shelf contents are reported as warnings.

        Ignore:
            name = 'test'
            path = 'foo/bar'
            hardware = 'fake'
        """
        # Hard coded fallback candidate DVC paths, we will remove this in the
        # future.
        hardcoded_paths = [
            # {'name': 'hack_data_hdd', 'hardware': 'hdd', 'tags': 'phase2_data', 'path': ub.Path('~/data/dvc-repos/smart_data_dvc').expand()},
            # {'name': 'hack_expt_hdd', 'hardware': 'hdd', 'tags': 'phase2_expt', 'path': ub.Path('~/data/dvc-repos/smart_expt_dvc').expand()},
            # {'name': 'hack_data_ssd', 'hardware': 'ssd', 'tags': 'phase2_data', 'path': ub.Path('~/data/dvc-repos/smart_data_dvc-ssd').expand()},
            # {'name': 'hack_expt_ssd', 'hardware': 'ssd', 'tags': 'phase2_expt', 'path': ub.Path('~/data/dvc-repos/smart_expt_dvc-ssd').expand()},
            # {'name': 'hack_data2', 'tags': 'phase2_data', 'path': ub.Path('~/data/smart_data_dvc/').expand()},
            # {'name': 'hack_expt2', 'tags': 'phase2_expt', 'path': ub.Path('~/data/smart_expt_dvc/').expand()},
        ]
        registry_rows = hardcoded_paths.copy()
        with self._open() as shelf:
            try:
                registry_rows += list(shelf.values())
            except Exception as ex:
                warnings.warn('Unable to access shelf: {}'.format(ex))
        return self._sort_by_priority(registry_rows)

    def query(self, must_exist=False, max_results=None, **kwargs):
        """
        Return registered rows matching every given attribute.

        Args:
            must_exist (bool): if True, drop rows whose path does not exist.
            max_results (int | None): if given, truncate the (priority
                sorted) results to at most this many rows.
            **kwargs: attribute filters (``name``, ``path``, ``priority``,
                ``hardware``, ``tags``) matched by equality; None values are
                ignored. ``hardware='auto'`` does not filter; instead SSD rows
                are boosted above all other rows.

        Returns:
            List[dict]: matching rows sorted by descending priority.

        Raises:
            ValueError: if an unknown keyword is given.
        """
        unexpected = ub.udict(kwargs) - self._expected_attrs
        if unexpected:
            raise ValueError(
                'Unexpected query keywords: {}. Valid keywords are {}'.format(
                    ub.urepr(list(unexpected.keys()), nl=0),
                    ub.urepr(list(self._expected_attrs.keys()), nl=0),
                ))
        # None-valued keywords mean "do not filter on this attribute".
        query = ub.udict({k: v for k, v in kwargs.items() if v is not None})
        ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC = 1
        special_query = {}
        if ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC:
            # 'auto' is not a literal hardware value, so take it out of the
            # equality filter and handle it after filtering.
            if query.get('hardware', None) == 'auto':
                special_query['hardware'] = query.pop('hardware')
        results = []
        for row in self.read():
            if query:
                relevant = ub.udict(row).subdict(query, default=None)
                flag = relevant == query
            else:
                flag = True
            if must_exist and not ub.Path(row['path']).exists():
                flag = False
            if flag:
                results.append(row)
        if ENABLE_EXPERIMENTAL_SPECIAL_QUERY_LOGIC:
            if special_query.get('hardware') == 'auto':
                # Make SSDs have higher priority than everything else
                hardware_to_results = ub.group_items(results, lambda x: x.get('hardware', None))
                hardware_to_max_priority = ub.udict()
                for hardware, subs in hardware_to_results.items():
                    hardware_to_max_priority[hardware] = max([s.get('priority', 0) or 0 for s in subs])
                non_ssd_priority = max(1, 1, *(hardware_to_max_priority - {'ssd'}).values())
                min_ssd_priority = min(0, 0, *(hardware_to_max_priority & {'ssd'}).values())
                for row in hardware_to_results.get('ssd', []):
                    row['priority'] = (row.get('priority', 0) or 0) - min_ssd_priority + non_ssd_priority * 2
        HACK_JONS_REMOTE_PATTERN = 0
        if HACK_JONS_REMOTE_PATTERN:
            # If we can detect the remote pattern that jon likes (where remote
            # machines are mounted via sshfs in the $HOME/remote/$REMOTENAME
            # directory and the localmachine $HOME is symlinked to via
            # $HOME/remote/$HOSTNAME) then use that version of the paths so its
            # easier to work across multiple machines.
            import platform
            host = platform.node()
            remote_base = ub.Path(f'~/remote/{host}').expand()
            for row in results:
                path = ub.Path(row['path'])
                if path.exists() and f'remote/{host}' not in str(path):
                    remote_alt = path.shrinkuser(home=remote_base)
                    if remote_alt.exists():
                        row['path'] = os.fspath(remote_alt)
        results = self._sort_by_priority(results)
        if max_results is not None:
            results = results[:max_results]
        return results

    def find(self, on_error="raise", envvar=None, **kwargs):
        """
        Return the path of the highest priority matching directory.

        Args:
            on_error (str): if "raise", print diagnostics and raise when no
                suitable directory is found; any other value returns None.
            envvar (str | None): name of an environment variable. If it is
                set to a non-empty value and no ``name`` is queried, its value
                is used instead of the registry.
            **kwargs: forwarded to :meth:`query`. Unlike :meth:`query`,
                ``must_exist`` defaults to True here.

        Returns:
            ub.Path | None

        Raises:
            Exception: when nothing suitable is found and on_error == "raise".
        """
        name = kwargs.get('name', None)
        if envvar is not None:
            environ_dvc_dpath = os.environ.get(envvar, "")
        else:
            environ_dvc_dpath = None
        if environ_dvc_dpath and name is None:
            results = [ub.Path(environ_dvc_dpath)]
        else:
            results = [ub.Path(r['path']) for r in self.query(**kwargs)]
        if not results:
            return self._find_failure(on_error, kwargs, 'No suitable data directory found')
        if kwargs.get('must_exist', True):
            results = [found for found in results if found.exists()]
            if not results:
                return self._find_failure(on_error, kwargs, 'No existing data directory found')
        return results[0]

    def _find_failure(self, on_error, kwargs, message):
        """Return None, or (if on_error == "raise") print diagnostics and raise."""
        if on_error != "raise":
            return None
        print('Error in DataRegistry.find. Listing existing data...')
        self.list()
        print('Error in DataRegistry.find. Listing query results...')
        self.list(**kwargs)
        print('... for query kwargs = {}'.format(ub.urepr(kwargs, nl=1)))
        raise Exception(message)
def find_dvc_dpath(name=ub.NoParam, on_error="raise", **kwargs):
    """
    Return the location of the GeoWATCH DVC Data path if it exists and is in
    a "standard" (registered) location.

    This builds a default :class:`DataRegistry` and delegates to its ``find``
    method. ``name`` is only added to the query when explicitly given; all
    other keyword arguments are forwarded unchanged.

    NOTE: other team members can add their "standard" locations if they want.

    SeeAlso:
        WATCH_DATA_DPATH=$(geowatch_dvc)
        python -m geowatch.cli.find_dvc --hardware=hdd
        python -m geowatch.cli.find_dvc --hardware=ssd
    """
    query_kwargs = kwargs
    if name is not ub.NoParam:
        query_kwargs['name'] = name
    default_registry = DataRegistry()
    return default_registry.find(on_error=on_error, **query_kwargs)
def find_smart_dvc_dpath(*positional, **keywords):
    """
    Alias of :func:`find_dvc_dpath`; all arguments are forwarded unchanged.
    """
    delegate = find_dvc_dpath
    return delegate(*positional, **keywords)