r"""
Implementation of Maximum Entropy Method
"""
from types import SimpleNamespace
from typing import Union, Optional
import astropy.units as apu
import numpy as np
from astropy.units import Quantity
from numpy.linalg import norm
from numpy.typing import NDArray
from sunpy.map import Map
from xrayvision.imaging import generate_header
from xrayvision.transform import generate_xy
from xrayvision.utils import get_logger
from xrayvision.visibility import Visibilities
__all__ = [
"_get_entropy",
"_get_fourier_matrix",
"_estimate_flux",
"_get_mean_visibilities",
"_proximal_entropy",
"_proximal_operator",
"_optimise_fb",
"mem",
]
logger = get_logger(__name__, "DEBUG")
[docs]
def _get_entropy(image, flux):
r"""
Return the entropy of an image.
The entropy is defined as
.. math::
H(x) = sum_i x_i * log(x_i/(m e))
where :math:`x` is an image and :math:`m` is the total flux divided by the number of pixels of
in the image
Parameters
----------
image :
Input image array
flux :
Total flux divided by the number of pixels of the image
Returns
-------
"""
return np.sum(image * np.log(image / (flux * np.e)))
[docs]
def _get_fourier_matrix(vis, shape=(64, 64) * apu.pix, pixel_size=(4.0312500, 4.0312500) * apu.arcsec):
r"""
Return the complex Fourier matrix used to compute the value of the visibilities.
Generate the complex Fourier matrix :math:`Hv` used to compute the value of the
visibilities. If :math:`\vec{x}` is the vectorised image, then
:math:`v = \mathbf{\mathit{Hv}} \vec{x}` is the vector containing the complex visibilities
Parameters
----------
vis :
Visibly object containing u, v sampling
shape :
Shape of the images
pixel_size
Size of the pixels
Returns
-------
The complex Fourier matrix
"""
m, n = shape.to("pix")
y = generate_xy(m, phase_center=0 * apu.arcsec, pixel_size=pixel_size[1])
x = generate_xy(n, phase_center=0 * apu.arcsec, pixel_size=pixel_size[0])
x, y = np.meshgrid(x, y, indexing="ij")
uv = np.vstack([vis.u, vis.v])
# Check apu are correct for exp need to be dimensionless and then remove apu for speed
if (vis.u * x[0, 0]).unit == apu.dimensionless_unscaled and (vis.v * y[0, 0]).unit == apu.dimensionless_unscaled:
uv = uv.value
x = x.value
y = y.value
Hv = np.exp(
1j * 2 * np.pi * (x[..., np.newaxis] * uv[np.newaxis, 0, :] + y[..., np.newaxis] * uv[np.newaxis, 1, :])
)
return Hv * pixel_size[0].value * pixel_size[1].value
[docs]
def _estimate_flux(vis, shape, pixel, maxiter=1000, tol=1e-3):
r"""
Estimate the total flux in the image by solving an optimisation problem.
This function estimates the total flux of an event by solving the problem
.. math::
\underset{\chi}{\mathrm{argmin}}(f) = \sum \left (\Re V(f) - \Re v))^2
+ ( \Im V(f) - \Im v)^2 ) / \sigma^2 \right )
subject to :math:`f >= 0`,
the algorithm finds a positive image :math:`f` that minimizes the :math:`\chi` square function.
The estimation of the total flux is then obtained by computing the total fux of :math:`f`.
The method implemented :math:`f` the minimization is projected Landweber.
Parameters
----------
vis :
Input visibilities
shape :
Shape of image
pixel :
Size of pixels
maxiter : int
Maximum number of iterations
tol :
Tolerance at which to stop
Returns
-------
Estimated total flux
"""
Hv, Lip, Visib = _prepare_for_optimise(pixel, shape, vis)
# PROJECTED LANDWEBER
# should have same apu as image ct/ keV cm s arcsec**2
x = np.zeros(shape.to_value("pix").astype(int)).flatten()
for i in range(maxiter):
x_old = x[:]
# GRADIENT STEP
grad = 2.0 * np.matmul((np.matmul(Hv, x).value - Visib).T, Hv)
y = x - 1.0 / Lip * grad
# PROJECTION ON THE POSITIVE ORTHANT
x = y.clip(min=0.0)
tmp = x[:]
Hvx = np.matmul(Hv, tmp)
diff_V = Hvx - Visib
chi2 = (diff_V**2.0).sum()
if i % 25 == 0:
logger.info(f"Iter: {i}, Chi2: {chi2}")
if np.sqrt(((x - x_old) ** 2.0).sum()) < tol * np.sqrt((x_old**2.0).sum()):
break
return x.sum() * pixel[0] * pixel[1]
def _prepare_for_optimise(pixel, shape, vis):
r"""
Return matrices and vectors in format for optimisation
For complex values create new matrix concatenating the real and imaginary components and
normalise by the standard error
Parameters
----------
vis :
Input visibilities
shape :
Shape of image
pixel :
Size of pixels
Returns
-------
"""
Hv = _get_fourier_matrix(vis, shape, pixel)
# Division of real and imaginary part of the matrix 'Hv'
ReHv = np.real(Hv)
ImHv = np.imag(Hv)
# 'Hv' is composed by the union of its real and imaginary part
Hv = np.concatenate([ReHv, ImHv], axis=-1)
# Division of real and imaginary part of the visibilities
ReV = np.real(vis.visibilities)
ImV = np.imag(vis.visibilities)
# 'Visib' is composed by the real and imaginary part of the visibilities
Visib = np.concatenate([ReV, ImV], axis=-1)
# Standard deviation of the real and imaginary part of the visibilities
if vis.amplitude_uncertainty is None:
sigma_Re = np.ones_like(ReV)
sigma_Im = np.ones_like(ImV)
else:
sigma_Re = vis.amplitude_uncertainty
sigma_Im = vis.amplitude_uncertainty
# 'sigma': standard deviation of the data contained in 'Visib'
sigma = np.concatenate([sigma_Re, sigma_Im], axis=-1)
# RESCALING of 'Hv' AND 'Visib'(NEEDED FOR COMPUTING THE VALUE OF THE \chi ** 2; FUNCTION)
# The vector 'Visib' and every column of 'Hv' are divided by 'sigma'
Visib = Visib / sigma
ones = np.ones(shape.to_value("pix").astype(int))
sigma1 = sigma * ones[..., np.newaxis]
Hv = Hv / sigma1
# COMPUTATION OF THE LIPSCHITZ CONSTANT; 'Lip' OF THE GRADIENT OF THE \chi ** 2
# FUNCTION (NEEDED TO GUARANTEE THE CONVERGENCE OF THE ALGORITHM)
Hv = Hv.transpose(2, 0, 1).reshape(Hv.shape[2], -1)
HvHvT = np.matmul(Hv, Hv.T)
# TODO magic number
Lip = 2.1 * norm(HvHvT, 2)
return Hv, Lip, Visib
[docs]
def _get_mean_visibilities(vis, shape, pixel):
r"""
Return the mean visibilities sampling the same call in the discretisation of the (u,v) plane.
Parameters
----------
vis :
Input visibilities
shape :
Shape of image
pixel :
Size of pixels
Returns
-------
Mean Visibilities
"""
if vis.amplitude_uncertainty is None:
amplitude_uncertainty = np.ones_like(vis.visibilities)
else:
amplitude_uncertainty = vis.amplitude_uncertainty
imsize2 = shape[0] / 2
pixel_size = 1 / (shape[0] * pixel[0])
iu = vis.u / pixel_size
iv = vis.v / pixel_size
ru = np.around(iu)
rv = np.around(iv)
# index of the u coordinates of the sampling frequencies in the discretisation of the u axis
ru = ru * apu.pix + imsize2.to(apu.pix)
# index of the v coordinates of the sampling frequencies in the discretisation of the v axis
rv = rv * apu.pix + imsize2.to(apu.pix)
# matrix that represents the discretization of the (u,v)-plane
iuarr = np.zeros(shape.to_value("pixel").astype(int))
count = 0
n_vis = vis.u.shape[0]
u = np.zeros(n_vis) * (1 / apu.arcsec)
v = np.zeros(n_vis) * (1 / apu.arcsec)
den = np.zeros(n_vis)
weights = np.ones_like(vis.visibilities**2)
visib = np.zeros_like(vis.visibilities)
for ip in range(n_vis):
# what about 0.5 pix offset
i = ru[ip].to_value("pix").astype(int)
j = rv[ip].to_value("pix").astype(int)
# (i, j) is the position of the spatial frequency in
# the discretization of the (u,v)-plane 'iuarr'
if iuarr[i, j] == 0.0:
u[count] = vis.u[ip]
v[count] = vis.v[ip]
# we save in 'u' and 'v' the u and v coordinates of the first frequency that corresponds
# to the position (i, j) of the discretization of the (u,v)-plane 'iuarr'
visib[count] = vis.visibilities[ip]
weights[count] = amplitude_uncertainty[ip] ** 2.0
den[count] = 1.0
iuarr[i, j] = count
count += 1
else:
# Save the sum of the visibilities that correspond to the same position (i, j)
visib[iuarr[i, j].astype(int)] += vis.visibilities[ip]
# Save the number of the visibilities that correspond to the same position (i, j)
den[iuarr[i, j].astype(int)] += 1.0
# Save the sum of the variances of the amplitudes of the visibilities that
# correspond to the same position (i, j)
weights[iuarr[i, j].astype(int)] += amplitude_uncertainty[ip] ** 2
u = u[:count]
v = v[:count]
visib = visib[:count]
den = den[:count]
# computation of the mean value of the visibilities that correspond to the same
# position in the discretization of the (u,v)-plane
visib = visib / den
# computation of the mean value of the standard deviation of the visibilities that
# correspond to the same position in the discretization of the (u,v)-plane
weights = np.sqrt(weights[:count]) / den
return SimpleNamespace(u=u, v=v, visibilities=visib, amplitude_uncertainty=weights)
[docs]
def _proximal_entropy(y, m, lamba, Lip, tol=10**-10):
r"""
This function computes the value of the proximity operator of the entropy function subject to
positivity constraint, i.e. it solves the problem
argmin_x 1/2*|| y-x ||^2 + \lambda/Lip * H(x)
subject to x >= 0
Actually, this problem can be reduced to finding the zero of the gradient of the objective
function and it is therefore solved by means of a bisection method.
Parameters
----------
y
m
lamba
Lip
tol
Returns
-------
"""
# INITIALIZATION OF THE BISECTION METHOD
# TODO where does this number come from
a = np.full_like(y, 1e-24 * y.unit)
b = np.where(y > m, y, m)
while np.max(b - a) > tol * y.unit:
c = (a + b) / 2
f_c = c - y + lamba / Lip * np.log(c / m)
tmp1 = f_c <= 0
tmp2 = f_c >= 0
a[tmp1] = c[tmp1]
b[tmp2] = c[tmp2]
c = (a + b) / 2
return c
[docs]
def _proximal_operator(z, f, m, lamb, Lip, niter=250):
r"""
Computes the value of the proximity operator of the entropy function subject to
positivity constraint and flux constraint by means of a Dykstra-like proximal algorithm
(see Combettes, Pesquet, "Proximal Splitting Methods in Signal Processing", (2011)).
The problem to solve is:
argmin_x 1/2*|| x - y ||^2 + \lambda/Lip * H(x)
subject to positivity constraint and flux constraint.
Parameters
----------
z
f
m
lamb
Lip
niter
Returns
-------
"""
# INITIALIZATION OF THE DYKSTRA - LIKE SPLITTING
x = z[:]
p = np.zeros_like(x)
q = np.zeros_like(x)
for i in range(niter):
tmp = x + p
# Projection on the hyperplane that represents the flux constraint
y = tmp + (f - tmp.sum()) / tmp.size
p = x + p - y
x = _proximal_entropy(y + q, m, lamb, Lip)
if np.abs(x.sum() - f) <= 0.01 * f:
break
q = y + q - x
return x, i
[docs]
def _optimise_fb(Hv, Visib, Lip, flux, lambd, shape, pixel, maxiter, tol):
r"""
Solve the optimization problem using a forward-backward splitting algorithm
.. math::
\underset{x}{\mathrm{argmin}} \quad \chi^{2}(x) + \lambda H(x)
subject to positivity constraint and flux constraint. Where :math:`x` is the image to
reconstruct, :math:`\lambda` is the regularization parameter and :math:`H(x)` is the entropy of
the image.
The algorithm implemented is a forward-backward splitting algorithm
(see Combettes, Pesquet, "Proximal Splitting Methods in Signal Processing" (2011) and
Beck, Teboulle, "Fast Gradient-Based Algorithms for Constrained Total Variation Image Denoising
and Deblurring Problems" (2009)).
Parameters
----------
Hv :
Fourier matrix used to calculate the visibilities of the photon flux
(actually, Hv = [Re(F); Im(F)] where F is the complex Fourier matrix)
Visib :
Lip :
Lipschitz constant of the gradient of the \chi^2 function
flux :
total flux of the image
lambd :
regularization parameter
shape :
Image size
pixel :
Pixel size
maxiter :
Maximum number of iterations
tol :
Tolerance value used in the stopping rule ( || x - x_old || <= tol || x_old ||)
Returns
-------
MEM Image
"""
# 'f': value of the total flux of the image (taking into account the area of the pixel)
f = flux / (pixel[0] * pixel[1])
# 'm': total flux divided by the number of pixels of the image
m = f / np.prod(shape.to_value("pix"))
# INITIALIZATION
# 'x': constant image with total flux equal to 'f'
x = np.ones(shape.to_value("pix").astype(int)) + 1.0
x = x / x.sum() * f
z = x
t = 1.0
# COMPUTATION OF THE OBJECTIVE FUNCTION 'J'
tmp = x.flatten()[:]
Hvx = np.matmul(Hv, tmp)
f_R = _get_entropy(x, m)
diff_V = Hvx - Visib
f_0 = (diff_V**2).sum()
J = f_0 + lambd * f_R
n_iterations = 0 # number of iterations done in the proximal steps to update the minimizer
for i in range(maxiter):
J_old = np.copy(J)
x_old = np.copy(x)
t_old = np.copy(t)
# GRADIENT STEP
grad = 2 * np.matmul((np.matmul(Hv, z.flatten()) - Visib), Hv).reshape(*shape.to_value("pix").astype(int))
y = z - 1 / Lip * grad
# PROXIMAL STEP
p, pi = _proximal_operator(y, f, m, lambd, Lip)
# COMPUTATION OF THE OBJECTIVE FUNCTION 'Jp' IN 'p'
tmp = p.flatten()
Hvp = np.matmul(Hv, tmp)
f_Rp = _get_entropy(p, m)
diff_Vp = Hvp - Visib
f_0 = (diff_Vp**2).sum()
Jp = f_0 + lambd * f_Rp
# CHECK OF THE MONOTONICITY
# we update 'x' only if 'Jp' is less than or equal to 'J_old'
check = True
if Jp > J_old:
x[:] = x_old
J = J_old
check = False
n_iterations += pi
else:
x[:] = p.real
J = Jp
n_iterations = 0
if n_iterations >= 500:
break # if the number of iterations done to update 'x' is too big, then break
# ACCELERATION
t = (1 + np.sqrt(1.0 + 4.0 * t_old**2.0)) / 2.0
tau = (t_old - 1.0) / t
z = x + tau * (x - x_old) + (t_old / t) * (p - x)
if i % 25 == 0:
logger.info(f"Iter: {i}, Obj function: {J}")
if check and (np.sqrt(((x - x_old) ** 2.0).sum()) < tol * np.sqrt((x_old**2).sum())):
break
return x_old
def resistant_mean(data, sigma_cut):
r"""
Return a resistant mean
Remove outliers using the median and the median absolute deviation. An approximation formula is
used to correct for the truncation caused by removing outliers
Parameters
----------
data : `numpy.ndarray`
Data
sigma_cut : float
Cutoff interms of sigma
Returns
-------
`tuple`
Resistant mean and standard deviation
"""
mad_scale = 0.6745
mad_scale2 = 0.8
mad_lim = 1e-24
sig_coeff = [-0.15405, +0.90723, -0.23584, +0.020142]
median = np.median(data)
abs_deviation = np.abs(data - median)
median_abs_deviation = np.median(abs_deviation) / mad_scale
if median_abs_deviation < mad_lim:
median_abs_deviation = np.mean(abs_deviation) / mad_scale2
cutoff = sigma_cut * median_abs_deviation
good_index = np.where(abs_deviation <= cutoff)
if not good_index[0].size > 0:
raise ValueError("Unable to compute mean")
good_points = data[good_index]
mean = np.mean(good_points)
sigma = np.sqrt((((good_points - mean) ** 2).sum()) / good_points.size)
# Compensate Sigma for truncation (formula by HF):
if sigma_cut <= 4.50:
sigma = sigma / np.polyval(sig_coeff[::-1], sigma_cut)
cutoff = sigma_cut * sigma
good_index = np.where(abs_deviation <= cutoff)
good_points = data[good_index]
mean = np.mean(good_points)
sigma = np.sqrt((((good_points - mean) ** 2).sum()) / good_points.size)
if sigma_cut <= 4.50:
sigma = sigma / np.polyval(sig_coeff[::-1], sigma_cut)
# Now the standard deviation of the mean:
sigma = sigma / np.sqrt(good_points.size - 1)
return mean, sigma
[docs]
@apu.quantity_input
def mem(
vis: Visibilities,
shape: Quantity[apu.pix],
pixel_size: Quantity[apu.arcsec / apu.pix],
*,
percent_lambda: Optional[Quantity[apu.percent]] = 0.02 * apu.percent,
maxiter: int = 1000,
tol: float = 1e-3,
map: bool = True,
total_flux: Optional[Quantity] = None,
) -> Union[Quantity, NDArray[np.float64]]:
r"""
Maximum Entropy Method visibility based image reconstruction
Parameters
----------
vis :
Input Visibilities
shape :
Image size
pixel_size :
Pixel size
percent_lambda
Value used to compute the regularization parameter as a percentage of a maximum value
automatically overestimated by the algorithm. Must be in the range [0.0001,0.2]
maxiter :
Maximum number of iterations of the optimisation loop
tol : float
tolerance value used in the stopping rule ( || x - x_old || <= tol || x_old ||)
map :
Return a sunpy map or bare array
total_flux :
The total flux/counts contained in the image.
If not set, total_flux is estimated using `_estimate_flux`.
Returns
-------
"""
if total_flux is None:
total_flux = _estimate_flux(vis, shape, pixel_size)
mean_vis = _get_mean_visibilities(vis, shape, pixel_size)
Hv, Lip, Visib = _prepare_for_optimise(pixel_size, shape, mean_vis)
# should have same apu as image ct/ keV cm s arcsec**2
x = np.zeros(shape.to_value("pix").astype(int)).flatten()
# COMPUTATION OF THE; OBJECTIVE; FUNCTION; 'chi2'
x = x + total_flux / (shape[0] * shape[1] * pixel_size[0] * pixel_size[1]).value
Hvx = np.matmul(Hv, x)
lambd = 2 * np.abs(np.matmul((Hvx.value - Visib), Hv)).max() * percent_lambda
im = _optimise_fb(Hv, Visib, Lip, total_flux, lambd, shape, pixel_size, maxiter, tol)
# This is needed to match IDL output - prob array vs cartesian indexing
im = im.T
if map:
header = generate_header(vis, shape=shape, pixel_size=pixel_size)
return Map((im, header))
return im