"""
This is step 3/4 in predict.py
SeeAlso:
predict.py
prepare_kwcoco.py
tile_processing_kwcoco.py
export_cold_result_kwcoco.py *
assemble_cold_result_kwcoco.py
This script is for exporting COLD algorithm results (change vector, coefficients, RMSEs)
to geotiff raster with kwcoco dataset.
See original code: ~/code/pycold/src/python/pycold/imagetool/export_change_map.py
"""
import os
import numpy as np
import pandas as pd
import datetime as datetime_mod
import json
import scriptconfig as scfg
import logging
import itertools
import ubelt as ub
import pytz
from datetime import datetime as datetime_cls
import gc
import kwcoco
logger = logging.getLogger(__name__)
try:
from line_profiler import profile
except ImportError:
from ubelt import identity as profile
[docs]
class ExportColdKwcocoConfig(scfg.DataConfig):
"""
The docstring will be the description in the CLI help
"""
rank = scfg.Value(None, help='rank id')
n_cores = scfg.Value(None, help='total cores assigned')
stack_path = scfg.Value(None, help='folder directory of stacked data')
reccg_path = scfg.Value(
None, help='folder directory of cold processing result')
meta_fpath = scfg.Value(
None, help='file path of metadata json created by prepare_kwcoco script')
combined_coco_fpath = scfg.Value(None, help=ub.paragraph(
'''
a path to a file to combined kwcoco file
'''))
year_lowbound = scfg.Value(
None, help='min year for saving geotiff, e.g., 2017')
year_highbound = scfg.Value(
None, help='max year for saving geotiff, e.g., 2022')
coefs = scfg.Value(
None,
type=str,
help="list of COLD coefficients for saving geotiff, e.g., a0,c1,a1,b1,a2,b2,a3,b3,cv,rmse")
coefs_bands = scfg.Value(
None,
type=str,
help='indicate the ba_nds for output coefs_bands, e.g., 0,1,2,3,4,5')
timestamp = scfg.Value(
False,
help='True: exporting cold result by timestamp, False: exporting cold result by year, Default is False')
combine = scfg.Value(False, help='for temporal combined mode, Default is True')
exclude_first = scfg.Value(True, help='exclude first date of image from each sensor, Default is True')
sensors = scfg.Value('L8', type=str, help='sensor type, default is "L8"')
[docs]
@profile
def export_cold_main(cmdline=1, **kwargs):
"""
Args:
cmdline (int, optional): _description_. Defaults to 1.
Ignore:
python -m geowatch.tasks.cold.export_cold_result_kwcoco --help
TEST_COLD=1 xdoctest -m geowatch.tasks.cold.export_cold_result_kwcoco export_cold_main
Example:
>>> # xdoctest: +REQUIRES(env:TEST_COLD)
>>> from geowatch.tasks.cold.export_cold_result_kwcoco import export_cold_main
>>> from geowatch.tasks.cold.export_cold_result_kwcoco import *
>>> kwargs= dict(
>>> rank = 0,
>>> n_cores = 1,
>>> stack_path = "/gpfs/scratchfs1/zhz18039/jws18003/new-repos/smart_data_dvc2/Drop6-MeanYear10GSD-V2/_pycold_combine2/stacked/KR_R001/",
>>> reccg_path = "/gpfs/scratchfs1/zhz18039/jws18003/new-repos/smart_data_dvc2/Drop6-MeanYear10GSD-V2/_pycold_combine2/reccg/KR_R001/",
>>> combined_coco_fpath = "/gpfs/scratchfs1/zhz18039/jws18003/new-repos/smart_data_dvc2/Drop6-MeanYear10GSD-V2/imgonly-KR_R001.kwcoco.zip",
>>> coefs = 'cv,rmse,a0,a1,b1,c1',
>>> year_lowbound = None,
>>> year_highbound = None,
>>> coefs_bands = '0,1,2,3,4,5',
>>> timestamp = False,
>>> combine = True,
>>> )
>>> cmdline=0
>>> export_cold_main(cmdline, **kwargs)
"""
pman = kwargs.pop('pman', None)
config_in = ExportColdKwcocoConfig.cli(cmdline=cmdline, data=kwargs, strict=True)
rank = config_in['rank']
n_cores = config_in['n_cores']
stack_path = ub.Path(config_in['stack_path'])
reccg_path = ub.Path(config_in['reccg_path'])
year_lowbound = config_in['year_lowbound']
year_highbound = config_in['year_highbound']
coefs = config_in['coefs']
coefs_bands = config_in['coefs_bands']
combine = config_in['combine']
timestamp = config_in['timestamp']
exclude_first = config_in['exclude_first']
sensors = config_in['sensors']
if combine:
combined_coco_fpath = ub.Path(config_in['combined_coco_fpath'])
else:
combined_coco_fpath = None
# assert (combine and not timestamp) or (not combine and timestamp), "combine and timestamp must be either True and False, or False and True."
# TODO: MPI mode
# if config_in['rank'] == 'MPI':
# ## MPI mode
# raise NotImplementedError('todo')
# MPI = 'TODO'
# comm = MPI.COMM_WORLD
# rank = comm.Get_rank()
# n_cores = comm.Get_size()
# else:
# rank = config_in['rank']
# n_cores = config_in['n_cores']
# define variables
config = read_json_metadata(stack_path)
n_cols = config['padded_n_cols']
n_rows = config['padded_n_rows']
n_block_x = config['n_block_x']
n_block_y = config['n_block_y']
block_width = int(n_cols / n_block_x) # width of a block
block_height = int(n_rows / n_block_y) # height of a block
n_blocks = n_block_x * n_block_y # total number of blocks
cold_param = json.loads((reccg_path / 'log.json').read_text())
method = cold_param['algorithm']
# coef_names = ['cv', 'rmse', 'a0', 'a1', 'b1', 'a2', 'b2', 'a3', 'b3', 'c1']
# band_names = [0, 1, 2, 3, 4, 5]
if coefs is not None:
try:
coefs = list(coefs.split(","))
except Exception:
print(f'coefs={coefs}')
print(
"Illegal coefs inputs: example, --coefs='a0, c1, a1, b1, a2, b2, a3, b3, cv, rmse'")
raise
try:
coefs_bands = list(coefs_bands.split(","))
coefs_bands = [int(coefs_band) for coefs_band in coefs_bands]
except Exception:
print(f'coefs_bands={coefs_bands}')
print(
"Illegal coefs_bands inputs: example, --coefs_bands='0, 1, 2, 3, 4, 5, 6'")
raise
dt = np.dtype([('t_start', np.int32),
('t_end', np.int32),
('t_break', np.int32),
('pos', np.int32),
('num_obs', np.int32),
('category', np.short),
('change_prob', np.short),
# note that the slope coefficient was scaled up by 10000
('coefs', np.float32, (7, 8)),
('rmse', np.float32, 7),
('magnitude', np.float32, 7)])
# if coefs is not None:
# assert all(elem in coef_names for elem in coefs)
# assert all(elem in band_names for elem in coefs_bands)
out_path = reccg_path / 'cold_feature'
tmp_path = reccg_path / 'cold_feature' / 'tmp'
tz = pytz.timezone('US/Eastern')
if rank == 0:
out_path.ensuredir()
tmp_path.ensuredir()
# MPI mode
# trans = comm.bcast(trans, root=0)
# proj = comm.bcast(proj, root=0)
# cols = comm.bcast(cols, root=0)
# rows = comm.bcast(rows, root=0)
# config = comm.bcast(config, root=0)
# Get ordinal list from sample block_folder
block_folder = stack_path / 'block_x1_y1'
meta_files = [m for m in os.listdir(block_folder) if m.endswith('.json')]
# sort image files by ordinal dates
img_dates = []
img_names = []
img_dates_L8 = []
img_names_L8 = []
img_dates_S2 = []
img_names_S2 = []
# read metadata and
for meta in meta_files:
meta_config = json.loads((block_folder / meta).read_text())
ordinal_date = meta_config['ordinal_date']
img_name = meta_config['image_name'] + '.npy'
img_dates.append(ordinal_date)
img_names.append(img_name)
for meta in meta_files:
meta_config = json.loads((block_folder / meta).read_text())
if '_L8_' in meta_config['image_name']:
ordinal_date_L8 = meta_config['ordinal_date']
img_name_L8 = meta_config['image_name'] + '.npy'
img_dates_L8.append(ordinal_date_L8)
img_names_L8.append(img_name_L8)
elif '_S2_' in meta_config['image_name']:
ordinal_date_S2 = meta_config['ordinal_date']
img_name_S2 = meta_config['image_name'] + '.npy'
img_dates_S2.append(ordinal_date_S2)
img_names_S2.append(img_name_S2)
if year_lowbound is None:
year_low_ordinal = min(img_dates)
year_lowbound = pd.Timestamp.fromordinal(year_low_ordinal).year
else:
year_low_ordinal = pd.Timestamp.toordinal(
datetime_mod.datetime(int(year_lowbound), 1, 1))
if year_highbound is None:
year_high_ordinal = max(img_dates)
year_highbound = pd.Timestamp.fromordinal(year_high_ordinal).year
else:
year_high_ordinal = pd.Timestamp.toordinal(
datetime_mod.datetime(int(year_highbound + 1), 1, 1))
img_dates, img_names = zip(*filter(lambda x: x[0] >= year_low_ordinal,
zip(img_dates, img_names)))
img_dates, img_names = zip(*filter(lambda x: x[0] < year_high_ordinal,
zip(img_dates, img_names)))
img_dates = sorted(img_dates)
img_names = sorted(img_names)
if 'L8' in sensors:
img_dates_L8, img_names_L8 = zip(*filter(lambda x: x[0] >= year_low_ordinal,
zip(img_dates_L8, img_names_L8)))
img_dates_L8, img_names_L8 = zip(*filter(lambda x: x[0] < year_high_ordinal,
zip(img_dates_L8, img_names_L8)))
img_dates_L8 = sorted(img_dates_L8)
img_names_L8 = sorted(img_names_L8)
if 'S2' in sensors:
img_dates_S2, img_names_S2 = zip(*filter(lambda x: x[0] >= year_low_ordinal,
zip(img_dates_S2, img_names_S2)))
img_dates_S2, img_names_S2 = zip(*filter(lambda x: x[0] < year_high_ordinal,
zip(img_dates_S2, img_names_S2)))
img_dates_S2 = sorted(img_dates_S2)
img_names_S2 = sorted(img_names_S2)
if timestamp:
ordinal_day_list = img_dates
if not timestamp:
first_ordinal_dates_L8 = []
first_img_names_L8 = []
first_ordinal_dates_S2 = []
first_img_names_S2 = []
last_year_L8 = None
last_year_S2 = None
if 'L8' in sensors:
for ordinal_day_L8, img_name_L8 in zip(img_dates_L8, img_names_L8):
year_L8 = pd.Timestamp.fromordinal(ordinal_day_L8).year
if year_L8 != last_year_L8:
# print("L8", img_name_L8)
first_ordinal_dates_L8.append(ordinal_day_L8)
first_img_names_L8.append(img_name_L8)
last_year_L8 = year_L8
if 'S2' in sensors:
for ordinal_day_S2, img_name_S2 in zip(img_dates_S2, img_names_S2):
year_S2 = pd.Timestamp.fromordinal(ordinal_day_S2).year
if year_S2 != last_year_S2:
# print("S2", img_name_S2)
first_ordinal_dates_S2.append(ordinal_day_S2)
first_img_names_S2.append(img_name_S2)
last_year_S2 = year_S2
if exclude_first:
if 'L8' in sensors and 'S2' in sensors:
ordinal_day_list = first_ordinal_dates_L8[1:] + first_ordinal_dates_S2[1:]
elif 'L8' in sensors and 'S2' not in sensors:
ordinal_day_list = first_ordinal_dates_L8[1:]
elif 'S2' in sensors and 'L8' not in sensors:
ordinal_day_list = first_ordinal_dates_S2[1:]
else:
if 'L8' in sensors and 'S2' in sensors:
ordinal_day_list = first_ordinal_dates_L8 + first_ordinal_dates_S2
elif 'L8' in sensors and 'S2' not in sensors:
ordinal_day_list = first_ordinal_dates_L8
elif 'S2' in sensors and 'L8' not in sensors:
ordinal_day_list = first_ordinal_dates_S2
ordinal_day_list.sort()
if combine:
combined_coco_dset = kwcoco.CocoDataset(combined_coco_fpath)
# filter by sensors
all_images = combined_coco_dset.images(list(ub.flatten(combined_coco_dset.videos().images)))
flags = [s in sensors for s in all_images.lookup('sensor_coarse')]
all_images = all_images.compress(flags)
image_id_iter = iter(all_images)
# Get ordinal date of combined coco image
ordinal_dates = []
ordinal_dates_july_first = []
for image_id in image_id_iter:
combined_coco_image: kwcoco.CocoImage = combined_coco_dset.coco_image(image_id)
ts = combined_coco_image.img['timestamp']
timestamp_local = datetime_cls.fromtimestamp(ts, tz=tz)
timestamp_utc = timestamp_local.astimezone(pytz.utc)
july_first = datetime_mod.date(timestamp_utc.year, 7, 1).toordinal()
ordinal = timestamp_utc.toordinal()
ordinal_dates.append(ordinal)
ordinal_dates_july_first.append(july_first)
ordinal_day_list = ordinal_dates
ranks_percore = int(np.ceil(n_blocks / n_cores))
i_iter = range(ranks_percore)
if pman is not None:
i_iter = pman.progiter(i_iter, total=ranks_percore,
desc=f'Export COLD \\[rank {rank}]', transient=True)
for i in i_iter:
iblock = n_cores * i + rank
if iblock >= n_blocks:
break
current_block_y = int(np.floor(iblock / n_block_x)) + 1
current_block_x = iblock % n_block_x + 1
if method == 'OBCOLD':
filename = f'record_change_x{current_block_x}_y{current_block_y}_obcold.npy'
elif method == 'COLD':
filename = f'record_change_x{current_block_x}_y{current_block_y}_cold.npy'
elif method == 'HybridCOLD':
filename = f'record_change_x{current_block_x}_y{current_block_y}_hybridcold.npy'
block_folder = stack_path / \
f'block_x{current_block_x}_y{current_block_y}'
reccg_fpath = reccg_path / filename
now = datetime_cls.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print(f'processing the rec_cg file {reccg_fpath} ({now})')
# if not reccg_fpath.exists():
# print(f'the rec_cg file {reccg_fpath} is missing')
if coefs is not None:
results_block_coefs = np.full(
(block_height, block_width, len(coefs) * len(coefs_bands),
len(ordinal_day_list)), -9999, dtype=np.float32)
status_fpath = reccg_fpath.with_suffix('.status.json')
if status_fpath.exists():
with open(status_fpath, 'r') as f:
status_data = json.load(f)
if status_data.get('status') == 'failed':
print(f'the rec_cg file {reccg_fpath} has failed status')
results_block_coefs
else:
cold_block = np.array(np.load(reccg_fpath), dtype=dt)
cold_block_split = np.split(cold_block, np.argwhere(np.diff(cold_block['pos']) != 0)[:, 0] + 1)
nan_val = -9999
feature_outputs = coefs
feature_set = set(feature_outputs)
if not feature_set.issubset({'a0', 'c1', 'a1', 'b1', 'a2', 'b2', 'a3', 'b3', 'cv', 'rmse'}):
raise Exception('the outputted feature must be in [a0, c1, a1, b1, a2, b2, a3, b3, cv, rmse]')
for element in cold_block_split:
# Degugging mode
# if element[0]["pos"] == 4:
# print(element)
# the relative column number in the block
i_col = int((element[0]["pos"] - 1) % n_cols) - \
(current_block_x - 1) * block_width
i_row = int((element[0]["pos"] - 1) / n_cols) - \
(current_block_y - 1) * block_height
for band_idx, band in enumerate(coefs_bands):
feature_row = extract_features(element, band, ordinal_day_list, nan_val, timestamp,
feature_outputs, feature_set)
# Degugging mode
# if current_block_x == 1 and current_block_y == 1:
# if i_col == 3 and i_row == 0:
# print(element[0]["pos"])
# print((current_block_x, current_block_y), (i_col, i_row), (band_idx, band))
# print('feature_row', feature_row)
for index, coef in enumerate(coefs):
# Degugging mode
# if i_col == 3 and i_row == 0 and element[0]["pos"] == 4:
# print(feature_row[index])
results_block_coefs[i_row][i_col][index + band_idx * len(coefs)][:] = \
feature_row[index]
# save the temp dataset out
for day in range(len(ordinal_day_list)):
if coefs is not None:
outfile = tmp_path / \
f'tmp_coefmap_block{iblock + 1}_{ordinal_day_list[day]}.npy'
np.save(outfile, results_block_coefs[:, :, :, day])
# MPI mode (wait for all processes)
# comm.Barrier()
gc.collect()
[docs]
class NoMatchingColdCurve(Exception):
...
if __name__ == '__main__':
export_cold_main()