#!/usr/bin/env python3
# PYTHON_ARGCOMPLETE_OK
"""
TODO:
move to queue_cli
CommandLine:
xdoctest -m geowatch.cli.queue_cli.prepare_splits __doc__
Example:
>>> from geowatch.cli.queue_cli.prepare_splits import * # NOQA
>>> dpath = ub.Path.appdir('geowatch', 'tests', 'prep_splits').ensuredir()
>>> (dpath / 'KR_R001.kwcoco.zip').touch()
>>> (dpath / 'KR_R002.kwcoco.zip').touch()
>>> (dpath / 'BR_R002.kwcoco.zip').touch()
>>> config = {
>>> 'base_fpath': dpath / '*.kwcoco.zip',
>>> 'virtualenv_cmd': 'conda activate geowatch',
>>> 'constructive_mode': True,
>>> 'run': 0,
>>> 'cache': False,
>>> 'backend': 'serial',
>>> 'splits': 'split6',
>>> 'verbose': 0,
>>> }
>>> queue = prep_splits(cmdline=False, **config)
>>> config['backend'] = 'slurm'
>>> queue = prep_splits(cmdline=False, **config)
>>> queue.print_commands()
>>> config['backend'] = 'tmux'
>>> queue = prep_splits(cmdline=False, **config)
>>> queue.print_commands()
>>> config['backend'] = 'serial'
>>> queue = prep_splits(cmdline=False, **config)
>>> queue.print_commands()
CommandLine:
DVC_DATA_DPATH=$(geowatch_dvc --tags=phase2_data --hardware="hdd")
python -m geowatch.cli.queue_cli.prepare_splits \
--src_kwcocos="$DVC_DATA_DPATH"/Drop7-MedianNoWinter10GSD-NoMask/*/imganns-*.kwcoco.zip \
--dst_dpath "$DVC_DATA_DPATH"/Drop7-MedianNoWinter10GSD-NoMask \
--suffix=rawbands \
--backend=tmux --tmux_workers=6 \
--splits=split6 \
--run=0
"""
import scriptconfig as scfg
import ubelt as ub
[docs]
class PrepareSplitsConfig(scfg.DataConfig):
"""
This generates the bash commands necessary to split a base kwcoco file into
the standard train / validation splits.
Ignore:
base_fpath = 'imganns*.kwcoco.*'
"""
src_kwcocos = scfg.Value(None, alias=['base_fpath'], position=1, help=ub.paragraph(
'''
input kwcoco files to be joined into splits
'''), nargs='+')
dst_dpath = scfg.Value(None, help=ub.paragraph(
'''
location to write the new kwcoco files. If unspecfied uses
the folder of the first input kwcoco file
'''))
suffix = scfg.Value('', help=ub.paragraph(
'''
destination suffix for the output split filenames
'''))
virtualenv_cmd = scfg.Value(None, type=str, help=ub.paragraph(
'''
Command to start the appropriate virtual environment if your
bashrc does not start it by default.
'''))
run = scfg.Value(True, help='if True execute the pipeline')
cache = scfg.Value(0, help='if True skip completed results')
backend = scfg.Value('tmux', help=ub.paragraph(
'''
can be serial, tmux, or slurm. Using tmux is recommended.
'''))
with_textual = scfg.Value('auto', help='setting for cmd-queue monitoring')
other_session_handler = scfg.Value('ask', help=ub.paragraph(
'''
for tmux backend only. How to handle conflicting sessions.
Can be ask, kill, or ignore, or auto
'''))
constructive_mode = scfg.Value(True, help='if True use the new constructive mode')
verbose = scfg.Value(1, help='')
workers = scfg.Value(2, alias=['tmux_workers'], help='')
splits = scfg.Value('*', help='restrict to only a specific split')
add_detail_suffix = scfg.Value(True, help=ub.paragraph(
'''
if True, add a suffix info to the output paths to disambiguate splits
with different inputs
'''))
imerit_vali_regions = {'CN_C000', 'KW_C001', 'SA_C001', 'CO_C001', 'VN_C002'}
# TODO: should be some sort of external file we read / define
VALI_REGIONS_SPLITS = {
'split1': {
'KR_R001',
'KR_R002',
} | imerit_vali_regions,
'split2': {
'BR_R002',
'NZ_R001',
} | imerit_vali_regions,
'split3': {
'AE_R001',
'US_R004',
} | imerit_vali_regions,
'split4': {
'BR_R001',
'LT_R001',
'US_R004',
} | imerit_vali_regions,
'split5': {
'BR_R001',
'US_R001',
'CH_R001',
} | imerit_vali_regions,
'split6': {
'KR_R002', # can we do better on KR2 by training on KR1?
} | imerit_vali_regions,
}
IGNORE_REGIONS = {
# 'CN_C001',
}
def _submit_constructive_split_jobs(base_fpath, dst_dpath, suffix, queue, config, depends=[]):
"""
new method for splits to construct them from previouly partitioned files
"""
from kwutil import util_path
from kwutil import util_pattern
split_pat = util_pattern.MultiPattern.coerce(config.splits)
import shlex
partitioned_fpaths = sorted(util_path.coerce_patterned_paths(base_fpath))
print('partitioned_fpaths = {}'.format(ub.urepr(partitioned_fpaths, nl=1)))
if dst_dpath is None:
dst_dpath = ub.Path(partitioned_fpaths[0]).parent # Hack
full_fpath = dst_dpath / 'data.kwcoco.zip'
if config.add_detail_suffix:
path_to_hash = {}
for p in ub.ProgIter(partitioned_fpaths, desc='compute hashes'):
path_to_hash[p] = ub.hash_file(p)
for split, vali_regions in VALI_REGIONS_SPLITS.items():
if not split_pat.match(split):
continue
train_split_fpath = dst_dpath / f'data_train_{split}.kwcoco.zip'
vali_split_fpath = dst_dpath / f'data_vali_{split}.kwcoco.zip'
if suffix:
train_split_fpath = dst_dpath / f'data_train_{suffix}_{split}.kwcoco.zip'
vali_split_fpath = dst_dpath / f'data_vali_{suffix}_{split}.kwcoco.zip'
else:
train_split_fpath = dst_dpath / f'data_train_{split}.kwcoco.zip'
vali_split_fpath = dst_dpath / f'data_vali_{split}.kwcoco.zip'
train_parts = []
vali_parts = []
for fpath in partitioned_fpaths:
if any(v in fpath.name for v in IGNORE_REGIONS):
...
elif any(v in fpath.name for v in vali_regions):
vali_parts.append(fpath)
else:
train_parts.append(fpath)
if config.add_detail_suffix:
train_hashid = ub.hash_data(sorted([path_to_hash[p] for p in train_parts]))[0:8]
vali_hashid = ub.hash_data(sorted([path_to_hash[p] for p in vali_parts]))[0:8]
train_detail_suffix = f'_n{len(train_parts):03d}_{train_hashid}'
vali_detail_suffix = f'_n{len(vali_parts):03d}_{vali_hashid}'
train_hashed_fpath = train_split_fpath.augment(stemsuffix=train_detail_suffix, multidot=1)
vali_hashed_fpath = vali_split_fpath.augment(stemsuffix=vali_detail_suffix, multidot=1)
else:
train_hashed_fpath = train_split_fpath
vali_hashed_fpath = vali_split_fpath
train_parts_str = ' '.join([shlex.quote(str(p)) for p in train_parts])
vali_parts_str = ' '.join([shlex.quote(str(p)) for p in vali_parts])
if len(vali_parts):
command = ub.codeblock(
fr'''
python -m kwcoco union \
--remember_parent=True \
--src {vali_parts_str} \
--dst {vali_hashed_fpath}
''')
vali_job = queue.submit(command, begin=1, depends=depends, log=False)
if config.add_detail_suffix:
# Symlink to original locations
vali_job = queue.submit(f'ln -sf {vali_hashed_fpath} {vali_split_fpath}', begin=1, depends=vali_job, log=False)
if len(train_parts):
command = ub.codeblock(
fr'''
python -m kwcoco union \
--remember_parent=True \
--src {train_parts_str} \
--dst {train_hashed_fpath}
''')
train_job = queue.submit(command, depends=depends, log=False)
if config.add_detail_suffix:
# Symlink to original locations
train_job = queue.submit(f'ln -sf {train_hashed_fpath} {train_split_fpath}', begin=1, depends=train_job, log=False)
if 0:
all_parts_str = ' '.join([shlex.quote(str(p)) for p in partitioned_fpaths])
command = ub.codeblock(
fr'''
python -m kwcoco union \
--remember_parent=True \
--src {all_parts_str} \
--dst {full_fpath}
''')
queue.submit(command, depends=depends, log=False)
def _submit_split_jobs(base_fpath, queue, depends=[]):
"""
Populates a Serial, Slurm, or Tmux Queue with jobs
"""
base_fpath = ub.Path(base_fpath)
splits = {
# 'nowv': base_fpath.augment(stemsuffix='_nowv', multidot=True),
'train_split1': base_fpath.augment(stemsuffix='_train_split1', multidot=True),
'train_split2': base_fpath.augment(stemsuffix='_train_split2', multidot=True),
'train_split3': base_fpath.augment(stemsuffix='_train_split3', multidot=True),
'train_split4': base_fpath.augment(stemsuffix='_train_split4', multidot=True),
'train_split5': base_fpath.augment(stemsuffix='_train_split5', multidot=True),
'train_split6': base_fpath.augment(stemsuffix='_train_split6', multidot=True),
# 'nowv_train': base_fpath.augment(stemsuffix='_nowv_train', multidot=True),
# 'wv_train': base_fpath.augment(stemsuffix='_wv_train', multidot=True),
# 's2_wv_train': base_fpath.augment(stemsuffix='_s2_wv_train', multidot=True),
'vali_split1': base_fpath.augment(stemsuffix='_vali_split1', multidot=True),
'vali_split2': base_fpath.augment(stemsuffix='_vali_split2', multidot=True),
'vali_split3': base_fpath.augment(stemsuffix='_vali_split3', multidot=True),
'vali_split4': base_fpath.augment(stemsuffix='_vali_split4', multidot=True),
'vali_split5': base_fpath.augment(stemsuffix='_vali_split5', multidot=True),
'vali_split6': base_fpath.augment(stemsuffix='_vali_split6', multidot=True),
# 'nowv_vali': base_fpath.augment(stemsuffix='_nowv_vali', multidot=True),
# 'wv_vali': base_fpath.augment(stemsuffix='_wv_vali', multidot=True),
# 's2_wv_vali': base_fpath.augment(stemsuffix='_s2_wv_vali', multidot=True),
}
# train_region_selector = '(' + ' or '.join(['(.name == "{}")'.format(n) for n in (ignore_regions | vali_regions)]) + ') | not'
# vali_region_selector = ' or '.join(['(.name == "{}")'.format(n) for n in (vali_regions)])
for split, vali_regions in VALI_REGIONS_SPLITS.items():
train_key = f'train_{split}'
vali_key = f'vali_{split}'
train_region_selector = '(' + ' or '.join(['(.name | startswith("{}"))'.format(n) for n in (IGNORE_REGIONS | vali_regions)]) + ') | not'
vali_region_selector = ' or '.join(['(.name | startswith("{}"))'.format(n) for n in (vali_regions)])
split_jobs = {}
# Perform train/validation splits with and without worldview
command = ub.codeblock(
fr'''
python -m kwcoco subset \
--src {base_fpath} \
--dst {splits[train_key]} \
--select_videos '{train_region_selector}'
''')
split_jobs['train'] = queue.submit(command, begin=1, depends=depends, log=False)
# Perform vali/validation splits with and without worldview
command = ub.codeblock(
fr'''
python -m kwcoco subset \
--src {base_fpath} \
--dst {splits[vali_key]} \
--select_videos '{vali_region_selector}'
''')
split_jobs['vali'] = queue.submit(command, depends=depends, log=False)
return queue
[docs]
def prep_splits(cmdline=False, **kwargs):
"""
The idea is that we should have a lightweight scheduler. I think something
fairly minimal can be implemented with tmux, but it would be nice to have a
more robust slurm extension.
TODO:
- [ ] Option to just dump the serial bash script that does everything.
"""
config = PrepareSplitsConfig.cli(data=kwargs, cmdline=cmdline, strict=True)
print('config = {}'.format(ub.urepr(dict(config), nl=1)))
if config['base_fpath'] == 'auto':
# Auto hack.
raise NotImplementedError
import geowatch
dvc_dpath = geowatch.find_dvc_dpath()
# base_fpath = dvc_dpath / 'Drop2-Aligned-TA1-2022-01/data.kwcoco.json'
base_fpath = dvc_dpath / 'Aligned-Drop3-TA1-2022-03-10/data.kwcoco.json'
else:
base_fpath = config['base_fpath']
import cmd_queue
queue = cmd_queue.Queue.create(
backend=config['backend'],
name='geowatch-splits', size=config['workers'],
)
if config['virtualenv_cmd']:
queue.add_header_command(config['virtualenv_cmd'])
if config.dst_dpath is not None:
dst_dpath = ub.Path(config.dst_dpath)
else:
dst_dpath = None
suffix = config.suffix
if not config['constructive_mode']:
raise NotImplementedError('non-constructive mode is no longer supported')
print('WARNING: non-constructive mode has not been maintained')
_submit_split_jobs(base_fpath, queue)
_submit_constructive_split_jobs(base_fpath, dst_dpath, suffix, queue, config)
if config['verbose']:
queue.rprint()
if config['run']:
queue.run(block=True, with_textual=config['with_textual'])
return queue
main = prep_splits
if __name__ == '__main__':
"""
CommandLine:
DVC_DATA_DPATH=$(geowatch_dvc --tags=phase2_data)
BASE_FPATH=$DVC_DATA_DPATH/Aligned-Drop4-2022-08-08-TA1-S2-L8-ACC/data.kwcoco.json
python -m geowatch.cli.queue_cli.prepare_splits \
--base_fpath=$BASE_FPATH \
--backend=serial --run=0
"""
main(cmdline=True)