"""
Functions that may be moved to kwutil
"""
import itertools as it
# Backwards compatible variant used in geowatch util
[docs]
def distributed_subitems(items, max_num=None):
"""
Return a subset of items maximally distributed over the input index space.
I.e. the chosen indexes maximize the space between them.
Args:
items (List | Dict): an ordered indexable object
Returns:
List | Dict: a subset of the input with a length at most ``max_num``.
Note:
Prefer using the generator variant :func:`farthest_first` instead.
TODO:
- [X] Find a better name. ChatGPT suggests using "spread", which I
like. Maybe spreadsort, spreadshuffle, spredtraverse?
spreadtake, takespread?
- [ ] Figure out where this lives, probably kwutil.
- [X] maybe we should force this to be a generator? Or make a generator variant?
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> items = list(range(100))
>>> max_num = 5
>>> sub_items = distributed_subitems(items, max_num)
>>> print(sub_items)
[0, 25, 50, 75, 99]
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> items = {chr(i): i for i in range(ord('a'), ord('a') + 26)}
>>> max_num = 5
>>> sub_items = distributed_subitems(items, max_num)
>>> print(sub_items)
{'a': 97, 'g': 103, 'n': 110, 't': 116, 'z': 122}
"""
sub_index_gen = _farthest_first_indices(0, len(items))
sub_indices = sorted(it.islice(sub_index_gen, max_num))
if isinstance(items, dict):
item_keys = list(items.keys())
sub_keys = [item_keys[idx] for idx in sub_indices]
sub_items = {key: items[key] for key in sub_keys}
else:
sub_items = [items[idx] for idx in sub_indices]
return sub_items
[docs]
def farthest_first(items, max_num=None, first=None):
"""
Return a subset of items maximally distributed over the input index space.
I.e. the chosen indexes maximize the space between them.
Args:
items (List | Dict): an ordered indexable object
first (int | None): if specified, start with this index.
Returns:
List | Dict: a subset of the input with a length at most ``max_num``.
Note:
A farthest first sequence is not unique in general. The particular
order this function returns may change in the future.
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> items = list(range(100))
>>> max_num = 5
>>> sub_items = list(farthest_first(items, max_num))
>>> print(list(sub_items))
[99, 0, 50, 25, 75]
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> items = {chr(i): i for i in range(ord('a'), ord('a') + 26)}
>>> max_num = 5
>>> sub_items = list(farthest_first(items, max_num))
>>> print(dict(sub_items))
{'z': 122, 'a': 97, 'n': 110, 'g': 103, 't': 116}
Example:
>>> # Choose a custom starting location
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> import ubelt as ub
>>> items = list(range(10))
>>> results = []
>>> start, stop = 0, 10
>>> for first in range(start, stop):
>>> results += [list(farthest_first(items, first=first))]
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
[0, 9, 4, 2, 6, 1, 3, 5, 7, 8],
[1, 9, 5, 3, 7, 0, 2, 4, 6, 8],
[2, 9, 5, 0, 7, 1, 3, 4, 6, 8],
[3, 9, 0, 6, 1, 2, 4, 5, 7, 8],
[4, 9, 0, 2, 6, 1, 3, 5, 7, 8],
[5, 0, 9, 2, 7, 1, 3, 4, 6, 8],
[6, 0, 3, 9, 1, 2, 4, 5, 7, 8],
[7, 0, 3, 5, 9, 1, 2, 4, 6, 8],
[8, 0, 4, 2, 6, 1, 3, 5, 7, 9],
[9, 0, 5, 2, 7, 1, 6, 3, 8, 4],
]
"""
sub_index_gen = _farthest_first_indices(0, len(items), first=first)
if max_num is not None:
sub_index_gen = it.islice(sub_index_gen, max_num)
if isinstance(items, dict):
item_keys = list(items.keys())
for idx in sub_index_gen:
key = item_keys[idx]
value = items[key]
yield (key, value)
else:
for idx in sub_index_gen:
yield items[idx]
def _farthest_first_indices(start, stop, first=None):
"""
Given a ordered list of items, incrementally yield indexes such that each
new index maximizes the distance to the closest previously chosen index.
Args:
start (int): The inclusive starting index (typically 0)
stop (int): The exclusive maximum index (typically ``len(items)``)
first (int | None): The index to start from.
If unspecified the largest index is first.
Yields:
int: the next chosen index in the series
Notes:
* This is an instance of farthest-point traversal in 1D.
References:
.. [CSSE_167943] https://cs.stackexchange.com/questions/167943/is-this-knapsack-variant-named-studied-online-algorithm-for-farthest-from-pr
.. [WikiFarthestFirst] https://en.wikipedia.org/wiki/Farthest-first_traversal
CommandLine:
xdoctest -m geowatch.utils.util_kwutil _farthest_first_indices
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> total = 10
>>> start, stop = 0, 10
>>> gen = _farthest_first_indices(start, stop)
>>> result = list(gen)
>>> assert set(result) == set(range(start, stop))
>>> print(result)
[9, 0, 5, 2, 7, 1, 6, 3, 8, 4]
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> from geowatch.utils.util_kwutil import _farthest_first_indices
>>> from geowatch.utils.util_kwutil import _check_farthest_first_index_sequence
>>> import ubelt as ub
>>> start, stop = 0, 10
>>> gen = _farthest_first_indices(start, stop)
>>> results = []
>>> for first in range(start, stop):
>>> result = list(_farthest_first_indices(start, stop, first=first))
>>> assert _check_farthest_first_index_sequence(result)
>>> results += [result]
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
[0, 9, 4, 2, 6, 1, 3, 5, 7, 8],
[1, 9, 5, 3, 7, 0, 2, 4, 6, 8],
[2, 9, 5, 0, 7, 1, 3, 4, 6, 8],
[3, 9, 0, 6, 1, 2, 4, 5, 7, 8],
[4, 9, 0, 2, 6, 1, 3, 5, 7, 8],
[5, 0, 9, 2, 7, 1, 3, 4, 6, 8],
[6, 0, 3, 9, 1, 2, 4, 5, 7, 8],
[7, 0, 3, 5, 9, 1, 2, 4, 6, 8],
[8, 0, 4, 2, 6, 1, 3, 5, 7, 9],
[9, 0, 5, 2, 7, 1, 6, 3, 8, 4],
]
>>> result = list(gen)
>>> assert set(result) == set(range(start, stop))
>>> print(result)
print(list(_farthest_first_indices(0, 32)))
start = 0
for stop in range(100):
for first in range(start, stop):
result = list(_farthest_first_indices(start, stop, first=first))
assert _check_farthest_first_index_sequence(result)
"""
import ubelt as ub
if first is None or first == stop - 1:
# Standard case
if start < stop:
yield stop - 1
yield from _farthest_from_previous_helper(start, stop - 1)
else:
yield from _farthest_first_indices_generalized(start, stop, first)
# Broken
if 0:
# Case where first is specified
yield first
# In this case, the left and right are not always balanced, so we generate
# from whatever one is further until they balance. We have to include
# the "first" element in both left and right generators to ensure
# are generating elments far away from it, so we have to filter them out.
left_gen = _farthest_first_indices(start, first + 1)
right_gen = _farthest_first_indices(first, stop)
DEBUG = 1
if DEBUG:
from itertools import tee
right_gen, right_gen1 = tee(right_gen)
left_gen, left_gen1 = tee(left_gen)
print(f'start = {ub.urepr(start, nl=1)}')
print(f'stop={stop}')
print(f'first = {ub.urepr(first, nl=1)}')
print('setup left: ' + str(list(left_gen1)))
print('setup right: ' + str(list(right_gen1)))
# the first item in the left gen will be "first", which we already yeilded
# pop it off and ignore it.
next(left_gen)
# As long as first isn't the same as start or stop, then next will not raise StopIteration here.
next_left = next(left_gen, None)
next_right = next(right_gen, None)
"""
Observation:
the "best" distance halfs every time we take an item past the first.
def best_dist(sequence):
for idx, item in enumerate(sequence):
if idx == 0:
yield len(sequence)
else:
yield min([abs(p - item) for p in sequence[:idx]])
sequence = list(_farthest_first_indices(0, 32))
distance = list(best_dist(sequence))
print(ub.urepr(ub.dzip(sequence, distance), align=':'))
print(ub.urepr(ub.dict_hist(distance)))
If the previous distance id d:
then the next distance can only be on of:
d
d - 1
(d // 2)
(d // 2) + 1
sequence: [31, 0, 16, 8, 24, 4, 20, 12, 28, 2, 18, 10, 26, 6, 22, 14, 30, 1, 17, 9, 25, 5, 21, 13, 29, 3, 19, 11, 27, 7, 23, 15]
bestdist: ∞ 31, 15, 8, 7, 4, 4, 4, 3, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
The 0th selection is basically free and doesn't have a value.
the 1st will always have a distance of length of the sequence.
the 2nd selection will roughly half the distance
the 3rd will roughly half the distance again
the 4th will be within 1 of the halfed distance
the 5th will roughly half the distance again and there will be 4 of this magnitude
the 9th will be the next halving and have 8 items at the magnitude
The 17th item will be the next halving
The 33th will be the next half
so the pattern is the i-th item's distance is within at most this
number of halfings:
from numpy import log2, ceil
def num_halfings(i):
if i == 0:
return 0
else:
return ceil(log2(i))
def max_distance_at_step(i, total):
if i == 0:
return total
return total / (2 ** ceil(log2(i)))
for total in range(2048):
bound2 = np.array([max_distance_at_step(i, total) for i in range(total)], dtype=int)
halvings = np.array([num_halfings(i) for i in range(total)], dtype=int)
bound = np.floor(total / 2 ** halvings).astype(int)
sequence = np.array(list(_farthest_first_indices(0, total)))
distance = np.array(list(best_dist(sequence)))
print(sequence)
print(distance)
print(bound)
np.array(distance <= bound)
tightness = distance - bound
assert np.all(np.abs(tightness) <= 1)
so if we have a first element,
a left sequence an an optimal right sequence, we know the
size of the left and right sequence.
without loss of generality assume the right sequence is larger.
the right size has size R
and the left side has size L
in the case R > L
in either sequence, the distance will roughly half with the sequence:
0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, ... # https://oeis.org/A029837
say
start=0
stop=81
first=27
result = list(_farthest_first_indices(start, stop, first=first))
so
R = 53
L = 27
The max distance in each case will be...
[int(max_distance_at_step(i, R)) for i in range(1, 4)]
[int(max_distance_at_step(i, L)) for i in range(1, 4)]
"""
if next_left is None:
if next_right is not None:
# FIXME; something wrong here in the case where first = stop
yield next_right
# Ignore the next item in next right, it "first"
next(right_gen, None)
yield from right_gen
return
elif next_right is None:
yield next_left
yield from left_gen
return
left_dist = abs(next_left - first)
right_dist = abs(next_right - first)
left_size = first - start
right_size = stop - first - 1
print(f'left_size={left_size}')
print(f'right_size={right_size}')
# The second item in the right gen will be "first", so we pop it off
# to ignore it.
next(right_gen, None)
DEBUG = 1
if DEBUG:
from itertools import tee
right_gen, right_gen1 = tee(right_gen)
left_gen, left_gen1 = tee(left_gen)
print(f'start left: {next_left}, ' + str(list(left_gen1)))
print(f'start right: {next_right}, ' + str(list(right_gen1)))
#### FFF this doesn't work..
while True:
if left_dist == right_dist:
# At this point the distance to first heuristic breaks in the
# case where start=0, stop=10, first=2. I think it might be the
# case that when the heuristic breaks we can just interleave
# the remaining iterables, but I'm not sure if this holds.
total = (stop - start)
if DEBUG:
print('At a point where left and right dist are the same')
right_gen, right_gen1 = tee(right_gen)
left_gen, left_gen1 = tee(left_gen)
print(f'curr left: {next_left}, ' + str(list(left_gen1)))
print(f'curr right: {next_right}, ' + str(list(right_gen1)))
print(f'total = {ub.urepr(total, nl=1)}')
# if total % 2 == 0:
pairgen = it.zip_longest(left_gen, right_gen)
flatgen = it.chain([next_left], [next_right], *pairgen)
filtgen = filter(lambda x: x is not None, flatgen)
yield from filtgen
# else:
# pairgen = it.zip_longest(right_gen, left_gen)
# flatgen = it.chain([next_right], [next_left], *pairgen)
# filtgen = filter(lambda x: x is not None, flatgen)
# yield from filtgen
return
if left_dist > right_dist:
if DEBUG:
print(f'chose left {next_left=} {next_right=} {left_dist=} {right_dist=}')
yield next_left
next_left = next(left_gen, None)
if next_left is None:
# No more data on the left side, finish the right side
yield next_right
yield from right_gen
break
left_dist = abs(next_left - first)
else:
if DEBUG:
print(f'chose right {next_left=} {next_right=} {left_dist=} {right_dist=}')
yield next_right
next_right = next(right_gen, None)
if next_right is None:
# No more data on the right side, finish the left side
yield next_left
yield from left_gen
break
right_dist = abs(next_right - first)
def _check_farthest_first_index_sequence(sequence):
"""
Checks that the sequence of indices satisfies the farthest-first objective.
For each item in the sequence, it should be the further from any seen value
than any unseen value.
Example:
>>> # There are multiple ways a sequence can be valie
>>> assert _check_farthest_first_index_sequence([3, 0, 2, 1])
>>> assert _check_farthest_first_index_sequence([0, 3, 2, 1])
>>> assert not _check_farthest_first_index_sequence([0, 2, 3, 1])
"""
for idx, index in enumerate(sequence):
seen = sequence[:idx]
unseen = sequence[idx + 1:]
# Look at the distances from the chosen item to all seen items
this_dist_to_seens = {prev: abs(index - prev) for prev in seen}
# Look at the distances from the unchosen item to all seen items
other_dist_to_seens = {
other: {prev: abs(other - prev) for prev in seen}
for other in unseen
}
import ubelt as ub
if seen and unseen:
# The minimum distance from this to the seen items should be greater than
this_min_dist_to_seen_value = min(this_dist_to_seens.values())
# The minimum distance from any unseen item to the seen items
other_min_dist_to_seen_value = max([min(o.values()) for o in other_dist_to_seens.values()])
next_best = ub.argmax({k: min(o.values()) for k, o in other_dist_to_seens.items()})
if this_min_dist_to_seen_value < other_min_dist_to_seen_value:
print('----')
print(f'index={index}')
print(f'seen = {ub.urepr(seen, nl=0)}')
print(f'unseen = {ub.urepr(unseen, nl=0)}')
print(f'this_dist_to_seens = {ub.urepr(this_dist_to_seens, nl=1)}')
print(f'other_dist_to_seens = {ub.urepr(other_dist_to_seens, nl=1)}')
print(f'this_min_dist_to_seen_value={this_min_dist_to_seen_value}')
print(f'other_min_dist_to_seen_value={other_min_dist_to_seen_value}')
print(f'We chose the index {index}, but we could have chosen {next_best}')
print(f'sequence = {ub.urepr(sequence, nl=0)}')
return False
return True
def _farthest_from_previous_helper(start, stop):
if start < stop:
low_mid: int = (start + stop) // 2
high_mid: int = (start + stop + 1) // 2
left_gen = _farthest_from_previous_helper(start, low_mid)
right_gen = _farthest_from_previous_helper(high_mid, stop)
pairgen = it.zip_longest(left_gen, right_gen)
flatgen = it.chain.from_iterable(pairgen)
filtgen = filter(lambda x: x is not None, flatgen)
yield from filtgen
if low_mid < high_mid:
yield low_mid
def _farthest_first_indices_generalized(start, stop, first=None, distance_fn=None):
"""
Yield indices in a farthest-first traversal order for a 1D uniform grid [start, stop),
with an optional user-specified starting index.
This implementation uses a heap to maintain for each candidate the minimum distance
to any selected index, so that we can always pick the candidate that maximizes that distance.
Args:
start (int): inclusive start index.
stop (int): exclusive stop index.
first (int, optional): the starting index (must be in [start, stop)). Defaults to stop - 1.
distance_fn (callable, optional): A function that computes distance between two indices.
Defaults to absolute difference (L1 norm).
Yields:
int: the next index in the farthest-first order.
References:
https://chatgpt.com/c/67a37e86-f89c-8013-8ef8-400c209de0e0
TODO:
- [ ] Allow the user to specify a custom distance metric. The logic
should still work because we only care about the maximum distance
of unselected elements to their closest selected index.
Example:
>>> from geowatch.utils.util_kwutil import * # NOQA
>>> import ubelt as ub
>>> total = 10
>>> start, stop = 0, 10
>>> results = []
>>> for first in range(start, stop):
>>> results += [list(_farthest_first_indices_generalized(start, stop, first=first))]
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
[0, 9, 4, 2, 6, 1, 3, 5, 7, 8],
[1, 9, 5, 3, 7, 0, 2, 4, 6, 8],
[2, 9, 5, 0, 7, 1, 3, 4, 6, 8],
[3, 9, 0, 6, 1, 2, 4, 5, 7, 8],
[4, 9, 0, 2, 6, 1, 3, 5, 7, 8],
[5, 0, 9, 2, 7, 1, 3, 4, 6, 8],
[6, 0, 3, 9, 1, 2, 4, 5, 7, 8],
[7, 0, 3, 5, 9, 1, 2, 4, 6, 8],
[8, 0, 4, 2, 6, 1, 3, 5, 7, 9],
[9, 0, 4, 2, 6, 1, 3, 5, 7, 8],
]
>>> # This distance function just reverses the metric, so we find the
>>> # closest item instead of the furthest. It is a bit contrived, but
>>> # demos how a specific distance function can modify the results.
>>> results = []
>>> def distance_fn(i, j):
>>> return -abs(i - j)
>>> for first in range(start, stop):
>>> results += [list(_farthest_first_indices_generalized(start, stop, first=first, distance_fn=distance_fn))]
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[1, 0, 2, 3, 4, 5, 6, 7, 8, 9],
[2, 1, 0, 3, 4, 5, 6, 7, 8, 9],
[3, 2, 1, 0, 4, 5, 6, 7, 8, 9],
[4, 3, 2, 1, 0, 5, 6, 7, 8, 9],
[5, 4, 3, 2, 1, 0, 6, 7, 8, 9],
[6, 5, 4, 3, 2, 1, 0, 7, 8, 9],
[7, 6, 5, 4, 3, 2, 1, 0, 8, 9],
[8, 7, 6, 5, 4, 3, 2, 1, 0, 9],
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0],
]
"""
# Note: using sortedcontainers might have a more clear implementation this
# is probably still O(n**2), but I think we must be able to do better. not
# going to think about it too hard, this code path is not very likely to be
# needed, as the simple farther first implementation is what is mainly
# needed.
import heapq
if first is None:
first = stop - 1
if not (start <= first < stop):
raise ValueError("first must be in the range [start, stop)")
if distance_fn is None:
distance_fn = lambda a, b: abs(a - b) # NOQA
# Set of selected indices (kept sorted for convenience).
selected = [first]
yield first
# For each candidate (not yet selected), store its best known distance to a selected index.
best_dist = {
i: distance_fn(i, first)
for i in range(start, stop)
if i != first
}
# Maintain a heap keyed by negative distance (since heapq is a min-heap).
heap = []
for i, d in best_dist.items():
heapq.heappush(heap, (-d, i))
# Greedily select the candidate with the maximum current distance.
while heap:
neg_d, candidate = heapq.heappop(heap)
# If candidate was already selected, skip it.
if candidate not in best_dist:
continue
# If the min distance in the heap is wrong, then the item is outdated,
# and we need to skip it.
current_d = best_dist[candidate]
if -neg_d != current_d:
# The stored distance is outdated, pop it and skip, the correct
# value will exist in the queue later on.
continue
# Candidate is the farthest from any selected point.
yield candidate
selected.append(candidate)
# Remove candidate from best_dist so it isn’t chosen again.
best_dist.pop(candidate)
# Update best distances for all remaining candidates.
for other, old_d in best_dist.items():
new_d = distance_fn(other, candidate)
if new_d < old_d:
best_dist[other] = new_d
heapq.heappush(heap, (-new_d, other))