359 lines
16 KiB
Python
359 lines
16 KiB
Python
from ..lib.helpers import isLambda, readCSV
|
|
from ..lib.logger import logger
|
|
from scipy.interpolate import interp1d
|
|
import astropy.units as u
|
|
from typing import Union, Callable
|
|
import os
|
|
from scipy.integrate import trapz
|
|
import numpy as np
|
|
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
class SpectralQty:
|
|
"""
|
|
A class to hold and work with spectral quantities
|
|
"""
|
|
|
|
def __init__(self, wl: u.Quantity, qty: u.Quantity, fill_value: Union[bool, int, float] = 0):
|
|
"""
|
|
Initialize a new spectral quantity
|
|
|
|
Parameters
|
|
----------
|
|
wl : Quantity
|
|
The binned wavelengths
|
|
qty : Quantity
|
|
The quantity values corresponding to the binned wavelengths. If the values are supplied without a unit,
|
|
they are assumed to be dimensionless.
|
|
fill_value : Union[bool, int, float]
|
|
How to treat missing values. True enables extrapolation, False disables extrapolation and the spectrum will
|
|
be truncated. If a numeric value is given, the missing values will be filled with this value.
|
|
Returns
|
|
-------
|
|
sqty : SpectralQty
|
|
The created spectral quantity.
|
|
"""
|
|
# Check if both lengths are equal
|
|
if wl.value.size == qty.value.size:
|
|
# check if units are given. If not, add a dimensionless unit
|
|
if hasattr(wl, "unit"):
|
|
self.wl = wl
|
|
else:
|
|
self.wl = wl * u.dimensionless_unscaled
|
|
if hasattr(qty, "unit"):
|
|
self.qty = qty
|
|
else:
|
|
self.qty = qty * u.dimensionless_unscaled
|
|
else:
|
|
logger.error("Lengths not matching")
|
|
self._fill_value = fill_value
|
|
|
|
@classmethod
|
|
def fromFile(cls, file: str, wl_unit_default: u.Quantity = None, qty_unit_default: u.Quantity = None,
|
|
fill_value: Union[bool, int, float] = 0) -> "SpectralQty":
|
|
"""
|
|
Initialize a new spectral quantity and read the values from a file
|
|
|
|
Parameters
|
|
----------
|
|
file : str
|
|
Path to the file to read the values from. The file needs to provide two columns: wavelength
|
|
and the corresponding spectral quantity. The format of the file will be guessed by
|
|
`astropy.io.ascii.read()`. If the file doesn't provide units via astropy's enhanced CSV format, the units
|
|
will be read from the column headers or otherwise assumed to be *wl_unit_default* and *qty_unit_default*.
|
|
wl_unit_default : Quantity
|
|
Default unit to be used for the wavelength column if no units are provided by the file.
|
|
qty_unit_default : Quantity
|
|
Default unit to be used for the quantity column if no units are provided by the file.
|
|
fill_value : Union[bool, int, float]
|
|
How to treat missing values. True enables extrapolation, False disables extrapolation and the spectrum will
|
|
be truncated. If a numeric value is given, the missing values will be filled with this value.
|
|
Returns
|
|
-------
|
|
sqty : SpectralQty
|
|
The created spectral quantity.
|
|
"""
|
|
# Read the file
|
|
data = readCSV(file, [wl_unit_default, qty_unit_default] if wl_unit_default is not None and
|
|
qty_unit_default is not None else None)
|
|
return cls(data[data.colnames[0]].quantity, data[data.colnames[1]].quantity, fill_value=fill_value)
|
|
|
|
def __str__(self, precision: int = 4) -> str:
|
|
"""
|
|
Convert a SpectralQty object to a string representation
|
|
|
|
Parameters
|
|
----------
|
|
precision : int
|
|
Precision of the printed values
|
|
|
|
Returns
|
|
-------
|
|
ret : str
|
|
String representation of the object
|
|
"""
|
|
wl_str = []
|
|
qty_str = []
|
|
for i in range(len(self.wl)):
|
|
wl_str_temp = "%%.%dg" % precision % self.wl[i].value
|
|
qty_str_temp = "%%.%dg" % precision % self.qty[i].value
|
|
wl_str.append(wl_str_temp.ljust(max(len(wl_str_temp), len(qty_str_temp)), " "))
|
|
qty_str.append(qty_str_temp.ljust(max(len(wl_str_temp), len(qty_str_temp)), " "))
|
|
return "Wavelength: [" + ", ".join(wl_str) + "] " + self.wl.unit.to_string() + os.linesep +\
|
|
"Quantitiy: [" + ", ".join(qty_str) + "] " + self.qty.unit.to_string()
|
|
|
|
def __eq__(self, other) -> bool:
|
|
"""
|
|
Check if this object is equal to another object
|
|
|
|
Parameters
|
|
----------
|
|
other : SpectralQty
|
|
Object to compare with
|
|
|
|
Returns
|
|
-------
|
|
res : bool
|
|
Result of the comparison
|
|
"""
|
|
return self.wl.unit.is_equivalent(other.wl.unit) and self.qty.unit.is_equivalent(other.qty.unit) and \
|
|
len(self.wl) == len(other.wl) and len(self.qty) == len(other.qty) and \
|
|
np.allclose(self.wl.value, other.wl.to(self.wl.unit).value) and \
|
|
np.allclose(self.qty.value, other.qty.to(self.qty.unit).value)
|
|
|
|
def __add__(self, other: Union[int, float, u.Quantity, "SpectralQty", Callable[[u.Quantity], u.Quantity]]) ->\
|
|
"SpectralQty":
|
|
"""
|
|
Calculate the sum with another object
|
|
|
|
Parameters
|
|
----------
|
|
other : Union[int, float, u.Quantity, "SpectralQty", Callable]
|
|
Addend to be added to this object. If the binning of the object on the right hand side differs
|
|
from the binning of the left object, the object on the right hand side will be rebinned.
|
|
|
|
Returns
|
|
-------
|
|
sum : SpectralQty
|
|
The sum of both objects
|
|
"""
|
|
# Summand is of type int or float, use same unit
|
|
if isinstance(other, int) or isinstance(other, float):
|
|
return SpectralQty(self.wl, self.qty + other * self.qty.unit)
|
|
# Summand is of type Quantity
|
|
elif isinstance(other, u.Quantity):
|
|
if other.unit == self.qty.unit:
|
|
return SpectralQty(self.wl, self.qty + other)
|
|
else:
|
|
raise TypeError("Units are not matching for addition.")
|
|
# Summand is of type lambda
|
|
elif isLambda(other):
|
|
return SpectralQty(self.wl, self.qty + other(self.wl).value * other(self.wl[0]).unit)
|
|
# Summand is of type SpectralQty
|
|
else:
|
|
if other.wl.unit.is_equivalent(self.wl.unit) and other.qty.unit.is_equivalent(self.qty.unit):
|
|
if len(self.wl) == len(other.wl) and (self.wl == other.wl).all():
|
|
# Wavelengths are matching, just add the quantities
|
|
return SpectralQty(self.wl, self.qty + other.qty)
|
|
else:
|
|
# Wavelengths are not matching, rebinning needed
|
|
other_rebinned = other.rebin(self.wl)
|
|
if len(self.wl) == len(other_rebinned.wl) and (self.wl == other_rebinned.wl).all():
|
|
return SpectralQty(self.wl, self.qty + other_rebinned.qty)
|
|
else:
|
|
# Wavelengths are still not matching as extrapolation is disabled, rebin this spectral quantity
|
|
return SpectralQty(other_rebinned.wl, self.rebin(other_rebinned.wl).qty + other_rebinned.qty)
|
|
else:
|
|
logger.error("Units are not matching for addition.")
|
|
|
|
__radd__ = __add__
|
|
|
|
def __sub__(self, other: Union[int, float, u.Quantity, "SpectralQty", Callable[[u.Quantity], u.Quantity]]) ->\
|
|
"SpectralQty":
|
|
"""
|
|
Calculate the difference to another object
|
|
|
|
Parameters
|
|
----------
|
|
other : Union[int, float, u.Quantity, "SpectralQty", Callable]
|
|
Subtrahend to be subtracted from this object. If the binning of the object on the right hand side differs
|
|
from the binning of the left object, the object on the right hand side will be rebinned.
|
|
|
|
Returns
|
|
-------
|
|
diff : SpectralQty
|
|
The difference of both objects
|
|
"""
|
|
# Subtrahend is of type int or float, use same unit
|
|
if isinstance(other, int) or isinstance(other, float):
|
|
return SpectralQty(self.wl, self.qty - other * self.qty.unit)
|
|
# Subtrahend is of type Quantity
|
|
elif isinstance(other, u.Quantity):
|
|
if other.unit == self.qty.unit:
|
|
return SpectralQty(self.wl, self.qty - other)
|
|
else:
|
|
raise TypeError('Units are not matching for subtraction.')
|
|
# Subtrahend is of type lambda
|
|
elif isLambda(other):
|
|
return SpectralQty(self.wl, self.qty - other(self.wl).value * other(self.wl[0]).unit)
|
|
# Subtrahend is of type SpectralQty
|
|
else:
|
|
if other.wl.unit.is_equivalent(self.wl.unit) and other.qty.unit.is_equivalent(self.qty.unit):
|
|
# Wavelengths are matching, just subtract the quantities
|
|
if len(self.wl) == len(other.wl) and (self.wl == other.wl).all():
|
|
return SpectralQty(self.wl, self.qty - other.qty)
|
|
# Wavelengths are not matching, rebinning needed
|
|
else:
|
|
# Rebin subtrahend
|
|
other_rebinned = other.rebin(self.wl)
|
|
if len(self.wl) == len(other_rebinned.wl) and (self.wl == other_rebinned.wl).all():
|
|
return SpectralQty(self.wl, self.qty - other_rebinned.qty)
|
|
else:
|
|
# Wavelengths are still not matching as extrapolation is disabled, rebin this spectral quantity
|
|
return SpectralQty(other_rebinned.wl, self.rebin(other_rebinned.wl).qty - other_rebinned.qty)
|
|
else:
|
|
logger.error("Units are not matching for substraction.")
|
|
|
|
def __mul__(self, other: Union[int, float, u.Quantity, "SpectralQty", Callable[[u.Quantity], u.Quantity]]) ->\
|
|
"SpectralQty":
|
|
"""
|
|
Calculate the product with another object
|
|
|
|
Parameters
|
|
----------
|
|
other : Union[int, float, u.Quantity, "SpectralQty", Callable]
|
|
Factor to be multiplied with this object. If the binning of the object on the right hand side differs
|
|
from the binning of the left object, the object on the right hand side will be rebinned.
|
|
|
|
Returns
|
|
-------
|
|
prod : SpectralQty
|
|
The product of both objects
|
|
"""
|
|
# Factor is of type int, float or Quantity, just multiply
|
|
if isinstance(other, int) or isinstance(other, float) or isinstance(other, u.Quantity):
|
|
return SpectralQty(self.wl, self.qty * other)
|
|
# Factor is of type lambda
|
|
elif isLambda(other):
|
|
return SpectralQty(self.wl, self.qty * other(self.wl).value * other(self.wl[0]).unit)
|
|
# Factor is of type SpectralQty
|
|
else:
|
|
if other.wl.unit.is_equivalent(self.wl.unit):
|
|
# Wavelengths are matching, just multiply the quantities
|
|
if len(self.wl) == len(other.wl) and (self.wl == other.wl).all():
|
|
return SpectralQty(self.wl, self.qty * other.qty)
|
|
# Wavelengths are not matching, rebinning needed
|
|
else:
|
|
# Rebin factor
|
|
other_rebinned = other.rebin(self.wl)
|
|
if len(self.wl) == len(other_rebinned.wl) and (self.wl == other_rebinned.wl).all():
|
|
return SpectralQty(self.wl, self.qty * other_rebinned.qty)
|
|
else:
|
|
# Wavelengths are still not matching as extrapolation is disabled, rebin this spectral quantity
|
|
return SpectralQty(other_rebinned.wl, self.rebin(other_rebinned.wl).qty * other_rebinned.qty)
|
|
else:
|
|
logger.error("Units are not matching for multiplication.")
|
|
|
|
__rmul__ = __mul__
|
|
|
|
def __truediv__(self, other: Union[int, float, u.Quantity, "SpectralQty", Callable[[u.Quantity], u.Quantity]]) ->\
|
|
"SpectralQty":
|
|
"""
|
|
Calculate the quotient with another object
|
|
|
|
Parameters
|
|
----------
|
|
other : Union[int, float, u.Quantity, "SpectralQty", Callable]
|
|
Divisor for this object. If the binning of the object on the right hand side differs
|
|
from the binning of the left object, the object on the right hand side will be rebinned.
|
|
|
|
Returns
|
|
-------
|
|
quot : SpectralQty
|
|
The quotient of both objects
|
|
"""
|
|
# Factor is of type int, float or Quantity, just multiply
|
|
if isinstance(other, int) or isinstance(other, float) or isinstance(other, u.Quantity):
|
|
return SpectralQty(self.wl, self.qty / other)
|
|
# Factor is of type lambda
|
|
elif isLambda(other):
|
|
return SpectralQty(self.wl, self.qty / other(self.wl).value / other(self.wl[0]).unit)
|
|
# Factor is of type SpectralQty
|
|
else:
|
|
if other.wl.unit.is_equivalent(self.wl.unit):
|
|
# Wavelengths are matching, just multiply the quantities
|
|
if len(self.wl) == len(other.wl) and (self.wl == other.wl).all():
|
|
return SpectralQty(self.wl, self.qty / other.qty)
|
|
# Wavelengths are not matching, rebinning needed
|
|
else:
|
|
# Rebin factor
|
|
other_rebinned = other.rebin(self.wl)
|
|
if len(self.wl) == len(other_rebinned.wl) and (self.wl == other_rebinned.wl).all():
|
|
return SpectralQty(self.wl, self.qty / other_rebinned.qty)
|
|
else:
|
|
# Wavelengths are still not matching as extrapolation is disabled, rebin this spectral quantity
|
|
return SpectralQty(other_rebinned.wl, self.rebin(other_rebinned.wl).qty / other_rebinned.qty)
|
|
else:
|
|
logger.error("Units are not matching for division.")
|
|
|
|
def __pow__(self, other: Union[int, float]) -> "SpectralQty":
|
|
"""
|
|
Raise to spectral quantity to the given power.
|
|
|
|
Parameters
|
|
----------
|
|
other : Union[int, float]
|
|
The exponent for this object.
|
|
|
|
Returns
|
|
-------
|
|
prod : SpectralQty
|
|
The spectral quantity raised to the given power.
|
|
"""
|
|
# Factor is of type int, float or Quantity, just multiply
|
|
if isinstance(other, int) or isinstance(other, float):
|
|
return SpectralQty(self.wl, self.qty ** other)
|
|
|
|
def rebin(self, wl: u.Quantity) -> "SpectralQty":
|
|
"""
|
|
Resample the spectral quantity sqty(wl) over the new grid wl, rebinning if necessary, otherwise interpolates.
|
|
Copied from ExoSim (https://github.com/ExoSim/ExoSimPublic).
|
|
|
|
Parameters
|
|
----------
|
|
wl : Quantity
|
|
new binned wavelengths
|
|
|
|
Returns
|
|
-------
|
|
sqty : SpectralQty
|
|
The rebinned spectral quantity
|
|
"""
|
|
|
|
if not wl.unit.is_equivalent(self.wl.unit):
|
|
logger.error("Mismatching units for rebinning: " + wl.unit + ", " + self.wl.unit)
|
|
if (wl.value.size == 1 and (wl < min(self.wl) or wl > max(self.wl))) or (
|
|
wl.value.size > 1 and (min(wl) < min(self.wl) or max(wl) > max(self.wl))):
|
|
if isinstance(self._fill_value, bool):
|
|
if not self._fill_value:
|
|
logger.warning("Extrapolation disabled, bandwidth will be reduced.")
|
|
# Remove new wavelengths where extrapolation would have been necessary
|
|
wl = [x.value for x in wl if min(self.wl) <= x <= max(self.wl)] * wl.unit
|
|
f = interp1d(self.wl, self.qty.value, fill_value="extrapolate")
|
|
else:
|
|
f = interp1d(self.wl, self.qty.value, fill_value=self._fill_value, bounds_error=False)
|
|
else:
|
|
f = interp1d(self.wl, self.qty.value)
|
|
return SpectralQty(wl, f(wl.to(self.wl.unit)) * self.qty.unit)
|
|
|
|
def integrate(self) -> u.Quantity:
|
|
"""
|
|
Integrate the spectral quantity over the given spectrum using
|
|
|
|
Returns
|
|
-------
|
|
int : Quantity
|
|
The integrated quantity
|
|
"""
|
|
return u.Quantity(trapz(self.qty, self.wl))
|