# Source code for qualpipe_webapp.backend.backends.file_backend

"""
FileBackend module for reading and extracting data from HDF5 files in a directory structure.

Provides the FileBackend class implementing BackendAPI for scanning, parsing, and fetching observation data.
"""

import logging
import math
import re
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

import h5py
import hdf5plugin  # noqa: F401
from astropy.time import Time
from ctapipe.io import read_table
from ctapipe.io.hdf5dataformat import (
    DL1_SUBARRAY_QUALITY_GROUP,
    DL1_TEL_QUALITY_GROUP,
)

from .base import BackendAPI, DataItem, ObservationInfo

# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)


class FileBackend(BackendAPI):
    """
    Backend for reading data directly from HDF5 files.

    Assumes directory structure: /prefix/site/date/files
    """

    # Expected telescope monitoring filename, e.g.
    # TEL001_20251022T235008_SBID2000000078_OBSID2000000239_CALIB_CHUNK000.mon.dl1.h5
    # Compiled once and applied with ``fullmatch`` so that names with trailing
    # text after ".mon.dl1.h5" are rejected instead of silently accepted.
    _FILENAME_PATTERN = re.compile(
        r"TEL(\d+)_(\d{8}T\d{6})_SBID(\d{10})_OBSID(\d{10})_(.+)\.mon\.dl1\.h5"
    )

    # (display name, directory name under the data prefix), scanned in this order.
    _SITES = (("North", "ctao-north"), ("South", "ctao-south"))

    def __init__(self, data_dir: str):
        """
        Initialize file backend.

        Args:
            data_dir: Base directory prefix containing site subdirectories
                (ctao-north, ctao-south)

        Raises:
            FileNotFoundError: If ``data_dir`` does not exist.
        """
        self.data_prefix = Path(data_dir)
        if not self.data_prefix.exists():
            raise FileNotFoundError(
                f"Data prefix directory does not exist: {self.data_prefix}"
            )

    def _parse_filename(self, filename: str) -> dict[str, Any] | None:
        """
        Parse telescope filename to extract metadata.

        Supports format:
        TEL001_20251022T235008_SBID2000000078_OBSID2000000239_CALIB_CHUNK000.mon.dl1.h5

        Creator IDs is the first digit of SBID/OBSID:
        1=SDMC-SUSS (midterm scheduler), 2=ACADA/CTAO-North,
        3=ACADA/CTAO-South, 4=SDMC-DPPS

        Returns:
            A dict with tel_id, timestamp (raw ``YYYYMMDDTHHMMSS`` string),
            sbid, obsid, creator digits, suffix and the original filename,
            or None if the whole filename does not match the format.
        """
        match = self._FILENAME_PATTERN.fullmatch(filename)
        if match is None:
            return None

        tel_id, timestamp, sbid_str, obsid_str, suffix = match.groups()
        return {
            "tel_id": int(tel_id),
            "timestamp": timestamp,
            "sbid": int(sbid_str),
            "obsid": int(obsid_str),
            # The leading digit of each 10-digit ID identifies its creator.
            "SB creator": int(sbid_str[0]),
            "OB creator": int(obsid_str[0]),
            "suffix": suffix,
            "filename": filename,
        }

    def _table_to_dict(self, table: Any) -> dict[str, Any]:
        """Convert an astropy table to a structured JSON object.

        For time columns (astropy.Time): converts to TAI scale Unix seconds.
        For columns carrying a unit (e.g. astropy Quantity): preserves data in
        the original unit and reports the unit as a string. Columns without a
        unit report ``"unit": None``.

        Non-finite floats are replaced by None, and lists that only wrap a
        single inner list are unwrapped.

        Returns:
            A dict with ``n_rows``, ``columns`` (name -> {"data", "unit"}) and
            ``meta`` (the table's meta, or an empty dict).
        """
        columns: dict[str, Any] = {}

        def _to_python_native(value: Any) -> Any:
            # Numpy scalars expose .item(); unwrap them to Python scalars.
            if hasattr(value, "item"):
                try:
                    value = value.item()
                except (ValueError, TypeError):
                    return value
            # NaN / inf are not valid JSON numbers.
            if isinstance(value, float) and not math.isfinite(value):
                return None
            if isinstance(value, list):
                converted = [_to_python_native(item) for item in value]
                # Squeeze redundant single-element wrappers such as [[...]].
                while (
                    isinstance(converted, list)
                    and len(converted) == 1
                    and isinstance(converted[0], list)
                ):
                    converted = converted[0]
                return converted
            return value

        for column_name in table.colnames:
            column = table[column_name]
            if isinstance(column, Time):
                # Convert to TAI scale and extract Unix seconds
                unix_tai = column.tai.unix
                columns[column_name] = {
                    "data": _to_python_native(unix_tai.tolist()),
                    "unit": "s",
                }
            elif hasattr(column, "unit"):
                # Quantity columns and plain astropy Columns both expose
                # ``.unit``; for a Column without a unit it is None, which must
                # be reported as None rather than the string "None".
                unit = column.unit
                columns[column_name] = {
                    "data": _to_python_native(column.value.tolist()),
                    "unit": None if unit is None else str(unit),
                }
            else:
                # Regular array
                columns[column_name] = {
                    "data": _to_python_native(column.tolist()),
                    "unit": None,
                }

        return {
            "n_rows": len(table),
            "columns": columns,
            "meta": getattr(table, "meta", {}) or {},
        }

    def _collect_metric_dataset_paths(self, h5_file_path: Path) -> dict[str, str]:
        """Collect metric names and representative dataset paths from quality groups.

        Walks every dataset below DL1_TEL_QUALITY_GROUP and then
        DL1_SUBARRAY_QUALITY_GROUP (missing groups are skipped). The metric name
        is the first path component below the group; the first dataset seen
        for a metric name is kept.

        Returns:
            Mapping of metric name -> full HDF5 dataset path.
        """
        metric_paths: dict[str, str] = {}
        metrics_prefixes = [
            DL1_TEL_QUALITY_GROUP,
            DL1_SUBARRAY_QUALITY_GROUP,
        ]

        with h5py.File(h5_file_path, "r") as h5file:
            for prefix in metrics_prefixes:
                if prefix not in h5file:
                    continue

                # ``prefix`` is bound as a default so the visitor uses the
                # current loop value.
                def _visitor_func(name, obj, prefix=prefix):
                    if not isinstance(obj, h5py.Dataset):
                        return
                    path = f"{prefix}/{name}"
                    metric_name = name.split("/", 1)[0]
                    if not metric_name:
                        return
                    metric_paths.setdefault(metric_name, path)

                h5file[prefix].visititems(_visitor_func)

        return metric_paths

    def scan_observations(self) -> list[ObservationInfo]:
        """Scan both sites for available HDF5 files using prefix/site/date/files structure.

        Missing site directories are skipped. Non-directory entries inside a
        site directory are ignored. Files whose name or timestamp cannot be
        parsed are logged and skipped.

        NOTE(review): the reported ``date`` is derived from the filename
        timestamp, while ``_find_h5_file`` looks files up in a directory named
        after that date. If a file's directory name and timestamp date can
        differ (e.g. observations starting after midnight), ``fetch_data``
        will not find it; confirm the on-disk naming convention.
        """
        observations: list[ObservationInfo] = []

        for site_display, site_name in self._SITES:
            site_dir = self.data_prefix / site_name
            if not site_dir.exists():
                logger.debug("Site directory not found: %s", site_dir)
                continue

            # Sorted for deterministic output order.
            for date_dir in sorted(site_dir.iterdir()):
                if not date_dir.is_dir():
                    continue

                for h5_file in date_dir.glob("*.h5"):
                    parsed = self._parse_filename(h5_file.name)
                    if not parsed:
                        logger.warning("Could not parse filename: %s", h5_file.name)
                        continue

                    try:
                        datetime_obj = datetime.strptime(
                            parsed["timestamp"], "%Y%m%dT%H%M%S"
                        )
                        observations.append(
                            ObservationInfo(
                                site=site_display,
                                date=datetime_obj.strftime("%Y-%m-%d"),
                                obsid=parsed["obsid"],
                                tel_id=parsed["tel_id"],
                                telescope_type="",  # No longer determining telescope type
                                h5_file=str(h5_file),
                            )
                        )
                    except ValueError:
                        logger.warning(
                            "Could not parse timestamp from %s", h5_file.name
                        )
                        continue

        return observations

    def get_ob_date_map(self) -> dict[str, list[int]]:
        """Generate observation date mapping.

        Returns:
            Mapping of date string (YYYY-MM-DD) -> sorted, de-duplicated list
            of observation IDs seen on that date.
        """
        obsids_by_date: defaultdict[str, set[int]] = defaultdict(set)
        for obs in self.scan_observations():
            obsids_by_date[obs.date].add(obs.obsid)
        return {date: sorted(obsids) for date, obsids in obsids_by_date.items()}

    def fetch_data(
        self,
        obsid: int,
        tel_id: int,
        site: str,
        date: str,
        keys: list[str] | None = None,
    ) -> dict[str, DataItem]:
        """
        Fetch data for specific observation and telescope by extracting from HDF5 on-the-fly.

        Args:
            obsid: Observation ID
            tel_id: Telescope ID
            site: Site name ("North" or "South")
            date: Date string in YYYY-MM-DD format
            keys: Optional list of data keys to fetch

        Raises:
            FileNotFoundError: If no matching HDF5 file exists.
            RuntimeError: If the HDF5 file's metric layout cannot be read.
        """
        h5_file_path = self._find_h5_file(obsid, tel_id, site, date)
        if h5_file_path is None:
            raise FileNotFoundError(
                f"No HDF5 file found for observation {obsid} telescope {tel_id} at {site} on {date}"
            )
        return self._extract_hdf5_data(h5_file_path, keys)

    @staticmethod
    def _is_single_path_component(value: str) -> bool:
        """Return True if ``value`` names exactly one entry (no separators, anchors, '.' or '..')."""
        if value in ("", ".", ".."):
            return False
        candidate = Path(value)
        return not candidate.anchor and candidate.parts == (value,)

    def _find_h5_file(
        self, obsid: int, tel_id: int, site: str, date: str
    ) -> Path | None:
        """Find HDF5 file containing specific observation and telescope using direct path.

        ``site`` and ``date`` may come from external callers: the site must be
        one of the known display names (case-insensitive) and the date must be
        a single directory name, so the lookup cannot escape the site directory.

        Returns:
            The first ``*.h5`` file in ``prefix/site/date`` whose parsed obsid
            and tel_id match, or None.
        """
        site_lower = site.lower()
        site_name = next(
            (dirname for display, dirname in self._SITES if display.lower() == site_lower),
            None,
        )
        if site_name is None:
            logger.warning(
                "Invalid site name: %s. Valid sites are 'North' or 'South'", site
            )
            return None

        # Reject values like "../.." or "/abs/path" that would resolve outside
        # the site directory.
        if not self._is_single_path_component(date):
            logger.warning("Invalid date directory name: %r", date)
            return None

        date_dir = self.data_prefix / site_name / date
        if not date_dir.exists():
            logger.warning("Date directory not found: %s", date_dir)
            return None

        for h5_file in date_dir.glob("*.h5"):
            parsed = self._parse_filename(h5_file.name)
            if parsed and parsed["obsid"] == obsid and parsed["tel_id"] == tel_id:
                return h5_file

        return None

    def _extract_hdf5_data(
        self, h5_file_path: Path, keys: list[str] | None = None
    ) -> dict[str, DataItem]:
        """Read every (or every requested) quality metric table from ``h5_file_path``.

        Args:
            h5_file_path: HDF5 file to read.
            keys: Optional metric names to restrict to; None means all.

        Returns:
            Mapping of metric name -> DataItem. Metrics whose table cannot be
            read are logged and skipped (best effort).

        Raises:
            RuntimeError: If the file's metric datasets cannot be enumerated.
        """
        try:
            metric_paths = self._collect_metric_dataset_paths(h5_file_path)
        except Exception as e:
            raise RuntimeError(f"Error reading HDF5 file {h5_file_path}: {e}") from e

        wanted = None if keys is None else set(keys)
        data: dict[str, DataItem] = {}

        for metric_name, path in metric_paths.items():
            if wanted is not None and metric_name not in wanted:
                continue

            try:
                table = read_table(h5_file_path, path)
            except Exception as e:
                logger.warning("Failed reading metric table %s: %s", path, e)
                continue

            data[metric_name] = DataItem(
                fetchedData=self._table_to_dict(table),
                fetchedMetadata={
                    "metric_config": table.meta.get("metric_config", {}),
                    "hdf5_path": path,
                },
            )

        return data

    def get_available_metrics(
        self,
        obsid: int,
        tel_id: int,
        site: str,
        date: str,
    ) -> list[str]:
        """Return list of available data quality metrics for a specific observation and telescope.

        Returns:
            Sorted metric names, or an empty list (with a warning) if no
            matching HDF5 file is found.
        """
        h5_file_path = self._find_h5_file(obsid, tel_id, site, date)
        if h5_file_path is None:
            logger.warning(
                "No HDF5 file found for observation %s telescope %s at %s on %s",
                obsid,
                tel_id,
                site,
                date,
            )
            return []

        # Dict keys are already unique, so sorting them is sufficient.
        return sorted(self._collect_metric_dataset_paths(h5_file_path))