Source code for konigcell.compute3d

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : compute3d.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 24.08.2021


import os
import time
import textwrap
from concurrent.futures import Executor, ThreadPoolExecutor

import numpy    as      np
from tqdm       import  tqdm

from .voxels    import  Voxels
from .          import  kc3d, mode, utils




[docs]def dynamic3d( positions, mode, values = None, radii = None, voxels = None, resolution = None, xlim = None, ylim = None, zlim = None, executor = ThreadPoolExecutor, max_workers = None, verbose = True, ): '''Voxelize / rasterize a moving particle's trajectory onto a 3D voxel grid. This function is very general, so the description below may seem abstract; do check out the `examples` for specific applications and consult these docs if / as needed. The particle's `positions` are given as a 2D NumPy array, with three columns representing the X, Y and Z locations; each row corresponds to a single recorded particle position. Multiple trajectories can be separated by a row of NaN. As a particle moves in a straight line between two consecutive positions, it will be approximated as the convex hull of two spheres (centred at the given `positions`, with radii defined by the input `radii`) - like a cylinder with two spherical ends. Each such spherical cylinder defined by two consecutive particle positions will be rasterized onto a voxel grid. The values contained by the returned voxel grid depend on the input `mode`: 1. `kc.RATIO`: each voxel will contain the ratio of its intersected volume to the total volume of the spherical cylinder defined by a particle moving between two adjacent positions. This is useful for e.g. velocity probability distributions. 2. `kc.INTERSECTION`: each voxel will contain the intersected volume of the spherical cylinder; if the entire voxel was covered by the particle movement, it will simply contain the voxel volume `v_dx * v_dy * v_dz`. This is useful for e.g. residence time distributions. 3. `kc.PARTICLE`: each voxel will contain the volume spanned by the spherical cylinder. 4. `kc.ONE`: each intersected voxel will have one added to it. The volumes defined above can be multiplied by a particle-related quantity, e.g. particle velocity or time interval between two consecutive particle locations. They are defined by the input `values`; if unset all `values` are considered 1. The particles may have a radius; if the input `radii` is given as a single number, all particles will have this radius. Alternatively, the particle may change radius if `radii` is given as a NumPy vector containing a radius associated with each particle location. If unset (`None`), the particle is considered point-like / infinitesimally small. The voxel grid can be defined in two ways: 1. Set the input `voxels` to a pre-existing `konigcell.Voxels` to reuse it, minimising memory allocation. 2. Set the input `resolution` so that a new `konigcell.Voxels` will be created and returned by this function. If unset (`None`), `xlim`, `ylim` `zlim` will be computed automatically to contain all particle positions. The rasterization can be done in parallel, as defined by the input `executor`, which can be a ``concurrent.futures.Executor`` subclass; e.g. `ThreadPoolExecutor` for multi-threaded execution, or `mpi4py.futures` `MPIPoolExecutor` for distributing the computation across multiple MPI nodes. It can also be an *instance* of such class, configured beforehand. The number of parallel workers (threads, processes or ranks) is set by `max_workers`; if set to 1, execution is sequential (and produces better error messages). If unset, the `os.cpu_count()` is used. If `verbose` is True, the computation is timed and (hopefully) helpful messages are printed during execution. Parameters ---------- positions : (N, 3) np.ndarray[ndim=2, dtype=float64] The 3D particle positions. Separate multiple trajectories with a row of NaN. mode : kc.RATIO, kc.INTERSECTION, kc.PARTICLE, kc.ONE The rasterization mode, see above for full details. values : float, (N-1,) np.ndarray, optional The particle values to rasterize, will be multiplied with what the mode returns; if a single `float`, all values are set to it. Multiple values can be given in a NumPy array for each particle movement, so it needs 1 fewer values than positions (e.g. for particle positions A, B, C, there are only movements AB then BC). If unset (default), it is considered 1. radii : float, (N,) np.ndarray, optional Each particle's radius; if a single number, all particles are will have this radius. Multiple radii can be given in a NumPy array for each particle position. If `None` (default), the particle is considered point-like / infinitesimally small. voxels : konigcell.Voxels, optional A pre-created voxel grid to use; if unset, a new one will be created - so `resolution` must be set. resolution : 3-tuple, optional If `voxels` is unset, a new voxel grid will be created; this resolution contains the number of voxels in the X, Y and Z dimensions, e.g. ``(50, 50, 50)``. There must be at least 2 voxels in each dimension. xlim : 2-tuple, optional If `voxels` is unset, a new voxel grid will be created; you can manually set the physical rectangle spanned by this new grid as `[xmin, xmax]`. If unset, it is automatically computed to contain all particle positions. ylim : 2-tuple, optional Same as for `xlim`. zlim : 2-tuple, optional Same as for `xlim`. executor : concurrent.futures.Executor subclass or instance The parallel executor to use, implementing the `Executor` interface. For distributed computation with MPI, use `MPIPoolExecutor` from `mpi4py`. The default is `ThreadPoolExecutor`, which has the lowest overhead - useful because the main computation is done by C code releasing the GIL. max_workers : int, optional The maximum number of workers (threads, processes or ranks) to use by the parallel executor; if 1, it is sequential (and produces the clearest error messages should they happen). If unset, the `os.cpu_count()` is used. verbose : bool or str default True If `True`, time the computation and print the state of the execution. If `str`, show a message before loading bars. Examples -------- TODO ''' # Time voxellisation if verbose and not isinstance(verbose, str): start = time.time() # Type-checking inputs positions = np.asarray(positions, dtype = float, order = "C") if values is not None: if hasattr(values, "__iter__"): values = np.asarray(values, dtype = float, order = "C") else: values = float(values) * np.ones(len(positions) - 1, dtype = float) if radii is not None: if hasattr(radii, "__iter__"): radii = np.asarray(radii, dtype = float, order = "C") else: radii = float(radii) * np.ones(len(positions), dtype = float) utils.check_ncols(3, positions = positions) utils.check_lens(positions = positions, radii = radii) # Special case for dynamic3d: e.g. for 5 positions, there will be only 4 # cylinders -> only need 4 values if values is not None and len(values) != len(positions[:-1]): raise ValueError(textwrap.fill(( "For `dynamic3d`, there should be 1 fewer `values` than " f"`positions`. Received `len(positions) = {len(positions)}` and " f"`len(values) = {len(values)}`." ))) # If voxels is None, create a new Voxels grid if voxels is None: if resolution is None: raise ValueError("`resolution` must be defined if `voxels = None`") else: resolution = np.array(resolution, dtype = int) if resolution.ndim != 1 or len(resolution) != 3 or \ np.any(resolution < 2): raise ValueError(textwrap.fill(( "`resolution` must have exactly three elements (M, N, P), " f"all larger than 3. Received `resolution = {resolution}`." ))) offset = radii.max() if radii is not None else 0 if xlim is None: xlim = utils.get_cutoffs(offset, positions[:, 0]) if ylim is None: ylim = utils.get_cutoffs(offset, positions[:, 1]) if zlim is None: zlim = utils.get_cutoffs(offset, positions[:, 2]) voxels = Voxels.zeros(resolution, xlim, ylim, zlim) # Voxellise according to execution policy if executor == "seq" or (max_workers is not None and max_workers == 1): kc3d.dynamic3d( voxels.voxels, voxels.xlim, voxels.ylim, voxels.zlim, positions, mode, radii = radii, factors = values, ) else: # Split `positions` into `max_workers` chunks for parallel processing if max_workers is None: max_workers = os.cpu_count() # Each chunk must have at least 2 positions if len(positions) / max_workers < 2: max_workers = len(positions) // 2 shape = voxels.voxels.shape pos_chunks = utils.split(positions, max_workers, overlap = 1) rad_chunks = utils.split(radii, max_workers, overlap = 1) val_chunks = utils.split(values, max_workers, overlap = 1) vox_chunks = [np.zeros(shape) for _ in range(max_workers)] # If `executor` is a class (rather than an instance / object), # instantiate it executor_isclass = False if not isinstance(executor, Executor): executor_isclass = True executor = executor(max_workers) # Voxellise each chunk separately futures = [ executor.submit( kc3d.dynamic3d, vox_chunks[i], voxels.xlim, voxels.ylim, voxels.zlim, pos_chunks[i], mode, radii = rad_chunks[i], factors = val_chunks[i], omit_last = True, ) for i in range(max_workers - 1) ] # Voxellise the last chunk with omit_last = False futures.append( executor.submit( kc3d.dynamic3d, vox_chunks[-1], voxels.xlim, voxels.ylim, voxels.zlim, pos_chunks[-1], mode, radii = rad_chunks[-1], factors = val_chunks[-1], omit_last = False, ) ) # If verbose, show tqdm loading bar with text if verbose: desc = verbose if isinstance(verbose, str) else None futures = tqdm(futures, desc = desc) # Add all results onto the voxel grid voxels.voxels[:, :, :] += sum((f.result() for f in futures)) # Clean up all copies immediately to free memory del futures, pos_chunks, rad_chunks, val_chunks, vox_chunks if executor_isclass: executor.shutdown() if verbose and not isinstance(verbose, str): end = time.time() print(f"Voxellised in {end - start:3.3f} s") return voxels
[docs]def static3d( positions, mode, values = None, radii = None, voxels = None, resolution = None, xlim = None, ylim = None, zlim = None, executor = ThreadPoolExecutor, max_workers = None, verbose = True, ): '''Voxelize / rasterize static particles' positions onto a 3D voxel grid. This is exactly like the `dynamic3d` function, but particles are not considered to be moving - so they are rasterized a spheres. The input parameters are equivalent to `dynamic2d` - check its documentation for full details. Examples -------- TODO ''' # Time voxellisation if verbose and not isinstance(verbose, str): start = time.time() # Type-checking inputs positions = np.asarray(positions, dtype = float, order = "C") if values is not None: if hasattr(values, "__iter__"): values = np.asarray(values, dtype = float, order = "C") else: values = float(values) * np.ones(len(positions), dtype = float) if radii is not None: if hasattr(radii, "__iter__"): radii = np.asarray(radii, dtype = float, order = "C") else: radii = float(radii) * np.ones(len(positions), dtype = float) utils.check_ncols(3, positions = positions) utils.check_lens(positions = positions, values = values, radii = radii) # If voxels is None, create a new Voxels grid if voxels is None: if resolution is None: raise ValueError("`resolution` must be defined if `voxels = None`") else: resolution = np.array(resolution, dtype = int) if resolution.ndim != 1 or len(resolution) != 3 or \ np.any(resolution < 2): raise ValueError(textwrap.fill(( "`resolution` must have exactly three elements (M, N, P), " f"all larger than 2. Received `resolution = {resolution}`." ))) offset = radii.max() if radii is not None else 0 if xlim is None: xlim = utils.get_cutoffs(offset, positions[:, 0]) if ylim is None: ylim = utils.get_cutoffs(offset, positions[:, 1]) if zlim is None: zlim = utils.get_cutoffs(offset, positions[:, 2]) voxels = Voxels.zeros(resolution, xlim, ylim, zlim) # Voxellise according to execution policy if executor == "seq" or (max_workers is not None and max_workers == 1): kc3d.static3d( voxels.voxels, voxels.xlim, voxels.ylim, voxels.zlim, positions, mode, radii = radii, factors = values, ) else: # Split `positions` into `max_workers` chunks for parallel processing if max_workers is None: max_workers = os.cpu_count() # Each chunk must have at least 2 positions if len(positions) / max_workers < 2: max_workers = len(positions) // 2 shape = voxels.voxels.shape pos_chunks = utils.split(positions, max_workers) rad_chunks = utils.split(radii, max_workers) val_chunks = utils.split(values, max_workers) vox_chunks = [np.zeros(shape) for _ in range(max_workers)] # If `executor` is a class (rather than an instance / object), # instantiate it executor_isclass = False if not isinstance(executor, Executor): executor_isclass = True executor = executor(max_workers) # Voxellise each chunk separately futures = [ executor.submit( kc3d.static3d, vox_chunks[i], voxels.xlim, voxels.ylim, voxels.zlim, pos_chunks[i], mode, radii = rad_chunks[i], factors = val_chunks[i], ) for i in range(max_workers) ] # If verbose, show tqdm loading bar with text if verbose: desc = verbose if isinstance(verbose, str) else None futures = tqdm(futures, desc = desc) # Add all results onto the voxel grid voxels.voxels[:, :, :] += sum((f.result() for f in futures)) # Clean up all copies immediately to free memory del futures, pos_chunks, rad_chunks, val_chunks, vox_chunks if executor_isclass: executor.shutdown() if verbose and not isinstance(verbose, str): end = time.time() print(f"Voxellised in {end - start:3.3f} s") return voxels
[docs]def dynamic_prob3d( positions, values, radii = None, voxels = None, resolution = None, xlim = None, ylim = None, zlim = None, executor = ThreadPoolExecutor, max_workers = None, verbose = True, ): '''Compute the 3D probability distribution of a moving particle's specific quantity (e.g. velocity). This function computes the distribution of the input `values` across voxel cells. For example, computing the velocity distribution of a particle moving from `positions` A to B to C, we need to rasterize the velocity from `values[0]` on segment AB, then the velocity from `values[1]` on BC - therefore for N `positions` we will rasterize N-1 `values`. For multiple particle trajectories, simply separate them by a row of NaN in the input `positions`. All input parameters are equivalent to `dynamic3d` - check its documentation for full details. Examples -------- TODO ''' # Time voxellising if verbose: start = time.time() # Compute probability grid, where each voxel contains the values weighted # by the intersection volume; first compute values * weights... voxels = dynamic3d( positions, mode.INTERSECTION, values = values, radii = radii, voxels = voxels, resolution = resolution, xlim = xlim, ylim = ylim, zlim = zlim, executor = executor, max_workers = max_workers, verbose = "Step 1 / 2 :" if verbose else False, ) # ... then divide by the sum of weights igrid = voxels.copy() igrid.voxels[:, :, :] = 0. dynamic3d( positions, mode.INTERSECTION, radii = radii, voxels = igrid, executor = executor, max_workers = max_workers, verbose = "Step 2 / 2 :" if verbose else False, ) nonzero = (igrid.voxels != 0.) if nonzero.any(): voxels.voxels[nonzero] /= igrid.voxels[nonzero] # Correction for floating-point errors: threshold all voxels with values # below min(values); they can only exist due to FP errors minval = np.nanmin(values) voxels.voxels[voxels.voxels < minval] = minval if verbose: end = time.time() print(("Computed dynamic 3D probability distribution in " f"{end - start:3.3f} s")) return voxels
[docs]def static_prob3d( positions, values, radii = None, voxels = None, resolution = None, xlim = None, ylim = None, zlim = None, executor = ThreadPoolExecutor, max_workers = None, verbose = True, ): '''Compute the 3D probability distribution of static particles' specific quantities (e.g. velocity). This function computes the distribution of the input `values` across voxel cells for the static spherical particles at the input `positions`; it is the static counterpart of `dynamic_prob3d`. All input parameters are equivalent to `dynamic3d` - check its documentation for full details. Examples -------- TODO ''' # Time voxellising if verbose: start = time.time() # Compute probability grid, where each voxel contains the values weighted # by the intersection volume; first compute values * weights... voxels = static3d( positions, mode.INTERSECTION, values = values, radii = radii, voxels = voxels, resolution = resolution, xlim = xlim, ylim = ylim, zlim = zlim, executor = executor, max_workers = max_workers, verbose = "Step 1 / 2 :" if verbose else False, ) # ... then divide by the sum of weights igrid = voxels.copy() igrid.voxels[:, :, :] = 0. static3d( positions, mode.INTERSECTION, radii = radii, voxels = igrid, executor = executor, max_workers = max_workers, verbose = "Step 2 / 2 :" if verbose else False, ) nonzero = (igrid.voxels != 0.) if nonzero.any(): voxels.voxels[nonzero] /= igrid.voxels[nonzero] # Correction for floating-point errors: threshold all voxels with values # below min(values); they can only exist due to FP errors minval = np.nanmin(values) voxels.voxels[voxels.voxels < minval] = minval if verbose: end = time.time() print(("Computed static 3D probability distribution in " f"{end - start:3.3f} s")) return voxels