"""
Defines the :class:`ProcessContext` object, which is what mlops expects jobs to
be wrapped in.
TODO:
- [ ] Make "most" telemetry opt-in
"""
import platform
import socket
import sys
import os
import ubelt as ub
import uuid
from kwutil import util_environ
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY = util_environ.envflag('PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY', default=False)
PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY = util_environ.envflag('PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY', default=False)
[docs]
class ProcessContext:
"""
Context manager to track the context under which a result was computed.
This tracks things like start / end time. The command line that can
reproduce the process (assuming an appropriate environment. The
configuration the process was run with. The machine details the process was
run on. The power usage / carbon emissions the process used, and other
information.
Args:
args (str | List[str]):
This should be the sys.argv or the command line string that can be
used to rerun the process
config (Dict):
This should be a configuration dictionary (likely based on
sys.argv)
name (str): the name of this process
type (str): The type of this process
(usually keep the default of process)
request_all_telemetry (bool):
if False, telemetry is disabled. This is forced to False if
PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY is in the environment.
request_most_telemetry (bool):
if False, telemetry is disabled. This is forced to False if
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY is in the environment.
Note:
This module provides telemetry, which records user-identifiable
information. While useful, it does raise ethical concerns about user
privacy, and the people running this code have a right to know about it
and opt out. In the future we will change our policy to opt-in, but for
system stability, we are not changing defaults.
Note:
There are two levels of telemetry.
Enviornment telemetry. These are things like the machine the code was
run on. Use PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY=0 to opt-out.
The start / stop / sys.argv / config objects are necessary for mlops to
do anything. But these can leak information by containing system paths.
Emissions is also in this category. Use
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY to opt out.
CommandLine:
xdoctest -m geowatch.utils.process_context ProcessContext
Example:
>>> from geowatch.utils.process_context import *
>>> import torch
>>> import rich
>>> device = torch.device(0) if torch.cuda.is_available() else torch.device('cpu')
>>> # Adding things like disk info an tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
Example:
>>> from geowatch.utils.process_context import *
>>> # flush can measure intermediate progress
>>> self = ProcessContext(track_emissions='offline')
>>> self.add_disk_info('.')
>>> obj1 = self.start().flush()
>>> obj1_orig = obj1.copy()
>>> obj2 = self.stop()
"""
def __init__(self, name=None, type='process', args=None, config=None,
extra=None, track_emissions=False, request_all_telemetry=True,
request_most_telemetry=True):
if args is None:
args = sys.argv
else:
import warnings
warnings.warn(ub.paragraph(
'''
It is better to leave args unspecified so sys.argv is captured.
Be sure to specify ``config`` as the resolved config.
In the future we may add an extra param for unresolved configs.
'''))
self.properties = {
"name": name,
"args": args,
"config": config,
"machine": None,
"start_timestamp": None,
"stop_timestamp": None,
"duration": None,
"uuid": str(uuid.uuid4()),
"extra": extra,
}
self.obj = {
"type": type,
"properties": self.properties
}
self.track_emissions = track_emissions
self.emissions_tracker = None
self._emission_backend = 'auto'
self._started = False
if PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY:
request_all_telemetry = 0
else:
self.enable_all_telemetry = request_all_telemetry
if PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY:
request_most_telemetry = 0
else:
self.enable_most_telemetry = request_most_telemetry
if not self.enable_all_telemetry:
self.enable_most_telemetry = 0
self.properties.pop('config')
self.properties.pop('args')
[docs]
def write_invocation(self, invocation_fpath):
"""
Write a helper file that contains a locally reproducable invocation of
this process.
"""
import shlex
command = ' '.join(list(map(shlex.quote, self.properties['args'])))
invocation_fpath = ub.Path(invocation_fpath)
invocation_fpath.write_text(ub.codeblock(
f'''
#!/bin/bash
{command}
'''))
def _timestamp(self):
timestamp = ub.timestamp()
return timestamp
def _hostinfo(self):
if not self.enable_most_telemetry:
return {}
return {
"host": socket.gethostname(),
"user": ub.Path.home().name,
'cwd': os.fspath(ub.Path.cwd()),
"userhome": os.fspath(ub.Path.home()),
}
def _osinfo(self):
if not self.enable_most_telemetry:
return {}
(
uname_system,
_,
uname_release,
uname_version,
_,
uname_processor,
) = platform.uname()
return {
"os_name": uname_system,
"os_release": uname_release,
"os_version": uname_version,
"arch": uname_processor,
}
def _pyinfo(self):
if not self.enable_most_telemetry:
return {}
return {
"py_impl": platform.python_implementation(),
"py_version": sys.version.replace("\n", ""),
}
def _meminfo(self):
if not self.enable_most_telemetry:
return {}
import psutil
# TODO: could collect memory info at start and stop and intermediate
# stages. Here we just want info that is static wrt to the machine.
# For now, just get the total available.
svmem_info = psutil.virtual_memory()
return {
"mem_total": svmem_info.total,
}
def _cpuinfo(self):
if not self.enable_most_telemetry:
return {}
import cpuinfo
_cpu_info = cpuinfo.get_cpu_info()
cpu_info = {
"cpu_brand": _cpu_info["brand_raw"],
"cpu_count": _cpu_info["count"],
}
return cpu_info
def _gpuinfo(self):
import torch
num_devices = torch.cuda.device_count()
device_infos = []
for device_num in num_devices:
device = torch.device(device_num)
info = self._device_info(device)
device_infos.append(info)
return device_infos
def _machine(self):
if not self.enable_most_telemetry:
return {'telemetry_disabled': True}
return ub.dict_union(
self._hostinfo(),
self._meminfo(),
self._cpuinfo(),
self._osinfo(),
self._pyinfo(),
)
[docs]
def start(self):
self._started = True
if not self.enable_all_telemetry:
return self
self.properties.update({
"machine": self._machine(),
"start_timestamp": self._timestamp(),
"stop_timestamp": None,
})
if self.track_emissions:
self._start_emissions_tracker()
return self
[docs]
def flush(self):
if not self._started:
raise Exception("Must start before you flush")
if self.enable_all_telemetry:
self.properties["stop_timestamp"] = self._timestamp()
start_time = ub.timeparse(self.properties["start_timestamp"])
stop_time = ub.timeparse(self.properties["stop_timestamp"])
self.properties["duration"] = str(stop_time - start_time)
if self.emissions_tracker is not None:
try:
self._flush_emissions_tracker()
except Exception as ex:
print(f'warning: issue with emissions ex={ex}')
return self.obj
[docs]
def stop(self):
if not self._started:
raise Exception("Must start before you stop")
if self.enable_all_telemetry:
self.properties["stop_timestamp"] = self._timestamp()
start_time = ub.timeparse(self.properties["start_timestamp"])
stop_time = ub.timeparse(self.properties["stop_timestamp"])
self.properties["duration"] = str(stop_time - start_time)
if self.emissions_tracker is not None:
try:
self._stop_emissions_tracker()
except Exception as ex:
print(f'warning: issue with emissions ex={ex}')
return self.obj
def __enter__(self):
return self.start()
def __exit__(self, a, b, c):
self.stop()
def _start_emissions_tracker(self):
if not self.enable_all_telemetry:
return
emissions_tracker = None
if isinstance(self.track_emissions, str):
backend = self.track_emissions
elif self.track_emissions:
backend = 'auto'
if backend == 'auto':
backend = 'online'
if backend == 'online':
try:
from codecarbon import EmissionsTracker
"""
# emissions_tracker = EmissionsTracker(log_level='info')
"""
emissions_tracker = EmissionsTracker(log_level='error')
emissions_tracker.start()
except Exception as ex:
print('ex = {}'.format(ub.urepr(ex, nl=1)))
print('Online emissions tracker is not available. Trying offline')
if self._emission_backend == 'auto':
backend = 'offline'
if backend == 'offline':
try:
# TODO: allow configuration
from codecarbon import OfflineEmissionsTracker
emissions_tracker = OfflineEmissionsTracker(
country_iso_code='USA',
# region='Virginia',
# cloud_provider='aws',
# cloud_region='us-east-1',
# country_2letter_iso_code='us'
)
emissions_tracker.start()
except Exception as ex:
print('Non-Critical Warning: Unable to track carbon emissions ex = {!r}'.format(ex))
self.emissions_tracker = emissions_tracker
def _flush_emissions_tracker(self):
if self.emissions_tracker is None:
self.properties['emissions'] = None
return
self.emissions_tracker._measure_power_and_energy()
summary = emissions_data = self.emissions_tracker._prepare_emissions_data()
self.emissions_tracker._persist_data(emissions_data)
co2_kg = summary.emissions
total_kWH = summary.energy_consumed
# summary.cloud_provider
# summary.cloud_region
# summary.duration
# summary.emissions_rate
# summary.cpu_power
# summary.gpu_power
# summary.ram_power
# summary.cpu_energy
# summary.gpu_energy
# summary.ram_energy
emissions = {
'co2_kg': co2_kg,
'total_kWH': total_kWH,
'run_id': str(self.emissions_tracker.run_id),
}
try:
import pint
except Exception as ex:
print('Error stopping emissions tracker: ex = {!r}'.format(ex))
else:
reg = pint.UnitRegistry()
if co2_kg is None:
co2_kg = float('nan')
co2_ton = (co2_kg * reg.kg).to(reg.metric_ton)
dollar_per_ton = 15 / reg.metric_ton # cotap rate
emissions['co2_ton'] = co2_ton.m
emissions['est_dollar_to_offset'] = (co2_ton * dollar_per_ton).m
self.properties['emissions'] = emissions
def _stop_emissions_tracker(self):
if self.emissions_tracker is None:
self.properties['emissions'] = None
return
self.emissions_tracker.stop()
summary = self.emissions_tracker.final_emissions_data
co2_kg = summary.emissions
total_kWH = summary.energy_consumed
# summary.cloud_provider
# summary.cloud_region
# summary.duration
# summary.emissions_rate
# summary.cpu_power
# summary.gpu_power
# summary.ram_power
# summary.cpu_energy
# summary.gpu_energy
# summary.ram_energy
emissions = {
'co2_kg': co2_kg,
'total_kWH': total_kWH,
'run_id': str(self.emissions_tracker.run_id),
}
try:
import pint
except Exception as ex:
print('Error stopping emissions tracker: ex = {!r}'.format(ex))
else:
reg = pint.UnitRegistry()
if co2_kg is None:
co2_kg = float('nan')
co2_ton = (co2_kg * reg.kg).to(reg.metric_ton)
dollar_per_ton = 15 / reg.metric_ton # cotap rate
emissions['co2_ton'] = co2_ton.m
emissions['est_dollar_to_offset'] = (co2_ton * dollar_per_ton).m
self.properties['emissions'] = emissions
def _device_info(self, device):
import torch
try:
device_info = {
'device_index': device.index,
'device_type': device.type,
}
try:
device_props = torch.cuda.get_device_properties(device)
capabilities = (device_props.multi_processor_count, device_props.minor)
device_info.update({
'device_name': device_props.name,
'total_vram': device_props.total_memory,
'reserved_vram': torch.cuda.memory_reserved(device),
'allocated_vram': torch.cuda.memory_allocated(device),
'device_capabilities': capabilities,
'device_multi_processor_count': device_props.multi_processor_count,
})
except Exception:
pass
except Exception as ex:
print('Error adding device info: ex = {!r}'.format(ex))
device_info = str(ex)
return device_info
[docs]
def add_device_info(self, device):
"""
Add information about a torch device that was used in this process.
Does nothing if telemetry is disabled.
Args:
device (torch.device): torch device to add info about
"""
if not self.enable_most_telemetry:
return
self.properties['device_info'] = self._device_info(device)
[docs]
def add_disk_info(self, path):
"""
Add information about a storage disk that was used in this process
Does nothing if telemetry is disabled.
"""
if not self.enable_most_telemetry:
return
try:
from geowatch.utils import util_hardware
# Get information about disk used in this process
disk_info = util_hardware.disk_info_of_path(path)
except Exception as ex:
print('ex = {!r}'.format(ex))
print('ex = {!r}'.format(ex))
disk_info = str(ex)
self.properties['disk_info'] = disk_info
# def _test_offline():
# """
# xdoctest -m geowatch.utils.process_context ProcessContext
# """
# from codecarbon import OfflineEmissionsTracker
# emissions_tracker = OfflineEmissionsTracker(
# country_iso_code='USA',
# # region='Virginia',
# region='virginia',
# cloud_provider='aws',
# cloud_region='us-east-1',
# log_level='info',
# # country_2letter_iso_code='us'
# )
# emissions_tracker.start()
# emissions_tracker.stop()
# from codecarbon import EmissionsTracker
# emissions_tracker = EmissionsTracker(log_level='debug')
# emissions_tracker.start()
# emissions_tracker.stop()
# from codecarbon.external.geography import CloudMetadata, GeoMetadata
# geo = GeoMetadata(
# country_iso_code="USA", country_name="United States", region="Illinois"
# )
# self = ProcessContext(track_emissions=True)
# self.start()
# self.stop()
# _ = self.emissions_tracker._data_source.get_country_emissions_data('usa')
[docs]
def jsonify_config(config):
"""
Converts an object to a jsonifiable config as best as possible
"""
from kwcoco.util import util_json
if hasattr(config, 'asdict'):
config = config.asdict()
jsonified_config = util_json.ensure_json_serializable(config)
walker = ub.IndexableWalker(jsonified_config)
for problem in util_json.find_json_unserializable(jsonified_config):
bad_data = problem['data']
walker[problem['loc']] = str(bad_data)
return jsonified_config
[docs]
class Reconstruction:
# TODO
...
[docs]
def main():
"""
Simple CLI to get hardware measurements that process context would provide.
"""
# Adding things like disk info an tracking emission usage
self = ProcessContext(track_emissions=False)
obj = self.start().stop()
self.add_disk_info('.')
import torch
if torch.cuda.is_available():
device = torch.device(0)
self.add_device_info(device)
self.stop()
print('obj = {}'.format(ub.urepr(obj, nl=3)))
# def _codecarbon_mwe():
# from codecarbon import OfflineEmissionsTracker
# self = OfflineEmissionsTracker(
# country_iso_code='USA',
# # cloud_provider='gcp',
# # region='us-east-1',
# # country_2letter_iso_code='us'
# )
# self.start()
# self.flush()
# emissions_data = self._prepare_emissions_data()
# cloud = self._get_cloud_metadata()
# df = self._data_source.get_cloud_emissions_data()
if __name__ == '__main__':
"""
CommandLine:
python ~/code/watch/geowatch/utils/process_context.py
"""
main()