From 736b637d311612c829dedda70dc5bbca7544af9b Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sun, 17 May 2026 20:09:05 +0000 Subject: [PATCH] feat(gui): add read-only dashboard with HTMX polling - Add NATS connection module (nats.py) for JetStream access - Add three dashboard cards: events (24h), stream sizes, poll times - Replace placeholder index with HTMX-polling dashboard - Graceful degradation when NATS unavailable (200 with error, not 500) - Per-stream/adapter failure isolation - Add comprehensive dashboard tests Co-Authored-By: Claude Opus 4.5 --- src/central/gui/__init__.py | 5 + src/central/gui/nats.py | 46 +++++ src/central/gui/routes.py | 161 ++++++++++++++++++ .../gui/templates/_dashboard_events.html | 22 +++ .../gui/templates/_dashboard_polls.html | 31 ++++ .../gui/templates/_dashboard_streams.html | 28 +++ src/central/gui/templates/index.html | 29 +++- tests/test_dashboard.py | 158 +++++++++++++++++ 8 files changed, 473 insertions(+), 7 deletions(-) create mode 100644 src/central/gui/nats.py create mode 100644 src/central/gui/templates/_dashboard_events.html create mode 100644 src/central/gui/templates/_dashboard_polls.html create mode 100644 src/central/gui/templates/_dashboard_streams.html create mode 100644 tests/test_dashboard.py diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 1907d44..87f1269 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -80,12 +80,16 @@ async def lifespan(app: FastAPI): from central.bootstrap_config import get_settings from central.gui.db import close_pool, init_pool + from central.gui.nats import close_nats, init_nats settings = get_settings() # Initialize database pool await init_pool(settings.db_dsn) + # Initialize NATS connection + await init_nats(settings.nats_url) + # Start session cleanup task _shutdown_event = asyncio.Event() _cleanup_task = asyncio.create_task(_session_cleanup_loop()) @@ -103,6 +107,7 @@ async def lifespan(app: FastAPI): except asyncio.TimeoutError: _cleanup_task.cancel() + await close_nats() await close_pool() logger.info("Central GUI stopped") diff --git a/src/central/gui/nats.py b/src/central/gui/nats.py new file mode 100644 index 0000000..393e1f9 --- /dev/null +++ b/src/central/gui/nats.py @@ -0,0 +1,46 @@ +"""NATS connection for GUI.""" + +import logging + +import nats +from nats.js import JetStreamContext + +logger = logging.getLogger(__name__) + +_nc: nats.NATS | None = None +_js: JetStreamContext | None = None + + +async def init_nats(url: str) -> JetStreamContext | None: + """Initialize the NATS connection and JetStream context.""" + global _nc, _js + if _nc is None: + try: + _nc = await nats.connect(url) + _js = _nc.jetstream() + logger.info("Connected to NATS", extra={"url": url}) + except Exception as e: + logger.warning("Failed to connect to NATS", extra={"error": str(e)}) + _nc = None + _js = None + return _js + + +def get_js() -> JetStreamContext | None: + """Get the JetStream context. Returns None if not connected.""" + return _js + + +async def close_nats() -> None: + """Close the NATS connection.""" + global _nc, _js + if _nc is not None: + try: + await _nc.drain() + await _nc.close() + logger.info("Disconnected from NATS") + except Exception as e: + logger.warning("Error closing NATS", extra={"error": str(e)}) + finally: + _nc = None + _js = None diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 1b9dc24..f78b1cd 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -23,6 +23,9 @@ from central.gui.db import get_pool router = APIRouter() +# Streams to display on dashboard +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] + def _get_templates(): """Get templates instance (deferred import to avoid circular).""" @@ -30,6 +33,15 @@ def _get_templates(): return templates +def _format_bytes(size: int) -> str: + """Format bytes as human-readable string.""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size < 1024: + return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}" + size /= 1024 + return f"{size:.1f} PB" + + def _set_session_cookie( response: Response, token: str, @@ -76,6 +88,155 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML return response +@router.get("/dashboard/events", response_class=HTMLResponse) +async def dashboard_events(request: Request) -> HTMLResponse: + """Get events by adapter for the last 24 hours.""" + templates = _get_templates() + pool = get_pool() + + events = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT adapter, COUNT(*) as count + FROM events + WHERE received > NOW() - INTERVAL '24 hours' + GROUP BY adapter + ORDER BY count DESC + """ + ) + events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + + return templates.TemplateResponse( + request=request, + name="_dashboard_events.html", + context={"events": events, "error": error}, + ) + + +@router.get("/dashboard/streams", response_class=HTMLResponse) +async def dashboard_streams(request: Request) -> HTMLResponse: + """Get stream sizes from NATS JetStream.""" + from central.gui.nats import get_js + + templates = _get_templates() + js = get_js() + + streams = None + error = None + + if js is None: + error = "NATS unavailable" + else: + streams = [] + for stream_name in DASHBOARD_STREAMS: + try: + info = await js.stream_info(stream_name) + streams.append({ + "name": stream_name, + "messages": info.state.messages, + "size": _format_bytes(info.state.bytes), + "error": None, + }) + except Exception: + streams.append({ + "name": stream_name, + "messages": 0, + "size": "0 B", + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_streams.html", + context={"streams": streams, "error": error}, + ) + + +@router.get("/dashboard/polls", response_class=HTMLResponse) +async def dashboard_polls(request: Request) -> HTMLResponse: + """Get last poll times for each adapter.""" + from central.gui.nats import get_js + + templates = _get_templates() + pool = get_pool() + js = get_js() + + adapters = [] + error = None + + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT name FROM config.adapters ORDER BY name" + ) + adapter_names = [row["name"] for row in rows] + except Exception as e: + error = f"Database error: {str(e)}" + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": [], "error": error}, + ) + + if js is None: + error = "NATS unavailable" + adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names] + else: + for name in adapter_names: + try: + # Get last message from CENTRAL_META for this adapter + sub = await js.pull_subscribe( + f"central.meta.{name}.status", + durable=f"dashboard-poll-{name}", + stream="CENTRAL_META", + ) + try: + msgs = await sub.fetch(1, timeout=1.0) + if msgs: + import json + data = json.loads(msgs[0].data.decode()) + last_poll = data.get("data", {}).get("time", "—") + adapters.append({ + "name": name, + "last_poll": last_poll, + "status": "✓", + "error": None, + }) + else: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": None, + }) + except Exception: + adapters.append({ + "name": name, + "last_poll": None, + "status": None, + "error": "unavailable", + }) + + return templates.TemplateResponse( + request=request, + name="_dashboard_polls.html", + context={"adapters": adapters, "error": error}, + ) + + @router.get("/setup", response_class=HTMLResponse) async def setup_form( request: Request, diff --git a/src/central/gui/templates/_dashboard_events.html b/src/central/gui/templates/_dashboard_events.html new file mode 100644 index 0000000..690b6ce --- /dev/null +++ b/src/central/gui/templates/_dashboard_events.html @@ -0,0 +1,22 @@ +{% if error %} +

{{ error }}

+{% elif events %} + + + + + + + + + {% for event in events %} + + + + + {% endfor %} + +
AdapterCount
{{ event.adapter }}{{ event.count }}
+{% else %} +

No events in the last 24 hours.

+{% endif %} diff --git a/src/central/gui/templates/_dashboard_polls.html b/src/central/gui/templates/_dashboard_polls.html new file mode 100644 index 0000000..c80e98b --- /dev/null +++ b/src/central/gui/templates/_dashboard_polls.html @@ -0,0 +1,31 @@ +{% if error %} +

{{ error }}

+{% elif adapters %} + + + + + + + + + + {% for adapter in adapters %} + + + {% if adapter.error %} + + {% elif adapter.last_poll %} + + + {% else %} + + + {% endif %} + + {% endfor %} + +
AdapterLast PollStatus
{{ adapter.name }}{{ adapter.error }}{{ adapter.last_poll }}{{ adapter.status }}
+{% else %} +

No adapter data available.

+{% endif %} diff --git a/src/central/gui/templates/_dashboard_streams.html b/src/central/gui/templates/_dashboard_streams.html new file mode 100644 index 0000000..246cbca --- /dev/null +++ b/src/central/gui/templates/_dashboard_streams.html @@ -0,0 +1,28 @@ +{% if error %} +

{{ error }}

+{% elif streams %} + + + + + + + + + + {% for stream in streams %} + + + {% if stream.error %} + + {% else %} + + + {% endif %} + + {% endfor %} + +
StreamMessagesSize
{{ stream.name }}{{ stream.error }}{{ stream.messages }}{{ stream.size }}
+{% else %} +

No stream data available.

+{% endif %} diff --git a/src/central/gui/templates/index.html b/src/central/gui/templates/index.html index 8690d2b..29e40b3 100644 --- a/src/central/gui/templates/index.html +++ b/src/central/gui/templates/index.html @@ -1,12 +1,27 @@ {% extends "base.html" %} -{% block title %}Central — Coming Soon{% endblock %} +{% block title %}Central — Dashboard{% endblock %} {% block content %} -
-
-

Central

-
-

Data hub GUI — coming soon.

-
+

Dashboard

+
+
+
Events (24h)
+
+ Loading... +
+
+
+
Stream Sizes
+
+ Loading... +
+
+
+
Last Poll Times
+
+ Loading... +
+
+
{% endblock %} diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py new file mode 100644 index 0000000..6572903 --- /dev/null +++ b/tests/test_dashboard.py @@ -0,0 +1,158 @@ +"""Tests for dashboard routes.""" + +import os +from unittest.mock import MagicMock, AsyncMock, patch + +import pytest + +# Set required env vars before importing central modules +os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test") +os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab") +os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222") + + +class TestFormatBytes: + """Test _format_bytes helper.""" + + def test_format_bytes_bytes(self): + """Bytes are shown as B.""" + from central.gui.routes import _format_bytes + assert _format_bytes(100) == "100 B" + + def test_format_bytes_kilobytes(self): + """KB formatting.""" + from central.gui.routes import _format_bytes + assert _format_bytes(1024) == "1.0 KB" + + def test_format_bytes_megabytes(self): + """MB formatting.""" + from central.gui.routes import _format_bytes + assert _format_bytes(1048576) == "1.0 MB" + + def test_format_bytes_gigabytes(self): + """GB formatting.""" + from central.gui.routes import _format_bytes + assert _format_bytes(1073741824) == "1.0 GB" + + +class TestDashboardEventsSQL: + """Test events query construction.""" + + def test_events_query_has_24h_filter(self): + """Events query filters by received > NOW() - 24h.""" + # We can't easily test the full route without mocking, + # but we can verify the query logic by inspecting the source + import inspect + from central.gui.routes import dashboard_events + source = inspect.getsource(dashboard_events) + assert "24 hours" in source + assert "received > NOW()" in source + + +class TestDashboardStreamsGracefulDegradation: + """Test streams endpoint graceful degradation.""" + + @pytest.mark.asyncio + async def test_nats_unavailable_returns_error_message(self): + """When NATS is unavailable, streams returns error message not 500.""" + from central.gui.routes import dashboard_streams + + mock_request = MagicMock() + mock_request.state.operator = MagicMock() + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.nats.get_js", return_value=None): + result = await dashboard_streams(mock_request) + + # Should call template with error context + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + assert context["error"] == "NATS unavailable" + assert context["streams"] is None + + +class TestDashboardPollsGracefulDegradation: + """Test polls endpoint graceful degradation.""" + + @pytest.mark.asyncio + async def test_nats_unavailable_shows_all_adapters_with_error(self): + """When NATS is unavailable, polls shows adapters with error message.""" + from central.gui.routes import dashboard_polls + + mock_request = MagicMock() + mock_request.state.operator = MagicMock() + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = [{"name": "nws"}, {"name": "firms"}] + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + with patch("central.gui.nats.get_js", return_value=None): + result = await dashboard_polls(mock_request) + + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + assert context["error"] == "NATS unavailable" + assert len(context["adapters"]) == 2 + assert context["adapters"][0]["error"] == "NATS unavailable" + + +class TestDashboardStreamsIsolation: + """Test stream failure isolation.""" + + @pytest.mark.asyncio + async def test_single_stream_failure_doesnt_crash_card(self): + """A single stream failure shows error for that stream only.""" + from central.gui.routes import dashboard_streams + + mock_request = MagicMock() + mock_request.state.operator = MagicMock() + + async def mock_stream_info(name): + if name == "CENTRAL_FIRE": + raise Exception("Not found") + state = MagicMock() + state.messages = 100 + state.bytes = 1024 + info = MagicMock() + info.state = state + return info + + mock_js = AsyncMock() + mock_js.stream_info.side_effect = mock_stream_info + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.nats.get_js", return_value=mock_js): + result = await dashboard_streams(mock_request) + + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + + streams = context["streams"] + # Should have 4 streams + assert len(streams) == 4 + + # CENTRAL_FIRE should have error + fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE") + assert fire_stream.get("error") == "unavailable" + + # CENTRAL_WX should be fine + wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX") + assert wx_stream.get("error") is None + assert wx_stream["messages"] == 100