"""Module for handling Reference Pixels."""
# Final CCWG Recommendation of 6/2013:
#
# The reference pixel correction for the NIR detectors should be done\
# immediately following the zero frame subtraction. We recommend that
# the following steps be taken in order for each frame of each exposure,
# with the option to turn each one off *on a per-SCA basis*. The
# parameters should eventually be optimized for each detector and instrument.
#
# (1) For each amplifier, the sigma-clipped mean values for all odd and all
# even columns of the horizontal (both top and bottom) reference pixels
# should be calculated. These values should then be subtracted from
# every pixel in the corresponding odd/even columns. There should be
# an option to turn this step off, and replace with a single
# sigma-clipped mean value for all horizontal reference pixels in
# each amplifier.
#
# (2) The vertical (both left and right) reference pixels should be smoothed
# with an N-pixel wide boxcar convolution, where N may depend on detector
# and instrument (adopt N=10 as a default). The median value of the 8
# smoothed reference pixels in each row should then be multiplied by a
# gain factor of some value between 0 (which effectively turns off the
# correction) and 1 (for full subtraction, should be the default), with
# the exact value to be tuned for each detector and instrument. Finally,
# these smoothed and scaled values should be subtracted from every pixel
# in the corresponding row.
#
# Subarray processing added 7/2018
#
# For NIR exposures, if the value of the meta.exposure.noutputs attribute is 1,
# calculate the clipped means of odd and even columns
# in detector coordinates. Subtract the odd mean from the odd columns, and
# the even mean from the even columns. If there are no reference pixels in the
# subarray, omit the refpix step.
#
# If the value of meta.exposure.noutputs is 4, calculate odd and even reference
# values for each amplifier separately, if available, and subtract those values
# from their corresponding data sections. Also use side reference pixels if
# available.
#
# For MIRI subarray exposures, omit the refpix step.
import logging
from copy import deepcopy
import numpy as np
from scipy import stats
from stdatamodels.jwst.datamodels import dqflags
from jwst.lib import pipe_utils, reffile_utils
from jwst.refpix.irs2_subtract_reference import make_irs2_mask
from jwst.refpix.optimized_convolution import apply_conv_kernel, make_kernels
log = logging.getLogger(__name__)
# NIR Reference section dictionaries are zero indexed and specify the values
# to be used in the following slice:
#
# (rowstart: rowstop, colstart:colstop)
#
# The 'stop' values are one more than the actual final row or column, in
# accordance with how Python slices work
# fmt: off
NIR_reference_sections = {
"A": {
"top": (2044, 2048, 0, 512),
"bottom": (0, 4, 0, 512),
"side": (0, 2048, 0, 4),
"data": (0, 2048, 0, 512),
},
"B": {
"top": (2044, 2048, 512, 1024),
"bottom": (0, 4, 512, 1024),
"data": (0, 2048, 512, 1024),
},
"C": {
"top": (2044, 2048, 1024, 1536),
"bottom": (0, 4, 1024, 1536),
"data": (0, 2048, 1024, 1536),
},
"D": {
"top": (2044, 2048, 1536, 2048),
"bottom": (0, 4, 1536, 2048),
"side": (0, 2048, 2044, 2048),
"data": (0, 2048, 1536, 2048),
},
}
# IRS2 sections for NIRSpec have a different size due to the
# interleaved reference pixels and the reference sector.
IRS2_reference_sections = {
"0": {
"top": (2044, 2048, 0, 640),
"bottom": (0, 4, 0, 640),
"data": (0, 2048, 0, 640)
},
"A": {
"top": (2044, 2048, 640, 1280),
"bottom": (0, 4, 640, 1280),
"data": (0, 2048, 640, 1280),
},
"B": {
"top": (2044, 2048, 1280, 1920),
"bottom": (0, 4, 1280, 1920),
"data": (0, 2048, 1280, 1920),
},
"C": {
"top": (2044, 2048, 1920, 2560),
"bottom": (0, 4, 1920, 2560),
"data": (0, 2048, 1920, 2560),
},
"D": {
"top": (2044, 2048, 2560, 3200),
"bottom": (0, 4, 2560, 3200),
"data": (0, 2048, 2560, 3200),
},
}
# Special behavior is requested for NIRSpec subarrays that do not reach
# detector edges; for these input models, we will assign the top and bottom
# four rows as reference pixels to better treat pedestal noise issues.
NRS_edgeless_subarrays = ["SUB512", "SUB512S", "SUB32"]
# Special sections used for multistripe reads - we will capture refpix
# locations in a given amplifier by selecting on the reference pixel
# dq flag, so we only need to provide amplifier regions.
MULTISTRIPE_AMPLIFIER_REGIONS = {
"A": (0, 512),
"B": (512, 1024),
"C": (1024, 1536),
"D": (1536, 2048),
}
#
# MIR Reference section dictionaries are zero indexed and specify the values
# to be used in the following slice:
#
# name ('left' or 'right'): (rowstart, rowstop, column)
#
# except the 'data' entry:
#
# 'data': (rowstart, rowstop, colstart, colstop, stride)
MIR_reference_sections = {
"A": {
"left": (0, 1024, 0),
"right": (0, 1024, 1028),
"data": (0, 1024, 0, 1032, 4)
},
"B": {
"left": (0, 1024, 1),
"right": (0, 1024, 1029),
"data": (0, 1024, 1, 1032, 4)
},
"C": {
"left": (0, 1024, 2),
"right": (0, 1024, 1030),
"data": (0, 1024, 2, 1032, 4)
},
"D": {
"left": (0, 1024, 3),
"right": (0, 1024, 1031),
"data": (0, 1024, 3, 1032, 4)
},
}
NIR_DETECTORS = [
"GUIDER1",
"GUIDER2",
"NRCA1",
"NRCA2",
"NRCA3",
"NRCA4",
"NRCALONG",
"NRCB1",
"NRCB2",
"NRCB3",
"NRCB4",
"NRCBLONG",
"NIS",
"NRS1",
"NRS2"
]
# fmt: on
# Status returns
REFPIX_OK = 0
BAD_REFERENCE_PIXELS = 1
SUBARRAY_DOESNTFIT = 2
SUBARRAY_SKIPPED = 3
__all__ = [
"Dataset",
"NIRDataset",
"MIRIDataset",
"create_dataset",
"correct_model",
"reference_pixel_correction",
"process_zeroframe_correction",
"restore_input_model",
"setup_dataset_for_zeroframe",
"save_science_values",
]
[docs]
class Dataset:
"""
Data container for passing data from routine to routine.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model to be corrected
odd_even_columns : bool
Flag that controls whether odd and even-numbered columns are
processed separately (NIR only)
use_side_ref_pixels : bool
Flag that controls whether the side reference pixels are used in
the correction (NIR only)
side_smoothing_length : int
Smoothing length to use in calculating the running median of
the side reference pixels (NIR only)
side_gain : float
Gain to use in applying the side reference pixel correction
(NIR only)
conv_kernel_params : dict
Dictionary containing the parameters needed for the optimized convolution kernel
for Simple Improved Reference Subtraction (SIRS)
siglimit : float
Sigma clipping limit to use in calculating the mean of reference pixels.
odd_even_rows : bool
Flag that controls whether odd and even-numbered rows are handled
separately (MIR only)
"""
def __init__(
self,
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
conv_kernel_params,
siglimit,
odd_even_rows,
):
self.refpix_algorithm = conv_kernel_params["refpix_algorithm"]
self.sirs_kernel_model = conv_kernel_params["sirs_kernel_model"]
self.sigreject = conv_kernel_params["sigreject"]
self.gaussmooth = conv_kernel_params["gaussmooth"]
self.halfwidth = conv_kernel_params["halfwidth"]
if (
input_model.meta.subarray.xstart is None
or input_model.meta.subarray.ystart is None
or input_model.meta.subarray.xsize is None
or input_model.meta.subarray.ysize is None
):
raise ValueError("subarray metadata not found")
self.input_model = input_model
is_subarray = False
self.subarray = None
if reffile_utils.is_subarray(input_model):
is_subarray = True
self.subarray = input_model.meta.subarray.name
self.is_subarray = is_subarray
self.is_multistripe = False
if getattr(input_model.meta.subarray, "multistripe_reads1", None) is not None:
self.is_multistripe = True
self.zeroframe_proc = False
(nints, ngroups, nrows, ncols) = input_model.data.shape
self.nints = nints
self.ngroups = ngroups
self.nrows = nrows
self.ncols = ncols
self.full_shape = (nrows, ncols)
self.detector = input_model.meta.instrument.detector
self.noutputs = input_model.meta.exposure.noutputs
self.xstart = input_model.meta.subarray.xstart
self.ystart = input_model.meta.subarray.ystart
self.xsize = input_model.meta.subarray.xsize
self.ysize = input_model.meta.subarray.ysize
self.fastaxis = input_model.meta.subarray.fastaxis
self.slowaxis = input_model.meta.subarray.slowaxis
self.colstart = self.xstart - 1
self.colstop = self.colstart + self.xsize
self.rowstart = self.ystart - 1
self.rowstop = self.rowstart + self.ysize
self.odd_even_columns = odd_even_columns
self.use_side_ref_pixels = use_side_ref_pixels
self.side_smoothing_length = side_smoothing_length
self.side_gain = side_gain
self.odd_even_rows = odd_even_rows
self.bad_reference_pixels = False
self.reference_sections = None
self.amplifiers = "ABCD"
# Define temp array for processing every group
self.pixeldq = self.get_pixeldq()
self.group = None
self.siglimit = siglimit
[docs]
def sigma_clip(self, data, dq, low=None, high=None):
"""
Wrap `scipy.stats.sigmaclip` so that data with zero variance is handled cleanly.
Parameters
----------
data : ndarray
Array of pixels to be sigma-clipped
dq : ndarray
DQ array for data
low : float or `None`, optional
Lower clipping boundary, in standard deviations from the mean
high : float or `None`, optional
Upper clipping boundary, in standard deviations from the mean
Returns
-------
mean : float
Clipped mean of data array
"""
if low is None:
low = self.siglimit
if high is None:
high = self.siglimit
#
# Only calculate the clipped mean for pixels that don't have the DO_NOT_USE
# DQ bit set
goodpixels = np.where(np.bitwise_and(dq, dqflags.pixel["DO_NOT_USE"]) == 0)
#
# If there are no good pixels, return None
if len(goodpixels[0]) == 0:
return None
#
# scipy routine fails if the pixels all have exactly the same value
if np.std(data[goodpixels], dtype=np.float64) != 0.0:
clipped_ref, lowlim, uplim = stats.sigmaclip(data[goodpixels], low, high)
mean = clipped_ref.mean()
else:
mean = data[goodpixels].mean(dtype=np.float64)
return mean
[docs]
def get_pixeldq(self):
"""
Get the properly sized pixeldq array from the input model.
Returns
-------
pixeldq : ndarray
Numpy array for the pixeldq data with the full shape of the detector
"""
if self.is_subarray and not self.is_multistripe:
# deal with subarrays by embedding the pixeldq array in a full-sized
# array with DO_NOT_USE and REFERENCE_PIXEL dqflags bit set where the
# reference pixels live, except where the data are embedded
if self.detector[:3] == "MIR":
fullrows = 1024
fullcols = 1032
else:
fullrows = 2048
fullcols = 2048
self.full_shape = (fullrows, fullcols)
pixeldq = np.zeros(self.full_shape, dtype=self.input_model.pixeldq.dtype)
refpixdq_dontuse = dqflags.pixel["DO_NOT_USE"] | dqflags.pixel["REFERENCE_PIXEL"]
pixeldq[0:4, :] = refpixdq_dontuse
pixeldq[fullrows - 4 : fullrows, :] = refpixdq_dontuse
pixeldq[4 : fullrows - 4, 0:4] = refpixdq_dontuse
pixeldq[4 : fullrows - 4, fullcols - 4 : fullcols] = refpixdq_dontuse
pixeldq[self.rowstart : self.rowstop, self.colstart : self.colstop] = (
self.input_model.pixeldq.copy()
)
if self.subarray in NRS_edgeless_subarrays:
# Log assignment as rows (in DMS plane)
# despite assigning columns (in detector plane)
log.info(
f"Subarray {self.subarray} has no reference pixels: "
f"assigning top and bottom four rows as reference pixels."
)
pixeldq[self.rowstart : self.rowstop, self.colstart : self.colstart + 4] = (
pixeldq[self.rowstart : self.rowstop, self.colstart : self.colstart + 4]
| dqflags.pixel["REFERENCE_PIXEL"]
)
pixeldq[self.rowstart : self.rowstop, self.colstop - 4 : self.colstop] = (
pixeldq[self.rowstart : self.rowstop, self.colstop - 4 : self.colstop]
| dqflags.pixel["REFERENCE_PIXEL"]
)
else:
pixeldq = self.input_model.pixeldq.copy()
return pixeldq
[docs]
def get_group(self, integration, group):
"""
Get a properly sized copy of the array for each group.
Parameters
----------
integration : int
Index of the integration from the input model from which to extract
the group array
group : int
Index of the group, within the integration, from which to extract
the group array
"""
if self.group is None:
self.group = np.zeros(self.full_shape, dtype=self.input_model.data.dtype)
if self.is_subarray and not self.is_multistripe:
self.group[self.rowstart : self.rowstop, self.colstart : self.colstop] = (
self.input_model.data[integration, group].copy()
)
else:
self.group[:, :] = self.input_model.data[integration, group].copy()
[docs]
def restore_group(self, integration, group):
"""
Replace input model data with processed group array.
Parameters
----------
integration : int
Index of the integration from the input model which needs to be
updated with the newly processed group array
group : int
Index of the group, within the integration, which needs to be
updated with the newly processed group array
"""
if self.is_subarray and not self.is_multistripe:
self.input_model.data[integration, group] = self.group[
self.rowstart : self.rowstop, self.colstart : self.colstop
]
else:
self.input_model.data[integration, group] = self.group.copy()
[docs]
def log_parameters(self):
"""Log the parameters that are valid for this type of data, and those that aren't."""
is_nir = isinstance(self, NIRDataset)
if is_nir:
if not self.is_subarray:
log.info("NIR full frame data")
log.info("The following parameters are valid for this mode:")
if self.refpix_algorithm == "median":
log.info(f"use_side_ref_pixels = {self.use_side_ref_pixels}")
log.info(f"odd_even_columns = {self.odd_even_columns}")
log.info(f"side_smoothing_length = {self.side_smoothing_length}")
log.info(f"side_gain = {self.side_gain}")
log.info("The following parameter is not applicable and is ignored:")
log.info(f"odd_even_rows = {self.odd_even_rows}")
elif self.refpix_algorithm == "sirs":
log.info(f"sigreject = {self.sigreject}")
log.info(f"gaussmooth = {self.gaussmooth}")
log.info(f"halfwidth = {self.halfwidth}")
else:
log.info("NIR subarray data")
# Transform the pixeldq array from DMS to detector coords
self.dms_to_detector_dq()
ngoodside = self.count_good_side_refpixels()
ngoodtopbottom = self.count_good_top_bottom_refpixels()
# Re-assign the pixeldq array since we transformed it to detector space
# and we don't want to do it again
self.pixeldq = self.get_pixeldq()
is_4amp = False
if self.noutputs == 4:
is_4amp = True
if is_4amp:
log.info("4 readout amplifiers used")
if (ngoodside + ngoodtopbottom) == 0:
log.info("No valid reference pixels. This step will have no effect")
else:
log.info("The following parameters are valid for this mode:")
if ngoodtopbottom > 0:
log.info(f"odd_even_columns = {self.odd_even_columns}")
if ngoodside > 0:
log.info(f"use_side_ref_pixels = {self.use_side_ref_pixels}")
log.info(f"side_smoothing_length = {self.side_smoothing_length}")
log.info(f"side_gain = {self.side_gain}")
log.info("The following parameters are not applicable and are ignored")
if ngoodtopbottom == 0:
log.info(f"odd_even_columns = {self.odd_even_columns}")
if ngoodside == 0:
log.info(f"use_side_ref_pixels = {self.use_side_ref_pixels}")
log.info(f"side_smoothing_length = {self.side_smoothing_length}")
log.info(f"side_gain = {self.side_gain}")
log.info(f"odd_even_rows = {self.odd_even_rows}")
else:
log.info("Single readout amplifier used")
if ngoodtopbottom == 0:
log.info("No valid reference pixels. This step will have no effect.")
else:
log.info("The following parameter is valid for this mode:")
log.info(f"odd_even_columns = {self.odd_even_columns}")
log.info("The following parameters are not applicable and are ignored:")
log.info(f"use_side_ref_pixels = {self.use_side_ref_pixels}")
log.info(f"side_smoothing_length = {self.side_smoothing_length}")
log.info(f"side_gain = {self.side_gain}")
log.info(f"odd_even_rows = {self.odd_even_rows}")
else:
if not self.is_subarray:
log.info("MIRI full frame data")
log.info("The following parameter is valid for this mode:")
log.info(f"odd_even_rows = {self.odd_even_rows}")
log.info("The following parameters are not applicable and are ignored:")
log.info(f"use_side_ref_pixels = {self.use_side_ref_pixels}")
log.info(f"odd_even_columns = {self.odd_even_columns}")
log.info(f"side_smoothing_length = {self.side_smoothing_length}")
log.info(f"side_gain = {self.side_gain}")
else:
log.info("MIRI subarray data")
log.info("refpix processing skipped for this mode")
[docs]
def count_good_side_refpixels(self):
"""
Count the number of good side reference pixels.
Returns
-------
ngood : int
Number of good side reference pixels
"""
donotuse = dqflags.pixel["DO_NOT_USE"]
ngood = 0
for amplifier in "AD":
rowstart, rowstop, colstart, colstop = self.reference_sections[amplifier]["side"]
good = np.where(
np.bitwise_and(self.pixeldq[rowstart:rowstop, colstart:colstop], donotuse)
!= donotuse
)
ngood += len(good[0])
return ngood
[docs]
def count_good_top_bottom_refpixels(self):
"""
Count the number of good top and bottom reference pixels.
Returns
-------
ngood : int
Number of good top and bottom reference pixels
"""
donotuse = dqflags.pixel["DO_NOT_USE"]
refdq = dqflags.pixel["REFERENCE_PIXEL"]
ngood = 0
if self.subarray in NRS_edgeless_subarrays:
ngood = len(
np.where((self.pixeldq & refdq == refdq) & (self.pixeldq & donotuse != donotuse))[0]
)
log.debug(f"Edgeless subarray {self.subarray} has {ngood} reference pixels.")
else:
for edge in ["top", "bottom"]:
for amplifier in self.amplifiers:
rowstart, rowstop, colstart, colstop = self.reference_sections[amplifier][edge]
log.debug(f"Ref sections for {edge} & {amplifier}:")
log.debug(f" [{rowstart}:{rowstop}], [{colstart}:{colstop}]")
good = np.where(
np.bitwise_and(self.pixeldq[rowstart:rowstop, colstart:colstop], donotuse)
!= donotuse
)
ngood += len(good[0])
log.debug(f"For {edge} & {amplifier}: {len(good[0])}")
return ngood
[docs]
class NIRDataset(Dataset):
"""
Container for NIR detector datasets.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model to be corrected.
odd_even_columns : bool
Flag that controls whether odd and even-numbered columns are
processed separately.
use_side_ref_pixels : bool
Flag that controls whether the side reference pixels are used in
the correction.
side_smoothing_length : int
Smoothing length to use in calculating the running median of
the side reference pixels.
side_gain : float
Gain to use in applying the side reference pixel correction.
conv_kernel_params : dict
Dictionary containing the parameters needed for the optimized convolution kernel.
siglimit : float
Sigma clipping limit, in standard deviations from the mean, to use in
sigma clipping for calculating the mean of reference pixels.
"""
def __init__(
self,
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
conv_kernel_params,
siglimit,
):
super(NIRDataset, self).__init__(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
conv_kernel_params,
siglimit,
odd_even_rows=False,
)
# Set appropriate NIR sections
self.is_irs2 = pipe_utils.is_irs2(input_model)
if self.is_irs2:
self.reference_sections = deepcopy(IRS2_reference_sections)
self.amplifiers = "0ABCD"
self.irs2_odd_mask = self.make_irs2_odd_mask(input_model)
else:
self.reference_sections = NIR_reference_sections
self.irs2_odd_mask = None
[docs]
def make_irs2_odd_mask(self, input_model, scipix_n_default=16, refpix_r_default=4):
"""
Make an odd pixel mask for IRS2 mode.
The even pixel mask can be generated by inverting the odd mask.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Input model containing data to mask.
scipix_n_default : int, optional
Number of regular samples before stepping out to collect
reference samples.
refpix_r_default : int, optional
Number of reference samples before stepping back in to collect
regular samples.
Returns
-------
odd_mask : ndarray
Boolean array matching data column size in detector orientation.
`True` identifies all odd pixels (science and reference).
"""
# Get data information from input model.
# (y and x here refer to detector orientation, although input
# data has not yet been rotated)
ny = input_model.data.shape[-1] # 2048
nx = input_model.data.shape[-2] # 3200
# Default n=16, r=4
scipix_n = input_model.meta.exposure.nrs_normal
if scipix_n is None:
log.warning(f"Keyword NRS_NORM not found; using default value {scipix_n_default}")
scipix_n = scipix_n_default
refpix_r = input_model.meta.exposure.nrs_reference
if refpix_r is None:
log.warning(f"Keyword NRS_REF not found; using default value {refpix_r_default}")
refpix_r = refpix_r_default
# If these are not set to standard values, the
# reference sections values must be changed to match.
n_sector = nx // 5
areas = ["top", "bottom", "data"] # assuming no 'side'
if nx != 3200:
for i, amplifier in enumerate("0ABCD"):
x_start = n_sector * i
x_stop = n_sector * (i + 1)
for area in areas:
sec = self.reference_sections[amplifier][area]
self.reference_sections[amplifier][area] = (sec[0], sec[1], x_start, x_stop)
# Make a column mask that identifies the reference sector and
# all interleaved pixels as False, all science pixels
# and standard reference pixels as True
x_mask = make_irs2_mask(nx, ny, scipix_n, refpix_r)
# Switch True/False to identify reference pixels instead of
# science pixels
x_mask = ~x_mask
# Treat the reference sector like the other sectors
x_mask[:n_sector] = x_mask[n_sector : 2 * n_sector]
# Find even and odd interleaved pixels:
# reference pixels come in two pairs, the first set odd
# and the second set even, so pixels
# SSSSSSSSrrrrSSSSSSSSSSSSSSSSrrrrSSSS...
# have parity:
# --------0011----------------0011----...
# for the interleaved pixels, where the traditional pixels
# are:
# 01010101----0101010101010101----0101...
# first two pixels are odd
even_interleaved = x_mask.copy()
even_interleaved[0::4] = False
even_interleaved[1::4] = False
# second two pixels are even
odd_interleaved = x_mask.copy()
odd_interleaved[2::4] = False
odd_interleaved[3::4] = False
# Make an odd mask for the image columns in detector orientation.
# This will be used both for identifying the correct
# reference pixels and for applying the correction later.
odd_mask = np.full(nx, False)
odd_mask[0::2] = True
odd_mask[even_interleaved] = False
odd_mask[odd_interleaved] = True
return odd_mask
# Even though the recommendation specifies calculating the mean of the
# combined top and bottom reference sections, there's a good chance we
# might want to calculate them separately
[docs]
def collect_odd_refpixels(self, group, amplifier, top_or_bottom):
"""
Collect odd reference pixels.
For traditional readouts, odd pixels correspond to odd-numbered
rows (first, third, fifth, etc.), which are even array indices.
For IRS2 mode, science and traditional reference pixels have
the same parity, but interleaved pixels come in two pairs. The
first two are odd and the second two are even.
Parameters
----------
group : ndarray
The group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
String corresponding to the amplifier being processed
top_or_bottom : {'top', 'bottom'}
String corresponding to whether top or bottom reference pixels
are bing processed
Returns
-------
oddref : ndarray
Array containing all the odd reference pixels
odddq : ndarray
Array containing all the odd dq values for those reference pixels
"""
rowstart, rowstop, colstart, colstop = self.reference_sections[amplifier][top_or_bottom]
# handle interleaved pixels if needed
if self.is_irs2:
odd_mask = self.irs2_odd_mask[colstart:colstop]
oddref = group[rowstart:rowstop, colstart:colstop][:, odd_mask]
odddq = self.pixeldq[rowstart:rowstop, colstart:colstop][:, odd_mask]
else:
oddref = group[rowstart:rowstop, colstart:colstop:2]
odddq = self.pixeldq[rowstart:rowstop, colstart:colstop:2]
return oddref, odddq
[docs]
def collect_even_refpixels(self, group, amplifier, top_or_bottom):
"""
Collect even reference pixels.
For traditional readouts, even pixels correspond to even-numbered
rows (second, fourth, sixth, etc.), which are odd array indices.
For IRS2 mode, science and traditional reference pixels have
the same parity, but interleaved pixels come in two pairs. The
first two are odd and the second two are even.
Parameters
----------
group : ndarray
The group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
String corresponding to the amplifier being processed
top_or_bottom : {'top', 'bottom'}
String corresponding to whether top or bottom reference pixels
are bing processed
Returns
-------
evenref : ndarray
Array containing all the even reference pixels
evendq : ndarray
Array containing all the even dq values for those reference pixels
"""
rowstart, rowstop, colstart, colstop = self.reference_sections[amplifier][top_or_bottom]
# handle interleaved pixels if needed
if self.is_irs2:
even_mask = ~self.irs2_odd_mask[colstart:colstop]
evenref = group[rowstart:rowstop, colstart:colstop][:, even_mask]
evendq = self.pixeldq[rowstart:rowstop, colstart:colstop][:, even_mask]
else:
# Even columns start on the second column
colstart = colstart + 1
evenref = group[rowstart:rowstop, colstart:colstop:2]
evendq = self.pixeldq[rowstart:rowstop, colstart:colstop:2]
return evenref, evendq
[docs]
def get_odd_refvalue(self, group, amplifier, top_or_bottom):
"""
Calculate the reference pixel mean in odd-numbered columns.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
top_or_bottom : {'top', 'bottom'}
Processing top or bottom reference pixels?
Returns
-------
odd : float
Value of the clipped mean of the reference pixels in odd-numbered
columns
"""
ref, dq = self.collect_odd_refpixels(group, amplifier, top_or_bottom)
odd = self.sigma_clip(ref, dq)
return odd
[docs]
def get_even_refvalue(self, group, amplifier, top_or_bottom):
"""
Calculate the reference pixel mean in even-numbered columns.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
top_or_bottom : {'top', 'bottom'}
Processing top or bottom reference pixels?
Returns
-------
even : float
Value of the clipped mean of the reference pixels in even-numbered
columns
"""
ref, dq = self.collect_even_refpixels(group, amplifier, top_or_bottom)
even = self.sigma_clip(ref, dq)
return even
[docs]
def get_amplifier_refvalue(self, group, amplifier, top_or_bottom):
"""
Calculate the reference pixel mean for a given amplifier.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
top_or_bottom : {'top', 'bottom'}
Processing top or bottom reference pixels?
Returns
-------
odd : float
Value of the clipped mean of the reference pixels in odd-numbered
columns (if ``self.odd_even_columns``)
even : float
Value of the clipped mean of the reference pixels in even-numbered
columns (if ``self.odd_even_columns``)
mean : float
Value of the clipped mean of the reference pixels in both odd-numbered
and even-numbered columns (if not ``self.odd_even_columns``)
"""
if self.odd_even_columns:
odd = self.get_odd_refvalue(group, amplifier, top_or_bottom)
even = self.get_even_refvalue(group, amplifier, top_or_bottom)
if odd is None or even is None:
self.bad_reference_pixels = True
return odd, even
else:
rowstart, rowstop, colstart, colstop = self.reference_sections[amplifier][top_or_bottom]
ref = group[rowstart:rowstop, colstart:colstop]
dq = self.pixeldq[rowstart:rowstop, colstart:colstop]
mean = self.sigma_clip(ref, dq)
if mean is None:
self.bad_reference_pixels = True
return mean
[docs]
def get_refvalues(self, group):
"""
Get the reference pixel values.
Get values for each amplifier, odd and even columns
and top and bottom reference pixels.
Parameters
----------
group : ndarray
Group that is being processed.
Returns
-------
refpix : dict
Dictionary containing the clipped mean of the reference pixels for
each amplifier, odd and even columns (if selected, otherwise all columns)
and top and bottom.
"""
refpix = {}
for amplifier in self.amplifiers:
refpix[amplifier] = {}
refpix[amplifier]["odd"] = {}
refpix[amplifier]["even"] = {}
for top_bottom in ("top", "bottom"):
refvalues = self.get_amplifier_refvalue(group, amplifier, top_bottom)
if self.odd_even_columns:
refpix[amplifier]["odd"][top_bottom] = refvalues[0]
refpix[amplifier]["even"][top_bottom] = refvalues[1]
else:
refpix[amplifier][top_bottom] = refvalues
return refpix
[docs]
def do_top_bottom_correction(self, group, refvalues):
"""
Do the top/bottom correction.
Parameters
----------
group : ndarray
Group that is being processed. The parameter ``group`` is corrected
for the bias drift using the top and bottom reference pixels
refvalues : dict
Dictionary of reference pixel clipped means
"""
for amplifier in self.amplifiers:
datarowstart, datarowstop, datacolstart, datacolstop = self.reference_sections[
amplifier
]["data"]
if self.odd_even_columns:
oddreftop = refvalues[amplifier]["odd"]["top"]
oddrefbottom = refvalues[amplifier]["odd"]["bottom"]
evenreftop = refvalues[amplifier]["even"]["top"]
evenrefbottom = refvalues[amplifier]["even"]["bottom"]
#
# For now, just average the top and bottom corrections
oddrefsignal = self.average_with_none(oddreftop, oddrefbottom)
evenrefsignal = self.average_with_none(evenreftop, evenrefbottom)
log.debug(
f"Amplifier: {amplifier}, Odd ref: {oddrefsignal}, Even ref: {evenrefsignal}"
)
if oddrefsignal is not None and evenrefsignal is not None:
if not self.is_irs2:
oddslice = (
slice(datarowstart, datarowstop, 1),
slice(datacolstart, datacolstop, 2),
)
evenslice = (
slice(datarowstart, datarowstop, 1),
slice(datacolstart + 1, datacolstop, 2),
)
group[oddslice] = group[oddslice] - oddrefsignal
group[evenslice] = group[evenslice] - evenrefsignal
else:
dataslice = (
slice(datarowstart, datarowstop, 1),
slice(datacolstart, datacolstop, 1),
)
odd_mask = self.irs2_odd_mask[datacolstart:datacolstop]
group[dataslice][:, odd_mask] -= oddrefsignal
group[dataslice][:, ~odd_mask] -= evenrefsignal
else:
pass
else:
reftop = refvalues[amplifier]["top"]
refbottom = refvalues[amplifier]["bottom"]
refsignal = self.average_with_none(reftop, refbottom)
if refsignal is not None:
dataslice = (
slice(datarowstart, datarowstop, 1),
slice(datacolstart, datacolstop, 1),
)
group[dataslice] = group[dataslice] - refsignal
else:
pass
return
[docs]
def average_with_none(self, a, b):
"""
Average two numbers.
If one is None, return the other. If both are None, return None.
Parameters
----------
a : float or None
First number to be averaged
b : float or None
Second number to be averaged
Returns
-------
result : float or None
Average of the two numbers
"""
if a is None and b is None:
return None
if a is None:
return b
elif b is None:
return a
else:
return 0.5 * (a + b)
[docs]
def create_reflected(self, data, smoothing_length):
"""
Make an array bigger by extending it at the top and bottom.
Extend by an amount equal to .5 (smoothing length-1),
as the smoothing length will be odd.
The extension is a reflection of the ends of the input array.
Parameters
----------
data : ndarray
Input data array.
smoothing_length : int
Smoothing length; should be odd, will be converted if not.
Amount by which the input array is extended is
``smoothing_length // 2`` at the bottom and ``smoothing_length // 2`` at
the top.
Returns
-------
reflected : ndarray
Array that has been extended at the top and bottom by reflecting the
first and last few rows.
"""
nrows, ncols = data.shape
if smoothing_length % 2 == 0:
log.info("Smoothing length must be odd, adding 1")
smoothing_length = smoothing_length + 1
newheight = nrows + smoothing_length - 1
reflected = np.zeros((newheight, ncols), dtype=data.dtype)
bufsize = smoothing_length // 2
reflected[bufsize : bufsize + nrows] = data[:]
reflected[:bufsize] = data[bufsize:0:-1]
reflected[-(bufsize):] = data[-2 : -(bufsize + 2) : -1]
return reflected
[docs]
def calculate_side_ref_signal(self, group, colstart, colstop):
"""
Calculate the reference pixel signal from the side reference pixels.
Calculate by running a box up the side reference pixels and calculating
the running median.
Parameters
----------
group : ndarray
Group that is being processed
colstart : int
Starting column
colstop : int
Ending column
Returns
-------
ndarray
Median filtered version of the side reference pixels
"""
smoothing_length = self.side_smoothing_length
data = group[:, colstart : colstop + 1]
dq = self.pixeldq[:, colstart : colstop + 1]
return self.median_filter(data, dq, smoothing_length)
[docs]
def combine_ref_signals(self, left, right):
"""
Combine the left and right reference signals.
Average row-by-row.
Parameters
----------
left : ndarray
1-D array of median-filtered reference pixel values from the left side
right : ndarray
1-D array of median-filtered reference pixel values from the right side
Returns
-------
sidegroup : ndarray
2-D array of average reference pixel vector replicated horizontally
"""
combined = self.combine_with_nans(left, right)
sidegroup = np.zeros((2048, 2048))
for column in range(2048):
sidegroup[:, column] = combined
return sidegroup
[docs]
def combine_with_nans(self, a, b):
"""
Combine 2 1-D arrays that have NaNs.
* Wherever both arrays are NaN, output is 0.0.
* Wherever a is NaN and b is not, return b.
* Wherever b is NaN and a is not, return a.
* Wherever neither a nor b is NaN, return the average of
a and b.
Parameters
----------
a : ndarray
First array to combine
b : ndarray
Second array to combine
Returns
-------
result : ndarray
Combined array
"""
result = np.zeros(len(a), dtype=a.dtype)
bothnan = np.where(np.isnan(a) & np.isnan(b))
result[bothnan] = 0.0
a_nan = np.where(np.isnan(a) & ~np.isnan(b))
result[a_nan] = b[a_nan]
b_nan = np.where(~np.isnan(a) & np.isnan(b))
result[b_nan] = a[b_nan]
no_nan = np.where(~np.isnan(a) & ~np.isnan(b))
result[no_nan] = 0.5 * (a[no_nan] + b[no_nan])
return result
[docs]
def apply_side_correction(self, group, sidegroup):
"""
Apply reference pixel correction from the side reference pixels.
Parameters
----------
group : ndarray
Group being processed
sidegroup : ndarray
Side reference pixel signal replicated horizontally
Returns
-------
corrected_group : ndarray
The group corrected for the side reference pixel signal
"""
corrected_group = group - self.side_gain * sidegroup
return corrected_group
[docs]
def do_side_correction(self, group):
"""
Do all the steps of the side reference pixel correction.
Parameters
----------
group : ndarray
Group being processed
Returns
-------
corrected_group : ndarray
Corrected group
"""
continue_apply_conv_kernel = False
# Check if convolution kernels for this detector are in the reference file
# and if not, proceed with side-pixel correction as usual
if self.refpix_algorithm == "sirs" and self.sirs_kernel_model is not None:
kernels = make_kernels(
self.sirs_kernel_model,
self.input_model.meta.instrument.detector,
self.gaussmooth,
self.halfwidth,
)
if kernels is None:
log.info("The REFPIX step will use the running median")
else:
continue_apply_conv_kernel = True
#
# Apply optimized convolution kernel
if continue_apply_conv_kernel:
corrected_group = apply_conv_kernel(group, kernels, sigreject=self.sigreject)
else:
# use running median
left = self.calculate_side_ref_signal(group, 0, 3)
right = self.calculate_side_ref_signal(group, 2044, 2047)
sidegroup = self.combine_ref_signals(left, right)
corrected_group = self.apply_side_correction(group, sidegroup)
return corrected_group
[docs]
def do_corrections(self):
"""Do reference pixel correction for NIR data."""
if self.is_subarray:
if self.is_multistripe:
self.do_multistripe_corrections()
elif self.noutputs == 4:
self.do_fullframe_corrections()
else:
self.do_subarray_corrections()
else:
self.do_fullframe_corrections()
[docs]
def do_fullframe_corrections(self):
"""
Do Reference Pixels Corrections for full frame data.
Correct all amplifiers, NIR detectors. The first read of each integration
is NOT subtracted, as the signal is removed in the superbias subtraction step.
"""
#
# First transform pixeldq array to detector coordinates
self.dms_to_detector_dq()
for integration in range(self.nints):
for group in range(self.ngroups):
#
# Get the reference values from the top and bottom reference
# pixels
#
self.dms_to_detector(integration, group)
thisgroup = self.group
refvalues = self.get_refvalues(thisgroup)
log.debug(f"Integration: {integration}, Group: {group}")
self.do_top_bottom_correction(thisgroup, refvalues)
if self.use_side_ref_pixels:
corrected_group = self.do_side_correction(thisgroup)
self.group = corrected_group
else:
self.group = thisgroup
#
# Now transform back from detector to DMS coordinates.
self.detector_to_dms(integration, group)
return
[docs]
def do_subarray_corrections(self):
"""
Do corrections for subarrays.
Reference pixel values are calculated separately for odd and even columns
if ``odd_even_columns`` is `True`, otherwise a single number calculated from
all reference pixels
"""
refdq = dqflags.pixel["REFERENCE_PIXEL"]
donotuse = dqflags.pixel["DO_NOT_USE"]
# First transform to detector coordinates
# This transforms the pixeldq array from DMS to detector coordinates,
# only needs to be done once
self.dms_to_detector_dq()
# Determined refpix indices to use on each group
refpixindices = np.where(
(self.pixeldq & refdq == refdq) & (self.pixeldq & donotuse != donotuse)
)
nrefpixels = len(refpixindices[0])
if nrefpixels == 0:
self.bad_reference_pixels = True
return
if self.odd_even_columns:
oddrefpixindices_row = []
oddrefpixindices_col = []
evenrefpixindices_row = []
evenrefpixindices_col = []
for i in range(nrefpixels):
if (refpixindices[1][i] % 2) == 0:
evenrefpixindices_row.append(refpixindices[0][i])
evenrefpixindices_col.append(refpixindices[1][i])
else:
oddrefpixindices_row.append(refpixindices[0][i])
oddrefpixindices_col.append(refpixindices[1][i])
evenrefpixindices = (np.array(evenrefpixindices_row), np.array(evenrefpixindices_col))
oddrefpixindices = (np.array(oddrefpixindices_row), np.array(oddrefpixindices_col))
for integration in range(self.nints):
for group in range(self.ngroups):
# Get the reference values from the top and bottom reference pixels
self.dms_to_detector(integration, group)
thisgroup = self.group
if self.odd_even_columns:
evenrefpixvalue = self.sigma_clip(
thisgroup[evenrefpixindices], self.pixeldq[evenrefpixindices]
)
oddrefpixvalue = self.sigma_clip(
thisgroup[oddrefpixindices], self.pixeldq[oddrefpixindices]
)
thisgroup[:, 0::2] -= evenrefpixvalue
thisgroup[:, 1::2] -= oddrefpixvalue
else:
refpixvalue = self.sigma_clip(
thisgroup[refpixindices], self.pixeldq[refpixindices]
)
thisgroup -= refpixvalue
# Now transform back from detector to DMS coordinates.
self.detector_to_dms(integration, group)
return
[docs]
def get_multistripe_refvalues(self, group, stripe_idx, fastmin):
"""
Collect refpix values for each amplifier.
If ``odd_even_columns``, additionally separate
each amplifier reference value by odd and
even column. All values returned are sigma-
clipped.
Parameters
----------
group : ndarray[float]
The current group to be corrected.
stripe_idx : int
The stripe index for superstripe observations. For substripe,
this should be set to -1.
fastmin : int
The subarray offset in the fast read direction.
Returns
-------
refpix : dict
The dictionary of reference values,
keyed on amplifier name and stored
either as odd and even values, or
as the mean value.
"""
refpix = {}
refdq = dqflags.pixel["REFERENCE_PIXEL"]
dnudq = dqflags.pixel["DO_NOT_USE"]
# Multistripe will have 3D pixeldq, so we index on stripe.
if stripe_idx >= 0:
pixeldq = self.pixeldq[stripe_idx]
else:
pixeldq = self.pixeldq
# coordinates for the current array
_, x = np.mgrid[: pixeldq.shape[0], : pixeldq.shape[1]]
x += fastmin
for amplifier in self.amplifiers:
amp_xi, amp_xf = MULTISTRIPE_AMPLIFIER_REGIONS[amplifier]
in_amp = (x >= amp_xi) & (x < amp_xf)
if not np.any(in_amp):
continue
refpix[amplifier] = {}
mask = in_amp & (pixeldq & refdq > 0) & (pixeldq & dnudq == 0)
if self.odd_even_columns:
odd_mask = mask.copy()
odd_mask[:, ::2] = False
even_mask = mask.copy()
even_mask[:, 1::2] = False
even_ref = group[even_mask]
even_dq = pixeldq[even_mask]
even = self.sigma_clip(even_ref, even_dq)
odd_ref = group[odd_mask]
odd_dq = pixeldq[odd_mask]
odd = self.sigma_clip(odd_ref, odd_dq)
refpix[amplifier]["odd"] = odd
refpix[amplifier]["even"] = even
else:
refpix[amplifier]["mean"] = self.sigma_clip(group[mask], pixeldq[mask])
return refpix
[docs]
def apply_multistripe_correction(self, group, refvalues, fastmin):
"""
Remove the reference pixel signal from the data.
Parameters
----------
group : ndarray[float]
The current group being corrected.
refvalues : dict
The dictionary of reference pixel
signals.
fastmin : int
The subarray offset in the fast read direction.
Returns
-------
ndarray[float]
The group with the reference pixel
signal removed.
"""
# coordinates for the current array
_, x = np.mgrid[: group.shape[0], : group.shape[1]]
x += fastmin
for amplifier in self.amplifiers:
amp_xi, amp_xf = MULTISTRIPE_AMPLIFIER_REGIONS[amplifier]
# Ensure that fast read region lies within amp region
in_amp = (x >= amp_xi) & (x < amp_xf)
if not np.any(in_amp):
continue
if self.odd_even_columns:
even = refvalues[amplifier]["even"]
odd = refvalues[amplifier]["odd"]
if even is not None and odd is not None:
group[:, ::2][in_amp[:, ::2]] -= even
group[:, 1::2][in_amp[:, 1::2]] -= odd
else:
mean = refvalues[amplifier]["mean"]
if mean is not None:
group[in_amp] -= mean
return group
[docs]
def do_multistripe_corrections(self):
"""
Do reference pixel corrections for multistripe data.
Existing reference pixel algorithms assume
fixed, contiguous refpix regions; multistripe
needs separate methods.
"""
# First transform pixeldq array to detector coordinates
self.dms_to_detector_dq()
if np.abs(self.input_model.meta.subarray.fastaxis) == 1:
fastmin = self.input_model.meta.subarray.xstart - 1
else:
fastmin = self.input_model.meta.subarray.ystart - 1
for integration_num in range(self.nints):
# Multistripe requires passing the stripe index to access the correct
# pixeldq plane
if n_superstripe := getattr(self.input_model.meta.subarray, "num_superstripe", 0) > 0:
stripe_idx = integration_num % n_superstripe
else:
stripe_idx = -1
for group_num in range(self.ngroups):
# Select group and transform to detector frame.
self.dms_to_detector(integration_num, group_num)
thisgroup = self.group
refvalues = self.get_multistripe_refvalues(thisgroup, stripe_idx, fastmin)
thisgroup = self.apply_multistripe_correction(thisgroup, refvalues, fastmin)
self.group = thisgroup
# Now transform back from detector to DMS coordinates.
self.detector_to_dms(integration_num, group_num)
return
[docs]
def dms_to_detector(self, integration, group):
"""
Convert data from DMS to detector frame.
Parameters
----------
integration : int
Integration number
group : int
Group number
"""
self.get_group(integration, group)
self.group = reffile_utils.science_detector_frame_transform(
self.group, self.fastaxis, self.slowaxis
)
[docs]
def detector_to_dms(self, integration, group):
"""
Convert data from detector to DMS frame.
Parameters
----------
integration : int
Integration number
group : int
Group number
"""
self.group = reffile_utils.detector_science_frame_transform(
self.group, self.fastaxis, self.slowaxis
)
self.restore_group(integration, group)
[docs]
def dms_to_detector_dq(self):
"""Convert DQ data from DMS to detector frame."""
self.pixeldq = reffile_utils.science_detector_frame_transform(
self.pixeldq, self.fastaxis, self.slowaxis
)
[docs]
class MIRIDataset(Dataset):
"""
Container for MIRI data.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model to be corrected
odd_even_rows : bool
Flag that controls whether odd and even-numbered rows are
handled separately
conv_kernel_params : dict
Dictionary containing the parameters needed for the optimized convolution kernel
siglimit : float
Sigma clipping limit for outlier rejection
"""
def __init__(self, input_model, odd_even_rows, conv_kernel_params, siglimit):
super(MIRIDataset, self).__init__(
input_model,
odd_even_columns=False,
use_side_ref_pixels=False,
side_smoothing_length=False,
side_gain=False,
conv_kernel_params=conv_kernel_params,
odd_even_rows=odd_even_rows,
siglimit=siglimit,
)
self.reference_sections = MIR_reference_sections
[docs]
def dms_to_detector(self, integration, group):
"""
Convert MIRI data from DMS to detector frame.
MIRI data is the same in detector and DMS frames.
Parameters
----------
integration : int
Integration number
group : int
Group number
"""
pass
[docs]
def detector_to_dms(self, integration, group):
"""
Convert MIRI data from detector to DMS frame.
Parameters
----------
integration : int
Integration number
group : int
Group number
"""
pass
[docs]
def collect_odd_refpixels(self, group, amplifier, left_or_right):
"""
Collect reference pixels from odd-numbered rows.
Parameters
----------
group : ndarray
Group being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier being processed
left_or_right : {'left', 'right'}
Process left or right side reference pixels
Returns
-------
oddref : ndarray
Reference pixels from odd-numbered rows
odddq : ndarray
DQ values for reference pixels from odd-numbered rows
"""
rowstart, rowstop, column = self.reference_sections[amplifier][left_or_right]
oddref = group[rowstart:rowstop:2, column]
odddq = self.pixeldq[rowstart:rowstop:2, column]
return oddref, odddq
[docs]
def collect_even_refpixels(self, group, amplifier, left_or_right):
"""
Collect reference pixels from even-numbered rows.
Parameters
----------
group : ndarray
Group being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier being processed
left_or_right : {'left', 'right'}
Process left or right side reference pixels
Returns
-------
evenref : ndarray
Reference pixels from even-numbered rows
evendq : ndarray
DQ values for reference pixels from even-numbered rows
"""
rowstart, rowstop, column = self.reference_sections[amplifier][left_or_right]
rowstart = rowstart + 1
evenref = group[rowstart:rowstop:2, column]
evendq = self.pixeldq[rowstart:rowstop:2, column]
return evenref, evendq
[docs]
def get_odd_refvalue(self, group, amplifier, left_or_right):
"""
Calculate reference values in odd-numbered rows.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
left_or_right : {'left', 'right'}
Processing left or right reference pixels?
Returns
-------
odd : float
Value of the clipped mean of the reference pixels in odd-numbered
rows
"""
ref, dq = self.collect_odd_refpixels(group, amplifier, left_or_right)
odd = self.sigma_clip(ref, dq)
return odd
[docs]
def get_even_refvalue(self, group, amplifier, left_or_right):
"""
Calculate reference value in even-numbered rows.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
left_or_right : {'left', 'right'}
Processing left or right reference pixels?
Returns
-------
even : float
Value of the clipped mean of the reference pixels in even-numbered
rows
"""
ref, dq = self.collect_even_refpixels(group, amplifier, left_or_right)
even = self.sigma_clip(ref, dq)
return even
[docs]
def get_amplifier_refvalue(self, group, amplifier, left_or_right):
"""
Calculate the reference pixel mean for a given amplifier.
Parameters
----------
group : ndarray
Group that is being processed
amplifier : {'A', 'B', 'C', 'D'}
Amplifier that is being processed
left_or_right : {'left', 'right'}
Processing left or right side reference pixels?
Returns
-------
odd : float
Value of the clipped mean of the reference pixels in odd-numbered
rows (if ``self.odd_even_rows``)
even : float
Value of the clipped mean of the reference pixels in even-numbered
rows (if ``self.odd_even_rows``)
mean : float
Value of the clipped mean of the reference pixels in both odd-numbered
and even-numbered rows (if not ``self.odd_even_rows``)
"""
if self.odd_even_rows:
odd = self.get_odd_refvalue(group, amplifier, left_or_right)
even = self.get_even_refvalue(group, amplifier, left_or_right)
if odd is None:
log.warning(f"Odd rows for amplifier {amplifier} have no good reference pixels")
self.bad_reference_pixels = True
elif even is None:
log.warning(f"Even rows for amplifier {amplifier} have no good reference pixels")
self.bad_reference_pixels = True
return odd, even
else:
rowstart, rowstop, column = self.reference_sections[amplifier][left_or_right]
ref = group[rowstart:rowstop, column]
dq = self.pixeldq[rowstart:rowstop, column]
mean = self.sigma_clip(ref, dq)
if mean is None:
self.bad_reference_pixels = True
return mean
[docs]
def get_refvalues(self, group):
"""
Get the reference pixel values.
Values for each amplifier, odd and even rows and left and right side
reference pixels are returned in a dictionary.
Parameters
----------
group : ndarray
Group that is being processed.
Returns
-------
refpix : dict
Dictionary containing the clipped mean of the reference pixels for
each amplifier, odd and even rows (if selected, otherwise all rows)
and left and right.
"""
refpix = {}
for amplifier in self.amplifiers:
refpix[amplifier] = {}
refpix[amplifier]["odd"] = {}
refpix[amplifier]["even"] = {}
for left_right in ("left", "right"):
refvalues = self.get_amplifier_refvalue(group, amplifier, left_right)
if self.odd_even_rows:
refpix[amplifier]["odd"][left_right] = refvalues[0]
refpix[amplifier]["even"][left_right] = refvalues[1]
else:
refpix[amplifier][left_right] = refvalues
return refpix
[docs]
def do_left_right_correction(self, group, refvalues):
"""
Do the reference pixel correction.
Parameters
----------
group : ndarray
Group that is being processed. This parameter is corrected for the
bias drift using the left and right reference pixels
refvalues : dict
Dictionary of reference pixel clipped means
"""
for amplifier in self.amplifiers:
datarowstart, datarowstop, datacolstart, datacolstop, stride = self.reference_sections[
amplifier
]["data"]
if self.odd_even_rows:
oddrefleft = refvalues[amplifier]["odd"]["left"]
oddrefright = refvalues[amplifier]["odd"]["right"]
evenrefleft = refvalues[amplifier]["even"]["left"]
evenrefright = refvalues[amplifier]["even"]["right"]
#
# For now, just average the left and right corrections
oddrefsignal = 0.5 * (oddrefleft + oddrefright)
evenrefsignal = 0.5 * (evenrefleft + evenrefright)
oddslice = (
slice(datarowstart, datarowstop, 2),
slice(datacolstart, datacolstop, 4),
)
evenslice = (
slice(datarowstart + 1, datarowstop, 2),
slice(datacolstart, datacolstop, 4),
)
group[oddslice] = group[oddslice] - oddrefsignal
group[evenslice] = group[evenslice] - evenrefsignal
else:
refleft = refvalues[amplifier]["left"]
refright = refvalues[amplifier]["right"]
refsignal = 0.5 * (refleft + refright)
dataslice = (
slice(datarowstart, datarowstop, 1),
slice(datacolstart, datacolstop, 4),
)
group[dataslice] = group[dataslice] - refsignal
return
[docs]
def do_corrections(self):
"""Perform reference pixel correction for MIRI data."""
if self.is_subarray:
self.do_subarray_corrections()
else:
self.do_fullframe_corrections()
[docs]
def do_subarray_corrections(self):
"""
Perform reference pixel correction for MIRI subarrays.
Currently skipped.
"""
log.warning("Refpix correction skipped for MIRI subarray")
return
[docs]
def do_fullframe_corrections(self):
"""Do Reference Pixels Corrections for all amplifiers, MIRI detectors."""
first_read = np.zeros((self.nints, self.nrows, self.ncols))
log.info("Subtracting initial read from each integration")
for i in range(self.nints):
first_read[i] = self.input_model.data[i, 0].copy()
self.input_model.data[i] = self.input_model.data[i] - first_read[i]
for integration in range(self.nints):
# Don't process the first group as it's all zeros and the clipped
# mean will return NaN
for group in range(1, self.ngroups):
# Get the reference values from the top and bottom reference pixels
self.get_group(integration, group)
thisgroup = self.group
refvalues = self.get_refvalues(thisgroup)
if self.bad_reference_pixels:
log.warning(f"Group {group} has no reference pixels")
break
self.do_left_right_correction(thisgroup, refvalues)
self.restore_group(integration, group)
log.info("Adding initial read back in")
for i in range(self.nints):
self.input_model.data[i] += first_read[i]
del first_read
return
[docs]
def create_dataset(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
odd_even_rows,
conv_kernel_params,
siglimit,
):
"""
Create a dataset object from an input model.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data model to be corrected
odd_even_columns : bool
Flag that controls whether odd and even-numbered columns are
processed separately (NIR only)
use_side_ref_pixels : bool
Flag that controls whether the side reference pixels are used in
the correction (NIR only)
side_smoothing_length : int
Smoothing length to use in calculating the running median of
the side reference pixels (NIR only)
side_gain : float
Gain to use in applying the side reference pixel correction
(NIR only)
odd_even_rows : bool
Flag that controls whether odd and even-numbered rows are handled
separately (MIR only)
conv_kernel_params : dict
Dictionary containing the parameters needed for the optimized convolution kernel
siglimit : float
Sigma clipping limit for outlier rejection
Returns
-------
dataset_subclass : `Dataset`
The appropriate subclass of the dataset class
"""
detector = input_model.meta.instrument.detector
if reffile_utils.is_subarray(input_model):
colstart = input_model.meta.subarray.xstart - 1
colstop = colstart + input_model.meta.subarray.xsize
rowstart = input_model.meta.subarray.ystart - 1
rowstop = rowstart + input_model.meta.subarray.ysize
if rowstart < 0 or colstart < 0 or rowstop > 2048 or colstop > 2048:
return None
if detector[:3] == "MIR":
return MIRIDataset(input_model, odd_even_rows, conv_kernel_params, siglimit)
elif detector in NIR_DETECTORS:
return NIRDataset(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
conv_kernel_params,
siglimit,
)
else:
log.error("Unrecognized detector")
return NIRDataset(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
conv_kernel_params,
siglimit,
)
[docs]
def correct_model(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
odd_even_rows,
conv_kernel_params,
siglimit,
):
"""
Perform Reference Pixel Correction on a JWST Model.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Model to be corrected
odd_even_columns : bool
Flag that controls whether odd and even-numbered columns are
processed separately (NIR only)
use_side_ref_pixels : bool
Flag that controls whether the side reference pixels are used in
the correction (NIR only)
side_smoothing_length : int
Smoothing length the use in calculating the running median of
the side reference pixels (NIR only)
side_gain : float
Gain to use in applying the side reference pixel correction
(NIR only)
odd_even_rows : bool
Flag that controls whether odd and even-numbered rows are handled
separately (MIR only)
conv_kernel_params : dict
Dictionary containing the parameters needed for the optimized convolution kernel
siglimit : float
Sigma clipping limit for outlier rejection
Returns
-------
status : int
Return status
"""
if input_model.meta.instrument.name == "MIRI":
if reffile_utils.is_subarray(input_model):
log.warning("Refpix correction skipped for MIRI subarrays")
return SUBARRAY_SKIPPED
input_dataset = create_dataset(
input_model,
odd_even_columns,
use_side_ref_pixels,
side_smoothing_length,
side_gain,
odd_even_rows,
conv_kernel_params,
siglimit,
)
if input_dataset is None:
status = SUBARRAY_DOESNTFIT
return status
input_dataset.log_parameters()
reference_pixel_correction(input_dataset)
return REFPIX_OK
[docs]
def reference_pixel_correction(input_dataset):
"""
Do the Reference Pixel Correction.
Parameters
----------
input_dataset : `Dataset`
Dataset to be corrected
Returns
-------
input_dataset : `Dataset`
Corrected dataset
"""
input_dataset.do_corrections()
if input_dataset.input_model.meta.exposure.zero_frame:
log.info("Processing the zero frame")
process_zeroframe_correction(input_dataset)
return
[docs]
def process_zeroframe_correction(input_dataset):
"""
Do the Reference Pixel Correction for the ZEROFRAME array.
Parameters
----------
input_dataset : `Dataset`
Dataset to be corrected
Returns
-------
input_dataset : `Dataset`
Corrected dataset
"""
# Setup input model for ZEROFRAME
saved_values = save_science_values(input_dataset)
setup_dataset_for_zeroframe(input_dataset, saved_values)
# Run refpix correction on ZEROFRAME
input_dataset.do_corrections()
restore_input_model(input_dataset, saved_values)
[docs]
def setup_dataset_for_zeroframe(input_dataset, saved_values):
"""
Set up dataset for zero frame.
Parameters
----------
input_dataset : `Dataset`
Dataset to be modified.
saved_values : tuple
A tuple of saved values used to setup the final
corrected `~stdatamodels.jwst.datamodels.RampModel`.
"""
# Setup dimensions
dims = input_dataset.input_model.zeroframe.shape
nints, nrows, ncols = dims
ngroups = 1
new_dims = (nints, ngroups, nrows, ncols)
# Setup ZEROFRAME data
data = input_dataset.input_model.zeroframe
data = data.reshape(new_dims)
# Setup ZEROFRAME dummy groupdq
gdtype = input_dataset.input_model.groupdq.dtype
gdq = np.zeros(dims, dtype=gdtype)
wh_zero = saved_values[-1]
gdq[wh_zero] = dqflags.pixel["DO_NOT_USE"]
gdq = gdq.reshape(new_dims)
# Setup dataset with ZEROFRAME data
input_dataset.ngroups = ngroups
input_dataset.pixeldq = input_dataset.get_pixeldq()
input_dataset.input_model.data = data
input_dataset.input_model.groupdq = gdq
input_dataset.zeroframe_proc = True
[docs]
def save_science_values(input_dataset):
"""
Save corrected data for the SCI extension.
Parameters
----------
input_dataset : `Dataset`
Dataset to be corrected.
Returns
-------
data : ndarray
The corrected SCI data.
gdq : ndarray
The corrected SCI groupdq.
pdq : ndarray
The corrected SCI pixeldq.
wh_zero : ndarray
The location of the zeroed out locations in the ZEROFRAME.
"""
data = input_dataset.input_model.data
gdq = input_dataset.input_model.groupdq
pdq = input_dataset.input_model.pixeldq
wh_zero = np.where(input_dataset.input_model.zeroframe[:, :, :] == 0.0)
return data, gdq, pdq, wh_zero