"""Base classes which define the Level2 Associations."""
import copy
import logging
import re
from collections import deque
from os.path import split
from pathlib import Path
from jwst.associations import Association, libpath
from jwst.associations.exceptions import AssociationNotValidError
from jwst.associations.lib.acid import ACID
from jwst.associations.lib.constraint import (
Constraint,
SimpleConstraint,
)
from jwst.associations.lib.dms_base import (
IMAGE2_NONSCIENCE_EXP_TYPES,
IMAGE2_SCIENCE_EXP_TYPES,
PRODUCT_NAME_DEFAULT,
SPEC2_SCIENCE_EXP_TYPES,
Constraint_TargetAcq,
DMSAttrConstraint,
DMSBaseMixin,
)
from jwst.associations.lib.member import Member
from jwst.associations.lib.process_list import ListCategory
from jwst.associations.lib.prune import prune
from jwst.associations.lib.rules_level3_base import _EMPTY, DMS_Level3_Base
from jwst.associations.lib.rules_level3_base import Utility as Utility_Level3
from jwst.associations.lib.utilities import getattr_from_list, getattr_from_list_nofail
from jwst.associations.registry import RegistryMarker
from jwst.lib.suffix import remove_suffix
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Names exported by a star-import of this module.
__all__ = [
"_EMPTY",
"ASN_SCHEMA",
"AsnMixin_Lv2Image",
"AsnMixin_Lv2Nod",
"AsnMixin_Lv2Imprint",
"AsnMixin_Lv2Special",
"AsnMixin_Lv2Spectral",
"AsnMixin_Lv2WFSS",
"Constraint_Background",
"Constraint_Base",
"Constraint_ExtCal",
"Constraint_Image_Nonscience",
"Constraint_Image_Science",
"Constraint_Imprint",
"Constraint_Imprint_Special",
"Constraint_Mode",
"Constraint_Single_Science",
"Constraint_Special",
"Constraint_Spectral_Science",
"Constraint_Target",
"DMSLevel2bBase",
"DMSAttrConstraint",
"Utility",
]
# The schema that these associations must adhere to.
# ``DMSLevel2bBase.schema_file`` is taken from this object's ``schema`` attribute.
ASN_SCHEMA = RegistryMarker.schema(libpath() / "asn_schema_jw_level2b.json")
# Mapping from a flag name to the exposure type it stands for.
FLAG_TO_EXPTYPE = {
"background": "background",
}
# File templates
# Pool file names: "jw" + 5 digits, 3 digits, a timestamp of 8 digits,
# "T" or "t", and 6 digits, followed by "_pool".
_DMS_POOLNAME_REGEX = r"jw(\d{5})_(\d{3})_(\d{8}[Tt]\d{6})_pool"
# Level 1b exposure names: a path, the "_uncal" suffix, and an extension.
# The named groups are read by ``Utility.rename_to_level2a``.
_LEVEL1B_REGEX = r"(?P<path>.+)(?P<type>_uncal)(?P<extension>\..+)"
# Key that uniquely identifies items.
KEY = "expname"
[docs]
class DMSLevel2bBase(DMSBaseMixin, Association):
"""Basic class for DMS Level2 associations."""
# Set the validation schema
schema_file = ASN_SCHEMA.schema
# Attribute values that indicate the
# attribute is not specified.
INVALID_VALUES = _EMPTY
def __init__(self, *args, **kwargs):
    """
    Initialize the association, its validity checks, and default metadata.

    Parameters
    ----------
    *args, **kwargs
        Passed unchanged to the parent class initializer.
    """
    super(DMSLevel2bBase, self).__init__(*args, **kwargs)

    def _is_science(member):
        return member["exptype"] == "science"

    # Checks every Level 2 association must pass; both start unvalidated.
    level2_checks = {
        "has_science": {
            "validated": False,
            "check": _is_science,
        },
        "allowed_candidates": {"validated": False, "check": self.validate_candidates},
    }
    self.validity.update(level2_checks)

    # Fill in any association metadata that has not been provided yet.
    fallbacks = (
        ("constraints", "No constraints"),
        ("asn_type", "user_built"),
        ("asn_id", "a3001"),
        ("asn_pool", "none"),
    )
    for key, value in fallbacks:
        if key not in self.data:
            self.data[key] = value
[docs]
def get_exposure_type(self, item, default="science"):
    """
    Determine the exposure type of an item, with the Level 2 override.

    The type computed by the parent class is recorded on the instance as
    ``original_exposure_type``. It is returned unchanged, except that
    ``'psf'`` is reported as ``default``.

    Parameters
    ----------
    item : dict
        The pool entry to determine the exposure type of.
    default : str or None
        The default exposure type, also forwarded to the parent class.
        The original documentation states that the routine raises
        LookupError when this is None.

    Returns
    -------
    exposure_type : str
        ``default`` when the parent class reports ``'psf'``; otherwise
        the exposure type reported by the parent class.
    """
    parent_type = super(DMSLevel2bBase, self).get_exposure_type(item, default=default)
    self.original_exposure_type = parent_type
    return default if parent_type == "psf" else parent_type
[docs]
def members_by_type(self, member_type):
    """
    Collect the current product's members of a given exposure type.

    The comparison is case-insensitive.

    Parameters
    ----------
    member_type : str
        The exposure type to select.

    Returns
    -------
    list
        The matching members, in product order. Empty if the current
        product has no ``"members"`` entry.
    """
    wanted = member_type.lower()
    try:
        candidates = self.current_product["members"]
    except KeyError:
        return []
    return [entry for entry in candidates if entry["exptype"].lower() == wanted]
[docs]
def has_science(self):
    """
    Report whether the association already holds a science member.

    Returns
    -------
    bool
        True if at least one member has exposure type ``science``.
    """
    return len(self.members_by_type("science")) > 0
[docs]
def dms_product_name(self):
    """
    Derive the product name from the first science member.

    Returns
    -------
    str
        The science exposure name with its extension and suffix removed,
        or ``PRODUCT_NAME_DEFAULT`` when there is no science member or
        its name cannot be turned into a path.
    """
    science_members = self.members_by_type("science")
    if len(science_members) == 0:
        return PRODUCT_NAME_DEFAULT

    try:
        exposure = Path(science_members[0]["expname"])
        extensionless = exposure.parent / exposure.stem
    except Exception:
        # Best effort: any failure here falls back to the default name.
        return PRODUCT_NAME_DEFAULT

    product_name, _separator = remove_suffix(str(extensionless))
    return product_name
[docs]
def make_member(self, item):
    """
    Build an association member from a pool item.

    Parameters
    ----------
    item : dict
        The item to create the member from.

    Returns
    -------
    member : Member
        The member, carrying the Level 2a exposure name, the exposure
        type, and the exposure error status (None if the item has no
        ``exposerr`` entry).
    """
    # Exposure error status is optional in the item.
    try:
        exposure_error = item["exposerr"]
    except KeyError:
        exposure_error = None

    filename = item["filename"]

    # The various `is_item_xxx` methods determine whether the name should
    # represent the form of the data product containing all integrations.
    # NIS_AMI used to use rate files; updated to use rateints.
    wants_integrations = (
        self.is_item_coron(item) | self.is_item_ami(item) | self.is_item_tso(item)
    )

    level2a_name = Utility.rename_to_level2a(filename, use_integrations=wants_integrations)
    fields = {
        "expname": level2a_name,
        "exptype": self.get_exposure_type(item),
        "exposerr": exposure_error,
    }
    return Member(fields, item=item)
def _init_hook(self, item):
    """
    Post-check and pre-add initialization.

    Copies target, program, pool name, constraints, and candidate id
    into the association data, then starts a new product.

    Parameters
    ----------
    item : dict
        The item that triggered creation of this association.
    """
    info = self.data
    info["target"] = item["targetid"]
    # Program is left-padded with zeros to five characters.
    info["program"] = "{:0>5s}".format(item["program"])
    pool_file = Path(item.meta["pool_file"])
    info["asn_pool"] = pool_file.name
    info["constraints"] = str(self.constraints)
    info["asn_id"] = self.acid.id

    self.new_product(self.dms_product_name())
def _add(self, item):
    """
    Add an item to this association.

    Nothing happens if the member made from the item is already present.

    Parameters
    ----------
    item : dict
        The item to add.
    """
    candidate = self.make_member(item)
    if not self.is_member(candidate):
        self.current_product["members"].append(candidate)
        self.update_validity(candidate)

        # Update association state due to new member
        self.update_asn()
def _add_items(self, items, meta=None, product_name_func=None, acid="o999", **kwargs):
    """
    Force adding items to the association.

    Each item gets its own product.

    Parameters
    ----------
    items : [object[, ...]]
        A list of items to make members of the association. An item is
        either an exposure name or a tuple whose first element is the
        exposure name and whose second element is the exposure type.

    meta : dict
        A dict to be merged into the association meta information.
        The following are suggested to be assigned:

        - ``asn_type``
            The type of association.
        - ``asn_rule``
            The rule which created this association.
        - ``asn_pool``
            The pool from which the exposures came from
        - ``program``
            Originating observing program

    product_name_func : func
        Used if product name is 'undefined' using
        the class's procedures.

    acid : str
        The association candidate id to use. Since Level2
        associations require it, one must be specified.

    **kwargs : dict
        If the key ``with_exptype`` is present, the exposure type is
        taken from the second element of tuple items when that element
        is truthy. In every other case the type is ``science``.

    Raises
    ------
    ValueError
        If ``acid`` starts with neither "o" nor "c".

    Notes
    -----
    This is a low-level shortcut into adding members, such as file names,
    to an association. All defined shortcuts and other initializations are
    by-passed, resulting in a potentially unusable association.

    `product_name_func` is used to define the product names instead of
    the default methods. The call signature is::

        product_name_func(item, idx)

    where ``item`` is each item being added and ``idx`` is the count of items.
    """
    if meta is None:
        meta = {}

    # Setup association candidate.
    if acid.startswith("o"):
        ac_type = "observation"
    elif acid.startswith("c"):
        ac_type = "background"
    else:
        raise ValueError(
            f'Invalid association id specified: "{acid}"\n\tMust be of form "oXXX" or "c1XXX"'
        )
    self._acid = ACID((acid, ac_type))

    # Only the presence of the keyword matters, not its value.
    use_item_exptype = "with_exptype" in kwargs

    for idx, item in enumerate(items, start=1):
        self.new_product()
        members = self.current_product["members"]

        is_pair = isinstance(item, tuple)
        expname = item[0] if is_pair else item

        # Default exposure type. Only a tuple item can carry its own type:
        # indexing a plain string would pick a character of the file name.
        exptype = "science"
        if use_item_exptype and is_pair and len(item) > 1 and item[1]:
            exptype = item[1]

        member = Member({"expname": expname, "exptype": exptype}, item=item)
        members.append(member)
        self.update_validity(member)
        self.update_asn()

        # If a product name function is given, attempt
        # to use.
        if product_name_func is not None:
            try:
                self.current_product["name"] = product_name_func(item, idx)
            except Exception:
                # Deliberate best effort: keep the default product name.
                logger.debug(
                    "Attempted use of product_name_func failed. Default product name used."
                )

    self.data.update(meta)
    self.current_sequence = next(self.sequence)
[docs]
def update_asn(self):
    """
    Update association info based on current members.

    After the parent class update, the current product is renamed to
    whatever ``dms_product_name`` reports for the present members.
    """
    super(DMSLevel2bBase, self).update_asn()
    refreshed_name = self.dms_product_name()
    self.current_product["name"] = refreshed_name
[docs]
def validate_candidates(self, _member):
    """
    Allow only OBSERVATION or BACKGROUND candidates.

    Parameters
    ----------
    _member : Member
        Member being added; ignored.

    Returns
    -------
    bool
        True if the candidate type is OBSERVATION, or if it is
        BACKGROUND and at least one member of the current product is a
        background exposure; otherwise False.
    """
    candidate_type = self.acid.type.lower()

    # Observation candidates are always acceptable.
    if candidate_type == "observation":
        return True

    # Any other non-background candidate type fails.
    if candidate_type != "background":
        return False

    # A background candidate needs at least one background exposure.
    return any(
        entry["exptype"].lower() == "background" for entry in self.current_product["members"]
    )
def __repr__(self):
    """
    Return the JSON serialization of the association.

    Returns
    -------
    str
        The JSON representation produced by the ``json`` entry of the
        I/O registry, or the string form of the class if the dump fails.
    """
    json_io = None
    try:
        json_io = self.ioregistry["json"]
        _name, serialized = json_io.dump(self)
    except Exception:
        # Best effort: never let repr raise.
        return str(self.__class__)
    return serialized
def __str__(self):
    """
    Create human readable version of the association.

    Returns
    -------
    str
        Human-readable string version of association: its name, its
        parameters, its constraints, and one line per product.
    """

    def _parameter(key):
        # ``self.data`` is a mapping, so values are looked up by key.
        # (Attribute lookup via ``getattr`` never finds them and always
        # yields the fallback.) Values are stringified for the ``:s``
        # format specification.
        try:
            return str(self.data[key])
        except KeyError:
            return "indetermined"

    result = [f"Association {self.asn_name:s}"]

    # Parameters of the association
    result.append(
        " Parameters:"
        " Product type: {asn_type:s}"
        " Rule: {asn_rule:s}"
        " Program: {program:s}"
        " Target: {target:s}"
        " Pool: {asn_pool:s}".format(
            asn_type=_parameter("asn_type"),
            asn_rule=_parameter("asn_rule"),
            program=_parameter("program"),
            target=_parameter("target"),
            asn_pool=_parameter("asn_pool"),
        )
    )
    result.append(f" {str(self.constraints):s}")

    # Products of the association
    for product in self.data["products"]:
        result.append("\t{} with {} members".format(product["name"], len(product["members"])))

    # That's all folks
    result.append("\n")
    return "\n".join(result)
[docs]
@RegistryMarker.utility
class Utility:
"""Utility functions that understand DMS Level 2 associations."""
[docs]
@staticmethod
@RegistryMarker.callback("finalize")
def finalize(associations):
    """
    Check validity and duplications in an association list.

    Level 2 associations are finalized, pruned, and resequenced.
    All other associations are passed through untouched.

    Parameters
    ----------
    associations : [association[, ...]]
        List of associations.

    Returns
    -------
    finalized_associations : [association[, ...]]
        The non-Level 2 associations followed by the validated
        Level 2 associations.
    """
    passthrough = []
    level2 = []
    for candidate in associations:
        if not isinstance(candidate, DMSLevel2bBase):
            passthrough.append(candidate)
            continue
        completed = candidate.finalize()
        if completed is not None:
            level2.extend(completed)

    # Having duplicate Level 2 associations is expected.
    level2 = prune(level2)

    # Ensure sequencing is correct.
    Utility_Level3.resequence(level2)

    return passthrough + level2
[docs]
@staticmethod
def merge_asns(associations):
    """
    Merge level2 associations.

    Parameters
    ----------
    associations : [asn(, ...)]
        Associations to search for merging.

    Returns
    -------
    associations : [association(, ...)]
        The non-Level 2 associations, unchanged, followed by the
        Level 2 associations, some of which may be merged.
    """
    # Single pass: bucket by whether the association is a Level 2 one.
    buckets = {True: [], False: []}
    for asn in associations:
        buckets[isinstance(asn, DMSLevel2bBase)].append(asn)

    merged_level2 = Utility._merge_asns(buckets[True])
    return buckets[False] + merged_level2
[docs]
@staticmethod
def rename_to_level2a(level1b_name, use_integrations=False):
    """
    Rename a Level 1b exposure to its Level 2a name.

    Parameters
    ----------
    level1b_name : str
        The Level 1b exposure name.
    use_integrations : bool
        Use 'rateints' instead of 'rate' as the suffix.

    Returns
    -------
    str
        The Level 2a name. If the input does not look like a Level 1b
        name, a warning is logged and the input is returned unchanged.
    """
    parsed = re.match(_LEVEL1B_REGEX, level1b_name)
    if parsed is None or parsed.group("type") != "_uncal":
        logger.warning(
            f"Item FILENAME='{level1b_name}' is not a Level 1b name. "
            "Cannot transform to Level 2a."
        )
        return level1b_name

    suffix = "rateints" if use_integrations else "rate"
    pieces = (parsed.group("path"), "_", suffix, parsed.group("extension"))
    return "".join(pieces)
[docs]
@staticmethod
def resequence(*args, **kwargs):
    """
    Resequence the numbers to conform to level 3 associations.

    All arguments are forwarded to the Level 3 utility of the same name.

    Returns
    -------
    list[association, ...] or None
        Whatever the Level 3 ``resequence`` returns: per the original
        documentation, the resequenced associations if any were provided.
    """
    resequenced = Utility_Level3.resequence(*args, **kwargs)
    return resequenced
[docs]
@staticmethod
def sort_by_candidate(asns):
"""
Sort associations by candidate.
Parameters
----------
asns : [Association[,...]]
List of associations
Returns
-------
sorted_by_candidate : [Associations[,...]]
New list of the associations sorted.
Notes
-----
The current definition of candidates allows strictly alphabetical
sorting::
aXXXX > cXXXX > oXXX
If this changes, a comparison function will need be implemented.
"""
return sorted(asns, key=lambda asn: asn["asn_id"])
@staticmethod
def _merge_asns(asns):
    """
    Merge associations by `asn_type` and `asn_id`.

    The first association seen for each type/id pair is kept. Products
    of later associations with the same pair are folded into it: members
    are added to every same-named product unless a member with that
    exposure name is already there, and products with no same-named
    counterpart are appended whole.

    Parameters
    ----------
    asns : [asn(, ...)]
        Associations to search for merging.

    Returns
    -------
    associations : [association(, ...)]
        List of associations, some of which may be merged.
    """
    merged = {}
    for asn in asns:
        merge_key = "_".join([asn["asn_type"], asn["asn_id"]])
        # The first association with this key becomes the merge target.
        target_asn = merged.setdefault(merge_key, asn)

        for product in asn["products"]:
            found_match = False
            for existing in target_asn["products"]:
                if product["name"] != existing["name"]:
                    continue
                already_present = [entry["expname"] for entry in existing["members"]]
                incoming = {entry["expname"] for entry in product["members"]}
                new_names = incoming.difference(already_present)
                additions = [
                    entry for entry in product["members"] if entry["expname"] in new_names
                ]
                existing["members"].extend(additions)
                found_match = True
            if not found_match:
                target_asn["products"].append(product)

    return list(merged.values())
# Basic constraints
[docs]
class Constraint_Base(Constraint):
    """Select on program and, when present, the TSO visit flag."""

    def __init__(self):
        program = DMSAttrConstraint(name="program", sources=["program"])
        # Optional attribute; when present it is forced to be unique.
        tso_visit = DMSAttrConstraint(
            name="is_tso",
            sources=["tsovisit"],
            required=False,
            force_unique=True,
        )
        super(Constraint_Base, self).__init__([program, tso_visit])
[docs]
class Constraint_Background(DMSAttrConstraint):
    """Select backgrounds, keyed on the ``bkgdtarg`` item attribute."""

    def __init__(self):
        settings = {
            "name": "background",
            "sources": ["bkgdtarg"],
            "force_unique": False,
            "force_reprocess": ListCategory.EXISTING,
            "only_on_match": True,
        }
        super(Constraint_Background, self).__init__(**settings)
[docs]
class Constraint_ExtCal(Constraint):
    """
    Remove any nis_extcals from the associations.

    They are NOT to receive level-2b or level-3 processing.
    """

    def __init__(self):
        is_extcal = DMSAttrConstraint(name="exp_type", sources=["exp_type"], value="nis_extcal")
        # ``notany`` reduction: the constraint is built to reject matches.
        super(Constraint_ExtCal, self).__init__([is_extcal], reduce=Constraint.notany)
[docs]
class Constraint_Imprint(Constraint):
    """Select on imprint exposures, keyed on the ``is_imprt`` item attribute."""

    def __init__(self):
        imprint_flag = DMSAttrConstraint(name="imprint", sources=["is_imprt"])
        super(Constraint_Imprint, self).__init__(
            [imprint_flag],
            work_over=ListCategory.EXISTING,
            reprocess_on_match=True,
        )
[docs]
class Constraint_Imprint_Special(Constraint):
    """
    Select on imprint exposures with mosaic tile number.

    Parameters
    ----------
    association : Association or None
        The association whose ``original_exposure_type`` is compared
        against ``"imprint"``. If None, the check for original exposure
        type is ignored.
    """

    def __init__(self, association=None):
        def sources(_item):
            # Without an association there is no recorded original
            # exposure type; report a fixed non-imprint value instead.
            if association is None:
                return "not imprint"
            return association.original_exposure_type

        def _differs(v1, v2):
            return v1 != v2

        imprint_flag = DMSAttrConstraint(
            name="imprint",
            sources=["is_imprt"],
            force_reprocess=ListCategory.EXISTING,
            only_on_match=True,
        )
        mosaic_tile = DMSAttrConstraint(
            name="mosaic_tile",
            sources=["mostilno"],
        )
        original_not_imprint = SimpleConstraint(
            value="imprint",
            sources=sources,
            test=_differs,
            force_unique=False,
        )
        super(Constraint_Imprint_Special, self).__init__(
            [imprint_flag, mosaic_tile, original_not_imprint],
        )
[docs]
class Constraint_Mode(Constraint):
    """Select on instrument and optical path."""

    def __init__(self):
        def optional(name, sources):
            # Attribute constraint that is not required to be present.
            return DMSAttrConstraint(name=name, sources=sources, required=False)

        # Attributes every item must provide.
        required_attrs = [
            DMSAttrConstraint(name="instrument", sources=["instrume"]),
            DMSAttrConstraint(name="detector", sources=["detector"]),
            DMSAttrConstraint(name="opt_elem", sources=["filter", "band"]),
        ]

        optional_attrs = [
            optional("opt_elem2", ["pupil", "grating"]),
            optional("fxd_slit", ["fxd_slit"]),
            optional("subarray", ["subarray"]),
            optional("channel", ["channel"]),
        ]

        # Combinations reduced with ``notany`` / ``notall``.
        exclusions = [
            Constraint(
                [
                    DMSAttrConstraint(sources=["detector"], value="nirspec"),
                    DMSAttrConstraint(sources=["filter"], value="opaque"),
                ],
                reduce=Constraint.notany,
            ),
            Constraint(
                [
                    DMSAttrConstraint(sources=["detector"], value="nrcblong"),
                    DMSAttrConstraint(sources=["exp_type"], value="nrc_tsgrism"),
                ],
                reduce=Constraint.notall,
            ),
            Constraint(
                [
                    DMSAttrConstraint(
                        sources=["visitype"],
                        value=".+wfsc.+",
                    ),
                ],
                reduce=Constraint.notany,
            ),
        ]

        slit = optional("slit", ["fxd_slit"])

        super(Constraint_Mode, self).__init__(
            required_attrs + optional_attrs + exclusions + [slit]
        )
[docs]
class Constraint_Image_Science(DMSAttrConstraint):
    """Select on science images."""

    def __init__(self):
        # One alternation pattern covering every science imaging type.
        science_pattern = "|".join(IMAGE2_SCIENCE_EXP_TYPES)
        super(Constraint_Image_Science, self).__init__(
            name="exp_type",
            sources=["exp_type"],
            value=science_pattern,
        )
[docs]
class Constraint_Image_Nonscience(Constraint):
    """
    Select on non-science images.

    The alternatives are reduced with ``Constraint.any``: a target
    acquisition, one of the non-science imaging exposure types, or an
    ``nrs_msaspec`` exposure with MSA state ``primarypark_allopen`` and
    grating ``mirror``.
    """

    def __init__(self):
        target_acq = Constraint_TargetAcq()
        non_science = DMSAttrConstraint(
            name="non_science",
            sources=["exp_type"],
            value="|".join(IMAGE2_NONSCIENCE_EXP_TYPES),
        )
        msa_mirror = Constraint(
            [
                DMSAttrConstraint(name="exp_type", sources=["exp_type"], value="nrs_msaspec"),
                DMSAttrConstraint(sources=["msastate"], value="primarypark_allopen"),
                DMSAttrConstraint(sources=["grating"], value="mirror"),
            ]
        )
        super(Constraint_Image_Nonscience, self).__init__(
            [target_acq, non_science, msa_mirror],
            reduce=Constraint.any,
        )
[docs]
class Constraint_Single_Science(Constraint):
    """
    Allow only single science exposure.

    Parameters
    ----------
    has_science_fn : func
        Function to determine whether the association has a science member already.
        No arguments are provided

    exposure_type_fn : func
        Function to determine the association exposure type of the item.
        Should take a single argument of item.

    **sc_kwargs : dict
        Keyword arguments to pass to the parent class
        `~jwst.associations.lib.constraint.Constraint`.

    Notes
    -----
    The ``has_science_fn`` is further wrapped in a local function
    to provide a closure. Otherwise if the function is a bound method,
    that method may end up pointing to an instance that is not calling
    this constraint.
    """

    def __init__(self, has_science_fn, exposure_type_fn, **sc_kwargs):
        def _item_is_science(item):
            return exposure_type_fn(item) == "science"

        def _already_has_science(_item):
            return has_science_fn()

        # The item must be science, and no science may be present yet.
        item_is_science = SimpleConstraint(value=True, sources=_item_is_science)
        no_science_yet = SimpleConstraint(value=False, sources=_already_has_science)

        super(Constraint_Single_Science, self).__init__(
            [item_is_science, no_science_yet],
            **sc_kwargs,
        )
[docs]
class Constraint_Special(DMSAttrConstraint):
    """Select on backgrounds and other auxiliary images."""

    def __init__(self):
        # Item attributes consulted: background-target and PSF flags.
        special_sources = ["bkgdtarg", "is_psf"]
        super(Constraint_Special, self).__init__(name="is_special", sources=special_sources)
[docs]
class Constraint_Spectral_Science(Constraint):
    """
    Select on spectral science.

    Parameters
    ----------
    exclude_exp_types : [exp_type[, ...]]
        List of exposure types to not consider
        from the general list.
    """

    def __init__(self, exclude_exp_types=None):
        if exclude_exp_types is None:
            general_science = SPEC2_SCIENCE_EXP_TYPES
        else:
            # Only remove types from the general list. A symmetric
            # difference would also add any excluded type that is not in
            # the general list, selecting exactly what was to be excluded.
            # Filtering the list (rather than using a set) keeps the
            # resulting pattern in a stable order.
            excluded = set(exclude_exp_types)
            general_science = [
                exp_type for exp_type in SPEC2_SCIENCE_EXP_TYPES if exp_type not in excluded
            ]

        super(Constraint_Spectral_Science, self).__init__(
            [
                DMSAttrConstraint(
                    name="exp_type", sources=["exp_type"], value="|".join(general_science)
                )
            ],
            reduce=Constraint.any,
        )
[docs]
class Constraint_Target(Constraint):
    """
    Select on target ID.

    Either of two alternatives may hold (``Constraint.any``): the item's
    ``asn_candidate`` is a single ``cNNNN`` direct-image candidate, in
    which case the target is the fixed value ``"000"``, or the target is
    read from the item's ``targetid``.
    """

    def __init__(self):
        def _fixed_target(_item):
            return "000"

        direct_image_candidate = Constraint(
            [
                DMSAttrConstraint(
                    name="acdirect",
                    sources=["asn_candidate"],
                    value=r"\[\('c\d{4}', 'direct_image'\)\]",
                ),
                SimpleConstraint(name="target", sources=_fixed_target),
            ]
        )
        by_target_id = DMSAttrConstraint(
            name="target",
            sources=["targetid"],
        )
        super(Constraint_Target, self).__init__(
            [direct_image_candidate, by_target_id], reduce=Constraint.any
        )
# ---------------------------------------------
# Mixins to define the broad category of rules.
# ---------------------------------------------
[docs]
class AsnMixin_Lv2Image:
    """Level 2 Image association base."""

    def _init_hook(self, item):
        """
        Post-check and pre-add initialization.

        Runs the next ``_init_hook`` in the method resolution order and
        then marks the association as type ``image2``.
        """
        super(AsnMixin_Lv2Image, self)._init_hook(item)
        asn_info = self.data
        asn_info["asn_type"] = "image2"
[docs]
class AsnMixin_Lv2Imprint:
"""Level 2 association handling for matching imprint images."""
[docs]
def prune_imprints(self):
    """
    Prune extra imprint exposures from the association members.

    The member list of the first product is modified in place.

    First, only imprints whose background flag matches that of the
    "science" members are considered. Imprints with the other
    background flag stay in the association without further checks.

    Among the considered imprints, those whose target ID matches a
    science member are preferred: if there are any, the non-matching
    ones are removed. If there are none, all considered imprints move
    on to the next check.

    Finally, if there is at least one science member and more remaining
    imprints than science members, and any imprint shares a dither
    position index with a science member, the imprints that do not are
    removed.

    Members lacking ``targetid`` or ``bkgdtarg`` in their item are
    ignored entirely.
    """
    # Only one product for Lv2 associations
    members = self["products"][0]["members"]

    # Sort members into science and imprints, the latter keyed by
    # whether they were taken on a background target.
    science = []
    science_targets = set()
    science_is_background = False
    imprints_by_background = {True: [], False: []}
    for member in members:
        try:
            target = member.item["targetid"]
            bkgdtarg = member.item["bkgdtarg"]
        except KeyError:
            # Members with missing data take no part in pruning.
            continue

        on_background = bkgdtarg == "t"
        if member["exptype"] == "science":
            science.append(member)
            science_targets.add(str(target))
            if on_background:
                science_is_background = True
        elif member["exptype"] == "imprint":
            imprints_by_background[on_background].append(member)

    # Only check the imprints that match the "science" members
    imprints_to_check = imprints_by_background[science_is_background]

    # Split the candidates by whether their target matches the science.
    on_target = []
    off_target = []
    for imprint_exp in imprints_to_check:
        try:
            target = imprint_exp.item["targetid"]
        except KeyError:
            # An imprint without a target ID cannot match.
            off_target.append(imprint_exp)
            continue
        bucket = on_target if str(target) in science_targets else off_target
        bucket.append(imprint_exp)

    # Drop the off-target imprints only if something is on target.
    if on_target:
        imprints_to_check = on_target
        for imprint_exp in off_target:
            members.remove(imprint_exp)

    # With 1 or more science and more imprints than science, discard
    # any imprints that don't match a science dither index.
    if 1 <= len(science) < len(imprints_to_check):
        science_dithers = [
            science_exp.item["dithptin"]
            for science_exp in science
            if "dithptin" in science_exp.item
        ]
        names_to_keep = {
            imprint_exp["expname"]
            for imprint_exp in imprints_to_check
            if "dithptin" in imprint_exp.item
            and imprint_exp.item["dithptin"] in science_dithers
        }

        # Remove the extras only if at least one imprint matched.
        if names_to_keep:
            for imprint_exp in imprints_to_check:
                if imprint_exp["expname"] not in names_to_keep:
                    members.remove(imprint_exp)
[docs]
def finalize(self):
    """
    Finalize the association.

    For some spectrographic modes, imprint images are taken alongside
    the science data, the background data, or both. If there are
    extra imprints in the association, only the best matches to the
    science data are kept before the regular finalization runs.

    Returns
    -------
    associations: [association[, ...]] or None
        The result of the next ``finalize`` in the method resolution
        order: per the original documentation, the list of
        fully-qualified associations that this association represents,
        or `None` if a complete association cannot be produced.
    """
    # Pruning only makes sense for an association that is valid so far.
    should_prune = self.is_valid
    if should_prune:
        self.prune_imprints()

    return super().finalize()
[docs]
class AsnMixin_Lv2Nod:
"""
Associations that need to create nodding associations.
For some spectrographic modes, background spectra are taken by
nodding between different slits, or internal slit positions.
The main associations rules will collect all the exposures of all the nods
into a single association. Then, on finalization, this one association
is split out into many associations, where each nod is, in turn, treated
as science, and the other nods are treated as background.
"""
[docs]
@staticmethod
def nod_background_overlap(science_item, background_item):
"""
Check that a candidate background nod will not overlap with science.
For NIRSpec fixed slit or MOS data, this returns True if the
background candidate shares a primary dither point with the
science or if the target ID does not match between science
and background candidate.
In addition, for NIRSpec fixed slit exposures taken with slit
S1600A1 in a 5-point nod pattern, this returns True when the
background candidate is in the next closest primary position
to the science or if the target ID does not match between
science and background candidate.
For any other data, this function returns False (no overlap).
Parameters
----------
science_item : Member
The science member.
background_item : Member
The background member.
Returns
-------
bool
True if overlap is present, False otherwise.
"""
# Get exp_type, needed for any data:
# if not present, return False
try:
exptype = str(science_item["exp_type"]).lower()
except KeyError:
return False
# Return False for any non-FS or MOS data
if exptype not in ["nrs_fixedslit", "nrs_msaspec"]:
return False
# Check for target ID - if present,
# it must match for either FS or MOS
try:
sci_target_id = str(science_item["targetid"]).lower()
bkg_target_id = str(background_item["targetid"]).lower()
except KeyError:
sci_target_id = None
bkg_target_id = None
if sci_target_id != bkg_target_id:
# Report overlap - different activities contain
# different sources that may overlap, even if not
# at the same primary nod position.
return True
# Get pattern number values, needed for FS or MOS
try:
numdthpt = int(science_item["numdthpt"])
patt_num = int(science_item["patt_num"])
bkg_num = int(background_item["patt_num"])
except (KeyError, ValueError):
numdthpt = 0
patt_num = None
bkg_num = None
# Get subpxpts (or old alternate 'subpxpns')
try:
subpx = int(science_item["subpxpts"])
except (KeyError, ValueError):
try:
subpx = int(science_item["subpxpns"])
except (KeyError, ValueError):
subpx = None
try:
bkg_subpx = int(background_item["subpxpts"])
except (KeyError, ValueError):
try:
bkg_subpx = int(background_item["subpxpns"])
except (KeyError, ValueError):
bkg_subpx = None
# If values not found, return False for MOS, True for FS
if None in [patt_num, bkg_num, subpx, bkg_subpx] or subpx == 0 or bkg_subpx == 0:
if exptype == "nrs_fixedslit":
# These values are required for background
# candidates for FS: report an overlap.
return True
else:
# For MOS, it's okay to go ahead and use them.
# Report no overlap.
return False
# Check for primary point overlap
sci_prim_dithpt = (patt_num - 1) // subpx
bkg_prim_dithpt = (bkg_num - 1) // bkg_subpx
if sci_prim_dithpt == bkg_prim_dithpt:
# Primary points are the same - overlap is present
return True
# Primary points are not the same -
# no further check needed for MOS
if exptype == "nrs_msaspec":
return False
# Get slit name, needed only for FS S1600A1 check
try:
slit = str(science_item["fxd_slit"]).lower()
except KeyError:
slit = None
# Background is currently only expected to overlap severely for
# 5 point dithers with fixed slit S1600A1. Only the nearest
# dithers to the science observation need to be excluded.
if (
exptype == "nrs_fixedslit"
and slit == "s1600a1"
and numdthpt // subpx == 5
and abs(sci_prim_dithpt - bkg_prim_dithpt) <= 1
):
return True
# For anything else, return False
return False
[docs]
def make_nod_asns(self):
    """
    Make background nod Associations.

    For observing modes, such as NIRSpec MSA, exposures can be
    nodded, such that the object is in a different position in the
    slitlet. The association creation simply groups these all
    together as a single association, all exposures marked as
    ``science``. This method creates one association per science
    exposure: that exposure stays ``science``, and every other
    science exposure that does not overlap with it is added as
    ``background``. Non-science members are attached to every
    resulting association. Only associations that are valid are
    returned.

    Returns
    -------
    associations : [association[, ...]]
        List of new associations to be used in place of
        the current one.
    """
    # ``members`` ends up bound to the member list of the last product.
    for product in self["products"]:
        members = product["members"]

    # Split out the science vs. non-science.
    science_exps = []
    nonscience_exps = []
    for member in members:
        if member["exptype"] == "science":
            science_exps.append(member)
        else:
            nonscience_exps.append(member)

    # Create new associations for each science, using
    # the other science as background.
    results = []
    for science_exp in science_exps:
        asn = copy.deepcopy(self)
        asn.data["products"] = None

        exposure_file = split(science_exp["expname"])[1]
        product_name = remove_suffix(Path(exposure_file).stem)[0]
        asn.new_product(product_name)

        new_members = asn.current_product["members"]
        new_members.append(science_exp)

        for other_science in science_exps:
            if other_science["expname"] == science_exp["expname"]:
                continue
            # Skip candidates likely to overlap with the science.
            if self.nod_background_overlap(science_exp.item, other_science.item):
                continue
            now_background = Member(other_science)
            now_background["exptype"] = "background"
            new_members.append(now_background)

        new_members += nonscience_exps

        if asn.is_valid:
            results.append(asn)

    return results
[docs]
def finalize(self):
    """
    Finalize association.

    For some spectrographic modes, background spectra are taken by
    nodding between different slits, or internal slit positions.
    The main associations rules will collect all the exposures of all the nods
    into a single association. Then, on finalization, this one association
    is split out into many associations, where each nod is, in turn, treated
    as science, and the other nods are treated as background.

    Returns
    -------
    associations : [association[, ...]] or None
        The associations produced by ``make_nod_asns``, or
        `None` if this association is not valid.
    """
    if not self.is_valid:
        return None
    return self.make_nod_asns()
[docs]
class AsnMixin_Lv2Special:
"""
Process special and non-science exposures as science.
Attributes
----------
original_exposure_type : str
The original exposure type of what is referred to as the "science" member
"""
original_exposure_type = None
[docs]
def get_exposure_type(self, item, default="science"):
    """
    Override to force the exposure type to be ``default``.

    The type computed by the next class in the method resolution order
    is recorded on the instance as ``original_exposure_type``. The only
    case where ``default`` is not returned is when the association
    already has its science and the current item is an imprint: that
    item stays an imprint.

    Parameters
    ----------
    item : dict
        The pool entry to determine the exposure type of
    default : str or None
        The default exposure type, also forwarded up the chain.
        The original documentation states that the routine raises
        LookupError when this is None.

    Returns
    -------
    exposure_type : str
        ``"imprint"`` for an imprint item when a science member already
        exists; otherwise ``default``.
    """
    upstream_type = super(AsnMixin_Lv2Special, self).get_exposure_type(item, default=default)
    self.original_exposure_type = upstream_type

    keep_as_imprint = self.has_science() and upstream_type == "imprint"
    return "imprint" if keep_as_imprint else default
[docs]
class AsnMixin_Lv2Spectral(DMSLevel2bBase):
    """Level 2 Spectral association base."""

    def _init_hook(self, item):
        """
        Post-check and pre-add initialization.

        Runs the parent ``_init_hook`` and then marks the association
        as type ``spec2``.
        """
        super(AsnMixin_Lv2Spectral, self)._init_hook(item)
        asn_info = self.data
        asn_info["asn_type"] = "spec2"
[docs]
class AsnMixin_Lv2WFSS:
    """Utility and overrides for WFSS associations."""

    def add_catalog_members(self):
        """
        Replace direct image exposures with Level 3 product members.

        All ``direct_image`` members are removed from the current product.
        The direct image closest to the first science member is recorded in
        ``self.direct_image``, and three members named from the Level 3
        product root are appended: the direct image (``_i2d.fits``), the
        source catalog (``_cat.ecsv``), and the segmentation map
        (``_segm.fits``).

        Raises
        ------
        AssociationNotValidError
            If the association has no direct image member or no science
            member.
        """
        directs = self.members_by_type("direct_image")
        if not directs:
            raise AssociationNotValidError(
                f"{self.__class__.__name__} has no required direct image exposures"
            )
        sciences = self.members_by_type("science")
        if not sciences:
            raise AssociationNotValidError(
                f"{self.__class__.__name__} has no required science exposure"
            )
        science = sciences[0]

        # Remove all direct images from the association. The list is
        # filtered in place so the product keeps the same list object.
        members = self.current_product["members"]
        members[:] = [member for member in members if member["exptype"] != "direct_image"]

        # Add the Level3 catalog, direct image, and segmentation map members.
        # ``self.direct_image`` is set before the product name is built because
        # ``get_opt_element`` reads it (presumably during name construction).
        self.direct_image = self.find_closest_direct(science, directs)
        lv3_direct_image_root = DMS_Level3_Base._dms_product_name(self)  # noqa: SLF001
        for suffix, exptype in (
            ("_i2d.fits", "direct_image"),
            ("_cat.ecsv", "sourcecat"),
            ("_segm.fits", "segmap"),
        ):
            members.append(
                Member({"expname": lv3_direct_image_root + suffix, "exptype": exptype})
            )

    def finalize(self):
        """
        Finalize the association.

        For WFSS, this involves taking all the direct image exposures,
        determining which one comes first after the science exposure,
        and creating the catalog name from that image.

        Returns
        -------
        associations : [association[, ...]] or None
            List of fully-qualified associations that this association
            represents.
            `None` if a complete association cannot be produced.
        """
        try:
            self.add_catalog_members()
        except AssociationNotValidError as err:
            # A missing science or direct image makes the association unusable.
            logger.debug("%s: %s", self.__class__.__name__, str(err))
            return None

        return super().finalize()

    def get_exposure_type(self, item, default="science"):
        """
        Determine the exposure type, treating imaging exposures as direct images.

        The parent implementation is consulted first. If it reports
        ``"science"`` and the item's ``exp_type`` is an imaging mode
        (``nis_image`` or ``nrc_image``), the item is reclassified as a
        direct image.

        Parameters
        ----------
        item : Member
            The item to pull exposure type from.
        default : str
            The default value if no exposure type is present, defaults to "science".

        Returns
        -------
        str
            The exposure type of the item.
        """
        exp_type = super().get_exposure_type(item, default)
        if exp_type == "science" and item["exp_type"] in ("nis_image", "nrc_image"):
            exp_type = "direct_image"
        return exp_type

    @staticmethod
    def find_closest_direct(science, directs):
        """
        Find the direct image that most closely follows the science.

        Closeness is the difference in exposure sequence number, as defined
        in the column EXPSPCIN. Only direct images with a sequence number
        greater than that of the science are considered; among those, the
        one with the smallest difference is chosen, the earliest in
        ``directs`` winning a tie.

        Parameters
        ----------
        science : Member
            The science member to compare against.
        directs : [Member[,...]]
            The available direct members. Must not be empty.

        Returns
        -------
        closest : Member
            The direct image that is the "closest". Falls back to the first
            entry of ``directs`` when the science has no EXPSPCIN or no
            direct image follows the science.
        """
        closest = directs[0]  # If the search fails, just use the first.

        try:
            science_seq = int(getattr_from_list(science.item, ["expspcin"], _EMPTY)[1])
        except KeyError:
            # If exposure sequence cannot be determined, use the fallback.
            logger.debug("Science exposure %s has no EXPSPCIN defined.", science)
            return closest

        # Track the best gap with None rather than a magic upper bound, so
        # arbitrarily large sequence gaps are still candidates.
        best_gap = None
        for direct in directs:
            try:
                direct_seq = int(getattr_from_list(direct.item, ["expspcin"], _EMPTY)[1])
            except KeyError:
                # Try the next one.
                logger.debug("Direct image %s has no EXPSPCIN defined.", direct)
                continue

            gap = direct_seq - science_seq
            if gap > 0 and (best_gap is None or gap < best_gap):
                best_gap = gap
                closest = direct

        return closest

    def get_opt_element(self):
        """
        Get string representation of the optical elements.

        Returns
        -------
        opt_elem : str
            The Level3 Product name representation
            of the optical elements: the values found, sorted
            case-insensitively and joined with ``-``, or ``"clear"`` when
            none is found.

        Notes
        -----
        This is an override for the method in
        `~jwst.associations.lib.dms_base.DMSBaseMixin`.
        The optical element is retrieved from the chosen direct image
        found in ``self.direct_image``, which is set by
        ``add_catalog_members`` during ``finalize``.
        """
        item = self.direct_image.item
        opt_elems = []
        # One value is taken per group of alternative keys.
        for keys in [["filter", "band"], ["pupil", "grating"]]:
            opt_elem = getattr_from_list_nofail(item, keys, _EMPTY)[1]
            if opt_elem:
                opt_elems.append(opt_elem)
        opt_elems.sort(key=str.lower)
        full_opt_elem = "-".join(opt_elems)
        if full_opt_elem == "":
            full_opt_elem = "clear"

        return full_opt_elem