Source code for specutils.spectra.spectrum1d

import logging

import numpy as np
from astropy import units as u
from astropy.nddata import NDDataRef
from astropy.utils.decorators import lazyproperty
from astropy.nddata import NDUncertainty
from ..wcs import WCSWrapper, WCSAdapter
from .spectrum_mixin import OneDSpectrumMixin

__all__ = ['Spectrum1D']

__doctest_skip__ = ['Spectrum1D.spectral_resolution']


[docs]class Spectrum1D(OneDSpectrumMixin, NDDataRef): """ Spectrum container for 1D spectral data. Parameters ---------- flux : `astropy.units.Quantity` or astropy.nddata.NDData`-like The flux data for this spectrum. spectral_axis : `astropy.units.Quantity` Dispersion information with the same shape as the last (or only) dimension of flux. wcs : `astropy.wcs.WCS` or `gwcs.wcs.WCS` WCS information object. velocity_convention : {"doppler_relativistic", "doppler_optical", "doppler_radio"} Convention used for velocity conversions. rest_value : `~astropy.units.Quantity` Any quantity supported by the standard spectral equivalencies (wavelength, energy, frequency, wave number). Describes the rest value of the spectral axis for use with velocity conversions. uncertainty : `~astropy.nddata.NDUncertainty` Contains uncertainty information along with propagation rules for spectrum arithmetic. Can take a unit, but if none is given, will use the unit defined in the flux. meta : dict Arbitrary container for any user-specific information to be carried around with the spectrum container object. """ def __init__(self, flux=None, spectral_axis=None, wcs=None, velocity_convention=None, rest_value=None, *args, **kwargs): # If the flux (data) argument is a subclass of nddataref (as it would # be for internal arithmetic operations), avoid setup entirely. if isinstance(flux, NDDataRef): self._velocity_convention = flux._velocity_convention self._rest_value = flux._rest_value super(Spectrum1D, self).__init__(flux) return # Ensure that the flux argument is an astropy quantity if flux is not None and not isinstance(flux, u.Quantity): raise ValueError("Flux must be a `Quantity` object.") # Insure that the unit information codified in the quantity object is # the One True Unit. kwargs.setdefault('unit', flux.unit if isinstance(flux, u.Quantity) else kwargs.get('unit')) # In cases of slicing, new objects will be initialized with `data` # instead of `flux`. Ensure we grab the `data` argument. if flux is None and 'data' in kwargs: flux = kwargs.pop('data') # Attempt to parse the spectral axis. If none is given, try instead to # parse a given wcs. This is put into a GWCS object to # then be used behind-the-scenes for all specutils operations. if spectral_axis is not None: # Ensure that the spectral axis is an astropy quantity if not isinstance(spectral_axis, u.Quantity): raise ValueError("Spectral axis must be a `Quantity` object.") wcs = WCSWrapper.from_array(spectral_axis) elif wcs is not None: if not issubclass(wcs.__class__, WCSAdapter): wcs = WCSWrapper(wcs) elif isinstance(flux, float) or isinstance(flux, int) or isinstance(flux, np.ndarray): # In the case where the arithmetic operation is being performed with # a single float, int, or array object, just go ahead and ignore wcs # requirements super(Spectrum1D, self).__init__(data=flux) return else: # If no wcs and no spectral axis has been given, raise an error raise LookupError("No WCS object or spectral axis information has " "been given. Please provide one.") self._velocity_convention = velocity_convention if rest_value is None: if wcs.rest_frequency != 0: self._rest_value = wcs.rest_frequency * u.Hz elif wcs.rest_wavelength != 0: self._rest_value = wcs.rest_wavelength * u.AA else: self._rest_value = 0 * u.AA else: self._rest_value = rest_value if not isinstance(self._rest_value, u.Quantity): logging.info("No unit information provided with rest value. " "Assuming units of spectral axis ('%s').", spectral_axis.unit) self._rest_value = u.Quantity(rest_value, spectral_axis.unit) elif not self._rest_value.unit.is_equivalent(u.AA) and not self._rest_value.unit.is_equivalent(u.Hz): raise u.UnitsError("Rest value must be energy/wavelength/frequency equivalent.") super(Spectrum1D, self).__init__( data=flux.value if isinstance(flux, u.Quantity) else flux, wcs=wcs, **kwargs) @property def frequency(self): """ The frequency as a `~astropy.units.Quantity` in units of GHz """ return self.spectral_axis.to(u.GHz, u.spectral()) @property def wavelength(self): """ The wavelength as a `~astropy.units.Quantity` in units of Angstroms """ return self.spectral_axis.to(u.AA, u.spectral()) @property def energy(self): """ The energy of the spectral axis as a `~astropy.units.Quantity` in units of eV. """ return self.spectral_axis.to(u.eV, u.spectral()) @property def photon_flux(self): """ The flux density of photons as a `~astropy.units.Quantity`, in units of photons per cm^2 per second per spectral_axis unit """ flux_in_spectral_axis_units = self.flux.to(u.W * u.cm**-2 * self.spectral_axis.unit**-1, u.spectral_density(self.spectral_axis)) photon_flux_density = flux_in_spectral_axis_units / (self.energy / u.photon) return photon_flux_density.to(u.photon * u.cm**-2 * u.s**-1 * self.spectral_axis.unit**-1) @lazyproperty def bin_edges(self): return self.wcs.bin_edges() @property def shape(self): return self.flux.shape @staticmethod def _compare_wcs(this_operand, other_operand): """ NNData arithmetic callable to determine if two wcs's are compatible. """ # If the other operand is a simple number or array, allow the operations if (isinstance(other_operand, float) or isinstance(other_operand, int) or isinstance(other_operand, np.ndarray)): return True # First check if units are equivalent, if so, create a new spectrum # object with spectral axis in compatible units other_wcs = other_operand.wcs.with_spectral_unit( this_operand.wcs.spectral_axis_unit) if other_wcs is None: return False # Check if the shape of the axes are compatible if this_operand.spectral_axis.shape != other_operand.spectral_axis.shape: logging.error("Shape of spectral axes between operands must be " "equivalent.") return False # And that they cover the same range if (this_operand.spectral_axis[0] != other_operand.spectral_axis[0] or this_operand.spectral_axis[-1] != other_operand.spectral_axis[-1]): logging.error("Spectral axes between operands must cover the " "same range. Interpolation may be required.") return False # Check if the delta dispersion is equivalent between the two axes if not np.array_equal(np.diff(this_operand.spectral_axis), np.diff(other_operand.spectral_axis)): logging.error("Delta dispersion of spectral axes of operands " "must be equivalent. Interpolation may be required.") return False return True def __add__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other, unit=self.unit) return self.add( other, compare_wcs=lambda o1, o2: self._compare_wcs(self, other)) def __sub__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other, unit=self.unit) return self.subtract( other, compare_wcs=lambda o1, o2: self._compare_wcs(self, other)) def __mul__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.multiply( other, compare_wcs=lambda o1, o2: self._compare_wcs(self, other)) def __div__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.divide( other, compare_wcs=lambda o1, o2: self._compare_wcs(self, other)) def __truediv__(self, other): if not isinstance(other, NDDataRef): other = u.Quantity(other) return self.divide( other, compare_wcs=lambda o1, o2: self._compare_wcs(self, other)) def _format_array_summary(self, label, array): mean = np.mean(array) s = "{:17} [ {:.5}, ..., {:.5} ], mean={:.5}" return s.format(label+':', array[0], array[-1], mean) def __str__(self): result = "Spectrum1D " # Handle case of single value flux if self.flux.ndim == 0: result += "(length=1)\n" return result + "flux: {}".format(self.flux) # Handle case of multiple flux arrays result += "(length={})\n".format(len(self.spectral_axis)) if self.flux.ndim > 1: for i, flux in enumerate(self.flux): label = 'flux{:2}'.format(i) result += self._format_array_summary(label, flux) + '\n' else: result += self._format_array_summary('flux', self.flux) + '\n' # Add information about spectral axis result += self._format_array_summary('spectral axis', self.spectral_axis) # Add information about uncertainties if available if self.uncertainty: result += "\nuncertainty: [ {}, ..., {} ]".format( self.uncertainty[0], self.uncertainty[-1]) return result def __repr__(self): if self.wcs: result = "<Spectrum1D(flux={}, spectral_axis={})>".format( repr(self.flux), repr(self.spectral_axis)) else: result = "<Spectrum1D(flux={})>".format(repr(self.flux)) return result
[docs] def spectral_resolution(self, true_dispersion, delta_dispersion, axis=-1): """Evaluate the probability distribution of the spectral resolution. Examples -------- To tabulate a binned resolution function at 6000A covering +/-10A in 0.2A steps: >>> R = spectrum1d.spectral_resolution( ... 6000 * u.Angstrom, np.linspace(-10, 10, 51) * u.Angstrom) >>> assert R.shape == (50,) >>> assert np.allclose(R.sum(), 1.) To build a sparse resolution matrix for true wavelengths 4000-8000A in 0.1A steps: >>> R = spectrum1d.spectral_resolution( ... np.linspace(4000, 8000, 40001)[:, np.newaxis] * u.Angstrom, ... np.linspace(-10, +10, 201) * u.Angstrom) >>> assert R.shape == (40000, 200) >>> assert np.allclose(R.sum(axis=1), 1.) Parameters ---------- true_dispersion : `~astropy.units.Quantity` True value(s) of dispersion for which the resolution should be evaluated. delta_dispersion : `~astropy.units.Quantity` Array of (observed - true) dispersion bin edges to integrate the resolution probability density over. axis : int Which axis of ``delta_dispersion`` contains the strictly increasing dispersion values to interpret as bin edges. The dimension of ``delta_dispersion`` along this axis must be at least two. Returns ------- numpy array Array of dimensionless probabilities calculated as the integral of P(observed | true) over each bin in (observed - true). The output shape is the result of broadcasting the input shapes. """ pass