#!/usr/bin/env python3
"""
A simplified Python DVC API
"""
import ubelt as ub
import os
from kwutil import util_path
from kwutil.util_yaml import Yaml
def __test_simple_dvc():
    """
    Construct a medium-complexity DVC repository on disk for experimentation.

    The repo is initialized without SCM and populated with:

    * ``test-set1/assets`` - twenty small data files,
    * ``test-set1/manifest.txt`` - a single file next to that directory,
    * ``root_file`` - a single top-level file,
    * ``root_dir`` - a nested directory layout derived from a random
      directed tree (one file per root-to-leaf path).

    Each of these four targets is then added with :meth:`SimpleDVC.add`.

    TODO:
        Turn this scaffolding into real tests.
    """
    base_dpath = ub.Path.appdir('simpledvc', 'tests', 'basic')
    dvc_root = base_dpath / 'repo'
    dvc_root.delete()
    SimpleDVC.init(dvc_root, no_scm=True)
    dvc = SimpleDVC.coerce(dvc_root)

    # A flat directory of small data files plus a manifest beside it
    (dvc_root / 'test-set1').ensuredir()
    assets_dpath = (dvc_root / 'test-set1/assets').ensuredir()
    for num in range(1, 21):
        (assets_dpath / f'asset_{num:03d}.data').write_text(str(num) * 100)
    manifest_fpath = dvc_root / 'test-set1/manifest.txt'
    manifest_fpath.write_text('pretend-data')

    root_fpath = dvc_root / 'root_file'
    root_fpath.write_text('----' * 100)
    root_dpath = dvc_root / 'root_dir'

    # Derive a random nested directory layout from a random directed tree
    import networkx as nx
    graph = nx.erdos_renyi_graph(30, p=0.2, directed=True)
    tree = nx.minimum_spanning_arborescence(graph)
    nx.write_network_text(tree)
    roots = [node for node in tree.nodes if not tree.pred[node]]
    leafs = [node for node in tree.nodes if not tree.succ[node]]

    # One node path (first simple path found) per connected (root, leaf) pair
    node_paths = []
    for leaf in leafs:
        for root in roots:
            edge_paths = list(nx.all_simple_edge_paths(tree, root, leaf))
            if not edge_paths:
                continue
            node_paths.append([u for u, _ in edge_paths[0]] + [leaf])

    # Interior nodes become directories, the final node becomes a file
    for node_path in node_paths:
        *dir_nodes, last_node = node_path
        rel_fpath = ub.Path(*[f'dir_{n}' for n in dir_nodes]) / f'file_{last_node}.data'
        fpath = root_dpath / rel_fpath
        fpath.parent.ensuredir()
        fpath.write_text(str(node_path))

    for target in (root_dpath, root_fpath, manifest_fpath, assets_dpath):
        dvc.add(target)
    # xdev.tree_repr(dvc_root)
class SimpleDVC(ub.NiceRepr):
    """
    A Simple DVC API

    Wraps the ``dvc`` entry point so operations can be given paths from
    anywhere on disk (absolute, or relative to the current directory). Each
    command is executed from the repo root using repo-relative paths.

    Args:
        dvc_root (Path | str | None): path to DVC repo directory. If None, it
            is discovered from the first path passed to a path operation.
        remote (str | None): dvc remote to sync to by default

    Ignore:
        >>> # xdoctest: +REQUIRES(--dvc-test)
        >>> import sys, ubelt
        >>> from geowatch.utils.simple_dvc import * # NOQA
        >>> dvc_dpath = SimpleDVC.demo_dpath(reset=0)
        >>> self = SimpleDVC(dvc_dpath)
        >>> a_file_fpath = dvc_dpath / 'a_file.txt'
        >>> if not a_file_fpath.exists():
        >>>     a_file_fpath.write_text('hello')
        >>> self.add(a_file_fpath)
    """

    def __init__(self, dvc_root=None, remote=None):
        # Normalize to ub.Path so methods that call ``.resolve()`` on the
        # root also work when a plain string was given (e.g. from the CLI).
        self.dvc_root = None if dvc_root is None else ub.Path(dvc_root)
        self.remote = remote

    def __nice__(self):
        return f'dvc_root={self.dvc_root}'

    @classmethod
    def init(cls, dpath, no_scm=False, force=False, verbose=0):
        """
        Initialize a DVC repo in a path.

        Args:
            dpath (str | PathLike): directory to initialize (created if needed)
            no_scm (bool): pass ``--no-scm`` to ``dvc init``
            force (bool): pass ``--force`` to ``dvc init``
            verbose (int): if truthy, pass ``--verbose`` to ``dvc init``

        Returns:
            SimpleDVC: an instance rooted at ``dpath``

        Raises:
            Exception: (via ``ub.cmd(..., check=True)``) if ``dvc init`` fails
        """
        # Coerce before calling ensuredir so plain strings are accepted.
        dpath = ub.Path(dpath).ensuredir()
        args = ['dvc', 'init']
        if verbose:
            args += ['--verbose']
        if force:
            args += ['--force']
        if no_scm:
            args += ['--no-scm']
        ub.cmd(args, cwd=dpath, verbose=3, check=True)
        return cls(dpath)

    @ub.memoize_property
    def cache_dir(self):
        """
        The cache directory reported by ``dvc cache dir`` (computed once).
        """
        info = ub.cmd('dvc cache dir', cwd=self.dvc_root, check=True)
        return ub.Path(info['out'].strip())

    @classmethod
    def demo_dpath(cls, reset=False):
        """
        Create (if needed) and return a throwaway git + DVC repo for demos.

        Args:
            reset (bool): if True, delete any existing demo repo first

        Returns:
            ub.Path: path to the demo repo
        """
        dvc_dpath = ub.Path.appdir('simple_dvc/test/test_dvc_repo')
        if reset:
            dvc_dpath.delete()
        if not dvc_dpath.exists():
            dvc_dpath.ensuredir()
            verbose = 2
            # Init empty git repo
            ub.cmd('git init --quiet', cwd=dvc_dpath, verbose=verbose)
            # Permit pushes into this (non-bare) repo's checked-out branch
            ub.cmd('git config --local receive.denyCurrentBranch "warn"', cwd=dvc_dpath, verbose=verbose)
            # Init empty dvc repo
            ub.cmd('dvc init --quiet', cwd=dvc_dpath, verbose=verbose)
            ub.cmd('dvc config core.autostage true', cwd=dvc_dpath, verbose=verbose)
            ub.cmd('dvc config cache.type "symlink,hardlink,copy"', cwd=dvc_dpath, verbose=verbose)
            ub.cmd('dvc config cache.shared group', cwd=dvc_dpath, verbose=verbose)
            ub.cmd('dvc config cache.protected true', cwd=dvc_dpath, verbose=verbose)
        return dvc_dpath

    @classmethod
    def coerce(cls, dvc_path, **kw):
        """
        Given a path inside DVC, finds the root and builds an instance for it.

        Args:
            dvc_path (str | PathLike): any path inside a DVC repo
            **kw: forwarded to the constructor (e.g. ``remote``)

        Returns:
            SimpleDVC: instance whose ``dvc_root`` may be None if no repo
                root was found.
        """
        dvc_root = cls.find_root(dvc_path)
        return cls(dvc_root, **kw)

    @classmethod
    def find_root(cls, path=None):
        """
        Given a path, search it and its ancestors for a ``.dvc`` directory.

        Args:
            path (str | PathLike): a path inside (or at the root of) a repo

        Returns:
            Path | None: the repo root, or None if not found

        Raises:
            Exception: if ``path`` is None
        """
        if path is None:
            raise Exception('no way to find dvc root')
        path = ub.Path(path).resolve()
        curr = path
        # One extra iteration so the filesystem root itself is also checked
        for _ in range(len(path.parts) + 1):
            if (curr / '.dvc').exists():
                return curr
            curr = curr.parent
        return None

    def _ensure_root(self, paths):
        # Lazily discover (and remember) the repo root from the first path
        if self.dvc_root is None:
            self.dvc_root = self.find_root(paths[0])
            print('found new self.dvc_root = {!r}'.format(self.dvc_root))
        return self.dvc_root

    def _ensure_remote(self, remote):
        # An explicit remote wins over the instance default
        if remote is None:
            remote = self.remote
        return remote

    def _resolve_root_and_relative_paths(self, paths):
        """
        Returns:
            Tuple[Path, List[str]]: resolved repo root and the input paths
                expressed relative to it.

        Raises:
            Exception: if no DVC root can be determined
        """
        dvc_root = self._ensure_root(paths)
        if dvc_root is None:
            raise Exception('unable to find a DVC root')
        dvc_root = ub.Path(dvc_root).resolve()
        # Only resolve the *parent*: fully resolving a path that is itself a
        # symlink (https://dvc.org/doc/user-guide/troubleshooting#add-symlink)
        # could jump into the DVC cache instead of the in-repo location.
        parent_resolved = [p.parent.resolve() / p.name for p in paths]
        rel_paths = [os.fspath(p.relative_to(dvc_root)) for p in parent_resolved]
        return dvc_root, rel_paths

    def add(self, path, verbose=0):
        """
        Add one or more paths to DVC (``dvc add``).

        Args:
            path (str | PathLike | Iterable[str | PathLike]):
                a single or multiple paths to add
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Raises:
            Exception: if ``dvc add`` reports a non-zero exit code
        """
        result = self._dvc_path_op('add', path, verbose)
        if result is None:
            # Empty input: nothing was added, nothing to check.
            return
        dvc_root, _ = result
        # ``dvc config <option>`` may exit non-zero when the option is unset,
        # so do not use check=True: treat a failed query as "not enabled".
        info = ub.cmd('dvc config core.autostage', cwd=dvc_root)
        has_autostage = (info['ret'] == 0 and
                         info['out'].strip().lower() == 'true')
        if not has_autostage:
            print('warning: Need autostage to complete the git commit')

    def pathsremove(self, path, verbose=0):
        """
        Run ``dvc remove`` on one or more paths.

        Args:
            path (str | PathLike | Iterable[str | PathLike]):
                a single or multiple paths to remove
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Raises:
            Exception: if ``dvc remove`` reports a non-zero exit code
        """
        self._dvc_path_op('remove', path, verbose)

    def _dvc_path_op(self, op, path, verbose=0):
        """
        Run ``dvc <op> <repo-relative paths...>`` from the repo root.

        Args:
            op (str): dvc subcommand, e.g. ``'add'`` or ``'remove'``
            path (str | PathLike | Iterable[str | PathLike]):
                a single or multiple paths to operate on
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Returns:
            Tuple[Path, List[str]] | None: repo root and relative paths, or
                None if ``path`` was empty.

        Raises:
            Exception: if the dvc command reports a non-zero exit code
        """
        dvc_main = _import_dvc_main()
        paths = list(map(ub.Path, _ensure_iterable(path)))
        if len(paths) == 0:
            print(f'No paths to {op}')
            return None
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(paths)
        dvc_command = [op] + rel_paths + self._verbose_extra_args(verbose)
        with util_path.ChDir(dvc_root):
            ret = dvc_main(dvc_command)
        if ret != 0:
            raise Exception(f'Failed to {op} files')
        return dvc_root, rel_paths

    def check_ignore(self, path, details=0, verbose=0):
        """
        Check paths against the repo's ignore rules (``dvc check-ignore``).

        Args:
            path (str | PathLike | Iterable[str | PathLike]):
                a single or multiple paths to check
            details (bool): pass ``--details`` to show the matching pattern
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Returns:
            bool | None: True if at least one path is ignored, False if none
                are, None if ``path`` was empty.

        Raises:
            Exception: if dvc exits with a code other than 0 or 1
        """
        dvc_main = _import_dvc_main()
        paths = list(map(ub.Path, _ensure_iterable(path)))
        if len(paths) == 0:
            print('No paths to check')
            return None
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(paths)
        dvc_command = ['check-ignore'] + rel_paths
        if details:
            dvc_command += ['--details']
        dvc_command += self._verbose_extra_args(verbose)
        with util_path.ChDir(dvc_root):
            ret = dvc_main(dvc_command)
        # Like ``git check-ignore``: 0 means something is ignored and 1 means
        # nothing is ignored. Both are answers, not failures.
        if ret == 0:
            return True
        if ret == 1:
            return False
        raise Exception('Failed check-ignore')

    def git_pull(self):
        """Run ``git pull`` in the repo root (raises on failure)."""
        ub.cmd('git pull', verbose=3, check=True, cwd=self.dvc_root)

    def git_push(self):
        """Run ``git push`` in the repo root (raises on failure)."""
        ub.cmd('git push', verbose=3, check=True, cwd=self.dvc_root)

    def git_commit(self, message):
        """
        Run ``git commit -m <message>`` in the repo root (raises on failure).

        Args:
            message (str): the commit message
        """
        # Pass an argv list so quotes or spaces in the message are preserved
        # verbatim instead of being re-parsed from a command string.
        ub.cmd(['git', 'commit', '-m', message], verbose=3, check=True,
               cwd=self.dvc_root)

    def git_commitpush(self, message='', pull_on_fail=True):
        """
        Commit staged changes and push them.

        If the commit fails only because there is nothing to commit, no push
        is attempted and no error is raised.

        Args:
            message (str): the commit message
            pull_on_fail (bool): if the first push fails, pull and retry once

        TODO: better name here?
        """
        try:
            self.git_commit(message)
        except Exception as ex:
            # The command output may be missing (None) or absent entirely on
            # non-subprocess errors; fall back to '' so the check is safe.
            output = getattr(ex, 'output', None) or ''
            # git phrases "nothing to commit" differently depending on whether
            # untracked files are present; both are benign here.
            if ('nothing added to commit' not in output and
                    'nothing to commit' not in output):
                raise
        else:
            try:
                self.git_push()
            except Exception:
                if not pull_on_fail:
                    raise
                print('Initial push failed, will pull and then try once more')
                self.git_pull()
                self.git_push()

    def _verbose_extra_args(self, verbose):
        # Map an integer verbosity to a single "-v"/"-vv"/"-vvv" flag
        extra_args = []
        if verbose:
            verbose = max(min(3, verbose), 1)
            extra_args += ['-' + 'v' * verbose]
        return extra_args

    def _remote_extra_args(self, remote, recursive, jobs, verbose):
        # Shared flags for push / pull
        extra_args = self._verbose_extra_args(verbose)
        if remote:
            extra_args += ['-r', remote]
        if jobs is not None:
            extra_args += ['--jobs', str(jobs)]
        if recursive:
            extra_args += ['--recursive']
        return extra_args

    def push(self, path, remote=None, recursive=False, jobs=None, verbose=0):
        """
        Push the content tracked by .dvc files to remote storage.

        Args:
            path (Path | List[Path]):
                one or more file paths that should have an associated .dvc
                sidecar file or if recursive is true, a directory containing
                multiple tracked files.
            remote (str):
                the name of the remote registered in the .dvc/config to push to
            recursive (bool):
                if True, then items in ``path`` can be a directory.
            jobs (int): number of parallel workers
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Returns:
            int | None: the exit code returned by dvc (0 on success), or None
                if ``path`` was empty.
        """
        dvc_main = _import_dvc_main()
        paths = list(map(ub.Path, _ensure_iterable(path)))
        if len(paths) == 0:
            print('No paths to push')
            return None
        remote = self._ensure_remote(remote)
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(paths)
        extra_args = self._remote_extra_args(remote, recursive, jobs, verbose)
        dvc_command = ['push'] + extra_args + rel_paths
        with util_path.ChDir(dvc_root):
            ret = dvc_main(dvc_command)
        return ret

    def pull(self, path, remote=None, recursive=False, jobs=None, verbose=0):
        """
        Pull the content tracked by .dvc files from remote storage.

        Args:
            path (Path | List[Path]):
                one or more tracked paths / .dvc sidecar files, or (when
                ``recursive`` is True) directories containing them.
            remote (str): name of the remote to pull from
            recursive (bool): if True, items in ``path`` can be a directory
            jobs (int): number of parallel workers
            verbose (int): number of ``-v`` flags passed to dvc (max 3)

        Returns:
            int | None: the exit code returned by dvc (0 on success), or None
                if ``path`` was empty.
        """
        dvc_main = _import_dvc_main()
        paths = list(map(ub.Path, _ensure_iterable(path)))
        if len(paths) == 0:
            print('No paths to pull')
            return None
        remote = self._ensure_remote(remote)
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(paths)
        extra_args = self._remote_extra_args(remote, recursive, jobs, verbose)
        dvc_command = ['pull'] + extra_args + rel_paths
        with util_path.ChDir(dvc_root):
            ret = dvc_main(dvc_command)
        return ret

    def request(self, path, remote=None):
        """
        Requests to ensure that a specific file from DVC exists.

        Any files that do not exist are mapped to their .dvc sidecar file
        (their own sidecar, or that of the nearest tracked ancestor
        directory inside the repo), and those sidecars are pulled.

        Args:
            path (Path | List[Path]):
                one or more file paths that should have an associated .dvc
                sidecar file.
            remote (str | None): remote to pull from

        Raises:
            IOError: if a missing path has no sidecar inside the repo
        """
        paths = list(map(ub.Path, _ensure_iterable(path)))
        missing_data = [p for p in paths if not p.exists()]
        if not missing_data:
            return
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(missing_data)

        def _find_sidecar(rel_path):
            # Return the sidecar tracking ``rel_path`` or raise IOError
            rel_path = ub.Path(rel_path)
            # A file tracked on its own has a sibling "<name>.dvc"
            first_cand = dvc_root / rel_path.augment(stem=rel_path.name, ext='.dvc')
            if first_cand.exists():
                return first_cand
            # Otherwise try ancestor directories, nearest first. Stop before
            # the repo root itself: its "<root>.dvc" would lie outside the repo.
            rel_parts = rel_path.parts
            for i in reversed(range(1, len(rel_parts))):
                cand_dat = dvc_root.joinpath(*rel_parts[0:i])
                cand_dvc = cand_dat.augment(stem=cand_dat.name, ext='.dvc')
                if cand_dvc.exists():
                    return cand_dvc
            raise IOError(f'Could not find sidecar for: {rel_path=} in {dvc_root=}. Wrong path, or need to git pull?')

        # Several missing files may share one directory sidecar; pull it once
        # (dict.fromkeys keeps first-seen order while dropping duplicates).
        to_pull = list(dict.fromkeys(_find_sidecar(r) for r in rel_paths))
        if to_pull:
            self.pull(to_pull, remote=remote)

    def unprotect(self, path):
        """
        Run ``dvc unprotect`` on one or more paths.

        Args:
            path (str | PathLike | Iterable[str | PathLike]): paths to unprotect

        Returns:
            int | None: the exit code returned by dvc (0 on success), or None
                if ``path`` was empty.
        """
        dvc_main = _import_dvc_main()
        paths = list(map(ub.Path, _ensure_iterable(path)))
        if len(paths) == 0:
            print('No paths to unprotect')
            return None
        dvc_root, rel_paths = self._resolve_root_and_relative_paths(paths)
        with util_path.ChDir(dvc_root):
            ret = dvc_main(['unprotect'] + rel_paths)
        return ret

    def is_tracked(self, path):
        """
        Check if ``path`` is tracked via its own ``.dvc`` sidecar.

        Returns:
            bool: True if ``<path>.dvc`` exists, False if neither it nor any
                ancestor directory has a sidecar.

        Raises:
            NotImplementedError: if ``path`` is only covered by an ancestor
                directory's sidecar (not handled yet).
        """
        path = ub.Path(path)
        if self.find_file_tracker(path) is not None:
            return True
        if self.find_dir_tracker(path) is not None:
            raise NotImplementedError
        return False

    @classmethod
    def find_file_tracker(cls, path):
        """
        Returns:
            ub.Path | None: the sibling ``<path>.dvc`` if it exists
        """
        path = ub.Path(path)
        assert not path.name.endswith('.dvc')
        tracker_fpath = path.augment(tail='.dvc')
        if tracker_fpath.exists():
            return tracker_fpath
        return None

    @classmethod
    def find_dir_tracker(cls, path):
        """
        Find the sidecar of the nearest ancestor directory of ``path``.

        Walks upward until reaching a directory that contains ``.dvc`` (the
        repo root) or the filesystem root.

        Returns:
            ub.Path | None: the ancestor's ``<dir>.dvc`` if one exists
        """
        path = ub.Path(path).absolute()
        prev = path
        dpath = path.parent
        while (not (dpath / '.dvc').exists()) and prev != dpath:
            tracker_fpath = dpath.augment(tail='.dvc')
            if tracker_fpath.exists():
                return tracker_fpath
            prev = dpath
            dpath = dpath.parent
        # Also check the directory the walk stopped at
        tracker_fpath = dpath.augment(tail='.dvc')
        if tracker_fpath.exists():
            return tracker_fpath
        return None

    def read_dvc_sidecar(self, sidecar_fpath):
        """
        Parse a ``.dvc`` sidecar file as YAML and return its contents.
        """
        sidecar_fpath = ub.Path(sidecar_fpath)
        data = Yaml.loads(sidecar_fpath.read_text())
        return data

    def resolve_cache_paths(self, sidecar_fpath):
        """
        Given a .dvc file, enumerate the paths in the cache associated with it.

        Both cache layouts are supported: ``<cache>/files/md5/xx/yyyy``
        (used when ``<cache>/files/md5`` exists) and ``<cache>/xx/yyyy``.
        Each hash is looked up in the preferred layout first, then the other.
        If the hash is in neither, the preferred-layout path is yielded.

        For directory outputs (hash ending in ``.dir``) whose listing is in
        the cache, the member file paths are yielded before the listing.

        Args:
            sidecar_fpath (PathLike | str): path to the .dvc file

        Yields:
            ub.Path: cache paths (which may not exist)
        """
        sidecar_fpath = ub.Path(sidecar_fpath)
        data = Yaml.loads(sidecar_fpath.read_text())
        cache_dir = self.cache_dir
        prefer_new_layout = (cache_dir / 'files' / 'md5').exists()

        def _cache_fpath(md5):
            # Locate ``md5`` in the preferred layout, falling back to the other
            new_style = cache_dir / 'files' / 'md5' / md5[0:2] / md5[2:]
            old_style = cache_dir / md5[0:2] / md5[2:]
            if prefer_new_layout:
                primary, secondary = new_style, old_style
            else:
                primary, secondary = old_style, new_style
            if not primary.exists() and secondary.exists():
                return secondary
            return primary

        for out in data['outs']:
            md5 = out['md5']
            cache_fpath = _cache_fpath(md5)
            if md5.endswith('.dir') and cache_fpath.exists():
                dir_entries = Yaml.loads(cache_fpath.read_text())
                for entry in dir_entries:
                    file_md5 = entry['md5']
                    assert not file_md5.endswith('.dir'), 'unhandled'
                    yield _cache_fpath(file_md5)
            yield cache_fpath

    def find_sidecar_paths(self, dpath):
        """
        Args:
            dpath (Path | str): directory in dvc repo to search

        Yields:
            ub.Path: existing dvc sidecar files
        """
        # TODO: handle .dvcignore
        dpath = ub.Path(dpath)
        for r, ds, fs in dpath.walk():
            for f in fs:
                if f.endswith('.dvc'):
                    yield r / f

    def resolve_sidecar(self, path):
        """
        Given a path in a DVC repo, resolve it to the sidecar file that it
        corresponds to.

        If the input is a .dvc file it is returned as-is. Otherwise a sibling
        ``<path>.dvc`` is used if it exists, and finally the sidecar of the
        nearest tracked ancestor directory is searched for.

        Args:
            path (Path | str): directory or file in dvc repo to search

        Returns:
            ub.Path | None: the sidecar file, or None if none was found
        """
        # TODO: handle .dvcignore
        path = ub.Path(path).absolute()
        if path.name.endswith('.dvc'):
            return path
        sibling = path.augment(tail='.dvc')
        if sibling.exists():
            return sibling
        return self.find_dir_tracker(path)
def _ensure_iterable(inputs):
    """
    Normalize "one item or many" to something iterable.

    Iterables are returned unchanged. Anything else, including a single
    string (``ub.iterable`` treats ``str`` as non-iterable by default), is
    wrapped in a one-element list.
    """
    if not ub.iterable(inputs):
        return [inputs]
    return inputs
####
# SimpleDVC CLI Stuff (should move to a new file)
import scriptconfig as scfg # NOQA
class SimpleDVC_CLI(scfg.ModalCLI):
    """
    A DVC CLI That uses our simplified (and more permissive) interface.

    The main advantage is that you can run these commands outside a DVC repo as
    long as you point to a valid in-repo path.
    """

    class Add(scfg.DataConfig):
        """
        Add data to the DVC repo.
        """
        __command__ = 'add'
        paths = scfg.Value([], nargs='+', position=1, help='Input files / directories to add')

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            # The repo root is discovered from the first given path
            dvc = SimpleDVC()
            dvc.add(config.paths)

    class Request(scfg.DataConfig):
        """
        Pull data if the requested file doesn't exist.
        """
        __command__ = 'request'
        paths = scfg.Value([], nargs='+', position=1, help='Data to attempt to pull')
        remote = scfg.Value(None, short_alias=['r'], help='remote to pull from if needed')

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            dvc = SimpleDVC()
            # Forward --remote; when None, no "-r" flag is passed to dvc pull
            dvc.request(config.paths, remote=config.remote)

    class CacheDir(scfg.DataConfig):
        """
        Print the cache directory
        """
        __command__ = 'cache_dir'
        dvc_root = scfg.Value('.', position=1, help='get the cache path for this DVC repo')

        @classmethod
        def main(cls, cmdline=1, **kwargs):
            config = cls.cli(cmdline=cmdline, data=kwargs, strict=True)
            dvc = SimpleDVC(dvc_root=config.dvc_root)
            print(dvc.cache_dir)
def _import_dvc_main():
try:
from dvc import main as dvc_main_mod
dvc_main = dvc_main_mod.main
except (ImportError, ModuleNotFoundError):
from dvc.cli import main as dvc_main
return dvc_main
if __name__ == '__main__':
    # CommandLine:
    #     python -m geowatch.utils.simple_dvc --help
    #     python -m geowatch.utils.simple_dvc request --help
    #     python -m geowatch.utils.simple_dvc cache_dir
    SimpleDVC_CLI.main()