Use type keyword for PSF, FITS PSF added

This commit is contained in:
Lukas Klass 2020-10-26 18:01:39 +01:00
parent c7c659161f
commit 527d91aea9
11 changed files with 386 additions and 209 deletions

View File

@ -85,20 +85,31 @@ Attributes:
psf psf
--- ---
*optional* -- The PSF used for the computations. This can be either read from a Zemax file or approximated by a (obstructed) airy disk. *optional* -- The PSF used for the computations.
This can be either read from a Zemax/FITS-file or approximated by a (obstructed) airy disk.
In case of a FITS-file, the PSF must be included as a 2D map.
The header keywords ``XPIXSZ`` and ``YPIXSZ`` can be used to define the grid size of the PSF in microns.
Otherwise, the keyword ``PSFSCALE`` can be used to define the FOV per PSF pixel in arcsec/pixel.
The keywords ``XPSFCTR`` and ``YPSFCTR`` can be used to define the center point of the PSF, if the PSF is not centered on the grid.
.. code-block:: xml .. code-block:: xml
<psf val="Airy" osf="10" osf_unit=""/> <psf type="Airy" osf="10" osf_unit=""/>
.. code-block:: xml .. code-block:: xml
<psf val="data/psf.txt" osf="10" osf_unit=""/> <psf type="FITS" val="data/psf.fits" osf="10" osf_unit=""/>
.. code-block:: xml
<psf type="Zemax" val="data/psf.txt" osf="10" osf_unit=""/>
Attributes: Attributes:
* | **val:** str = "Airy" * | **type:** str = "Airy"
| The PSF to be used for the computations. This can be either the path to a Zemax file or the keyword *Airy* to for an airy disk as PSF. | The PSF to be used for the computations. This can be either the path to a Zemax file or the keyword *Airy* to for an airy disk as PSF.
* | **osf:** str = "10" * | **val:** str
| The path to the file to be read.
* | **osf:** float = "10"
| The oversampling factor to be used to calculate the contained energy and the PSF with jitter. | The oversampling factor to be used to calculate the contained energy and the PSF with jitter.
* | **osf_unit:** str, *optional* = "" * | **osf_unit:** str, *optional* = ""
| The unit of the oversampling factor. This has to be emtpy (dimensionless). The default is ``dimensionless``. | The unit of the oversampling factor. This has to be emtpy (dimensionless). The default is ``dimensionless``.

View File

@ -136,12 +136,12 @@ class Configuration(object):
mes = self.conf.common.d_aperture.check_quantity("val", u.m) mes = self.conf.common.d_aperture.check_quantity("val", u.m)
mes is not None and logger.error("Configuration check: common -> d_aperture: " + mes) mes is not None and logger.error("Configuration check: common -> d_aperture: " + mes)
if hasattr(self.conf.common, "psf"): if hasattr(self.conf.common, "psf"):
if hasattr(self.conf.common.psf, "val"): if hasattr(self.conf.common.psf, "type"):
if self.conf.common.psf().lower() != "airy": if self.conf.common.psf.type.lower() != "airy":
mes = self.conf.common.psf.check_file("val") mes = self.conf.common.psf.check_file("val")
mes is not None and logger.error("Configuration check: common -> psf: " + mes) mes is not None and logger.error("Configuration check: common -> psf: " + mes)
else: else:
setattr(self.conf.common.psf, "val", "Airy") logger.error("Configuration check: common -> psf: Missing required parameter 'type'.")
if hasattr(self.conf.common.psf, "osf"): if hasattr(self.conf.common.psf, "osf"):
mes = self.conf.common.psf.check_float("osf") mes = self.conf.common.psf.check_float("osf")
mes is not None and logger.error("Configuration check: common -> psf: " + mes) mes is not None and logger.error("Configuration check: common -> psf: " + mes)

View File

@ -0,0 +1,225 @@
from .IPSF import IPSF
from ...lib.helpers import rasterizeCircle
from ..sensor.PixelMask import PixelMask
from ...lib.logger import logger
from abc import abstractmethod
import numpy as np
import astropy.units as u
from typing import Union
from scipy.optimize import bisect
from scipy.signal import fftconvolve
from scipy.interpolate import interp2d
class AGriddedPSF(IPSF):
"""
A class for modelling the PSF from a two dimensional grid
"""
@abstractmethod
@u.quantity_input(wl="length", d_aperture="length", pixel_size="length", grid_delta="length")
def __init__(self, psf: np.ndarray, f_number: float, wl: u.Quantity, d_aperture: u.Quantity, osf: float,
pixel_size: u.Quantity, grid_delta: u.Quantity, center_point: list):
"""
Initialize a new PSF from a 2D grid.
Parameters
----------
psf : ndarray
2D numpy array containing the parsed PSF values. The zero-point is in the top left corner.
f_number : float
The working focal number of the optical system
wl : Quantity
The central wavelength which is used for calculating the PSF
d_aperture : Quantity
The diameter of the telescope's aperture.
osf : float
The oversampling factor to be used for oversampling the PSF with regards to the pixel size.
pixel_size : Quantity
The size of a pixel as length-quantity.
grid_delta : Quantity
Size of a grid element as length-Quantity with a value for each grid dimension.
center_point : list
The center point coordinates as list with the zero point in the upper left corner.
"""
# Store parameters
self._f_number = f_number
self._wl = wl
self._d_aperture = d_aperture
self._osf = osf
self._pixel_size = pixel_size
self._psf = psf
self._grid_delta = grid_delta
self._center_point = center_point
self._center_point_os = None
self._psf_os = None
self._psf_osf = None
# @u.quantity_input(jitter_sigma=u.arcsec)
def calcReducedObservationAngle(self, contained_energy: Union[str, int, float, u.Quantity],
jitter_sigma: u.Quantity = None, obstruction: float = 0.0) -> u.Quantity:
"""
Calculate the reduced observation angle in lambda / d_ap for the given contained energy.
Parameters
----------
contained_energy : Union[str, int, float, u.Quantity]
The percentage of energy to be contained within a circle with the diameter reduced observation angle.
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec
obstruction : float
The central obstruction as ratio A_ob / A_ap
Returns
-------
reduced_observation_angle: Quantity
The reduced observation angle in lambda / d_ap
"""
# Parse the contained energy
if type(contained_energy) == str:
try:
contained_energy = float(contained_energy) / 100.0 * u.dimensionless_unscaled
except ValueError:
logger.error("Could not convert encircled energy to float.")
elif type(contained_energy) in [int, float]:
contained_energy = contained_energy / 100 * u.dimensionless_unscaled
center_point, psf, psf_osf = self._calcPSF(jitter_sigma)
# Calculate the maximum possible radius for the circle containing the photometric aperture
r_max = max(np.sqrt(center_point[0] ** 2 + center_point[1] ** 2),
np.sqrt((psf.shape[0] - center_point[0]) ** 2 + center_point[1] ** 2),
np.sqrt(center_point[0] ** 2 + (psf.shape[1] - center_point[1]) ** 2),
np.sqrt((psf.shape[0] - center_point[0]) ** 2 + (psf.shape[1] - center_point[1]) ** 2))
# Calculate the total contained energy of the PSF
total = np.sum(psf)
# Iterate the optimal radius for the contained energy
r = bisect(lambda r_c: contained_energy.value - np.sum(
psf * rasterizeCircle(np.zeros((psf.shape[0], psf.shape[1])), r_c, center_point[0],
center_point[1])) / total, 0, r_max, xtol=1e-1)
# Calculate the reduced observation angle in lambda / d_ap
# noinspection PyTypeChecker
reduced_observation_angle = r / psf_osf * self._grid_delta[0] / (
self._f_number * self._d_aperture) * self._d_aperture / self._wl
return 2 * reduced_observation_angle * u.dimensionless_unscaled
def _calcPSF(self, jitter_sigma: u.Quantity = None):
"""
Calculate the PSF from the grid. This includes oversampling the PSF and convolving with the
jitter-gaussian.
Parameters
----------
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec.
Returns
-------
center_point : ndarray
The indices of the PSF's center point on the grid.
psf : ndarray
The PSF.
psf_osf : float
The oversampling factor of the returned PSF.
"""
# Calculate the psf oversampling factor for the PSF based on the current resolution of the PSF
psf_osf = np.ceil(max(self._grid_delta) / (self._pixel_size / self._osf)).value
if psf_osf == 1.0:
# No oversampling is necessary
psf = self._psf
center_point = self._center_point
else:
# Oversampling is necessary, oversample the PSF and calculate the new center point.
f = interp2d(x=np.arange(self._psf.shape[1]) - self._center_point[1],
y=np.arange(self._psf.shape[0]) - self._center_point[0], z=self._psf,
kind='cubic', copy=False, bounds_error=False, fill_value=None)
center_point = [(x + 0.5) * psf_osf - 0.5 for x in self._center_point]
psf = f((np.arange(self._psf.shape[1] * psf_osf) - center_point[1]) / psf_osf,
(np.arange(self._psf.shape[0] * psf_osf) - center_point[0]) / psf_osf)
if jitter_sigma is not None:
# Convert angular jitter to jitter on focal plane
jitter_sigma_um = (jitter_sigma.to(u.rad) * self._f_number * self._d_aperture / u.rad).to(u.um)
# Jitter is enabled. Calculate the corresponding gaussian bell and convolve it with the PSF
if min(self._grid_delta) / psf_osf < 6 * jitter_sigma_um:
# 6-sigma interval of the gaussian bell is larger than the grid width
# Calculate the necessary grid length for the 6-sigma interval of the gaussian bell
jitter_grid_length = np.ceil(6 * jitter_sigma_um / (min(self._grid_delta) / psf_osf)).value
# Make sure, the grid size is odd in order to have a defined kernel center
jitter_grid_length = int(jitter_grid_length if jitter_grid_length % 2 == 1 else jitter_grid_length + 1)
# Create a meshgrid containing the x and y coordinates of each point within the first quadrant of the
# gaussian kernel
xv, yv = np.meshgrid(range(-int((jitter_grid_length - 1) / 2), 1),
range(-int((jitter_grid_length - 1) / 2), 1))
# Calculate the gaussian kernel in the first quadrant
kernel = 1 / (2 * np.pi * jitter_sigma_um.value ** 2) * np.exp(
-((xv * min(self._grid_delta.value) / psf_osf) ** 2 +
(yv * min(self._grid_delta.value) / psf_osf) ** 2) / (2 * jitter_sigma_um.value ** 2))
# Mirror the kernel from the first quadrant to all other quadrants
kernel = np.concatenate((kernel, np.flip(kernel, axis=1)[:, 1:]), axis=1)
kernel = np.concatenate((kernel, np.flip(kernel, axis=0)[1:, :]), axis=0)
# Normalize kernel
kernel = kernel / np.sum(kernel)
# Convolve PSF with gaussian kernel
psf = fftconvolve(psf, kernel, mode="full")
# Calculate new center point
center_point = [x + int((jitter_grid_length - 1) / 2) for x in center_point]
# Save the values as object attribute
self._center_point_os = center_point
self._psf_os = psf
self._psf_osf = psf_osf
return center_point, psf, psf_osf
def mapToPixelMask(self, mask: PixelMask, jitter_sigma: u.Quantity = None, obstruction: float = 0.0) -> PixelMask:
"""
Map the integrated PSF values to a sensor grid.
Parameters
----------
obstruction
mask : PixelMask
The pixel mask to map the values to. The values will only be mapped onto entries with the value 1.
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec
Returns
-------
mask : PixelMask
The pixel mask with the integrated PSF values mapped onto each pixel.
"""
# Calculate the indices of all non-zero elements of the mask
y_ind, x_ind = np.nonzero(mask)
# Extract a rectangle containing all non-zero values of the mask
mask_red = mask[y_ind.min():(y_ind.max() + 1), x_ind.min():(x_ind.max() + 1)]
# Calculate the new PSF-center indices of the reduced mask
psf_center_ind = [mask.psf_center_ind[0] - y_ind.min(), mask.psf_center_ind[1] - x_ind.min()]
# Oversample the reduced mask
mask_red_os = self._rebin(mask_red, self._osf).view(PixelMask)
# Calculate the new PSF-center indices of the reduced mask
psf_center_ind = [(x + 0.5) * self._osf - 0.5 for x in psf_center_ind]
# Get PSF values or calculate them if not available
if self._psf_os is not None and self._center_point_os is not None and self._psf_osf is not None:
center_point = self._center_point_os
psf = self._psf_os
psf_osf = self._psf_osf
else:
center_point, psf, psf_osf = self._calcPSF(jitter_sigma)
# Calculate the coordinates of each PSF value in microns
x = (np.arange(psf.shape[1]) - center_point[1]) * self._grid_delta[1].to(u.um).value / psf_osf
y = (np.arange(psf.shape[0]) - center_point[0]) * self._grid_delta[0].to(u.um).value / psf_osf
# Initialize a two-dimensional cubic interpolation function for the PSF
psf_interp = interp2d(x=x, y=y, z=psf, kind='cubic', copy=False, bounds_error=False, fill_value=None)
# Calculate the values of the PSF for all elements of the reduced mask
res = psf_interp((np.arange(mask_red_os.shape[1]) - psf_center_ind[1]) * mask_red_os.pixel_size.to(u.um).value,
(np.arange(mask_red_os.shape[0]) - psf_center_ind[0]) * mask_red_os.pixel_size.to(u.um).value)
# Bin the oversampled reduced mask to the original resolution and multiply with the reduced mask to select only
# the relevant values
res = mask_red * self._rebin(res, 1 / self._osf)
# Integrate the reduced mask and divide by the indefinite integral to get relative intensities
res = res * mask_red_os.pixel_size.to(u.um).value ** 2 / (
psf.sum() * (self._grid_delta[0].to(u.um).value / psf_osf) ** 2)
# reintegrate the reduced mask into the complete mask
mask[y_ind.min():(y_ind.max() + 1), x_ind.min():(x_ind.max() + 1)] = res
return mask

View File

@ -0,0 +1,65 @@
from .AGriddedPSF import AGriddedPSF
from ...lib.logger import logger
import numpy as np
import astropy.units as u
from astropy.io import fits
class FITS(AGriddedPSF):
"""
A class for modelling the PSF from a FITS-file
"""
@u.quantity_input(wl="length", d_aperture="length", pixel_size="length")
def __init__(self, file: str, f_number: float, wl: u.Quantity, d_aperture: u.Quantity, osf: float,
pixel_size: u.Quantity):
"""
Initialize a new PSF from a FITS-file.
Parameters
----------
file : str
Path to the FITS-file. The origin of the coordinate system is in the upper left corner of the matrix
f_number : float
The working focal number of the optical system
wl : Quantity
The central wavelength which is used for calculating the PSF
d_aperture : Quantity
The diameter of the telescope's aperture.
osf : float
The oversampling factor to be used for oversampling the PSF with regards to the pixel size.
pixel_size : Quantity
The size of a pixel as length-quantity.
"""
# Open the fits file
hdul = fits.open(file)
# Check if a dataset is available
if len(hdul) < 1:
logger.error("PSF FITS file must contain a PSF dataset")
# Extract PSF
psf = hdul[0].data
# Extract PSF grid size
if "XPIXSZ" in hdul[0].header:
if "YPIXSZ" in hdul[0].header:
grid_delta = np.array([hdul[0].header["XPIXSZ"], hdul[0].header["YPIXSZ"]]) << u.um
else:
grid_delta = np.array([hdul[0].header["XPIXSZ"], hdul[0].header["XPIXSZ"]]) << u.um
elif "PSFSCALE" in hdul[0].header:
grid_delta = (2 * f_number * d_aperture * np.tan(hdul[0].header["PSFSCALE"] / 2 * u.arcsec)).to(u.um)
grid_delta = u.Quantity([grid_delta, grid_delta])
else:
grid_delta = u.Quantity([pixel_size, pixel_size])
# Extract PSF center point
if "XPSFCTR" in hdul[0].header and "YPSFCTR" in hdul[0].header:
center_point = [hdul[0].header["XPSFCTR"], hdul[0].header["YPSFCTR"]]
else:
center_point = [x / 2 for x in list(self._psf.shape)]
# Close the file
hdul.close()
super().__init__(psf, f_number, wl, d_aperture, osf, pixel_size, grid_delta, center_point)

View File

@ -1,17 +1,11 @@
from .IPSF import IPSF from .AGriddedPSF import AGriddedPSF
from ...lib.helpers import rasterizeCircle
from ..sensor.PixelMask import PixelMask
from ...lib.logger import logger from ...lib.logger import logger
import numpy as np import numpy as np
import astropy.units as u import astropy.units as u
import re import re
from typing import Union
from scipy.optimize import bisect
from scipy.signal import fftconvolve
from scipy.interpolate import interp2d
class Zemax(IPSF): class Zemax(AGriddedPSF):
""" """
A class for modelling the PSF from a Zemax output file A class for modelling the PSF from a Zemax output file
""" """
@ -37,203 +31,26 @@ class Zemax(IPSF):
pixel_size : Quantity pixel_size : Quantity
The size of a pixel as length-quantity. The size of a pixel as length-quantity.
""" """
# Store parameters
self.__f_number = f_number
self.__wl = wl
self.__d_aperture = d_aperture
self.__osf = osf
self.__pixel_size = pixel_size
# Read PSF from file # Read PSF from file
with open(file, encoding="utf16") as fp: with open(file, encoding="utf16") as fp:
self.__psf = np.genfromtxt((x.replace(",", ".") for x in fp), delimiter='\t', skip_header=21) psf = np.genfromtxt((x.replace(",", ".") for x in fp), delimiter='\t', skip_header=21)
# Read header parameters from the file # Read header parameters from the file
with open(file, encoding="utf16") as fp: with open(file, encoding="utf16") as fp:
head = [next(fp) for _ in range(21)] head = [next(fp) for _ in range(21)]
# Parse shape of the grid and check the read PSF-array # Parse shape of the grid and check the read PSF-array
shape = [int(x) for x in re.findall("[0-9]+", list(filter(re.compile("Image grid size: ").match, head))[0])] shape = [int(x) for x in re.findall("[0-9]+", list(filter(re.compile("Image grid size: ").match, head))[0])]
if shape != list(self.__psf.shape): if shape != list(psf.shape):
logger.warning("Not all PSF entries read.") logger.warning("Not all PSF entries read.")
# Parse and calculate the grid width # Parse and calculate the grid width
grid_delta = [float(x.replace(",", ".")) for x in grid_delta = [float(x.replace(",", ".")) for x in
re.findall("[0-9]+,*[0-9]*", list(filter(re.compile("Data area is ").match, head))[0])] re.findall("[0-9]+,*[0-9]*", list(filter(re.compile("Data area is ").match, head))[0])]
unit = re.findall(".+(?=\\.$)", re.sub("Data area is [0-9]+,*[0-9]* by [0-9]+,*[0-9]* ", "", unit = re.findall(".+(?=\\.$)", re.sub("Data area is [0-9]+,*[0-9]* by [0-9]+,*[0-9]* ", "",
list(filter(re.compile("Data area is ").match, head))[0]))[0] list(filter(re.compile("Data area is ").match, head))[0]))[0]
# noinspection PyArgumentList grid_delta = np.array(grid_delta) / np.array(shape) << u.Unit(unit)
self.__grid_delta = np.array(grid_delta) / np.array(shape) << u.Unit(unit)
# Parse the center point of the PSF in the grid # Parse the center point of the PSF in the grid
self.__center_point = [int(x) for x in center_point = [int(x) for x in
re.findall("[0-9]+", list(filter(re.compile("Center point is: ").match, head))[0])] re.findall("[0-9]+", list(filter(re.compile("Center point is: ").match, head))[0])]
self.__center_point[0] = self.__psf.shape[0] - self.__center_point[0] center_point[0] = psf.shape[0] - center_point[0]
self.__center_point[1] -= 1 center_point[1] -= 1
self.__center_point_os = None super().__init__(psf, f_number, wl, d_aperture, osf, pixel_size, grid_delta, center_point)
self.__psf_os = None
self.__psf_osf = None
# @u.quantity_input(jitter_sigma=u.arcsec)
def calcReducedObservationAngle(self, contained_energy: Union[str, int, float, u.Quantity],
jitter_sigma: u.Quantity = None, obstruction: float = 0.0) -> u.Quantity:
"""
Calculate the reduced observation angle in lambda / d_ap for the given contained energy.
Parameters
----------
contained_energy : Union[str, int, float, u.Quantity]
The percentage of energy to be contained within a circle with the diameter reduced observation angle.
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec
obstruction : float
The central obstruction as ratio A_ob / A_ap
Returns
-------
reduced_observation_angle: Quantity
The reduced observation angle in lambda / d_ap
"""
# Parse the contained energy
if type(contained_energy) == str:
try:
contained_energy = float(contained_energy) / 100.0 * u.dimensionless_unscaled
except ValueError:
logger.error("Could not convert encircled energy to float.")
elif type(contained_energy) in [int, float]:
contained_energy = contained_energy / 100 * u.dimensionless_unscaled
center_point, psf, psf_osf = self.__calcPSF(jitter_sigma)
# Calculate the maximum possible radius for the circle containing the photometric aperture
r_max = max(np.sqrt(center_point[0] ** 2 + center_point[1] ** 2),
np.sqrt((psf.shape[0] - center_point[0]) ** 2 + center_point[1] ** 2),
np.sqrt(center_point[0] ** 2 + (psf.shape[1] - center_point[1]) ** 2),
np.sqrt((psf.shape[0] - center_point[0]) ** 2 + (psf.shape[1] - center_point[1]) ** 2))
# Calculate the total contained energy of the PSF
total = np.sum(psf)
# Iterate the optimal radius for the contained energy
r = bisect(lambda r_c: contained_energy.value - np.sum(
psf * rasterizeCircle(np.zeros((psf.shape[0], psf.shape[0])), r_c, center_point[0],
center_point[1])) / total, 0, r_max, xtol=1e-1) * 2
# Calculate the reduced observation angle in lambda / d_ap
# noinspection PyTypeChecker
reduced_observation_angle = r / psf_osf * self.__grid_delta[0] / (
self.__f_number * self.__d_aperture) * self.__d_aperture / self.__wl
return reduced_observation_angle * u.dimensionless_unscaled
def __calcPSF(self, jitter_sigma: u.Quantity = None):
"""
Calculate the PSF from the Zemax-file. This includes oversampling the PSF and convolving with the
jitter-gaussian.
Parameters
----------
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec.
Returns
-------
center_point : ndarray
The indices of the PSF's center point on the grid.
psf : ndarray
The PSF.
psf_osf : float
The oversampling factor of the returned PSF.
"""
# Calculate the psf oversampling factor for the PSF based on the current resolution of the PSF
psf_osf = np.ceil(max(self.__grid_delta) / (self.__pixel_size / self.__osf)).value
if psf_osf == 1.0:
# No oversampling is necessary
psf = self.__psf
center_point = self.__center_point
else:
# Oversampling is necessary, oversample the PSF and calculate the new center point.
f = interp2d(x=np.arange(self.__psf.shape[1]) - self.__center_point[1],
y=np.arange(self.__psf.shape[0]) - self.__center_point[0], z=self.__psf,
kind='cubic', copy=False, bounds_error=False, fill_value=None)
center_point = [(x + 0.5) * psf_osf - 0.5 for x in self.__center_point]
psf = f((np.arange(self.__psf.shape[1] * psf_osf) - center_point[1]) / psf_osf,
(np.arange(self.__psf.shape[0] * psf_osf) - center_point[0]) / psf_osf)
if jitter_sigma is not None:
# Convert angular jitter to jitter on focal plane
jitter_sigma_um = (jitter_sigma.to(u.rad) * self.__f_number * self.__d_aperture / u.rad).to(u.um)
# Jitter is enabled. Calculate the corresponding gaussian bell and convolve it with the PSF
if min(self.__grid_delta) / psf_osf < 6 * jitter_sigma_um:
# 6-sigma interval of the gaussian bell is larger than the grid width
# Calculate the necessary grid length for the 6-sigma interval of the gaussian bell
jitter_grid_length = np.ceil(6 * jitter_sigma_um / (min(self.__grid_delta) / psf_osf)).value
# Make sure, the grid size is odd in order to have a defined kernel center
jitter_grid_length = int(jitter_grid_length if jitter_grid_length % 2 == 1 else jitter_grid_length + 1)
# Create a meshgrid containing the x and y coordinates of each point within the first quadrant of the
# gaussian kernel
xv, yv = np.meshgrid(range(-int((jitter_grid_length - 1) / 2), 1),
range(-int((jitter_grid_length - 1) / 2), 1))
# Calculate the gaussian kernel in the first quadrant
kernel = 1 / (2 * np.pi * jitter_sigma_um.value ** 2) * np.exp(
-((xv * min(self.__grid_delta.value) / psf_osf) ** 2 +
(yv * min(self.__grid_delta.value) / psf_osf) ** 2) / (2 * jitter_sigma_um.value ** 2))
# Mirror the kernel from the first quadrant to all other quadrants
kernel = np.concatenate((kernel, np.flip(kernel, axis=1)[:, 1:]), axis=1)
kernel = np.concatenate((kernel, np.flip(kernel, axis=0)[1:, :]), axis=0)
# Normalize kernel
kernel = kernel / np.sum(kernel)
# Convolve PSF with gaussian kernel
psf = fftconvolve(psf, kernel, mode="full")
# Calculate new center point
center_point = [x + int((jitter_grid_length - 1) / 2) for x in center_point]
# Save the values as object attribute
self.__center_point_os = center_point
self.__psf_os = psf
self.__psf_osf = psf_osf
return center_point, psf, psf_osf
def mapToPixelMask(self, mask: PixelMask, jitter_sigma: u.Quantity = None, obstruction: float = 0.0) -> PixelMask:
"""
Map the integrated PSF values to a sensor grid.
Parameters
----------
obstruction
mask : PixelMask
The pixel mask to map the values to. The values will only be mapped onto entries with the value 1.
jitter_sigma : Quantity
Sigma of the telescope's jitter in arcsec
Returns
-------
mask : PixelMask
The pixel mask with the integrated PSF values mapped onto each pixel.
"""
# Calculate the indices of all non-zero elements of the mask
y_ind, x_ind = np.nonzero(mask)
# Extract a rectangle containing all non-zero values of the mask
mask_red = mask[y_ind.min():(y_ind.max() + 1), x_ind.min():(x_ind.max() + 1)]
# Calculate the new PSF-center indices of the reduced mask
psf_center_ind = [mask.psf_center_ind[0] - y_ind.min(), mask.psf_center_ind[1] - x_ind.min()]
# Oversample the reduced mask
mask_red_os = self._rebin(mask_red, self.__osf).view(PixelMask)
# Calculate the new PSF-center indices of the reduced mask
psf_center_ind = [(x + 0.5) * self.__osf - 0.5 for x in psf_center_ind]
# Get PSF values or calculate them if not available
if self.__psf_os is not None and self.__center_point_os is not None and self.__psf_osf is not None:
center_point = self.__center_point_os
psf = self.__psf_os
psf_osf = self.__psf_osf
else:
center_point, psf, psf_osf = self.__calcPSF(jitter_sigma)
# Calculate the coordinates of each PSF value in microns
x = (np.arange(psf.shape[1]) - center_point[1]) * self.__grid_delta[1].to(u.um).value / psf_osf
y = (np.arange(psf.shape[0]) - center_point[0]) * self.__grid_delta[0].to(u.um).value / psf_osf
# Initialize a two-dimensional cubic interpolation function for the PSF
psf_interp = interp2d(x=x, y=y, z=psf, kind='cubic', copy=False, bounds_error=False, fill_value=None)
# Calculate the values of the PSF for all elements of the reduced mask
res = psf_interp((np.arange(mask_red_os.shape[1]) - psf_center_ind[1]) * mask_red_os.pixel_size.to(u.um).value,
(np.arange(mask_red_os.shape[0]) - psf_center_ind[0]) * mask_red_os.pixel_size.to(u.um).value)
# Bin the oversampled reduced mask to the original resolution and multiply with the reduced mask to select only
# the relevant values
res = mask_red * self._rebin(res, 1 / self.__osf)
# Integrate the reduced mask and divide by the indefinite integral to get relative intensities
res = res * mask_red_os.pixel_size.to(u.um).value ** 2 / (
psf.sum() * (self.__grid_delta[0].to(u.um).value / psf_osf) ** 2)
# reintegrate the reduced mask into the complete mask
mask[y_ind.min():(y_ind.max() + 1), x_ind.min():(x_ind.max() + 1)] = res
return mask

View File

@ -1,3 +1,5 @@
from .IPSF import * from .IPSF import *
from .Zemax import *
from .Airy import * from .Airy import *
from .AGriddedPSF import *
from .Zemax import *
from .FITS import *

View File

@ -6,6 +6,7 @@ import numpy as np
from typing import Union, Tuple from typing import Union, Tuple
from ..psf.Airy import Airy from ..psf.Airy import Airy
from ..psf.Zemax import Zemax from ..psf.Zemax import Zemax
from ..psf.FITS import FITS
from ..SpectralQty import SpectralQty from ..SpectralQty import SpectralQty
from .PixelMask import PixelMask from .PixelMask import PixelMask
from ...lib.logger import logger from ...lib.logger import logger
@ -23,8 +24,8 @@ class Imager(ASensor):
@u.quantity_input(pixel_geometry=u.pixel, pixel_size="length", sigma_read_out=u.electron ** 0.5 / u.pix, @u.quantity_input(pixel_geometry=u.pixel, pixel_size="length", sigma_read_out=u.electron ** 0.5 / u.pix,
center_offset=u.pix, dark_current=u.electron / u.pix / u.second, well_capacity=u.electron) center_offset=u.pix, dark_current=u.electron / u.pix / u.second, well_capacity=u.electron)
def __init__(self, parent: IRadiant, quantum_efficiency: Union[str, u.Quantity], def __init__(self, parent: IRadiant, quantum_efficiency: Union[str, u.Quantity],
pixel_geometry: u.Quantity, pixel_size: u.Quantity, sigma_read_out: u.Quantity, dark_current: u.Quantity, pixel_geometry: u.Quantity, pixel_size: u.Quantity, sigma_read_out: u.Quantity,
well_capacity: u.Quantity, f_number: Union[int, float], common_conf: Entry, dark_current: u.Quantity, well_capacity: u.Quantity, f_number: Union[int, float], common_conf: Entry,
center_offset: u.Quantity = np.array([0, 0]) << u.pix, shape: str = "circle", center_offset: u.Quantity = np.array([0, 0]) << u.pix, shape: str = "circle",
contained_energy: Union[str, int, float] = "FWHM", aperture_size: u.Quantity = None): contained_energy: Union[str, int, float] = "FWHM", aperture_size: u.Quantity = None):
""" """
@ -82,14 +83,20 @@ class Imager(ASensor):
self.__central_wl = self.__common_conf.wl_min() + ( self.__central_wl = self.__common_conf.wl_min() + (
self.__common_conf.wl_max() - self.__common_conf.wl_min()) / 2 self.__common_conf.wl_max() - self.__common_conf.wl_min()) / 2
# Parse PSF # Parse PSF
if hasattr(common_conf, "psf") and common_conf.psf().lower() == "airy": if common_conf.psf.type.lower() == "airy":
# Use an airy disk as PSF # Use an airy disk as PSF
self.__psf = Airy(self.__f_number, self.__central_wl, common_conf.d_aperture(), common_conf.psf.osf, self.__psf = Airy(self.__f_number, self.__central_wl, common_conf.d_aperture(), common_conf.psf.osf,
pixel_size) pixel_size)
else: elif common_conf.psf.type.lower() == "zemax":
# Read PSF from Zemax file # Read PSF from Zemax file
self.__psf = Zemax(common_conf.psf(), self.__f_number, self.__central_wl, common_conf.d_aperture(), self.__psf = Zemax(common_conf.psf(), self.__f_number, self.__central_wl, common_conf.d_aperture(),
common_conf.psf.osf, pixel_size) common_conf.psf.osf, pixel_size)
elif common_conf.psf.type.lower() == "fits":
# Read PSF from FITS-file
self.__psf = FITS(common_conf.psf(), self.__f_number, self.__central_wl, common_conf.d_aperture(),
common_conf.psf.osf, pixel_size)
else:
logger.error("Unknown PSF type '" + common_conf.psf() + "'.")
@u.quantity_input(exp_time="time") @u.quantity_input(exp_time="time")
def calcSNR(self, background: SpectralQty, signal: SpectralQty, obstruction: float, def calcSNR(self, background: SpectralQty, signal: SpectralQty, obstruction: float,
@ -568,7 +575,7 @@ class Imager(ASensor):
return "Missing container 'contained_energy'." return "Missing container 'contained_energy'."
mes = sensor.photometric_aperture.contained_energy.check_float("val") mes = sensor.photometric_aperture.contained_energy.check_float("val")
if mes is not None: if mes is not None:
if conf.common.psf().lower() == "airy": if conf.common.psf.type.lower() == "airy":
mes = sensor.photometric_aperture.contained_energy.check_selection("val", mes = sensor.photometric_aperture.contained_energy.check_selection("val",
["peak", "FWHM", "fwhm", ["peak", "FWHM", "fwhm",
"min"]) "min"])

View File

@ -6,7 +6,7 @@
<!-- <res val="1000" val_unit=""--> <!-- <res val="1000" val_unit=""-->
<!-- comment="Resolving power for common spectral binning. A linear binning wavelength integral is estimated as wl_min/R"/>--> <!-- comment="Resolving power for common spectral binning. A linear binning wavelength integral is estimated as wl_min/R"/>-->
<d_aperture val="0.5" val_unit="m" comment="Diameter of the telescope aperture"/> <d_aperture val="0.5" val_unit="m" comment="Diameter of the telescope aperture"/>
<psf val="Airy" osf="10" comment="PSF of the optical system. Can be Airy or a file"/> <psf type="Airy" osf="10" comment="PSF of the optical system. Can be Airy or a file"/>
<!-- <psf val="tests/data/ps2f.txt" comment="PSF of the optical system. Can be Airy or a file"/>--> <!-- <psf val="tests/data/ps2f.txt" comment="PSF of the optical system. Can be Airy or a file"/>-->
<jitter_sigma val="1" val_unit="arcsec" comment="Sigma of the telescope jitter"/> <jitter_sigma val="1" val_unit="arcsec" comment="Sigma of the telescope jitter"/>
<output path="output" format="fits" comment="Output directory to store output files"/> <output path="output" format="fits" comment="Output directory to store output files"/>

View File

@ -6,7 +6,7 @@
<res val="299792.458" val_unit="" <res val="299792.458" val_unit=""
comment="Resolving power for common spectral binning. A linear binning wavelength integral is estimated as (wl_min + wl_max)/(2*R)"/> comment="Resolving power for common spectral binning. A linear binning wavelength integral is estimated as (wl_min + wl_max)/(2*R)"/>
<d_aperture val="2.527" val_unit="m" comment="Diameter of the telescope aperture"/> <d_aperture val="2.527" val_unit="m" comment="Diameter of the telescope aperture"/>
<psf val="Airy" osf="10" osf_unit="" comment="PSF of the optical system. Can be Airy or a file"/> <psf type="Airy" osf="10" osf_unit="" comment="PSF of the optical system. Can be Airy or a file"/>
<!-- <psf val="data/psf.txt" osf="10" osf_unit="" comment="PSF of the optical system. Can be Airy or a file"/>--> <!-- <psf val="data/psf.txt" osf="10" osf_unit="" comment="PSF of the optical system. Can be Airy or a file"/>-->
<!-- <jitter_sigma val="0.1" val_unit="arcsec" comment="Sigma of the telescope jitter"/>--> <!-- <jitter_sigma val="0.1" val_unit="arcsec" comment="Sigma of the telescope jitter"/>-->
<output path="output" format="csv" comment="Output directory to store output files"/> <output path="output" format="csv" comment="Output directory to store output files"/>

BIN
tests/data/psf_5um.fits Normal file

Binary file not shown.

50
tests/psf/test_FITS.py Normal file
View File

@ -0,0 +1,50 @@
from unittest import TestCase
from esbo_etc.classes.psf.FITS import FITS
from esbo_etc.classes.psf.Airy import Airy
from esbo_etc.classes.sensor.PixelMask import PixelMask
import astropy.units as u
import numpy as np
class TestFITS(TestCase):
def setUp(self):
self.fits = FITS("tests/data/psf_5um.fits", 5.5, 5 * u.um, 0.5 * u.m, 10, 6.5 * u.um)
self.airy = Airy(5.5, 5 * u.um, 0.5 * u.m, 10, 6.5 * u.um)
def test_calcReducedObservationAngle(self):
# No jitter
self.assertTrue(np.isclose(self.fits.calcReducedObservationAngle(80).value,
self.airy.calcReducedObservationAngle(80).value, rtol=0.04))
# Jitter
self.assertTrue(np.isclose(self.fits.calcReducedObservationAngle(80, 1 * u.arcsec).value,
self.airy.calcReducedObservationAngle(80, 1 * u.arcsec).value, rtol=0.02))
def test_mapToPixelArray(self):
# No jitter
reduced_observation_angle = self.fits.calcReducedObservationAngle(80).value
d_ap = (reduced_observation_angle / (6.5 * u.um / (13.0 * 4 * u.um))).decompose() * u.pix
mask = PixelMask(np.array([1024, 1024]) << u.pix, 6.5 * u.um, np.array([0.5, 0.5]) << u.pix)
mask.createPhotometricAperture("circle", d_ap / 2)
mask = self.fits.mapToPixelMask(mask)
reduced_observation_angle_2 = self.airy.calcReducedObservationAngle(80).value
d_ap_2 = (reduced_observation_angle_2 / (6.5 * u.um / (13.0 * 4 * u.um))).decompose() * u.pix
mask_2 = PixelMask(np.array([1024, 1024]) << u.pix, 6.5 * u.um, np.array([0.5, 0.5]) << u.pix)
mask_2.createPhotometricAperture("circle", d_ap_2 / 2)
mask_2 = self.airy.mapToPixelMask(mask_2)
self.assertTrue(np.isclose(float(mask.sum()), float(mask_2.sum()), rtol=0.005))
# Jitter
reduced_observation_angle = self.fits.calcReducedObservationAngle(80, 1 * u.arcsec).value
d_ap = (reduced_observation_angle / (6.5 * u.um / (13.0 * 4 * u.um))).decompose() * u.pix
mask = PixelMask(np.array([1024, 1024]) << u.pix, 6.5 * u.um, np.array([0.5, 0.5]) << u.pix)
mask.createPhotometricAperture("circle", d_ap / 2)
mask = self.fits.mapToPixelMask(mask, 1 * u.arcsec)
reduced_observation_angle_2 = self.airy.calcReducedObservationAngle(80, 1 * u.arcsec).value
d_ap_2 = (reduced_observation_angle_2 / (6.5 * u.um / (13.0 * 4 * u.um))).decompose() * u.pix
mask_2 = PixelMask(np.array([1024, 1024]) << u.pix, 6.5 * u.um, np.array([0.5, 0.5]) << u.pix)
mask_2.createPhotometricAperture("circle", d_ap_2 / 2)
mask_2 = self.airy.mapToPixelMask(mask_2, 1 * u.arcsec)
self.assertTrue(np.isclose(float(mask.sum()), float(mask_2.sum()), rtol=0.03))