"""JWST-specific Step and Pipeline base classes."""
import logging
from pathlib import Path
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import JwstDataModel, read_metadata
from stpipe import Pipeline, crds_client
from stpipe import Step as _Step
from jwst import __version__, __version_commit__
from jwst.datamodels import ModelContainer, ModelLibrary
from jwst.lib.suffix import remove_suffix
from jwst.stpipe._cal_logs import _LOG_FORMATTER
log = logging.getLogger(__name__)
__all__ = ["JwstStep", "JwstPipeline"]
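# Example: a minimal sketch of a JwstStep subclass (illustrative only; the
# step name, alias, and "scale" parameter below are hypothetical and not part
# of this module):
#
#     class ExampleStep(JwstStep):
#         """Scale the science data by a configurable factor."""
#
#         class_alias = "example"
#
#         spec = """
#         scale = float(default=1.0)  # hypothetical step parameter
#         """
#
#         def process(self, input_data):
#             model = self.prepare_output(input_data)
#             model.data *= self.scale
#             return model
#
# Such a step is normally invoked with ``ExampleStep.call("input_rate.fits")``,
# which handles step configuration (including CRDS parameter reference files)
# before running.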
class JwstStep(_Step):
"""A JWST pipeline step."""
spec = """
output_ext = string(default='.fits') # Output file type
""" # noqa: E501
_log_records_formatter = _LOG_FORMATTER
@classmethod
def _datamodels_open(cls, init, **kwargs):
return datamodels.open(init, **kwargs)
@classmethod
def _get_crds_parameters(cls, dataset):
"""
Get CRDS parameters for the given dataset.
If the input dataset is a filename, its metadata are lazy-loaded rather than reading the full model into memory.
Parameters
----------
dataset : str, Path, JwstDataModel, ModelContainer, ModelLibrary, or list
The dataset, or collection of datasets, for which to retrieve CRDS parameters.
Returns
-------
dict
A dictionary of CRDS parameters.
str
The name of the observatory.
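Examples
--------
A minimal sketch, assuming ``jw00001_rate.fits`` is a hypothetical local
exposure file; only its metadata are read, not the full model:
>>> params, observatory = JwstStep._get_crds_parameters("jw00001_rate.fits")  # doctest: +SKIP
>>> observatory  # doctest: +SKIP
'jwst'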
"""
crds_observatory = "jwst"
# list or container: just set to zeroth model
# this is what stpipe does internally for ModelContainer already
if isinstance(dataset, (list, tuple, ModelContainer)):
if len(dataset) == 0:
raise ValueError(f"Input dataset {dataset} is empty")
dataset = dataset[0]
# Already open: use the model's method to get CRDS parameters
if isinstance(dataset, (ModelLibrary, JwstDataModel)):
return (
dataset.get_crds_parameters(),
crds_observatory,
)
# If we get here, we had better have a filename
if isinstance(dataset, str):
dataset = Path(dataset)
if not isinstance(dataset, Path):
raise TypeError(f"Cannot get CRDS parameters for {dataset} of type {type(dataset)}")
# for associations, open as ModelLibrary, which supports lazy-loading
if dataset.suffix.lower() == ".json":
model = ModelLibrary(dataset, asn_n_members=1, asn_exptypes=["science"])
return (model.get_crds_parameters(), crds_observatory)
# for all other cases, use read_metadata directly to lazy-load
return (read_metadata(dataset, flatten=True), crds_observatory)
@staticmethod
def get_stpipe_loggers():
"""
Get the names of loggers to configure.
Returns
-------
loggers : tuple of str
Tuple of log names to configure.
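Examples
--------
The same fixed tuple is returned for every step:
>>> JwstStep.get_stpipe_loggers()
('jwst', 'stcal', 'stdatamodels', 'stpipe', 'tweakwcs', 'py.warnings')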
"""
# Specify the log names for any dependencies whose loggers we want
# to configure, plus the special "py.warnings" logger, which carries
# log messages generated from captured Python warnings.
return ("jwst", "stcal", "stdatamodels", "stpipe", "tweakwcs", "py.warnings")
def load_as_level2_asn(self, obj):
"""
Load object as an association.
Loads the specified object into a Level2 association.
If necessary, prepend ``Step.input_dir`` to all members.
Parameters
----------
obj : object
Object to load as a Level2 association.
Returns
-------
association : object
Association from ``jwst.associations.lib.rules_level2_base.DMSLevel2bBase``
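Examples
--------
A minimal sketch, typically used inside a step's ``process`` method; the
exposure file name is hypothetical:
>>> asn = self.load_as_level2_asn("jw00001001001_01101_00001_nrs1_rate.fits")  # doctest: +SKIP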
"""
# Prevent circular import:
from jwst.associations.lib.update_path import update_key_value
from jwst.associations.load_as_asn import LoadAsLevel2Asn
asn = LoadAsLevel2Asn.load(obj, basename=self.output_file)
update_key_value(asn, "expname", (), mod_func=self.make_input_path)
return asn
def load_as_level3_asn(self, obj):
"""
Load object as an association.
Loads the specified object into a Level3 association.
If necessary, prepend ``Step.input_dir`` to all members.
Parameters
----------
obj : object
Object to load as a Level3 association.
Returns
-------
association : object
Association from ``jwst.associations.lib.rules_level3_base.DMS_Level3_Base``
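Examples
--------
A minimal sketch, typically used inside a pipeline's ``process`` method,
where ``input_data`` might be an association file name or an open
container of calibrated exposures (both hypothetical here):
>>> asn = self.load_as_level3_asn(input_data)  # doctest: +SKIP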
"""
# Prevent circular import:
from jwst.associations.lib.update_path import update_key_value
from jwst.associations.load_as_asn import LoadAsAssociation
asn = LoadAsAssociation.load(obj)
update_key_value(asn, "expname", (), mod_func=self.make_input_path)
return asn
def prepare_output(self, init, make_copy=None, open_models=True, open_as_type=None, **kwargs):
"""
Open the input data as a model, making a copy if necessary.
If the input data is a filename or path, it is opened
and the open model is returned.
If it is a list of models, it is opened as a ModelContainer.
In this case, or if the input is a simple datamodel or a
ModelContainer, a deep copy of the model/container is returned,
in order to avoid modifying the input models.
If the input is a ModelLibrary, it is simply returned, in order
to avoid making unnecessary copies for performance-critical
use cases.
All copies are skipped if this step has a parent (i.e. it is
called as part of a pipeline).
Set make_copy explicitly to True or False to override the above
behavior.
Parameters
----------
init : str, list, JwstDataModel, ModelContainer, or ModelLibrary
Input data to open.
make_copy : bool or None
If True, a copy of the input will always be made.
If False, a copy will never be made. If None, a copy is
conditionally made, depending on the input and whether the
step is called in a standalone context.
open_models : bool
If True and the input is a filename or list of filenames,
then datamodels.open will be called to open the input.
If False, the input is returned as is.
open_as_type : class or None
If provided, the input will be opened as the specified class
before returning. Intended for use with simple datamodel input
only: container types and associations should be handled directly
in the calling code.
**kwargs
Additional keyword arguments passed to ``datamodels.open`` or to
``open_as_type`` when the input is opened or converted.
Returns
-------
model : JwstDataModel or ModelContainer or ModelLibrary
The opened datamodel(s).
Raises
------
TypeError
If make_copy=True and the input is a type that cannot be copied.
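Examples
--------
A minimal sketch of the default copy behavior; ``ImageModel`` is used
purely for illustration:
>>> from stdatamodels.jwst.datamodels import ImageModel
>>> model = ImageModel()
>>> out = self.prepare_output(model)  # doctest: +SKIP
For a standalone step (no parent pipeline), ``out`` is a deep copy of
``model``; inside a pipeline it is ``model`` itself. Pass
``make_copy=True`` or ``make_copy=False`` to override this behavior.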
"""
# Check whether input contains datamodels
copy_needed = False
if isinstance(init, list):
is_datamodel = [isinstance(m, datamodels.JwstDataModel) for m in init]
if any(is_datamodel):
# Make the list into a ModelContainer, since it contains models
init = ModelContainer(init)
copy_needed = True
elif isinstance(init, (datamodels.JwstDataModel, ModelContainer)):
copy_needed = True
# Input might be a filename or path.
# In that case, open it if desired.
if not isinstance(init, (datamodels.JwstDataModel, ModelLibrary, ModelContainer)):
if open_models:
if open_as_type is not None:
# It is assumed the provided class is appropriate for the input.
input_models = open_as_type(init, **kwargs)
else:
input_models = datamodels.open(init, **kwargs)
else:
# Return the filename or path -
# the calling code will handle opening it as needed.
input_models = init
elif isinstance(init, datamodels.JwstDataModel):
# Simple data model: update the datamodel type if needed
if open_as_type is not None and type(init) is not open_as_type:
# This will make a shallow copy.
input_models = open_as_type(init, **kwargs)
else:
# Otherwise use the init model directly
input_models = init
else:
# ModelContainer or ModelLibrary: use the init model directly.
input_models = init
# Make a deep copy if needed
if make_copy is None:
make_copy = copy_needed and self.parent is None
if make_copy:
try:
input_models = input_models.copy()
except AttributeError:
# This should only happen if make_copy is explicitly set to
# True and the input is a string or a ModelLibrary.
raise TypeError(
f"Copy is not possible for input type {type(input_models)}"
) from None
return input_models
def finalize_result(self, result, reference_files_used):
"""
Update the result with the software version and reference files used.
Parameters
----------
result : `~stdatamodels.DataModel`
The output data model to be updated.
reference_files_used : list of tuple
The names and file paths of reference files used.
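Examples
--------
A minimal sketch of the expected ``reference_files_used`` format; the
reference file name below is hypothetical:
>>> self.finalize_result(result, [("flat", "jwst_niriss_flat_0001.fits")])  # doctest: +SKIP
>>> result.meta.ref_file.flat.name  # doctest: +SKIP
'jwst_niriss_flat_0001.fits'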
"""
if isinstance(result, JwstDataModel):
result.meta.calibration_software_revision = __version_commit__ or "RELEASE"
result.meta.calibration_software_version = __version__
if len(reference_files_used) > 0:
for ref_name, filename in reference_files_used:
if hasattr(result.meta.ref_file, ref_name):
getattr(result.meta.ref_file, ref_name).name = filename
result.meta.ref_file.crds.sw_version = crds_client.get_svn_version()
result.meta.ref_file.crds.context_used = crds_client.get_context_used(
result.crds_observatory
)
if self.parent is None:
log.info(f"Results used CRDS context: {result.meta.ref_file.crds.context_used}")
if self.class_alias:
if not hasattr(result, "cal_logs"):
result.cal_logs = {}
setattr(result.cal_logs, self.class_alias, self._log_records)
def remove_suffix(self, name):
"""
Remove the suffix if a known suffix is already in name.
Parameters
----------
name : str
The name to remove the suffix from.
Returns
-------
name : str
The name with the suffix removed.
separator : str
The separator that preceded the removed suffix.
"""
return remove_suffix(name)
def run(self, *args):
"""
Run the step.
Parameters
----------
*args
Arguments passed to `stpipe.Step.run`.
Returns
-------
result : Any
The step output.
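Examples
--------
A minimal sketch; the step class and file name are hypothetical:
>>> result = ExampleStep().run("jw00001_rate.fits")  # doctest: +SKIP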
"""
result = super().run(*args)
if not self.parent:
log.info(f"Results used jwst version: {__version__}")
return result
class JwstPipeline(Pipeline, JwstStep):
"""
A JWST pipeline.
JwstPipeline needs to inherit from Pipeline, but also
be a subclass of JwstStep so that it will pass checks
when constructing a pipeline using JwstStep class methods.
"""
def finalize_result(self, result, _reference_files_used):
"""
Update the result with the software version and reference files used.
Parameters
----------
result : `~stdatamodels.DataModel`
The output data model to be updated.
_reference_files_used : list of tuple
The names and file paths of reference files used.
"""
if isinstance(result, JwstDataModel):
log.info(
"Results used CRDS context: "
f"{crds_client.get_context_used(result.crds_observatory)}"
)
if self.class_alias:
if not hasattr(result, "cal_logs"):
result.cal_logs = {}
# remove the step logs as they're captured by the pipeline log
for _, step in self.step_defs.items():
if hasattr(result.cal_logs, step.class_alias):
delattr(result.cal_logs, step.class_alias)
setattr(result.cal_logs, self.class_alias, self._log_records)