geowatch.utils.util_kwarray module

Functions that may eventually be moved to kwarray

geowatch.utils.util_kwarray.cartesian_product(*arrays)[source]

Fast numpy version of itertools.product

TODO: Move to kwarray

References:

https://stackoverflow.com/a/11146645/887074
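
As a rough illustration of what a NumPy-based product looks like, the following minimal sketch builds every combination with `np.meshgrid`. The name `cartesian_product_sketch` is hypothetical and this is not necessarily how `cartesian_product` is implemented; it only assumes the output should have one row per combination, in the same order as `itertools.product`.

```python
import itertools
import numpy as np


def cartesian_product_sketch(*arrays):
    """Return an (N, len(arrays)) array with one row per combination."""
    # indexing='ij' makes the first array vary slowest, like itertools.product.
    grids = np.meshgrid(*arrays, indexing='ij')
    # Flatten each grid and stack the results as columns.
    return np.stack([g.ravel() for g in grids], axis=-1)


a = np.array([1, 2, 3])
b = np.array([10, 20])
result = cartesian_product_sketch(a, b)
expected = np.array(list(itertools.product(a, b)))
assert np.array_equal(result, expected)
```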

geowatch.utils.util_kwarray.tukey_biweight_loss(r, c=4.685)[source]

Beaton Tukey Biweight

Computes the function:

L(r) = (
    (c ** 2) / 6 * (1 - (1 - (r / c) ** 2) ** 3) if abs(r) <= c else
    (c ** 2) / 6
)

Parameters:
  • r (float | ndarray) – residual parameter

  • c (float) – tuning constant (defaults to 4.685 which is 95% efficient for normal distributions of residuals)

Todo

  • [ ] Move elsewhere (kwarray?) or find a package that provides it

Returns:

float | ndarray

References

https://en.wikipedia.org/wiki/Robust_statistics
https://mathworld.wolfram.com/TukeysBiweight.html
https://statisticaloddsandends.wordpress.com/2021/04/23/what-is-the-tukey-loss-function/
https://arxiv.org/pdf/1505.06606.pdf
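
For reference, the textbook form of Tukey's biweight loss can be written in a few lines of NumPy. This is a minimal sketch under the assumption that the function follows the standard definition (quadratic-like near zero, constant at c ** 2 / 6 beyond the cutoff); `tukey_biweight_sketch` is a hypothetical name, not the library function.

```python
import numpy as np


def tukey_biweight_sketch(r, c=4.685):
    r = np.asarray(r, dtype=float)
    cap = (c ** 2) / 6                 # constant loss assigned to outliers
    inner = 1 - (r / c) ** 2           # shrinks from 1 to 0 as |r| goes from 0 to c
    inlier_loss = cap * (1 - inner ** 3)
    # Residuals beyond the cutoff all receive the same capped loss.
    return np.where(np.abs(r) <= c, inlier_loss, cap)


r = np.linspace(-20, 20, 9)
loss = tukey_biweight_sketch(r)
```

Because the loss saturates, a single extreme residual contributes no more than any other residual beyond `c`, which is what makes the estimator robust.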

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import ubelt as ub
>>> r = np.linspace(-20, 20, 1000)
>>> data = {'r': r}
>>> grid = ub.named_product({
>>>     'c': [4.685, 2, 6],
>>> })
>>> for kwargs in grid:
>>>     key = ub.urepr(kwargs, compact=1)
>>>     loss = tukey_biweight_loss(r, **kwargs)
>>>     data[key] = loss
>>> import pandas as pd
>>> melted = pd.DataFrame(data).melt(['r'])
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> sns = kwplot.autosns()
>>> kwplot.figure(fnum=1, doclf=True)
>>> ax = sns.lineplot(data=melted, x='r', y='value', hue='variable', style='variable')
>>> #ax.set_ylim(*robust_limits(melted.value))
geowatch.utils.util_kwarray.asymptotic(x, offset=1, gamma=1, degree=0, horizontal=1)[source]

A function with a horizontal asymptote at y = horizontal

Parameters:
  • x (ndarray) – input parameter

  • offset (float) – shifts function to the left or the right

  • gamma (float) – higher values approach the asymptote more slowly

  • horizontal (float) – location of the horizontal asymptote

Todo

  • [ ] Move elsewhere (kwarray?) or find a package that provides it

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import ubelt as ub
>>> x = np.linspace(0, 27, 1000)
>>> data = {'x': x}
>>> grid = ub.named_product({
>>>     #'gamma': [0.5, 1.0, 2.0, 3.0],
>>>     'gamma': [1.0, 3.0],
>>>     'degree': [0, 1, 2, 3],
>>>     'offset': [0, 2],
>>>     'horizontal': [1],
>>> })
>>> for kwargs in grid:
>>>     key = ub.urepr(kwargs, compact=1)
>>>     data[key] = asymptotic(x, **kwargs)
>>> import pandas as pd
>>> melted = pd.DataFrame(data).melt(['x'])
>>> print(melted)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> sns = kwplot.autosns()
>>> kwplot.figure(fnum=1, doclf=True)
>>> ax = sns.lineplot(data=melted, x='x', y='value', hue='variable', style='variable')
>>> ax.set_ylim(0, 2)
geowatch.utils.util_kwarray.robust_limits(values)[source]

# TODO: Proper Robust estimator for matplotlib ylim and general use

values = np.array([-1000, -4, -3, -2, 0, 2.7, 3.1415, 1, 2, 3, 4, 100000])
robust_limits(values)

geowatch.utils.util_kwarray.unique_rows(arr, ordered=False)[source]

Note: function also added to kwarray and will be available in >0.5.20
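
A minimal sketch of row deduplication with plain NumPy is shown below. It assumes that `ordered=True` means "keep rows in order of first appearance", which is a guess about the parameter's meaning; `unique_rows_sketch` is a hypothetical name and not the library implementation.

```python
import numpy as np


def unique_rows_sketch(arr, ordered=False):
    arr = np.asarray(arr)
    # With axis=0, np.unique treats each row as one item and returns them sorted.
    unique, first_idx = np.unique(arr, axis=0, return_index=True)
    if ordered:
        # Assumption: restore the order in which each row first appeared.
        unique = unique[np.argsort(first_idx)]
    return unique


arr = np.array([[1, 0], [0, 1], [1, 0], [0, 0]])
sorted_rows = unique_rows_sketch(arr)
ordered_rows = unique_rows_sketch(arr, ordered=True)
```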

Example

>>> import kwarray
>>> from kwarray.util_numpy import *  # NOQA
>>> rng = kwarray.ensure_rng(0)
>>> arr = rng.randint(0, 2, size=(12, 3))
>>> arr_unique = unique_rows(arr)
>>> print('arr_unique = {!r}'.format(arr_unique))
geowatch.utils.util_kwarray.find_robust_normalizers(data, params='auto')[source]

Finds robust normalization statistics for a single observation

Parameters:
  • data (ndarray) – a 1D numpy array where invalid data has already been removed

  • params (str | dict) – normalization params

Returns:

normalization parameters

Return type:

Dict[str, str | float]

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import numpy as np
>>> import ubelt as ub
>>> data = np.random.rand(100)
>>> norm_params1 = find_robust_normalizers(data, params='auto')
>>> norm_params2 = find_robust_normalizers(data, params={'low': 0, 'high': 1.0})
>>> norm_params3 = find_robust_normalizers(np.empty(0), params='auto')
>>> print('norm_params1 = {}'.format(ub.urepr(norm_params1, nl=1)))
>>> print('norm_params2 = {}'.format(ub.urepr(norm_params2, nl=1)))
>>> print('norm_params3 = {}'.format(ub.urepr(norm_params3, nl=1)))
geowatch.utils.util_kwarray.apply_normalizer(data, normalizer, mask=None, set_value_at_mask=nan)[source]
geowatch.utils.util_kwarray.normalize(arr, mode='linear', alpha=None, beta=None, out=None, min_val=None, max_val=None)[source]

Rebalance signal values via contrast stretching.

By default linearly stretches array values to minimum and maximum values.

Parameters:
  • arr (ndarray) – array to normalize, usually an image

  • out (ndarray | None) – output array. Note that we create an internal floating point copy for integer computations.

  • mode (str) – either linear or sigmoid.

  • alpha (float) – Only used if mode=sigmoid. Division factor (pre-sigmoid). If unspecified, computed as: max(abs(old_min - beta), abs(old_max - beta)) / 6.212606. Note this parameter is sensitive to whether the input is a float or uint8 image.

  • beta (float) – subtractive factor (pre-sigmoid). This should be the intensity of the most interesting bits of the image, i.e. bring them to the center (0) of the distribution. Defaults to (max - min) / 2. Note this parameter is sensitive to whether the input is a float or uint8 image.

  • min_val – override minimum value

  • max_val – override maximum value

References

https://en.wikipedia.org/wiki/Normalization_(image_processing)
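
The two modes can be sketched in plain NumPy as follows. This is an illustration under stated assumptions, not the library code: the function names are hypothetical, the handling of constant input in the linear case is a choice made for this sketch, and the sigmoid variant requires an explicit `beta` and shows only the shift, scale, and squash steps.

```python
import numpy as np


def linear_stretch_sketch(arr, min_val=None, max_val=None):
    out = np.asarray(arr, dtype=float)   # float copy, also for integer images
    old_min = np.nanmin(out) if min_val is None else min_val
    old_max = np.nanmax(out) if max_val is None else max_val
    span = old_max - old_min
    if span == 0:
        # Constant input has nothing to stretch (sketch-specific choice).
        return np.zeros_like(out)
    # Map [old_min, old_max] onto [0, 1] and clamp anything outside it.
    return np.clip((out - old_min) / span, 0.0, 1.0)


def sigmoid_stretch_sketch(arr, beta, alpha=None):
    out = np.asarray(arr, dtype=float)
    if alpha is None:
        # Default quoted in the parameter description above.
        old_min, old_max = np.nanmin(out), np.nanmax(out)
        alpha = max(abs(old_min - beta), abs(old_max - beta)) / 6.212606
    # Center on beta, scale by alpha, then squash into (0, 1).
    return 1.0 / (1.0 + np.exp(-(out - beta) / alpha))


raw = np.zeros((8, 8)) + 10
half = linear_stretch_sketch(raw, min_val=0, max_val=20)    # every value is 0.5
clamped = linear_stretch_sketch(raw, min_val=0, max_val=5)  # every value is 1.0
curve = sigmoid_stretch_sketch(np.linspace(0, 10, 11), beta=5.0)
```

With the default `alpha`, the value farthest from `beta` lands at about 6.2 on the pre-sigmoid scale, so it maps to roughly 0.998 rather than saturating.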

Example

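>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import numpy as np
>>> from numpy import isclose
>>> raw_f = np.random.rand(8, 8)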
>>> norm_f = normalize(raw_f)
>>> raw_f = np.random.rand(8, 8) * 100
>>> norm_f = normalize(raw_f)
>>> assert isclose(norm_f.min(), 0)
>>> assert isclose(norm_f.max(), 1)
>>> raw_u = (np.random.rand(8, 8) * 255).astype(np.uint8)
>>> norm_u = normalize(raw_u)
>>> raw_m = (np.zeros((8, 8)) + 10)
>>> norm_m = normalize(raw_m, min_val=0, max_val=20)
>>> assert isclose(norm_m.min(), 0.5)
>>> assert isclose(norm_m.max(), 0.5)
>>> # Ensure that we're clamping if explicit min or max values
>>> # are provided
>>> raw_m = (np.zeros((8, 8)) + 10)
>>> norm_m = normalize(raw_m, min_val=0, max_val=5)
>>> assert isclose(norm_m.min(), 1.0)
>>> assert isclose(norm_m.max(), 1.0)

Example

>>> # xdoctest: +REQUIRES(module:kwimage)
>>> import kwimage
>>> arr = kwimage.grab_test_image('lowcontrast')
>>> arr = kwimage.ensure_float01(arr)
>>> norms = {}
>>> norms['arr'] = arr.copy()
>>> norms['linear'] = normalize(arr, mode='linear')
>>> norms['sigmoid'] = normalize(arr, mode='sigmoid')
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.figure(fnum=1, doclf=True)
>>> pnum_ = kwplot.PlotNums(nSubplots=len(norms))
>>> for key, img in norms.items():
>>>     kwplot.imshow(img, pnum=pnum_(), title=key)
geowatch.utils.util_kwarray.balanced_number_partitioning(items, num_parts)[source]

Greedy approximation to multiway number partitioning

Uses the greedy number partitioning method to minimize the size of the largest partition.

Parameters:
  • items (np.ndarray) – list of numbers (i.e. weights) to split between partitions.

  • num_parts (int) – number of partitions

Returns:

A list for each partition that contains the indexes of the items assigned to it.

Return type:

List[np.ndarray]

References

https://en.wikipedia.org/wiki/Multiway_number_partitioning
https://en.wikipedia.org/wiki/Balanced_number_partitioning
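
The greedy heuristic described on the Wikipedia page can be sketched as: visit the items from heaviest to lightest and place each one in the partition whose running total is currently smallest. The helper below, `greedy_partition_sketch`, is a hypothetical illustration of that idea and may differ from the library function in tie-breaking and in the order of indexes within each partition.

```python
import numpy as np


def greedy_partition_sketch(items, num_parts):
    items = np.asarray(items)
    bin_sums = np.zeros(num_parts, dtype=float)
    bins = [[] for _ in range(num_parts)]
    # Visit item indexes from heaviest to lightest.
    for idx in np.argsort(items, kind='stable')[::-1]:
        target = int(np.argmin(bin_sums))   # currently lightest partition
        bins[target].append(int(idx))
        bin_sums[target] += items[idx]
    return [np.array(b, dtype=int) for b in bins]


items = np.array([1, 3, 29, 22, 4, 5, 9])
parts = greedy_partition_sketch(items, num_parts=3)
part_weights = [int(items[p].sum()) for p in parts]
```

For these weights the sketch puts 29 alone, 22 alone, and the remaining five items together, giving totals of 29, 22, and 22.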

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> items = np.array([1, 3, 29, 22, 4, 5, 9])
>>> num_parts = 3
>>> bin_assignments = balanced_number_partitioning(items, num_parts)
>>> import kwarray
>>> groups = kwarray.apply_grouping(items, bin_assignments)
>>> bin_weights = [g.sum() for g in groups]
geowatch.utils.util_kwarray.torch_array_equal(data1, data2, equal_nan=False) → bool[source]

Example

>>> # xdoctest: +REQUIRES(module:torch)
>>> import torch
>>> data1 = torch.rand(5, 5)
>>> data2 = data1 + 1
>>> result1 = torch_array_equal(data1, data2)
>>> result3 = torch_array_equal(data1, data1)
>>> assert result1 is False
>>> assert result3 is True

Example

>>> # xdoctest: +REQUIRES(module:torch)
>>> import torch
>>> data1 = torch.rand(5, 5)
>>> data1[0] = np.nan
>>> data2 = data1
>>> result1 = torch_array_equal(data1, data2)
>>> result3 = torch_array_equal(data1, data2, equal_nan=True)
>>> assert result1 is False
>>> assert result3 is True
geowatch.utils.util_kwarray.combine_mean_stds(means, stds, nums=None, axis=None, keepdims=False, bessel=True)[source]
Parameters:
  • means (array) – means[i] is the mean of the ith entry to combine

  • stds (array) – stds[i] is the std of the ith entry to combine

  • nums (array | None) – nums[i] is the number of samples in the ith entry to combine. If None, assumes sample sizes are infinite.

  • axis (int | Tuple[int] | None) – axis to combine the statistics over

  • keepdims (bool) – if True return arrays with the same number of dimensions they were given in.

  • bessel (int) – Set to 1 to enable Bessel's correction to unbias the combined std estimate. Only disable if you have the true population means, or you think you know what you are doing.

References

https://stats.stackexchange.com/questions/55999/is-it-possible-to-find-the-combined-standard-deviation

SeeAlso:

The development version of kwarray has a similar hidden function in util_averages, which might be exposed later.
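
The underlying identity is that the pooled sum of squared deviations splits into a within-group part (recovered from each group's std) and a between-group part (from how far each group mean sits from the combined mean). The sketch below applies that identity to flat 1D inputs with finite sample sizes; `combine_mean_stds_sketch` is a hypothetical helper and does not handle `axis`, `keepdims`, or `nums=None`.

```python
import numpy as np


def combine_mean_stds_sketch(means, stds, nums, bessel=True):
    means = np.asarray(means, dtype=float)
    stds = np.asarray(stds, dtype=float)
    nums = np.asarray(nums, dtype=float)
    ddof = 1 if bessel else 0
    total = nums.sum()
    combo_mean = (nums * means).sum() / total
    # Squared deviations inside each group, recovered from its std.
    within = ((nums - ddof) * stds ** 2).sum()
    # Extra spread from each group mean differing from the combined mean.
    between = (nums * (means - combo_mean) ** 2).sum()
    combo_std = np.sqrt((within + between) / (total - ddof))
    return combo_mean, combo_std, total


# Check against statistics computed directly on the pooled samples.
rng = np.random.default_rng(0)
groups = [rng.normal(size=n) for n in (10, 25, 40)]
group_means = [g.mean() for g in groups]
group_stds = [g.std(ddof=1) for g in groups]
group_nums = [len(g) for g in groups]
combo_mean, combo_std, combo_num = combine_mean_stds_sketch(
    group_means, group_stds, group_nums)
pooled = np.concatenate(groups)
```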

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> import numpy as np
>>> import ubelt as ub
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 100, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 100, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 100, axis=0)
>>> cm1, cs1, _ = combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm1, nl=1)))
>>> print('combo_std  = {}'.format(ub.urepr(cs1, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 1, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 1, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 1, axis=0)
>>> cm2, cs2, _ = combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm2, nl=1)))
>>> print('combo_std  = {}'.format(ub.urepr(cs2, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 5, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 5, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 5, axis=0)
>>> cm3, cs3, combo_num = combine_mean_stds(means, stds, nums, axis=1)
>>> print('combo_mean = {}'.format(ub.urepr(cm3, nl=1)))
>>> print('combo_std  = {}'.format(ub.urepr(cs3, nl=1)))
>>> assert np.allclose(cm1, cm2) and np.allclose(cm2,  cm3)
>>> assert not np.allclose(cs1, cs2)
>>> assert np.allclose(cs2, cs3)

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> means = np.random.rand(2, 3, 5, 7)
>>> stds = np.random.rand(2, 3, 5, 7)
>>> nums = (np.random.rand(2, 3, 5, 7) * 10) + 1
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=1, keepdims=1)
>>> print('cs = {}'.format(ub.urepr(cs, nl=1)))
>>> assert cm.shape == cs.shape == cn.shape
...
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=(0, 2), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=(1, 3), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = combine_mean_stds(means, stds, nums, axis=None)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
cm.shape=(2, 1, 5, 7)
cm.shape=(1, 3, 1, 7)
cm.shape=(2, 1, 5, 1)
cm.shape=()
geowatch.utils.util_kwarray.apply_robust_normalizer(normalizer, imdata, imdata_valid, mask, dtype, copy=True)[source]

    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 1004, in __getitem__
    return self.getitem(index)
  File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 1375, in getitem
    imdata_normalized = apply_robust_normalizer(
  File "/home/joncrall/code/watch/geowatch/tasks/fusion/datamodules/kwcoco_dataset.py", line 2513, in apply_robust_normalizer
    imdata_valid_normalized = kwarray.normalize(
  File "/home/joncrall/code/kwarray/kwarray/util_numpy.py", line 760, in normalize
    old_min = np.nanmin(float_out)
  File "<__array_function__ internals>", line 5, in nanmin
  File "/home/joncrall/.pyenv/versions/3.10.5/envs/pyenv3.10.5/lib/python3.10/site-packages/numpy/lib/nanfunctions.py", line 319, in nanmin
    res = np.fmin.reduce(a, axis=axis, out=out, **kwargs)

geowatch.utils.util_kwarray.biased_1d_weights(upweight_time, num_frames)[source]

import kwplot
plt = kwplot.autoplt()

kwplot.figure()
import sys, ubelt
sys.path.append(ubelt.expandpath('~/code/watch'))
from geowatch.tasks.fusion.datamodules.kwcoco_dataset import *  # NOQA

kwplot.figure(fnum=1, doclf=1)
num_frames = 5
values = biased_1d_weights(0.5, num_frames)
plt.plot(values)
values = biased_1d_weights(0.1, num_frames)
plt.plot(values)
values = biased_1d_weights(0.0, num_frames)
plt.plot(values)
values = biased_1d_weights(0.9, num_frames)
plt.plot(values)
values = biased_1d_weights(1.0, num_frames)
plt.plot(values)

geowatch.utils.util_kwarray.argsort_threshold(arr, threshold=None, num_top=None, objective='maximize')[source]

Find all indexes whose scores pass a threshold, but always return at least num_top indexes, and potentially more.

Parameters:
  • arr (ndarray) – array of scores

  • threshold (float) – return indexes that are better than this threshold.

  • num_top (int) – always return at least this number of “best” indexes.

  • objective (str) – if 'maximize', keeps scores above the threshold; otherwise keeps scores below the threshold.

Returns:

top indexes

Return type:

ndarray
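
One way to get this behavior is to sort the indexes from best to worst and keep whichever is larger: the number of scores passing the threshold, or `num_top`. The sketch below is a hypothetical reimplementation (`argsort_threshold_sketch`), and it makes three assumptions: the comparison against the threshold is strict, `threshold=None` means nothing passes on threshold alone, and ties are broken by a stable sort.

```python
import numpy as np


def argsort_threshold_sketch(arr, threshold=None, num_top=None,
                             objective='maximize'):
    arr = np.asarray(arr)
    ascending = np.argsort(arr, kind='stable')
    if objective == 'maximize':
        order = ascending[::-1]            # largest scores first
        passing = arr > threshold if threshold is not None else None
    else:
        order = ascending                  # smallest scores first
        passing = arr < threshold if threshold is not None else None
    num_passing = 0 if passing is None else int(passing.sum())
    # Keep everything that passes, but never fewer than num_top.
    num_keep = max(num_passing, num_top or 0)
    return order[:num_keep]


scores = np.array([0.3, .2, 0.1, 0.15, 0.11, 0.15, 0.2, 0.6, 0.32])
only_passing = argsort_threshold_sketch(scores, threshold=0.5, num_top=0)
at_least_three = argsort_threshold_sketch(scores, threshold=0.5, num_top=3)
```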

Example

>>> from geowatch.utils.util_kwarray import *  # NOQA
>>> arr = np.array([0.3, .2, 0.1, 0.15, 0.11, 0.15, 0.2, 0.6, 0.32])
>>> argsort_threshold(arr, threshold=0.5, num_top=0)
array([7])
>>> argsort_threshold(arr, threshold=0.5, num_top=3)
array([7, 8, 0])
>>> argsort_threshold(arr, threshold=0.0, num_top=3)