Source code for jdaviz.configs.cubeviz.plugins.parsers

import os
import warnings

import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from specutils import Spectrum

from jdaviz.core.custom_units_and_equivs import PIX2, _eqv_flux_to_sb_pixel
from jdaviz.core.events import SnackbarMessage
from jdaviz.core.registries import data_parser_registry
from jdaviz.core.unit_conversion_utils import check_if_unit_is_per_solid_angle
from jdaviz.utils import standardize_metadata, PRIHDR_KEY, download_uri_to_path

__all__ = ['parse_data']

EXT_TYPES = dict(flux=['flux', 'sci', 'data'],
                 uncert=['ivar', 'err', 'error', 'var', 'uncert'],
                 mask=['mask', 'dq', 'quality', 'data_quality'])


[docs] @data_parser_registry("cubeviz-data-parser") def parse_data(app, file_obj, data_type=None, data_label=None, parent=None, cache=None, local_path=None, timeout=None, specutils_format=None, spectral_axis_index=None): """ Attempts to parse a data file and auto-populate available viewers in cubeviz. Parameters ---------- app : `~jdaviz.app.Application` The application-level object used to reference the viewers. file_obj : str The path to a cube-like data file. data_type : str, {'flux', 'mask', 'uncert'} The data type used to explicitly differentiate parsed data. data_label : str, optional The label to be applied to the Glue data component. parent : str, optional Data label for "parent" data to associate with the loaded data as "child". cache : None, bool, or str Cache the downloaded file if the data are retrieved by a query to a URL or URI. local_path : str, optional Cache remote files to this path. This is only used if data is requested from `astroquery.mast`. timeout : float, optional If downloading from a remote URI, set the timeout limit for remote requests in seconds (passed to `~astropy.utils.data.download_file` or `~astroquery.mast.Conf.timeout`). specutils_format : str, optional Optional format string to pass to Spectrum.read(), see https://specutils.readthedocs.io/en/stable/spectrum1d.html#list-of-loaders for valid format strings. Useful for processed files that may not include the original headers with information used to auto-identify. """ flux_viewer_reference_name = app._jdaviz_helper._default_flux_viewer_reference_name uncert_viewer_reference_name = app._jdaviz_helper._default_uncert_viewer_reference_name spectrum_viewer_reference_name = app._jdaviz_helper._default_spectrum_viewer_reference_name if data_type is not None and data_type.lower() not in ('flux', 'mask', 'uncert'): raise TypeError("Data type must be one of 'flux', 'mask', or 'uncert' " f"but got '{data_type}'") # If the file object is an hdulist or a string, use the generic parser for # fits files. # TODO: this currently only supports fits files. We will want to make this # generic enough to work with other file types (e.g. ASDF). For now, this # supports MaNGA and JWST data. metadata = {} if isinstance(file_obj, fits.hdu.hdulist.HDUList): if 'PRIMARY' in file_obj: metadata[PRIHDR_KEY] = standardize_metadata(file_obj['PRIMARY'].header) try: _parse_spectrum1d_3d( app, Spectrum.read(file_obj, meta=metadata), data_label=data_label, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) except Exception: # nosec _parse_hdulist( app, file_obj, file_name=data_label, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) elif isinstance(file_obj, str): if file_obj.lower().endswith('.gif'): # pragma: no cover _parse_gif(app, file_obj, data_label, flux_viewer_reference_name=flux_viewer_reference_name) return # try parsing file_obj as a URI/URL: file_obj = download_uri_to_path( file_obj, cache=cache, local_path=local_path, timeout=timeout ) file_name = os.path.basename(file_obj) with fits.open(file_obj) as hdulist: prihdr = hdulist[0].header if specutils_format is not None: sc = Spectrum.read(file_obj, format=specutils_format) _parse_spectrum1d_3d( app, sc, data_label=data_label, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) app.get_tray_item_from_name("Spectral Extraction").disabled_msg = "" return telescop = prihdr.get('TELESCOP', '').lower() exptype = prihdr.get('EXP_TYPE', '').lower() # NOTE: Alerted to deprecation of FILETYPE keyword from pipeline on 2022-07-08 # Kept for posterity in for data processed prior to this date. Use EXP_TYPE instead filetype = prihdr.get('FILETYPE', '').lower() if telescop == 'jwst' and ('ifu' in exptype or 'mrs' in exptype or filetype == '3d ifu cube'): sc = Spectrum.read(file_obj) data_label = app.return_data_label(file_name) _parse_spectrum1d_3d( app, sc, data_label=data_label, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) else: try: _parse_spectrum1d_3d( app, Spectrum.read(hdulist), data_label=data_label or file_name, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) except Exception: # nosec _parse_hdulist( app, hdulist, file_name=data_label or file_name, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) # If the data types are custom data objects, use explicit parsers. Note # that this relies on the glue-astronomy machinery to turn the data object # into something glue can understand. elif isinstance(file_obj, Spectrum) and file_obj.flux.ndim in (1, 3): if file_obj.flux.ndim == 3: _parse_spectrum1d_3d( app, file_obj, data_label=data_label, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name ) else: _parse_spectrum1d( app, file_obj, data_label=data_label, spectrum_viewer_reference_name=spectrum_viewer_reference_name ) elif isinstance(file_obj, np.ndarray) and file_obj.ndim == 3: if spectral_axis_index is not None and spectral_axis_index < 0: spectral_axis_index = spectral_axis_index + file_obj.ndim _parse_ndarray(app, file_obj, data_label=data_label, data_type=data_type, flux_viewer_reference_name=flux_viewer_reference_name, uncert_viewer_reference_name=uncert_viewer_reference_name, spectral_axis_index=spectral_axis_index) else: raise NotImplementedError(f'Unsupported data format: {file_obj}')
def _get_celestial_wcs(wcs): """ If `wcs` has a celestial component return that, otherwise return None """ return wcs.celestial if hasattr(wcs, 'celestial') else None def _return_spectrum_with_correct_units(flux, wcs, metadata, data_type=None, target_wave_unit=None, hdulist=None, uncertainty=None, mask=None, apply_pix2=False, spectral_axis=None): """Upstream issue of WCS not using the correct units for data must be fixed here. Issue: https://github.com/astropy/astropy/issues/3658. Also converts flux units to flux/pix2 solid angle units, if `flux` is not a surface brightness and `apply_pix2` is True. """ # handle scale factors when they are included in the unit # (has to be done before Spectrum creation) if not np.isclose(flux.unit.scale, 1, rtol=1e-5): flux = flux.to(flux.unit / flux.unit.scale) with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', message='Input WCS indicates that the spectral axis is not last', category=UserWarning) if spectral_axis is None: sc = Spectrum(flux=flux, wcs=wcs, uncertainty=uncertainty, mask=mask, meta=metadata) else: sc = Spectrum(flux=flux, spectral_axis=spectral_axis, uncertainty=uncertainty, mask=mask, meta=metadata) # convert flux and uncertainty to per-pix2 if input is not a surface brightness target_flux_unit = None if (apply_pix2 and (data_type != "mask") and (not check_if_unit_is_per_solid_angle(flux.unit))): target_flux_unit = flux.unit / PIX2 elif check_if_unit_is_per_solid_angle(flux.unit, return_unit=True) == "spaxel": # We need to convert spaxel to pixel squared, since spaxel isn't fully supported by astropy # This is horribly ugly but just multiplying by u.Unit("spaxel") doesn't work target_flux_unit = flux.unit * u.Unit('spaxel') / PIX2 if target_wave_unit is None and hdulist is not None: found_target = False for ext in ('SCI', 'FLUX', 'PRIMARY', 'DATA'): # In priority order if found_target: break if ext not in hdulist: continue hdr = hdulist[ext].header # The WCS could be swapped or unswapped. for cunit_num in (3, 1): cunit_key = f"CUNIT{cunit_num}" ctype_key = f"CTYPE{cunit_num}" if cunit_key in hdr and 'WAV' in hdr[ctype_key]: target_wave_unit = u.Unit(hdr[cunit_key]) found_target = True break if target_wave_unit == sc.spectral_axis.unit: target_wave_unit = None if (target_wave_unit is None) and (target_flux_unit is None): # Nothing to convert new_sc = sc elif target_flux_unit is None: # Convert wavelength only new_sc = sc.with_spectral_axis_unit(target_wave_unit) elif target_wave_unit is None: # Convert flux only and only PIX2 stuff new_sc = sc.with_flux_unit(target_flux_unit, equivalencies=_eqv_flux_to_sb_pixel()) else: # Convert both new_sc = sc.with_spectral_axis_and_flux_units( target_wave_unit, target_flux_unit, flux_equivalencies=_eqv_flux_to_sb_pixel()) if target_wave_unit is not None: new_sc.meta['_orig_spec'] = sc # Need this for later return new_sc def _parse_hdulist(app, hdulist, file_name=None, flux_viewer_reference_name=None, uncert_viewer_reference_name=None): if file_name is None and hasattr(hdulist, 'file_name'): file_name = hdulist.file_name else: file_name = file_name or "Unknown HDU object" is_loaded = [] wcs_sci = None # TODO: This needs refactoring to be more robust. # Current logic fails if there are multiple EXTVER. for hdu in hdulist: if hdu.data is None or not hdu.is_image or hdu.data.ndim != 3: continue data_type = _get_data_type_by_hdu(hdu) if not data_type: continue # Only load each type once. if data_type in is_loaded: continue is_loaded.append(data_type) data_label = app.return_data_label(file_name, hdu.name) if data_type == 'flux': wcs = WCS(hdu.header, hdulist) wcs_sci = wcs else: wcs = wcs_sci if 'BUNIT' in hdu.header: try: flux_unit = u.Unit(hdu.header['BUNIT']) except Exception: warnings.warn("Invalid BUNIT, using count as data unit", UserWarning) flux_unit = u.count elif data_type == 'mask': # DQ flags have no unit flux_unit = u.dimensionless_unscaled else: warnings.warn("Invalid BUNIT, using count as data unit", UserWarning) flux_unit = u.count flux = hdu.data << flux_unit metadata = standardize_metadata(hdu.header) if hdu.name != 'PRIMARY' and 'PRIMARY' in hdulist: metadata[PRIHDR_KEY] = standardize_metadata(hdulist['PRIMARY'].header) # store original WCS in metadata. this is a hacky workaround for converting subsets # to sky regions, where the parent data of the subset might have dropped spatial WCS info metadata['_orig_spatial_wcs'] = _get_celestial_wcs(wcs) apply_pix2 = data_type in ['flux', 'uncert'] sc = _return_spectrum_with_correct_units(flux, wcs, metadata, data_type=data_type, hdulist=hdulist, apply_pix2=apply_pix2) app.add_data(sc, data_label) if data_type == 'mask': # We no longer auto-populate the mask cube into a viewer, but we still want # to keep track of this cube for use in, e.g., spectral extraction. app._jdaviz_helper._loaded_mask_cube = app.data_collection[data_label] elif data_type == 'uncert': app.add_data_to_viewer(uncert_viewer_reference_name, data_label) app._jdaviz_helper._loaded_uncert_cube = app.data_collection[data_label] else: # flux # Forced wave unit conversion made it lose stuff, so re-add # also re-get unit from sc in case a factor of pix2 was applied app.data_collection[data_label].get_component("flux").units = sc.unit # Add flux to top left image viewer app.add_data_to_viewer(flux_viewer_reference_name, data_label) app._jdaviz_helper._loaded_flux_cube = app.data_collection[data_label] def _parse_spectrum1d_3d(app, file_obj, data_label=None, flux_viewer_reference_name=None, uncert_viewer_reference_name=None): """Load spectrum1d as a cube.""" if data_label is None: data_label = "Unknown spectrum object" for attr in ("flux", "mask", "uncertainty"): val = getattr(file_obj, attr) if val is None: continue if attr == "mask": flux = val << u.dimensionless_unscaled # DQ flags have no unit elif attr == "uncertainty": flux = val.represent_as(StdDevUncertainty).quantity flux[np.isinf(flux)] = np.nan # Avoid INF from IVAR conversion else: flux = val with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', message='Input WCS indicates that the spectral axis is not last', category=UserWarning) meta = standardize_metadata(file_obj.meta) # store original WCS in metadata. this is a hacky workaround for # converting subsets to sky regions, where the parent data of the # subset might have dropped spatial WCS info meta['_orig_spatial_wcs'] = None if hasattr(file_obj, 'wcs'): meta['_orig_spatial_wcs'] = _get_celestial_wcs(file_obj.wcs) # Also convert data loaded in flux units to a per-square-pixel surface # brightness unit (e.g Jy to Jy/pix**2) s1d = _return_spectrum_with_correct_units( flux, file_obj.wcs, meta, data_type=attr, apply_pix2=True) cur_data_label = app.return_data_label(data_label, attr.upper()) app.add_data(s1d, cur_data_label) if attr == 'flux': app.add_data_to_viewer(flux_viewer_reference_name, cur_data_label) app._jdaviz_helper._loaded_flux_cube = app.data_collection[cur_data_label] elif attr == 'uncertainty': app.add_data_to_viewer(uncert_viewer_reference_name, cur_data_label) app._jdaviz_helper._loaded_uncert_cube = app.data_collection[cur_data_label] elif attr == 'mask': app._jdaviz_helper._loaded_mask_cube = app.data_collection[cur_data_label] def _parse_spectrum1d(app, file_obj, data_label=None, spectrum_viewer_reference_name=None): # Here 'file_obj' is a Spectrum if data_label is None: data_label = app.return_data_label(file_obj) # store original WCS in metadata. this is a hacky workaround for converting subsets # to sky regions, where the parent data of the subset might have dropped spatial WCS info file_obj.meta['_orig_spatial_wcs'] = _get_celestial_wcs(file_obj.wcs) if hasattr(file_obj, 'wcs') else None # noqa: E501 # TODO: glue-astronomy translators only look at the flux property of # specutils Spectrum objects. Fix to support uncertainties and masks. # convert data loaded in flux units to a per-square-pixel surface # brightness unit (e.g Jy to Jy/pix**2) if not check_if_unit_is_per_solid_angle(file_obj.flux.unit): file_obj = file_obj.with_flux_unit( file_obj.flux.unit / PIX2, equivalencies=_eqv_flux_to_sb_pixel()) app.add_data(file_obj, data_label) app.add_data_to_viewer(spectrum_viewer_reference_name, data_label) def _parse_ndarray(app, file_obj, data_label=None, data_type=None, flux_viewer_reference_name=None, uncert_viewer_reference_name=None, spectral_axis_index=None): if data_label is None: data_label = app.return_data_label(file_obj) if data_type is None: data_type = 'flux' # Cannot change axis to ensure roundtripping within Cubeviz. # Axes must already be (x, y, z) at this point. flux = file_obj if not hasattr(flux, 'unit'): flux = flux << (u.count / PIX2) meta = standardize_metadata({'_orig_spatial_wcs': None}) if spectral_axis_index is None: # Default to last axis in array for the spectral axis msg = "Spectral axis index not specified, assuming last axis." app.hub.broadcast(SnackbarMessage(msg, sender=app._jdaviz_helper, color="warning")) spectral_axis_index = flux.ndim - 1 s3d = Spectrum(flux=flux, meta=meta, spectral_axis_index=spectral_axis_index) # convert data loaded in flux units to a per-square-pixel surface # brightness unit (e.g Jy to Jy/pix**2) if not check_if_unit_is_per_solid_angle(s3d.flux.unit): s3d = s3d.with_flux_unit(s3d.flux.unit / PIX2, equivalencies=_eqv_flux_to_sb_pixel()) app.add_data(s3d, data_label) if data_type == 'flux': app.add_data_to_viewer(flux_viewer_reference_name, data_label) app._jdaviz_helper._loaded_flux_cube = app.data_collection[data_label] elif data_type == 'uncert': app.add_data_to_viewer(uncert_viewer_reference_name, data_label) app._jdaviz_helper._loaded_uncert_cube = app.data_collection[data_label] elif data_type == 'mask': app._jdaviz_helper._loaded_mask_cube = app.data_collection[data_label] def _parse_gif(app, file_obj, data_label=None, flux_viewer_reference_name=None): # pragma: no cover # NOTE: Parsing GIF needs imageio and Pillow, both are which undeclared # in setup.cfg but might or might not be installed by declared ones. import imageio file_name = os.path.basename(file_obj) if data_label is None: data_label = app.return_data_label(file_obj) flux = imageio.v3.imread(file_obj, mode='P') # All frames as gray scale flux = np.rot90(np.moveaxis(flux, 0, 2), k=-1, axes=(0, 1)) meta = {'filename': file_name, '_orig_spatial_wcs': None} s3d = Spectrum(flux=flux * (u.count / PIX2), meta=standardize_metadata(meta)) app.add_data(s3d, data_label) app.add_data_to_viewer(flux_viewer_reference_name, data_label) def _get_data_type_by_hdu(hdu): # If the data type is some kind of integer, assume it's the mask/dq if (hdu.data.dtype in (int, np.uint, np.uint8, np.uint16, np.uint32) or any(x in hdu.name.lower() for x in EXT_TYPES['mask'])): data_type = 'mask' elif ('errtype' in [x.lower() for x in hdu.header.keys()] or any(x in hdu.name.lower() for x in EXT_TYPES['uncert'])): data_type = 'uncert' elif any(x in hdu.name.lower() for x in EXT_TYPES['flux']): data_type = 'flux' else: data_type = '' return data_type