# Source code for jdaviz.configs.specviz.plugins.parsers

import base64
import pathlib
import uuid

import numpy as np

from astropy.io.registry import IORegistryError
from astropy.nddata import StdDevUncertainty

from specutils import Spectrum1D, SpectrumList, SpectrumCollection

from jdaviz.core.registries import data_parser_registry

# Public API of this module: only the parser function is exported.
__all__ = ["specviz_spectrum1d_parser"]


def _combine_spectra(spectra):
    """Concatenate spectra into one `~specutils.Spectrum1D` sorted by spectral axis.

    Parameters
    ----------
    spectra : list of `~specutils.Spectrum1D`
        Non-empty list of spectra to merge. Assumed to be one-dimensional
        (the arrays are concatenated along their only axis).

    Returns
    -------
    `~specutils.Spectrum1D`
        Spectrum holding every input sample, ordered by spectral-axis value.
        It carries an uncertainty only when every input spectrum has one.
    """
    # NOTE: units are not reconciled across the inputs yet. The units of the
    # last spectrum are applied to the combined arrays, so all inputs are
    # expected to share the same units.
    wave_units = spectra[-1].spectral_axis.unit
    flux_units = spectra[-1].flux.unit

    wave = np.concatenate([spec.spectral_axis.value for spec in spectra])
    flux = np.concatenate([spec.flux.value for spec in spectra])
    order = np.argsort(wave)

    # A spectrum without uncertainty makes a combined uncertainty array
    # ill-defined, so it is dropped rather than crashing on ``None``.
    if all(spec.uncertainty is not None for spec in spectra):
        err = np.concatenate([spec.uncertainty.array for spec in spectra])
        # The combined uncertainty is always stored as a standard deviation,
        # whatever the uncertainty type of the inputs.
        unc = StdDevUncertainty(err[order] * flux_units)
    else:
        unc = None

    return Spectrum1D(flux=flux[order] * flux_units,
                      spectral_axis=wave[order] * wave_units,
                      uncertainty=unc)


@data_parser_registry("specviz-spectrum1d-parser")
def specviz_spectrum1d_parser(app, data, data_label=None, format=None, show_in_viewer=True):
    """
    Loads a data file or `~specutils.Spectrum1D` object into Specviz.

    Parameters
    ----------
    app : object
        Application the data is loaded into. It must provide
        ``get_data_from_viewer``, ``add_data``, ``add_data_to_viewer`` and a
        ``data_collection`` attribute.
    data : str, `~specutils.Spectrum1D`, or `~specutils.SpectrumList`
        Spectrum1D, SpectrumList, or path to compatible data file. A path to
        a directory, or a plain list, is passed to
        `~specutils.SpectrumList.read`.
    data_label : str
        The Glue data label found in the ``DataCollection``. When empty, a
        unique label is generated. For a `~specutils.SpectrumList`, either a
        list/tuple with one label per spectrum or a single string may be
        given; a single string is expanded to ``"<label> <index>"``.
    format : str
        Loader format specification used to indicate data format in
        `~specutils.Spectrum1D.read` io method.
    show_in_viewer : bool
        If `True`, add the loaded spectrum to the ``"spectrum-viewer"``. For a
        `~specutils.SpectrumList`, only the combined spectrum is displayed.

    Raises
    ------
    TypeError
        If ``data`` is a `~specutils.SpectrumCollection`.
    FileNotFoundError
        If ``data`` is a path that is neither a file nor a directory.
    ValueError
        If the resulting `~specutils.SpectrumList` is empty, or if the number
        of data labels differs from the number of spectra.
    """
    # If no data label is assigned, give it a unique identifier
    if not data_label:
        data_label = "specviz_data|" + str(
            base64.b85encode(uuid.uuid4().bytes), "utf-8"
        )

    if isinstance(data, SpectrumCollection):
        raise TypeError("SpectrumCollection detected."
                        " Please provide a Spectrum1D or SpectrumList")
    elif isinstance(data, Spectrum1D):
        data = [data]
        data_label = [data_label]
    # No special processing is needed in this case, but we include it for completeness
    elif isinstance(data, SpectrumList):
        pass
    elif isinstance(data, list):
        data = SpectrumList.read(data, format=format)
    else:
        path = pathlib.Path(data)

        if path.is_file():
            try:
                spectrum = Spectrum1D.read(str(path), format=format)
            except IORegistryError:
                # Multi-extension files may throw a registry error
                data = SpectrumList.read(str(path), format=format)
            else:
                data = [spectrum]
                data_label = [data_label]
        elif path.is_dir():
            data = SpectrumList.read(str(path), format=format)
            if data == []:
                raise ValueError(f"`specutils.SpectrumList.read('{str(path)}')` "
                                 "returned an empty list")
        else:
            raise FileNotFoundError("No such file: " + str(path))

    is_spectrum_list = isinstance(data, SpectrumList)

    if is_spectrum_list:
        # Fail early with a clear message: nothing could be loaded or
        # combined from an empty list.
        if len(data) == 0:
            raise ValueError("Cannot load an empty SpectrumList")

        if not isinstance(data_label, (list, tuple)):
            data_label = [f"{data_label} {i}" for i in range(len(data))]
        elif len(data_label) != len(data):
            raise ValueError(f"Length of data labels list ({len(data_label)}) is different"
                             f" than length of list of data ({len(data)})")

    # If there's already data in the viewer, convert units if needed
    current_unit = None
    current_spec = app.get_data_from_viewer("spectrum-viewer")
    if current_spec is not None and current_spec != {}:
        first_key = next(iter(current_spec))
        current_unit = current_spec[first_key].spectral_axis.unit

    with app.data_collection.delay_link_manager_update():
        # Spectra exactly as they were added to the app (i.e. after any unit
        # conversion), kept to build the combined spectrum. The combination
        # approach is taken from https://github.com/spacetelescope/
        # dat_pyinthesky/blob/main/jdat_notebooks/MRS_Mstar_analysis/
        # JWST_Mstar_dataAnalysis_analysis.ipynb
        loaded = []

        for i, spec in enumerate(data):
            if current_unit is not None and spec.spectral_axis.unit != current_unit:
                # Carry the uncertainty over so that it is not lost by the
                # conversion of the spectral axis.
                spec = Spectrum1D(flux=spec.flux,
                                  spectral_axis=spec.spectral_axis.to(current_unit),
                                  uncertainty=spec.uncertainty)

            app.add_data(spec, data_label[i])
            loaded.append(spec)

            # For a SpectrumList only the combined spectrum is displayed;
            # otherwise display the first spectrum.
            if show_in_viewer and not is_spectrum_list and i == 0:
                app.add_data_to_viewer("spectrum-viewer", data_label[i])

        # build combined spectrum when input is a SpectrumList instance
        if is_spectrum_list:
            combined = _combine_spectra(loaded)

            # needs perhaps a better way to label the combined spectrum
            label = "Combined " + data_label[0]
            app.add_data(combined, label)
            if show_in_viewer:
                app.add_data_to_viewer("spectrum-viewer", label)