Source code for jwst.ami.oifits

import logging
import warnings

import numpy as np
from astropy.stats import sigma_clipped_stats
from astropy.time.core import Time
from scipy.special import comb
from stdatamodels.jwst import datamodels

from jwst.ami import leastsqnrm

log = logging.getLogger(__name__)

__all__ = ["RawOifits", "CalibOifits"]


[docs] class RawOifits: """ Store AMI data in the format required to write out to OIFITS files. Builds the structure needed to write out oifits files according to the schema. Populates the structure with the observables from the fringe fitter. For data arrays, the observables are populated slice-by-slice to enable fitting with FringeFitter and storage as OiFits to take place in the same loop. Angular quantities, initially in radians from fringe fitting, are converted to degrees for saving. Produces averaged and multi-integration versions, with sigma-clipped stats over integrations. Notes ----- Based on ObservablesFromText from ImPlaneIA, e.g. https://github.com/anand0xff/ImPlaneIA/blob/master/nrm_analysis/misctools/implane2oifits.py#L32 """ def __init__(self, instrument_data, method="mean"): """ Initialize the RawOifits object. Parameters ---------- instrument_data : jwst.ami.instrument_data.NIRISS object Information on the mask geometry (namely # holes), instrument, wavelength obs mode. method : str Method to average observables: mean or median. Default mean. """ self.n_holes = 7 self.instrument_data = instrument_data self.nslices = self.instrument_data.nslices # n ints self.n_baselines = int(comb(self.n_holes, 2)) # 21 self.n_closure_phases = int(comb(self.n_holes, 3)) # 35 self.n_closure_amplitudes = int(comb(self.n_holes, 4)) # also 35 self.method = method allowed_methods = ["mean", "median", "multi"] if self.method not in allowed_methods: msg = ( f"method for saving OIFITS file must be one of {allowed_methods}. " "Defaulting to use mean!" ) log.warning(msg) self.method = "mean" self.ctrs_eqt = self.instrument_data.ctrs_eqt self.ctrs_inst = self.instrument_data.ctrs_inst self.bholes, self.bls = self._makebaselines() self.tholes, self.tuv = self._maketriples_all() self.qholes, self.quads = self._makequads_all()
[docs] def initialize_obsarrays(self): """Initialize arrays of observables to empty arrays.""" # empty arrays of observables, (nslices,nobservables) shape. self.fringe_phases = np.zeros((self.nslices, self.n_baselines)) self.fringe_amplitudes = np.zeros((self.nslices, self.n_baselines)) self.closure_phases = np.zeros((self.nslices, self.n_closure_phases)) self.t3_amplitudes = np.zeros((self.nslices, self.n_closure_phases)) self.q4_phases = np.zeros((self.nslices, self.n_closure_amplitudes)) self.closure_amplitudes = np.zeros((self.nslices, self.n_closure_amplitudes)) self.pistons = np.zeros((self.nslices, self.n_holes)) self.solns = np.zeros((self.nslices, 44)) self.fringe_amplitudes_squared = np.zeros((self.nslices, self.n_baselines))
[docs] def populate_obsarray(self, i, nrmslc): """ Populate arrays of observables with fringe fitter results. Parameters ---------- i : int Index of the integration nrmslc : object Object containing the results of the fringe fitting for this integration """ # populate with each integration's observables self.fringe_phases[i, :] = nrmslc.fringephase # FPs in radians self.fringe_amplitudes[i, :] = nrmslc.fringeamp self.closure_phases[i, :] = nrmslc.redundant_cps # CPs in radians self.t3_amplitudes[i, :] = nrmslc.t3_amplitudes self.q4_phases[i, :] = nrmslc.q4_phases # quad phases in radians self.closure_amplitudes[i, :] = nrmslc.redundant_cas self.pistons[i, :] = nrmslc.fringepistons # segment pistons in radians self.solns[i, :] = nrmslc.soln self.fringe_amplitudes_squared[i, :] = nrmslc.fringeamp**2 # squared visibilities
[docs] def rotate_matrix(self, cov_mat, theta): """ Rotate a covariance matrix by an angle. Parameters ---------- cov_mat : array The matrix to be rotated. 2x2 theta : float Angle by which to rotate the matrix (radians) Returns ------- cv_rotated : array The rotated matrix """ c, s = np.cos(theta), np.sin(theta) r_mat = [[c, -s], [s, c]] # coordinate rotation from real/imaginary to absolute value/phase (modulus/argument) return np.linalg.multi_dot([np.transpose(r_mat), cov_mat, r_mat])
[docs] def average_observables(self, averfunc): """ Average all the observables. Calculate covariance matrices between fringe amplitudes/fringe phases, and between triple product amps/closure phases, and closure amplitudes/quad phases. Convert r, theta (modulus, phase) to x,y. Calculate cov(x,y). Rotate resulting 2x2 matrix back to r, theta. Take sqrt of relevant covariance element to be error. This must be done with phases in radians. Parameters ---------- averfunc : function Function for averaging, either np.mean (default) or np.median Returns ------- avg_sqv : array Averaged squared visibilites err_sqv : array Standard error of the mean of averaged squared visibilities avg_fa : array Averaged fringe (visibility) amplitudes err_fa : array Standard error of the mean of averaged fringe (visibility) amplitudes avg_fp : array Averaged fringe phases (rad) err_fp : array Standard error of the mean of averaged fringe phases (rad) avg_cp : array Averaged closure phases (rad) err_cp : array Standard error of the mean of averaged closure phases (rad) avg_t3amp : array Averaged triple amplitudes err_t3amp : array Standard error of the mean of averaged triple amplitudes avg_ca : array Averaged closure amplitudes err_ca : array Standard error of the mean of averaged closure amplitudes avg_q4phi : array Averaged quad phases err_q4phi : array Standard error of the mean of averaged quad phases avg_pist : array Averaged segment pistons err_pist : array Standard error of the mean of averaged segment pistons """ covmats_fringes, covmats_triples, covmats_quads = self.observable_covariances(averfunc) if self.method == "mean": avg_fa, _, std_fa = sigma_clipped_stats(self.fringe_amplitudes, axis=0) avg_fp, _, std_fp = sigma_clipped_stats(self.fringe_phases, axis=0) avg_sqv, _, std_sqv = sigma_clipped_stats(self.fringe_amplitudes**2, axis=0) avg_pist, _, err_pist = sigma_clipped_stats(self.pistons, axis=0) else: # median. std_fa is just for comparing to covariance _, avg_fa, std_fa = sigma_clipped_stats(self.fringe_amplitudes, axis=0) # 21 _, avg_fp, std_fp = sigma_clipped_stats(self.fringe_phases, axis=0) # 21 _, avg_sqv, std_sqv = sigma_clipped_stats(self.fringe_amplitudes**2, axis=0) _, avg_pist, err_pist = sigma_clipped_stats(self.pistons, axis=0) err_pist = err_pist / np.sqrt(self.nslices) # standard error of the mean err_fa, err_fp = self.err_from_covmat(covmats_fringes) # calculate squared visibility (fringe) amplitude uncertainties correctly err_sqv = (2 * avg_fa * err_fa) / np.sqrt(self.nslices) # calculate triple and quad quantities from **averaged** fringe amps and phases avg_t3amp = leastsqnrm.t3_amplitudes(avg_fa, n=self.n_holes) avg_cp = leastsqnrm.redundant_cps(avg_fp, n=self.n_holes) err_t3amp, err_cp = self.err_from_covmat(covmats_triples) avg_ca = leastsqnrm.closure_amplitudes(avg_fa, n=self.n_holes) avg_q4phi = leastsqnrm.q4_phases(avg_fp, n=self.n_holes) err_ca, err_q4phi = self.err_from_covmat(covmats_quads) return ( avg_sqv, err_sqv, avg_fa, err_fa, avg_fp, err_fp, avg_cp, err_cp, avg_t3amp, err_t3amp, avg_ca, err_ca, avg_q4phi, err_q4phi, avg_pist, err_pist, )
[docs] def err_from_covmat(self, covmatlist): """ Derive observable errors from their covariance matrices. Return sqrt of [0,0] and [1,1] elements of each of a list of covariance matrices, divided by sqrt(N_ints), for use as observable errors (standard error of the mean). If using median, error calculation is questionable because this is NOT the standard error of the median. Parameters ---------- covmatlist : array Array of covariance matrices for each baseline/triple/quad shape e.g. (21,2,2) or (35,2,2) Returns ------- err_00 : array Standard errors of the mean of the first observable. shape e.g. (21) err_11 : array Standard errors of the mean of the second observable. shape e.g. (21) """ err_00 = np.sqrt(np.array([covmat[0, 0] for covmat in covmatlist])) / np.sqrt(self.nslices) err_11 = np.sqrt(np.array([covmat[1, 1] for covmat in covmatlist])) / np.sqrt(self.nslices) return err_00, err_11
[docs] def observable_covariances(self, averfunc): """ Calculate covariance matrices from each pair of observables. For each baseline/triple/quad, calculate covariance between each fringe amplitude/phase quantity. Parameters ---------- averfunc : function Function for averaging, either np.mean (default) or np.median Returns ------- cov_mat_fringes : array Array of 21 covariance matrices for fringes (amplitudes, phases) cov_mat_triples : array Array of 35 covariance matrices for triples (t3 amplitudes, closure phases) cov_mat_quads : array Array of 35 covariance matrices for quads (closure amplitudes, quad phases) """ # loop over 21 baselines cov_mat_fringes = [] # These operate on all slices at once, not on already-averaged integrations for bl in np.arange(self.n_baselines): fringeamps = self.fringe_amplitudes[:, bl] fringephases = self.fringe_phases[:, bl] covmat = self.cov_r_theta(fringeamps, fringephases, averfunc) cov_mat_fringes.append(covmat) # loop over 35 triples cov_mat_triples = [] for triple in np.arange(self.n_closure_phases): tripamp = self.t3_amplitudes[:, triple] triphase = self.closure_phases[:, triple] covmat = self.cov_r_theta(tripamp, triphase, averfunc) cov_mat_triples.append(covmat) # loop over 35 quads cov_mat_quads = [] for quad in np.arange(self.n_closure_amplitudes): quadamp = self.closure_amplitudes[:, quad] quadphase = self.q4_phases[:, quad] covmat = self.cov_r_theta(quadamp, quadphase, averfunc) cov_mat_quads.append(covmat) # lists of cov mats have shape e.g. (21, 2, 2) or (35, 2, 2) return ( np.array(cov_mat_fringes), np.array(cov_mat_triples), np.array(cov_mat_quads), )
[docs] def cov_r_theta(self, rr, theta, averfunc): """ Calculate covariance in polar coordinates. Calculate covariance in x, y coordinates, then rotate covariance matrix by **average** phase (over integrations) to get matrix in (r,theta). Parameters ---------- rr : array Complex number modulus theta : array Complex number phase averfunc : function Function for averaging, either np.mean (default) or np.median Returns ------- cov_mat_r_theta : array (2,2) Covariance matrix in r, theta coordinates """ xx = rr * np.cos(theta) yy = rr * np.sin(theta) # np.cov returns NaN if there are too few input values - ignore the warnings. with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Degrees of freedom <= 0", RuntimeWarning) warnings.filterwarnings("ignore", "divide by zero", RuntimeWarning) warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) cov_mat_xy = np.cov(xx, yy) return self.rotate_matrix(cov_mat_xy, averfunc(theta))
[docs] def make_oifits(self): """ Populate AmiOIModel. Perform final manipulations of observable arrays, calculate uncertainties, and populate AmiOIModel. Returns ------- m : AmiOIModel Fully populated datamodel """ instrument_data = self.instrument_data observation_date = Time( f"{instrument_data.year}-{instrument_data.month}-{instrument_data.day}", format="fits", ) # central wavelength, equiv. width from effstims used for fringe fitting wl = instrument_data.lam_c e_wl = instrument_data.lam_c * instrument_data.lam_w flag_vis = [False] * self.n_baselines flag_t3 = [False] * self.n_closure_phases flag_q4 = [False] * self.n_closure_amplitudes # Average observables (or don't), and get uncertainties # Unwrap phases shift2pi = np.zeros(self.closure_phases.shape) shift2pi[self.closure_phases >= 6] = 2 * np.pi shift2pi[self.closure_phases <= -6] = -2 * np.pi self.closure_phases -= shift2pi # Now we are setting up the observables to be written out to OIFITS # set these as attributes (some may exist and be overwritten) if self.method == "multi": self.vis2 = self.fringe_amplitudes_squared.T self.e_vis2 = np.zeros(self.vis2.shape) self.visamp = self.fringe_amplitudes.T self.e_visamp = np.zeros(self.visamp.shape) self.visphi = self.fringe_phases.T self.e_visphi = np.zeros(self.visphi.shape) self.closure_phases = self.closure_phases.T self.e_cp = np.zeros(self.closure_phases.shape) self.t3amp = self.t3_amplitudes.T self.e_t3amp = np.zeros(self.t3amp.shape) self.q4phi = self.q4_phases.T self.e_q4phi = np.zeros(self.q4phi.shape) self.camp = self.closure_amplitudes.T self.e_camp = np.zeros(self.camp.shape) self.pist = self.pistons.T self.e_pist = np.zeros(self.pist.shape) elif self.method == "mean": ( self.vis2, self.e_vis2, self.visamp, self.e_visamp, self.visphi, self.e_visphi, self.closure_phases, self.e_cp, self.t3amp, self.e_t3amp, self.camp, self.e_camp, self.q4phi, self.e_q4phi, self.pist, self.e_pist, ) = self.average_observables(np.mean) else: # take the median ( self.vis2, self.e_vis2, self.visamp, self.e_visamp, self.visphi, self.e_visphi, self.closure_phases, self.e_cp, self.t3amp, self.e_t3amp, self.camp, self.e_camp, self.q4phi, self.e_q4phi, self.pist, self.e_pist, ) = self.average_observables(np.median) # Convert angular quantities from radians to degrees self.visphi = np.rad2deg(self.visphi) self.e_visphi = np.rad2deg(self.e_visphi) self.closure_phases = np.rad2deg(self.closure_phases) self.e_cp = np.rad2deg(self.e_cp) self.q4phi = np.rad2deg(self.q4phi) self.e_q4phi = np.rad2deg(self.e_q4phi) self.pist = np.rad2deg(self.pist) self.e_pist = np.rad2deg(self.e_pist) # prepare arrays for OI_ARRAY ext self.staxy = instrument_data.ctrs_inst tel_name = [f"A{x:d}" % x for x in np.arange(self.n_holes) + 1] sta_name = tel_name diameter = [0] * self.n_holes staxyz = [] for x in self.staxy: a = list(x) line = [a[0], a[1], 0] staxyz.append(line) sta_index = np.arange(self.n_holes) + 1 pscale = instrument_data.pscale_mas / 1000.0 # arcsec # Size of the image to extract NRM data isz = self.instrument_data.isz fov = [pscale * isz] * self.n_holes fovtype = ["RADIUS"] * self.n_holes oim = datamodels.AmiOIModel() self.init_oimodel_arrays(oim) # primary header keywords oim.meta.telescope = instrument_data.telname oim.meta.origin = "STScI" oim.meta.instrument.name = instrument_data.instrument oim.meta.program.pi_name = instrument_data.pi_name oim.meta.target.proposer_name = instrument_data.proposer_name oim.meta.observation.date = observation_date.fits oim.meta.oifits.array_name = instrument_data.arrname oim.meta.oifits.instrument_mode = instrument_data.pupil oim.meta.guidestar.fgs_roll_ref = instrument_data.roll_ref oim.meta.guidestar.fgs_v3yangle = instrument_data.v3iyang oim.meta.guidestar.fgs_vparity = instrument_data.vparity # oi_array extension data oim.array["TEL_NAME"] = tel_name oim.array["STA_NAME"] = sta_name oim.array["STA_INDEX"] = sta_index oim.array["DIAMETER"] = diameter oim.array["STAXYZ"] = staxyz oim.array["FOV"] = fov oim.array["FOVTYPE"] = fovtype oim.array["CTRS_EQT"] = instrument_data.ctrs_eqt oim.array["PISTONS"] = self.pist oim.array["PIST_ERR"] = self.e_pist # oi_target extension data oim.target["TARGET_ID"] = [1] oim.target["TARGET"] = instrument_data.objname oim.target["RAEP0"] = instrument_data.ra oim.target["DECEP0"] = instrument_data.dec oim.target["EQUINOX"] = [2000] oim.target["RA_ERR"] = instrument_data.ra_uncertainty oim.target["DEC_ERR"] = instrument_data.dec_uncertainty oim.target["SYSVEL"] = [0] oim.target["VELTYP"] = ["UNKNOWN"] oim.target["VELDEF"] = ["OPTICAL"] oim.target["PMRA"] = instrument_data.pmra oim.target["PMDEC"] = instrument_data.pmdec oim.target["PMRA_ERR"] = [0] oim.target["PMDEC_ERR"] = [0] oim.target["PARALLAX"] = [0] oim.target["PARA_ERR"] = [0] oim.target["SPECTYP"] = ["UNKNOWN"] # oi_vis extension data oim.vis["TARGET_ID"] = 1 oim.vis["TIME"] = 0 oim.vis["MJD"] = observation_date.mjd oim.vis["INT_TIME"] = instrument_data.itime oim.vis["VISAMP"] = self.visamp oim.vis["VISAMPERR"] = self.e_visamp oim.vis["VISPHI"] = self.visphi oim.vis["VISPHIERR"] = self.e_visphi # for all u-v coords: index 0 and 1 reversed to get the good coverage (same fft) oim.vis["UCOORD"] = self.bls[:, 1] oim.vis["VCOORD"] = self.bls[:, 0] oim.vis["STA_INDEX"] = self._format_staindex(self.bholes) oim.vis["FLAG"] = flag_vis # oi_vis2 extension data oim.vis2["TARGET_ID"] = 1 oim.vis2["TIME"] = 0 oim.vis2["MJD"] = observation_date.mjd oim.vis2["INT_TIME"] = instrument_data.itime oim.vis2["VIS2DATA"] = self.vis2 oim.vis2["VIS2ERR"] = self.e_vis2 oim.vis2["UCOORD"] = self.bls[:, 1] oim.vis2["VCOORD"] = self.bls[:, 0] oim.vis2["STA_INDEX"] = self._format_staindex(self.bholes) oim.vis2["FLAG"] = flag_vis # oi_t3 extension data oim.t3["TARGET_ID"] = 1 oim.t3["TIME"] = 0 oim.t3["MJD"] = observation_date.mjd oim.t3["INT_TIME"] = instrument_data.itime oim.t3["T3AMP"] = self.t3amp oim.t3["T3AMPERR"] = self.e_t3amp oim.t3["T3PHI"] = self.closure_phases oim.t3["T3PHIERR"] = self.e_cp oim.t3["U1COORD"] = self.tuv[:, 0, 1] oim.t3["V1COORD"] = self.tuv[:, 0, 0] oim.t3["U2COORD"] = self.tuv[:, 1, 1] oim.t3["V2COORD"] = self.tuv[:, 1, 0] oim.t3["STA_INDEX"] = self._format_staindex(self.tholes) oim.t3["FLAG"] = flag_t3 # oi_q4 extension data oim.q4["TARGET_ID"] = 1 oim.q4["TIME"] = 0 oim.q4["MJD"] = observation_date.mjd oim.q4["INT_TIME"] = instrument_data.itime oim.q4["Q4AMP"] = self.camp oim.q4["Q4AMPERR"] = self.e_camp oim.q4["Q4PHI"] = self.q4phi oim.q4["Q4PHIERR"] = self.e_q4phi oim.q4["U1COORD"] = self.quads[:, 0, 1] oim.q4["V1COORD"] = self.quads[:, 0, 0] oim.q4["U2COORD"] = self.quads[:, 1, 1] oim.q4["V2COORD"] = self.quads[:, 1, 0] oim.q4["U3COORD"] = self.quads[:, 2, 1] oim.q4["V3COORD"] = self.quads[:, 2, 0] oim.q4["STA_INDEX"] = self._format_staindex(self.qholes) oim.q4["FLAG"] = flag_q4 # oi_wavelength extension data oim.wavelength["EFF_WAVE"] = wl oim.wavelength["EFF_BAND"] = e_wl return oim
[docs] def init_oimodel_arrays(self, oimodel): """ Set dtypes and initialize shapes for AmiOiModel arrays. Supports averaged or multi-integration versions of oimodel. Parameters ---------- oimodel : AmiOIModel object Empty model """ if self.method == "multi": # update dimensions of arrays for multi-integration oifits target_dtype = oimodel.target.dtype wavelength_dtype = np.dtype([("EFF_WAVE", "<f4"), ("EFF_BAND", "<f4")]) array_dtype = np.dtype( [ ("TEL_NAME", "S16"), ("STA_NAME", "S16"), ("STA_INDEX", "<i2"), ("DIAMETER", "<f4"), ("STAXYZ", "<f8", (3,)), ("FOV", "<f8"), ("FOVTYPE", "S6"), ("CTRS_EQT", "<f8", (2,)), ("PISTONS", "<f8", (self.nslices,)), ("PIST_ERR", "<f8", (self.nslices,)), ] ) vis_dtype = np.dtype( [ ("TARGET_ID", "<i2"), ("TIME", "<f8"), ("MJD", "<f8"), ("INT_TIME", "<f8"), ("VISAMP", "<f8", (self.nslices,)), ("VISAMPERR", "<f8", (self.nslices,)), ("VISPHI", "<f8", (self.nslices,)), ("VISPHIERR", "<f8", (self.nslices,)), ("UCOORD", "<f8"), ("VCOORD", "<f8"), ("STA_INDEX", "<i2", (2,)), ("FLAG", "i1"), ] ) vis2_dtype = np.dtype( [ ("TARGET_ID", "<i2"), ("TIME", "<f8"), ("MJD", "<f8"), ("INT_TIME", "<f8"), ("VIS2DATA", "<f8", (self.nslices,)), ("VIS2ERR", "<f8", (self.nslices,)), ("UCOORD", "<f8"), ("VCOORD", "<f8"), ("STA_INDEX", "<i2", (2,)), ("FLAG", "i1"), ] ) t3_dtype = np.dtype( [ ("TARGET_ID", "<i2"), ("TIME", "<f8"), ("MJD", "<f8"), ("INT_TIME", "<f8"), ("T3AMP", "<f8", (self.nslices,)), ("T3AMPERR", "<f8", (self.nslices,)), ("T3PHI", "<f8", (self.nslices,)), ("T3PHIERR", "<f8", (self.nslices,)), ("U1COORD", "<f8"), ("V1COORD", "<f8"), ("U2COORD", "<f8"), ("V2COORD", "<f8"), ("STA_INDEX", "<i2", (3,)), ("FLAG", "i1"), ] ) q4_dtype = np.dtype( [ ("TARGET_ID", "<i2"), ("TIME", "<f8"), ("MJD", "<f8"), ("INT_TIME", "<f8"), ("Q4AMP", "<f8", (self.nslices,)), ("Q4AMPERR", "<f8", (self.nslices,)), ("Q4PHI", "<f8", (self.nslices,)), ("Q4PHIERR", "<f8", (self.nslices,)), ("U1COORD", "<f8"), ("V1COORD", "<f8"), ("U2COORD", "<f8"), ("V2COORD", "<f8"), ("U3COORD", "<f8"), ("V3COORD", "<f8"), ("STA_INDEX", "<i2", (4,)), ("FLAG", "i1"), ] ) else: target_dtype = oimodel.target.dtype wavelength_dtype = oimodel.wavelength.dtype array_dtype = np.dtype( [ ("TEL_NAME", "S16"), ("STA_NAME", "S16"), ("STA_INDEX", "<i2"), ("DIAMETER", "<f4"), ("STAXYZ", "<f8", (3,)), ("FOV", "<f8"), ("FOVTYPE", "S6"), ("CTRS_EQT", "<f8", (2,)), ("PISTONS", "<f8"), ("PIST_ERR", "<f8"), ] ) vis_dtype = oimodel.vis.dtype vis2_dtype = oimodel.vis2.dtype t3_dtype = oimodel.t3.dtype q4_dtype = oimodel.q4.dtype oimodel.array = np.zeros(self.n_holes, dtype=array_dtype) oimodel.target = np.zeros(1, dtype=target_dtype) oimodel.vis = np.zeros(self.n_baselines, dtype=vis_dtype) oimodel.vis2 = np.zeros(self.n_baselines, dtype=vis2_dtype) oimodel.t3 = np.zeros(self.n_closure_phases, dtype=t3_dtype) oimodel.q4 = np.zeros(self.n_closure_amplitudes, dtype=q4_dtype) oimodel.wavelength = np.zeros(1, dtype=wavelength_dtype)
def _maketriples_all(self): """ Calculate all three-hole combinations, baselines. Returns ------- tarray : int array Triple hole indices (0-indexed), float array of two uv vectors in all triangles """ tlist = [] uvlist = [] for i in range(self.n_holes): for j in range(self.n_holes): for k in range(self.n_holes): if i < j and j < k: tlist.append((i, j, k)) uvlist.append( ( self.ctrs_eqt[i] - self.ctrs_eqt[j], self.ctrs_eqt[j] - self.ctrs_eqt[k], ) ) tarray = np.array(tlist).astype(int) return tarray, np.array(uvlist) def _makebaselines(self): """ Calculate all hole pairs, baselines. Returns ------- barray : list Hole pairs indices, 0-indexed float array Array of baselines """ blist = [] bllist = [] for i in range(self.n_holes): for j in range(self.n_holes): if i < j: blist.append((i, j)) bllist.append(self.ctrs_eqt[i] - self.ctrs_eqt[j]) return np.array(blist).astype(int), np.array(bllist) def _makequads_all(self): """ Calculate all four-hole combinations (quads). Returns ------- qarray : int array Array of four-hole quads (0-based) uvwlist : numpy array Array of u, v, w vectors for each quad """ qlist = [] uvwlist = [] for i in range(self.n_holes): for j in range(self.n_holes): for k in range(self.n_holes): for q in range(self.n_holes): if i < j and j < k and k < q: qlist.append((i, j, k, q)) uvwlist.append( ( self.ctrs_eqt[i] - self.ctrs_eqt[j], self.ctrs_eqt[j] - self.ctrs_eqt[k], self.ctrs_eqt[k] - self.ctrs_eqt[q], ) ) qarray = np.array(qlist).astype(int) return qarray, np.array(uvwlist) def _format_staindex(self, tab): """ Convert sta_index for oifits formats (T3, Q4, V2, etc.). Parameters ---------- tab : np.ndarray Array of indices (rows of 2, 3, 4, ... elements) Returns ------- sta_index : list of int arrays List of arrays of hole baseline indices (of length 2, 3, 4, etc.) """ sta_index = [] offset = 1 if np.min(tab) == 0 else 0 # 1-indexed for row in tab: line = np.array(row, dtype=int) + offset sta_index.append(line) return sta_index
[docs] class CalibOifits: """ Produce a final calibrated AmiOIModel. Calibrate (normalize) an AMI observation by subtracting closure phases of a reference star from those of a target and dividing visibility amplitudes of the target by those of the reference star. Only closure phases, visibility amplitudes, squared visibilites, and closure amplitudes are calibrated currently. For other observables (t3amp, q4phi), calibrated output file will contain a copy of target's observables. """ def __init__(self, targoimodel, caloimodel): """ Initialize the CalibOifits object. Parameters ---------- targoimodel : AmiOIModlel The target caloimodel : AmiOIModlel The reference star (calibrator) """ self.targoimodel = targoimodel self.caloimodel = caloimodel self.calib_oimodel = targoimodel.copy()
[docs] def update_dtype(self): """Modify OI array dtype to include different piston columns for calibrated OIFITS files.""" nrows = 7 modified_dtype = np.dtype( [ ("TEL_NAME", "S16"), ("STA_NAME", "S16"), ("STA_INDEX", "<i2"), ("DIAMETER", "<f4"), ("STAXYZ", "<f8", (3,)), ("FOV", "<f8"), ("FOVTYPE", "S6"), ("CTRS_EQT", "<f8", (2,)), ("PISTON_T", "<f8"), ("PISTON_C", "<f8"), ("PIST_ERR", "<f8"), ] ) self.calib_oimodel.array = np.zeros(nrows, dtype=modified_dtype)
[docs] def calibrate(self): """ Calibrate the target AmiOIModel by the calibrator (reference star) AmiOIModel. Returns ------- calib_oimodel : AmiOIModel Calibrated AMI datamodel """ cp_out = self.targoimodel.t3["T3PHI"] - self.caloimodel.t3["T3PHI"] sqv_out = self.targoimodel.vis2["VIS2DATA"] / self.caloimodel.vis2["VIS2DATA"] va_out = self.targoimodel.vis["VISAMP"] / self.caloimodel.vis["VISAMP"] ca_out = np.log(self.targoimodel.q4["Q4AMP"] / self.caloimodel.q4["Q4AMP"]) # log of ratio # using standard propagation of error for multiplication/division # which assumes uncorrelated Gaussian errors (questionable) cperr_t = self.targoimodel.t3["T3PHIERR"] cperr_c = self.caloimodel.t3["T3PHIERR"] sqverr_c = self.targoimodel.vis2["VIS2ERR"] sqverr_t = self.caloimodel.vis2["VIS2ERR"] vaerr_t = self.targoimodel.vis["VISAMPERR"] vaerr_c = self.caloimodel.vis["VISAMPERR"] caerr_t = self.targoimodel.q4["Q4AMPERR"] caerr_c = self.caloimodel.q4["Q4AMPERR"] cperr_out = np.sqrt(cperr_t**2.0 + cperr_c**2.0) sqverr_out = sqv_out * np.sqrt( (sqverr_t / self.targoimodel.vis2["VIS2DATA"]) ** 2.0 + (sqverr_c / self.caloimodel.vis2["VIS2DATA"]) ** 2.0 ) vaerr_out = va_out * np.sqrt( (vaerr_t / self.targoimodel.vis["VISAMP"]) ** 2.0 + (vaerr_c / self.caloimodel.vis["VISAMP"]) ** 2.0 ) caerr_out = np.sqrt( (caerr_t / self.targoimodel.q4["Q4AMP"]) ** 2 + (caerr_c / self.caloimodel.q4["Q4AMP"]) ** 2 ) pistons_t = self.targoimodel.array["PISTONS"] pisterr_t = self.targoimodel.array["PIST_ERR"] pistons_c = self.caloimodel.array["PISTONS"] pisterr_c = self.caloimodel.array["PIST_ERR"] # sum in quadrature errors from target and calibrator pistons pisterr_out = np.sqrt(pisterr_t**2 + pisterr_c**2) # update OI array, which is currently all zeros, with input oi array # and updated piston columns self.update_dtype() self.calib_oimodel.array["TEL_NAME"] = self.targoimodel.array["TEL_NAME"] self.calib_oimodel.array["STA_NAME"] = self.targoimodel.array["STA_NAME"] self.calib_oimodel.array["STA_INDEX"] = self.targoimodel.array["STA_INDEX"] self.calib_oimodel.array["DIAMETER"] = self.targoimodel.array["DIAMETER"] self.calib_oimodel.array["STAXYZ"] = self.targoimodel.array["STAXYZ"] self.calib_oimodel.array["FOV"] = self.targoimodel.array["FOV"] self.calib_oimodel.array["FOVTYPE"] = self.targoimodel.array["FOVTYPE"] self.calib_oimodel.array["CTRS_EQT"] = self.targoimodel.array["CTRS_EQT"] self.calib_oimodel.array["PISTON_T"] = pistons_t self.calib_oimodel.array["PISTON_C"] = pistons_c self.calib_oimodel.array["PIST_ERR"] = pisterr_out # update calibrated oimodel arrays with calibrated observables self.calib_oimodel.t3["T3PHI"] = cp_out self.calib_oimodel.t3["T3PHIERR"] = cperr_out self.calib_oimodel.vis2["VIS2DATA"] = sqv_out self.calib_oimodel.vis2["VIS2ERR"] = sqverr_out self.calib_oimodel.vis["VISAMP"] = va_out self.calib_oimodel.vis["VISAMPERR"] = vaerr_out self.calib_oimodel.q4["Q4AMP"] = ca_out self.calib_oimodel.q4["Q4AMPERR"] = caerr_out # add calibrated header keywords calname = self.caloimodel.meta.target.proposer_name # name of calibrator star self.calib_oimodel.meta.ami.calibrator_object_id = calname return self.calib_oimodel