# Source code for qualpipe_webapp.backend.main

"""FastAPI application serving QualPipe backend endpoints.

For endpoint details, request/response schemas, and parameter descriptions,
use the autogenerated Swagger UI at ``/api/docs`` (OpenAPI at
``/api/openapi.json``).
"""

from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, HTTPException, Path, Query, Request
from starlette.concurrency import run_in_threadpool

from .backends.base import BackendAPI
from .backends.factory import create_backend
from .quality_report.report_generation import QualityReportGenerator

# Shared human-readable parameter descriptions. They are passed as the
# ``description=`` argument of ``Query``/``Path`` in the endpoint signatures
# below so that endpoints taking the same parameter describe it identically.
OBSID_DESCRIPTION = "Observation block number (OBSID)."
SITE_DESCRIPTION = "Site identifier (for example North or South)."
DATE_DESCRIPTION = "Observation date in YYYY-MM-DD format."
TELESCOPE_ID_DESCRIPTION = "Unique identifier for the telescope."


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan events.

    On startup a backend is built with ``create_backend()`` and stored on
    ``app.state.backend``. Nothing is torn down on shutdown at present.

    Parameters
    ----------
    app : FastAPI
        The application whose state receives the backend instance.
    """
    # Startup
    backend_instance = create_backend()
    app.state.backend = backend_instance

    yield
    # Shutdown (cleanup if needed)


# The application is mounted under ``/api``; the documentation and schema
# URLs below are relative to that root path.
app = FastAPI(
    title="QualPipe Backend API",
    lifespan=lifespan,
    root_path="/api",
    docs_url="/docs",
    openapi_url="/openapi.json",
)
@app.get("/v1/health")
def health_check():
    """Health check endpoint for Kubernetes probes.

    Returns
    -------
    dict
        A constant payload with the keys ``status`` and ``service``.
    """
    payload = {"status": "ok", "service": "backend"}
    return payload
def get_backend(request: Request) -> BackendAPI:
    """Dependency injection for backend.

    Parameters
    ----------
    request : Request
        The incoming request; only ``request.app.state`` is read.

    Returns
    -------
    BackendAPI
        The object stored as ``backend`` on the application state.
    """
    app_state = request.app.state
    return app_state.backend
@app.get("/v1/ob_date_map")
async def get_ob_date_map(backend: BackendAPI = Depends(get_backend)):
    """Get observation date mapping by scanning data files.

    Parameters
    ----------
    backend : BackendAPI
        Backend injected through ``get_backend``.

    Returns
    -------
    Whatever ``backend.get_ob_date_map()`` returns, passed through unchanged.
    """
    # HDF5 operations are blocking, so the scan is pushed to the threadpool
    # instead of being executed on the event loop.
    ob_date_map = await run_in_threadpool(backend.get_ob_date_map)
    return ob_date_map
@app.get("/v1/data")
async def get_data(
    site: str = Query(..., description=SITE_DESCRIPTION),
    date: str = Query(..., description=DATE_DESCRIPTION),
    ob: int = Query(..., description=OBSID_DESCRIPTION),
    telescope_type: str = Query(
        ...,
        description="Telescope type (LST, MST, or SST).",
    ),
    telescope_id: int = Query(..., description=TELESCOPE_ID_DESCRIPTION),
    backend: BackendAPI = Depends(get_backend),
):
    """Retrieve data for a specific observation and telescope.

    Parameters
    ----------
    site : str
        Site identifier.
    date : str
        Observation date.
    ob : int
        Observation block number (OBSID).
    telescope_type : str
        Must be one of ``LST``, ``MST`` or ``SST``; it is only validated here
        and is not forwarded to the backend.
    telescope_id : int
        Telescope identifier.
    backend : BackendAPI
        Backend injected through ``get_backend``.

    Returns
    -------
    The value returned by ``backend.fetch_data``, passed through unchanged.

    Raises
    ------
    HTTPException
        400 if ``telescope_type`` is not an accepted value, 404 if the
        backend raises ``FileNotFoundError``, 500 for any other backend error.
    """
    # Validate telescope type
    if telescope_type not in {"LST", "MST", "SST"}:
        raise HTTPException(status_code=400, detail="Invalid telescope type")

    # Only the backend call sits inside the ``try`` so that the handlers below
    # translate backend failures and nothing else.
    try:
        # Fetch data using backend (in threadpool for blocking I/O).
        # NOTE(review): the trailing ``None`` is the fifth positional argument
        # of ``fetch_data``; its meaning is defined by the backend interface.
        data = await run_in_threadpool(
            backend.fetch_data,
            ob,
            telescope_id,
            site,
            date,
            None,
        )
    except FileNotFoundError as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(
            status_code=404, detail=f"No data found for observation {ob}"
        ) from exc
    except Exception as exc:
        raise HTTPException(
            status_code=500, detail=f"Error fetching data: {exc!s}"
        ) from exc

    return data
@app.get("/v1/telescope_info/{obsid}")
async def get_telescope_info(
    obsid: int = Path(..., description=OBSID_DESCRIPTION),
    backend: BackendAPI = Depends(get_backend),
):
    """Get telescope info for observation.

    Parameters
    ----------
    obsid : int
        Observation block number taken from the URL path.
    backend : BackendAPI
        Backend injected through ``get_backend``.

    Returns
    -------
    The result of ``.dict()`` on the first scanned observation whose
    ``obsid`` attribute equals the requested one.

    Raises
    ------
    HTTPException
        404 when no scanned observation matches ``obsid``.
    """
    scanned = await run_in_threadpool(backend.scan_observations)

    # First match wins, mirroring a linear scan with early exit.
    match = next((entry for entry in scanned if entry.obsid == obsid), None)
    if match is None:
        raise HTTPException(
            status_code=404,
            detail=f"No telescope info found for observation {obsid}",
        )
    return match.dict()
@app.get("/v1/metrics")
async def get_available_metrics(
    obsid: int = Query(..., description=OBSID_DESCRIPTION),
    tel_id: int = Query(..., description=TELESCOPE_ID_DESCRIPTION),
    site: str = Query(..., description=SITE_DESCRIPTION),
    date: str = Query(..., description=DATE_DESCRIPTION),
    backend: BackendAPI = Depends(get_backend),
):
    """Provide available data quality metrics for a specific observation.

    Parameters
    ----------
    obsid : int
        Observation block number (OBSID).
    tel_id : int
        Telescope identifier.
    site : str
        Site identifier.
    date : str
        Observation date.
    backend : BackendAPI
        Backend injected through ``get_backend``.

    Returns
    -------
    The value returned by ``backend.get_available_metrics``, unchanged.
    """
    # The backend call is synchronous, so it is awaited via the threadpool.
    return await run_in_threadpool(
        backend.get_available_metrics,
        obsid,
        tel_id,
        site,
        date,
    )
@app.get("/v1/create_quality_report")
async def create_quality_report(
    obsid: int = Query(..., description=OBSID_DESCRIPTION),
    tel_id: int = Query(..., description=TELESCOPE_ID_DESCRIPTION),
    site: str = Query(..., description=SITE_DESCRIPTION),
    date: str = Query(..., description=DATE_DESCRIPTION),
    backend: BackendAPI = Depends(get_backend),
):
    """
    Create a data quality report for a specific observation period using available metrics.

    Parameters
    ----------
    obsid : int
        The observation block number (OBSID).
    tel_id : int
        The unique identifier for the telescope.
    site : str
        The site identifier (e.g., 'North', 'South').
    date : str
        The observation date in 'YYYY-MM-DD' format.
    backend : BackendAPI
        Backend injected through ``get_backend``.

    Returns
    -------
    Mapping[str, Any]
        Dictionary-like object mapping report section names with report contents.

    Raises
    ------
    HTTPException
        404 when the backend raises ``FileNotFoundError`` while fetching the
        data for the requested observation.
    """
    report_generator = QualityReportGenerator()

    # Both steps below are synchronous, so each one is run in the threadpool.
    try:
        # NOTE(review): the trailing ``None`` is the fifth positional argument
        # of ``fetch_data``; its meaning is defined by the backend interface.
        data = await run_in_threadpool(
            backend.fetch_data,
            obsid,
            tel_id,
            site,
            date,
            None,
        )
    except FileNotFoundError as exc:
        # Same translation as the ``/v1/data`` endpoint: missing data is a
        # client-visible "not found", not an internal server error.
        raise HTTPException(
            status_code=404, detail=f"No data found for observation {obsid}"
        ) from exc

    quality_report = await run_in_threadpool(
        report_generator.create_data_quality_report,
        obsid,
        tel_id,
        site,
        date,
        data,
    )
    return quality_report