"""Metrics for treating IRFs following GADF."""
from pathlib import Path
from typing import override
import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.table import QTable, Table
from gammapy.irf import load_irf_dict_from_file
from hist import axis
from pyirf.utils import cone_solid_angle
from ...benchmark import Benchmark
from ...compare_1d import CompareByCat1D
from ...comparers import ComparisonResult
from ...metric import IRFMetric
from ...plotting import PerformancePoster
from ...store import MetricsStore, ResultStore
from .sensitivity import ENERGY_BINS, OFFSET_BINS, make_sensitivity_curve
class EffectiveAreaIRFMetric(IRFMetric):
    """Metric for treating Effective Area IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table."""
        # Multi-row tables carry the IRF payload in their first row.
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        energy_edges, offset_labels = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )
        offset_axis = axis.StrCategory(offset_labels, name="Offset")
        energy_axis = axis.Regular(
            bins=len(energy_edges) - 1,
            start=energy_edges[0].value,
            stop=energy_edges[-1].value,
            name="Energy",
            metadata=dict(unit=energy_edges.unit),
            transform=axis.transform.log,
        )
        metric = EffectiveAreaIRFMetric(
            axis_list=[offset_axis, energy_axis],
            label="Effective Area",
            unit=irf_table["EFFAREA"].unit,
        )
        metric._storage[...] = irf_table["EFFAREA"].value
        return metric
class EnergyMigrationIRFMetric(IRFMetric):
    """Metric for treating Energy Migration IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table.

        Multi-row tables carry the IRF payload in their first row.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        m_bins, e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["MIGRA", "ENERG"], ["THETA"]
        )
        new_metric = EnergyMigrationIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(m_bins) - 1,
                    start=m_bins[0],
                    stop=m_bins[-1],
                    name="Migration",
                ),
            ],
            label="Energy Migration",
            unit="",
        )
        # Swap the last two axes of MATRIX so the storage matches the
        # (Offset, Energy, Migration) axis order declared above.
        new_metric._storage[...] = np.swapaxes(irf_table["MATRIX"], 1, 2)
        return new_metric
class RadMaxIRFMetric(IRFMetric):
    """Metric for treating RAD_MAX (directional cut radius) IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table.

        Multi-row tables carry the IRF payload in their first row.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )
        new_metric = RadMaxIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="$R_{max}$",
            unit=irf_table["RAD_MAX"].unit,
        )
        new_metric._storage[...] = irf_table["RAD_MAX"]
        return new_metric
class Background2dIRFMetric(IRFMetric):
    """Metric for treating 2d Background IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table.

        Multi-row tables carry the IRF payload in their first row.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            irf_table, ["ENERG"], ["THETA"]
        )
        new_metric = Background2dIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="Background Rate",
            unit=irf_table["BKG"].unit,
        )
        new_metric._storage[...] = irf_table["BKG"]
        return new_metric

    def get_offset_averaged_rate(self):
        """Get background rate averaged over all offsets.

        Returns
        -------
        bkg : astropy.units.Quantity
            Solid-angle weighted background rate per energy bin.
        e_bins : array
            Energy bin edges.
        """
        old, offsets, e_bins = self.to_numpy()
        new = np.zeros_like(old)
        # Solid angle of each offset ring = difference of enclosing cones.
        # NOTE(review): assumes `offsets` from `to_numpy` are numeric cone
        # edges in degrees — confirm against the StrCategory axis contents.
        bin_solid_angle = np.diff(cone_solid_angle(offsets * u.degree))
        # Weight each offset slice by its ring's solid angle
        # (unused loop value removed from the original enumerate).
        for idx in range(len(offsets) - 1):
            bin_content, _ = self[idx, :].to_numpy()
            new[idx, :] = bin_content * bin_solid_angle[idx]
        tot_solid_angle = cone_solid_angle(offsets[-1] * u.degree)
        bkg = new.sum(axis=0) / tot_solid_angle / u.s / u.MeV
        return bkg, e_bins

    def compare(self, others: list):
        """Compare several instances of the class.

        Uses `self` as reference when comparing with `others`. Plots the spectra
        on top of each other, and calculates the wasserstein distance.

        Parameters
        ----------
        others : list
            List containing other instances of this class.

        Returns
        -------
        ComparisonResult
            Results of comparison of this reference to the others list.
        """
        # Compare per offset category (axes[0] is the Offset StrCategory axis).
        comparer = CompareByCat1D(self.axes[0])
        fig = comparer.plot_compare_by_category(self, others)
        measures = comparer.distance_by_category(self, others)
        return ComparisonResult(
            self.__class__.__name__, {"Background": fig}, measures, None
        )
class Background3dIRFMetric(IRFMetric):
    """Metric for treating 3d Background IRF in GADF."""

    @classmethod
    def _from_specific_table(cls, irf_table):
        """Create class instance from an irf table.

        Multi-row tables carry the IRF payload in their first row.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(irf_table, Table):
            irf_table = irf_table[0]
        e_bins, x_bins, y_bins = cls._get_metric_bins_from_table(
            irf_table, ["ENERG", "DETX", "DETY"], []
        )
        new_metric = Background3dIRFMetric(
            [
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(x_bins) - 1,
                    start=x_bins[0].value,
                    stop=x_bins[-1].value,
                    metadata=dict(unit=x_bins.unit),
                    name="DETX",
                ),
                axis.Regular(
                    bins=len(y_bins) - 1,
                    start=y_bins[0].value,
                    stop=y_bins[-1].value,
                    metadata=dict(unit=y_bins.unit),
                    name="DETY",
                ),
            ],
            label="Background Rate",
            unit=irf_table["BKG"].unit,
        )
        # Some files store BKG with the energy axis first, others transposed;
        # choose the orientation whose leading dimension matches the energy bins.
        if irf_table["BKG"].shape[0] == (len(e_bins) - 1):
            new_metric._storage[...] = irf_table["BKG"]
        else:
            new_metric._storage[...] = irf_table["BKG"].T
        return new_metric

    def get_offset_averaged_rate(self, offset_max=5.0 * u.degree):
        """Get background rate averaged over all offsets.

        Parameters
        ----------
        offset_max : astropy.units.Quantity
            Maximum offset from the FoV center to include.

        Returns
        -------
        bkg_rate : astropy.units.Quantity
            Background rate per energy bin, averaged within ``offset_max``.
        e_bins : array
            Energy bin edges.
        """
        bkg_h, e_bins, xbins, ybins = self.to_numpy()
        bkg_h = bkg_h / u.sr / u.s / u.MeV
        # NOTE(review): assumes the DETX/DETY edges are in degrees — confirm
        # against the axis metadata written in `_from_specific_table`.
        xbins, ybins = xbins * u.degree, ybins * u.degree
        # Bin centers via a 2-tap moving average of the edges.
        xcent = np.convolve(xbins, 2 * [0.5], mode="valid")
        ycent = np.convolve(ybins, 2 * [0.5], mode="valid")
        xg, yg = np.meshgrid(xcent, ycent)
        off = np.sqrt(xg**2 + yg**2)
        # Select all FoV bins whose center lies within offset_max.
        sel = (off <= offset_max)[np.newaxis, ...]
        # Weight by the (mean) bin area before summing over the FoV plane.
        bkg_h *= np.diff(xbins).mean() * np.diff(ybins).mean()
        bkg_rate = (bkg_h * sel).sum(axis=(1, 2)) / cone_solid_angle(offset_max)
        return bkg_rate, e_bins

    def get_rate_in_offset_bins(self, fov_bins):
        """Get background rate averaged over offset bins.

        Parameters
        ----------
        fov_bins : astropy.units.Quantity
            Edges of the radial offset bins.

        Returns
        -------
        rad_bkg : astropy.units.Quantity
            Background rate per (offset bin, energy bin).
        e_bins : astropy.units.Quantity
            Energy bin edges in TeV.
        """
        n_fov_bins = len(fov_bins) - 1
        bkg_h, e_bins, xbins, ybins = self.to_numpy()
        bkg_h = bkg_h / u.sr / u.s / u.MeV
        # NOTE(review): assumes the energy edges are in TeV — TODO confirm.
        e_bins = e_bins * u.TeV
        rad_bkg = np.zeros((n_fov_bins, len(e_bins) - 1)) / u.sr / u.s / u.MeV
        xbins, ybins = xbins * u.degree, ybins * u.degree
        # Weight by the (mean) bin area before summing over the FoV plane.
        bkg_h *= np.diff(xbins).mean() * np.diff(ybins).mean()
        xcent = np.convolve(xbins, 2 * [0.5], mode="valid")
        ycent = np.convolve(ybins, 2 * [0.5], mode="valid")
        xg, yg = np.meshgrid(xcent, ycent)
        off = np.sqrt(xg**2 + yg**2)
        # Solid angle of each radial ring (difference of enclosing cones).
        cones = np.diff(cone_solid_angle(fov_bins))
        for idx, (lo, hi) in enumerate(zip(fov_bins[:-1], fov_bins[1:])):
            sel = ((lo <= off) & (off <= hi))[np.newaxis, ...]
            rad_bkg[idx, :] = (bkg_h * sel).sum(axis=(1, 2)) / cones[idx]
        return rad_bkg, e_bins
class SensitivityIRFMetric(IRFMetric):
    """Metric for saving differential sensitivity."""

    @classmethod
    def from_sensitivity_dict(cls, sens_dict, output_store=None):
        """Instantiate Sensitivity metric from special dict.

        Parameters
        ----------
        sens_dict : dict
            Dict with keys ``"sens"`` (table of sensitivities),
            ``"energy_bins"`` and ``"offset_bins"`` (bin edges).
        output_store : optional
            Store used to update the metric name.
        """
        e_bins = sens_dict["energy_bins"]
        off_bins = sens_dict["offset_bins"]
        new_metric = SensitivityIRFMetric(
            [
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
                axis.Regular(
                    bins=len(off_bins) - 1,
                    start=off_bins[0].value,
                    stop=off_bins[-1].value,
                    metadata=dict(unit=off_bins.unit),
                    name="Offset",
                ),
            ],
            label="Sensitivity",
            unit=sens_dict["sens"].unit,
        )
        # Reinterpret the structured (record) array as a plain float64
        # matrix: one row per energy, one column per offset.
        tmp = sens_dict["sens"].as_array()
        res_data = tmp.view(np.float64).reshape(tmp.shape + (-1,))
        new_metric._storage[...] = res_data[...]
        new_metric.update_name_with_store(output_store)
        return new_metric

    @classmethod
    def _from_specific_table(cls, bench_table):
        """Create class instance from an irf table.

        Multi-row tables carry the payload in their first row.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(bench_table, Table):
            bench_table = bench_table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(
            bench_table, ["ENERG"], ["THETA"]
        )
        new_metric = SensitivityIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            # Consistency fix: label was missing here although every
            # sibling constructor (incl. from_sensitivity_dict) sets one.
            label="Sensitivity",
            unit=bench_table["ENERGY_FLUX_SENSITIVITY"].unit,
        )
        new_metric._storage[...] = bench_table["ENERGY_FLUX_SENSITIVITY"]
        return new_metric
class AngularResIRFMetric(IRFMetric):
    """Metric for treating 2d Angular resolution table."""

    @classmethod
    def _from_specific_table(cls, table):
        """Create class instance from an irf table.

        Multi-row tables carry the payload in their first row.

        Raises
        ------
        KeyError
            If neither supported angular-resolution column is present.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(table, Table):
            table = table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(table, ["ENERG"], ["THETA"])
        # Try to find the angular resolution column. In older files it's just
        # ANGULAR_RESOLUTION, but in the latest outputs there are multiple,
        # per confidence interval.
        try:
            angres_column = next(
                key
                for key in ["ANGULAR_RESOLUTION_68", "ANGULAR_RESOLUTION"]
                if key in table.columns
            )
        except StopIteration as err:
            # BUGFIX: message had a typo ("ANGULAR_RESULTION_68") and the
            # original exception context was dropped; chain it explicitly.
            raise KeyError(
                "Couldn't find the expected column in the IRF benchmark table: "
                "ANGULAR_RESOLUTION_68 or ANGULAR_RESOLUTION"
            ) from err
        new_metric = AngularResIRFMetric(
            [
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            label="Angular Resolution",
            unit=table[angres_column].unit,
        )
        new_metric._storage[...] = table[angres_column]
        return new_metric
class EnergyBiasResIRFMetric(IRFMetric):
    """Metric for treating 2d energy bias and resolution table."""

    @classmethod
    def _from_specific_table(cls, table):
        """Create class instance from an irf table.

        Multi-row tables carry the payload in their first row. Bias and
        resolution are stored in one metric, distinguished by the "Kind"
        category axis.
        """
        # Idiomatic type check (was `issubclass(type(...), Table)`).
        if isinstance(table, Table):
            table = table[0]
        e_bins, offsets = cls._get_metric_bins_from_table(table, ["ENERG"], ["THETA"])
        new_metric = EnergyBiasResIRFMetric(
            [
                axis.StrCategory(
                    ["Bias", "Resolution"],
                    name="Kind",
                ),
                axis.StrCategory(
                    offsets,
                    name="Offset",
                ),
                axis.Regular(
                    bins=len(e_bins) - 1,
                    start=e_bins[0].value,
                    stop=e_bins[-1].value,
                    name="Energy",
                    metadata=dict(unit=e_bins.unit),
                    transform=axis.transform.log,
                ),
            ],
            unit="",
        )
        # Official Prod5 irfs have bias in different binning than resolution :(
        # we can't deal with that right now so just skip bias in that case
        if table["BIAS"].shape[1] == table["RESOLUTION"].shape[1]:
            new_metric._storage["Bias", ...] = table["BIAS"]
        new_metric._storage["Resolution", ...] = table["RESOLUTION"]
        return new_metric
class IRFBenchmark(Benchmark):
    """Top level performance benchmarks.

    Covers requirements:
    PROG-0020, PROG-0030, PROG-0040, PROG-0050,
    PROG-0060, PROG-0070, PROG-0080, PROG-0090

    For generation, requires:

    * GADF IRF files
    """

    # All metrics produced by this benchmark live below this path.
    metric_path = Path("dl3/instrument_response_functions")

    def __init__(self, do_irf_sensitivity=False):
        super().__init__()
        # When True, an extra sensitivity curve is computed directly from
        # the IRFs (in addition to the benchmark-table sensitivity).
        self.do_irf_sensitivity = do_irf_sensitivity

    @override
    def outputs(self) -> dict:
        out = {
            "background": self.metric_path / "Background.asdf",
            "effective area": self.metric_path / "EffectiveArea.asdf",
            "energy migration": self.metric_path / "EnergyMigration.asdf",
            "rad max": self.metric_path / "RadMax.asdf",
            "angular resolution": self.metric_path / "AngularResolution.asdf",
            "energy bias resolution": self.metric_path / "EnergyBiasResolution.asdf",
            "sensitivity": self.metric_path / "Sensitivity.asdf",
        }
        # BUGFIX: outputs() returns a fresh dict on every call, so the old
        # approach of mutating its return value inside generate_metrics was
        # silently lost (and the later lookup raised KeyError). Declare the
        # optional entry here instead.
        if self.do_irf_sensitivity:
            out["irf sensitivity"] = self.metric_path / "IRF_Sensitivity.asdf"
        return out

    def _make_sensitivity_curves(self, irfs, energy_edges, offset_edges):
        """Make sensitivity curves, evaluated at bin midpoints."""
        # Offset midpoints via a 2-tap moving average; reference energies
        # at the geometric mean of the edges (log-spaced bins).
        offsets = np.convolve(offset_edges, 2 * [0.5], mode="valid")
        e_ref = np.sqrt(energy_edges[1:] * energy_edges[:-1])
        result = QTable({"e_ref": e_ref})
        for off in offsets:
            sens = make_sensitivity_curve(
                irfs, result["e_ref"], enclosure=False, conf={"point_offset": off}
            )
            result.add_column(sens["e2dnde"], name=str(off))
        result.remove_column("e_ref")
        return {
            "sens": result,
            "energy_bins": energy_edges,
            "offset_bins": offset_edges,
        }

    def _load_all_tables(self, fits_file):
        """Load all table HDUs of ``fits_file``, keyed by HDU name."""
        # BUGFIX: the original left the FITS file handle open (resource leak).
        with fits.open(fits_file) as hdul:
            tabs = hdul.info(output=False)[1:]
        return {itm[1]: QTable.read(fits_file, hdu=itm[0]) for itm in tabs}

    def _select_offset(self, offset, dct):
        """Select a single offset slice from every metric in ``dct``.

        Returns the sliced dict, or ``False`` when any metric does not
        cover the requested offset.
        """
        out_dict = {}
        for key, met in dct.items():
            try:
                slc = (offset, slice(None))
                if key == "energy bias resolution":
                    key = "energy resolution"
                    slc = ("Resolution", offset, slice(None))
                if key == "energy migration":
                    slc = (offset, slice(None), slice(None))
                # Empty placeholder metrics (no axes) pass through untouched.
                if (isinstance(met, list) and len(met[0].axes) == 0) or (
                    not isinstance(met, list) and len(met.axes) == 0
                ):
                    out_dict[key] = met
                    continue
                # rad max is not comparable per offset; the IRF-derived
                # sensitivity uses a Regular (not categorical) offset axis.
                if key in ("rad max", "irf sensitivity"):
                    continue
                if not isinstance(met, list):
                    out_dict[key] = met[slc]
                else:
                    out_dict[key] = [itm[slc] for itm in met]
            except KeyError:
                self._log.error(f"{key} IRF does not have offsets this far: {offset}")
                return False
        return out_dict

    @override
    @property
    def required_inputs(self) -> set[str]:
        return {"dl3_irf", "dl3_benchmark"}

    @override
    def generate_metrics(self, metric_store: MetricsStore):
        """Generate all IRF metrics and store them in ``metric_store``."""
        irf_file = metric_store.get_inputdata().dl3_irf
        benchmark_file = metric_store.get_inputdata().dl3_benchmark
        outputs = self.outputs()
        if self.do_irf_sensitivity:
            gp_irfs = load_irf_dict_from_file(irf_file)
            irf_sens = SensitivityIRFMetric.from_sensitivity_dict(
                self._make_sensitivity_curves(
                    gp_irfs, ENERGY_BINS * u.TeV, OFFSET_BINS * u.degree
                ),
                metric_store,
            )
            metric_store.store_data(irf_sens, outputs["irf sensitivity"])
        bench_irfs = self._load_all_tables(irf_file)
        effar = EffectiveAreaIRFMetric.from_table(
            bench_irfs["EFFECTIVE AREA"], metric_store
        )
        if len(bench_irfs["BACKGROUND"]["BKG"].shape) == 3:
            bkg = Background2dIRFMetric.from_table(
                bench_irfs["BACKGROUND"], metric_store
            )
        else:
            bkg = Background3dIRFMetric.from_table(
                bench_irfs["BACKGROUND"], metric_store
            )
        if "RAD MAX" in bench_irfs:
            radma = RadMaxIRFMetric.from_table(bench_irfs["RAD MAX"], metric_store)
        else:
            radma = RadMaxIRFMetric([])
        # BUGFIX: edisp was undefined (NameError below) when neither energy
        # dispersion table was present; fall back to an empty metric.
        edisp = EnergyMigrationIRFMetric([])
        if "ENERGY DISPERSION" in bench_irfs:
            edisp = EnergyMigrationIRFMetric.from_table(
                bench_irfs["ENERGY DISPERSION"], metric_store
            )
        # "ENERGY MIGRATION" takes precedence when both tables are present.
        if "ENERGY MIGRATION" in bench_irfs:
            edisp = EnergyMigrationIRFMetric.from_table(
                bench_irfs["ENERGY MIGRATION"], metric_store
            )
        metric_store.store_data(bkg, outputs["background"])
        metric_store.store_data(edisp, outputs["energy migration"])
        metric_store.store_data(effar, outputs["effective area"])
        metric_store.store_data(radma, outputs["rad max"])
        if benchmark_file:
            bench = self._load_all_tables(benchmark_file)
            biasres = EnergyBiasResIRFMetric.from_table(
                bench["ENERGY BIAS RESOLUTION"], metric_store
            )
            angres = AngularResIRFMetric.from_table(
                bench["ANGULAR RESOLUTION"], metric_store
            )
            sens = SensitivityIRFMetric.from_table(bench["SENSITIVITY"], metric_store)
        else:
            self._log.warning(
                "The input didn't include `benchmark_file`, Skipping metrics for energy, angular, and sensitivity."
            )
            biasres = EnergyBiasResIRFMetric([])
            angres = AngularResIRFMetric([])
            sens = SensitivityIRFMetric([])
        metric_store.store_data(sens, outputs["sensitivity"])
        metric_store.store_data(angres, outputs["angular resolution"])
        metric_store.store_data(biasres, outputs["energy bias resolution"])

    @override
    def compare_to_reference(
        self, metric_store_list: list[MetricsStore], result_store: ResultStore
    ):
        """Compare every store against the first one (the reference)."""
        # BUGFIX (was a known BUG comment): with fewer than two stores the
        # split below leaves empty "other" lists and _select_offset crashes
        # on met[0]; there is nothing to compare, so bail out early.
        if len(metric_store_list) < 2:
            self._log.warning(
                "Need at least two metric stores to compare; skipping."
            )
            return
        # load the histograms in to a dict by the dataset name:
        metrics = {k: [] for k in self.outputs()}
        bench_header = {
            "name": "Performance Plot",
            "code": "CTA-PROD Sensitivity",
            "inputs": [
                itm.metadata["input_dataset"].to_dict() for itm in metric_store_list
            ],
        }
        result_store.metadata["benchmarks"][self.metric_path] = bench_header
        poster = PerformancePoster()
        for name, metric in metrics.items():
            for ds in metric_store_list:
                metric.append(ds.retrieve_data(self.outputs()[name]))
                metric[-1].label = ds.name
        # Split into reference (first store) and the rest.
        ref, other = {}, {}
        for key, lst in metrics.items():
            ref[key] = lst.pop(0)
            other[key] = lst
        offsets = ref["sensitivity"].axes["Offset"]
        for off in offsets:
            ref_off = self._select_offset(off, ref)
            oth_off = self._select_offset(off, other)
            if not ref_off or not oth_off:
                continue
            fig = poster.gernot_plot(ref_off, oth_off)
            fig.suptitle(f"Source Offset: {off}")
            off_result = ComparisonResult(
                name="Performance",
                plots={f"PerformancePlot_Offset_{off}": fig},
                measure=None,
                tests=None,
            )
            off_result.store(result_store, self.metric_path)