import logging
import warnings
import numpy as np
import numpy.linalg as linalg
from scipy.special import comb
log = logging.getLogger(__name__)
__all__ = [
"replacenan",
"weighted_operations",
"matrix_operations",
"multiplyenv",
"tan2visibilities",
"populate_antisymmphasearray",
"populate_symmamparray",
"t3_amplitudes",
"redundant_cps",
"closure_amplitudes",
"q4_phases",
"LinearFit",
]
def replacenan(array):
"""
Replace singularities in analytical hexagon Fourier transform.
Replace NaN values with the analytically derived limits.
Parameters
----------
array : 2D float array
Input array
Returns
-------
array : 2D float array
Input array with NaNs replaced with analytically derived limits
"""
nanpos = np.where(np.isnan(array))
array[nanpos] = np.pi / 4
return array
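
# Illustrative usage sketch (not part of the pipeline API): replacenan() fills
# the NaN singularities of the analytic hexagon transform with the limiting
# value pi/4. The tiny array below is hypothetical.
def _example_replacenan():
    a = np.array([[0.1, np.nan], [np.nan, 0.3]])
    out = replacenan(a)
    assert np.allclose(out, [[0.1, np.pi / 4], [np.pi / 4, 0.3]])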
def weighted_operations(img, model, dqm=None):
"""
Perform least squares matrix operations to solve A x = b weighting by Poisson variance.
A is the model, b is the data (image), and x is the coefficient vector we are solving for.
Here we are weighting data by Poisson variance::
x = inv(At.W.A).(At.W.b)
where W is a diagonal matrix of weights w_i,
weighting each data point i by the inverse of its variance::
w_i = 1 / sigma_i ^ 2
    For photon noise, the data (the image values b_i) have variance
    proportional to b_i, up to an ADU-to-electrons conversion factor.
    If this factor is the same for all pixels, it does not need to be
    included here.
Parameters
----------
img : 2D float array
Input data
model : 2D float array
Analytic model
dqm : 2D bool array
Bad pixel mask, same shape as image.
Returns
-------
x : 1D float array
Coefficient vector
    res : 2D float array
        Residuals: difference between the data and the fitted model
Notes
-----
Use matrix_operations() for equal weighting of data.
"""
# Remove not-to-be-fit data from the flattened "img" data vector
flatimg = img.reshape(np.shape(img)[0] * np.shape(img)[1])
if dqm is not None:
flatdqm = dqm.reshape(np.shape(img)[0] * np.shape(img)[1])
        nanlist = np.where(flatdqm)  # indices where DO_NOT_USE pixels occur
else:
nanlist = (np.array((), dtype=np.int32),) # shouldn't occur w/MAST JWST data
# see original linearfit https://github.com/agreenbaum/ImPlaneIA:
# agreenbaum committed on May 21, 2017 1 parent 3e0fb8b
# commit bf02eb52c5813cb5d77036174a7caba703f9d366
#
flatimg = np.delete(flatimg, nanlist) # DATA values
# photon noise variance - proportional to ADU
# (for roughly uniform adu2electron factor)
variance = np.abs(flatimg)
    # set the weight of any pixel whose value is <= 1 (negative, zero, or unity)
    # to zero, so such data are excluded from the fit
weights = np.where(flatimg <= 1.0, 0.0, 1.0 / np.sqrt(variance)) # anand 2022 Jan
log.debug(f"{len(nanlist[0]):d} bad pixels skipped in weighted fringefitter")
# A - but delete all pixels flagged by dq array
flatmodel_nan = model.reshape(np.shape(model)[0] * np.shape(model)[1], np.shape(model)[2])
flatmodel = np.zeros((len(flatimg), np.shape(model)[2]))
for fringe in range(np.shape(model)[2]):
flatmodel[:, fringe] = np.delete(flatmodel_nan[:, fringe], nanlist)
# A.w
aw = flatmodel * weights[:, np.newaxis]
bw = flatimg * weights
# resids are pixel value residuals, flattened to 1d vector
x, _rss, _rank, singvals = np.linalg.lstsq(aw, bw, rcond=None)
# actual residuals in image:
res = flatimg - np.dot(flatmodel, x)
# put bad pixels back
naninsert = nanlist[0] - np.arange(len(nanlist[0]))
    # re-insert the bad pixels into the residual vector as NaNs
res = np.insert(res, naninsert, np.nan)
res = res.reshape(img.shape[0], img.shape[1])
cond = None
return x, res, cond, singvals # no condition number yet...
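
# Illustrative usage sketch (hypothetical data): weighted_operations() solves
# x = inv(At.W.A).(At.W.b) by applying sqrt(W) to both the model columns and
# the data before calling np.linalg.lstsq. One pixel is flagged DO_NOT_USE in
# dqm and comes back as NaN in the residual image.
def _example_weighted_operations():
    rng = np.random.default_rng(1)
    img = rng.uniform(10.0, 100.0, size=(8, 8))
    model = rng.uniform(0.5, 1.5, size=(8, 8, 3))  # 3 hypothetical fringe terms
    dqm = np.zeros((8, 8), dtype=bool)
    dqm[0, 0] = True  # one bad pixel, excluded from the fit
    x, res, _cond, _singvals = weighted_operations(img, model, dqm=dqm)
    assert x.shape == (3,) and res.shape == img.shape
    assert np.isnan(res[0, 0])  # bad pixel re-inserted as NaN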
def matrix_operations(img, model, linfit=False, dqm=None):
"""
Use least squares matrix operations to solve A x = b.
A is the model, b is the data (img), and x is the coefficient vector we are solving for.
    The unweighted solution is x = inv(At.A).(At.b).
TODO: replace linearfit with scipy fitting.
Parameters
----------
img : 2D float array
Input data
model : 2D float array
Analytic model
linfit : bool
Whether to perform linear fit
dqm : 2D bool array
Bad pixel mask slice, same shape as image.
Returns
-------
x : 1D float array
Solution to fit
res : 2D float array
Residuals in fit
cond : float
Condition number of the inverse of the product of model and its
transpose
"""
flatimg = img.reshape(np.shape(img)[0] * np.shape(img)[1])
log.info("fringefitting.leastsqnrm.matrix_operations(): ")
log.info(f"\timg {img.shape:}")
log.info(
f"\tL x W = {img.shape[0]:d} x {img.shape[1]:d} = {img.shape[0] * img.shape[1]:d}",
)
log.info(f"\tflatimg {flatimg.shape:}")
log.info("")
log.info("\ttype(dqm) %s", type(dqm))
if dqm is not None:
flatdqm = dqm.reshape(np.shape(img)[0] * np.shape(img)[1])
        nanlist = np.where(flatdqm)  # indices where DO_NOT_USE pixels occur
else:
nanlist = (np.array((), dtype=np.int32),) # shouldn't occur w/MAST JWST data
log.info(f"\ttype(nanlist) {type(nanlist):}, len={len(nanlist):}")
log.info(f"\tnumber of nanlist pixels: {len(nanlist[0]):d} items")
log.info(f"\t{len(nanlist[0]):d} DO_NOT_USE pixels found in data slice")
flatimg = np.delete(flatimg, nanlist)
log.info(f"\tflatimg {flatimg.shape:} after deleting {len(nanlist[0]):d}")
# A
flatmodel_nan = model.reshape(np.shape(model)[0] * np.shape(model)[1], np.shape(model)[2])
flatmodel = np.zeros((len(flatimg), np.shape(model)[2]))
log.info(f"\tflatmodel_nan {flatmodel_nan.shape:}")
log.info(f"\tflatmodel {flatmodel.shape:}")
log.info(f"\tdifference {flatmodel_nan.shape[0] - flatmodel.shape[0]:}")
log.info("flat model dimensions %s", np.shape(flatmodel))
log.info("flat image dimensions %s", np.shape(flatimg))
for fringe in range(np.shape(model)[2]):
flatmodel[:, fringe] = np.delete(flatmodel_nan[:, fringe], nanlist)
# At (A transpose)
flatmodeltransp = flatmodel.transpose()
# At.A (makes square matrix)
modelproduct = np.dot(flatmodeltransp, flatmodel)
# At.b
data_vector = np.dot(flatmodeltransp, flatimg)
# inv(At.A)
inverse = linalg.inv(modelproduct)
cond = np.linalg.cond(inverse)
x = np.dot(inverse, data_vector)
res = flatimg - np.dot(flatmodel, x)
# put bad pixels back
naninsert = nanlist[0] - np.arange(len(nanlist[0]))
    # re-insert the bad pixels into the residual vector as NaNs
res = np.insert(res, naninsert, np.nan)
res = res.reshape(img.shape[0], img.shape[1])
log.info("data flux %s", flatimg.sum())
log.info("flat model dimensions %s", np.shape(flatmodel))
log.info("model transpose dimensions %s", np.shape(flatmodeltransp))
log.info("flat image dimensions %s", np.shape(flatimg))
log.info("transpose * image data dimensions %s", np.shape(data_vector))
log.info("flat img * transpose dimensions %s", np.shape(inverse))
if linfit:
# photon noise
noise = np.sqrt(np.abs(flatimg))
        # set the weight of any pixel with |value| <= 1 to zero (excluded from the fit)
        weights = np.where(np.abs(flatimg) <= 1.0, 0.0, 1.0 / (noise**2))
        # initialize the LinearFit object with the diagonal inverse-variance weight matrix
result = LinearFit(flatimg, np.diag(weights), flatmodeltransp)
# do the fit
result.fit()
# delete inverse_covariance_matrix to reduce size of pickled file
result.inverse_covariance_matrix = []
linfit_result = result
log.info("Returned linearfit result")
else:
linfit_result = None
log.info("linearfit not attempted, no covariances saved.")
return x, res, cond, linfit_result
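
# Illustrative usage sketch (hypothetical data): matrix_operations() solves the
# unweighted normal equations x = inv(At.A).(At.b); on well-conditioned input
# this agrees with np.linalg.lstsq applied to the same flattened system.
def _example_matrix_operations():
    rng = np.random.default_rng(2)
    img = rng.uniform(10.0, 100.0, size=(6, 6))
    model = rng.uniform(0.5, 1.5, size=(6, 6, 4))  # 4 hypothetical fringe terms
    x, _res, _cond, _ = matrix_operations(img, model, linfit=False, dqm=None)
    x_lstsq, *_ = np.linalg.lstsq(model.reshape(36, 4), img.reshape(36), rcond=None)
    assert np.allclose(x, x_lstsq)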
def multiplyenv(env, fringeterms):
"""
Multiply the envelope by each fringe 'image'.
Parameters
----------
env : 2D float array
Envelope
fringeterms : list of 3 2D float arrays
Model
Returns
-------
full : 3D float array
Envelope multiplied by each fringe 'image'
"""
# The envelope has size (fov, fov). This multiplies the envelope by each
# of the 43 slices in the fringe model
full = np.ones(
(
np.shape(fringeterms)[1],
np.shape(fringeterms)[2],
np.shape(fringeterms)[0] + 1,
)
)
for i, fringeterm in enumerate(fringeterms):
full[:, :, i] = env * fringeterm
log.debug("Total number of fringe terms: %s", len(fringeterms) - 1)
return full
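
# Illustrative usage sketch (hypothetical shapes): multiplyenv() stacks
# env * fringe slices along the last axis and leaves one extra trailing slice
# of ones.
def _example_multiplyenv():
    env = np.full((5, 5), 2.0)
    fringeterms = [np.ones((5, 5)), 3.0 * np.ones((5, 5))]
    full = multiplyenv(env, fringeterms)
    assert full.shape == (5, 5, 3)
    assert np.allclose(full[:, :, 0], 2.0)
    assert np.allclose(full[:, :, 1], 6.0)
    assert np.allclose(full[:, :, 2], 1.0)  # untouched trailing slice of ones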
def tan2visibilities(coeffs):
"""
From the solution to the fit, calculate the fringe amplitude and phase.
Parameters
----------
coeffs : 1D float array
The coefficients providing the fit solution.
Returns
-------
amp, delta : 1D float array, 1D float array
Fringe amplitude & phase
Notes
-----
Technically the fit measures phase AND amplitude, so to retrieve the
phase we need to consider both sin and cos terms. Consider one fringe:
A { cos(kx)cos(dphi) + sin(kx)sin(dphi) } =
A(a cos(kx) + b sin(kx)), where a = cos(dphi) and b = sin(dphi)
and A is the fringe amplitude, therefore coupling a and b.
In practice we measure A*a and A*b from the coefficients, so:
Ab/Aa = b/a = tan(dphi)
call a' = A*a and b' = A*b (we actually measure a', b')
    (A*sin(dphi))^2 + (A*cos(dphi))^2 = A^2 = a'^2 + b'^2
"""
delta = np.zeros(int((len(coeffs) - 1) / 2))
amp = np.zeros(int((len(coeffs) - 1) / 2))
for q in range(int((len(coeffs) - 1) / 2)):
delta[q] = np.arctan2(coeffs[2 * q + 2], coeffs[2 * q + 1])
amp[q] = np.sqrt(coeffs[2 * q + 2] ** 2 + coeffs[2 * q + 1] ** 2)
log.debug(f"tan2visibilities: shape coeffs:{np.shape(coeffs)} shape delta:{np.shape(delta)}")
# returns fringe amplitude & phase
return amp, delta
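
# Illustrative sketch (hypothetical coefficients): each fringe contributes an
# (A*cos(dphi), A*sin(dphi)) coefficient pair after the first coefficient
# (which tan2visibilities skips), and (A, dphi) are recovered with sqrt and
# arctan2.
def _example_tan2visibilities():
    amp_true, phi_true = 0.8, 0.3
    coeffs = np.array([1.0, amp_true * np.cos(phi_true), amp_true * np.sin(phi_true)])
    amp, delta = tan2visibilities(coeffs)
    assert np.allclose(amp, amp_true) and np.allclose(delta, phi_true)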
def populate_antisymmphasearray(deltaps, n=7):
"""
    Populate the antisymmetric fringe phase array.
This array takes the form::
fringephasearray[0, q + 1 :] = coeffs[0:6]
fringephasearray[1, q + 2 :] = coeffs[6:11]
fringephasearray[2, q + 3 :] = coeffs[11:15]
fringephasearray[3, q + 4 :] = coeffs[15:18]
fringephasearray[4, q + 5 :] = coeffs[18:20]
fringephasearray[5, q + 6 :] = coeffs[20:]
Parameters
----------
deltaps : 1D float array
Pistons between each pair of holes
n : int, optional
Number of holes (default=7)
Returns
-------
arr : 2D float array
Fringe phases between each pair of holes
"""
# Initialize fringe phase array
arr = np.zeros((n, n))
step = 0
n = n - 1
for h in range(n):
arr[h, h + 1 :] = deltaps[step : step + n]
step += n
n -= 1
arr -= arr.T
return arr
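
# Illustrative sketch: for n=7 holes there are 21 hole pairs filled in the
# order (0,1)...(0,6), (1,2)...(1,6), ..., (5,6), and the resulting fringe
# phase array is antisymmetric.
def _example_populate_antisymmphasearray():
    deltaps = np.arange(21, dtype=float)
    arr = populate_antisymmphasearray(deltaps, n=7)
    assert arr.shape == (7, 7)
    assert np.allclose(arr + arr.T, 0.0)  # antisymmetry
    assert arr[0, 1] == deltaps[0] and arr[5, 6] == deltaps[20]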
def populate_symmamparray(amps, n=7):
"""
Populate the symmetric fringe amplitude array.
Parameters
----------
amps : 1D float array
Fringe visibility between each pair of holes
n : int, optional
Number of holes (default=7)
Returns
-------
arr: 2D float array
Fringe amplitude array
"""
arr = np.zeros((n, n))
step = 0
n = n - 1
for h in range(n):
arr[h, h + 1 :] = amps[step : step + n]
step += n
n -= 1
arr += arr.T
return arr
def t3_amplitudes(amps, n=7):
"""
Populate the triple-product amplitude array (NOT closure amplitudes).
Parameters
----------
amps : 1D float array
Fringe visibility between each pair of holes
n : int, optional
Number of holes (default=7)
Returns
-------
cpamps : 1D float array
Triple product amplitude array
"""
arr = populate_symmamparray(amps, n=n)
cpamps = np.zeros(int(comb(n, 3)))
nn = 0
for kk in range(n - 2):
for ii in range(n - kk - 2):
for jj in range(n - kk - ii - 2):
cpamps[nn + jj] = (
arr[kk, ii + kk + 1]
* arr[ii + kk + 1, jj + ii + kk + 2]
* arr[jj + ii + kk + 2, kk]
)
nn += jj + 1
return cpamps
def redundant_cps(deltaps, n=7):
"""
Calculate closure phases for each set of 3 holes.
Parameters
----------
deltaps : 1D float array
Pistons between each pair of holes
n : int, optional
Number of holes (default=7)
Returns
-------
cps : 1D float array
Closure phases
"""
arr = populate_antisymmphasearray(deltaps, n=n) # fringe phase array
cps = np.zeros(int(comb(n, 3)))
nn = 0
for kk in range(n - 2):
for ii in range(n - kk - 2):
for jj in range(n - kk - ii - 2):
cps[nn + jj] = (
arr[kk, ii + kk + 1]
+ arr[ii + kk + 1, jj + ii + kk + 2]
+ arr[jj + ii + kk + 2, kk]
)
nn += jj + 1
return cps
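
# Illustrative sketch (hypothetical pistons): if each fringe phase is a pure
# difference of per-hole pistons, p_i - p_j, the closure phases cancel to zero.
def _example_redundant_cps():
    rng = np.random.default_rng(3)
    pistons = rng.normal(size=7)
    deltaps = np.array(
        [pistons[i] - pistons[j] for i in range(7) for j in range(i + 1, 7)]
    )
    cps = redundant_cps(deltaps, n=7)
    assert cps.shape == (35,)  # comb(7, 3) hole triangles
    assert np.allclose(cps, 0.0)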
def closure_amplitudes(amps, n=7):
"""
Calculate closure amplitudes.
Parameters
----------
amps : 1D float array
Fringe amplitudes
n : int, optional
Number of holes (default=7)
Returns
-------
CAs : 1D float array
Closure amplitudes
"""
arr = populate_symmamparray(amps, n=n) # fringe amp array
nn = 0
cas = np.zeros(int(comb(n, 4)))
for ii in range(n - 3):
for jj in range(n - ii - 3):
for kk in range(n - jj - ii - 3):
for ll in range(n - jj - ii - kk - 3):
cas[nn + ll] = (
arr[ii, jj + ii + 1]
* arr[ll + ii + jj + kk + 3, kk + jj + ii + 2]
/ (arr[ii, kk + ii + jj + 2] * arr[jj + ii + 1, ll + ii + jj + kk + 3])
)
nn = nn + ll + 1
return cas
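
# Illustrative sketch: for a point-like source every fringe amplitude is equal,
# so each closure amplitude (a ratio of four amplitudes) is exactly 1.
def _example_closure_amplitudes():
    amps = np.ones(21)  # 21 baselines for 7 holes
    cas = closure_amplitudes(amps, n=7)
    assert cas.shape == (35,)  # comb(7, 4) hole quads
    assert np.allclose(cas, 1.0)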
def q4_phases(deltaps, n=7):
"""
Calculate phases for each set of 4 holes.
Parameters
----------
deltaps : 1D float array
Pistons between each pair of holes
n : int, optional
Number of holes (default=7)
Returns
-------
quad_phases : 1D float array
Quad phases
"""
arr = populate_antisymmphasearray(deltaps, n=n) # fringe phase array
nn = 0
quad_phases = np.zeros(int(comb(n, 4)))
for ii in range(n - 3):
for jj in range(n - ii - 3):
for kk in range(n - jj - ii - 3):
for ll in range(n - jj - ii - kk - 3):
quad_phases[nn + ll] = (
arr[ii, jj + ii + 1]
+ arr[ll + ii + jj + kk + 3, kk + jj + ii + 2]
- arr[ii, kk + ii + jj + 2]
- arr[jj + ii + 1, ll + ii + jj + kk + 3]
)
nn = nn + ll + 1
return quad_phases
class LinearFit:
"""
Perform a general least-squares fit of a linear model using numpy matrix inversion.
Uncertainties in the dependent variables (but not in the independent variables)
can be taken into account.
All inputs have to be numpy matrices.
Math is based on Press'
* Numerical Recipes p661 : Section 15.2 Fitting Data to a Straight Line
* Numerical Recipes p671 : Section 15.4 General Linear Least Squares
Code is based on an early yorick implementation by Damien Segransan (University of Geneva)
Python implementation and tools by Johannes Sahlmann 2009-2017
(University of Geneva, European Space Agency, STScI/AURA)
Parameters
----------
dependent_variable : ndarray (1xN)
        Dependent variables of the linear equation system (N equations, M unknown coefficients)
inverse_covariance_matrix : ndarray (NxN)
Inverse covariance matrix corresponding to the dependent_variable.
i.e. data weights proportional to 1/sigma**2 where sigma=uncertainty
independent_variable : ndarray (MxN)
The independent_variables that are multiplied by the unknown coefficients
Attributes
----------
p : ndarray
Coefficients of the solution
p_formal_uncertainty : ndarray
Formal uncertainty of the coefficients
p_formal_covariance_matrix : ndarray
Formal covariance matrix of the coefficients (not rescaled)
p_normalised_uncertainty : ndarray
Normalised uncertainty (chi2 = 1) of the coefficients
p_normalised_covariance_matrix : ndarray
Normalised covariance matrix of the coefficients (rescaled to yield chi2=1)
p_correlation_matrix : ndarray
Coefficient correlation matrix
fit : ndarray
Values of the best-fit model
residuals : ndarray
Observed - Calculated (O-C) residuals
chi2 : float
Chi-square value of the best fit
"""
def __init__(self, dependent_variable, inverse_covariance_matrix, independent_variable):
self.y_i = dependent_variable
self.X_ij = independent_variable
self.inverse_covariance_matrix = inverse_covariance_matrix
def fit(self):
"""Perform the linear fit."""
# number of measurements/constraints, i.e. number of equations
self.n_measurement = self.X_ij.shape[1]
# number of coefficients, i.e. free parameters
self.n_param = self.X_ij.shape[0]
# number of degrees of freedom
self.n_freedom = self.n_measurement - self.n_param
a_ij = (self.X_ij) @ self.inverse_covariance_matrix
alpha_kj = a_ij @ (self.X_ij.T)
beta_k = a_ij @ self.y_i.T
a_j = np.linalg.solve(alpha_kj, beta_k)
yfit_i = (self.X_ij.T * a_j).T
chi2 = ((yfit_i - self.y_i) @ self.inverse_covariance_matrix) @ (yfit_i - self.y_i).T
c_jk = np.linalg.inv(alpha_kj)
var_aj = np.diag(c_jk)
        # The variances of the fitted coefficients are the diagonal terms
        # of the coefficient covariance matrix.
        # If the error bars are not well estimated, it is useful to
        # renormalize the variances by the measured chi2 divided by the
        # expected chi2 (the number of degrees of freedom); the normalised
        # variances then account for the residual dispersion in the data.
normalized_variance_aj = var_aj.T * chi2 / self.n_freedom
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", category=RuntimeWarning, message="invalid value encountered in sqrt"
)
stdev_aj = np.sqrt(normalized_variance_aj)
# coefficients of the solution
self.p = np.array(a_j).flatten()
# normalised uncertainty (chi2 = 1) of the coefficients
self.p_normalised_uncertainty = np.array(stdev_aj).flatten()
# values of the best-fit model
self.fit_values = np.array(yfit_i).flatten()
# Observed - Calculated (O-C) residuals
self.residuals = np.array(self.y_i - yfit_i).flatten()
# chi-square value of the best fit
self.chi2 = np.array(chi2)[0][0]
# formal uncertainty of the coefficients
self.p_formal_uncertainty = np.array(np.sqrt(var_aj)).flatten()
# formal covariance matrix of the coefficients (not rescaled)
self.p_formal_covariance_matrix = c_jk
# normalised covariance matrix of the coefficients (rescaled to yield chi2=1)
self.p_normalised_covariance_matrix = c_jk * self.chi2 / self.n_freedom
# compute correlation Matrix
tmp_v = 1.0 / self.p_formal_uncertainty
tmp = np.vstack((tmp_v, np.tile(np.zeros(len(tmp_v)), (len(tmp_v) - 1, 1))))
m = tmp.T
err_matrix = m.dot(m.T)
correlation_matrix = np.multiply(err_matrix, self.p_formal_covariance_matrix.T)
self.p_correlation_matrix = correlation_matrix
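
# Illustrative usage sketch (hypothetical data): a straight-line fit y = 2 + 3*x
# with LinearFit. The pipeline itself calls LinearFit from matrix_operations()
# with flattened image data; here only the recovered coefficients are checked.
def _example_linearfit():
    x = np.linspace(0.0, 1.0, 20)
    y = 2.0 + 3.0 * x
    design = np.vstack([np.ones_like(x), x])  # (M x N): 2 coefficients, 20 points
    inv_cov = np.eye(20)  # uniform unit weights
    lf = LinearFit(y, inv_cov, design)
    lf.fit()
    assert np.allclose(lf.p, [2.0, 3.0])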