feat(gui): add read-only dashboard with HTMX polling

- Add NATS connection module (nats.py) for JetStream access
- Add three dashboard cards: events (24h), stream sizes, poll times
- Replace placeholder index with HTMX-polling dashboard
- Graceful degradation when NATS unavailable (200 with error, not 500)
- Per-stream/adapter failure isolation
- Add comprehensive dashboard tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-17 20:09:05 +00:00
commit 736b637d31
8 changed files with 473 additions and 7 deletions

View file

@ -80,12 +80,16 @@ async def lifespan(app: FastAPI):
from central.bootstrap_config import get_settings
from central.gui.db import close_pool, init_pool
from central.gui.nats import close_nats, init_nats
settings = get_settings()
# Initialize database pool
await init_pool(settings.db_dsn)
# Initialize NATS connection
await init_nats(settings.nats_url)
# Start session cleanup task
_shutdown_event = asyncio.Event()
_cleanup_task = asyncio.create_task(_session_cleanup_loop())
@ -103,6 +107,7 @@ async def lifespan(app: FastAPI):
except asyncio.TimeoutError:
_cleanup_task.cancel()
await close_nats()
await close_pool()
logger.info("Central GUI stopped")

46
src/central/gui/nats.py Normal file
View file

@ -0,0 +1,46 @@
"""NATS connection for GUI."""
import logging
import nats
from nats.js import JetStreamContext
logger = logging.getLogger(__name__)
_nc: nats.NATS | None = None
_js: JetStreamContext | None = None
async def init_nats(url: str) -> JetStreamContext | None:
"""Initialize the NATS connection and JetStream context."""
global _nc, _js
if _nc is None:
try:
_nc = await nats.connect(url)
_js = _nc.jetstream()
logger.info("Connected to NATS", extra={"url": url})
except Exception as e:
logger.warning("Failed to connect to NATS", extra={"error": str(e)})
_nc = None
_js = None
return _js
def get_js() -> JetStreamContext | None:
"""Get the JetStream context. Returns None if not connected."""
return _js
async def close_nats() -> None:
"""Close the NATS connection."""
global _nc, _js
if _nc is not None:
try:
await _nc.drain()
await _nc.close()
logger.info("Disconnected from NATS")
except Exception as e:
logger.warning("Error closing NATS", extra={"error": str(e)})
finally:
_nc = None
_js = None

View file

@ -23,6 +23,9 @@ from central.gui.db import get_pool
router = APIRouter()
# Streams to display on dashboard
DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"]
def _get_templates():
"""Get templates instance (deferred import to avoid circular)."""
@ -30,6 +33,15 @@ def _get_templates():
return templates
def _format_bytes(size: int) -> str:
"""Format bytes as human-readable string."""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size < 1024:
return f"{size:.1f} {unit}" if unit != "B" else f"{size} {unit}"
size /= 1024
return f"{size:.1f} PB"
def _set_session_cookie(
response: Response,
token: str,
@ -76,6 +88,155 @@ async def index(request: Request, csrf_protect: CsrfProtect = Depends()) -> HTML
return response
@router.get("/dashboard/events", response_class=HTMLResponse)
async def dashboard_events(request: Request) -> HTMLResponse:
"""Get events by adapter for the last 24 hours."""
templates = _get_templates()
pool = get_pool()
events = []
error = None
try:
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT adapter, COUNT(*) as count
FROM events
WHERE received > NOW() - INTERVAL '24 hours'
GROUP BY adapter
ORDER BY count DESC
"""
)
events = [{"adapter": row["adapter"], "count": row["count"]} for row in rows]
except Exception as e:
error = f"Database error: {str(e)}"
return templates.TemplateResponse(
request=request,
name="_dashboard_events.html",
context={"events": events, "error": error},
)
@router.get("/dashboard/streams", response_class=HTMLResponse)
async def dashboard_streams(request: Request) -> HTMLResponse:
"""Get stream sizes from NATS JetStream."""
from central.gui.nats import get_js
templates = _get_templates()
js = get_js()
streams = None
error = None
if js is None:
error = "NATS unavailable"
else:
streams = []
for stream_name in DASHBOARD_STREAMS:
try:
info = await js.stream_info(stream_name)
streams.append({
"name": stream_name,
"messages": info.state.messages,
"size": _format_bytes(info.state.bytes),
"error": None,
})
except Exception:
streams.append({
"name": stream_name,
"messages": 0,
"size": "0 B",
"error": "unavailable",
})
return templates.TemplateResponse(
request=request,
name="_dashboard_streams.html",
context={"streams": streams, "error": error},
)
@router.get("/dashboard/polls", response_class=HTMLResponse)
async def dashboard_polls(request: Request) -> HTMLResponse:
"""Get last poll times for each adapter."""
from central.gui.nats import get_js
templates = _get_templates()
pool = get_pool()
js = get_js()
adapters = []
error = None
try:
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT name FROM config.adapters ORDER BY name"
)
adapter_names = [row["name"] for row in rows]
except Exception as e:
error = f"Database error: {str(e)}"
return templates.TemplateResponse(
request=request,
name="_dashboard_polls.html",
context={"adapters": [], "error": error},
)
if js is None:
error = "NATS unavailable"
adapters = [{"name": name, "last_poll": None, "status": None, "error": "NATS unavailable"} for name in adapter_names]
else:
for name in adapter_names:
try:
# Get last message from CENTRAL_META for this adapter
sub = await js.pull_subscribe(
f"central.meta.{name}.status",
durable=f"dashboard-poll-{name}",
stream="CENTRAL_META",
)
try:
msgs = await sub.fetch(1, timeout=1.0)
if msgs:
import json
data = json.loads(msgs[0].data.decode())
last_poll = data.get("data", {}).get("time", "")
adapters.append({
"name": name,
"last_poll": last_poll,
"status": "",
"error": None,
})
else:
adapters.append({
"name": name,
"last_poll": None,
"status": None,
"error": None,
})
except Exception:
adapters.append({
"name": name,
"last_poll": None,
"status": None,
"error": None,
})
except Exception:
adapters.append({
"name": name,
"last_poll": None,
"status": None,
"error": "unavailable",
})
return templates.TemplateResponse(
request=request,
name="_dashboard_polls.html",
context={"adapters": adapters, "error": error},
)
@router.get("/setup", response_class=HTMLResponse)
async def setup_form(
request: Request,

View file

@ -0,0 +1,22 @@
{% if error %}
<p class="error">{{ error }}</p>
{% elif events %}
<table>
<thead>
<tr>
<th>Adapter</th>
<th>Count</th>
</tr>
</thead>
<tbody>
{% for event in events %}
<tr>
<td>{{ event.adapter }}</td>
<td>{{ event.count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No events in the last 24 hours.</p>
{% endif %}

View file

@ -0,0 +1,31 @@
{% if error %}
<p class="error">{{ error }}</p>
{% elif adapters %}
<table>
<thead>
<tr>
<th>Adapter</th>
<th>Last Poll</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{% for adapter in adapters %}
<tr>
<td>{{ adapter.name }}</td>
{% if adapter.error %}
<td colspan="2" class="error">{{ adapter.error }}</td>
{% elif adapter.last_poll %}
<td>{{ adapter.last_poll }}</td>
<td>{{ adapter.status }}</td>
{% else %}
<td></td>
<td></td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No adapter data available.</p>
{% endif %}

View file

@ -0,0 +1,28 @@
{% if error %}
<p class="error">{{ error }}</p>
{% elif streams %}
<table>
<thead>
<tr>
<th>Stream</th>
<th>Messages</th>
<th>Size</th>
</tr>
</thead>
<tbody>
{% for stream in streams %}
<tr>
<td>{{ stream.name }}</td>
{% if stream.error %}
<td colspan="2" class="error">{{ stream.error }}</td>
{% else %}
<td>{{ stream.messages }}</td>
<td>{{ stream.size }}</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No stream data available.</p>
{% endif %}

View file

@ -1,12 +1,27 @@
{% extends "base.html" %}
{% block title %}Central — Coming Soon{% endblock %}
{% block title %}Central — Dashboard{% endblock %}
{% block content %}
<article>
<header>
<h1>Central</h1>
</header>
<p>Data hub GUI — coming soon.</p>
</article>
<h1>Dashboard</h1>
<div class="grid">
<article>
<header>Events (24h)</header>
<div hx-get="/dashboard/events" hx-trigger="load, every 10s" hx-swap="innerHTML">
Loading...
</div>
</article>
<article>
<header>Stream Sizes</header>
<div hx-get="/dashboard/streams" hx-trigger="load, every 10s" hx-swap="innerHTML">
Loading...
</div>
</article>
<article>
<header>Last Poll Times</header>
<div hx-get="/dashboard/polls" hx-trigger="load, every 10s" hx-swap="innerHTML">
Loading...
</div>
</article>
</div>
{% endblock %}

158
tests/test_dashboard.py Normal file
View file

@ -0,0 +1,158 @@
"""Tests for dashboard routes."""
import os
from unittest.mock import MagicMock, AsyncMock, patch
import pytest
# Set required env vars before importing central modules
os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test")
os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab")
os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222")
class TestFormatBytes:
"""Test _format_bytes helper."""
def test_format_bytes_bytes(self):
"""Bytes are shown as B."""
from central.gui.routes import _format_bytes
assert _format_bytes(100) == "100 B"
def test_format_bytes_kilobytes(self):
"""KB formatting."""
from central.gui.routes import _format_bytes
assert _format_bytes(1024) == "1.0 KB"
def test_format_bytes_megabytes(self):
"""MB formatting."""
from central.gui.routes import _format_bytes
assert _format_bytes(1048576) == "1.0 MB"
def test_format_bytes_gigabytes(self):
"""GB formatting."""
from central.gui.routes import _format_bytes
assert _format_bytes(1073741824) == "1.0 GB"
class TestDashboardEventsSQL:
"""Test events query construction."""
def test_events_query_has_24h_filter(self):
"""Events query filters by received > NOW() - 24h."""
# We can't easily test the full route without mocking,
# but we can verify the query logic by inspecting the source
import inspect
from central.gui.routes import dashboard_events
source = inspect.getsource(dashboard_events)
assert "24 hours" in source
assert "received > NOW()" in source
class TestDashboardStreamsGracefulDegradation:
"""Test streams endpoint graceful degradation."""
@pytest.mark.asyncio
async def test_nats_unavailable_returns_error_message(self):
"""When NATS is unavailable, streams returns error message not 500."""
from central.gui.routes import dashboard_streams
mock_request = MagicMock()
mock_request.state.operator = MagicMock()
mock_templates = MagicMock()
mock_response = MagicMock()
mock_templates.TemplateResponse.return_value = mock_response
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.nats.get_js", return_value=None):
result = await dashboard_streams(mock_request)
# Should call template with error context
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["error"] == "NATS unavailable"
assert context["streams"] is None
class TestDashboardPollsGracefulDegradation:
"""Test polls endpoint graceful degradation."""
@pytest.mark.asyncio
async def test_nats_unavailable_shows_all_adapters_with_error(self):
"""When NATS is unavailable, polls shows adapters with error message."""
from central.gui.routes import dashboard_polls
mock_request = MagicMock()
mock_request.state.operator = MagicMock()
mock_conn = AsyncMock()
mock_conn.fetch.return_value = [{"name": "nws"}, {"name": "firms"}]
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_response = MagicMock()
mock_templates.TemplateResponse.return_value = mock_response
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
with patch("central.gui.nats.get_js", return_value=None):
result = await dashboard_polls(mock_request)
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["error"] == "NATS unavailable"
assert len(context["adapters"]) == 2
assert context["adapters"][0]["error"] == "NATS unavailable"
class TestDashboardStreamsIsolation:
"""Test stream failure isolation."""
@pytest.mark.asyncio
async def test_single_stream_failure_doesnt_crash_card(self):
"""A single stream failure shows error for that stream only."""
from central.gui.routes import dashboard_streams
mock_request = MagicMock()
mock_request.state.operator = MagicMock()
async def mock_stream_info(name):
if name == "CENTRAL_FIRE":
raise Exception("Not found")
state = MagicMock()
state.messages = 100
state.bytes = 1024
info = MagicMock()
info.state = state
return info
mock_js = AsyncMock()
mock_js.stream_info.side_effect = mock_stream_info
mock_templates = MagicMock()
mock_response = MagicMock()
mock_templates.TemplateResponse.return_value = mock_response
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.nats.get_js", return_value=mock_js):
result = await dashboard_streams(mock_request)
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
streams = context["streams"]
# Should have 4 streams
assert len(streams) == 4
# CENTRAL_FIRE should have error
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
assert fire_stream.get("error") == "unavailable"
# CENTRAL_WX should be fine
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
assert wx_stream.get("error") is None
assert wx_stream["messages"] == 100