#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : compute2d.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 24.08.2021
import os
import time
import textwrap
from concurrent.futures import Executor, ThreadPoolExecutor
import numpy as np
from tqdm import tqdm
from .pixels import Pixels
from . import kc2d, mode, utils
[docs]def dynamic2d(
positions,
mode,
values = None,
radii = None,
pixels = None,
resolution = None,
xlim = None,
ylim = None,
executor = ThreadPoolExecutor,
max_workers = None,
verbose = True,
):
'''Pixelize / rasterize a moving particle's trajectory onto a 2D pixel
grid.
This function is very general, so the description below may seem abstract;
do check out the `examples` for specific applications and consult these
docs if / as needed.
The particle's `positions` are given as a 2D NumPy array, with two columns
representing the X and Y locations; each row corresponds to a single
recorded particle position. Multiple trajectories can be separated by a row
of NaN.
As a particle moves in a straight line between two consecutive positions,
it will be approximated as the convex hull of two circles (centred at the
given `positions`, with radii defined by the input `radii`) - like a
rectangle with two circular ends. Each such circular rectangle defined by
two consecutive particle positions will be rasterized onto a pixel grid.
The values contained by the returned pixel grid depend on the input
`mode`:
1. `kc.RATIO`: each pixel will contain the ratio of its intersected area to
the total area of the circular rectangle defined by a particle moving
between two adjacent positions. This is useful for e.g. velocity
probability distributions.
2. `kc.INTERSECTION`: each pixel will contain the intersected area of the
circular rectangle; if the entire pixel was covered by the particle
movement, it will simply contain the pixel area `p_dx * p_dy`. This is
useful for e.g. residence time distributions.
3. `kc.PARTICLE`: each pixel will contain the area spanned by the circular
rectangle.
4. `kc.ONE`: each intersected pixel will have one added to it.
The areas defined above can be multiplied by a particle-related quantity,
e.g. particle velocity or time interval between two consecutive particle
locations. They are defined by the input `values`; if unset all `values`
are considered 1.
The particles may have a radius; if the input `radii` is given as a single
number, all particles will have this radius. Alternatively, the particle
may change radius if `radii` is given as a NumPy vector containing a radius
associated with each particle location. If unset (`None`), the particle
is considered point-like / infinitesimally small.
The pixel grid can be defined in two ways:
1. Set the input `pixels` to a pre-existing `konigcell.Pixels` to reuse it,
minimising memory allocation.
2. Set the input `resolution` so that a new `konigcell.Pixels` will be
created and returned by this function. If unset (`None`), `xlim` and
`ylim` will be computed automatically to contain all particle positions.
The rasterization can be done in parallel, as defined by the input
`executor`, which can be a ``concurrent.futures.Executor`` subclass; e.g.
`ThreadPoolExecutor` for multi-threaded execution, or `mpi4py.futures`
`MPIPoolExecutor` for distributing the computation across multiple MPI
nodes. It can also be an *instance* of such class, configured beforehand.
The number of parallel workers (threads, processes or ranks) is set by
`max_workers`; if set to 1, execution is sequential (and produces better
error messages). If unset, the `os.cpu_count()` is used.
If `verbose` is True, the computation is timed and (hopefully) helpful
messages are printed during execution.
Parameters
----------
positions : (N, 2) np.ndarray[ndim=2, dtype=float64]
The 2D particle positions. Separate multiple trajectories with a row of
NaN.
mode : kc.RATIO, kc.INTERSECTION, kc.PARTICLE, kc.ONE
The rasterization mode, see above for full details.
values : float, (N-1,) np.ndarray, optional
The particle values to rasterize, will be multiplied with what the mode
returns; if a single `float`, all values are set to it. Multiple values
can be given in a NumPy array for each particle movement, so it needs
1 fewer values than positions (e.g. for particle positions A, B, C,
there are only movements AB then BC). If unset (default), it is
considered 1.
radii : float, (N,) np.ndarray, optional
Each particle's radius; if a single number, all particles are will have
this radius. Multiple radii can be given in a NumPy array for each
particle position. If `None` (default), the particle is considered
point-like / infinitesimally small.
pixels : kc.Pixels, optional
A pre-created pixel grid to use; if unset, a new one will be created -
so `resolution` must be set.
resolution : 2-tuple, optional
If `pixels` is unset, a new pixel grid will be created; this resolution
contains the number of pixels in the X and Y dimensions, e.g.
``(512, 512)``. There must be at least 2 pixels in each dimension.
xlim : 2-tuple, optional
If `pixels` is unset, a new pixel grid will be created; you can
manually set the physical rectangle spanned by this new grid as
`[xmin, xmax]`. If unset, it is automatically computed to contain all
particle positions.
ylim : 2-tuple, optional
Same as for `xlim`.
executor : concurrent.futures.Executor subclass or instance
The parallel executor to use, implementing the `Executor` interface.
For distributed computation with MPI, use `MPIPoolExecutor` from
`mpi4py`. The default is `ThreadPoolExecutor`, which has the lowest
overhead - useful because the main computation is done by C code
releasing the GIL.
max_workers : int, optional
The maximum number of workers (threads, processes or ranks) to use by
the parallel executor; if 1, it is sequential (and produces the
clearest error messages should they happen). If unset, the
`os.cpu_count()` is used.
verbose : bool or str default True
If `True`, time the computation and print the state of the execution.
If `str`, show a message before loading bars.
Examples
--------
Compute the occupancy grid of a 2D particle of radius 0.5 moving between 3
positions:
>>> import numpy as np
>>> positions = np.array([[0, 0], [1, 1], [2, 1]])
>>>
>>> import konigcell as kc
>>> pixels = kc.dynamic2d(
>>> positions,
>>> kc.INTERSECTION,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> xlim = [-2, 5],
>>> ylim = [-2, 5],
>>> )
The ``kc.INTERSECTION`` pixellisation mode adds the area shaded by the
particle's movement onto the grid.
For a residence time distribution, compute the time difference between the
particle's timestamps:
>>> times = np.array([0, 2, 3])
>>> dt = np.diff(times)
>>> pixels = kc.dynamic2d(
>>> positions,
>>> kc.RATIO,
>>> values = dt,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> )
The ``kc.RATIO`` pixellisation mode splits a given value (in this
case the time difference) across a particle's trajectory proportionally to
the shaded area.
If you omit ``xlim`` and ``ylim``, they will be computed automatically to
include all particle positions.
You can reuse the same pixels grid for multiple pixellisations:
>>> kc.dynamic2d(
>>> positions,
>>> kc.INTERSECTION,
>>> radii = 0.5,
>>> pixels = pixels,
>>> )
In this case the ``resolution``, ``xlim`` and ``ylim`` don't need to be set
anymore; they are extracted from the Pixels grid.
You can set the maximum number of workers for the parallel pixellisation;
e.g. for single-threaded execution:
>>> pixels = kc.dynamic2d(
>>> positions,
>>> kc.ONE,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> max_workers = 1,
>>> )
The ``kc.ONE`` pixellisation flag simply adds 1 to each pixel intersected
by the moving particle.
Finally, here is a complete example computing the residence time
distribution of a 2D particle moving randomly in a box of length 5:
>>> positions = np.random.random((10_000, 2)) * 5
>>> times = np.linspace(0, 100, 10_000)
>>> dt = np.diff(times)
>>>
>>> rtd = kc.dynamic2d(
>>> positions,
>>> kc.RATIO,
>>> values = dt,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> )
'''
# Time pixellisation
if verbose and not isinstance(verbose, str):
start = time.time()
# Type-checking inputs
positions = np.asarray(positions, dtype = float, order = "C")
if values is not None:
if hasattr(values, "__iter__"):
values = np.asarray(values, dtype = float, order = "C")
else:
values = float(values) * np.ones(len(positions) - 1, dtype = float)
if radii is not None:
if hasattr(radii, "__iter__"):
radii = np.asarray(radii, dtype = float, order = "C")
else:
radii = float(radii) * np.ones(len(positions), dtype = float)
utils.check_ncols(2, positions = positions)
utils.check_lens(positions = positions, radii = radii)
# Special case for dynamic2d: e.g. for 5 positions, there will be only 4
# cylinders -> only need 4 values
if values is not None and len(values) != len(positions[:-1]):
raise ValueError(textwrap.fill((
"For `dynamic2d`, there should be 1 fewer `values` than "
f"`positions`. Received `len(positions) = {len(positions)}` and "
f"`len(values) = {len(values)}`."
)))
# If pixels is None, create a new Pixels grid
if pixels is None:
if resolution is None:
raise ValueError("`resolution` must be defined if `pixels = None`")
else:
resolution = np.array(resolution, dtype = int)
if resolution.ndim != 1 or len(resolution) != 2 or \
np.any(resolution < 2):
raise ValueError(textwrap.fill((
"`resolution` must have exactly two elements (M, N), both "
f"larger than 2. Received `resolution = {resolution}`."
)))
if xlim is None:
offset = np.nanmax(radii) if radii is not None else 0
xlim = utils.get_cutoffs(offset, positions[:, 0])
if ylim is None:
offset = np.nanmax(radii) if radii is not None else 0
ylim = utils.get_cutoffs(offset, positions[:, 1])
pixels = Pixels.zeros(resolution, xlim, ylim)
# Pixellise according to execution policy
if max_workers is not None and max_workers == 1:
kc2d.dynamic2d(
pixels.pixels,
pixels.xlim,
pixels.ylim,
positions,
mode,
radii = radii,
factors = values,
)
else:
# Split `positions` into `max_workers` chunks for parallel processing
if max_workers is None:
max_workers = os.cpu_count()
# Each chunk must have at least 2 positions
if len(positions) / max_workers < 2:
max_workers = len(positions) // 2
shape = pixels.pixels.shape
pos_chunks = utils.split(positions, max_workers, overlap = 1)
rad_chunks = utils.split(radii, max_workers, overlap = 1)
val_chunks = utils.split(values, max_workers, overlap = 1)
pix_chunks = [np.zeros(shape) for _ in range(max_workers)]
# If `executor` is a class (rather than an instance / object),
# instantiate it
executor_isclass = False
if not isinstance(executor, Executor):
executor_isclass = True
executor = executor(max_workers)
# Pixellise each chunk separately
futures = [
executor.submit(
kc2d.dynamic2d,
pix_chunks[i],
pixels.xlim,
pixels.ylim,
pos_chunks[i],
mode,
radii = rad_chunks[i],
factors = val_chunks[i],
omit_last = True,
) for i in range(max_workers - 1)
]
# Pixellise the last chunk with omit_last = False
futures.append(
executor.submit(
kc2d.dynamic2d,
pix_chunks[-1],
pixels.xlim,
pixels.ylim,
pos_chunks[-1],
mode,
radii = rad_chunks[-1],
factors = val_chunks[-1],
omit_last = False,
)
)
# If verbose, show tqdm loading bar with text
if verbose:
desc = verbose if isinstance(verbose, str) else None
futures = tqdm(futures, desc = desc)
# Add all results onto the pixel grid
pixels.pixels[:, :] += sum((f.result() for f in futures))
# Clean up all copies immediately to free memory
del futures, pos_chunks, rad_chunks, val_chunks, pix_chunks
if executor_isclass:
executor.shutdown()
if verbose and not isinstance(verbose, str):
end = time.time()
print(f"Pixellised in {end - start:3.3f} s")
return pixels
[docs]def static2d(
positions,
mode,
values = None,
radii = None,
pixels = None,
resolution = None,
xlim = None,
ylim = None,
executor = ThreadPoolExecutor,
max_workers = None,
verbose = True,
):
'''Pixelize / rasterize static particles' positions onto a 2D pixel grid.
This is exactly like the `dynamic2d` function, but particles are not
considered to be moving - so they are rasterized a circles.
The input parameters are equivalent to `dynamic2d` - check its
documentation for full details.
Examples
--------
Compute the occupancy grid of a 3 static 2D particles of radius 0.5:
>>> import numpy as np
>>> positions = np.array([[0, 0], [1, 1], [2, 1]])
>>>
>>> import konigcell as kc
>>> pixels = kc.static2d(
>>> positions,
>>> kc.INTERSECTION,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> xlim = [-2, 5],
>>> ylim = [-2, 5],
>>> )
The ``kc.INTERSECTION`` pixellisation mode adds the area shaded by the
particle's movement onto the grid.
If you omit ``xlim`` and ``ylim``, they will be computed automatically to
include all particle positions.
You can reuse the same pixels grid for multiple pixellisations:
>>> kc.static2d(
>>> positions,
>>> kc.INTERSECTION,
>>> radii = 0.5,
>>> pixels = pixels,
>>> )
In this case the ``resolution``, ``xlim`` and ``ylim`` don't need to be set
anymore; they are extracted from the Pixels grid.
You can set the maximum number of workers for the parallel pixellisation;
e.g. for single-threaded execution:
>>> pixels = kc.static2d(
>>> positions,
>>> kc.ONE,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> max_workers = 1,
>>> )
The ``kc.ONE`` pixellisation flag simply adds 1 to each pixel intersected
by the moving particle.
Finally, here is a complete example computing the occupancy grid of 10,000
static 2D particles:
>>> positions = np.random.random((10_000, 2)) * 5
>>>
>>> occupancy = kc.static2d(
>>> positions,
>>> kc.INTERSECTION,
>>> radii = 0.5,
>>> resolution = (500, 500),
>>> )
'''
# Time pixellisation
if verbose and not isinstance(verbose, str):
start = time.time()
# Type-checking inputs
positions = np.asarray(positions, dtype = float, order = "C")
if values is not None:
if hasattr(values, "__iter__"):
values = np.asarray(values, dtype = float, order = "C")
else:
values = float(values) * np.ones(len(positions), dtype = float)
if radii is not None:
if hasattr(radii, "__iter__"):
radii = np.asarray(radii, dtype = float, order = "C")
else:
radii = float(radii) * np.ones(len(positions), dtype = float)
utils.check_ncols(2, positions = positions)
utils.check_lens(positions = positions, values = values, radii = radii)
# If pixels is None, create a new Pixels grid
if pixels is None:
if resolution is None:
raise ValueError("`resolution` must be defined if `pixels = None`")
else:
resolution = np.array(resolution, dtype = int)
if resolution.ndim != 1 or len(resolution) != 2 or \
np.any(resolution < 2):
raise ValueError(textwrap.fill((
"`resolution` must have exactly two elements (M, N), both "
f"larger than 2. Received `resolution = {resolution}`."
)))
if xlim is None:
offset = np.nanmax(radii) if radii is not None else 0
xlim = utils.get_cutoffs(offset, positions[:, 0])
if ylim is None:
offset = np.nanmax(radii) if radii is not None else 0
ylim = utils.get_cutoffs(offset, positions[:, 1])
pixels = Pixels.zeros(resolution, xlim, ylim)
# Pixellise according to execution policy
if max_workers is not None and max_workers == 1:
kc2d.static2d(
pixels.pixels,
pixels.xlim,
pixels.ylim,
positions,
mode,
radii = radii,
factors = values,
)
else:
# Split `positions` into `max_workers` chunks for parallel processing
if max_workers is None:
max_workers = os.cpu_count()
# Each chunk must have at least 2 positions
if len(positions) / max_workers < 2:
max_workers = len(positions) // 2
shape = pixels.pixels.shape
pos_chunks = utils.split(positions, max_workers)
rad_chunks = utils.split(radii, max_workers)
val_chunks = utils.split(values, max_workers)
pix_chunks = [np.zeros(shape) for _ in range(max_workers)]
# If `executor` is a class (rather than an instance / object),
# instantiate it
executor_isclass = False
if not isinstance(executor, Executor):
executor_isclass = True
executor = executor(max_workers)
# Pixellise each chunk separately
futures = [
executor.submit(
kc2d.static2d,
pix_chunks[i],
pixels.xlim,
pixels.ylim,
pos_chunks[i],
mode,
radii = rad_chunks[i],
factors = val_chunks[i],
) for i in range(max_workers)
]
# If verbose, show tqdm loading bar with text
if verbose:
desc = verbose if isinstance(verbose, str) else None
futures = tqdm(futures, desc = desc)
# Add all results onto the pixel grid
pixels.pixels[:, :] += sum((f.result() for f in futures))
# Clean up all copies immediately to free memory
del futures, pos_chunks, rad_chunks, val_chunks, pix_chunks
if executor_isclass:
executor.shutdown()
if verbose and not isinstance(verbose, str):
end = time.time()
print(f"Pixellised in {end - start:3.3f} s")
return pixels
[docs]def dynamic_prob2d(
positions,
values,
radii = None,
pixels = None,
resolution = None,
xlim = None,
ylim = None,
executor = ThreadPoolExecutor,
max_workers = None,
verbose = True,
):
'''Compute the 2D probability distribution of a moving particle's specific
quantity (e.g. velocity).
This function computes the distribution of the input `values` across pixel
cells. For example, computing the velocity distribution of a particle
moving from `positions` A to B to C, we need to rasterize the velocity from
`values[0]` on segment AB, then the velocity from `values[1]` on BC -
therefore for N `positions` we will rasterize N-1 `values`.
For multiple particle trajectories, simply separate them by a row of NaN
in the input `positions`.
All input parameters are equivalent to `dynamic2d` - check its
documentation for full details.
'''
# Time pixellising
if verbose:
start = time.time()
# Compute probability grid, where each pixel contains the values weighted
# by the intersection area; first compute values * weights...
pixels = dynamic2d(
positions,
mode.INTERSECTION,
values = values,
radii = radii,
pixels = pixels,
resolution = resolution,
xlim = xlim,
ylim = ylim,
executor = executor,
max_workers = max_workers,
verbose = "Step 1 / 2 :" if verbose else False,
)
# ... then divide by the sum of weights
igrid = pixels.copy()
igrid.pixels[:, :] = 0.
dynamic2d(
positions,
mode.INTERSECTION,
radii = radii,
pixels = igrid,
executor = executor,
max_workers = max_workers,
verbose = "Step 2 / 2 :" if verbose else False,
)
nonzero = (igrid.pixels != 0.)
if nonzero.any():
pixels.pixels[nonzero] /= igrid.pixels[nonzero]
# Correction for floating-point errors: threshold all pixels with values
# below min(values); they can only exist due to FP errors
minval = np.nanmin(values)
pixels.pixels[pixels.pixels < minval] = minval
if verbose:
end = time.time()
print(("Computed dynamic 2D probability distribution in "
f"{end - start:3.3f} s"))
return pixels
[docs]def static_prob2d(
positions,
values,
radii = None,
pixels = None,
resolution = None,
xlim = None,
ylim = None,
executor = ThreadPoolExecutor,
max_workers = None,
verbose = True,
):
'''Compute the 2D probability distribution of static particles' specific
quantities (e.g. velocity).
This function computes the distribution of the input `values` across pixel
cells for the static circular particles at the input `positions`; it is
the static counterpart of `dynamic_prob2d`.
All input parameters are equivalent to `dynamic2d` - check its
documentation for full details.
'''
# Time pixellising
if verbose:
start = time.time()
# Compute probability grid, where each pixel contains the values weighted
# by the intersection area; first compute values * weights...
pixels = static2d(
positions,
mode.INTERSECTION,
values = values,
radii = radii,
pixels = pixels,
resolution = resolution,
xlim = xlim,
ylim = ylim,
executor = executor,
max_workers = max_workers,
verbose = "Step 1 / 2 :" if verbose else False,
)
# ... then divide by the sum of weights
igrid = pixels.copy()
igrid.pixels[:, :] = 0.
static2d(
positions,
mode.INTERSECTION,
radii = radii,
pixels = igrid,
executor = executor,
max_workers = max_workers,
verbose = "Step 2 / 2 :" if verbose else False,
)
nonzero = (igrid.pixels != 0.)
if nonzero.any():
pixels.pixels[nonzero] /= igrid.pixels[nonzero]
# Correction for floating-point errors: threshold all pixels with values
# below min(values); they can only exist due to FP errors
minval = np.nanmin(values)
pixels.pixels[pixels.pixels < minval] = minval
if verbose:
end = time.time()
print(("Computed static 2D probability distribution in "
f"{end - start:3.3f} s"))
return pixels