geowatch.utils.util_kwarray module¶
Functions that may eventually be moved to kwarray
- geowatch.utils.util_kwarray.cartesian_product(*arrays)[source]¶
Fast numpy version of itertools.product
TODO: Move to kwarray
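As a rough illustration of what a NumPy-based product looks like, the sketch below builds the same rows that itertools.product yields. The name cartesian_product_sketch is hypothetical and the meshgrid approach is an assumption; it is not taken from the geowatch source.

```python
import itertools
import numpy as np


def cartesian_product_sketch(*arrays):
    """Hypothetical stand-in: every combination of the 1D inputs, one per row."""
    # indexing='ij' makes the first array vary slowest, like itertools.product
    grids = np.meshgrid(*arrays, indexing='ij')
    # Stack the grids along a new last axis, then flatten to (n_combos, n_arrays)
    return np.stack(grids, axis=-1).reshape(-1, len(arrays))


a = np.array([1, 2, 3])
b = np.array([10, 20])
result = cartesian_product_sketch(a, b)
expected = np.array(list(itertools.product(a, b)))
assert np.array_equal(result, expected)
```

The vectorized version avoids a Python-level loop over combinations, which is where most of the speedup over itertools.product would come from on large inputs.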
- geowatch.utils.util_kwarray.tukey_biweight_loss(r, c=4.685)[source]¶
Beaton Tukey Biweight
Computes the function:
L(r) = (c ** 2) / 6 * (1 - (1 - (r / c) ** 2) ** 3) if abs(r) <= c else (c ** 2) / 6
- Parameters:
r (float | ndarray) – residual parameter
c (float) – tuning constant (defaults to 4.685 which is 95% efficient for normal distributions of residuals)
Todo
[ ] Move elsewhere (kwarray?) or find a package that provides it
- Returns:
float | ndarray
References
https://en.wikipedia.org/wiki/Robust_statistics https://mathworld.wolfram.com/TukeysBiweight.html https://statisticaloddsandends.wordpress.com/2021/04/23/what-is-the-tukey-loss-function/ https://arxiv.org/pdf/1505.06606.pdf
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import ubelt as ub
>>> r = np.linspace(-20, 20, 1000)
>>> data = {'r': r}
>>> grid = ub.named_product({
>>>     'c': [4.685, 2, 6],
>>> })
>>> for kwargs in grid:
>>>     key = ub.urepr(kwargs, compact=1)
>>>     loss = tukey_biweight_loss(r, **kwargs)
>>>     data[key] = loss
>>> import pandas as pd
>>> melted = pd.DataFrame(data).melt(['r'])
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> sns = kwplot.autosns()
>>> kwplot.figure(fnum=1, doclf=True)
>>> ax = sns.lineplot(data=melted, x='r', y='value', hue='variable', style='variable')
>>> #ax.set_ylim(*robust_limits(melted.value))
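The standard Tukey biweight loss described in the references can be sketched in a few lines of NumPy. The function name tukey_biweight_sketch is hypothetical, and the clipping trick is just one way to express the piecewise definition; the geowatch implementation may differ.

```python
import numpy as np


def tukey_biweight_sketch(r, c=4.685):
    """Hypothetical sketch of the standard Tukey biweight loss."""
    r = np.asarray(r, dtype=float)
    c26 = (c ** 2) / 6
    # Clipping |r| / c to [0, 1] folds both branches into one expression:
    # residuals beyond c have u == 1, so the loss saturates at c^2 / 6
    u = np.clip(np.abs(r) / c, 0.0, 1.0)
    return c26 * (1 - (1 - u ** 2) ** 3)


r = np.array([0.0, 1.0, 4.685, 20.0])
loss = tukey_biweight_sketch(r)
```

The loss is zero at r = 0, grows roughly quadratically for small residuals, and is constant for abs(r) >= c, which is what limits the influence of outliers.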
- geowatch.utils.util_kwarray.asymptotic(x, offset=1, gamma=1, degree=0, horizontal=1)[source]¶
A function with a horizontal asymptote at y = horizontal.
- Parameters:
x (ndarray) – input parameter
offset (float) – shifts function to the left or the right
gamma (float) – higher values approach the asymptote more slowly
horizontal (float) – location of the horizontal asymptote
Todo
[ ] Move elsewhere (kwarray?) or find a package that provides it
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import ubelt as ub
>>> x = np.linspace(0, 27, 1000)
>>> data = {'x': x}
>>> grid = ub.named_product({
>>>     #'gamma': [0.5, 1.0, 2.0, 3.0],
>>>     'gamma': [1.0, 3.0],
>>>     'degree': [0, 1, 2, 3],
>>>     'offset': [0, 2],
>>>     'horizontal': [1],
>>> })
>>> for kwargs in grid:
>>>     key = ub.urepr(kwargs, compact=1)
>>>     data[key] = asymptotic(x, **kwargs)
>>> import pandas as pd
>>> melted = pd.DataFrame(data).melt(['x'])
>>> print(melted)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> sns = kwplot.autosns()
>>> kwplot.figure(fnum=1, doclf=True)
>>> ax = sns.lineplot(data=melted, x='x', y='value', hue='variable', style='variable')
>>> ax.set_ylim(0, 2)
- geowatch.utils.util_kwarray.robust_limits(values)[source]¶
# TODO: Proper Robust estimator for matplotlib ylim and general use
values = np.array([-1000, -4, -3, -2, 0, 2.7, 3.1415, 1, 2, 3, 4, 100000])
robust_limits(values)
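The estimator this function uses is not documented here. As one possible approach, the sketch below applies Tukey's interquartile-range fences to the sample values above. The name robust_limits_sketch and the parameter k are hypothetical, and this is not presented as what robust_limits actually computes.

```python
import numpy as np


def robust_limits_sketch(values, k=1.5):
    """Hypothetical sketch: min and max of the values inside the IQR fences."""
    values = np.asarray(values, dtype=float)
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    # Anything further than k * IQR outside the quartiles is treated as an outlier
    lo_fence, hi_fence = q1 - k * iqr, q3 + k * iqr
    inliers = values[(values >= lo_fence) & (values <= hi_fence)]
    return inliers.min(), inliers.max()


values = np.array([-1000, -4, -3, -2, 0, 2.7, 3.1415, 1, 2, 3, 4, 100000])
lo, hi = robust_limits_sketch(values)
```

With these inputs the two extreme values fall outside the fences, so the limits cover only the central cluster, which is the behavior one would want for a plot's y-axis.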
- geowatch.utils.util_kwarray.unique_rows(arr, ordered=False)[source]¶
Note: function also added to kwarray and will be available in >0.5.20
Example
>>> import kwarray
>>> from kwarray.util_numpy import *  # NOQA
>>> rng = kwarray.ensure_rng(0)
>>> arr = rng.randint(0, 2, size=(12, 3))
>>> arr_unique = unique_rows(arr)
>>> print('arr_unique = {!r}'.format(arr_unique))
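For comparison, plain NumPy can deduplicate rows with np.unique along axis 0. The sketch below also shows an order-preserving variant; reading ordered=True as "keep rows in order of first appearance" is an assumption, since the parameter is not described above.

```python
import numpy as np

rng = np.random.RandomState(0)
arr = rng.randint(0, 2, size=(12, 3))

# Unique rows, returned in lexicographically sorted order
unique_sorted = np.unique(arr, axis=0)

# Assumed meaning of an "ordered" result: keep rows in order of first appearance
_, first_idx = np.unique(arr, axis=0, return_index=True)
unique_ordered = arr[np.sort(first_idx)]
```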
- geowatch.utils.util_kwarray.find_robust_normalizers(data, params='auto')[source]¶
Finds robust normalization statistics for a single observation
- Parameters:
data (ndarray) – a 1D numpy array where invalid data has already been removed
params (str | dict) – normalization params
- Returns:
normalization parameters
- Return type:
Todo
[ ] No Magic Numbers! Use first principles to determine defaults.
[ ] Probably a lot of literature on the subject.
[ ] Is this a kwarray function in general?
[ ] https://www.tandfonline.com/doi/full/10.1080/02664763.2019.1671961
Example
>>> import numpy as np
>>> import ubelt as ub
>>> from geowatch.utils.util_kwarray import find_robust_normalizers
>>> data = np.random.rand(100)
>>> norm_params1 = find_robust_normalizers(data, params='auto')
>>> norm_params2 = find_robust_normalizers(data, params={'low': 0, 'high': 1.0})
>>> norm_params3 = find_robust_normalizers(np.empty(0), params='auto')
>>> print('norm_params1 = {}'.format(ub.urepr(norm_params1, nl=1)))
>>> print('norm_params2 = {}'.format(ub.urepr(norm_params2, nl=1)))
>>> print('norm_params3 = {}'.format(ub.urepr(norm_params3, nl=1)))
- geowatch.utils.util_kwarray.apply_normalizer(data, normalizer, mask=None, set_value_at_mask=nan)[source]¶
- geowatch.utils.util_kwarray.normalize(arr, mode='linear', alpha=None, beta=None, out=None, min_val=None, max_val=None)[source]¶
Rebalance signal values via contrast stretching.
By default linearly stretches array values to minimum and maximum values.
- Parameters:
arr (ndarray) – array to normalize, usually an image
out (ndarray | None) – output array. Note, that we will create an internal floating point copy for integer computations.
mode (str) – either linear or sigmoid.
alpha (float) – Only used if mode=sigmoid. Division factor (pre-sigmoid). If unspecified, computed as max(abs(old_min - beta), abs(old_max - beta)) / 6.212606. Note that this parameter is sensitive to whether the input is a float or uint8 image.
beta (float) – subtractive factor (pre-sigmoid). This should be the intensity of the most interesting bits of the image, i.e. bring them to the center (0) of the distribution. Defaults to (max - min) / 2. Note that this parameter is sensitive to whether the input is a float or uint8 image.
min_val – override minimum value
max_val – override maximum value
References
https://en.wikipedia.org/wiki/Normalization_(image_processing)
Example
>>> import numpy as np
>>> from numpy import isclose
>>> from geowatch.utils.util_kwarray import normalize
>>> raw_f = np.random.rand(8, 8)
>>> norm_f = normalize(raw_f)
>>> raw_f = np.random.rand(8, 8) * 100
>>> norm_f = normalize(raw_f)
>>> assert isclose(norm_f.min(), 0)
>>> assert isclose(norm_f.max(), 1)
>>> raw_u = (np.random.rand(8, 8) * 255).astype(np.uint8)
>>> norm_u = normalize(raw_u)
>>> raw_m = (np.zeros((8, 8)) + 10)
>>> norm_m = normalize(raw_m, min_val=0, max_val=20)
>>> assert isclose(norm_m.min(), 0.5)
>>> assert isclose(norm_m.max(), 0.5)
>>> # Ensure that we're clamping if explicit min or max values
>>> # are provided
>>> raw_m = (np.zeros((8, 8)) + 10)
>>> norm_m = normalize(raw_m, min_val=0, max_val=5)
>>> assert isclose(norm_m.min(), 1.0)
>>> assert isclose(norm_m.max(), 1.0)
Example
>>> # xdoctest: +REQUIRES(module:kwimage)
>>> import kwimage
>>> from geowatch.utils.util_kwarray import normalize
>>> arr = kwimage.grab_test_image('lowcontrast')
>>> arr = kwimage.ensure_float01(arr)
>>> norms = {}
>>> norms['arr'] = arr.copy()
>>> norms['linear'] = normalize(arr, mode='linear')
>>> norms['sigmoid'] = normalize(arr, mode='sigmoid')
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.figure(fnum=1, doclf=True)
>>> pnum_ = kwplot.PlotNums(nSubplots=len(norms))
>>> for key, img in norms.items():
>>>     kwplot.imshow(img, pnum=pnum_(), title=key)
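The linear mode amounts to a min-max stretch with clamping when explicit bounds are given. A minimal sketch, assuming a [0, 1] output range (as the assertions in the first example suggest), is shown below. The name linear_stretch_sketch is hypothetical, and returning zeros for a constant image is a choice made for this sketch, not documented library behavior.

```python
import numpy as np


def linear_stretch_sketch(arr, min_val=None, max_val=None):
    """Hypothetical sketch of linear contrast stretching to the range [0, 1]."""
    out = np.asarray(arr, dtype=float)
    # Use the observed extremes unless the caller overrides them
    old_min = out.min() if min_val is None else min_val
    old_max = out.max() if max_val is None else max_val
    span = old_max - old_min
    if span == 0:
        # Constant input: nothing to stretch (a choice made for this sketch)
        return np.zeros_like(out)
    # Values outside [old_min, old_max] are clamped to the ends of the range
    return np.clip((out - old_min) / span, 0.0, 1.0)


raw_m = np.zeros((8, 8)) + 10
half = linear_stretch_sketch(raw_m, min_val=0, max_val=20)
clamped = linear_stretch_sketch(raw_m, min_val=0, max_val=5)
```

Here every pixel in half is 0.5 because 10 sits midway between 0 and 20, and every pixel in clamped is 1.0 because 10 exceeds the overridden maximum of 5.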
- geowatch.utils.util_kwarray.balanced_number_partitioning(items, num_parts)[source]¶
Greedy approximation to multiway number partitioning
Uses Greedy number partitioning method to minimize the size of the largest partition.
- Parameters:
items (np.ndarray) – list of numbers (i.e. weights) to split between partitions.
num_parts (int) – number of partitions
- Returns:
A list for each partition that contains the indices of the items assigned to it.
- Return type:
List[np.ndarray]
References
https://en.wikipedia.org/wiki/Multiway_number_partitioning https://en.wikipedia.org/wiki/Balanced_number_partitioning
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> items = np.array([1, 3, 29, 22, 4, 5, 9])
>>> num_parts = 3
>>> bin_assignments = balanced_number_partitioning(items, num_parts)
>>> import kwarray
>>> groups = kwarray.apply_grouping(items, bin_assignments)
>>> bin_weights = [g.sum() for g in groups]
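The greedy method from the references can be sketched with a heap: visit the items from largest to smallest and always place the next item in the currently lightest bin. The name greedy_partition_sketch is hypothetical, and tie-breaking in the geowatch implementation may differ from this sketch.

```python
import heapq
import numpy as np


def greedy_partition_sketch(items, num_parts):
    """Hypothetical sketch of greedy (largest-first) number partitioning."""
    items = np.asarray(items)
    # Min-heap of (current bin weight, bin index); the lightest bin pops first
    heap = [(0.0, b) for b in range(num_parts)]
    heapq.heapify(heap)
    bins = [[] for _ in range(num_parts)]
    # Visit item indices from the largest weight to the smallest
    for idx in np.argsort(items)[::-1]:
        weight, b = heapq.heappop(heap)
        bins[b].append(int(idx))
        heapq.heappush(heap, (weight + float(items[idx]), b))
    return [np.array(b, dtype=int) for b in bins]


items = np.array([1, 3, 29, 22, 4, 5, 9])
assignments = greedy_partition_sketch(items, num_parts=3)
weights = [int(items[idxs].sum()) for idxs in assignments]
```

For these items the sketch yields bin weights of 29, 22 and 22. The largest item alone sets the size of the heaviest bin, so no assignment can do better here.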
- geowatch.utils.util_kwarray.torch_array_equal(data1, data2, equal_nan=False) → bool[source]¶
Example
>>> # xdoctest: +REQUIRES(module:torch)
>>> import torch
>>> from geowatch.utils.util_kwarray import torch_array_equal
>>> data1 = torch.rand(5, 5)
>>> data2 = data1 + 1
>>> result1 = torch_array_equal(data1, data2)
>>> result3 = torch_array_equal(data1, data1)
>>> assert result1 is False
>>> assert result3 is True
Example
>>> # xdoctest: +REQUIRES(module:torch)
>>> import numpy as np
>>> import torch
>>> from geowatch.utils.util_kwarray import torch_array_equal
>>> data1 = torch.rand(5, 5)
>>> data1[0] = np.nan
>>> data2 = data1
>>> result1 = torch_array_equal(data1, data2)
>>> result3 = torch_array_equal(data1, data2, equal_nan=True)
>>> assert result1 is False
>>> assert result3 is True
- geowatch.utils.util_kwarray.combine_mean_stds(means, stds, nums=None, axis=None, keepdims=False, bessel=True)[source]¶
- Parameters:
means (array) – means[i] is the mean of the ith entry to combine
stds (array) – stds[i] is the std of the ith entry to combine
nums (array | None) – nums[i] is the number of samples in the ith entry to combine. If None, assumes sample sizes are infinite.
axis (int | Tuple[int] | None) – axis to combine the statistics over
keepdims (bool) – if True return arrays with the same number of dimensions they were given in.
bessel (int) – Set to 1 to enable Bessel's correction, which unbiases the combined std estimate. Only disable this if you have the true population means, or you think you know what you are doing.
References
- SeeAlso:
development kwarray has a similar hidden function in util_averages. Might expose later.
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import ubelt as ub
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 100, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 100, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 100, axis=0)
>>> cm1, cs1, _ = combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm1, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs1, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 1, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 1, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 1, axis=0)
>>> cm2, cs2, _ = combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm2, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs2, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 5, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 5, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 5, axis=0)
>>> cm3, cs3, combo_num = combine_mean_stds(means, stds, nums, axis=1)
>>> print('combo_mean = {}'.format(ub.urepr(cm3, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs3, nl=1)))
>>> assert np.allclose(cm1, cm2) and np.allclose(cm2, cm3)
>>> assert not np.allclose(cs1, cs2)
>>> assert np.allclose(cs2, cs3)
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> means = np.random.rand(2, 3, 5, 7)
>>> stds = np.random.rand(2, 3, 5, 7)
>>> nums = (np.random.rand(2, 3, 5, 7) * 10) + 1
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=1, keepdims=1)
>>> print('cs = {}'.format(ub.urepr(cs, nl=1)))
>>> assert cm.shape == cs.shape == cn.shape
...
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=(0, 2), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=(1, 3), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=None)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
cm.shape=(2, 1, 5, 7)
cm.shape=(1, 3, 1, 7)
cm.shape=(2, 1, 5, 1)
cm.shape=()
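The usual way to pool per-group statistics is to add the within-group and between-group sums of squares. The sketch below applies that identity to the flat, finite-sample case only and checks it against the statistics of the concatenated data. The name combine_mean_stds_sketch is hypothetical; the axis and keepdims handling and the nums=None case of the real function are not reproduced.

```python
import numpy as np


def combine_mean_stds_sketch(means, stds, nums, bessel=True):
    """Hypothetical sketch: pool per-group means and stds into overall statistics."""
    means = np.asarray(means, dtype=float)
    stds = np.asarray(stds, dtype=float)
    nums = np.asarray(nums, dtype=float)
    ddof = 1 if bessel else 0
    total = nums.sum()
    combo_mean = (nums * means).sum() / total
    # Within-group sum of squares, recovered from each group's std
    within = ((nums - ddof) * stds ** 2).sum()
    # Between-group sum of squares around the pooled mean
    between = (nums * (means - combo_mean) ** 2).sum()
    combo_std = np.sqrt((within + between) / (total - ddof))
    return combo_mean, combo_std, total


rng = np.random.RandomState(0)
spec = [(10, 4.2, 1.2), (100, 0.2, 3.2), (10, 2.1, 4.1)]  # (num, std, mean)
groups = [rng.randn(n) * s + m for n, s, m in spec]
means = [g.mean() for g in groups]
stds = [g.std(ddof=1) for g in groups]
nums = [len(g) for g in groups]
cm, cs, cn = combine_mean_stds_sketch(means, stds, nums)

# The pooled statistics match those of the concatenated samples
pooled = np.concatenate(groups)
assert np.isclose(cm, pooled.mean())
assert np.isclose(cs, pooled.std(ddof=1))
```

With bessel=False the same identity holds for population standard deviations (ddof=0), which is why the correction should only be disabled when the inputs were computed that way.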
- geowatch.utils.util_kwarray.apply_robust_normalizer(normalizer, imdata, imdata_valid, mask, dtype, copy=True)[source]¶
    data = [self.dataset[idx] for idx in possibly_batched_index]
File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 1004, in __getitem__
    return self.getitem(index)
File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 1375, in getitem
    imdata_normalized = apply_robust_normalizer(
File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 2513, in apply_robust_normalizer
    imdata_valid_normalized = kwarray.normalize(
File "/home/joncrall/code/kwarray/kwarray/util_numpy.py", line 760, in normalize
    old_min = np.nanmin(float_out)
File "<__array_function__ internals>", line 5, in nanmin
File "/home/joncrall/.pyenv/versions/3.10.5/envs/pyenv3.10.5/lib/python3.10/site-packages/numpy/lib/nanfunctions.py", line 319, in nanmin
    res = np.fmin.reduce(a, axis=axis, out=out, **kwargs)
- geowatch.utils.util_kwarray.biased_1d_weights(upweight_time, num_frames)[source]¶
import kwplot
plt = kwplot.autoplt()
kwplot.figure()
import sys, ubelt
sys.path.append(ubelt.expandpath('~/code/watch'))
from geowatch.tasks.fusion.datamodules.kwcoco_dataset import *  # NOQA
kwplot.figure(fnum=1, doclf=1)
num_frames = 5
values = biased_1d_weights(0.5, num_frames)
plt.plot(values)
values = biased_1d_weights(0.1, num_frames)
plt.plot(values)
values = biased_1d_weights(0.0, num_frames)
plt.plot(values)
values = biased_1d_weights(0.9, num_frames)
plt.plot(values)
values = biased_1d_weights(1.0, num_frames)
plt.plot(values)
- geowatch.utils.util_kwarray.argsort_threshold(arr, threshold=None, num_top=None, objective='maximize')[source]¶
Find all indexes whose scores pass a threshold, always returning at least num_top indexes and potentially more.
- Parameters:
arr (ndarray) – array of scores
threshold (float) – return indexes that are better than this threshold.
num_top (int) – always return at least this number of “best” indexes.
objective (str) – if maximize, filters things above the threshold, otherwise filters below the threshold.
- Returns:
top indexes
- Return type:
ndarray
Example
>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> arr = np.array([0.3, .2, 0.1, 0.15, 0.11, 0.15, 0.2, 0.6, 0.32])
>>> argsort_threshold(arr, threshold=0.5, num_top=0)
array([7])
>>> argsort_threshold(arr, threshold=0.5, num_top=3)
array([7, 8, 0])
>>> argsort_threshold(arr, threshold=0.0, num_top=3)
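The behavior described above can be sketched by sorting the scores best-first and keeping whichever is larger: the number of scores that pass the threshold, or num_top. The name argsort_threshold_sketch is hypothetical. Using a strict comparison and treating threshold=None as "nothing passes" are assumptions of this sketch, not documented behavior.

```python
import numpy as np


def argsort_threshold_sketch(arr, threshold=None, num_top=None, objective='maximize'):
    """Hypothetical sketch: indexes passing a threshold, but at least num_top."""
    arr = np.asarray(arr)
    maximize = (objective == 'maximize')
    # Sort so the best scores come first (stable, so ties keep their order)
    order = np.argsort(-arr if maximize else arr, kind='stable')
    if threshold is None:
        num_pass = 0  # assumption: with no threshold only num_top applies
    elif maximize:
        num_pass = int((arr > threshold).sum())
    else:
        num_pass = int((arr < threshold).sum())
    # The passing scores are exactly the first num_pass entries of the ordering
    num_keep = max(num_pass, num_top or 0)
    return order[:num_keep]


arr = np.array([0.3, .2, 0.1, 0.15, 0.11, 0.15, 0.2, 0.6, 0.32])
only_passing = argsort_threshold_sketch(arr, threshold=0.5, num_top=0)
at_least_three = argsort_threshold_sketch(arr, threshold=0.5, num_top=3)
```

On the array from the example, the sketch reproduces the two documented results: only index 7 passes the 0.5 threshold, and asking for at least three indexes extends the result to the next two best scores.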