# Author: Su Ye
# This script is an example for generating block-based stack files from original ARD zip as intermediate inputs to the
# COLD algorithm in a HPC environment. As preparation, you need to download '_BT' and '_SR' for all Landsat
# ARD collection 1.
# This script has 4 steps: 1) warp single-path array to limit the observation inputs for each pixel
# if single_path is set True; 2) unzip all images and unpack bit-based QA bands; 3) partition each 5000*5000 temporal
# images to blocks and eliminate those image blocks if no clear, water or snow pixel were in it (so to save disk space
# and enable independent IO for individual block in later time-series analysis); 4) save each image block to
# python-native binary format (.npy) into its block folders
# For a 42-year Landsat ARD C1 tile (~3000 images), this script averagely produces ~350 G intermediate disk
# files, and takes ~12 mins to finish if 200 EPYC 7452 cores are used.
import os
import sys
import shutil
import tarfile
import logging
import time
import datetime as dt
import xml.etree.ElementTree as ET
import yaml
import click
from pytz import timezone
from datetime import datetime
from os import listdir
from os.path import isfile, join, isdir
from pathlib import Path
from importlib import resources as importlib_resources
import pandas as pd
import numpy as geek
import numpy as np
from glob import glob
from dateutil.parser import parse
from osgeo import gdal_array
from osgeo import gdal
import fiona
# define constant here
QA_CLEAR = 0
QA_WATER = 1
QA_SHADOW = 2
QA_SNOW = 3
QA_CLOUD = 4
QA_FILL = 255
QA_CIRRUS_HLS = 0
QA_WATER_HLS = 5
QA_SHADOW_HLS = 3
QA_SNOW_HLS = 4
QA_CLOUDADJACENT_HLS = 2
QA_CLOUD_HLS = 1
res = 30
s2_stack_bands = ['B02', 'B03', 'B04', 'B8A', 'B11', 'B12', 'Fmask']
l8_stack_bands = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'Fmask']
[docs]
def mask_value(vector, val):
"""
Build a boolean mask around a certain value in the vector.
Args:
vector: 1-d ndarray of values
val: values to mask on
Returns:
1-d boolean ndarray
"""
return vector == val
[docs]
def qabitval_array_HLS(packedint_array):
"""
Institute a hierarchy of qa values that may be flagged in the bitpacked
value.
fill > cloud > shadow > snow > water > clear
Args:
packedint: int value to bit check
Returns:
offset value to use
"""
unpacked = np.full(packedint_array.shape, 0)
QA_CLOUD_unpacked = geek.bitwise_and(packedint_array, 1 << 1)
QA_CLOUD_ADJ = geek.bitwise_and(packedint_array, 1 << 2)
QA_SHADOW_unpacked = geek.bitwise_and(packedint_array, 1 << 3)
QA_SNOW_unpacked = geek.bitwise_and(packedint_array, 1 << 4)
QA_WATER_unpacked = geek.bitwise_and(packedint_array, 1 << 5)
unpacked[QA_WATER_unpacked > 0] = QA_WATER
unpacked[QA_SNOW_unpacked > 0] = QA_SNOW
unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW
unpacked[QA_CLOUD_ADJ > 0] = QA_CLOUD
unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD
unpacked[packedint_array == QA_FILL] = QA_FILL
return unpacked
[docs]
def qabitval_array(packedint_array):
"""
Institute a hierarchy of qa values that may be flagged in the bitpacked
value.
fill > cloud > shadow > snow > water > clear
Args:
packedint: int value to bit check
Returns:
offset value to use
"""
unpacked = np.full(packedint_array.shape, QA_FILL)
QA_CLOUD_unpacked = geek.bitwise_and(packedint_array, 1 << (QA_CLOUD + 1))
QA_SHADOW_unpacked = geek.bitwise_and(packedint_array, 1 << (QA_SHADOW + 1))
QA_SNOW_unpacked = geek.bitwise_and(packedint_array, 1 << (QA_SNOW + 1))
QA_WATER_unpacked = geek.bitwise_and(packedint_array, 1 << (QA_WATER + 1))
QA_CLEAR_unpacked = geek.bitwise_and(packedint_array, 1 << (QA_CLEAR + 1))
unpacked[QA_CLEAR_unpacked > 0] = QA_CLEAR
unpacked[QA_WATER_unpacked > 0] = QA_WATER
unpacked[QA_SNOW_unpacked > 0] = QA_SNOW
unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW
unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD
return unpacked
[docs]
def qabitval_array_c2(packedint_array):
"""
Institute a hierarchy of qa values that may be flagged in the bitpacked
value for c2
fill > cloud > shadow > snow > water > clear
Args:
packedint: int value to bit check
Returns:
offset value to use
"""
unpacked = np.full(packedint_array.shape, QA_FILL)
QA_CLEAR_unpacked = geek.bitwise_and(packedint_array, 1 << 6)
QA_SHADOW_unpacked = geek.bitwise_and(packedint_array, 1 << 4)
QA_CLOUD_unpacked = geek.bitwise_and(packedint_array, 1 << 3)
QA_DILATED_unpacked = geek.bitwise_and(packedint_array, 1 << 1)
QA_SNOW_unpacked = geek.bitwise_and(packedint_array, 1 << 5)
QA_WATER_unpacked = geek.bitwise_and(packedint_array, 1 << 7)
unpacked[QA_SNOW_unpacked > 0] = QA_SNOW
unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW
unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD
unpacked[QA_DILATED_unpacked > 0] = QA_CLOUD
unpacked[QA_CLEAR_unpacked > 0] = QA_CLEAR
unpacked[QA_WATER_unpacked > 0] = QA_WATER
return unpacked
[docs]
def load_data(file_name, gdal_driver='GTiff'):
'''
Converts a GDAL compatable file into a numpy array and associated geodata.
The rray is provided so you can run with your processing - the geodata consists of the geotransform and gdal dataset object
if you're using an ENVI binary as input, this willr equire an associated .hdr file otherwise this will fail.
This needs modifying if you're dealing with multiple bands.
VARIABLES
file_name : file name and path of your file
RETURNS
image array
(geotransform, inDs)
'''
driver_t = gdal.GetDriverByName(gdal_driver) # http://www.gdal.org/formats_list.html
driver_t.Register()
inDs = gdal.Open(file_name, gdal.GA_ReadOnly)
# print(inDs)
if inDs is None:
print('Couldnt open this file {}'.format(file_name))
sys.exit("Try again!")
# Extract some info form the inDs
geotransform = inDs.GetGeoTransform()
# Get the data as a numpy array
band = inDs.GetRasterBand(1)
cols = inDs.RasterXSize
rows = inDs.RasterYSize
image_array = band.ReadAsArray(0, 0, cols, rows)
return image_array, (geotransform, inDs)
[docs]
def single_image_stacking_hls(source_dir, out_dir, logger, config, folder, is_partition=True, clear_threshold=0,
low_date_bound=None, upp_date_bound=None):
"""
unzip single image, convert bit-pack qa to byte value, and save as numpy
:param source_dir: the parent folder to save image 'folder'
:param out_dir: the folder to save result
:param folder: the folder name of image
:param logger: the handler of logger file
:param config
:param is_partition: True, partition each image into blocks; False, save original size of image
:param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed
:param low_date_bound: the lower date of user interested year range
:param upp_date_bound: the upper date of user interested year range
:return:
"""
try:
QA_band = gdal_array.LoadFile(join(join(source_dir, folder), "{}.Fmask.tif".format(folder)))
except ValueError as e:
# logger.error('Cannot open QA band for {}: {}'.format(folder, e))
logger.error('Cannot open QA band for {}: {}'.format(folder, e))
return False
# convertQA = np.vectorize(qabitval)
QA_band_unpacked = qabitval_array_HLS(QA_band).astype(np.short)
if clear_threshold > 0:
clear_ratio = np.sum(np.logical_or(QA_band_unpacked == QA_CLEAR,
QA_band_unpacked == QA_WATER)) \
/ np.sum(QA_band_unpacked != QA_FILL)
else:
clear_ratio = 1
if clear_ratio > clear_threshold:
[collection, sensor, tile_id, imagetime, version1, version2] = folder.rsplit('.')
year = imagetime[0:4]
doy = imagetime[4:7]
file_name = sensor + tile_id + year + doy + collection + version1
if low_date_bound is not None:
if pd.Timestamp.toordinal((dt.datetime(year, 1, 1) + dt.timedelta(doy - 1)).strftime('%Y-%m-%d')) < \
low_date_bound:
return True
if upp_date_bound is not None:
if pd.Timestamp.toordinal((dt.datetime(year, 1, 1) + dt.timedelta(doy - 1)).strftime('%Y-%m-%d')) > \
upp_date_bound:
return True
if sensor == 'L30':
try:
B1 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B02.tif".format(folder)))
B2 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B03.tif".format(folder)))
B3 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B04.tif".format(folder)))
B4 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B05.tif".format(folder)))
B5 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B06.tif".format(folder)))
B6 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B07.tif".format(folder)))
B7 = np.full(B6.shape, 0) # assign zero
except Exception as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return False
elif sensor == 'S30':
try:
B1 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B02.tif".format(folder)))
B2 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B03.tif".format(folder)))
B3 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B04.tif".format(folder)))
B4 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B8A.tif".format(folder)))
B5 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B11.tif".format(folder)))
B6 = gdal_array.LoadFile(join(join(source_dir, folder),
"{}.B12.tif".format(folder)))
B7 = np.full(B6.shape, 0)
except Exception as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return False
if (B1 is None) or (B2 is None) or (B3 is None) or (B4 is None) or (B5 is None) or (B6 is None):
logger.error('Reading Landsat band fails for {}'.format(folder))
return False
if is_partition is True:
b_width = int(config['n_cols'] / config['n_block_x']) # width of a block
b_height = int(config['n_rows'] / config['n_block_y'])
bytesize = 2 # short16 = 2 * byte
# source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-in-python-using-numpy-d1bf0dd7b6f7
B1_blocks = np.lib.stride_tricks.as_strided(B1, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B2_blocks = np.lib.stride_tricks.as_strided(B2, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B3_blocks = np.lib.stride_tricks.as_strided(B3, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B4_blocks = np.lib.stride_tricks.as_strided(B4, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B5_blocks = np.lib.stride_tricks.as_strided(B5, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B6_blocks = np.lib.stride_tricks.as_strided(B6, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B7_blocks = np.lib.stride_tricks.as_strided(B7, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
QA_blocks = np.lib.stride_tricks.as_strided(QA_band_unpacked,
shape=(config['n_block_y'],
config['n_block_x'], b_height,
b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize,
bytesize))
for i in range(config['n_block_y']):
for j in range(config['n_block_x']):
# check if no valid pixels in the chip, then eliminate
qa_unique = np.unique(QA_blocks[i][j])
# skip blocks are all cloud, shadow or filled values
# in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD',
# 'QA_SHADOW', or FILLED value (255)
if QA_CLEAR not in qa_unique and \
QA_WATER not in qa_unique and \
QA_SNOW not in qa_unique:
continue
block_folder = 'block_x{}_y{}'.format(j + 1, i + 1)
np.save(join(join(out_dir, block_folder), file_name),
np.dstack([B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j],
B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j],
QA_blocks[i][j]]).astype(np.int16))
else:
np.save(join(out_dir, file_name),
np.dstack([B1, B2, B3, B4, B5, B6, B7, QA_band_unpacked]).astype(np.int16))
# scene_list.append(folder_name)
else:
# logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
logger.warn('Not enough clear observations for {}'.format(folder))
return True
[docs]
def single_image_stacking_hls14(out_dir, logger, config, folder, is_partition=True, clear_threshold=0,
low_date_bound=None, upp_date_bound=None):
"""
unzip single image, convert bit-pack qa to byte value, and save as numpy
:param source_dir: the parent folder to save image 'folder'
:param out_dir: the folder to save result
:param folder: the folder name of image
:param logger: the handler of logger file
:param config
:param is_partition: True, partition each image into blocks; False, save original size of image
:param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed
:param low_date_bound: the lower bound of user interested date range
:param upp_date_bound: the upper bound of user interested date range
:return:
"""
try:
hdf_ds = gdal.Open(folder, gdal.GA_ReadOnly).GetSubDatasets()
qa_ds = gdal.Open(hdf_ds[-1][0])
QA_band = qa_ds.ReadAsArray()
except ValueError as e:
# logger.error('Cannot open QA band for {}: {}'.format(folder, e))
logger.error('Cannot open QA band for {}: {}'.format(folder, e))
return False
# convertQA = np.vectorize(qabitval)
QA_band_unpacked = qabitval_array_HLS(QA_band).astype(np.short)
if clear_threshold > 0:
clear_ratio = np.sum(np.logical_or(QA_band_unpacked == QA_CLEAR,
QA_band_unpacked == QA_WATER)) \
/ np.sum(QA_band_unpacked != QA_FILL)
else:
clear_ratio = 1
if clear_ratio > clear_threshold:
[collection, sensor, tile_id, imagetime, version1, version2, ext] = folder.split('/')[-1].rsplit('.')
year = imagetime[0:4]
doy = imagetime[4:7]
file_name = sensor + tile_id + year + doy + collection + version1
if low_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) < low_date_bound:
return True
if upp_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) > upp_date_bound:
return True
if sensor == 'L30':
try:
B1 = gdal.Open(hdf_ds[1][0]).ReadAsArray()
B2 = gdal.Open(hdf_ds[2][0]).ReadAsArray()
B3 = gdal.Open(hdf_ds[3][0]).ReadAsArray()
B4 = gdal.Open(hdf_ds[4][0]).ReadAsArray()
B5 = gdal.Open(hdf_ds[5][0]).ReadAsArray()
B6 = gdal.Open(hdf_ds[6][0]).ReadAsArray()
B7 = np.full(B6.shape, 0) # assign zero
except Exception as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return False
elif sensor == 'S30':
try:
B1 = gdal.Open(hdf_ds[1][0]).ReadAsArray()
B2 = gdal.Open(hdf_ds[2][0]).ReadAsArray()
B3 = gdal.Open(hdf_ds[3][0]).ReadAsArray()
B4 = gdal.Open(hdf_ds[8][0]).ReadAsArray()
B5 = gdal.Open(hdf_ds[11][0]).ReadAsArray()
B6 = gdal.Open(hdf_ds[12][0]).ReadAsArray()
B7 = np.full(B6.shape, 0)
except Exception as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return False
if (B1 is None) or (B2 is None) or (B3 is None) or (B4 is None) or (B5 is None) or (B6 is None):
logger.error('Reading Landsat band fails for {}'.format(folder))
return False
if is_partition is True:
b_width = int(config['n_cols'] / config['n_block_x']) # width of a block
b_height = int(config['n_rows'] / config['n_block_y'])
bytesize = 2 # short16 = 2 * byte
# source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-in-python-using-numpy-d1bf0dd7b6f7
B1_blocks = np.lib.stride_tricks.as_strided(B1, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B2_blocks = np.lib.stride_tricks.as_strided(B2, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B3_blocks = np.lib.stride_tricks.as_strided(B3, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B4_blocks = np.lib.stride_tricks.as_strided(B4, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B5_blocks = np.lib.stride_tricks.as_strided(B5, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B6_blocks = np.lib.stride_tricks.as_strided(B6, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B7_blocks = np.lib.stride_tricks.as_strided(B7, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
QA_blocks = np.lib.stride_tricks.as_strided(QA_band_unpacked,
shape=(config['n_block_y'],
config['n_block_x'], b_height,
b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize,
bytesize))
for i in range(config['n_block_y']):
for j in range(config['n_block_x']):
# check if no valid pixels in the chip, then eliminate
qa_unique = np.unique(QA_blocks[i][j])
# skip blocks are all cloud, shadow or filled values
# in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD',
# 'QA_SHADOW', or FILLED value (255)
if QA_CLEAR not in qa_unique and \
QA_WATER not in qa_unique and \
QA_SNOW not in qa_unique:
continue
block_folder = 'block_x{}_y{}'.format(j + 1, i + 1)
np.save(join(join(out_dir, block_folder), file_name),
np.dstack([B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j],
B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j],
QA_blocks[i][j]]).astype(np.int16))
else:
np.save(join(out_dir, file_name),
np.dstack([B1, B2, B3, B4, B5, B6, B7, QA_band_unpacked]).astype(np.int16))
# scene_list.append(folder_name)
else:
# logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
logger.warn('Not enough clear observations for {}'.format(folder))
return True
[docs]
def single_image_stacking(tmp_path, source_dir, out_dir, folder, clear_threshold, path_array, logger, config,
is_partition=True, low_date_bound=None, upp_date_bound=None):
"""
unzip single image, convert bit-pack qa to byte value, and save as numpy
:param tmp_path: tmp folder to save unzip image
:param source_dir: image folder save source zipped files
:param out_dir: the folder to save result
:param folder: the folder name of image
:param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed
:param path_array: path array has the same dimension of inputted image, and the pixel value indicates
the path which the pixel belongs to; if path_array == none, we will use all path
:param logger: the handler of logger file
:param config
:param is_partition: True, partition each image into blocks; False, save original size of image
:param low_date_bound: the lower bound of user interested year range
:param upp_date_bound: the upper bound of user interested year range
:return:
"""
# unzip SR
if os.path.exists(join(tmp_path, folder)):
shutil.rmtree(join(tmp_path, folder), ignore_errors=True)
if os.path.exists(join(tmp_path, folder.replace("SR", "BT"))):
shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True)
try:
with tarfile.open(join(source_dir, folder + '.tar')) as tar_ref:
try:
tar_ref.extractall(join(tmp_path, folder))
except Exception:
# logger.warning('Unzip fails for {}'.format(folder))
logger.error('Unzip fails for {}'.format(folder))
return
except IOError as e:
logger.error('Unzip fails for {}: {}'.format(folder, e))
# return
# unzip BT
try:
with tarfile.open(join(source_dir, folder.replace("SR", "BT") + '.tar')) as tar_ref:
try:
tar_ref.extractall(join(tmp_path, folder.replace("SR", "BT")))
except Exception:
# logger.warning('Unzip fails for {}'.format(folder.replace("SR", "BT")))
logger.error('Unzip fails for {}'.format(folder.replace("SR", "BT")))
return
except IOError as e:
logger.error('Unzip fails for {}: {}'.format(folder.replace("SR", "BT"), e))
return
if not isdir(join(tmp_path, folder.replace("SR", "BT"))):
logger.error('Fail to locate BT folder for {}'.format(folder))
return
try:
QA_band = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}_PIXELQA.tif".format(folder[0:len(folder) - 3])))
except ValueError as e:
# logger.error('Cannot open QA band for {}: {}'.format(folder, e))
logger.error('Cannot open QA band for {}: {}'.format(folder, e))
return
# convertQA = np.vectorize(qabitval)
QA_band_unpacked = qabitval_array(QA_band).astype(np.short)
if clear_threshold > 0:
clear_ratio = np.sum(np.logical_or(QA_band_unpacked == QA_CLEAR,
QA_band_unpacked == QA_WATER)) \
/ np.sum(QA_band_unpacked != QA_FILL)
else:
clear_ratio = 1
if clear_ratio > clear_threshold:
if folder[3] == '5':
sensor = 'LT5'
elif folder[3] == '7':
sensor = 'LE7'
elif folder[3] == '8':
sensor = 'LC8'
elif folder[3] == '4':
sensor = 'LT4'
elif folder[3] == '9':
sensor = 'LC9'
else:
logger.error('Sensor is not correctly formatted for the scene {}'.format(folder))
col = folder[8:11]
row = folder[11:14]
year = folder[15:19]
doy = datetime(int(year), int(folder[19:21]), int(folder[21:23])).strftime('%j')
collection = "C{}".format(folder[35:36])
version = folder[37:40]
file_name = sensor + col + row + year + doy + collection + version
if low_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) < low_date_bound:
return
if upp_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) > upp_date_bound:
return
if sensor == 'LT5' or sensor == 'LE7' or sensor == 'LT4':
try:
B1 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B1.tif".format(folder)))
B2 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B2.tif".format(folder)))
B3 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B3.tif".format(folder)))
B4 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B4.tif".format(folder)))
B5 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B5.tif".format(folder)))
B6 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B7.tif".format(folder)))
B7 = gdal_array.LoadFile(
join(join(tmp_path, "{}_BT".format(folder[0:len(folder) - 3])),
"{}_BTB6.tif".format(folder[0:len(folder) - 3])))
except ValueError as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return
elif sensor == 'LC8' or 'LC9':
try:
B1 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B2.tif".format(folder)))
B2 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B3.tif".format(folder)))
B3 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B4.tif".format(folder)))
B4 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B5.tif".format(folder)))
B5 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B6.tif".format(folder)))
B6 = gdal_array.LoadFile(join(join(tmp_path, folder),
"{}B7.tif".format(folder)))
B7 = gdal_array.LoadFile(
join(join(tmp_path, "{}_BT".format(folder[0:len(folder) - 3])),
"{}_BTB10.tif".format(folder[0:len(folder) - 3])))
except ValueError as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return
if (B1 is None) or (B2 is None) or (B3 is None) or (B4 is None) or (B5 is None) or (B6 is None) or \
(B7 is None):
logger.error('Reading Landsat band fails for {}'.format(folder))
return
# if path_array is not None, we will eliminate those observation that has different path with its assigned path
if path_array is not None: # meaning that single-path processing
if not os.path.exists(join(join(tmp_path, folder), folder.replace("_SR", ".xml"))):
logger.error('Cannot find xml file for {}'.format(join(join(tmp_path, folder), folder.replace("_SR", ".xml"))))
return
tree = ET.parse(join(join(tmp_path, folder), folder.replace("_SR", ".xml")))
# get root element
root = tree.getroot()
elements = root.findall(
'./{https://landsat.usgs.gov/ard/v1}scene_metadata/{https://landsat.usgs.gov/'
'ard/v1}global_metadata/{https://landsat.usgs.gov/ard/v1}wrs')
if len(elements) == 0:
logger.error('Parsing xml fails for {}'.format(folder))
return
pathid = int(elements[0].attrib['path'])
# assign filled value to the pixels has different path id so won't be processed
QA_band_unpacked[path_array != pathid] = QA_FILL
if is_partition is True:
b_width = int(config['n_cols'] / config['n_block_x']) # width of a block
b_height = int(config['n_rows'] / config['n_block_y'])
bytesize = 2 # short16 = 2 * byte
# source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-in-python-using-numpy-d1bf0dd7b6f7
B1_blocks = np.lib.stride_tricks.as_strided(B1, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B2_blocks = np.lib.stride_tricks.as_strided(B2, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B3_blocks = np.lib.stride_tricks.as_strided(B3, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B4_blocks = np.lib.stride_tricks.as_strided(B4, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B5_blocks = np.lib.stride_tricks.as_strided(B5, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B6_blocks = np.lib.stride_tricks.as_strided(B6, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B7_blocks = np.lib.stride_tricks.as_strided(B7, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
QA_blocks = np.lib.stride_tricks.as_strided(QA_band_unpacked,
shape=(config['n_block_y'],
config['n_block_x'], b_height,
b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize,
bytesize))
for i in range(config['n_block_y']):
for j in range(config['n_block_x']):
# check if no valid pixels in the chip, then eliminate
qa_unique = np.unique(QA_blocks[i][j])
# skip blocks are all cloud, shadow or filled values
# in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD',
# 'QA_SHADOW', or FILLED value (255)
if QA_CLEAR not in qa_unique and \
QA_WATER not in qa_unique and \
QA_SNOW not in qa_unique:
continue
block_folder = 'block_x{}_y{}'.format(j + 1, i + 1)
np.save(join(join(out_dir, block_folder), file_name),
np.dstack([B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j],
B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j],
QA_blocks[i][j]]).astype(np.int16))
else:
np.save(join(out_dir, file_name), np.dstack([B1, B2, B3, B4, B5, B6, B7, QA_band_unpacked]).astype(np.int16))
# scene_list.append(folder_name)
else:
# logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
logger.warn('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
# delete unzip folder
shutil.rmtree(join(tmp_path, folder), ignore_errors=True)
shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True)
[docs]
def single_image_stacking_collection2(tmp_path, source_dir, out_dir, folder, clear_threshold, logger, config, bounds,
is_partition=True, low_date_bound=None, upp_date_bound=None):
"""
for collection 2
:param tmp_path: tmp folder to save unzip image
:param source_dir: image folder save source zipped files
:param out_dir: the folder to save result
:param folder: the folder name of image
:param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed
:param logger: the handler of logger file
:param config
:param is_partition: True, partition each image into blocks; False, save original size of image
:param low_date_bound: the lower bound of user interested date range
:param upp_date_bound: the upper bound of user interested date range
:param bounds
:return:
"""
# unzip SR
if os.path.exists(join(tmp_path, folder)):
shutil.rmtree(join(tmp_path, folder), ignore_errors=True)
try:
with tarfile.open(join(source_dir, folder + '.tar')) as tar_ref:
try:
tar_ref.extractall(join(tmp_path, folder))
except Exception:
# logger.warning('Unzip fails for {}'.format(folder))
logger.error('Unzip fails for {}'.format(folder))
return
except IOError as e:
logger.error('Unzip fails for {}: {}'.format(folder, e))
# return
try:
QA_band = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_QA_PIXEL.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=1, srcNodata=1, outputType=gdal.GDT_UInt16).ReadAsArray()
except ValueError as e:
# logger.error('Cannot open QA band for {}: {}'.format(folder, e))
logger.error('Cannot open QA band for {}: {}'.format(folder, e))
return
# convertQA = np.vectorize(qabitval)
QA_band_unpacked = qabitval_array_c2(QA_band).astype(np.short)
if clear_threshold > 0:
clear_ratio = np.sum(np.logical_or(QA_band_unpacked == QA_CLEAR,
QA_band_unpacked == QA_WATER)) \
/ np.sum(QA_band_unpacked != QA_FILL)
else:
clear_ratio = 1
if clear_ratio > clear_threshold:
if folder[3] == '5':
sensor = 'LT5'
elif folder[3] == '7':
sensor = 'LE7'
elif folder[3] == '8':
sensor = 'LC8'
elif folder[3] == '9':
sensor = 'LC9'
elif folder[3] == '4':
sensor = 'LT4'
else:
logger.error('Sensor is not correctly formatted for the scene {}'.format(folder))
path = folder[10:13]
row = folder[13:16]
year = folder[17:21]
doy = datetime(int(year), int(folder[21:23]), int(folder[23:25])).strftime('%j')
collection = "C2"
version = folder[len(folder) - 2:len(folder)]
file_name = sensor + path + row + year + doy + collection + version
if low_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) < low_date_bound:
return True
if upp_date_bound is not None:
if pd.Timestamp.toordinal(parse((dt.datetime(int(year), 1, 1) +
dt.timedelta(int(doy) - 1)).strftime('%Y-%m-%d'))) > upp_date_bound:
return True
if sensor == 'LT5' or sensor == 'LE7' or sensor == 'LT4':
try:
# B1 = gdal_array.LoadFile(join(join(tmp_path, folder),
# "{}_SR_B1.TIF".format(folder)))
B1 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B1.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B2 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B2.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0,
outputType=gdal.GDT_UInt16).ReadAsArray()
B3 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B3.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0,
outputType=gdal.GDT_UInt16).ReadAsArray()
B4 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B4.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0,
outputType=gdal.GDT_UInt16).ReadAsArray()
B5 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B5.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B6 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B7.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B7 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_ST_B6.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
except ValueError as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return
elif sensor == 'LC8' or 'LC9':
try:
B1 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B2.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B2 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B3.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B3 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B4.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B4 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B5.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B5 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B6.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B6 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_SR_B7.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
B7 = gdal.Warp(os.path.join(tmp_path, '_tmp_img'), gdal.Open(join(join(tmp_path, folder),
"{}_ST_B10.TIF".format(folder))),
outputBounds=[bounds[0], bounds[1], bounds[2], bounds[3]], xRes=res, yRes=res,
dstNodata=0, srcNodata=0, outputType=gdal.GDT_UInt16).ReadAsArray()
except ValueError as e:
# logger.error('Cannot open spectral bands for {}: {}'.format(folder, e))
logger.error('Cannot open Landsat bands for {}: {}'.format(folder, e))
return
if (B1 is None) or (B2 is None) or (B3 is None) or (B4 is None) or (B5 is None) or (B6 is None) or \
(B7 is None):
logger.error('Reading Landsat band fails for {}'.format(folder))
return
# source: https://www.usgs.gov/faqs/how-do-i-use-scale-factor-landsat-level-2-science-products?qt-
# news_science_products=0#qt-news_science_products recommended by yongquan
B1 = (10000 * (B1 * 2.75e-05 - 0.2)).astype(np.int16)
B2 = (10000 * (B2 * 2.75e-05 - 0.2)).astype(np.int16)
B3 = (10000 * (B3 * 2.75e-05 - 0.2)).astype(np.int16)
B4 = (10000 * (B4 * 2.75e-05 - 0.2)).astype(np.int16)
B5 = (10000 * (B5 * 2.75e-05 - 0.2)).astype(np.int16)
B7 = (10000 * (B7 * 2.75e-05 - 0.2)).astype(np.int16)
B6 = (10 * (B6 * 0.00341802 + 149)).astype(np.int16)
if is_partition is True:
b_width = int(config['n_cols'] / config['n_block_x']) # width of a block
b_height = int(config['n_rows'] / config['n_block_y'])
bytesize = 2 # short16 = 2 * byte
# source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-
# in-python-using-numpy-d1bf0dd7b6f7
B1_blocks = np.lib.stride_tricks.as_strided(B1, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B2_blocks = np.lib.stride_tricks.as_strided(B2, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B3_blocks = np.lib.stride_tricks.as_strided(B3, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B4_blocks = np.lib.stride_tricks.as_strided(B4, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B5_blocks = np.lib.stride_tricks.as_strided(B5, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B6_blocks = np.lib.stride_tricks.as_strided(B6, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
B7_blocks = np.lib.stride_tricks.as_strided(B7, shape=(config['n_block_y'],
config['n_block_x'], b_height, b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize, bytesize))
QA_blocks = np.lib.stride_tricks.as_strided(QA_band_unpacked,
shape=(config['n_block_y'],
config['n_block_x'], b_height,
b_width),
strides=(config['n_cols'] * b_height * bytesize,
b_width * bytesize,
config['n_cols'] * bytesize,
bytesize))
for i in range(config['n_block_y']):
for j in range(config['n_block_x']):
# check if no valid pixels in the chip, then eliminate
qa_unique = np.unique(QA_blocks[i][j])
# skip blocks are all cloud, shadow or filled values
# in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD',
# 'QA_SHADOW', or FILLED value (255)
if QA_CLEAR not in qa_unique and \
QA_WATER not in qa_unique and \
QA_SNOW not in qa_unique:
continue
block_folder = 'block_x{}_y{}'.format(j + 1, i + 1)
np.save(join(join(out_dir, block_folder), file_name),
np.dstack([B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j],
B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j], QA_blocks[i][j]]).astype(np.int16))
else:
np.save(join(out_dir, file_name), np.dstack([B1, B2, B3, B4, B5, B6, B7, QA_band_unpacked]).astype(np.int16))
# scene_list.append(folder_name)
else:
# logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
logger.warn('Not enough clear observations for {}'.format(folder[0:len(folder) - 3]))
# delete unzip folder
shutil.rmtree(join(tmp_path, folder), ignore_errors=True)
shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True)
[docs]
def checkfinished_step1(tmp_path):
"""
:param tmp_path:
:return:
"""
if not os.path.exists(tmp_path):
return False
return True
[docs]
def checkfinished_step2(out_dir, n_cores):
"""
:param out_dir:
:param n_cores:
:return:
"""
for i in range(n_cores):
if not os.path.exists(join(out_dir, 'rank{}_finished.txt'.format(i + 1))):
return False
return True
[docs]
def checkfinished_step3_partition(out_dir):
"""
:param out_dir:
:return:
"""
if not os.path.exists(join(out_dir, "starting_last_dates.txt")):
return False
else:
return True
[docs]
def checkfinished_step3_nopartition(out_dir):
"""
:param out_dir:
:return:
"""
if not os.path.exists(join(out_dir, "scene_list.txt")):
return False
return True
[docs]
def get_extent(extent_geojson, res, buf=0):
"""
read shapefile of a tile from an S3 bucket, and convert projection to be aligned with sample image.
arg:
'extent_geojson': sharply geojson object
res: planet resolution
return:
(float, float, float, float), (int, int)) tuple
"""
# txmin = min([row[0] for row in extent_geojson['coordinates'][0]]) - res / 2.0
# txmax = max([row[0] for row in extent_geojson['coordinates'][0]]) + res / 2.0
# tymin = min([row[1] for row in extent_geojson['coordinates'][0]]) - res / 2.0
# tymax = max([row[1] for row in extent_geojson['coordinates'][0]]) + res / 2.0
txmin = extent_geojson['bbox'][0] - res * buf
txmax = extent_geojson['bbox'][2] + res * buf
tymin = extent_geojson['bbox'][1] - res * buf
tymax = extent_geojson['bbox'][3] + res * buf
n_row = np.ceil((tymax - tymin) / res)
n_col = np.ceil((txmax - txmin) / res)
txmin_new = (txmin + txmax) / 2 - n_col / 2 * res
txmax_new = (txmin + txmax) / 2 + n_col / 2 * res
tymin_new = (tymin + tymax) / 2 - n_row / 2 * res
tymax_new = (tymin + tymax) / 2 + n_row / 2 * res
return (txmin_new, txmax_new, tymin_new, tymax_new), (n_row, n_col)
[docs]
def get_feature(shp, id):
for feature in shp:
if feature['properties']['id'] == id:
return feature
[docs]
def explode(coords):
"""Explode a GeoJSON geometry's coordinates object and yield coordinate tuples.
As long as the input is conforming, the type of the geometry doesn't matter."""
for e in coords:
if isinstance(e, (float, int)):
yield coords
break
else:
for f in explode(e):
yield f
[docs]
def bbox(f):
x, y = zip(*list(explode(f['geometry']['coordinates'])))
return min(x), min(y), max(x), max(y)
@click.command()
@click.option('--source_dir', type=str, default=None, help='the folder directory of Landsat tar files '
'downloaded from USGS website')
@click.option('--out_dir', type=str, default=None, help='the folder directory for storing stacks')
@click.option('--clear_threshold', type=float, default=0, help='user-defined clear pixel proportion')
@click.option('--single_path', type=bool, default=True, help='indicate if using single_path or sidelap')
@click.option('--rank', type=int, default=1, help='the rank id')
@click.option('--n_cores', type=int, default=1, help='the total cores assigned')
@click.option('--is_partition/--continuous', default=True, help='partition the output to blocks')
@click.option('--yaml_path', type=str, help='yaml file path')
@click.option('--hpc/--dhtc', default=True, help='if it is set for HPC or DHTC environment')
@click.option('--low_date_bound', type=str, default=None, help='the lower bound of the year range of user interest')
@click.option('--upp_date_bound', type=str, default=None, help='the upper bound of the year range of user interest')
@click.option('--collection', type=click.Choice(['ARD', 'C2', 'HLS', 'HLS14']), default='ARD',
help='image source')
@click.option('--shapefile_path', type=str, default=None)
@click.option('--id', type=int, default=0)
def main(source_dir, out_dir, clear_threshold, single_path, rank, n_cores, is_partition, yaml_path, hpc, low_date_bound,
upp_date_bound, collection, shapefile_path, id):
if not os.path.exists(source_dir):
print('Source directory not exists!')
# select only _SR
if collection == 'ARD':
folder_list = [f[0:len(f) - 4] for f in listdir(source_dir) if
(isfile(join(source_dir, f)) and f.endswith('.tar')
and f[len(f) - 6:len(f) - 4] == 'SR')]
elif collection == 'C2':
folder_list = [f[0:len(f) - 4] for f in listdir(source_dir) if
(isfile(join(source_dir, f)) and f.endswith('.tar'))]
elif collection == 'HLS':
folder_list = [f for f in listdir(source_dir) if f.startswith('HLS')]
elif collection == 'HLS14':
folder_list = [y for x in os.walk(source_dir) for y in glob(os.path.join(x[0], '*.hdf'))]
tmp_path = join(out_dir, 'tmp{}'.format(rank))
tz = timezone('US/Eastern')
with open(yaml_path) as yaml_obj:
config = yaml.safe_load(yaml_obj)
logger = None
# create needed folders
if rank == 1:
logging.basicConfig(filename=join(os.getcwd(), 'prepare_ard.log'),
filemode='w', level=logging.INFO) # mode = w enables the log file to be overwritten
logger = logging.getLogger(__name__)
logger.info('AutoPrepareDataARD starts: {}'.format(datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')))
if not os.path.exists(out_dir):
os.mkdir(out_dir)
if is_partition is True:
for i in range(config['n_block_y']):
for j in range(config['n_block_x']):
block_folder = 'block_x{}_y{}'.format(j + 1, i + 1)
if not os.path.exists(join(out_dir, block_folder)):
os.mkdir(join(out_dir, block_folder))
if not os.path.exists(tmp_path):
os.mkdir(tmp_path)
if hpc is True and collection == 'ARD':
# warp a tile-based single path tif
with importlib_resources.path('pycold.imagetool', 'singlepath_landsat_conus.tif') as conus_image_fpath:
# conus_image_fpath = (Path(__file__).parent / 'singlepath_landsat_conus.tif').resolve()
conus_image = gdal.Open(os.fspath(conus_image_fpath))
if os.path.exists(join(tmp_path, folder_list[0])):
shutil.rmtree(join(tmp_path, folder_list[0]), ignore_errors=True)
with tarfile.open(join(source_dir, folder_list[0] + '.tar')) as tar_ref:
try:
tar_ref.extractall(join(tmp_path, folder_list[0]))
except Exception:
logger.error('Unzip fails for {}'.format(folder_list[0]))
ref_image = gdal.Open(join(join(tmp_path, folder_list[0]), "{}B1.tif".format(folder_list[0])))
trans = ref_image.GetGeoTransform()
proj = ref_image.GetProjection()
xmin = trans[0]
ymax = trans[3]
xmax = xmin + trans[1] * ref_image.RasterXSize
ymin = ymax + trans[5] * ref_image.RasterYSize
params = gdal.WarpOptions(dstSRS=proj, outputBounds=[xmin, ymin, xmax, ymax],
width=ref_image.RasterXSize, height=ref_image.RasterYSize)
out_fpath = join(out_dir, 'singlepath_landsat_tile.tif')
dst = gdal.Warp(out_fpath, conus_image, options=params)
# must close the dst
dst = None # NOQA
out_img = None # NOQA
shutil.rmtree(join(tmp_path, folder_list[0]), ignore_errors=True)
if collection == 'ARD':
ordinal_dates = [pd.Timestamp.toordinal(dt.datetime(int(folder[15:19]), int(folder[19:21]),
int(folder[21:23])))
for folder in folder_list]
elif collection == 'C2':
ordinal_dates = [pd.Timestamp.toordinal(dt.datetime(int(folder[17:21]), int(folder[21:23]),
int(folder[23:25])))
for folder in folder_list]
elif collection == 'HLS':
ordinal_dates = [pd.Timestamp.toordinal(dt.datetime(int(folder[15:19]), 1, 1)) + int(folder[19:22]) - 1
for folder in folder_list]
elif collection == 'HLS14':
ordinal_dates = [pd.Timestamp.toordinal(dt.datetime(int(folder.split('/')[-1][15:19]), 1, 1))
+ int(folder.split('/')[-1][19:22]) - 1 for folder in folder_list]
ordinal_dates.sort()
file = open(join(out_dir, "starting_last_dates.txt"), "w+") # need to save out starting and
# lasting date for this tile
file.writelines("{}\n".format(str(np.max([ordinal_dates[0],
pd.Timestamp.toordinal(dt.datetime(parse(low_date_bound)))]))))
file.writelines("{}\n".format(str(np.min([ordinal_dates[-1],
pd.Timestamp.toordinal(dt.datetime(parse(upp_date_bound)))]))))
file.close()
else:
logging.basicConfig(filename=join(os.getcwd(), 'prepare_ard.log'),
filemode='a', level=logging.INFO) # mode = w enables the log file to be overwritten
logger = logging.getLogger(__name__)
# use starting_last_dates.txt to indicate all folder have been created. Very stupid way. Need improve it in future.
while not checkfinished_step1(join(out_dir, "starting_last_dates.txt")):
time.sleep(5)
# read a general path file which can indicate which pixel is assigned to which path
path_array = None
if single_path is True and collection == 'ARD':
# parse tile h and v from folder name
folder_name = os.path.basename(source_dir)
tile_h = int(folder_name[folder_name.find('h') + 1: folder_name.find('h') + 4])
tile_v = int(folder_name[folder_name.find('v') + 1: folder_name.find('v') + 4])
if hpc is True:
path_array = gdal_array.LoadFile(join(out_dir, 'singlepath_landsat_tile.tif'))
else:
try:
path_array = gdal_array.LoadFile(join(Path(os.path.realpath(__file__)).parent,
'singlepath_landsat_tile_crop_compress.tif'),
xoff=tile_h * config['n_cols'],
yoff=tile_v * config['n_rows'],
xsize=config['n_cols'],
ysize=config['n_rows']) # read a partial array
# from a large file
except IOError as e:
logger.error(e)
exit()
if collection == 'C2':
# gpd_tile = gpd.read_file(shapefile_path)
bounds = bbox(get_feature(fiona.open(shapefile_path), id))
for i in range(int(np.ceil(len(folder_list) / n_cores))):
new_rank = rank - 1 + i * n_cores
if new_rank > (len(folder_list) - 1): # means that all folder has been processed
break
folder = folder_list[new_rank]
single_image_stacking_collection2(tmp_path, source_dir, out_dir, folder, clear_threshold, logger,
config, bounds, is_partition=is_partition,
low_date_bound=low_date_bound, upp_date_bound=upp_date_bound)
elif collection == 'ARD':
# assign files to each core
for i in range(int(np.ceil(len(folder_list) / n_cores))):
new_rank = rank - 1 + i * n_cores
if new_rank > (len(folder_list) - 1): # means that all folder has been processed
break
folder = folder_list[new_rank]
single_image_stacking(tmp_path, source_dir, out_dir, folder, clear_threshold, path_array, logger,
config, is_partition=is_partition, low_date_bound=low_date_bound,
upp_date_bound=upp_date_bound)
elif collection == 'HLS':
# assign files to each core
for i in range(int(np.ceil(len(folder_list) / n_cores))):
new_rank = rank - 1 + i * n_cores
if new_rank > (len(folder_list) - 1): # means that all folder has been processed
break
folder = folder_list[new_rank]
single_image_stacking_hls(source_dir, out_dir, logger, config, folder, clear_threshold=clear_threshold,
is_partition=is_partition, low_date_bound=low_date_bound,
upp_date_bound=upp_date_bound)
elif collection == 'HLS14':
# assign files to each core
for i in range(int(np.ceil(len(folder_list) / n_cores))):
new_rank = rank - 1 + i * n_cores
if new_rank > (len(folder_list) - 1): # means that all folder has been processed
break
folder = folder_list[new_rank]
single_image_stacking_hls14(out_dir, logger, config, folder, clear_threshold=clear_threshold,
is_partition=is_partition, low_date_bound=low_date_bound,
upp_date_bound=upp_date_bound)
# create an empty file for signaling the core that has been finished
with open(os.path.join(out_dir, 'rank{}_finished.txt'.format(rank)), 'w'):
pass
# wait for other cores assigned
# while not checkfinished_step2(out_dir, n_cores):
# time.sleep(5)
shutil.rmtree(tmp_path, ignore_errors=True)
# create scene list after stacking is finished and remove folders. Not need it in DHTC
if rank == 1:
# remove tmp folder
logger.info('Stacking procedure finished: {}'.format(datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')))
if __name__ == '__main__':
main()