import math
import numpy as np
import numba
import numba.cuda
import scipy.constants as const
import scipy.sparse
import scipy.ndimage
from libertem.corrections import coordinates
from libertem.common.numba import rmatmul
# Calculation of the relativistic electron wavelength in meters
def wavelength(U):
    '''
    Compute the relativistic electron wavelength for a given acceleration voltage.

    Parameters
    ----------
    U : float
        Acceleration voltage in kV

    Returns
    -------
    wavelength : float
        Wavelength in m

    Examples
    --------
    >>> wavelength(300)
    1.9687489006848795e-12
    '''
    # Physical constants in SI units from scipy.constants
    charge = const.elementary_charge  # elementary charge, ~1.602e-19 C
    planck = const.Planck  # Planck constant, ~6.626e-34 J s
    light_speed = const.speed_of_light
    rest_mass = const.electron_mass
    # Kinetic energy in joule; U is given in kV, hence the factor 1000
    kinetic = charge * np.float64(U) * 1000
    # Relativistic de Broglie wavelength
    return planck * light_speed / math.sqrt(
        kinetic**2 + 2 * kinetic * rest_mass * light_speed**2
    )
@numba.njit
def offset(s1, s2):
    # Vector from the origin of slice tuple s1 to the origin of s2.
    # Each argument is an (origin, shape) pair; shapes are ignored.
    origin_a, _shape_a = s1
    origin_b, _shape_b = s2
    return origin_b - origin_a
@numba.njit
def shift_by(sl, shift):
    # Translate an (origin, shape) slice tuple by the given shift vector;
    # the shape is unchanged.
    origin, shape = sl
    return (origin + shift, shape)
@numba.njit
def shift_to(s1, origin):
    # Move an (origin, shape) slice tuple to a new absolute origin,
    # keeping its shape.
    _old_origin, shape = s1
    return (origin, shape)
@numba.njit
def intersection(s1, s2):
    # Intersection of two (origin, shape) slice tuples.
    # A disjoint pair yields a zero shape rather than a negative one.
    origin_a, shape_a = s1
    origin_b, shape_b = s2
    # Adapted from libertem.common.slice
    start = np.maximum(origin_a, origin_b)
    stop_a = origin_a + shape_a
    stop_b = origin_b + shape_b
    extent = np.minimum(stop_a, stop_b) - start
    # Clamp to zero so empty overlaps don't produce negative shapes
    extent = np.maximum(0, extent)
    return (start, extent)
@numba.njit
def get_shifted(arr_shape: tuple, tile_origin: tuple, tile_shape: tuple, shift: tuple):
    '''
    Calculate the slices to cut out a shifted part of a 2D source
    array and place it into a target array, including tiling support.
    This works with negative and positive integer shifts.

    Parameters
    ----------
    arr_shape : tuple
        Shape (y, x) of the full source array
    tile_origin : tuple
        Origin (y, x) of the tile within the source array
    tile_shape : tuple
        Shape (y, x) of the tile
    shift : tuple
        Integer shift (y, x) to apply

    Returns
    -------
    target_tup : numpy.ndarray
        Per-dimension (start, stop) pairs for the target array; all zeros
        if the shifted tile doesn't overlap the source array at all
    offsets : numpy.ndarray
        Per-dimension offsets that map target ranges to source ranges
        (see :meth:`to_slices`)
    '''
    # TODO this could be adapted for full sig, nav, n-D etc support
    # and included as a method in Slice?
    full_slice = (np.array((0, 0)), arr_shape)
    tileslice = (tile_origin, tile_shape)
    shifted = shift_by(tileslice, shift)
    # Portion of the shifted tile that still falls inside the source array
    isect = intersection(full_slice, shifted)
    if np.prod(isect[1]) == 0:
        # No overlap: return empty ranges and zero offsets
        return (
            np.array([(0, 0), (0, 0)]),
            np.array([0, 0])
        )
    # We measure by how much we have clipped the zero point
    # This is zero if we didn't shift into the negative region beyond the original array
    clip = offset(shifted, isect)
    # Now we move the intersection to (0, 0) plus the amount we clipped
    # so that the overlap region is moved by the correct amount, in total
    targetslice = shift_by(shift_to(isect, np.array((0, 0))), clip)
    start = targetslice[0]
    length = targetslice[1]
    # Stack into (start, stop) pairs per dimension
    target_tup = np.stack((start, start+length), axis=1)
    offsets = isect[0] - targetslice[0]
    return (target_tup, offsets)
def to_slices(target_tup, offsets):
    '''
    Convert per-dimension (start, stop) pairs and offsets into slice objects.

    Returns a pair (target_slice, source_slice) where the source slice is the
    target slice translated by the corresponding offset in each dimension.
    '''
    target_parts = []
    source_parts = []
    for (start, stop), delta in zip(target_tup, offsets):
        target_parts.append(slice(start, stop))
        source_parts.append(slice(start + delta, stop + delta))
    return (tuple(target_parts), tuple(source_parts))
def bounding_box(array):
    '''
    Return [[y_min, y_stop], [x_min, x_stop]] bounds of the nonzero region
    of a 2D array, usable directly as start:stop slices; all zeros if the
    array has no nonzero entries.
    '''
    # Based on https://stackoverflow.com/questions/31400769/bounding-box-of-numpy-array
    # But return values that work as start:stop slices
    nonzero_rows = np.flatnonzero(np.any(array, axis=1))
    nonzero_cols = np.flatnonzero(np.any(array, axis=0))
    if len(nonzero_rows) == 0:
        # Completely empty input
        return np.array([(0, 0), (0, 0)])
    return np.array((
        (nonzero_rows[0], nonzero_rows[-1] + 1),
        (nonzero_cols[0], nonzero_cols[-1] + 1),
    ))
def diffraction_to_detector(
        lamb, diffraction_shape, pixel_size_real, pixel_size_detector,
        cy, cx, flip_y=False, scan_rotation=0.):
    '''
    Generate a function that transforms pixel coordinates from diffraction of
    real space to detector coordinates.

    When performing a forward calculation where a wave front is passed through an object
    function and projected in the far field, the projection is a Fourier transform.
    Each pixel in the fourier-transformed slice of real space corresponds to a diffraction
    angle in radian. This angle is a function of real space dimensions and wavelength.
    For ptychography, this forward-projected beam is put in relation with detector data.
    The projection and detector, however, are usually not calibrated in such a way that each
    pixel corresponds exactly to one diffracted pixel/beam from the object and illumination
    function. This function applies the correct scale, rotation and handedness to match
    the forward-projected beam with the detector data, given the necessary input parameters.

    cy, cx, flip_y and scan_rotation are chosen to correspond to the parameters in
    :meth:`libertem.api.Context.create_com_analysis`.

    This function is designed to create an :code:`affine_transformation` parameter
    for :meth:`image_transformation_matrix`.

    Parameters
    ----------
    lamb : float
        Wavelength in m
    diffraction_shape : Tuple[int, int]
        Shape of the diffracted area
    pixel_size_real : float or tuple or ndarray
        Pixel size in m for the diffracted shape. Can be a Tuple[float, float] or array with
        two values for y and x
    pixel_size_detector : float or tuple or ndarray
        Pixel size in radian of the detector. Can be a Tuple[float, float] or array with two values
        for y and x. For free propagation into the far-field, this is the detector pixel size in
        m divided by the camera length for small angles.
    cy : float
        Y position of the central beam on the detector in pixel.
    cx : float
        X position of the central beam on the detector in pixel.
    flip_y : bool
        Flip the y axis of the detector coordinates
    scan_rotation : float
        Scan rotation in degrees

    Returns
    -------
    transform : callable(coords : numpy.ndarray) -> numpy.ndarray
        A function that accepts pixel coordinates in diffracted space as an
        array of shape (n, 2) with (y, x). Upper left
        corner is (0, 0). It returns pixel coordinates on the detector as floats of shape (n, 2).
    '''
    # Make sure broadcasting works as expected
    diffraction_shape = np.array(diffraction_shape)
    pixel_size_real = np.array(pixel_size_real)
    pixel_size_detector = np.array(pixel_size_detector)
    # Size of one pixel in radian of the diffracted shape.
    # Twice the diffraction_shape and half the pixel_size_real, i.e. the same physical
    # area at a finer pixel resolution, can capture higher diffraction orders and
    # therefore the FFT extends twice as far.
    # That means the pixel_size_diffracted should stay the same.
    # Longer wavelength means higher diffraction angles to get the same relative
    # path difference.
    pixel_size_diffracted = 1/pixel_size_real/diffraction_shape*lamb
    # Compose handedness flip and rotation into a single 2x2 matrix
    transformation = coordinates.identity()
    if flip_y:
        transformation = coordinates.flip_y() @ transformation
    transformation = coordinates.rotate_deg(scan_rotation) @ transformation
    # Scale from diffracted pixels to detector pixels
    transformation *= pixel_size_diffracted / pixel_size_detector

    def transform(coords: np.ndarray):
        # Shift the coordinates relative to the center of the
        # diffraction pattern
        relative_to_center = (coords - diffraction_shape / 2)
        # Apply rotation/flip/scale, then shift to the detector center position
        return (relative_to_center @ transformation) + (cy, cx)

    return transform
def fftshift_coords(reconstruct_shape):
    '''
    Generate a function that performs an FFT shift of coordinates.

    On the detector, the central beam is near the center, while for native FFT
    the center is at the (0, 0) position of the result array. Instead of
    fft-shifting the result of a projection calculation to match the detector directly,
    one can instead calculate a transformation that picks from an unshifted FFT result
    in such a way that it matches a diffraction pattern in its native layout. This allows
    to combine the FFT shift with additional transformation steps. See also
    :meth:`image_transformation_matrix`.

    Parameters
    ----------
    reconstruct_shape : Tuple[int, int]
        Target shape

    Returns
    -------
    Callable(coords: numpy.ndarray)
        Function that accepts target coordinates with shape (n, 2), kind int
        and calculates the source coordinates with shape (n, 2), kind int
        so that taking from the source coordinates and placing into the target
        coordinates performs an FFT shift.
    '''
    shape = np.array(reconstruct_shape)
    # Hoisted loop invariant: forward FFT shift moves indices by ceil(n / 2)
    half = (shape + 1) // 2

    def fftshift(coords):
        return (np.array(coords) + half) % shape

    return fftshift
def ifftshift_coords(reconstruct_shape):
    '''
    Generate a function that performs an inverse FFT shift of coordinates.

    On the detector, the central beam is near the center, while for native FFT
    the center is at the (0, 0) position of the result array. Instead of
    fft-shifting the result of a projection calculation to match the detector,
    one can instead calculate a transformation that picks from the detector data
    in such a way that an inverse FFT shift is performed. This allows to combine the
    inverse FFT shift with additional transformations. See also
    :meth:`image_transformation_matrix`.

    Parameters
    ----------
    reconstruct_shape : Tuple[int, int]
        Target shape

    Returns
    -------
    Callable(coords: numpy.ndarray)
        Function that accepts target coordinates with shape (n, 2), kind int
        and calculates the source coordinates with shape (n, 2), kind int
        so that taking from the source coordinates and placing into the target
        coordinates performs an inverse FFT shift.
    '''
    shape = np.array(reconstruct_shape)
    # Hoisted loop invariant: inverse FFT shift moves indices by floor(n / 2)
    half = shape // 2

    def ifftshift(coords):
        return (np.array(coords) + half) % shape

    return ifftshift
@numba.njit(fastmath=True)
def _binning_elements(
        multi_target, multi_y_steps, multi_x_steps, multi_upleft,
        multi_y_vectors, multi_x_vectors):
    # Expand each target pixel into its grid of source sampling points:
    # for target i, generate multi_y_steps[i] * multi_x_steps[i] points spanning
    # the parallelogram given by the corner multi_upleft[i] and the edge vectors
    # multi_y_vectors[i], multi_x_vectors[i].
    # Returns (source_array, target_array) of matching length: fractional source
    # coordinates (float32) and the repeated integer target coordinate (int32)
    # for each sample.
    n_entries = int(np.sum(multi_y_steps*multi_x_steps))
    source_array = np.empty((n_entries, 2), dtype=np.float32)
    target_array = np.empty((n_entries, 2), dtype=np.int32)
    index = 0
    for i in range(len(multi_target)):
        for y in range(multi_y_steps[i]):
            for x in range(multi_x_steps[i]):
                # Position within the source parallelogram, stepped in
                # fractions of the edge vectors
                source_coord = (
                    multi_upleft[i]
                    + y / multi_y_steps[i] * multi_y_vectors[i]
                    + x / multi_x_steps[i] * multi_x_vectors[i]
                )
                source_array[index] = source_coord
                target_array[index] = multi_target[i]
                index += 1
    return source_array, target_array
@numba.njit(fastmath=True)
def _weights(targets, target_shape):
    # Count how many entries map to each target pixel, then give every entry
    # the reciprocal of its pixel's count so that contributions average out.
    counts = np.zeros(target_shape, dtype=np.int32)
    weights = np.empty(len(targets), dtype=np.float32)
    for coord in targets:
        counts[coord[0], coord[1]] += 1
    for idx in range(len(targets)):
        coord = targets[idx]
        weights[idx] = 1 / counts[coord[0], coord[1]]
    return weights
def apply_matrix(sources, matrix, target_shape):
    '''
    Apply a transformation matrix generated by :meth:`image_transformation_matrix` to
    a stack of images.

    Parameters
    ----------
    sources : array-like
        Array of shape (n, sy, sx) where (sy, sx) is the :code:`source_shape` parameter
        of :meth:`image_transformation_matrix`.
    matrix : array-like
        Matrix generated by :meth:`image_transformation_matrix` or equivalent
    target_shape : Tuple[int, int]
        :code:`target_shape` parameter of :meth:`image_transformation_matrix`.
        The result will be reshaped to :code:`(n, ) + target_shape`
    '''
    # Flatten each image so the transformation is a single matrix product
    flat_sources = sources.reshape((-1, np.prod(sources.shape[-2:], dtype=int)))
    if isinstance(matrix, (scipy.sparse.csc_matrix, scipy.sparse.csr_matrix)):
        # Use the numba-accelerated right-multiplication for sparse matrices
        flat_result = rmatmul(flat_sources, matrix)
    else:
        flat_result = flat_sources @ matrix
    return flat_result.reshape(sources.shape[:-2] + tuple(target_shape))
def shifted_probes(probe, bins):
    '''
    Calculate subpixel-shifted versions of the probe

    Parameters
    ----------
    probe : numpy.ndarray
        2D probe function, real or complex
    bins : int or Tuple[int, int]
        Number of antialiasing steps in y and x axis. Can be int as well

    Returns
    -------
    probes : numpy.ndarray
        4D, shape bins + probe.shape or (bins, bins) + probe.shape if bins is an int
    '''
    if isinstance(bins, int):
        bins = (bins, bins)
    assert isinstance(bins, (list, tuple))
    assert len(bins) == 2
    # Normalize to a tuple: concatenating a list with probe.shape (a tuple)
    # below would raise a TypeError even though the assert above admits lists.
    bins = tuple(bins)
    probes = np.zeros(bins + probe.shape, dtype=probe.dtype)
    for y in range(bins[0]):
        for x in range(bins[1]):
            # Fractional shift within one pixel for this bin
            dy = y / bins[0]
            dx = x / bins[1]
            # Real and imaginary parts are shifted separately -- presumably
            # for scipy versions where ndimage.shift doesn't accept complex input
            real = scipy.ndimage.shift(
                probe.real,
                shift=(dy, dx),
            )
            probes[y, x] = real
            if np.iscomplexobj(probe):
                imag = scipy.ndimage.shift(
                    probe.imag,
                    shift=(dy, dx),
                )
                probes[y, x] += 1j*imag
    return probes
@numba.njit(fastmath=True)
def rolled_object_probe_product_cpu(obj, probe, shifts, result_out, ifftshift=False):
    '''
    Multiply object and shifted illumination

    This function combines several steps that are relevant for ptychographic reconstruction:

    * Multiply an object function with a shifted illumination
    * Roll the object function indices around the edges
    * Optionally, perform an ifftshift to prepare the data for subsequent FFT

    These steps are combined in a single loop since each requires significant memory
    transfer if they are performed step-by-step. For performance reasons it doesn't perform a
    free subpixel shift, but picks the best option from a set of pre-calculated shifted versions.
    See :meth:`shifted_probes` for a function to calculate the shifted versions.

    Parameters
    ----------
    obj : numpy.ndarray
        2D array with the object
    probe : numpy.ndarray
        4D array with subpixel shifts of the probe, last two dimensions same size or
        smaller than obj.
    shifts : numpy.ndarray
        Array with shift vectors, shape (n, 2), kind float
    result_out : numpy.ndarray
        Array where the result is placed. Shape (n, ) + probe.shape
    ifftshift : bool
        place the product ifft-shifted into :code:`result_out`

    Returns
    -------
    subpixel_indices : np.ndarray
        The first two indices for :code:`probe`
    '''
    obj_y, obj_x = obj.shape
    assert len(shifts) == result_out.shape[0]
    assert probe.shape[2:] == result_out.shape[1:]
    assert len(probe.shape) == 4
    y_subpixels, x_subpixels = probe.shape[:2]
    # Integer part of each shift selects the object region to read
    int_shifts = shifts.astype(np.int32)
    # Fractional part, quantized to the available subpixel grid, selects
    # which pre-shifted probe version to use
    subpixel_indices = (
        shifts * np.array((y_subpixels, x_subpixels))
    ).astype(np.int32) % np.array((y_subpixels, x_subpixels))
    for i in range(len(result_out)):
        for y in range(probe.shape[-2]):
            for x in range(probe.shape[-1]):
                # Roll the object indices around the edges
                source_y = (y + int_shifts[i, 0]) % obj_y
                source_x = (x + int_shifts[i, 1]) % obj_x
                y_subpixel = subpixel_indices[i, 0]
                x_subpixel = subpixel_indices[i, 1]
                if ifftshift:  # From source to target
                    target_y = (y + (probe.shape[-2] + 1) // 2) % probe.shape[-2]
                    target_x = (x + (probe.shape[-1] + 1) // 2) % probe.shape[-1]
                else:
                    target_y, target_x = y, x
                update = obj[source_y, source_x] * probe[y_subpixel, x_subpixel, y, x]
                result_out[i, target_y, target_x] = update
    return subpixel_indices
@numba.njit(fastmath=True)
def rolled_object_aggregation_cpu(obj_out, updates, shifts, fftshift=False):
    '''
    Aggregate shifted updates to an object function

    This function accumulates updates that are shifted relative to the object function
    using addition and rolls the indices within the object if necesssary.
    Optionally, it can fftshift the updates while integrating. Doing this in one loop allows to
    reduce the number of calls for each shift and reduces overall memory transfer.

    Parameters
    ----------
    obj_out : numpy.ndarray
        2D array with the object, modified in-place by this function
    updates : numpy.ndarray
        Array with updates, shape (n, ...)
    shifts : numpy.ndarray
        Array with shift vectors, shape (n, 2), kind int
    fftshift : bool
        Read the updates fft-shifted from :code:`updates`
    '''
    obj_y, obj_x = obj_out.shape
    assert len(shifts) == updates.shape[0]
    for i in range(updates.shape[0]):
        for y in range(updates.shape[1]):
            for x in range(updates.shape[2]):
                # Roll the target indices around the object edges
                target_y = (y + shifts[i, 0]) % obj_y
                target_x = (x + shifts[i, 1]) % obj_x
                if fftshift:  # From target to source
                    source_y = (y + (updates.shape[1] + 1) // 2) % updates.shape[1]
                    source_x = (x + (updates.shape[2] + 1) // 2) % updates.shape[2]
                else:
                    source_y, source_x = y, x
                obj_out[target_y, target_x] += updates[i, source_y, source_x]
@numba.cuda.jit
def _rolled_object_probe_product_cuda(obj, probe, shifts, result_out, ifftshift):
    # CUDA kernel backing rolled_object_probe_product_cuda():
    # one thread per output element (i, y, x), mirroring the loop body of
    # rolled_object_probe_product_cpu().
    obj_y, obj_x = obj.shape
    y_subpixels, x_subpixels = probe.shape[:2]
    i, y, x = numba.cuda.grid(3)
    # NOTE(review): these indices are computed before the bounds check below,
    # so threads beyond result_out.shape[0] read shifts[i] out of bounds --
    # presumably tolerated here, but worth confirming.
    source_y = (y + int(shifts[i, 0])) % obj_y
    source_x = (x + int(shifts[i, 1])) % obj_x
    # Fractional part of the shift selects the pre-shifted probe version
    y_subpixel = int(shifts[i, 0] * y_subpixels) % y_subpixels
    x_subpixel = int(shifts[i, 1] * x_subpixels) % x_subpixels
    if i < result_out.shape[0] and y < result_out.shape[1] and x < result_out.shape[2]:
        if ifftshift:
            # Place the product ifft-shifted into the result
            target_y = (y + (probe.shape[-2] + 1) // 2) % probe.shape[-2]
            target_x = (x + (probe.shape[-1] + 1) // 2) % probe.shape[-1]
        else:
            target_y, target_x = y, x
        update = obj[source_y, source_x] * probe[y_subpixel, x_subpixel, y, x]
        result_out[i, target_y, target_x] = update
def rolled_object_probe_product_cuda(obj, probe, shifts, result_out, ifftshift=False):
    '''
    Numba CUDA version of :meth:`rolled_object_probe_product_cpu`

    Parameters match :meth:`rolled_object_probe_product_cpu`; arrays are
    expected on the GPU (presumably cupy arrays -- see the cupy import below).

    Returns
    -------
    subpixel_indices
        The first two indices for :code:`probe`, as selected inside the kernel
    '''
    import cupy
    count = result_out.shape[0]
    threadsperblock = 32
    blockspergrid = (count + (threadsperblock - 1)) // threadsperblock
    assert len(shifts) == result_out.shape[0]
    assert probe.shape[2:] == result_out.shape[1:]
    assert len(probe.shape) == 4
    y_subpixels, x_subpixels = probe.shape[:2]
    # Same subpixel selection as performed inside the kernel, computed here
    # so it can be returned to the caller
    subpixel_indices = (
        shifts * cupy.array((y_subpixels, x_subpixels))
    ).astype(np.int32) % cupy.array((y_subpixels, x_subpixels))
    _rolled_object_probe_product_cuda[
        (blockspergrid, result_out.shape[1], result_out.shape[2]),
        # Block size kept in sync with threadsperblock instead of a
        # duplicated magic number 32
        (threadsperblock, 1, 1)
    ](obj, probe, shifts, result_out, ifftshift)
    return subpixel_indices
@numba.cuda.jit(device=True)
def add_complex_complex(a, coords, b):
    # Atomic add of a complex value b onto complex array a at coords.
    # Real and imaginary parts use separate atomics, so the pair as a whole
    # is not one atomic operation.
    numba.cuda.atomic.add(
        a.imag, coords, b.imag
    )
    numba.cuda.atomic.add(
        a.real, coords, b.real
    )
@numba.cuda.jit(device=True)
def add_real_real(a, coords, b):
    # Atomic add of a real value b onto real array a at coords.
    numba.cuda.atomic.add(
        a, coords, b
    )
@numba.cuda.jit(device=True)
def add_complex_real(a, coords, b):
    # Atomic add of a real value b onto the real part of complex array a;
    # the imaginary part is left untouched.
    numba.cuda.atomic.add(
        a.real, coords, b
    )
def _make_rolled_object_aggregation_cuda(add):
    # Factory that builds a CUDA aggregation kernel specialized for the given
    # atomic `add(target_array, coords, value)` device function, so the same
    # kernel body can serve the complex/real dtype combinations.
    @numba.cuda.jit
    def _rolled_object_aggregation_cuda(obj_out, updates, shifts, fftshift):
        # CUDA analogue of the loop body of rolled_object_aggregation_cpu():
        # one thread per update element (i, y, x).
        obj_y, obj_x = obj_out.shape
        i, y, x = numba.cuda.grid(3)
        if i < updates.shape[0] and y < updates.shape[1] and x < updates.shape[2]:
            # Roll the target indices around the object edges
            target_y = (y + shifts[i, 0]) % obj_y
            target_x = (x + shifts[i, 1]) % obj_x
            if fftshift:  # From target to source
                source_y = (y + (updates.shape[1] + 1) // 2) % updates.shape[1]
                source_x = (x + (updates.shape[2] + 1) // 2) % updates.shape[2]
            else:
                source_y, source_x = y, x
            # Atomic add since several threads may hit the same target pixel
            add(obj_out, (target_y, target_x), updates[i, source_y, source_x])
    return _rolled_object_aggregation_cuda
# Pre-built kernel variants for the supported (obj_out, updates) dtype
# combinations: complex += complex, complex += real, real += real.
_roac_complex_complex = _make_rolled_object_aggregation_cuda(add_complex_complex)
_roac_complex_real = _make_rolled_object_aggregation_cuda(add_complex_real)
_roac_real_real = _make_rolled_object_aggregation_cuda(add_real_real)
def rolled_object_aggregation_cuda(obj_out, updates, shifts, fftshift=False):
    '''
    Numba CUDA version of :meth:`rolled_object_aggregation_cpu`

    Parameters match :meth:`rolled_object_aggregation_cpu`;
    :code:`obj_out` is modified in place.
    '''
    count = updates.shape[0]
    threadsperblock = 32
    blockspergrid = (count + (threadsperblock - 1)) // threadsperblock
    # Pick the kernel variant matching the dtype combination, since the
    # atomic add must be specialized for complex vs. real arrays
    if obj_out.dtype.kind == 'c':
        if updates.dtype.kind == 'c':
            f = _roac_complex_complex
        else:
            f = _roac_complex_real
    else:
        f = _roac_real_real
    f[
        (blockspergrid, updates.shape[1], updates.shape[2]),
        # Block size kept in sync with threadsperblock instead of a
        # duplicated magic number 32
        (threadsperblock, 1, 1)
    ](obj_out, updates, shifts, fftshift)