Source code for pyspi.utils.response.spi_response

import os
from datetime import datetime
import numpy as np
import scipy.interpolate as interpolate
from astropy.time.core import Time
from IPython.display import HTML
from numba import njit
from interpolation import interp
import copy

from pyspi.io.get_files import get_files
from pyspi.io.package_data import (get_path_of_internal_data_dir,
                                   get_path_of_external_data_dir)
from pyspi.utils.response.spi_pointing import SPIPointing
from pyspi.utils.response.spi_frame import (transform_icrs_to_spi,
                                            transform_spi_to_icrs)
from pyspi.utils.response.spi_response_data import (ResponseDataPhotopeak,
                                                    ResponseDataRMF)
from pyspi.utils.function_utils import find_needed_ids


[docs]@njit def trapz(y, x): """ Fast trapz integration with numba :param x: x values :param y: y values :returns: Trapz integrated """ return np.trapz(y, x)
[docs]@njit def log_interp1d(x_new, x_old, y_old): """ Linear interpolation in log space for base value pairs (x_old, y_old) for new x_values x_new :param x_old: Old x values used for interpolation :param y_old: Old y values used for interpolation :param x_new: New x values :returns: y_new from liner interpolation in log space """ # log of all logx = np.log10(x_old) logxnew = np.log10(x_new) # Avoid nan entries for yy=0 entries logy = np.log10(np.where(y_old <= 0, 1e-99, y_old)) lin_interp = interp(logx, logy, logxnew) return np.power(10., lin_interp)
[docs]@njit def add_frac(ph_matrix, i, idx, ebounds, einlow, einhigh): """ Recursive Funktion to get the fraction of einlow... """ if idx+1 == len(ebounds): pass elif ebounds[idx+1] >= einhigh: ph_matrix[i, idx] =\ np.min( np.array( [1, (einhigh-ebounds[idx])/ (einhigh-einlow)] ) ) else: frac = np.min(np.array([(ebounds[idx+1]-einlow)/(einhigh-einlow), (ebounds[idx+1]-ebounds[idx])/(einhigh-einlow)])) ph_matrix[i, idx] = frac add_frac(ph_matrix, i, idx+1, ebounds, einlow, einhigh)
@njit(fastmath=True) def _get_xy_pos(azimuth, zenith, xmin, ymin, xbin, ybin): """ Get the xy position on the reponse grid for given azimuth and zenith :param azimuth: Azmiuth angle in rad :param zenith: Zenith angle in rad :param xmin: Smallest x-grid entry :param ymin: Smallest y-grid entry :param xbin: Size of bins in x-direction :param ybin: Size of bins in y-direction :returns: Grid postition (x,y) """ x = np.cos(azimuth)*np.cos(zenith) y = np.sin(azimuth)*np.cos(zenith) z = np.sin(zenith) zenith_pointing = np.arccos(x) azimuth_pointing = np.arctan2(z,y) x_pos = (zenith_pointing * np.cos(azimuth_pointing) - xmin) / xbin y_pos = (zenith_pointing * np.sin(azimuth_pointing) - ymin) / ybin return x_pos, y_pos def _prep_out_pixels(ix_left, ix_right, iy_low, iy_up): """ Simple function to get the 2D-indices of the 4 points defined by given pairs of indices in x and y direction :param ix_left: x-index of the left points :param ix_right: x-index of the right points :param iy_low: y-index of the bottom points :param iy_up: y-index of the top points :returns: array with the 4 2D indices defining the 4 grid points """ left_low = [int(ix_left), int(iy_low)] right_low = [int(ix_right), int(iy_low)] left_up = [int(ix_left), int(iy_up)] right_up = [int(ix_right), int(iy_up)] out = np.array([left_low, right_low, left_up, right_up]).T return out
[docs]def multi_response_irf_read_objects(times, detector, drm='Photopeak'): """ TODO: This is very ugly. Come up with a better way. Function to initalize the needed responses for the given times. Only initalize every needed response version once! Because of memory. One response object needs about 1 GB of RAM... TODO: Not needed at the moment. We need this when we want to analyse many pointings together. :param times: Times of the different sw used :returns: list with correct response version object of the times """ response_versions = [] for time in times: if not time: # Default latest response version response_versions.append(4) elif time < Time(datetime.strptime('031206 060000', '%y%m%d %H%M%S')): response_versions.append(0) elif time < Time(datetime.strptime('040717 082006', '%y%m%d %H%M%S')): response_versions.append(1) elif time < Time(datetime.strptime('090219 095957', '%y%m%d %H%M%S')): response_versions.append(2) elif time < Time(datetime.strptime('100527 124500', '%y%m%d %H%M%S')): response_versions.append(3) else: response_versions.append(4) responses = [None, None, None, None, None] response_irf_read_times = [] for version in response_versions: if responses[version] is None: # Create this response object if drm == "Photopeak": responses[version] = ResponseDataPhotopeak( detector=detector, version=version) else: responses[version] = ResponseDataRMF(version=version) response_irf_read_times.append(responses[version]) return response_irf_read_times
[docs]class ResponseGenerator(object):
[docs] def __init__(self, pointing_id=None, ebounds=None, response_irf_read_object=None, det=None): """ Base Response Class - Here we have everything that stays the same for GRB and Constant Pointsource Reponses :param ebounds: User defined ebins for binned effective area :param response_irf_read_object: Object that holds the read in irf values :param sc_matrix: Matrix to convert SPI coordinate system <-> ICRS :param det: Which detector """ # Get the data, either from afs or from ISDC archive try: # Get the data from the afs server get_files(pointing_id, access="afs") except AssertionError: # Get the files from the iSDC data archive print("AFS data access did not work. " "I will try the ISDC data archive.") get_files(pointing_id, access="isdc") # Read in geometry file to get sc_matrix geometry_file_path = os.path.join(get_path_of_external_data_dir(), 'pointing_data', pointing_id, 'sc_orbit_param.fits.gz') pointing_object = SPIPointing(geometry_file_path) sc_matrix = pointing_object.sc_matrix[10] self._irf_ob = response_irf_read_object self._ebounds = ebounds if ebounds is not None: self.set_binned_data_energy_bounds(ebounds) self._sc_matrix = sc_matrix self._det = det self._pointing_id = pointing_id
[docs] def set_binned_data_energy_bounds(self, ebounds): """ Change the energy bins for the binned effective_area :param ebounds: New ebinedges: ebounds[:-1] start of ebins, ebounds[1:] end of ebins :returns: """ # if the new bins are not the old ones: update them if not np.array_equal(ebounds, self._ebounds): self._ene_min = ebounds[:-1] self._ene_max = ebounds[1:] self._ebounds = ebounds
[docs] def get_xy_pos(self, azimuth, zenith): """ Get xy position (in SPI simulation) for given azimuth and zenith :param azimuth: Azimuth in Sat. coordinates [rad] :param zenith: Zenith in Sat. coordinates [rad] :returns: grid position in (x,y) coordinates """ # we call a numba function here to speed it up return _get_xy_pos(azimuth, zenith, self.irf_ob.irf_xmin, self.irf_ob.irf_ymin, self.irf_ob.irf_xbin, self.irf_ob.irf_ybin)
[docs] def set_location(self, ra, dec): """ Calculate the weighted irfs for the three event types for a given position :param azimuth: Azimuth position in sat frame :param zenith: Zenith position in sat frame :returns: """ # Transform ra, dec from icrs to spi frame azimuth, zenith = transform_icrs_to_spi(ra, dec, self._sc_matrix) self._weighted_irfs(np.deg2rad(azimuth), np.deg2rad(zenith)) self._recalculate_response()
[docs] def set_location_direct_sat_coord(self, azimuth, zenith): """ Calculate the weighted irfs for the three event types for a given position :param azimuth: Azimuth position in sat frame :param zenith: Zenith position in sat frame :returns: ra and dec value """ self._weighted_irfs(np.deg2rad(azimuth), np.deg2rad(zenith)) self._recalculate_response() return transform_spi_to_icrs(azimuth, zenith, self._sc_matrix)
def _weighted_irfs(self, azimuth, zenith): """ Calculate the weighted irfs for the three event types for a given position :param azimuth: Azimuth position in sat frame :param zenith: Zenith position in sat frame :returns: """ raise NotImplementedError("Must be implemented in child class.") def _recalculate_response(self): raise NotImplementedError("Must be implemented in child class.") def _get_irf_weights(self, x_pos, y_pos): """ Get the 4 grid points around (x_pos, y_pos) and the weights for a rectangular interpolation :param x_pos: grid position x-coordinates :param y_pos: grid position y-coordinates :returns: weights, x-indices, y-indices of the 4 closest points """ # get the four nearest neighbors ix_left = np.floor(x_pos) if (x_pos >= 0.0) else np.floor(x_pos) - 1 iy_low = np.floor(y_pos) if (y_pos >= 0.0) else np.floor(y_pos) - 1 ix_right = ix_left + 1 iy_up = iy_low + 1 wgt_right = float(x_pos - ix_left) wgt_up = float(y_pos - iy_low) # pre set the weights wgt = np.zeros(4) # Get the weights if ix_left < 0.: if ix_right < 0.: out = _prep_out_pixels(ix_left, ix_right, iy_low, iy_up) return wgt, out[0], out[1] ix_left = ix_right wgt_left = 0.5 wgt_right = 0.5 elif ix_right >= self.irf_ob.irf_nx: if ix_left >= self.irf_ob.irf_nx: out = _prep_out_pixels(ix_left, ix_right, iy_low, iy_up) return wgt, out[0], out[1] ix_right = ix_left wgt_left = 0.5 wgt_right = 0.5 else: wgt_left = 1. - wgt_right if iy_low < 0: if iy_up < 0: out = _prep_out_pixels(ix_left, ix_right, iy_low, iy_up) return wgt, out[0], out[1] iy_low = iy_up wgt_up = 0.5 wgt_low = 0.5 elif iy_up >= self.irf_ob.irf_ny: if iy_low >= self.irf_ob.irf_ny: out = _prep_out_pixels(ix_left, ix_right, iy_low, iy_up) return wgt, out[0], out[1] iy_up = iy_low wgt_up = 0.5 wgt_low = 0.5 else: wgt_low = 1. - wgt_up wgt[0] = wgt_left * wgt_low wgt[1] = wgt_right * wgt_low wgt[2] = wgt_left * wgt_up wgt[3] = wgt_right * wgt_up out = _prep_out_pixels(ix_left, ix_right, iy_low, iy_up) return wgt, out[0], out[1] @property def irf_ob(self): """ :returns: the irf_read object with the information from the response simulation """ return self._irf_ob @property def det(self): """ :returns: detector number """ return self._det @property def ebounds(self): """ :returns: Ebounds of the analysis """ return self._ebounds @property def ene_min(self): """ :returns: Start of ebounds """ return self._ene_min @property def ene_max(self): """ :returns: End of Ebounds """ return self._ene_max @property def rod(self): """ Ensure that you know what you are doing. :returns: Roland """ return HTML(filename=os.path.join( get_path_of_internal_data_dir(), "Roland.html") )
[docs]class ResponseRMFGenerator(ResponseGenerator):
[docs] def __init__(self, pointing_id=None, monte_carlo_energies=None, ebounds=None, response_irf_read_object=None, det=None, fixed_rsp_matrix=None): """ Init Response object with total RMF used :param pointing_id: The pointing ID for which the response should be valid :param ebound: Ebounds of Ebins :param monte_carlo_energies: Input energy bin edges :param response_irf_read_object: Object that holds the read in irf values :param det: Detector ID :param fixed_rsp_matrix: A fixed response matrix to overload the normal matrix :returns: Object """ assert isinstance(response_irf_read_object, ResponseDataRMF) super().__init__( pointing_id=pointing_id, ebounds=ebounds, response_irf_read_object=response_irf_read_object, det=det) self._monte_carlo_energies = monte_carlo_energies if fixed_rsp_matrix is None: self._rebin_rmfs() self._given_rsp_mat = False self._rsp_matrix = None else: self._given_rsp_mat = True self._rsp_matrix = fixed_rsp_matrix self._weighted_irf_ph = None self._weighted_irf_nonph_1 = None self._weighted_irf_nonph_2 = None
[docs] @classmethod def from_time(cls, time, det, ebounds, monte_carlo_energies, rsp_read_obj, fixed_rsp_matrix=None): """ Init Response object with total RMF used from a time :param time: Time for which to construct the response object :param ebound: Ebounds of Ebins :param monte_carlo_energies: Input energy bin edges :param response_irf_read_object: Object that holds the read in irf values :param det: Detector ID :param fixed_rsp_matrix: A fixed response matrix to overload the normal matrix :returns: Object """ pointing_id = find_needed_ids(time) return cls( pointing_id=pointing_id, monte_carlo_energies=monte_carlo_energies, ebounds=ebounds, response_irf_read_object=rsp_read_obj, det=det, fixed_rsp_matrix=fixed_rsp_matrix )
def _rebin_rmfs(self): """ Rebin the base rmf shape matrices for the given ebounds and incoming energies :returns: """ # Number of ebins on input and output side N_ebins = len(self.ebounds)-1 N_monte_carlo = len(self.monte_carlo_energies) # get the interpolation grid (ein and the log mean of the eout_bins) log_eout_mean = 10**((np.log10(self.ebounds[:-1]) + np.log10(self.ebounds[1:])) / 2.0) eout_width = self.ebounds[1:] - self.ebounds[:-1] xx, yy = np.meshgrid(self.monte_carlo_energies, log_eout_mean) points = np.array((xx.ravel(), yy.ravel())).T # build the interpolation functions for rmf_2 mat and rmf_3 mat base_ebounds = self.irf_ob.ebounds_rmf_2_base base_ebounds_width = base_ebounds[1:]-base_ebounds[:-1] x_base = self.irf_ob.energies_database y_base = 10**((np.log10(base_ebounds[:-1])+ np.log10(base_ebounds[1:]))/2.0) lin_int_rmf2 =\ interpolate.RegularGridInterpolator((x_base, y_base), self.irf_ob.rmf_2_base / base_ebounds_width, bounds_error=False, fill_value=0) lin_int_rmf3 =\ interpolate.RegularGridInterpolator((x_base, y_base), self.irf_ob.rmf_3_base / base_ebounds_width, bounds_error=False, fill_value=0) # call the interpolation and create the correct matrix shape mat2inter = (lin_int_rmf2(points).reshape(N_ebins, N_monte_carlo).T) mat3inter = (lin_int_rmf3(points).reshape(N_ebins, N_monte_carlo).T) # trapz integrate over out bins self._mat2inter = 0.5*(mat2inter[1:]+mat2inter[:-1])*eout_width self._mat3inter = 0.5*(mat3inter[1:]+mat3inter[:-1])*eout_width # normalize the rows norm_fact = np.sum(self._mat2inter, axis=1) # Normalize this - is this correct? self._mat2inter[norm_fact>0] /= norm_fact[norm_fact>0][:, np.newaxis] self._mat2inter[norm_fact<=0] = 0 norm_fact = np.sum(self._mat3inter, axis=1) # Normalize this - is this correct? self._mat3inter[norm_fact>0] /= norm_fact[norm_fact>0][:, np.newaxis] self._mat3inter[norm_fact<=0] = 0 self._ph_matrix = np.zeros((N_monte_carlo-1, N_ebins)) for i, (einlow, einhigh) in \ enumerate(zip(self.monte_carlo_energies[:-1], self.monte_carlo_energies[1:])): if (not einhigh < self.ebounds[0]) and (not einlow > self.ebounds[-1]): # find id and fraction idx = np.argwhere(self.ebounds > einlow)[0, 0]-1 if idx==-1: idx=0 add_frac(self._ph_matrix, i, idx, self.ebounds, einlow, einhigh) def _weighted_irfs(self, azimuth, zenith): """ Calculate the weighted irfs for the three event types for a given position :param azimuth: Azimuth position in sat frame :param zenith: Zenith position in sat frame :returns: """ # get the x,y position on the grid x, y = self.get_xy_pos(azimuth, zenith) # compute the weights between the grids wgt, xx, yy = self._get_irf_weights(x, y) # If outside of the response pattern set response to zero try: # select these points on the grid and weight them together self._weighted_irf_ph = \ self.irf_ob.irfs_photopeak[:, self._det, xx, yy].dot(wgt) self._weighted_irf_nonph_1 = \ self.irf_ob.irfs_nonphoto_1[:, self._det, xx, yy].dot(wgt) self._weighted_irf_nonph_2 = \ self.irf_ob.irfs_nonphoto_2[:, self._det, xx, yy].dot(wgt) except IndexError: self._weighted_irf_ph =\ np.zeros_like(self.irf_ob.irfs_photopeak[:, self._det, 20, 20]) self._weighted_irf_nonph_1\ = np.zeros_like(self.irf_ob.irfs_nonphoto_1[:, self._det, 20, 20]) self._weighted_irf_nonph_2\ = np.zeros_like(self.irf_ob.irfs_nonphoto_2[:, self._det, 20, 20]) def _recalculate_response(self): """ Get response for the current position :returns: """ if self._given_rsp_mat: self._matrix = self._rsp_matrix else: ein_bins = np.empty((len(self._monte_carlo_energies)-1, 2)) ein_bins[:, 0] = self._monte_carlo_energies[:-1] ein_bins[:, 1] = self._monte_carlo_energies[1:] monte_carlo_log_mean = 10**(( np.log10(self._monte_carlo_energies[1:]) + np.log10(self._monte_carlo_energies[:-1]))/2.0) interph = log_interp1d(monte_carlo_log_mean, self.irf_ob.energies_database, self._weighted_irf_ph) inter2 = log_interp1d(monte_carlo_log_mean, self.irf_ob.energies_database, self._weighted_irf_nonph_1) inter3 = log_interp1d(monte_carlo_log_mean, self.irf_ob.energies_database, self._weighted_irf_nonph_2) # TODO clean up these .T calls and maybe change # the trapz to logmean mat1 = (inter2*self._mat2inter.T).T mat2 = (inter3*self._mat3inter.T).T # Add photopeak to the ebin with the photon energy within # its bounds self._transpose_matrix = mat1+mat2+(interph*self._ph_matrix.T).T self._matrix = self._transpose_matrix.T
[docs] def clone(self): """ Clone this response object :returns: cloned response """ return ResponseRMFGenerator( pointing_id=copy.deepcopy(self._pointing_id), monte_carlo_energies=copy.deepcopy(self.monte_carlo_energies), ebounds=copy.deepcopy(self.ebounds), response_irf_read_object=self.irf_ob, det=copy.deepcopy(self.det), fixed_rsp_matrix=copy.deepcopy(self._rsp_matrix) )
@property def matrix(self): """ :returns: response matrix """ return self._matrix @property def transpose_matrix(self): """ :returns: transposed response matrix """ return self._transpose_matrix @property def monte_carlo_energies(self): """ :returns: Input energies for response """ return self._monte_carlo_energies
[docs]class ResponsePhotopeakGenerator(ResponseGenerator):
[docs] def __init__(self, pointing_id=None, ebounds=None, response_irf_read_object=None, det=None): """ Init Response object with photopeak only :param pointing_id: The pointing ID for which the response should be valid :param ebound: Ebounds of Ebins :param response_irf_read_object: Object that holds the read in irf values :param det: Detector ID """ assert isinstance(response_irf_read_object, ResponseDataPhotopeak) # call init of base class super(ResponsePhotopeakGenerator, self).__init__( pointing_id, ebounds, response_irf_read_object, det) self._effective_area = None self._weighted_irf_ph = None
def _weighted_irfs(self, azimuth, zenith): """ Calculate the weighted irfs for the three event types for a given position :param azimuth: Azimuth position in sat frame :param zenith: Zenith position in sat frame :returns: """ # get the x,y position on the grid x, y = self.get_xy_pos(azimuth, zenith) # compute the weights between the grids wgt, xx, yy = self._get_irf_weights(x, y) # If outside of the response pattern set response to zero try: # select these points on the grid and weight them together self._weighted_irf_ph = \ self.irf_ob.irfs_photopeak[:,self._det, xx, yy].dot(wgt) except IndexError: self._weighted_irf_ph =\ np.zeros_like(self.irf_ob.irfs_photopeak[:, self._det, 20, 20]) def _recalculate_response(self): """ Get response for a given det :param det: Detector ID :returns: Full DRM """ #n_energy_bins = len(self._ebounds) - 1 ebins = np.empty((len(self._ene_min), 2)) eff_area = np.empty_like(ebins) ebins[:, 0] = self._ene_min ebins[:, 1] = self._ene_max inter = log_interp1d(self.ebounds, self.irf_ob.energies_database, self._weighted_irf_ph) eff_area[:, 0] = inter[:-1] eff_area[:, 1] = inter[1:] self._effective_area = trapz(eff_area, ebins)/(self._ene_max - self._ene_min)
[docs] @classmethod def from_time(cls, time, det, ebounds, rsp_read_obj,): """ Init Response object with photopeak only :param time: The time for which the response should be valid :param ebound: Ebounds of Ebins :param response_irf_read_object: Object that holds the read in irf values :param det: Detector ID :return: Object """ pointing_id = find_needed_ids(time) return cls( pointing_id=pointing_id, ebounds=ebounds, response_irf_read_object=rsp_read_obj, det=det, )
@property def effective_area(self): """ :returns: vector with photopeak effective area """ return self._effective_area