import logging
import numpy as np
from jwst.ami import analyticnrm2, mask_definition_ami, utils
from jwst.ami import leastsqnrm as leastsqnrm
log = logging.getLogger(__name__)
m = 1.0
mm = 1.0e-3 * m
um = 1.0e-6 * m
mas = 1.0e-3 / (60 * 60 * 180 / np.pi) # in radians
__all__ = ["LgModel", "goodness_of_fit", "run_data_correlate"]
[docs]
class LgModel:
"""
A class for conveniently dealing with an NRM object.
This class:
* Defines mask geometry and detector-scale parameters.
* Simulates PSF (broadband or monochromatic).
* Builds a fringe model - either by user definition, or automated to data.
* Fits model to data by least squares.
Masks: ``jwst_ami`` (formerly ``jwst_g7s6c``)
Parameters
----------
nrm_model : `~stdatamodels.jwst.datamodels.NRMModel` or \
`~jwst.ami.mask_definition_ami.NRMDefinition`
Datamodel containing mask geometry information
pixscale : float
Initial estimate of pixel scale in radians
bandpass : ndarray[float]
Array of the form: ``[(weight1, wavl1), (weight2, wavl2), ...]``
mask : str
Keyword for built-in values
holeshape : str
Shape of apertures; default: "hex"
over : int
Oversampling factor
phi : float 1D array
Distance of fringe from hole center in units of waves
chooseholes : list of strings, default None
E.g., ``['B2', 'B4', 'B5', 'B6']`` for a four-hole mask.
If None, use the real seven-hole mask.
affine2d : `~jwst.ami.utils.Affine2d`
Affine2d object
Notes
-----
Algorithm documented in Greenbaum, A. Z., Pueyo, L. P., Sivaramakrishnan,
A., and Lacour, S., Astrophysical Journal vol. 798, Jan 2015.
First written by Alexandra Greenbaum in 2014.
"""
def __init__(
self,
nrm_model,
pixscale,
bandpass,
mask="jwst_ami",
holeshape="hex",
over=1,
phi=None,
chooseholes=None,
affine2d=None,
):
self.bandpass = bandpass
self.holeshape = holeshape
self.pixel = pixscale # det pix in rad (square)
self.over = over
self.mask = mask_definition_ami.NRMDefinition(
nrm_model, maskname=mask, chooseholes=chooseholes
)
self.ctrs = self.mask.ctrs
self.d = self.mask.hdia
self.D = self.mask.active_D
self.N = len(self.ctrs)
self.fmt = "%10.4e"
# get closest in time OPD from STPSF?
if (phi is None) or (phi == "perfect"): # meters of OPD at central wavelength
self.phi = np.zeros(self.N) # backwards compatibility
else:
self.phi = phi
self.chooseholes = chooseholes
# affine2d property not to be changed in LgModel - create a new
# instance instead
# Save affine deformation of pupil object or create a no-deformation
# object.
# We apply this when sampling the PSF, not to the pupil geometry.
if affine2d is None:
self.affine2d = utils.Affine2d(
mx=1.0, my=1.0, sx=0.0, sy=0.0, xo=0.0, yo=0.0, name="Ideal"
)
else:
self.affine2d = affine2d
[docs]
def simulate(self, fov, psf_offset=(0, 0)):
"""
Simulate a detector-scale PSF.
Use parameters input from the call and
already stored in the object, and generate a simulation FITS header
storing all of the parameters used to generate that PSF. If the input
bandpass is one number, it will calculate a monochromatic PSF.
Parameters
----------
fov : int
Number of detector pixels on a side
psf_offset : tuple of int
Offset from center of array in units of detector pixels.
Returns
-------
float 2D array
Simulated PSF of shape ``(fov, fov)`` that is also
stored as ``self.psf``
"""
# First set up conditions for choosing various parameters
self.psf_over = np.zeros((self.over * fov, self.over * fov))
nspec = 0
# accumulate polychromatic oversampled psf in the object
for w, l in self.bandpass: # w: wavelength's weight, l: lambda (wavelength)
self.psf_over += w * analyticnrm2.psf(
self.pixel, # det pixel, rad
fov, # in detpix number
self.over,
self.ctrs,
self.d,
l,
self.phi,
psf_offset, # det pixels
self.affine2d,
shape=self.holeshape,
)
# offset signs fixed to agree w/DS9, +x shifts ctr R, +y shifts up
nspec += 1
# store the detector pixel scale psf in the object
self.psf = utils.rebin(self.psf_over, (self.over, self.over))
return self.psf
[docs]
def make_model(self, fov, psf_offset=(0, 0)):
"""
Generate the fringe model.
Use the attributes of the object with a bandpass that is either a single
wavelength or a list of tuples of the form::
[(weight1, wavl1), (weight2, wavl2), ...]
The model is
a collection of fringe intensities, where ``nholes = 7`` means the model
has a ``@D`` slice for each of 21 cosines, 21 sines, a DC-like, and a flux
slice for a total of 44 2D slices.
Parameters
----------
fov : int
Number of detector pixels on a side
psf_offset : tuple of int
Offset from center of array in units of detector pixels.
Returns
-------
ndarray[float]
Generated fringe model in the shape of ``(fov, fov, N * (N - 1) + 2)``
that is also stored as ``self.model``
"""
self.fov = fov
# The model shape is (fov) x (fov) x (# solution coefficients)
# the coefficient refers to the terms in the analytic equation
# There are N(N-1) independent pistons, double-counted by cosine
# and sine, one constant term and a DC offset.
self.model = np.zeros((self.fov, self.fov, self.N * (self.N - 1) + 2))
self.model_beam = np.zeros((self.over * self.fov, self.over * self.fov))
self.fringes = np.zeros(
(self.N * (self.N - 1) + 1, self.over * self.fov, self.over * self.fov)
)
for w, l in self.bandpass: # w: weight, l: lambda (wavelength)
# model_array returns the envelope and fringe model as a list of
# oversampled fov x fov slices
pb, ff = analyticnrm2.model_array(
self.ctrs,
l,
self.over,
self.pixel,
self.fov,
self.d,
shape=self.holeshape,
psf_offset=psf_offset,
affine2d=self.affine2d,
)
log.debug(f"Passed to model_array: psf_offset: {psf_offset}")
log.debug(f"Primary beam in the model created: {pb}")
self.model_beam += pb
self.fringes += ff
# this routine multiplies the envelope by each fringe "image"
model_over = leastsqnrm.multiplyenv(pb, ff)
model_binned = np.zeros((self.fov, self.fov, model_over.shape[2]))
# loop over slices "sl" in the model
for sl in range(model_over.shape[2]):
model_binned[:, :, sl] = utils.rebin(model_over[:, :, sl], (self.over, self.over))
self.model += w * model_binned
return self.model
[docs]
def fit_image(
self,
image,
model_in,
dqm=None,
weighted=False,
):
"""
Run a least-squares fit on an input image.
Find the appropriate wavelength scale and rotation.
If a model is not specified then this
method will find the appropriate wavelength scale, rotation (and
hopefully centering as well -- This is not written into the object yet,
but should be soon). Without specifying a model, this method can take a
reference image (a cropped deNaNed version of the data) to run
correlations. It is recommended that the symmetric part of the data be
used to avoid piston confusion in scaling.
Parameters
----------
image : 2D float array
Input image
model_in : 2D float array
Model image
dqm : 2D array
Bad pixel mask of same dimensions as image
weighted : bool
Use weighted operations in the least squares routine
"""
# TODO: change name of self.singvals or self.linfit_results to be the same, for consistency.
# This would be easier if matrix_operations and weighted_operations both did their fitting
# with scipy
self.weighted = weighted
self.fittingmodel = model_in
if dqm is None:
dqm = np.zeros(image.shape, dtype="bool")
if not weighted:
self.soln, self.residual, self.cond, self.linfit_result = leastsqnrm.matrix_operations(
image, self.fittingmodel, dqm=dqm
)
else:
self.soln, self.residual, self.cond, self.singvals = leastsqnrm.weighted_operations(
image, self.fittingmodel, dqm=dqm
)
self.rawDC = self.soln[-1]
self.flux = self.soln[0]
self.soln = self.soln / self.soln[0]
# fringephase now in radians
self.fringeamp, self.fringephase = leastsqnrm.tan2visibilities(self.soln)
self.fringepistons = utils.fringes2pistons(self.fringephase, len(self.ctrs))
self.redundant_cps = leastsqnrm.redundant_cps(self.fringephase, n=self.N)
# RC 8/24
self.t3_amplitudes = leastsqnrm.t3_amplitudes(self.fringeamp, n=self.N)
self.redundant_cas = leastsqnrm.closure_amplitudes(self.fringeamp, n=self.N)
self.q4_phases = leastsqnrm.q4_phases(self.fringephase, n=self.N) # RC 8/24
[docs]
def create_modelpsf(self):
"""
Make an image from the object's model and fit solutions.
The result is stored in the ``self.modelpsf`` attribute.
"""
try:
self.modelpsf = np.zeros((self.fov, self.fov))
except AttributeError:
self.modelpsf = np.zeros((self.fov_sim, self.fov_sim))
for ind, coeff in enumerate(self.soln):
self.modelpsf += self.flux * coeff * self.fittingmodel[:, :, ind]
[docs]
def goodness_of_fit(data, bestfit, disk_r=8):
"""
Calculate goodness of fit between the data and the fit.
Parameters
----------
data : 2D float array
Input image
bestfit : 2D float array
Fit to input image
disk_r : int
Radius of disk
Returns
-------
float
Goodness of fit
"""
mask = (
np.ones(data.shape)
+ utils.makedisk(data.shape[0], 2)
- utils.makedisk(data.shape[0], disk_r)
)
difference = np.ma.masked_invalid(mask * (bestfit - data))
masked_data = np.ma.masked_invalid(mask * data)
return abs(difference).sum() / abs(masked_data).sum()
[docs]
def run_data_correlate(data, model):
"""
Calculate correlation between data and model.
Parameters
----------
data : 2D float array
Reference image
model : 2D float array
Simulated PSF
Returns
-------
2D float array
Correlation between data and model
"""
sci = data
log.debug("shape sci: %s", np.shape(sci))
return utils.rcrosscorrelate(sci, model)