r"""
Main prediction script for cold
SeeAlso:
../../cli/queue_cli/prepare_teamfeats.py
predict.py *
prepare_kwcoco.py
tile_processing_kwcoco.py
export_cold_result_kwcoco.py
assemble_cold_result_kwcoco.py
CommandLine:
###################################################################################
### FULL REGION TEST: COLD FEATURES WITH HIGH TEMPORAL RESOLUTION (HTR) + L8/S2 ###
###################################################################################
DATA_DVC_DPATH=$(geowatch_dvc --tags=phase3_data --hardware="auto")
EXPT_DVC_DPATH=$(geowatch_dvc --tags=phase3_expt --hardware="auto")
python -m geowatch.tasks.cold.predict \
--coco_fpath="$DATA_DVC_DPATH/Aligned-Drop8-ARA/KR_R001/imgonly-KR_R001.kwcoco.zip" \
--out_dpath="$DATA_DVC_DPATH/Aligned-Drop8-ARA/_pycold_Drop8" \
--mod_coco_fpath="$DATA_DVC_DPATH/Aligned-Drop8-ARA/KR_R001/imgonly_KR_R001_cold-biyearly.kwcoco.zip" \
--sensors='L8,S2' \
--coefs=cv,rmse,a0,a1,b1,c1 \
--prob=0.99 \
--conse=8 \
--coefs_bands=0,1,2,3,4,5 \
--combine=False \
--resolution='10GSD' \
--cold_time_span='6months' \
--workermode='process' \
--workers=8
######################################################################
### FULL REGION TEST: TRANSFER COLD FEATURE FROM RAW TO COMBINED INPUT
######################################################################
DATA_DVC_DPATH=$(geowatch_dvc --tags=phase3_data --hardware="auto")
EXPT_DVC_DPATH=$(geowatch_dvc --tags=phase3_expt --hardware="auto")
python -m geowatch.tasks.cold.transfer_features \
--coco_fpath="$DATA_DVC_DPATH/Aligned-Drop8-ARA/KR_R001/imgonly_KR_R001_cold-biyearly.kwcoco.zip" \
--combine_fpath="$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imgonly-KR_R001.kwcoco.zip" \
--new_coco_fpath="$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imganns-KR_R001-cold-biyearly.kwcoco.zip"
kwcoco stats "$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imganns-KR_R001-cold-biyearly.kwcoco.zip"
geowatch stats "$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imganns-KR_R001-cold-biyearly.kwcoco.zip"
kwcoco validate "$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imganns-KR_R001-cold-biyearly.kwcoco.zip"
DATA_DVC_DPATH=$(geowatch_dvc --tags=phase3_data --hardware="auto")
geowatch visualize \
"$DATA_DVC_DPATH/Drop8-Median10GSD-V1/imganns-KR_R001-cold-biyearly.kwcoco.zip" \
--channels="L8:(red|green|blue,red_COLD_a1|green_COLD_a1|blue_COLD_a1,red_COLD_cv|green_COLD_cv|blue_COLD_cv,red_COLD_rmse|green_COLD_rmse|blue_COLD_rmse)" \
--exclude_sensors=WV,PD,S2 \
--smart=True
########################
### MULTIPLE REGION TEST
########################
DVC_DATA_DPATH=$(geowatch_dvc --tags='phase2_data' --hardware=auto)
"$BUNDLE_DPATH"/imganns-*BR_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*KR_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*NZ_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*US_[RC]*.kwcoco.zip \
echo "$DVC_DATA_DPATH"
BUNDLE_DPATH=$DVC_DATA_DPATH/Drop6
python -m geowatch.cli.queue_cli.prepare_teamfeats \
--base_fpath \
"$BUNDLE_DPATH"/imganns-*AE_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*BH_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*CH_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*LT_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*NZ_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*PE_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*QA_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*SA_[RC]*.kwcoco.zip \
"$BUNDLE_DPATH"/imganns-*US_C*.kwcoco.zip \
--with_cold=1 \
--with_landcover=0 \
--with_materials=0 \
--with_invariants=0 \
--with_depth=0 \
--skip_existing=1 \
--cold_workers=8 \
--cold_workermode=thread \
--tmux_workers=2 \
--backend=tmux --run=0
"""
import scriptconfig as scfg
import ubelt as ub
# import json
import logging
import os
# import shutil
logger = logging.getLogger(__name__)
try:
from line_profiler import profile
except ImportError:
profile = ub.identity
[docs]
class ColdPredictConfig(scfg.DataConfig):
"""
The docstring will be the description in the CLI help
"""
coco_fpath = scfg.Value(None, position=1, help=ub.paragraph(
'''
a path to a file to input kwcoco file (to predict on)
'''))
mod_coco_fpath = scfg.Value(None, help=ub.paragraph(
'''
The modified output kwcoco file, which is a copy of the input
kwcoco file enriched with COLD features.
'''
), alias=['output_kwcoco'])
combined_coco_fpath = scfg.Value(None, help=ub.paragraph(
'''
a path to a file to combined input kwcoco file (to merge with)
'''))
out_dpath = scfg.Value(None, help=ub.paragraph(
'''
output directory for the output. If unspecified uses the input kwcoco bundle
'''))
write_kwcoco = scfg.Value(True, help='writing kwcoco file based on COLD feature, Default is True')
sensors = scfg.Value('L8', type=str, help='sensor type, default is "L8"')
adj_cloud = scfg.Value(False, help='How to treat QA band, default is False: ignoring adj. cloud class')
method = scfg.Value('COLD', choices=['COLD', 'HybridCOLD', 'OBCOLD'], help='type of cold algorithms')
prob = scfg.Value(0.99, help='change probability of chi-distribution, e.g., 0.99')
conse = scfg.Value(6, help='consecutive observation to confirm change, e.g., 6')
cm_interval = scfg.Value(60, help='CM output inverval, e.g., 60')
year_lowbound = scfg.Value(None, help='min year for saving geotiff, e.g., 2017')
year_highbound = scfg.Value(None, help='max year for saving geotiff, e.g., 2022')
coefs = scfg.Value(None, type=str, help="list of COLD coefficients for saving geotiff, e.g., a0,c1,a1,b1,a2,b2,a3,b3,cv,rmse")
coefs_bands = scfg.Value(None, type=str, help='indicate the ba_nds for output coefs_bands, e.g., 0,1,2,3,4,5')
timestamp = scfg.Value(False, help='True: exporting cold result by timestamp, False: exporting cold result by year, Default is False')
combine = scfg.Value(False, help='for temporal combined mode, Default is False')
cold_time_span = scfg.Value('1year', type=str, help='temporal period for extracting cold features, default is "1year", another option is "6months"')
track_emissions = scfg.Value(True, help='if True use codecarbon for emission tracking')
resolution = scfg.Value('30GSD', help='if specified then data is processed at this resolution')
exclude_first = scfg.Value(True, help='exclude first date of image from each sensor, Default is True')
workers = scfg.Value(16, help='total number of workers')
workermode = scfg.Value('process', help='Can be process, serial, or thread')
# region_id = scfg.Value(None, help='region id for the input kwcoco file')
[docs]
@profile
def cold_predict_main(cmdline=1, **kwargs):
"""
Args:
cmdline (int, optional): _description_. Defaults to 1.
Ignore:
python -m geowatch.tasks.cold.predict --help
TEST_COLD=1 xdoctest -m geowatch.tasks.cold.predict cold_predict_main
Example:
>>> # xdoctest: +REQUIRES(env:TEST_COLD)
>>> from geowatch.tasks.cold.predict import cold_predict_main
>>> from geowatch.tasks.cold.predict import *
>>> kwargs= dict(
>>> coco_fpath = ub.Path('/gpfs/scratchfs1/zhz18039/jws18003/new-repos/smart_data_dvc2/Drop6/imgonly-KR_R001.kwcoco.json'),
>>> out_dpath = ub.Path.appdir('/gpfs/scratchfs1/zhz18039/jws18003/new-repos/smart_data_dvc2/Drop6/_pycold_combine_V2'),
>>> write_kwcoco = False,
>>> sensors = 'L8',
>>> adj_cloud = False,
>>> method = 'COLD',
>>> prob = 0.99,
>>> conse = 6,
>>> cm_interval = 60,
>>> year_lowbound = None,
>>> year_highbound = None,
>>> coefs = 'cv,rmse,a0,a1,b1,c1',
>>> coefs_bands = '0,1,2,3,4,5',
>>> timestamp = False,
>>> combine = False,
>>> resolution = '10GSD',
>>> workermode = 'process',
>>> )
>>> cmdline=0
>>> cold_predict_main(cmdline, **kwargs)
"""
from geowatch.tasks.cold import prepare_kwcoco
from geowatch.tasks.cold import tile_processing_kwcoco
from geowatch.tasks.cold import export_cold_result_kwcoco
from geowatch.tasks.cold import assemble_cold_result_kwcoco
config = ColdPredictConfig.cli(cmdline=cmdline, data=kwargs)
import rich
rich.print('config = {}'.format(ub.urepr(config, nl=1)))
from geowatch.utils import process_context
from kwutil import util_parallel
from kwutil import util_progress
from kwutil import util_json
resolved_config = config.to_dict()
resolved_config = util_json.ensure_json_serializable(resolved_config)
proc_context = process_context.ProcessContext(
name='geowatch.tasks.cold.predict',
type='process',
config=resolved_config,
track_emissions=config['track_emissions'],
)
coco_fpath = ub.Path(config['coco_fpath'])
if config['out_dpath'] is None:
config['out_dpath'] = coco_fpath.parent
out_dpath = ub.Path(config['out_dpath']).ensuredir()
write_kwcoco = config['write_kwcoco']
sensors = config['sensors']
adj_cloud = config['adj_cloud']
method = config['method']
workers = util_parallel.coerce_num_workers(config['workers'])
use_subprogress = workers == 0 or config['workermode'] != 'process'
# FIXME: not robust, is there a better way to get region_id?
region_id = config['coco_fpath'].split('/')[-2]
proc_context.start()
proc_context.add_disk_info(out_dpath)
pman = util_progress.ProgressManager(backend='rich')
with pman:
main_prog = pman.progiter(total=4, desc='Predict PyCOLD:')
# ============
# 1 / 4 Prepare Step
# ============
main_prog.set_postfix('Step 1: Prepare')
prepare_kwcoco.prepare_kwcoco_main(
cmdline=0, coco_fpath=coco_fpath, out_dpath=out_dpath, sensors=sensors,
adj_cloud=adj_cloud, method=method, workers=workers,
resolution=config.resolution)
main_prog.step()
# =========
# 2 / 4 Tile Step
# =========
main_prog.set_postfix('Step 2: Process')
logger.info('Starting COLD tile-processing...')
tile_kwargs = tile_processing_kwcoco.TileProcessingKwcocoConfig().to_dict()
tile_kwargs['stack_path'] = out_dpath / 'stacked' / region_id
tile_kwargs['reccg_path'] = out_dpath / 'reccg' / region_id
tile_log_fpath = out_dpath / 'reccg' / region_id / 'log.json'
tile_kwargs['method'] = method
tile_kwargs['prob'] = config['prob']
tile_kwargs['conse'] = config['conse']
tile_kwargs['cm_interval'] = config['cm_interval']
if use_subprogress:
tile_kwargs['pman'] = pman
if os.path.exists(tile_log_fpath):
logger.info('Skipping step 2 because COLD processing already finished...')
else:
jobs = ub.JobPool(mode=config['workermode'], max_workers=workers)
with jobs:
for i in pman.progiter(range(workers + 1), desc='submit process jobs', transient=True):
tile_kwargs['rank'] = i
tile_kwargs['n_cores'] = max(workers, 1)
jobs.submit(tile_processing_kwcoco.tile_process_main, cmdline=0, **tile_kwargs)
tile_iter = pman.progiter(jobs.as_completed(), desc='Collect process jobs', total=len(jobs))
for job in tile_iter:
job.result()
main_prog.step()
# ===========
# 3 / 4 Export Step
# ===========
main_prog.set_postfix('Step 3: Export')
logger.info('Writting tmp file of COLD output...')
export_kwargs = export_cold_result_kwcoco.ExportColdKwcocoConfig().to_dict()
export_kwargs['stack_path'] = out_dpath / 'stacked' / region_id
export_kwargs['reccg_path'] = out_dpath / 'reccg' / region_id
export_kwargs['combined_coco_fpath'] = config['combined_coco_fpath']
export_kwargs['year_lowbound'] = config['year_lowbound']
export_kwargs['year_highbound'] = config['year_highbound']
export_kwargs['coefs'] = config['coefs']
export_kwargs['combine'] = config['combine']
export_kwargs['coefs_bands'] = config['coefs_bands']
export_kwargs['timestamp'] = config['timestamp']
export_kwargs['exclude_first'] = config['exclude_first']
export_kwargs['sensors'] = sensors
export_kwargs['cold_time_span'] = config['cold_time_span']
if use_subprogress:
export_kwargs['pman'] = pman
jobs = ub.JobPool(mode=config['workermode'], max_workers=workers)
with jobs:
for i in pman.progiter(range(workers + 1), desc='submit export jobs', transient=True):
export_kwargs['rank'] = i
export_kwargs['n_cores'] = max(workers, 1)
jobs.submit(export_cold_result_kwcoco.export_cold_main, cmdline=0, **export_kwargs)
tmp_iter = pman.progiter(jobs.as_completed(), desc='Collect export jobs', total=len(jobs))
for job in tmp_iter:
job.result()
main_prog.step()
# =============
# 4 / 4 Assemble Step
# =============
main_prog.set_postfix('Step 4: Assemble')
logger.info('Writting geotiff of COLD output...')
assemble_kwargs = assemble_cold_result_kwcoco.AssembleColdKwcocoConfig().to_dict()
assemble_kwargs['stack_path'] = out_dpath / 'stacked' / region_id
assemble_kwargs['reccg_path'] = out_dpath / 'reccg' / region_id
assemble_kwargs['coco_fpath'] = coco_fpath
assemble_kwargs['combined_coco_fpath'] = config['combined_coco_fpath']
assemble_kwargs['mod_coco_fpath'] = config['mod_coco_fpath']
assemble_kwargs['write_kwcoco'] = write_kwcoco
assemble_kwargs['year_lowbound'] = config['year_lowbound']
assemble_kwargs['year_highbound'] = config['year_highbound']
assemble_kwargs['coefs'] = config['coefs']
assemble_kwargs['coefs_bands'] = config['coefs_bands']
assemble_kwargs['timestamp'] = config['timestamp']
assemble_kwargs['combine'] = config['combine']
assemble_kwargs['exclude_first'] = config['exclude_first']
assemble_kwargs['resolution'] = config.resolution
assemble_kwargs['sensors'] = sensors
assemble_kwargs['cold_time_span'] = config['cold_time_span']
if True:
assemble_kwargs['pman'] = pman
assemble_cold_result_kwcoco.assemble_main(
cmdline=0, proc_context=proc_context, **assemble_kwargs)
main_prog.step()
# To keep meta data, this script won't clean up stack_path
# remove stacked image
# main_prog.set_postfix('Cleanup')
# shutil.rmtree(tile_kwargs['stack_path'])
# main_prog.step()
# @profile
# def read_json_metadata(folder_path):
# stacked_path = folder_path / 'stacked'
# for root, dirs, files in os.walk(stacked_path):
# for file in files:
# if file.endswith(".json"):
# json_path = os.path.join(root, file)
# with open(json_path, "r") as f:
# metadata = json.load(f)
# return metadata
# return None
if __name__ == '__main__':
cold_predict_main()