"""
FileBackend module for reading and extracting data from HDF5 files in a directory structure.
Provides the FileBackend class implementing BackendAPI for scanning, parsing, and fetching observation data.
"""
import logging
import math
import re
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any
import h5py
import hdf5plugin # noqa: F401
from astropy.time import Time
from ctapipe.io import read_table
from ctapipe.io.hdf5dataformat import (
DL1_SUBARRAY_QUALITY_GROUP,
DL1_TEL_QUALITY_GROUP,
)
from .base import BackendAPI, DataItem, ObservationInfo
# Module-level logger shared by FileBackend; named after this module.
logger = logging.getLogger(__name__)
class FileBackend(BackendAPI):
    """
    Backend for reading data directly from HDF5 files.

    Assumes directory structure: /prefix/site/date/files, where ``site`` is one
    of the directories listed in ``_SITES``.
    """

    # (display name, directory name) for every supported site, in scan order.
    # Shared by scanning and lookup so the two can never disagree.
    _SITES: tuple[tuple[str, str], ...] = (
        ("North", "ctao-north"),
        ("South", "ctao-south"),
    )

    # Expected monitoring filename; matched against the WHOLE name (fullmatch)
    # so names with trailing text after ".mon.dl1.h5" are rejected.
    _FILENAME_RE = re.compile(
        r"TEL(\d+)_(\d{8}T\d{6})_SBID(\d{10})_OBSID(\d{10})_(.+)\.mon\.dl1\.h5"
    )

    def __init__(self, data_dir: str | Path):
        """
        Initialize file backend.

        Args:
            data_dir: Base directory prefix containing site subdirectories
                (ctao-north, ctao-south).

        Raises:
            FileNotFoundError: If ``data_dir`` does not exist.
        """
        self.data_prefix = Path(data_dir)
        if not self.data_prefix.exists():
            raise FileNotFoundError(
                f"Data prefix directory does not exist: {self.data_prefix}"
            )

    def _parse_filename(self, filename: str) -> dict[str, Any] | None:
        """
        Parse a telescope monitoring filename into its metadata fields.

        Supported format:
        TEL001_20251022T235008_SBID2000000078_OBSID2000000239_CALIB_CHUNK000.mon.dl1.h5

        The creator ID is the first digit of the SBID/OBSID: 1=SDMC-SUSS
        (midterm scheduler), 2=ACADA/CTAO-North, 3=ACADA/CTAO-South,
        4=SDMC-DPPS.

        Returns:
            A dict with keys ``tel_id``, ``timestamp`` (raw ``YYYYMMDDTHHMMSS``
            string), ``sbid``, ``obsid``, ``SB creator``, ``OB creator``,
            ``suffix`` and ``filename``, or ``None`` if the name does not match
            the format exactly.
        """
        match = self._FILENAME_RE.fullmatch(filename)
        if match is None:
            return None
        tel_id, timestamp, sbid_str, obsid_str, suffix = match.groups()
        return {
            "tel_id": int(tel_id),
            "timestamp": timestamp,
            "sbid": int(sbid_str),
            "obsid": int(obsid_str),
            "SB creator": int(sbid_str[0]),
            "OB creator": int(obsid_str[0]),
            "suffix": suffix,
            "filename": filename,
        }

    def _table_to_dict(self, table: Any) -> dict[str, Any]:
        """Convert an astropy table to a JSON-friendly dict.

        Each column becomes ``{"data": <nested lists>, "unit": <str or None>}``:

        - Time columns (``astropy.time.Time``) are converted to TAI-scale Unix
          seconds with unit ``"s"``.
        - Columns exposing a ``unit`` attribute (e.g. Quantity) keep their
          values in the original unit. The unit is stringified, or ``None``
          when no unit is set.
        - Any other column is converted via ``tolist()`` with unit ``None``.

        Non-finite floats (NaN, +/-inf) become ``None``. Redundant outer
        single-element list wrappers are removed.

        Returns:
            ``{"n_rows": int, "columns": {...}, "meta": dict}``.
        """
        columns: dict[str, Any] = {}

        def _to_python_native(value: Any) -> Any:
            # numpy scalars expose .item(); unwrap them to plain Python values.
            if hasattr(value, "item"):
                try:
                    value = value.item()
                except (ValueError, TypeError):
                    return value
            # JSON has no NaN/Infinity, so map non-finite floats to null.
            if isinstance(value, float) and not math.isfinite(value):
                return None
            if isinstance(value, list):
                converted = [_to_python_native(item) for item in value]
                # Strip outer [[...]] wrappers (e.g. a single row holding a list).
                while len(converted) == 1 and isinstance(converted[0], list):
                    converted = converted[0]
                return converted
            return value

        for column_name in table.colnames:
            column = table[column_name]
            if isinstance(column, Time):
                data = column.tai.unix.tolist()
                unit: str | None = "s"
            elif hasattr(column, "unit"):
                # Quantity columns, but also plain astropy Columns, which always
                # carry a ``unit`` attribute that is None when no unit is set.
                # str(None) would otherwise leak the string "None" into the JSON.
                data = column.value.tolist()
                unit = None if column.unit is None else str(column.unit)
            else:
                data = column.tolist()
                unit = None
            columns[column_name] = {
                "data": _to_python_native(data),
                "unit": unit,
            }

        return {
            "n_rows": len(table),
            "columns": columns,
            "meta": getattr(table, "meta", {}) or {},
        }

    def _collect_metric_dataset_paths(self, h5_file_path: Path) -> dict[str, str]:
        """Map metric names to a representative dataset path in the quality groups.

        Walks the telescope and subarray DL1 quality groups
        (``DL1_TEL_QUALITY_GROUP``, ``DL1_SUBARRAY_QUALITY_GROUP``). Groups that
        are absent from the file are skipped. The metric name is the first path
        component below the quality group. When several datasets share a metric
        name (including the same name in both groups), the first one visited wins.

        Returns:
            Mapping of metric name to the dataset path ``"<group>/<name>"``.
        """
        metric_paths: dict[str, str] = {}
        metrics_prefixes = [
            DL1_TEL_QUALITY_GROUP,
            DL1_SUBARRAY_QUALITY_GROUP,
        ]
        with h5py.File(h5_file_path, "r") as h5file:
            for prefix in metrics_prefixes:
                if prefix not in h5file:
                    continue

                # ``prefix`` is bound as a default so each visitor keeps its own group.
                def _visitor_func(name, obj, prefix=prefix):
                    if not isinstance(obj, h5py.Dataset):
                        return
                    path = f"{prefix}/{name}"
                    metric_name = name.split("/", 1)[0]
                    if not metric_name:
                        return
                    metric_paths.setdefault(metric_name, path)

                h5file[prefix].visititems(_visitor_func)
        return metric_paths

    def scan_observations(self) -> list[ObservationInfo]:
        """Scan all sites for HDF5 files laid out as prefix/site/date/files.

        A file is skipped with a warning in either of these cases:
        - its name does not match the expected format;
        - its embedded timestamp is not a valid date/time.

        Results are ordered by site, then by date directory name, then by
        filename.

        NOTE(review): the reported ``date`` comes from the timestamp in the
        filename, not from the name of the date directory that holds the file.
        ``_find_h5_file`` looks the file up by directory name, so the two are
        assumed to agree. Confirm this for runs that cross midnight.
        """
        observations: list[ObservationInfo] = []
        for site_display, site_name in self._SITES:
            site_dir = self.data_prefix / site_name
            if not site_dir.is_dir():
                logger.debug("Site directory not found: %s", site_dir)
                continue
            # Only descend into directories; stray files at this level are ignored.
            for date_dir in sorted(p for p in site_dir.iterdir() if p.is_dir()):
                for h5_file in sorted(date_dir.glob("*.h5")):
                    observation = self._observation_from_file(h5_file, site_display)
                    if observation is not None:
                        observations.append(observation)
        return observations

    def _observation_from_file(
        self, h5_file: Path, site_display: str
    ) -> ObservationInfo | None:
        """Build the ObservationInfo for one file, or return None (with a warning) if it cannot be parsed."""
        parsed = self._parse_filename(h5_file.name)
        if not parsed:
            logger.warning("Could not parse filename: %s", h5_file.name)
            return None
        try:
            datetime_obj = datetime.strptime(parsed["timestamp"], "%Y%m%dT%H%M%S")
            return ObservationInfo(
                site=site_display,
                date=datetime_obj.strftime("%Y-%m-%d"),
                obsid=parsed["obsid"],
                tel_id=parsed["tel_id"],
                telescope_type="",  # Telescope type is not determined here.
                h5_file=str(h5_file),
            )
        except ValueError:
            logger.warning("Could not parse timestamp from %s", h5_file.name)
            return None

    def get_ob_date_map(self) -> dict[str, list[int]]:
        """Map each observation date (YYYY-MM-DD) to its sorted, de-duplicated obsids."""
        obsids_by_date: defaultdict[str, set[int]] = defaultdict(set)
        for obs in self.scan_observations():
            obsids_by_date[obs.date].add(obs.obsid)
        return {date: sorted(obsids) for date, obsids in obsids_by_date.items()}

    def fetch_data(
        self,
        obsid: int,
        tel_id: int,
        site: str,
        date: str,
        keys: list[str] | None = None,
    ) -> dict[str, DataItem]:
        """
        Fetch data for a specific observation and telescope.

        The data is extracted from the HDF5 file on the fly.

        Args:
            obsid: Observation ID
            tel_id: Telescope ID
            site: Site name ("North" or "South", case-insensitive)
            date: Date string in YYYY-MM-DD format
            keys: Optional list of metric names to fetch (default: all)

        Raises:
            FileNotFoundError: If no matching HDF5 file exists.
            RuntimeError: If the file's quality groups cannot be read.
        """
        h5_file_path = self._find_h5_file(obsid, tel_id, site, date)
        if not h5_file_path:
            raise FileNotFoundError(
                f"No HDF5 file found for observation {obsid} telescope {tel_id} at {site} on {date}"
            )
        return self._extract_hdf5_data(h5_file_path, keys)

    def _find_h5_file(
        self, obsid: int, tel_id: int, site: str, date: str
    ) -> Path | None:
        """Locate the HDF5 file for an observation/telescope in prefix/site/date.

        Returns None, with a warning, in each of these cases:
        - the site is unknown;
        - ``date`` is not a single path component;
        - the date directory is missing.

        Returns None silently if no file in that directory matches. When
        several files match (e.g. multiple chunks), the lexicographically first
        one is returned.
        """
        site_lower = site.lower()
        site_name = next(
            (dirname for display, dirname in self._SITES if display.lower() == site_lower),
            None,
        )
        if site_name is None:
            logger.warning(
                "Invalid site name: %s. Valid sites are 'North' or 'South'", site
            )
            return None
        # ``date`` may come from a client request. Accept only a single path
        # component: "../x" would escape the site directory, and an absolute
        # path would replace the data prefix entirely when joined with "/".
        if not date or date in (".", "..") or Path(date).name != date:
            logger.warning("Invalid date: %s", date)
            return None
        date_dir = self.data_prefix / site_name / date
        if not date_dir.is_dir():
            logger.warning("Date directory not found: %s", date_dir)
            return None
        # Sorted so the choice is deterministic when several files match.
        for h5_file in sorted(date_dir.glob("*.h5")):
            parsed = self._parse_filename(h5_file.name)
            if parsed and parsed["obsid"] == obsid and parsed["tel_id"] == tel_id:
                return h5_file
        return None

    def _extract_hdf5_data(
        self, h5_file_path: Path, keys: list[str] | None = None
    ) -> dict[str, DataItem]:
        """Read the quality-metric tables of an HDF5 file into DataItems.

        Args:
            h5_file_path: HDF5 file to read.
            keys: Optional metric names to restrict the result to. Names not
                present in the file are ignored.

        Returns:
            Mapping of metric name to a DataItem. Each DataItem holds:
            - ``fetchedData``: the table converted by ``_table_to_dict``;
            - ``fetchedMetadata``: the table's ``metric_config`` meta entry and
              its HDF5 path.

            Metrics whose table cannot be read are skipped with a warning.

        Raises:
            RuntimeError: If the quality groups of the file cannot be scanned.
        """
        try:
            metric_paths = self._collect_metric_dataset_paths(h5_file_path)
        except Exception as e:
            raise RuntimeError(f"Error reading HDF5 file {h5_file_path}: {e}") from e

        wanted = None if keys is None else set(keys)
        data: dict[str, DataItem] = {}
        for metric_name, path in metric_paths.items():
            if wanted is not None and metric_name not in wanted:
                continue
            try:
                table = read_table(h5_file_path, path)
            except Exception as e:
                # Best effort: one unreadable metric must not hide the others.
                logger.warning("Failed reading metric table %s: %s", path, e)
                continue
            data[metric_name] = DataItem(
                fetchedData=self._table_to_dict(table),
                fetchedMetadata={
                    "metric_config": table.meta.get("metric_config", {}),
                    "hdf5_path": path,
                },
            )
        return data

    def get_available_metrics(
        self,
        obsid: int,
        tel_id: int,
        site: str,
        date: str,
    ) -> list[str]:
        """Return the sorted names of quality metrics available for an observation and telescope.

        Returns an empty list (with a warning) when no matching HDF5 file exists.
        """
        h5_file_path = self._find_h5_file(obsid, tel_id, site, date)
        if not h5_file_path:
            logger.warning(
                "No HDF5 file found for observation %s telescope %s at %s on %s",
                obsid,
                tel_id,
                site,
                date,
            )
            return []
        # Dict keys are already unique; sorting them yields the metric names.
        return sorted(self._collect_metric_dataset_paths(h5_file_path))