"""FastAPI application serving QualPipe backend endpoints.
For endpoint details, request/response schemas, and parameter descriptions,
use the autogenerated Swagger UI at ``/api/docs`` (OpenAPI at
``/api/openapi.json``).
"""
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI, HTTPException, Path, Query, Request
from starlette.concurrency import run_in_threadpool
from .backends.base import BackendAPI
from .backends.factory import create_backend
from .quality_report.report_generation import QualityReportGenerator
OBSID_DESCRIPTION = "Observation block number (OBSID)."
SITE_DESCRIPTION = "Site identifier (for example North or South)."
DATE_DESCRIPTION = "Observation date in YYYY-MM-DD format."
TELESCOPE_ID_DESCRIPTION = "Unique identifier for the telescope."
[docs]
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan events."""
# Startup
app.state.backend = create_backend()
yield
# Shutdown (cleanup if needed)
app = FastAPI(
lifespan=lifespan,
title="QualPipe Backend API",
root_path="/api",
docs_url="/docs",
openapi_url="/openapi.json",
)
[docs]
@app.get("/v1/health")
def health_check():
"""Health check endpoint for Kubernetes probes."""
return {"status": "ok", "service": "backend"}
[docs]
def get_backend(request: Request) -> BackendAPI:
"""Dependency injection for backend."""
return request.app.state.backend
[docs]
@app.get("/v1/ob_date_map")
async def get_ob_date_map(backend: BackendAPI = Depends(get_backend)):
"""Get observation date mapping by scanning data files."""
# Run in threadpool since HDF5 operations are blocking
return await run_in_threadpool(backend.get_ob_date_map)
[docs]
@app.get("/v1/data")
async def get_data(
site: str = Query(..., description=SITE_DESCRIPTION),
date: str = Query(..., description=DATE_DESCRIPTION),
ob: int = Query(..., description=OBSID_DESCRIPTION),
telescope_type: str = Query(
...,
description="Telescope type (LST, MST, or SST).",
),
telescope_id: int = Query(..., description=TELESCOPE_ID_DESCRIPTION),
backend: BackendAPI = Depends(get_backend),
):
"""Retrieve data for a specific observation and telescope."""
# Validate telescope type
if telescope_type not in {"LST", "MST", "SST"}:
raise HTTPException(status_code=400, detail="Invalid telescope type")
try:
# Fetch data using backend (in threadpool for blocking I/O)
data = await run_in_threadpool(
backend.fetch_data,
ob,
telescope_id,
site,
date,
None,
)
return data
except FileNotFoundError:
raise HTTPException(
status_code=404, detail=f"No data found for observation {ob}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
[docs]
@app.get("/v1/telescope_info/{obsid}")
async def get_telescope_info(
obsid: int = Path(..., description=OBSID_DESCRIPTION),
backend: BackendAPI = Depends(get_backend),
):
"""Get telescope info for observation."""
observations = await run_in_threadpool(backend.scan_observations)
for obs in observations:
if obs.obsid == obsid:
return obs.dict()
raise HTTPException(
status_code=404, detail=f"No telescope info found for observation {obsid}"
)
[docs]
@app.get("/v1/metrics")
async def get_available_metrics(
obsid: int = Query(..., description=OBSID_DESCRIPTION),
tel_id: int = Query(..., description=TELESCOPE_ID_DESCRIPTION),
site: str = Query(..., description=SITE_DESCRIPTION),
date: str = Query(..., description=DATE_DESCRIPTION),
backend: BackendAPI = Depends(get_backend),
):
"""Provide available data quality metrics for a specific observation."""
available_metrics = await run_in_threadpool(
backend.get_available_metrics, obsid, tel_id, site, date
)
return available_metrics
[docs]
@app.get("/v1/create_quality_report")
async def create_quality_report(
obsid: int,
tel_id: int,
site: str,
date: str,
backend: BackendAPI = Depends(get_backend),
):
"""
Create a data quality report for a specific observation period using available metrics.
Parameters
----------
obsid : int
The observation block number (OBSID).
telescope_id : int
The unique identifier for the telescope.
site : str
The site identifier (e.g., 'North', 'South').
date : str
The observation date in 'YYYY-MM-DD' format.
Returns
-------
Mapping[str, Any]
Dictionary-like object mapping report section names with report contents.
"""
report_generator = QualityReportGenerator()
data = await run_in_threadpool(
backend.fetch_data,
obsid,
tel_id,
site,
date,
None,
)
quality_report = await run_in_threadpool(
report_generator.create_data_quality_report,
obsid,
tel_id,
site,
date,
data,
)
return quality_report