Skip to content

Commit

Permalink
Handle custom uncertainty names and test them
Browse files Browse the repository at this point in the history
  • Loading branch information
dhomeier committed Mar 15, 2023
1 parent 1d568a1 commit d5705fa
Showing 1 changed file with 43 additions and 28 deletions.
71 changes: 43 additions & 28 deletions specutils/io/default_loaders/wcs_fits.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
'VAR': VarianceUncertainty,
'IVAR': InverseVariance}

UNCERT_EXP = {'STD': 1, 'VAR': 2, 'IVAR': -2}
UNCERT_EXP = {'std': 1, 'var': 2, 'ivar': -2}


def identify_wcs1d_fits(origin, *args, **kwargs):
Expand All @@ -44,17 +44,18 @@ def identify_wcs1d_fits(origin, *args, **kwargs):
with read_fileobj_or_hdulist(*args, **kwargs) as hdulist:
return (hdulist[hdu].header.get('WCSDIM', 1) == 1 and
(hdulist[hdu].header['NAXIS'] == 1 or
hdulist[hdu].header.get('WCSAXES', 0) == 1 or
hdulist[hdu].header.get('DISPAXIS', -1) >= 0 ) and not
hdulist[hdu].header.get('WCSAXES', 0) == 1 or
hdulist[hdu].header.get('DISPAXIS', -1) >= 0 ) and not
hdulist[hdu].header.get('MSTITLE', 'n').startswith('2dF-SDSS LRG') and not
# Check in CTYPE1 key for linear solution (single spectral axis)
hdulist[hdu].header.get('CTYPE1', 'w').upper().startswith('MULTISPE'))


@data_loader("wcs1d-fits", identifier=identify_wcs1d_fits,
dtype=Spectrum1D, extensions=['fits', 'fit'], priority=5)
def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None, hdu=None,
verbose=False, mask_hdu=True, uncertainty_hdu=True, **kwargs):
def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None,
hdu=None, verbose=False, mask_hdu=True,
uncertainty_hdu=True, uncertainty_type=None, **kwargs):
"""
Loader for single spectrum-per-HDU spectra in FITS files, with the spectral
axis stored in the header as FITS-WCS. The flux unit of the spectrum is
Expand Down Expand Up @@ -84,9 +85,12 @@ def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None, hdu=Non
mask_hdu : int, str, bool or None, optional
The index or name of the HDU to read mask from
(default: try to autodetect; `False`|`None`: do not read in).
uncertainy_hdu : int, str, bool or None, optional
uncertainty_hdu : int, str, bool or None, optional
The index or name of the HDU to read uncertainy from
(default: try to autodetect; `False`|`None`: do not read in).
uncertainty_type : str or None, optional
The ``uncertainty_type`` of `~astropy.nddata.NDUncertainty`
(one of 'std', 'var', 'ivar'; default: try to infer from HDU EXTNAME).
**kwargs
Extra keywords for :func:`~specutils.io.parsing_utils.read_fileobj_or_hdulist`.
Expand Down Expand Up @@ -161,16 +165,22 @@ def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None, hdu=Non
AstropyUserWarning)

if uncertainty is not None:
if hdulist[ext].name in UNCERT_REF:
unc_type = hdulist[ext].name
if uncertainty_type is None:
if hdulist[ext].name in UNCERT_REF:
unc_type = hdulist[ext].name
else:
warnings.warn(f"Could not determine uncertainty type for HDU '{ext}' "
f"('{hdulist[ext].name}'), assuming 'StdDev'.",
AstropyUserWarning)
unc_type = 'STD'
elif str.upper(uncertainty_type) in UNCERT_REF:
unc_type = str.upper(uncertainty_type)
else:
warnings.warn(f"Could not determine uncertainty type for HDU '{ext}' "
f"('{hdulist[ext].name}'), assuming 'StdDev'.",
AstropyUserWarning)
unc_type = 'STD'
raise ValueError(f"Invalid uncertainty type: '{uncertainty_type}'; "
"should be one of 'std', 'var', 'ivar'.")
uunit = u.Unit(header.get('BUNIT', flux_unit))
if unc_type != 'STD':
uunit = uunit**UNCERT_EXP[unc_type]
uunit = uunit**UNCERT_EXP[unc_type.lower()]
uncertainty = UNCERT_REF[unc_type](u.Quantity(uncertainty, unit=uunit))

if spectral_axis_unit is not None:
Expand Down Expand Up @@ -223,9 +233,9 @@ def wcs1d_fits_writer(spectrum, file_name, hdu=0, update_header=False,
uncertainty_name : str or `None`, optional
HDU name to store uncertainty under (default set from type; `None`: do not save)
unit : str or :class:`~astropy.units.Unit`, optional
Unit for the flux (and associated uncertainty; defaults to `spectrum.flux.unit`)
Unit for the flux (and associated uncertainty; defaults to ``spectrum.flux.unit``)
dtype : str or :class:`~numpy.dtype`, optional
Floating point type for storing flux array (defaults to `spectrum.flux.dtype`)
Floating point type for storing flux array (defaults to ``spectrum.flux.dtype``)
"""
# Create HDU list from WCS
try:
Expand Down Expand Up @@ -262,35 +272,40 @@ def wcs1d_fits_writer(spectrum, file_name, hdu=0, update_header=False,
if flux_name is not None:
hdulist[0].name = flux_name

# Append mask array (duplicate WCS for that extension)?
# Append mask array
if spectrum.mask is not None and mask_name is not None:
hdulist.append(wcs.to_fits()[0])
hdulist.append(fits.ImageHDU())
hdulist[-1].data = spectrum.mask
if mask_name == '':
hdulist[-1].name = 'MASK'
else:
hdulist[-1].name = mask_name
hdu += 1
# Warn if extension name for saving was explicitly chosen.
# Warn if saving was requested (per explicitly choosing extension name).
elif mask_name is not None and mask_name != '':
warnings.warn("No mask found in this Spectrum1D, none saved.", AstropyUserWarning)

# Append uncertainty array (duplicate WCS for that extension)?
# Append uncertainty array
if spectrum.uncertainty is not None and uncertainty_name is not None:
hdulist.append(wcs.to_fits()[0])
# uncertainty - units to be inferred from spectrum.flux
if uncertainty_name == '':
uncertainty_name = [n for n in UNCERT_REF if
isinstance(spectrum.uncertainty, UNCERT_REF[n])][0]
if uncertainty_name in ('VAR', 'IVAR'):
uunit = funit**UNCERT_EXP[uncertainty_name]
hdulist.append(fits.ImageHDU())
uncertainty_name = str.upper(uncertainty_name)
uncertainty_type = type(spectrum.uncertainty)
# uncertainty - units to be inferred from and consistent with spectrum.flux.
if uncertainty_name in UNCERT_REF and uncertainty_type != UNCERT_REF[uncertainty_name]:
raise ValueError(f"Illegal label for uncertainty: '{uncertainty_name}' is reserved "
f"for {UNCERT_REF[uncertainty_name]}, not {uncertainty_type}.")

if spectrum.uncertainty.uncertainty_type in ('var', 'ivar'):
uunit = funit**UNCERT_EXP[spectrum.uncertainty.uncertainty_type]
else:
uunit = funit
if uncertainty_name == '':
uncertainty_name = spectrum.uncertainty.uncertainty_type.upper()
sig = spectrum.uncertainty.quantity.to_value(uunit, equivalencies=u.spectral_density(disp))
hdulist[-1].data = sig.astype(ftype)
hdulist[-1].name = uncertainty_name
hdu += 1
# Warn if extension name for saving was explicitly chosen.
# Warn if saving was requested (per explicitly choosing extension name).
elif uncertainty_name is not None and uncertainty_name != '':
warnings.warn("No uncertainty array found in this Spectrum1D, none saved.",
AstropyUserWarning)
Expand Down Expand Up @@ -468,7 +483,7 @@ def _read_non_linear_iraf_fits(file_obj, spectral_axis_unit=None, flux_unit=None
Returns
-------
Tuple of data to pass to SpectrumCollection() or Spectrum1D():
Tuple of data to pass to `SpectrumCollection`() or `Spectrum1D`():
spectral_axis : :class:`~astropy.units.Quantity`
The spectral axis or axes as constructed from WCS(hdulist[0].header).
Expand Down

0 comments on commit d5705fa

Please sign in to comment.