"""
See: ~/code/watch/geowatch/utils/process_context.py
"""
[docs]
def get_cpu_mem_info():
cpu_info = get_cpu_info()
mem_info = get_mem_info()
system_info = {
'cpu_info': cpu_info,
'mem_info': mem_info,
}
return system_info
[docs]
def get_cpu_info():
import cpuinfo
cpu_info = cpuinfo.get_cpu_info()
return cpu_info
[docs]
def get_mem_info(with_units=False):
"""
Memory info is returned in bytes.
TODO:
- [ ] Should we use pint to give these numbers units?
References:
https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory
Example:
>>> from geowatch.utils import util_hardware
>>> import ubelt as ub
>>> mem_info = util_hardware.get_mem_info(with_units=1)
>>> print(f'mem_info = {ub.urepr(mem_info, nl=1)}')
total = mem_info['used']
tmp = (
# mem_info['cached'] +
# mem_info['buffers'] +
# mem_info['inactive'] +
mem_info['slab'])
green = total - tmp
print(f'green={green}')
percent = ((green / mem_info['total']) * 100).m
print(f'percent={percent}')
"""
import psutil
svmem_info = psutil.virtual_memory()
mem_info = dict(zip(svmem_info._fields, svmem_info))
if 0:
# Measure memory used by this process and its children
import psutil
import os
this_process = psutil.Process(os.getpid())
this_total_pcnt = 0
this_total_pcnt += this_process.memory_percent()
for child in this_process.children():
this_total_pcnt += child.memory_percent()
if 0:
# Measure memory explicitly used by userland processes
total_vms = 0
total_pcnt = 0
for proc in psutil.process_iter():
total_pcnt += proc.memory_percent()
total_vms += proc.memory_info().vms
if with_units:
from geowatch.utils import util_units
ureg = util_units.unit_registry()
bytes_keys = [
'total', 'available', 'used', 'free', 'active', 'inactive',
'buffers', 'cached', 'shared', 'slab', 'wired',
]
for key in bytes_keys:
if key in mem_info:
mem_info[key] = (mem_info[key] * ureg.bytes).to('gigabytes')
return mem_info
[docs]
def disk_info_of_path(path):
"""
Get disk info wrt where a file lives
WIP - needs more work
CommandLine:
xdoctest -m geowatch.utils.util_hardware disk_info_of_path
Returns:
dict - dictionary of information
Example:
>>> from geowatch.utils.util_hardware import * # NOQA
>>> path = '.'
>>> x = disk_info_of_path(path)
>>> import ubelt as ub
>>> print(ub.urepr(x))
TODO:
- [ ] Handle btrfs
- [ ] Handle whatever AWS uses
- [ ] Use udisksctl or udevadm
Ignore:
lsblk /dev/nvme1n1
lsblk -afs /dev/mapper/vgubuntu-root
lsblk -nasd /dev/mapper/vgubuntu-root
df $HOME --output=source,fstype
df $HOME/data/dvc-repos/smart_watch_dvc --output=source,fstype
df $HOME/data/dvc-repos/smart_watch_dvc-hdd --output=source,fstype
df . --output=source,fstype,itotal,iused,iavail,ipcent,size,used,avail,pcent,file,target
References:
https://askubuntu.com/questions/609708/how-to-find-hard-drive-brand-name-or-model
https://stackoverflow.com/questions/38615464/how-to-get-device-name-on-which-a-file-is-located-from-its-path-in-c
"""
import ubelt as ub
import os
path = ub.Path(path)
path = path.resolve()
# Returns the lvm name: e.g.
# Filesystem Type
# data zfs
# /dev/sde1 ext4
# /dev/md0 ext4
# /dev/mapper/vgubuntu-root ext4
# /dev/nvme1n1 f2fs
info = ub.cmd(f'df {path} --output=source,fstype', check=True)
parts = info['out'].split('\n')[1].rsplit(' ', 1)
source, filesystem = [p.strip() for p in parts]
hwinfo = {
'path': os.fspath(path),
'source': source,
'filesystem': filesystem,
}
if filesystem == 'zfs':
# Use ZFS to get more information
# info = ub.cmd(f'zpool list {source}', verbose=3)
# info = ub.cmd(f'zpool iostat {source}', verbose=3)
# info = ub.cmd(f'zpool list -H {source}', verbose=3)
# info = ub.cmd(f'zpool iostat -H {source}', verbose=3)
try:
zfs_status = _zfs_status(source)
hwinfo['hwtype'] = zfs_status['coarse_type']
except Exception as ex:
print('error in zfs stuff: ex = {}'.format(ub.urepr(ex, nl=1)))
elif filesystem == 'overlay':
# This is the case on AWS. lsblk isnt able to provide us with more info
# I'm not sure how to determine more info.
# References:
# https://docs.kernel.org/filesystems/overlayfs.html
...
else:
try:
if _device_is_hdd(source):
hwinfo['hwtype'] = 'hdd'
else:
hwinfo['hwtype'] = 'ssd'
except Exception as ex:
print('warning: unable to infer disk info: ex = {}'.format(ub.urepr(ex, nl=1)))
try:
import json
info = ub.cmd(f'lsblk -as {source} --json')
lsblk_info = json.loads(info['out'])
walker = ub.IndexableWalker(lsblk_info)
names = []
for path, item in walker:
if isinstance(item, dict):
if 'name' in item:
names.append(item['name'])
hwinfo['names'] = names
except Exception:
pass
# print(ub.Path('/proc/partitions').read_text())
return hwinfo
def _device_is_hdd(path):
import ubelt as ub
import json
info = ub.cmd(f'lsblk -as {path} --json -o name,rota')
info.check_returncode()
lsblk_info = json.loads(info['out'])
walker = ub.IndexableWalker(lsblk_info)
rotas = []
for path, item in walker:
if isinstance(item, dict):
if 'rota' in item:
rotas.append(item['rota'])
return any(rotas)
def _zfs_status(pool, verbose=0):
"""
Semi-parsable zfs status output. This is a proof-of-concept and needs some
work to handle the nested pool structure.
"""
import ubelt as ub
import re
info = ub.cmd(f'zpool status {pool} -P', verbose=verbose)
info.check_returncode()
splitter = re.compile(r'\s+')
config = ub.ddict(list)
context = None
state = None
header = None
# stack = [] todo
for line in info.stdout.split('\n'):
indentation = line[:len(line) - len(line.lstrip())]
if not line.strip():
continue
if state is None:
if line.strip() == 'config:':
state = 'CONFIG'
elif state == 'CONFIG':
parts = splitter.split(line.strip())
if parts[0] == 'NAME':
state = 'NAME'
header = parts
elif state == 'NAME':
if len(indentation) == 1:
parts = splitter.split(line.strip())
row = ub.dzip(header, parts)
name = parts[0]
row['children'] = []
context = config[name] = row
elif len(indentation) > 1:
parts = splitter.split(line.strip())
row = ub.dzip(header, parts)
context['children'].append(row)
else:
state = None
# hack to get the data we currently need
dev_paths = []
for row in config[pool]['children']:
if row['NAME'].startswith('/dev'):
dev_paths.append(row['NAME'])
is_part_hdd = []
for path in dev_paths:
flag = _device_is_hdd(path)
is_part_hdd.append(flag)
if all(is_part_hdd):
coarse_type = 'hdd'
elif not any(is_part_hdd):
coarse_type = 'ssd'
else:
coarse_type = 'mixed'
output = {
'coarse_type': coarse_type,
'poc_config': config,
}
# print('records = {}'.format(ub.urepr(config, nl=True)))
return output