Source code for geowatch.tasks.depth.pl_highres_verify

# FIXME:
# Adds the "modules" subdirectory to the python path.
# See https://gitlab.kitware.com/smart/watch/-/merge_requests/148#note_1050127
# for discussion of how to refactor this in the future.
import geowatch_tpl  # NOQA

import warnings
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms

import pytorch_lightning as pl

from .backbone import get_backbone

from frame_field_learning import data_transforms
from frame_field_learning import local_utils
from frame_field_learning.model_multi import Multi_FrameFieldModel

from scipy import ndimage

dfactor = 25.5

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # PyTorch v0.4.0


#-------------------------------------------
# Modify the batch_norm layers
#-------------------------------------------

[docs] def modify_bn(model, track_running_stats=True, bn_momentum=0.1): for m in model.modules(): for child in m.children(): if isinstance(child, nn.BatchNorm2d): child.momentum = bn_momentum child.track_running_stats = track_running_stats if track_running_stats is False: child.running_mean = None child.running_var = None return model
#------------------------------------------------- # Depth/Label/Shadow/Facade Eestimation Module #-------------------------------------------------
[docs] class MultiTaskModel(pl.LightningModule): def __init__( self, batch_size: int = 1, checkpoint: str = None, config: dict = None, test_img_dir: str = None, test_img_list: str = None, gpus: str = '0', **kwargs, ): super().__init__(**kwargs) self.gpus = gpus self.checkpoint = checkpoint self.batch_size = batch_size self.test_img_dir = test_img_dir self.test_img_list = test_img_list self.config = config self.backbone = get_backbone(self.config["backbone_params"]) train_online_cuda_transform = None eval_online_cuda_transform = None self.net = Multi_FrameFieldModel( self.config, backbone=self.backbone, train_transform=train_online_cuda_transform, eval_transform=eval_online_cuda_transform) self.transform = data_transforms.get_online_cuda_transform( self.config, augmentations=self.config["data_aug_params"]["enable"])
[docs] def forward(self, x, tta=False): return self.net(x, tta)
[docs] def test_step(self, batch, batch_idx): out_arr = [] for i, image in enumerate(batch): if isinstance(image, dict): gid = image['id'] # img_info = image image = image['imgdata'] with torch.no_grad(): image_float = image / 255.0 mean = np.mean(image_float.reshape(-1, image_float.shape[-1]), axis=0) std = np.std(image_float.reshape(-1, image_float.shape[-1]), axis=0) batch2 = { "image": torchvision.transforms.functional.to_tensor(image)[None, ...], "image_mean": torch.from_numpy(mean)[None, ...], "image_std": torch.from_numpy(std)[None, ...], } batch2 = local_utils.batch_to_cuda(batch2) pred2, batch2 = self(batch2, tta=True) output_depth = pred2['depth'][0, 0, :, :].cpu().data.numpy() output_label = pred2['seg'][0, 0, :, :].cpu().data.numpy() weighted_depth = dfactor * output_depth alpha = 0.9 weighted_seg = alpha * output_label + (1.0 - alpha) * np.minimum(0.99, weighted_depth / 70.0) with warnings.catch_warnings(): warnings.simplefilter('ignore') tmp2 = 255 * anisotropic_diffusion(weighted_seg, niter=1, kappa=100, gamma=0.8) weighted_final = ndimage.median_filter(tmp2.astype(np.uint8), size=7) # Image.fromarray(weighted_final.astype(np.uint8)).save('/output/weighted_final.png') out_arr.append((gid, weighted_final)) return out_arr
[docs] @staticmethod def add_model_specific_args(parent_parser): # pragma: no-cover parser = parent_parser.add_argument_group("MultiTaskModel") parser.add_argument('--checkpoint', default=None, type=str, help='checkpoint to use for testing') parser.add_argument('--config', '--config', default=None, type=str, help='Name of the config file, excluding the .json file extension.') parser.add_argument('--test_img_dir', '--test_img_dir', default=None, type=str, help='directory where test images are located') parser.add_argument('--test_img_list', '--test_img_list', default=None, type=str, help='list of test images') parser.add_argument('--gpus', default='0', type=str, help='GPU') return parent_parser
# Vendored in to deal with 3.10 issue
[docs] def anisotropic_diffusion(img, niter=1, kappa=50, gamma=0.1, voxelspacing=None, option=1): r""" Edge-preserving, XD Anisotropic diffusion. Parameters ---------- img : array_like Input image (will be cast to numpy.float). niter : integer Number of iterations. kappa : integer Conduction coefficient, e.g. 20-100. ``kappa`` controls conduction as a function of the gradient. If ``kappa`` is low small intensity gradients are able to block conduction and hence diffusion across steep edges. A large value reduces the influence of intensity gradients on conduction. gamma : float Controls the speed of diffusion. Pick a value :math:`<= .25` for stability. voxelspacing : tuple of floats or array_like The distance between adjacent pixels in all img.ndim directions option : {1, 2, 3} Whether to use the Perona Malik diffusion equation No. 1 or No. 2, or Tukey's biweight function. Equation 1 favours high contrast edges over low contrast ones, while equation 2 favours wide regions over smaller ones. See [1]_ for details. Equation 3 preserves sharper boundaries than previous formulations and improves the automatic stopping of the diffusion. See [2]_ for details. Returns ------- anisotropic_diffusion : ndarray Diffused image. Notes ----- Original MATLAB code by Peter Kovesi, School of Computer Science & Software Engineering, The University of Western Australia, pk @ csse uwa edu au, <http://www.csse.uwa.edu.au> Translated to Python and optimised by Alistair Muldal, Department of Pharmacology, University of Oxford, <alistair.muldal@pharm.ox.ac.uk> Adapted to arbitrary dimensionality and added to the MedPy library Oskar Maier, Institute for Medical Informatics, Universitaet Luebeck, <oskar.maier@googlemail.com> June 2000 original version. - March 2002 corrected diffusion eqn No 2. - July 2012 translated to Python - August 2013 incorporated into MedPy, arbitrary dimensionality - References ---------- .. [1] P. Perona and J. Malik. Scale-space and edge detection using ansotropic diffusion. IEEE Transactions on Pattern Analysis and Machine Intelligence, 12(7):629-639, July 1990. .. [2] M.J. Black, G. Sapiro, D. Marimont, D. Heeger Robust anisotropic diffusion. IEEE Transactions on Image Processing, 7(3):421-432, March 1998. """ # define conduction gradients functions import numpy if option == 1: def condgradient(delta, spacing): return numpy.exp(-(delta / kappa)**2.) / float(spacing) elif option == 2: def condgradient(delta, spacing): return 1. / (1. + (delta / kappa)**2.) / float(spacing) elif option == 3: kappa_s = kappa * (2**0.5) def condgradient(delta, spacing): top = 0.5 * ((1. - (delta / kappa_s)**2.)**2.) / float(spacing) return numpy.where(numpy.abs(delta) <= kappa_s, top, 0) # initialize output array out = numpy.array(img, dtype=numpy.float32, copy=True) # set default voxel spacing if not supplied if voxelspacing is None: voxelspacing = tuple([1.] * img.ndim) # initialize some internal variables deltas = [numpy.zeros_like(out) for _ in range(out.ndim)] for _ in range(niter): # calculate the diffs for i in range(out.ndim): slicer = tuple([slice(None, -1) if j == i else slice(None) for j in range(out.ndim)]) deltas[i][slicer] = numpy.diff(out, axis=i) # update matrices matrices = [ condgradient( delta, spacing) * delta for delta, spacing in zip( deltas, voxelspacing)] # subtract a copy that has been shifted ('Up/North/West' in 3D case) by one # pixel. Don't as questions. just do it. trust me. for i in range(out.ndim): slicer = tuple([slice(1, None) if j == i else slice(None) for j in range(out.ndim)]) matrices[i][slicer] = numpy.diff(matrices[i], axis=i) # update the image out += gamma * (numpy.sum(matrices, axis=0)) return out