Source code for kcorrect.utils

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# @Filename: utils.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)


import numpy as np

import kcorrect.response


[docs] def sed_ab(wave=None): """Calculate AB source spectrum Parameters ---------- wave : ndarray of np.float32 wavelength in Angstrom Returns ------- ab : ndarray of np.float32 AB flux in erg cm^{-2} s^{-1} Angstrom^{-1} """ # This is 3631 Jy * c / wavelength^2 # Keeping cspeed in order unity units to avoid overflows cspeed = 2.99792 # speed of light in 10^{18} A / s ab = 3.631e-2 / wave**2 * cspeed return(ab)
[docs] def read_basel(filename=None): """Read Basel-style spectrum file Parameters ---------- filename : str file to read from Returns ------- info : dict meta-data for model wave : ndarray of np.float32 [1221] wavelengths in nm flux : ndarray of np.float32 [N, 1221] fluxes in erg/cm^2/s/Hz Notes ----- 1221 hardcoded as number of spectral elements. Model parameters stored as arrays in info attribute are: 'modelno' 'teff' 'logg' 'mh' 'vturb' 'xh' """ fp = open(filename, "r") fpstr = fp.read() fp.close words = fpstr.split() nwave = 1221 nunits = (len(words) - nwave) // (nwave + 6) wavestr = words[0:nwave] wave = np.array([np.float32(x) for x in wavestr], dtype=np.float32) flux = np.zeros((nunits, nwave), dtype=np.float32) modelno = np.zeros(nunits, dtype=np.int32) teff = np.zeros(nunits, dtype=np.float32) logg = np.zeros(nunits, dtype=np.float32) mh = np.zeros(nunits, dtype=np.float32) vturb = np.zeros(nunits, dtype=np.float32) xh = np.zeros(nunits, dtype=np.float32) for unit in range(nunits): start = nwave + unit * (nwave + 6) modelno[unit] = np.int32(words[start]) teff[unit] = np.float32(words[start + 1]) logg[unit] = np.float32(words[start + 2]) mh[unit] = np.float32(words[start + 3]) vturb[unit] = np.float32(words[start + 4]) xh[unit] = np.float32(words[start + 5]) fluxstr = words[start + 6:start + 6 + nwave] flux[unit, :] = np.array([np.float32(x) for x in fluxstr], dtype=np.float32) info = dict() info['modelno'] = modelno info['teff'] = teff info['logg'] = logg info['mh'] = mh info['vturb'] = vturb info['xh'] = xh return(info, wave, flux)
[docs] def sdss_ab_correct(maggies=None, ivar=None, abfix=[-0.036, 0.012, 0.010, 0.028, 0.040]): """Return AB maggies based on SDSS standard maggies Parameters ---------- maggies : ndarray of np.float32 array of fluxes in standard SDSS system ivar : ndarray of np.float32 inverse variances in standard SDSS system (optional) abfix : ndarray or list of np.float32 [5]-array of magnitude offset to apply to standard values Returns ------- ab_maggies : ndarray of np.float32 array of fluxes converted to AB ab_ivar : ndarray of np.float32 inverse variances converted to AB (if ivar input) Notes ----- Uses the AB conversions produced by D. Eisenstein, in his message sdss-calib/1152 :: u(AB,2.5m) = u(database, 2.5m) - 0.036 g(AB,2.5m) = g(database, 2.5m) + 0.012 r(AB,2.5m) = r(database, 2.5m) + 0.010 i(AB,2.5m) = i(database, 2.5m) + 0.028 z(AB,2.5m) = z(database, 2.5m) + 0.040 Unless an alternative is specified with the abfix parameter. """ maggies = np.float32(maggies) if(ivar is not None): ivar = np.float32(ivar) if(maggies.shape != ivar.shape): raise("maggies and ivar must be the same shape") abfix = np.array(abfix, dtype=np.float32) if(abfix.shape != (5,)): raise("abfix must have 5 values") abfac = 10.**(- 0.4 * abfix) if(maggies.ndim == 1): if(maggies.size != 5): raise("sdss_ab_correct expects 5 SDSS maggies values (ugriz)") ab_maggies = maggies * abfac if(ivar is not None): ab_ivar = ivar / abfac**2 else: ab_maggies = np.zeros(maggies.shape, dtype=np.float32) for i, cabfac in enumerate(abfac): ab_maggies[..., i] = maggies[..., i] * cabfac if(ivar is not None): ab_ivar = np.zeros(maggies.shape, dtype=np.float32) for i, cabfac in enumerate(abfac): ab_ivar[..., i] = ivar[..., i] / cabfac**2 if(ivar is not None): return(ab_maggies, ab_ivar) else: return(ab_maggies)
[docs] def error_floor(floor=None, maggies=None, ivar=None): """Calculate new inverse variance with fractional error floors Parameters ---------- floor : ndarray of np.float32 [Nr] fractional error floor for each response maggies : ndarray of np.float32 [N, Nr] or [Nr] array of maggies ivar : ndarray of np.float32 [N, Nr] or [Nr] array of inverse variance of maggies Returns ------- ivar : ndarray of np.float32 [N, Nr] or [Nr] array of inverse variances """ floor = np.array(floor) maggies = np.array(maggies) ivar = np.array(ivar) if(maggies.ndim == 1): array = False else: array = True if(maggies.shape[-1] != floor.shape[0]): raise("floor must have same number of responses as maggies") if(maggies.shape != ivar.shape): raise("maggies and ivar must be same shape") if(array): for i in np.arange(floor.size): iok = np.where(ivar[:, i] > 0.)[0] if(len(iok) > 0): ferr = 1. / (np.abs(maggies[iok, i]) * np.sqrt(ivar[iok, i])) ioklow = iok[np.where(ferr < floor[i])[0]] ivar[ioklow, i] = 1. / (np.abs(maggies[ioklow, i]) * floor[i])**2 else: for i in np.arange(floor.size): if(ivar[i] > 0.): ferr = 1. / (np.abs(maggies[i]) * np.sqrt(ivar[i])) if(ferr < floor[i]): ivar[i] = 1. / (np.abs(maggies[i]) * floor[i])**2 return(ivar)
[docs] def sdss_asinh_to_maggies(mag=None, mag_err=None, extinction=None): """Calculate maggies and ivar based on catalog parameters Parameters ---------- mag : ndarray of np.float32 [N, 5] or [5] array of asinh magnitudes from SDSS mag_err : ndarray of np.float32 [N, 5] or [5] array of asinh magnitude errors from SDSS extinction : ndarray or list of np.float32 [N, 5] or [5] array of Galactic extinctions from SDSS Returns ------- maggies : ndarray of np.float32 [N, 5] or [5] array of maggies ivar : ndarray of np.float32 [N, 5] or [5] array of inverse variances Notes ----- If extinction set, applies extinction (after converting to maggies). Does not apply AB corrections. If mag_err is None on input, only maggies is returned. """ mag = np.float32(mag) if(mag_err is not None): mag_err = np.float32(mag_err) if(mag.shape != mag_err.shape): raise("mag and mag_err must be the same shape") if(extinction is not None): extinction = np.float32(extinction) if(mag.shape != extinction.shape): raise("mag and extinction must be the same shape") else: extinction = np.zeros(mag.shape, dtype=np.float32) if(mag.shape[-1] != 5): raise("mag must have 5 values per object") b0 = np.array([1.4e-10, 0.9e-10, 1.2e-10, 1.8e-10, 7.4e-10], dtype=np.float32) maggies = np.zeros(mag.shape, dtype=np.float32) for k in np.arange(5, dtype=int): maggies[..., k] = 2. * b0[k] * np.sinh(- np.log(b0[k]) - (0.4 * np.log(10.) * mag[..., k])) maggies[..., k] = (maggies[..., k] * 10.**(0.4 * extinction[..., k])) if(mag_err is not None): maggies_ivar = np.zeros(mag.shape, dtype=np.float32) for k in np.arange(5, dtype=int): maggies_err = (2. * b0[k] * np.cosh(- np.log(b0[k]) - (0.4 * np.log(10.) * mag[..., k])) * 0.4 * np.log(10.) * mag_err[..., k]) maggies_ivar[..., k] = 1. / maggies_err**2 if(mag_err is None): return(maggies) else: return(maggies, maggies_ivar)
[docs] def maggies2flambda(maggies=None, ivar=None, responses=None): """Return arrays of wave, flambda for maggies Parameters ---------- maggies : np.float32 or ndarray of np.float32 maggies to convert ivar : np.float32 or ndarray of np.float32 inverse variance of maggies to convert (or None) responses : list of str names of responses that each maggies corresponds to Returns ------- wave : np.float32 or ndarray of np.float32 effective wavelength of each response curve flux : np.float32 or ndarray of np.float32 flux in erg/s/cm^2/A corresponding to maggies flux_ivar : np.float32 or ndarray of np.float32 inverse variance of flux (if ivar input) Notes ----- If ivar is set on input, output is (wave, flux, flux_ivar). If ivar is not set on input, output is (wave, flux) Effective wavelength is derived from response curve; curve is loaded into the response dictionary if it isn't there already. """ r = kcorrect.response.ResponseDict() for response in responses: if(response not in r): r.load_response(response) wave = np.array([r[x].lambda_eff for x in responses], dtype=np.float32) m2f = sed_ab(wave) if(maggies.ndim == 1): flambda = m2f * maggies if(ivar is not None): fivar = ivar / m2f**2 else: flambda = np.outer(np.ones(maggies.shape[0], dtype=np.float32), m2f) * maggies if(ivar is not None): fivar = np.outer(np.ones(maggies.shape[0], dtype=np.float32), 1. / m2f**2) * ivar if(ivar is not None): return(wave, flambda, fivar) else: return(wave, flambda)