#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Filename: fitter.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import multiprocessing
import numpy as np
import scipy.interpolate as interpolate
import scipy.optimize as optimize
import kcorrect.response
[docs]
class Fitter(object):
"""Nonnegative SED fitting object
Parameters
----------
abcorrect : bool
correct maggies to AB (default False)
responses : list of str
names of responses to use
templates : kcorrect.template.Template object
templates to use
redshift_range : list of np.float32
minimum and maximum redshifts (default [0., 1.])
nredshift : int or np.int32
number of redshifts in interpolation grid
Attributes
----------
abcorrect : bool
correct maggies to AB
Amatrix : scipy.interpolate.interp1d object
interpolator to produce A matrix (set to None until set_Amatrix called)
responses : list of str
names of responses to use
nredshift : int or np.int32
number of redshifts in interpolation grid
redshift_range : list of np.float32
minimum and maximum redshifts (default [0., 1.])
redshifts : ndarray of np.float32
redshifts in grid
templates : kcorrect.template.Template object
templates to use
"""
def __init__(self, responses=None, templates=None, redshift_range=[0., 1.],
nredshift=2000, abcorrect=False):
[docs]
self.abcorrect = abcorrect
[docs]
self.responses = responses
[docs]
self.templates = templates
[docs]
self.nredshift = nredshift
[docs]
self.redshift_range = np.float32(redshift_range)
[docs]
self.redshifts = (self.redshift_range[0] +
(self.redshift_range[1] - self.redshift_range[0]) *
np.arange(nredshift, dtype=np.float32) /
np.float32(nredshift - 1))
return
[docs]
def _interpolate_Amatrix(self, redshifts=None, A=None):
"""Interpolate the A matrix to an interpolator"""
return(interpolate.interp1d(redshifts, A, axis=0))
[docs]
def _calc_Amatrix(self, responses=None):
"""Create an A matrix and return it (don't set attribute)"""
# Make sure responses are loaded
f = kcorrect.response.ResponseDict()
for response in responses:
f.load_response(response)
# Create rmatrix
rmatrix = np.zeros((self.nredshift,
len(responses),
self.templates.nsed), dtype=np.float32)
for iz, z in enumerate(self.redshifts):
self.templates.set_redshift(redshift=z)
for ir, response in enumerate(responses):
if(self.templates.interpolate):
rmatrix[iz, ir, :] = f[response].project(sed=self.templates)
else:
# project SEDs in groups if there is a very large # of them
nchunk = 1000
for ised in np.arange(0, self.templates.nsed, nchunk, dtype=np.int32):
ised_start = ised
ised_end = ised + nchunk
if(ised_end > self.templates.nsed):
ised_end = self.templates.nsed
rmatrix[iz, ir, ised_start:ised_end] = f[response].project(wave=self.templates.wave,
flux=self.templates.flux[ised_start:ised_end, :])
# Now create Amatrix interpolator
Amatrix = self._interpolate_Amatrix(redshifts=self.redshifts, A=rmatrix)
# Return templates to z=0
self.templates.set_redshift(redshift=0.)
return(Amatrix)
[docs]
def to_ab(self, maggies=None, ivar=None):
"""Convert input maggies to AB
Parameters
----------
maggies : ndarray of np.float32
array of fluxes in standard SDSS system
ivar : ndarray of np.float32
inverse variances in standard SDSS system (optional)
Returns
-------
ab_maggies : ndarray of np.float32
array of fluxes converted to AB
ab_ivar : ndarray of np.float32
inverse variances converted to AB (if ivar input)
Notes
-----
This method just returns maggies and/or ivar unchanged,
as for this object we expect AB maggies on input.
fit_coeffs() calls this on its inputs.
"""
if(ivar is not None):
return(maggies, ivar)
else:
return(maggies)
[docs]
def set_Amatrix(self):
"""Set Amatrix, interpolator for the design matrix"""
self.Amatrix = self._calc_Amatrix(responses=self.responses)
return
[docs]
def _fit_coeffs(self, redshift=None, maggies=None, ivar=None, mc=0):
"""Fit coefficients to single object
Parameters
----------
redshift : np.float32
redshift
maggies : ndarray of np.float32
fluxes of each band in AB maggies
ivar : ndarray of np.float32
inverse variance of each band
mc : int
if greater than 0, generate Monte Carlo of coefficients
Returns
-------
coeffs : ndarray of np.float32
[ntemplates] coefficients for each template
coeffs_mc : ndarray of np.float32
[ntemplates, mc] coefficients for each MC for each template
maggies_mc : ndarray of np.float32
[len(maggies), mc] maggies for each MC
Notes
-----
This recasts the A and b matrices as np.float64 before sending them
to scipy.optimize.nnls(). This is necessitated by a change in the algorithm
for NNLS in SciPy 1.12.0 and later, which is apparently less numerically robust
than the original algorithm.
"""
default_zeros = np.zeros(self.templates.nsed, dtype=np.float32)
inverr = np.sqrt(ivar)
inverr_matrix = np.transpose(np.tile(inverr, (self.templates.nsed, 1)))
try:
A = inverr_matrix * self.Amatrix(redshift)
except ValueError:
if(mc == 0):
return(default_zeros)
else:
coeffs_mc = np.zeros((self.templates.nsed, mc),
dtype=np.float32)
return(default_zeros, coeffs_mc)
b = maggies * inverr
try:
coeffs, rnorm = optimize.nnls(np.float64(A), np.float64(b))
except RuntimeError:
coeffs, rnorm = optimize.nnls(np.float64(A), np.float64(b),
maxiter=A.shape[1] * 100)
if(mc == 0):
return(coeffs)
else:
coeffs_mc = np.zeros((self.templates.nsed, mc), dtype=np.float32)
igd = np.where(inverr > 0)[0]
err = 1. / inverr[igd]
maggies_mc = np.zeros((len(maggies), mc), dtype=np.float32)
for imc in np.arange(mc, dtype=int):
maggies_mc[:, imc] = maggies
maggies_mc[igd, imc] = maggies_mc[igd, imc] + np.random.normal(size=len(igd)) * err
b_mc = maggies_mc[:, imc] * inverr
try:
tmp_coeffs_mc, rnorm = optimize.nnls(np.float64(A), np.float64(b_mc))
except RuntimeError:
tmp_coeffs_mc, rnorm = optimize.nnls(np.float64(A), np.float64(b_mc),
maxiter=A.shape[1] * 100)
coeffs_mc[:, imc] = tmp_coeffs_mc
return(coeffs, coeffs_mc, maggies_mc)
[docs]
def fit_coeffs(self, redshift=None, maggies=None, ivar=None, mc=0):
"""Fit coefficients
Parameters
----------
redshift : np.float32 or ndarray of np.float32
redshift(s)
maggies : ndarray of np.float32
fluxes of each band in AB maggies
ivar : ndarray of np.float32
inverse variance of each band
mc : int
if greater than 1, generate Monte Carlo of coefficients
Returns
-------
coeffs : ndarray of np.float32
coefficients for each template
coeffs_mc : ndarray of np.float32
[ntemplates, mc] coefficients for each MC for each template
maggies_mc : ndarray of np.float32
[nbands, mc] Monte Carlo maggies
Notes
-----
maggies are assumed to be Galactic-extinction corrected already.
Calls this class's to_ab() method on input maggies.
If redshift is an array, even with just one element, coeffs is
returned as an [nredshift, ntemplate] array.
Otherwise coeffs is returned as an [ntemplate] array.
Occasionally the optimizer will report "NNLS quitting on
iteration count." This indicates that the default number of
iterations for scipy.optimize.nnls was not enough. Under these
conditions, this code tries a much larger number of
iterations. If that still fails, you will receive a traceback.
"""
if(redshift is None):
raise TypeError("Must specify redshift to fit coefficients")
# Check a bunch of things about the input arrays
(array, n, redshift, maggies, ivar,
dumdum) = self._process_inputs(redshift=redshift, maggies=maggies,
ivar=ivar)
# Call single
if(n == 1):
if(mc == 0):
coeffs = self._fit_coeffs(redshift=np.squeeze(redshift),
maggies=np.squeeze(maggies),
ivar=np.squeeze(ivar),
mc=mc)
else:
coeffs, coeffs_mc, maggies_mc = self._fit_coeffs(redshift=np.squeeze(redshift),
maggies=np.squeeze(maggies),
ivar=np.squeeze(ivar),
mc=mc)
if(array):
coeffs = coeffs.reshape(1, len(coeffs))
if(mc > 0):
coeffs_mc = coeffs_mc.reshape(1, len(coeffs), mc)
maggies_mc = maggies_mc.reshape(1, len(maggies), mc)
if(mc > 0):
return(coeffs, coeffs_mc, maggies_mc)
else:
return(coeffs)
# Loop for multiple
coeffs = np.zeros((n, self.templates.nsed), dtype=np.float32)
if(mc > 0):
coeffs_mc = np.zeros((n, self.templates.nsed, mc), dtype=np.float32)
maggies_mc = np.zeros((n, len(self.responses), mc), dtype=np.float32)
for i, r in enumerate(redshift):
if(mc == 0):
coeffs[i, :] = self._fit_coeffs(redshift=r, maggies=maggies[i, :],
ivar=ivar[i, :])
else:
tmp_coeffs, tmp_coeffs_mc, tmp_maggies_mc = self._fit_coeffs(redshift=r,
maggies=maggies[i, :],
ivar=ivar[i, :],
mc=mc)
coeffs[i, :] = tmp_coeffs
coeffs_mc[i, :, :] = tmp_coeffs_mc
maggies_mc[i, :, :] = tmp_maggies_mc
if(mc == 0):
return(coeffs)
else:
return(coeffs, coeffs_mc, maggies_mc)
[docs]
def _reconstruct(self, Amatrix=None, redshift=None, coeffs=None,
band_shift=0.):
"""Reconstruct maggies associated with coefficients
Parameters
----------
Amatrix : scipy.interpolate.interp1d
interpolator to use for Amatrix
redshift : np.float32
redshift
coeffs : ndarray of np.float32
coefficients
band_shift : np.float32
blueshift to apply to reconstructed bandpasses
Returns
-------
maggies : ndarray of np.float32
maggies in each band
Notes
-----
Amatrix should be an interpolator over redshift that returns
an array that is number of responses by number of templates.
"""
default_zeros = np.zeros(len(self.responses), dtype=np.float32)
# Check a bunch of things about the input arrays
(array, n, redshift, d1, d2,
coeffs) = self._process_inputs(redshift=redshift, coeffs=coeffs)
# Consider blueward shift of bandpass due to redshift
# of observation and due to band_shift
shift = (1. + redshift) * (1. + band_shift) - 1.
# Calculate maggies
try:
A = Amatrix(shift)
except ValueError:
raise ValueError("Redshift out of range for interpolating A matrix!")
if(array):
maggies = np.einsum('ijk,ki->ij', A,
coeffs.T.reshape(self.templates.nsed, n))
else:
maggies = A.dot(coeffs)
# For band_shift !=0, require this normalization given that
# AB source is not altered.
maggies = maggies / (1. + band_shift)
return(maggies)
[docs]
def reconstruct(self, redshift=None, coeffs=None, band_shift=0.):
"""Reconstruct AB maggies associated with coefficients
Parameters
----------
redshift : np.float32 or ndarray of np.float32
redshift
coeffs : ndarray of np.float32
coefficients
band_shift : np.float32
blueshift to apply to reconstructed bandpasses
Returns
-------
maggies : ndarray of np.float32
AB maggies in each band
Notes
-----
Returns AB maggies, but note that if to_ab() is non-trivial,
these may not be directly comparable to the input maggies.
"""
return(self._reconstruct(Amatrix=self.Amatrix, redshift=redshift,
coeffs=coeffs, band_shift=band_shift))