#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Filename: response.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import os
import re
import astropy.io.ascii
import astropy.io.fits
import astropy.table
import numpy as np
import scipy.integrate as integrate
import scipy.interpolate as interpolate
import scipy.optimize as optimize
import kcorrect
import kcorrect.template
import kcorrect.utils
[docs]
def all_responses(response_dir=os.path.join(kcorrect.KCORRECT_DIR, 'data',
'responses'),
check_validity=False):
"""List all responses available
Parameters
----------
response_dir : str
path to directory containing responses
check_validity : bool
if True, check the validity of the files
Returns
-------
responses : list of str
response names for available bandpasses
Notes
-----
Returns all base names of files with the suffix '.dat' in the
response_dir directory.
By default, response_dir is the "data/responses" directory within
the kcorrect Python distribution.
If the user specifies response_dir and check_validity is False,
there is no guarantee that the response files in the specified
directory are valid!
If check_validity is True, the responses are also loaded into the
ResponseDict() singleton.
"""
rdir = os.path.join(response_dir)
files = os.listdir(rdir)
responses = []
f = ResponseDict()
for filename in files:
if(os.path.isfile(os.path.join(rdir, filename))):
m = re.match('^(.*)\\.dat$', filename)
if(m is not None):
response = m.group(1)
valid = True
if(check_validity):
try:
f.load_response(response)
except:
valid = False
if(valid):
responses.append(response)
return(responses)
# Class to define a singleton
[docs]
class ResponseDictSingleton(type):
[docs]
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(ResponseDictSingleton,
cls).__call__(*args, **kwargs)
return cls._instances[cls]
[docs]
class ResponseDict(dict, metaclass=ResponseDictSingleton):
"""Dictionary of all responses (singleton)
"""
def __init__(self):
return
[docs]
def load_response(self, response=None, reload=False):
"""Load response into dictionary
Parameters
----------
response : str
response to load
reload : bool
if True, reload the response if already in ResponseDict (default False)
"""
if((response in self) & (reload is False)):
return
self[response] = Response()
self[response].fromdat(filename='{n}.dat'.format(n=response))
return
[docs]
class Response(object):
"""Astronomical bandpass description
Parameters
----------
filename : str
file name to read from
wave : ndarray of np.float32
wavelength grid in Angstroms
response : ndarray of np.float32
response function
Attributes
----------
filename : str
source filename, or None
fwhm, fwhm_low, fwhm_hight : np.float32
FWHM of response, with low and high wavelength limits (Angstroms)
interp : scipy.interpolate.interp1d object
interpolation object
lambda_eff : np.float32
effective wavelength in Angstroms
nwave : int
number of wavelength samples
response : ndarray of np.float32
response function
solar_magnitude : np.float32
absolute magnitude of Sun through filter, or None
solar_sed : kcorrect.template.SED object
SED associated with Sun for solar_magnitude
vega2ab : np.float32
magnitude offset from Vega to AB (m_AB - m_Vega), or None
vega_sed : kcorrect.template.SED object
SED associated with Vega for vega_magnitude
wave : ndarray of np.float32
wavelength grid in Angstroms
Notes
-----
The response should be the relative response of the system
(atmosphere, telescope, detector, etc) to a photon at each
given wavelength entering the Earth's atmosphere (for a ground
based telescope) or the telescope aperture (space based).
If a file is given it is assumed to be in fixed_width
format, readable and writeable by astropy.io.ascii
The wavelengths are sorted on input so the attribute wave is
always increasing.
The attribute interp() takes wavelength in Angstroms as its one
positional argument.
The effective wavelength is defined as described in Blanton &
Roweis (2007).
"""
def __init__(self, filename=None, wave=None, response=None):
if((wave is not None) &
(response is not None)):
isort = np.argsort(wave)
self.wave = wave[isort]
self.response = response[isort]
else:
self.wave = wave
self.response = response
[docs]
self.filename = filename
[docs]
self.solar_magnitude = None
[docs]
self.vega_magnitude = None
if(self.filename is not None):
self.fromdat(filename)
else:
if(self.wave is not None):
self.nwave = len(self.wave)
self._setup()
return
[docs]
def _setup(self):
"""Some initial setup after an input"""
self.set_interp()
self.set_lambda_eff()
self.set_solar_magnitude()
self.set_vega2ab()
self.set_fwhm_limits()
return
[docs]
def response_dtype(self):
"""Returns numpy dtype for SED"""
response_dtype = np.dtype([('wave', type(self.wave[0]), self.nwave),
('response', type(self.response[0]), self.nwave)])
return(response_dtype)
[docs]
def set_interp(self):
"""Sets attribute interp to interpolation function"""
if((self.wave is None) | (self.response is None)):
self.interp = None
return
self.interp = interpolate.interp1d(self.wave, self.response,
kind='cubic',
bounds_error=False,
fill_value=0.)
return
[docs]
def fromfits(self, filename=None, ext=1):
"""Read response from FITS files
Parameters
----------
filename : str
input file name
ext : str or int
extension to read from
"""
response_hdulist = astropy.io.fits.open(filename)
response = response_hdulist[ext].data[0]
self.nwave = len(response['wave'])
isort = np.argsort(response['wave'])
self.wave = response['wave'][isort]
self.response = response['response'][isort]
self._setup()
return
[docs]
def fromdat(self, filename=None):
"""Read response from fixed_width file
Parameters
----------
filename : str
input file name
Notes
-----
If an absolute path, reads that. If not, looks relative
to KCORRECT_DIR/python/kcorrect/data/responses
"""
if(os.path.isabs(filename)):
infilename = filename
else:
infilename = os.path.join(kcorrect.KCORRECT_DIR, 'data',
'responses', filename)
if(os.path.exists(infilename) is False):
raise ValueError("No response file: {f}".format(f=infilename))
dat = astropy.io.ascii.read(infilename, format='fixed_width')
self.nwave = len(dat)
isort = np.argsort(dat['lambda'])
self.wave = dat['lambda'][isort]
self.response = dat['pass'][isort]
self._setup()
return
[docs]
def tofits(self, filename=None, ext='FLUX', clobber=True):
"""Write SED to FITS files
filename : str
output file name
ext : str or int
extension to write to
clobber : bool
whether to clobber the existing file or add an HDU
"""
out = np.zeros(1, self.response_dtype())
out['wave'] = self.wave
out['response'] = self.response
out_table = astropy.table.Table(out)
out_table.write(filename, overwrite=clobber)
return
[docs]
def project(self, sed=None, wave=None, flux=None):
"""Project spectrum onto response
Parameters
----------
sed : kcorrect.template.SED object
spectrum in kcorrect format
wave : ndarray of np.float32
wavelength grid (used only if sed and func not set)
flux : ndarray of np.float32
flux grid (used only if sed and func not set)
Returns
-------
maggies : np.float32
[nsed] nmaggies associated with spectrum through bandpass
Notes
-----
Fluxes should be in erg cm^{-2} s^{-1} Angstrom^{-1}
If "flux" and "wave" are specified, then wave must be
a 1-dimensional array, and flux must be a 1-dimensional
or 2-dimensional array, with the last axis corresponding
to wavelength and with the same number of elements as
wave.
Assumes AB calibration.
If the bandpass is outside the range of the solar model, 0 is returned.
"""
if(sed is None):
if((wave is None) or (flux is None)):
raise ValueError("must specify sed, or wave and flux")
wave = np.float32(wave)
flux = np.float32(flux)
if(wave.ndim != 1):
raise ValueError("wave must be 1-D array")
if(flux.shape[-1] != wave.shape[0]):
raise ValueError("last axis of flux must match wave")
if(flux.ndim > 2):
raise ValueError("flux must be 1-D or 2-D array")
sed_wave = wave
if(flux.ndim == 1):
nsed = 1
else:
nsed = flux.shape[0]
interp = interpolate.interp1d(wave, flux, kind='cubic',
bounds_error=False, fill_value=0.)
else:
sed_wave = sed.wave
interp = sed.interp
nsed = sed.nsed
# Find SED wavelengths to integrate over
keep = (sed_wave >= self.wave[0]) & (sed_wave <= self.wave[-1])
ikeep = np.where(keep)[0]
if(len(ikeep) == 0):
return(0.)
if(ikeep[0] > 0):
keep[ikeep[0] - 1] = 1
if(ikeep[-1] < len(sed_wave) - 1):
keep[ikeep[-1] + 1] = 1
# Find full grid of wavelengths for integration
integrate_wave = np.unique(np.append(sed_wave[keep], self.wave))
# Interpolate to grid
integrate_sed = interp(integrate_wave)
integrate_response = self.interp(integrate_wave)
# Perform integration for numerator
numer = np.zeros(nsed, dtype=np.float32)
if(nsed == 1):
integrand_numer = integrate_sed * integrate_response * integrate_wave
numer = integrate.trapezoid(integrate_wave, np.squeeze(integrand_numer))
else:
for ised in np.arange(nsed, dtype=int):
integrand_numer = integrate_sed[ised, :] * integrate_response * integrate_wave
numer[ised] = integrate.trapezoid(integrate_wave,
np.squeeze(integrand_numer))
# Perform integration for denominator
integrand_denom = (kcorrect.utils.sed_ab(integrate_wave) *
integrate_response * integrate_wave)
denom = integrate.trapezoid(integrate_wave, integrand_denom)
# AB maggies are projection of SED onto response divided by same
# projection for the AB source.
maggies = np.squeeze(numer / denom)
return(maggies)
[docs]
def set_lambda_eff(self):
"""Set effective wavelength
Notes
-----
Sets attribute lambda_eff
"""
# Just use original grid; good enough.
wave = self.wave
response = self.response
# Perform integration for numerator
integrand_numer = np.log(wave) * response / wave
numer = integrate.trapezoid(wave, integrand_numer)
# Perform integration for denominator
integrand_denom = response / wave
denom = integrate.trapezoid(wave, integrand_denom)
# Set effective wavelength
self.lambda_eff = np.exp(numer / denom)
return
[docs]
def set_fwhm_limits(self):
"""Set limits for FWHM
Notes
-----
Sets attributes fwhm_low, fwhm_high, fwhm.
fwhm_low is the lowest wavelength value for which the response
reaches 50% maximum when starting from the low end.
fwhm_high is the highest wavelength value for which the response
reaches 50% maximum when starting from the high end.
fwhm is (fwhm_high - fwhm_low)
"""
# Just use original grid; good enough.
wave = self.wave.copy()
iresponse = self.interp(wave)
maxresponse = iresponse.max()
iresponse = iresponse / maxresponse
# Find lower
iupper = np.where(iresponse > 0.5)[0][0]
if(iupper == 0):
fwhm_low = wave[iupper]
else:
ilower = iupper - 1
fwhm_low = optimize.brentq(lambda x : (self.interp(x) / maxresponse - 0.5),
wave[ilower], wave[iupper])
# Find upper
ilower = np.where(iresponse >= 0.5)[0][-1]
if(ilower == len(iresponse) - 1):
fwhm_high = wave[-1]
else:
iupper = ilower + 1
fwhm_high = optimize.brentq(lambda x : (self.interp(x) / maxresponse - 0.5),
wave[ilower], wave[iupper])
self.fwhm_low = fwhm_low
self.fwhm_high = fwhm_high
self.fwhm = fwhm_high - fwhm_low
return
[docs]
def set_solar_magnitude(self):
"""Set absolute magnitude of Sun through filter
Notes
-----
Uses lcbsun.ori model from Lejeune et al (1997)
If the response function is outside the model wavelength range,
solar_magnitude is set to None.
"""
if(self.solar_sed is None):
sunfile = os.path.join(kcorrect.KCORRECT_DIR, 'data', 'basel',
'lcbsun.ori')
info, wave, flux = kcorrect.utils.read_basel(filename=sunfile)
# Now convert to Angstroms and erg/cm^2/s/Ang at 10 pc
radius = 6.960e+10
wave = wave * 10. # nm to Angstrom
pctocm = 3.086e+18
cspeed = 2.99792e+18 # Ang/s
for unit in range(flux.shape[0]):
flux[unit, :] = np.pi * 4. * flux * cspeed / wave**2
flux = flux * (radius / (10. * pctocm))**2
self.solar_sed = kcorrect.template.SED(wave=wave, flux=flux)
self.solar_sed.info = info
solar_maggies = self.project(sed=self.solar_sed)
if(solar_maggies > 0):
self.solar_magnitude = - 2.5 * np.log10(solar_maggies)
else:
self.solar_magnitude = None
return
[docs]
def set_vega2ab(self):
"""Set Vega to AB magnitude conversion
Notes
-----
Uses lcbvega.ori model from Lejeune et al (1997)
If the response function is outside the model wavelength range,
vega2ab is set to None.
"""
if(self.vega_sed is None):
vegafile = os.path.join(kcorrect.KCORRECT_DIR, 'data', 'basel',
'lcbvega.ori')
info, wave, flux = kcorrect.utils.read_basel(filename=vegafile)
# Conversion to match Hayes et al. 1985
radius = 1.91144e+11 # Backed out to get normalization right
dvega = 7.68 # Vega is 7.68 pc
wave = wave * 10. # nm to Angstrom
pctocm = 3.086e+18
cspeed = 2.99792e+18 # Ang/s
for unit in range(flux.shape[0]):
flux[unit, :] = np.pi * 4. * flux * cspeed / wave**2
flux = flux * (radius / (dvega * pctocm))**2
self.vega_sed = kcorrect.template.SED(wave=wave, flux=flux)
self.vega_sed.info = info
vega_maggies = self.project(sed=self.vega_sed)
if(vega_maggies > 0):
self.vega2ab = - 2.5 * np.log10(vega_maggies)
else:
self.vega2ab = None
return