Source code for datapipe_testbench.benchmarks.dl3.irf_performance

"""Metrics for treating IRFs following GADF."""

from pathlib import Path
from typing import override

import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.table import QTable, Table
from gammapy.irf import load_irf_dict_from_file
from hist import axis
from pyirf.utils import cone_solid_angle

from ...benchmark import Benchmark
from ...compare_1d import CompareByCat1D
from ...comparers import ComparisonResult
from ...metric import IRFMetric
from ...plotting import PerformancePoster
from ...store import MetricsStore, ResultStore
from .sensitivity import ENERGY_BINS, OFFSET_BINS, make_sensitivity_curve


class EffectiveAreaIRFMetric(IRFMetric):
    """Metric for treating Effective Area IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )

        new_metric = EffectiveAreaIRFMetric(
            axis_list=[
                axis.StrCategory(offsets, name="Offset"),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="Effective Area",
            unit=irf_table["EFFAREA"].unit,
        )
        new_metric._storage[...] = irf_table["EFFAREA"].value
        return new_metric


class EnergyMigrationIRFMetric(IRFMetric):
    """Metric for treating Energy Migration IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table."""
        if issubclass(type(irf_table), Table):
            irf_table = irf_table[0]
        m_bins, e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["MIGRA", "ENERG"], ["THETA"]
        )
        new_metric = EnergyMigrationIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(m_bins) - 1,
                    start=m_bins[0],
                    stop=m_bins[-1],
                    name="Migration",
                ),
            ],
            label="Energy Migration",
            unit="",
        )
        new_metric._storage[...] = np.swapaxes(irf_table["MATRIX"], 1, 2)
        return new_metric


class RadMaxIRFMetric(IRFMetric):
    """Metric for treating Effective Area IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table."""
        if issubclass(type(irf_table), Table):
            irf_table = irf_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )
        new_metric = RadMaxIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="$R_{max}$",
            unit=irf_table["RAD_MAX"].unit,
        )
        new_metric._storage[...] = irf_table["RAD_MAX"]
        return new_metric


class Background2dIRFMetric(IRFMetric):
    """Metric for treating 2d Background IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table."""
        if issubclass(type(irf_table), Table):
            irf_table = irf_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )
        new_metric = Background2dIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="Background Rate",
            unit=irf_table["BKG"].unit,
        )
        new_metric._storage[...] = irf_table["BKG"]
        return new_metric

    def get_offset_averaged_rate(self):
        """Get background rate averaged over all offsets."""
        old, offsets, e_bins = self.to_numpy()
        new = np.zeros_like(old)
        bin_solid_angle = np.diff(cone_solid_angle(offsets * u.degree))
        for idx, cent in enumerate(offsets[1:]):
            bin_content, _ = self[idx, :].to_numpy()
            new[idx, :] = bin_content * bin_solid_angle[idx]

        tot_solid_angle = cone_solid_angle(offsets[-1] * u.degree)
        bkg = new.sum(axis=0) / tot_solid_angle / u.s / u.MeV
        return bkg, e_bins

    def compare(self, others: list):
        """Compare several instances of the class.

        Uses `self` as reference when comparing with `others`. Plots the spectra
        on top of each other, and calculates the wasserstein distance.

        Parameters
        ----------
        others : list
            List containing other instances of this class.

        Returns
        -------
        ComparisonResult
            Results of comparison of this reference to the others list.
        """
        comparer = CompareByCat1D(self.axes[0])

        fig = comparer.plot_compare_by_category(self, others)
        measures = comparer.distance_by_category(self, others)

        return ComparisonResult(
            self.__class__.__name__, {"Background": fig}, measures, None
        )


class Background3dIRFMetric(IRFMetric):
    """Metric for treating 3d Background IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table."""
        if issubclass(type(irf_table), Table):
            irf_table = irf_table[0]
        e_bins, x_bins, y_bins = cls._get_metric_bins_from_table(
            irf_table, ["ENERG", "DETX", "DETY"], []
        )
        new_metric = Background3dIRFMetric(
            [
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(x_bins) - 1,
                    start=x_bins[0].value,
                    stop=x_bins[-1].value,
                    metadata=dict(unit=x_bins.unit),
                    name="DETX",
                ),
                axis.Regular(
                    bins=len(y_bins) - 1,
                    start=y_bins[0].value,
                    stop=y_bins[-1].value,
                    metadata=dict(unit=y_bins.unit),
                    name="DETY",
                ),
            ],
            label="Background Rate",
            unit=irf_table["BKG"].unit,
        )
        if irf_table["BKG"].shape[0] == (len(e_bins) - 1):
            new_metric._storage[...] = irf_table["BKG"]
        else:
            new_metric._storage[...] = irf_table["BKG"].T
        return new_metric

    def get_offset_averaged_rate(self, offset_max=5.0 * u.degree):
        """Get background rate averaged over all offsets."""
        bkg_h, e_bins, xbins, ybins = self.to_numpy()
        bkg_h = bkg_h / u.sr / u.s / u.MeV
        xbins, ybins = xbins * u.degree, ybins * u.degree

        xcent = np.convolve(xbins, 2 * [0.5], mode="valid")
        ycent = np.convolve(ybins, 2 * [0.5], mode="valid")
        xg, yg = np.meshgrid(xcent, ycent)
        off = np.sqrt(xg**2 + yg**2)
        sel = (off <= offset_max)[np.newaxis, ...]
        bkg_h *= np.diff(xbins).mean() * np.diff(ybins).mean()
        bkg_rate = (bkg_h * sel).sum(axis=(1, 2)) / cone_solid_angle(offset_max)
        return bkg_rate, e_bins

    def get_rate_in_offset_bins(self, fov_bins):
        """Get  background rate averaged over offset bins."""
        n_fov_bins = len(fov_bins) - 1
        bkg_h, e_bins, xbins, ybins = self.to_numpy()
        bkg_h = bkg_h / u.sr / u.s / u.MeV

        e_bins = e_bins * u.TeV
        rad_bkg = np.zeros((n_fov_bins, len(e_bins) - 1)) / u.sr / u.s / u.MeV

        xbins, ybins = xbins * u.degree, ybins * u.degree
        bkg_h *= np.diff(xbins).mean() * np.diff(ybins).mean()

        xcent = np.convolve(xbins, 2 * [0.5], mode="valid")
        ycent = np.convolve(ybins, 2 * [0.5], mode="valid")
        xg, yg = np.meshgrid(xcent, ycent)
        off = np.sqrt(xg**2 + yg**2)

        cones = np.diff(cone_solid_angle(fov_bins))

        for idx, (lo, hi) in enumerate(zip(fov_bins[:-1], fov_bins[1:])):
            sel = ((lo <= off) & (off <= hi))[np.newaxis, ...]
            rad_bkg[idx, :] = (bkg_h * sel).sum(axis=(1, 2)) / cones[idx]

        return rad_bkg, e_bins


class SensitivityIRFMetric(IRFMetric):
    """Metric for saving differential sensitivity."""

    @classmethod
    def from_sensitivity_dict(cls, sens_dict, output_store=None):
        """Instantiate Sensitivity metric from special dict."""
        e_bins = sens_dict["energy_bins"]
        off_bins = sens_dict["offset_bins"]
        new_metric = SensitivityIRFMetric(
            [
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(off_bins) - 1,
                    start=off_bins[0].value,
                    stop=off_bins[-1].value,
                    metadata=dict(unit=off_bins.unit),
                    name="Offset",
                ),
            ],
            label="Sensitivity",
            unit=sens_dict["sens"].unit,
        )
        tmp = sens_dict["sens"].as_array()
        res_data = tmp.view(np.float64).reshape(tmp.shape + (-1,))

        new_metric._storage[...] = res_data[...]
        new_metric.update_name_with_store(output_store)
        return new_metric

    @classmethod
    def _from_specific_table(cls, bench_table):
        """Create class instance from an irf table."""
        if issubclass(type(bench_table), Table):
            bench_table = bench_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            bench_table, ["ENERG"], ["THETA"]
        )

        new_metric = SensitivityIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            unit=bench_table["ENERGY_FLUX_SENSITIVITY"].unit,
        )
        new_metric._storage[...] = bench_table["ENERGY_FLUX_SENSITIVITY"]
        return new_metric


class AngularResIRFMetric(IRFMetric):
    """Metric for treating 2d Angular resolution table."""

    @classmethod
    def _from_specific_table(cls, table):
        """Create class instance from an irf table."""
        if issubclass(type(table), Table):
            table = table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(table, ["ENERG"], ["THETA"])

        # try to find the angular resolution column. In older files it's just
        # ANGULAR_RESOULUTION, but in the latest outputs there are multiple, per
        # confidence interval.
        try:
            angres_column = next(
                key
                for key in ["ANGULAR_RESOLUTION_68", "ANGULAR_RESOLUTION"]
                if key in table.columns
            )
        except StopIteration:
            raise KeyError(
                "Couldn't find the expected column in the IRF benchmark table: "
                "ANGULAR_RESULTION_68 or ANGULAR_RESOLUTION "
            )

        new_metric = AngularResIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="Angular Resolution",
            unit=table[angres_column].unit,
        )

        new_metric._storage[...] = table[angres_column]
        return new_metric


class EnergyBiasResIRFMetric(IRFMetric):
    """Metric for treating 2d Angular resolution table."""

    @classmethod
    def _from_specific_table(cls, table):
        """Create class instance from an irf table."""
        if issubclass(type(table), Table):
            table = table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(table, ["ENERG"], ["THETA"])

        new_metric = EnergyBiasResIRFMetric(
            [
                axis.StrCategory(
                    ["Bias", "Resolution"],
                    name="Kind",
                ),
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            unit="",
        )
        # Official Prod5 irfs have bias in different binning than resolution :(
        # we can't deal with that right now so just skip bias in that case
        if table["BIAS"].shape[1] == table["RESOLUTION"].shape[1]:
            new_metric._storage["Bias", ...] = table["BIAS"]
        new_metric._storage["Resolution", ...] = table["RESOLUTION"]
        return new_metric


[docs] class IRFBenchmark(Benchmark): """Top level performance benchmarks. Coverst requirements: PROG-0020, PROG-0030, PROG-0040, PROG-0050, PROG-0060, PROG-0070, PROG-0080, PROG-0090 For generation, requires: * GADF IRF files """ metric_path = Path("dl3/instrument_response_functions") def __init__(self, do_irf_sensitivity=False): super().__init__() self.do_irf_sensitivity = do_irf_sensitivity
[docs] @override def outputs(self) -> dict: return { "background": self.metric_path / "Background.asdf", "effective area": self.metric_path / "EffectiveArea.asdf", "energy migration": self.metric_path / "EnergyMigration.asdf", "rad max": self.metric_path / "RadMax.asdf", "angular resolution": self.metric_path / "AngularResolution.asdf", "energy bias resolution": self.metric_path / "EnergyBiasResolution.asdf", "sensitivity": self.metric_path / "Sensitivity.asdf", # Path for irf sensitivity defined in `generate_metrics` only when needed }
def _make_sensitivity_curves(self, irfs, energy_edges, offset_edges): """Make sensitivity curves.""" # Evaluate sens at midpoint of bins offsets = np.convolve(offset_edges, 2 * [0.5], mode="valid") e_ref = np.sqrt(energy_edges[1:] * energy_edges[:-1]) result = QTable({"e_ref": e_ref}) for off in offsets: sens = make_sensitivity_curve( irfs, result["e_ref"], enclosure=False, conf={"point_offset": off} ) result.add_column(sens["e2dnde"], name=str(off)) result.remove_column("e_ref") return { "sens": result, "energy_bins": energy_edges, "offset_bins": offset_edges, } def _load_all_tables(self, fits_file): """Load all tables.""" tabs = fits.open(fits_file).info(output=False)[1:] return {itm[1]: QTable.read(fits_file, hdu=itm[0]) for itm in tabs} def _select_offset(self, offset, dct): """Select offset.""" out_dict = {} for key, met in dct.items(): try: slc = (offset, slice(None)) if key == "energy bias resolution": key = "energy resolution" slc = ("Resolution", offset, slice(None)) if key == "energy migration": slc = (offset, slice(None), slice(None)) if (isinstance(met, list) and len(met[0].axes) == 0) or ( not isinstance(met, list) and len(met.axes) == 0 ): out_dict[key] = met continue if key == "rad max": continue if not isinstance(met, list): out_dict[key] = met[slc] else: out_lst = [] for itm in met: out_lst.append(itm[slc]) out_dict[key] = out_lst except KeyError: self._log.error(f"{key} IRF does not have offsets this far: {offset}") return False return out_dict @override @property def required_inputs(self) -> set[str]: return set(["dl3_irf", "dl3_benchmark"])
[docs] @override def generate_metrics(self, metric_store: MetricsStore): irf_file = metric_store.get_inputdata().dl3_irf benchmark_file = metric_store.get_inputdata().dl3_benchmark if self.do_irf_sensitivity: self.outputs()["irf sensitivity"] = ( self.metric_path / "IRF_Sensitivity.asdf" ) gp_irfs = load_irf_dict_from_file(irf_file) irf_sens = SensitivityIRFMetric.from_sensitivity_dict( self._make_sensitivity_curves( gp_irfs, ENERGY_BINS * u.TeV, OFFSET_BINS * u.degree ), metric_store, ) metric_store.store_data(irf_sens, self.outputs()["irf sensitivity"]) bench_irfs = self._load_all_tables(irf_file) effar = EffectiveAreaIRFMetric.from_table( bench_irfs["EFFECTIVE AREA"], metric_store ) if len(bench_irfs["BACKGROUND"]["BKG"].shape) == 3: bkg = Background2dIRFMetric.from_table( bench_irfs["BACKGROUND"], metric_store ) else: bkg = Background3dIRFMetric.from_table( bench_irfs["BACKGROUND"], metric_store ) if "RAD MAX" in bench_irfs: radma = RadMaxIRFMetric.from_table(bench_irfs["RAD MAX"], metric_store) else: radma = RadMaxIRFMetric([]) if "ENERGY DISPERSION" in bench_irfs: edisp = EnergyMigrationIRFMetric.from_table( bench_irfs["ENERGY DISPERSION"], metric_store ) if "ENERGY MIGRATION" in bench_irfs: edisp = EnergyMigrationIRFMetric.from_table( bench_irfs["ENERGY MIGRATION"], metric_store ) outputs = self.outputs() metric_store.store_data(bkg, outputs["background"]) metric_store.store_data(edisp, outputs["energy migration"]) metric_store.store_data(effar, outputs["effective area"]) metric_store.store_data(radma, outputs["rad max"]) if benchmark_file: bench = self._load_all_tables(benchmark_file) biasres = EnergyBiasResIRFMetric.from_table( bench["ENERGY BIAS RESOLUTION"], metric_store ) angres = AngularResIRFMetric.from_table( bench["ANGULAR RESOLUTION"], metric_store ) sens = SensitivityIRFMetric.from_table(bench["SENSITIVITY"], metric_store) else: self._log.warning( "The input didn't include `benchmark_file`, Skipping metrics for energy, angular, and sensitivity." ) biasres = EnergyBiasResIRFMetric([]) angres = AngularResIRFMetric([]) sens = SensitivityIRFMetric([]) metric_store.store_data(sens, outputs["sensitivity"]) metric_store.store_data(angres, outputs["angular resolution"]) metric_store.store_data(biasres, outputs["energy bias resolution"])
[docs] @override def compare_to_reference( self, metric_store_list: list[MetricsStore], result_store: ResultStore ): # load the histograms in to a dict by the dataset name: metrics = {k: [] for k in self.outputs().keys()} bench_header = {} bench_header["name"] = "Performance Plot" bench_header["code"] = "CTA-PROD Sensitivity" bench_header["inputs"] = [ itm.metadata["input_dataset"].to_dict() for itm in metric_store_list ] result_store.metadata["benchmarks"][self.metric_path] = bench_header poster = PerformancePoster() for name, metric in metrics.items(): for ds in metric_store_list: metric.append(ds.retrieve_data(self.outputs()[name])) metric[-1].label = ds.name # BUG: this doesn't work when the list is length 1. # Should use something like ref, *other = metrics_store_list ref, other = {}, {} for key, lst in metrics.items(): ref[key] = lst.pop(0) other[key] = lst offsets = ref["sensitivity"].axes["Offset"] for off in offsets: ref_off = self._select_offset(off, ref) oth_off = self._select_offset(off, other) if not ref_off or not oth_off: continue fig = poster.gernot_plot(ref_off, oth_off) fig.suptitle(f"Source Offset: {off}") off_result = ComparisonResult( name="Performance", plots={f"PerformancePlot_Offset_{off}": fig}, measure=None, tests=None, ) off_result.store(result_store, self.metric_path)