Central
-Data hub GUI — coming soon.
-diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 1907d44..87f1269 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -80,12 +80,16 @@ async def lifespan(app: FastAPI): from central.bootstrap_config import get_settings from central.gui.db import close_pool, init_pool + from central.gui.nats import close_nats, init_nats settings = get_settings() # Initialize database pool await init_pool(settings.db_dsn) + # Initialize NATS connection + await init_nats(settings.nats_url) + # Start session cleanup task _shutdown_event = asyncio.Event() _cleanup_task = asyncio.create_task(_session_cleanup_loop()) @@ -103,6 +107,7 @@ async def lifespan(app: FastAPI): except asyncio.TimeoutError: _cleanup_task.cancel() + await close_nats() await close_pool() logger.info("Central GUI stopped") diff --git a/src/central/gui/nats.py b/src/central/gui/nats.py new file mode 100644 index 0000000..393e1f9 --- /dev/null +++ b/src/central/gui/nats.py @@ -0,0 +1,46 @@ +"""NATS connection for GUI.""" + +import logging + +import nats +from nats.js import JetStreamContext + +logger = logging.getLogger(__name__) + +_nc: nats.NATS | None = None +_js: JetStreamContext | None = None + + +async def init_nats(url: str) -> JetStreamContext | None: + """Initialize the NATS connection and JetStream context.""" + global _nc, _js + if _nc is None: + try: + _nc = await nats.connect(url) + _js = _nc.jetstream() + logger.info("Connected to NATS", extra={"url": url}) + except Exception as e: + logger.warning("Failed to connect to NATS", extra={"error": str(e)}) + _nc = None + _js = None + return _js + + +def get_js() -> JetStreamContext | None: + """Get the JetStream context. Returns None if not connected.""" + return _js + + +async def close_nats() -> None: + """Close the NATS connection.""" + global _nc, _js + if _nc is not None: + try: + await _nc.drain() + await _nc.close() + logger.info("Disconnected from NATS") + except Exception as e: + logger.warning("Error closing NATS", extra={"error": str(e)}) + finally: + _nc = None + _js = None diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 1b9dc24..f78b1cd 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -23,6 +23,9 @@ from central.gui.db import get_pool router = APIRouter() +# Streams to display on dashboard +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] + def _get_templates(): """Get templates instance (deferred import to avoid circular).""" @@ -30,6 +33,15 @@ def _get_templates(): return templates +def _format_bytes(size: int) -> str: + """Format bytes as human-readable string.""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size < 1024: + return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}" + size /= 1024 + return f"{size:.1f} PB" + + def _set_session_cookie( response: Response, token: str, @@ -76,6 +88,155 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML return response +@router.get("/dashboard/events", response_class=HTMLResponse) +async def dashboard_events(request: Request) -> HTMLResponse: + """Get events by adapter for the last 24 hours.""" + templates = _get_templates() + pool = get_pool() + + events = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT adapter, COUNT(*) as count + FROM events + WHERE received > NOW() - INTERVAL '24 hours' + GROUP BY adapter + ORDER BY count DESC + """ + ) + events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + + return templates.TemplateResponse( + request=request, + name="_dashboard_events.html", + context={"events": events, "error": error}, + ) + + +@router.get("/dashboard/streams", response_class=HTMLResponse) +async def dashboard_streams(request: Request) -> HTMLResponse: + """Get stream sizes from NATS JetStream.""" + from central.gui.nats import get_js + + templates = _get_templates() + js = get_js() + + streams = None + error = None + + if js is None: + error = "NATS unavailable" + else: + streams = [] + for stream_name in DASHBOARD_STREAMS: + try: + info = await js.stream_info(stream_name) + streams.append({ + "name": stream_name, + "messages": info.state.messages, + "size": _format_bytes(info.state.bytes), + "error": None, + }) + except Exception: + streams.append({ + "name": stream_name, + "messages": 0, + "size": "0 B", + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_streams.html", + context={"streams": streams, "error": error}, + ) + + +@router.get("/dashboard/polls", response_class=HTMLResponse) +async def dashboard_polls(request: Request) -> HTMLResponse: + """Get last poll times for each adapter.""" + from central.gui.nats import get_js + + templates = _get_templates() + pool = get_pool() + js = get_js() + + adapters = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT name FROM config.adapters ORDER BY name" + ) + adapter_names = [row["name"] for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": [], "error": error}, + ) + + if js is None: + error = "NATS unavailable" + adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names] + else: + for name in adapter_names: + try: + # Get last message from CENTRAL_META for this adapter + sub = await js.pull_subscribe( + f"central.meta.{name}.status", + durable=f"dashboard-poll-{name}", + stream="CENTRAL_META", + ) + try: + msgs = await sub.fetch(1, timeout=1.0) + if msgs: + import json + data = json.loads(msgs[0].data.decode()) + last_poll = data.get("data", {}).get("time", "—") + adapters.append({ + "name": name, + "last_poll": last_poll, + "status": "✓", + "error": None, + }) + else: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": adapters, "error": error}, + ) + + @router.get("/setup", response_class=HTMLResponse) async def setup_form( request: Request, diff --git a/src/central/gui/templates/_dashboard_events.html b/src/central/gui/templates/_dashboard_events.html new file mode 100644 index 0000000..690b6ce --- /dev/null +++ b/src/central/gui/templates/_dashboard_events.html @@ -0,0 +1,22 @@ +{% if error %} +
{{ error }}
+{% elif events %} +| Adapter | +Count | +
|---|---|
| {{ event.adapter }} | +{{ event.count }} | +
No events in the last 24 hours.
+{% endif %} diff --git a/src/central/gui/templates/_dashboard_polls.html b/src/central/gui/templates/_dashboard_polls.html new file mode 100644 index 0000000..c80e98b --- /dev/null +++ b/src/central/gui/templates/_dashboard_polls.html @@ -0,0 +1,31 @@ +{% if error %} +{{ error }}
+{% elif adapters %} +| Adapter | +Last Poll | +Status | +||||
|---|---|---|---|---|---|---|
| {{ adapter.name }} | + {% if adapter.error %} +{{ adapter.error }} | + {% elif adapter.last_poll %} +{{ adapter.last_poll }} | +{{ adapter.status }} | + {% else %} +— | +— | + {% endif %} +|
No adapter data available.
+{% endif %} diff --git a/src/central/gui/templates/_dashboard_streams.html b/src/central/gui/templates/_dashboard_streams.html new file mode 100644 index 0000000..246cbca --- /dev/null +++ b/src/central/gui/templates/_dashboard_streams.html @@ -0,0 +1,28 @@ +{% if error %} +{{ error }}
+{% elif streams %} +| Stream | +Messages | +Size | +||
|---|---|---|---|---|
| {{ stream.name }} | + {% if stream.error %} +{{ stream.error }} | + {% else %} +{{ stream.messages }} | +{{ stream.size }} | + {% endif %} +|
No stream data available.
+{% endif %} diff --git a/src/central/gui/templates/index.html b/src/central/gui/templates/index.html index 8690d2b..29e40b3 100644 --- a/src/central/gui/templates/index.html +++ b/src/central/gui/templates/index.html @@ -1,12 +1,27 @@ {% extends "base.html" %} -{% block title %}Central — Coming Soon{% endblock %} +{% block title %}Central — Dashboard{% endblock %} {% block content %} -Data hub GUI — coming soon.
-