Source code for pyspi.utils.data_builder.time_series_builder

from astropy.time.core import Time, TimeDelta
import yaml
import os
import numpy as np
from datetime import datetime
import h5py
from astropy.io import fits


from pyspi.io.get_files import get_files
from pyspi.io.package_data import get_path_of_external_data_dir
from pyspi.utils.livedets import double_names, triple_names
from pyspi.utils.function_utils import find_needed_ids, ISDC_MJD_to_cxcsec, \
    get_time_object

from threeML.io.file_utils import sanitize_filename
from threeML.utils.time_series.event_list import EventListWithDeadTime,\
    EventListWithLiveTime
from threeML.utils.time_series.binned_spectrum_series import \
    BinnedSpectrumSeries
from threeML.utils.spectrum.binned_spectrum import BinnedSpectrumWithDispersion
from threeML.utils.data_builders.time_series_builder import TimeSeriesBuilder


[docs]class SPISWFile(object):
[docs] def __init__(self, det, pointing_id, ebounds): """ Class to read in all the data needed from a SCW file for a given config file :param config: Config yml filename, Config object or dict :param det: For which detector? """ # General nameing self._det_name = f"Detector {det}" self._mission = "Integral/SPI" self._ebounds = ebounds # How many echans? self._n_channels = len(self._ebounds)-1 # Check that det is a valid number self._det = det assert self._det in np.arange(85), f"{self._det} is not a valid"\ " detector. Please only use detector ids between 0 and 84." # Find the SW ID of the data file we need for this time self._pointing_id = pointing_id # Get the data, either from afs or from ISDC archive try: # Get the data from the afs server get_files(pointing_id, access="afs") except AssertionError: # Get the files from the iSDC data archive print("AFS data access did not work." " I will try the ISDC data archive.") get_files(pointing_id, access="isdc") # Read in all we need self._read_in_pointing_data(self._pointing_id) self._get_deadtime_info()
def _read_in_pointing_data(self, pointing_id): """ Gets all needed information from the data file for the given pointing_id :param pointing_id: pointing_id for which we want the data :returns: """ with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', pointing_id, 'spi_oper.fits.gz')) as hdu_oper: # Get time of first and last event (t0 at grb time) time_sgl = ISDC_MJD_to_cxcsec(hdu_oper[1].data['time']) time_psd = ISDC_MJD_to_cxcsec(hdu_oper[2].data['time']) time_me2 = ISDC_MJD_to_cxcsec(hdu_oper[4].data['time']) time_me3 = ISDC_MJD_to_cxcsec(hdu_oper[5].data['time']) time_start = np.min(np.concatenate([time_sgl, time_psd, time_me2, time_me3])) time_stop = np.max(np.concatenate([time_sgl, time_psd, time_me2, time_me3])) # renorm time start to 0 time_sgl -= time_start time_psd -= time_start time_me2 -= time_start time_me3 -= time_start self._time_start = 0 self._time_stop = time_stop-time_start # save the norm time, we need this when we read in the dead time info self._norm_time = time_start # Read in the data for the wanted detector # For single events we have to take both the non_psd # (often called sgl here...) # and the psd events. Both added together # give the real single events. if self._det in range(19) or self._det == "singles": dets_sgl = hdu_oper[1].data['DETE'] if self._det != "singles": time_sgl = time_sgl[dets_sgl == self._det] energy_sgl = hdu_oper[1].data['energy'][dets_sgl == self._det] else: energy_sgl = hdu_oper[1].data['energy'] dets_psd = hdu_oper[2].data['DETE'] if self._det != "singles": time_psd = time_psd[dets_psd == self._det] energy_psd = hdu_oper[2].data['energy'][dets_psd == self._det] else: energy_psd = hdu_oper[2].data['energy'] if self._det in range(19, 61): dets_me2 = np.sort(hdu_oper[4].data['DETE'], axis=1) i, k = double_names[self._det] mask = np.logical_and(dets_me2[:, 0] == i, dets_me2[:, 1] == k) time_me2 = time_me2[mask] energy_me2 = np.sum(hdu_oper[4].data['energy'][mask], axis=1) if self._det in range(61,85): dets_me3 = np.sort(hdu_oper[5].data['DETE'], axis=1) i, j, k = triple_names[self._det] mask = np.logical_and(np.logical_and(dets_me3[:, 0] == i, dets_me3[:, 1] == j), dets_me3[:, 2] == k) time_me3 = time_me3[mask] energy_me3 = np.sum(hdu_oper[5].data['energy'][mask], axis=1) if self._det in range(19): self._times = time_psd self._energies = energy_psd self._times = np.append(self._times, time_sgl) self._energies = np.append(self._energies, energy_sgl) # sort in time sort_array = np.argsort(self._times) self._times = self._times[sort_array] self._energies = self._energies[sort_array] if self._det in range(19, 61): self._times = time_me2 self._energies = energy_me2 if self._det in range(61, 85): self._times = time_me3 self._energies = energy_me3 # Check if there are any counts if np.sum(self._energies) == 0: raise AssertionError(f"The detector {self._det} has zero counts" " and is therefore not active." " Please exclude this detector!") # Bin this in the energy bins we have self._energy_bins = np.ones_like(self._energies, dtype=int)*-1 # Loop over ebins for i, (emin, emax) in enumerate(zip(self._ebounds[:-1], self._ebounds[1:])): mask = np.logical_and(self._energies > emin, self._energies < emax) self._energy_bins[mask] = np.ones_like(self._energy_bins[mask])*i # Throw away all events that have energies outside of the ebounds that # should be used mask = self._energy_bins == -1 self._energy_bins = self._energy_bins[~mask] self._times = self._times[~mask] self._energies = self._energies[~mask] def _get_deadtime_info(self): """ Get the deadtime info from the hk file :returns: """ # read in the OB-Time (on-board time) and the ISDC-MJD times # of the events from the data file. We will need this to # convert the OB-Times given in the hk file into ISDC-MJD time with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'spi_oper.fits.gz')) as hdu_oper: ob_time = hdu_oper["SPI.-OSGL-ALL"].data["OB_TIME"] time = hdu_oper["SPI.-OSGL-ALL"].data["TIME"] # calulate the true on-board time from the 4 given values # in the fits files. See: # (https://www.isdc.unige.ch/integral/support/faq.cgi?DATA-006) ob_time_data = (ob_time[:, 0]*655363**3 + ob_time[:, 1]*655362**2 + ob_time[:, 2]*65536 + ob_time[:, 3]) # read in hk file with the deadtime per 1 second intervall # in units of 100 nano seconds deadtimes = np.array([]) with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'spi_science_hk.fits.gz')) as hdu_oper: for i in range(19): deadtimes = np.append(deadtimes, hdu_oper["SPI.-SCHK-HRW"]. data[f"P__DF__CAFDT__L{i}"]) ob_time = hdu_oper["SPI.-SCHK-CNV"].data["OB_TIME"] # reshape to get shape (num_det, num_timebins) # and multipy by 100*10**-9 to get the deadtime in # seconds deadtimes = deadtimes.reshape(19, -1)*100*10**-9 # again get true OB-Time ob_time_hk = (ob_time[:, 0]*655363**3 + ob_time[:, 1]*655362**2 + ob_time[:, 2]*65536 + ob_time[:, 3]) # use linear interpolation of OB-Time/ISDC-MJD pairs from # the data file to get the ISDC-MJD time of the hk time bins times_hk = np.interp(ob_time_hk, ob_time_data, time) # add end of last time bin (+1 second in units of days) times_hk = np.append(times_hk, times_hk[-1]+(1.0/(24*3600))) # save time and deadtime - add end of self._deadtime_bin_edges = ISDC_MJD_to_cxcsec(times_hk) - \ self._norm_time self._deadtimes = deadtimes[self._det] @property def geometry_file_path(self): """ :returns: Path to the spacecraft geometry file """ return os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'sc_orbit_param.fits.gz') @property def times(self): """ :returns: times of detected events """ return self._times @property def energies(self): """ :returns: energies of detected events """ return self._energies @property def energy_bins(self): """ :returns: energy bin number of every event """ return self._energy_bins @property def ebounds(self): """ :returns: ebounds of analysis """ return self._ebounds @property def det(self): """ :returns: detector ID """ return self._det @property def n_channels(self): """ :returns: number energy channels """ return self._n_channels @property def time_start(self): """ :returns: start time of lightcurve """ return self._time_start @property def time_stop(self): """ :returns: stop time of lightcurve """ return self._time_stop @property def det_name(self): """ :returns: Name det """ return self._det_name @property def mission(self): """ :returns: Name Mission """ return self._mission @property def deadtime_bin_starts(self): """ :returns: Start time of time bins which have the deadtime information """ return self._deadtime_bin_edges[:-1] @property def deadtime_bin_stops(self): """ :returns: Stop time of time bins which have the deadtime information """ return self._deadtime_bin_edges[1:] @property def deadtimes_per_interval(self): """ :returns: Deadtime per time bin which have the deadtime information """ return self._deadtimes @property def livetimes_per_interval(self): """ :returns: Livetime per time bin which have the deadtime information """ return 1-self._deadtimes
[docs]class SPISWFileGRB(object):
[docs] def __init__(self, det, ebounds, time_of_grb, sgl_type=None): """ Class to read in all the data needed from a SCW file for a given grbtime :param det: For which detector? :param ebounds: Ebounds for the Analysis. :param time_of_grb: Time of the GRB as "YYMMDD HHMMSS" :param sgl_type: Which type of single events? Only normal sgl, psd or both? :returns: Object """ self._det_name = f"Detector {det}" self._mission = "Integral/SPI" # Set ebounds of energy bins self._ebounds = ebounds # How many echans? self._n_channels = len(self._ebounds)-1 # Time of GRB. Needed to get the correct pointing. self._time_of_GRB = get_time_object(time_of_grb) # Check that det is a valid number self._det = det assert self._det in np.arange(85), f"{self._det} is not a valid"\ "detector. Please only use detector ids between 0 and 84." if self._det < 19: assert sgl_type is not None, "Only PSD Events?" assert sgl_type in ["psd", "sgl", "both"], \ "sgl_type must be psd, sgl or both" self._sgl_type = sgl_type # Find the SW ID of the data file we need for this time self._pointing_id = find_needed_ids(self._time_of_GRB) # Get the data, either from afs or from ISDC archive try: # Get the data from the afs server get_files(self._pointing_id, access="afs") except AssertionError: # Get the files from the iSDC data archive print("AFS data access did not work." " I will try the ISDC data archive.") get_files(self._pointing_id, access="isdc") # Reference time of GRB self._GRB_ref_time_cxcsec =\ ISDC_MJD_to_cxcsec((self._time_of_GRB).tt.mjd-51544) # Read in all we need self._read_in_pointing_data() self._get_deadtime_info()
def _read_in_pointing_data(self): """ Gets all needed information from the data file for the given pointing_id :param pointing_id: pointing_id for which we want the data :returns: """ with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'spi_oper.fits.gz')) as hdu_oper: # Get time of first and last event (t0 at grb time) time_sgl = ISDC_MJD_to_cxcsec(hdu_oper[1].data['time']) \ - self._GRB_ref_time_cxcsec time_psd = ISDC_MJD_to_cxcsec(hdu_oper[2].data['time']) \ - self._GRB_ref_time_cxcsec time_me2 = ISDC_MJD_to_cxcsec(hdu_oper[4].data['time']) \ - self._GRB_ref_time_cxcsec time_me3 = ISDC_MJD_to_cxcsec(hdu_oper[5].data['time']) \ - self._GRB_ref_time_cxcsec # Get time of first and last detected photon self._time_start = np.min(np.concatenate([time_sgl, time_psd, time_me2, time_me3])) self._time_stop = np.max(np.concatenate([time_sgl, time_psd, time_me2, time_me3])) # Read in the data for the wanted detector # For single events we have to take both the non_psd # (often called sgl here...) and the psd events. # Both added together give the real single events. if self._det in range(19): dets_sgl = hdu_oper[1].data['DETE'] time_sgl = time_sgl[dets_sgl == self._det] energy_sgl = hdu_oper[1].data['energy'][dets_sgl == self._det] dets_psd = hdu_oper[2].data['DETE'] time_psd = time_psd[dets_psd == self._det] energy_psd = hdu_oper[2].data['energy'][dets_psd == self._det] if self._det in range(19, 61): dets_me2 = np.sort(hdu_oper[4].data['DETE'], axis=1) i, k = double_names[self._det] mask = np.logical_and(dets_me2[:, 0] == i, dets_me2[:, 1] == k) time_me2 = time_me2[mask] energy_me2 = np.sum(hdu_oper[4].data['energy'][mask], axis=1) if self._det in range(61,85): dets_me3 = np.sort(hdu_oper[5].data['DETE'], axis=1) i, j, k = triple_names[self._det] mask = np.logical_and(np.logical_and(dets_me3[:, 0] == i, dets_me3[:, 1] == j), dets_me3[:, 2] == k) time_me3 = time_me3[mask] energy_me3 = np.sum(hdu_oper[5].data['energy'][mask], axis=1) if self._det in range(19): self._times = np.array([]) self._energies = np.array([]) # Use the single events with the given flag if self._sgl_type == "psd" or self._sgl_type == "both": self._times = np.append(self._times, time_psd) self._energies = np.append(self._energies, energy_psd) if self._sgl_type == "sgl" or self._sgl_type == "both": self._times = np.append(self._times, time_sgl) self._energies = np.append(self._energies, energy_sgl) # sort in time sort_array = np.argsort(self._times) self._times = self._times[sort_array] self._energies = self._energies[sort_array] if self._det in range(19, 61): self._times = time_me2 self._energies = energy_me2 if self._det in range(61, 85): self._times = time_me3 self._energies = energy_me3 # Check if there are any counts if np.sum(self._energies) == 0: raise AssertionError(f"The detector {self._det} has zero counts " "and is therefore not active. " "Please exclude this detector!") # Assign the corresponsing energy channel number to every detected # energy self._energy_bins = np.ones_like(self._energies, dtype=int)*-1 # Loop over ebins for i, (emin, emax) in enumerate(zip(self._ebounds[:-1], self._ebounds[1:])): mask = np.logical_and(self._energies > emin, self._energies < emax) self._energy_bins[mask] = np.ones_like(self._energy_bins[mask])*i # Throw away all events that have energies outside of the ebounds that # should be used mask = self._energy_bins == -1 self._energy_bins = self._energy_bins[~mask] self._times = self._times[~mask] self._energies = self._energies[~mask] def _get_deadtime_info(self): """ Get the deadtime info from the hk file :returns: """ # read in the OB-Time (on-board time) and the ISDC-MJD times # of the events from the data file. We will need this to # convert the OB-Times given in the hk file into ISDC-MJD time with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'spi_oper.fits.gz')) as hdu_oper: ob_time = hdu_oper["SPI.-OSGL-ALL"].data["OB_TIME"] time = hdu_oper["SPI.-OSGL-ALL"].data["TIME"] # calulate the true on-board time from the 4 given values # in the fits files. See: # (https://www.isdc.unige.ch/integral/support/faq.cgi?DATA-006) ob_time_data = (ob_time[:, 0]*655363**3 + ob_time[:, 1]*655362**2 + ob_time[:, 2]*65536 + ob_time[:, 3]) # read in hk file with the deadtime per 1 second intervall # in units of 100 nano seconds deadtimes = np.array([]) with fits.open(os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'spi_science_hk.fits.gz')) as hdu_oper: for i in range(19): deadtimes = np.append(deadtimes, hdu_oper["SPI.-SCHK-HRW"]. data[f"P__DF__CAFDT__L{i}"]) ob_time = hdu_oper["SPI.-SCHK-CNV"].data["OB_TIME"] # reshape to get shape (num_det, num_timebins) # and multipy by 100*10**-9 to get the deadtime in # seconds deadtimes = deadtimes.reshape(19, -1)*100*10**-9 # again get true OB-Time ob_time_hk = (ob_time[:, 0]*655363**3 + ob_time[:, 1]*655362**2 + ob_time[:, 2]*65536 + ob_time[:, 3]) # use linear interpolation of OB-Time/ISDC-MJD pairs from # the data file to get the ISDC-MJD time of the hk time bins times_hk = np.interp(ob_time_hk, ob_time_data, time) # add end of last time bin (+1 second in units of days) times_hk = np.append(times_hk, times_hk[-1]+(1.0/(24*3600))) # save time and deadtime - add end of self._deadtime_bin_edges = ISDC_MJD_to_cxcsec(times_hk) - \ self._GRB_ref_time_cxcsec self._deadtimes = deadtimes[self._det] @property def geometry_file_path(self): """ :returns: Path to the spacecraft geometry file """ return os.path.join(get_path_of_external_data_dir(), 'pointing_data', self._pointing_id, 'sc_orbit_param.fits.gz') @property def times(self): """ :returns: times of detected events """ return self._times @property def energies(self): """ :returns: energies of detected events """ return self._energies @property def energy_bins(self): """ :returns: energy bin number of every event """ return self._energy_bins @property def ebounds(self): """ :returns: ebounds of analysis """ return self._ebounds @property def det(self): """ :returns: detector ID """ return self._det @property def n_channels(self): """ :returns: number energy channels """ return self._n_channels @property def time_start(self): """ :returns: start time of lightcurve """ return self._time_start @property def time_stop(self): """ :returns: stop time of lightcurve """ return self._time_stop @property def det_name(self): """ :returns: Name det """ return self._det_name @property def mission(self): """ :returns: Name Mission """ return self._mission @property def deadtime_bin_starts(self): """ :returns: Start time of time bins which have the deadtime information """ return self._deadtime_bin_edges[:-1] @property def deadtime_bin_stops(self): """ :returns: Stop time of time bins which have the deadtime information """ return self._deadtime_bin_edges[1:] @property def deadtimes_per_interval(self): """ :returns: Deadtime per time bin which have the deadtime information """ return self._deadtimes @property def livetimes_per_interval(self): """ :returns: Livetime per time bin which have the deadtime information """ return 1-self._deadtimes
[docs]class TimeSeriesBuilderSPI(TimeSeriesBuilder):
[docs] def __init__( self, name, time_series, response=None, poly_order=-1, verbose=True, restore_poly_fit=None, container_type=BinnedSpectrumWithDispersion, **kwargs ): """ Class to build the time_series for SPI. Inherited from the 3ML TimeSeriesBuilder with added class methods to build the object for SPI datafiles. :param name: Name of the tsb :param time_series: Timeseries with the data :param response: Response object :param poly_order: poly order for the polynominal fitting :param verbose: Verbose? :param restore_poly_fit: Path to a file with the poly bkg fits :param containter_type: ContainerType for spectrum :returns: Object """ super(TimeSeriesBuilderSPI, self).__init__( name, time_series, response=response, poly_order=poly_order, unbinned=False, verbose=verbose, restore_poly_fit=restore_poly_fit, container_type=container_type, **kwargs )
[docs] @classmethod def from_spi_grb(cls, name, det, time_of_grb, ebounds=None, response=None, sgl_type=None, restore_background=None, poly_order=0, verbose=True ): """ Class method to build the time_series_builder for a given GRB time :param name: Name of object :param det: Which det? :param ebounds: Output ebounds for analysis. :param time_of_grb: Astropy time object with the time of the GRB (t0) :param response: InstrumenResponse Object :param sgl_type: What kind of sinlge events? Standard single events? PSD events? Or both? :param restore_background: File to restore bkg :param poly_order: Which poly_order? -1 gives automatic determination :param verbose: Verbose? :returns: Initalized TimeSeriesBuilder object """ assert ebounds is not None or response is not None, "You have to "\ "either specify ebounds or input a response object." if ebounds is not None and response is not None: # check that the ebounds match assert np.all(response.ebounds == ebounds), "Ebounds do not "\ "match the ones in the response object!" elif response is not None: # ebounds is None in this case ebounds = response.ebounds # Get an object with all the needed information spi_grb_setup = SPISWFileGRB(det, ebounds, time_of_grb, sgl_type=sgl_type) event_list = EventListWithLiveTime( arrival_times=spi_grb_setup.times, measurement=spi_grb_setup.energy_bins, n_channels=spi_grb_setup.n_channels, start_time=spi_grb_setup.time_start, stop_time=spi_grb_setup.time_stop, live_time=spi_grb_setup.livetimes_per_interval, live_time_starts=spi_grb_setup.deadtime_bin_starts, live_time_stops=spi_grb_setup.deadtime_bin_stops, first_channel=0, instrument=spi_grb_setup.det_name, mission=spi_grb_setup.mission, verbose=verbose, edges=spi_grb_setup.ebounds, ) return cls( name, event_list, poly_order=poly_order, verbose=verbose, restore_poly_fit=restore_background, response=response, container_type=BinnedSpectrumWithDispersion )
[docs] @classmethod def from_spi_constant_pointing(cls, det=0, pointing_id="118900570010", ebounds=None, response=None, ): """ Class method to build the time_series_builder for a given pointing id :param det: Which det? :param ebounds: Output ebounds for analysis. :param pointing_id: Pointing ID :param response: InstrumenResponse Object :returns: Initalized TimeSeriesBuilder object """ assert ebounds is not None or response is not None, "You have to "\ "either specify ebounds or input a response object." if ebounds is not None and response is not None: # check that the ebounds match assert np.all(response.ebounds == ebounds), "Ebounds do not "\ "match the ones in the response object!" elif response is not None: # ebounds is None in this case ebounds = response.ebounds spi_grb_setup1 = SPISWFile(det, pointing_id, ebounds) event_list = EventListWithLiveTime( arrival_times=spi_grb_setup1.times, measurement=spi_grb_setup1.energy_bins, n_channels=spi_grb_setup1.n_channels, start_time=spi_grb_setup1.time_start, stop_time=spi_grb_setup1.time_stop, live_time=spi_grb_setup1.livetimes_per_interval, live_time_starts=spi_grb_setup1.deadtime_bin_starts, live_time_stops=spi_grb_setup1.deadtime_bin_stops, first_channel=0, instrument=spi_grb_setup1.det_name, mission=spi_grb_setup1.mission, verbose=False, edges=spi_grb_setup1.ebounds, ) tsb = cls(f"SPIID{pointing_id}D{det}", event_list, verbose=True, response=response, container_type=BinnedSpectrumWithDispersion) # set active time to total time of sw tsb.set_active_time_interval(f"{spi_grb_setup1.times[0]}-" f"{spi_grb_setup1.times[-1]}") return tsb