import collections
from typing import Optional
import numpy as np
from astromodels import Parameter, Model
from astromodels.functions.priors import Cosine_Prior, Uniform_prior
from threeML import PluginPrototype
from threeML.io.file_utils import sanitize_filename
from threeML.plugins.DispersionSpectrumLike import DispersionSpectrumLike
from threeML.plugins.SpectrumLike import SpectrumLike
from threeML.io.logging import setup_logger
from pyspi.utils.response.spi_drm import SPIDRM
log = setup_logger(__name__)
[docs]class SPILike(DispersionSpectrumLike):
"""
Plugin for the data of SPI, based on PySPI
"""
[docs] def __init__(
self,
name: str,
observation,
background,
bkg_base_array,
free_position: bool,
verbose: bool = True,
**kwargs
):
"""
Init the plugin for a constant source analysis with PySPI
:param name: Name of plugin
:param observation: observed spectrum
:param background: background spectrum
:param bkg_base_array: Base array for background model
:param free_position: Free the position in the fit?
:param verbose: Verbose?
:returns: Object
"""
self._free_position: bool = free_position
if not isinstance(
observation.response, SPIDRM
):
log.error("The response associated with the observation"
" is not a SPIDRM")
raise AssertionError()
super(SPILike, self).__init__(name,
observation,
background,
verbose,
**kwargs)
self._bkg_base_array = bkg_base_array
self._bkg_array = np.ones(len(self._bkg_base_array))
[docs] def set_model(self, likelihood_model: Model) -> None:
"""
Set the model to be used in the joint minimization.
:param likelihood_model: likelihood model instance
:returns:
"""
super(SPILike, self).set_model(likelihood_model)
if self._free_position:
log.info(f"Freeing the position of {self.name} and setting priors")
for key in self._like_model.point_sources.keys():
self._like_model.point_sources[key].position.ra.free = True
self._like_model.point_sources[key].position.dec.free = True
self._like_model.point_sources[key].position.ra.prior = \
Uniform_prior(lower_bound=0.0, upper_bound=360)
self._like_model.point_sources[key].position.dec.prior = \
Cosine_Prior(lower_bound=-90.0, upper_bound=90)
ra = self._like_model.point_sources[key].position.ra.value
dec = self._like_model.point_sources[key].position.dec.value
else:
for key in self._like_model.point_sources.keys():
ra = self._like_model.point_sources[key].position.ra.value
dec = self._like_model.point_sources[key].position.dec.value
self._response.set_location(ra, dec)
def _evaluate_model(self, precalc_fluxes=None):
"""
Evaluate the model
:param precalc_fluxes: Precaclulated flux of spectrum
:returns: model counts
"""
source = super(SPILike, self)._evaluate_model(precalc_fluxes=
precalc_fluxes)
self._update_bkg_array()
bkg = self._bkg_array*self._bkg_base_array
return source+bkg
[docs] def get_model(self, precalc_fluxes: Optional[np.ndarray] = None) -> np.ndarray:
"""
Get the model
:param precalc_fluxes: Precaclulated flux of spectrum
:returns: model counts
"""
if self._free_position:
# assumes that the is only one point source which is how
# it should be!
ra, dec = self._like_model.get_point_source_position(0)
self._response.set_location(ra, dec)
return super(SPILike, self).get_model(precalc_fluxes=precalc_fluxes)
def _add_bkg_nuisance_parameter(self, bkg_parameters) -> None:
"""
Add the bkg parameter. Are saved as array.
:param bkg_parameters:
:returns:
"""
self._bkg_parameters = bkg_parameters
for parameter in bkg_parameters:
self.nuisance_parameters[parameter.name] = parameter
self._bkg_array = np.ones(len(bkg_parameters))
def _update_bkg_array(self) -> None:
"""
Update the array with the background parameter
:returns:
"""
for key in self._like_model.parameters.keys():
if "bkg" in key:
idx = int(key.split("_")[-1])
self._bkg_array[idx] = self._like_model.parameters[key].value
[docs] def set_free_position(self, flag):
"""
Set the free position flag
:param flag: True or False
:returns:
"""
self._free_position = flag
[docs] @classmethod
def from_spectrumlike(
cls,
spectrum_like,
bkg_base_array,
free_position=False
):
"""
Generate SPILikeGRB from an existing SpectrumLike child
:param spectrum_like: SpectrumLike child
:param rsp_object: Response object
:free_position: Free the position? boolean
:returns: Initialized Object
"""
return cls(
spectrum_like.name,
spectrum_like._observed_spectrum,
spectrum_like._background_spectrum,
bkg_base_array,
free_position,
spectrum_like._verbose,
)
[docs]class SPILikeGRB(DispersionSpectrumLike):
"""
Plugin for the data of SPI, based on PySPI
"""
[docs] def __init__(
self,
name,
observation,
background=None,
free_position=False,
verbose=True,
**kwargs
):
"""
Init the plugin for a GRB analysis with PySPI
:param name: Name of plugin
:param observation: observed spectrum
:param background: background spectrum
:param free_position: Free the position in the fit?
:param verbose: Verbose?
"""
self._free_position = free_position
assert isinstance(
observation.response, SPIDRM
), "The response associated with the observation is not a SPIDRM"
super(SPILikeGRB, self).__init__(name,
observation,
background,
verbose,
**kwargs)
[docs] def set_model(self, likelihood_model):
"""
Set the model to be used in the joint minimization.
:param likelihood_model: likelihood model instance
:returns:
"""
super(SPILikeGRB, self).set_model(likelihood_model)
if self._free_position:
print("Freeing the position of %s and setting priors" % self.name)
for key in self._like_model.point_sources.keys():
self._like_model.point_sources[key].position.ra.free = True
self._like_model.point_sources[key].position.dec.free = True
self._like_model.point_sources[key].position.ra.prior = \
Uniform_prior(lower_bound=0.0, upper_bound=360)
self._like_model.point_sources[key].position.dec.prior = \
Cosine_Prior(lower_bound=-90.0, upper_bound=90)
ra = self._like_model.point_sources[key].position.ra.value
dec = self._like_model.point_sources[key].position.dec.value
else:
for key in self._like_model.point_sources.keys():
ra = self._like_model.point_sources[key].position.ra.value
dec = self._like_model.point_sources[key].position.dec.value
self._response.set_location(ra, dec)
[docs] def get_model(self, precalc_fluxes=None):
"""
Get the model
:param precalc_fluxes: Precaclulated flux of spectrum
:returns: model counts
"""
if self._free_position:
# assumes that the is only one point source which is how
# it should be!
ra, dec = self._like_model.get_point_source_position(0)
self._response.set_location(ra, dec)
return super(SPILikeGRB, self).get_model(precalc_fluxes=precalc_fluxes)
[docs] def set_free_position(self, flag):
"""
Set the free position flag
:param flag: True or False
:returns:
"""
self._free_position = flag
[docs] @classmethod
def from_spectrumlike(
cls, spectrum_like, free_position=False
):
"""
Generate SPILikeGRB from an existing SpectrumLike child
:param spectrum_like: SpectrumLike child
:param rsp_object: Response object
:free_position: Free the position? boolean
:returns: Initialized Object
"""
return cls(
spectrum_like.name,
spectrum_like._observed_spectrum,
spectrum_like._background_spectrum,
free_position,
spectrum_like._verbose,
)