diff --git a/src/central/gui/audit.py b/src/central/gui/audit.py
index 7d2f8f1..6992537 100644
--- a/src/central/gui/audit.py
+++ b/src/central/gui/audit.py
@@ -10,6 +10,7 @@ AUTH_LOGOUT = "auth.logout"
AUTH_PASSWORD_CHANGE = "auth.password_change"
OPERATOR_CREATE = "operator.create"
ADAPTER_UPDATE = "adapter.update"
+STREAM_UPDATE = "stream.update"
async def write_audit(
diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py
index d548306..10c6b2e 100644
--- a/src/central/gui/routes.py
+++ b/src/central/gui/routes.py
@@ -1,9 +1,12 @@
"""Route handlers for Central GUI."""
import json
+import logging
import re
from typing import Any
+logger = logging.getLogger("central.gui.routes")
+
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi_csrf_protect import CsrfProtect
@@ -22,6 +25,7 @@ from central.gui.audit import (
AUTH_LOGOUT,
AUTH_PASSWORD_CHANGE,
OPERATOR_CREATE,
+ STREAM_UPDATE,
write_audit,
)
from central.gui.db import get_pool
@@ -872,3 +876,231 @@ async def adapters_edit_submit(
)
return RedirectResponse(url="/adapters", status_code=302)
+
+
+# =============================================================================
+# Streams routes
+# =============================================================================
+
+
+@router.get("/streams", response_class=HTMLResponse)
+async def streams_list(
+ request: Request,
+ csrf_protect: CsrfProtect = Depends(),
+) -> HTMLResponse:
+ """List all streams with live data."""
+ from central.gui.nats import get_js
+
+ templates = _get_templates()
+ pool = get_pool()
+ operator = request.state.operator
+ js = get_js()
+
+ async with pool.acquire() as conn:
+ rows = await conn.fetch(
+ """
+ SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
+ FROM config.streams
+ ORDER BY name
+ """
+ )
+
+ streams = []
+ for row in rows:
+ stream_data = {
+ "name": row["name"],
+ "max_age_s": row["max_age_s"],
+ "max_bytes_cfg": row["max_bytes"],
+ "managed_max_bytes": row["managed_max_bytes"],
+ "live_bytes": None,
+ "live_messages": None,
+ "live_first_seq": None,
+ "live_last_seq": None,
+ "live_first_ts": None,
+ "live_last_ts": None,
+ "first_ts_error": None,
+ "last_ts_error": None,
+ "error": None,
+ }
+
+ # Fetch live data from JetStream
+ if js is not None:
+ try:
+ info = await js.stream_info(row["name"])
+ stream_data["live_bytes"] = info.state.bytes
+ stream_data["live_messages"] = info.state.messages
+ stream_data["live_first_seq"] = info.state.first_seq
+ stream_data["live_last_seq"] = info.state.last_seq
+
+ # Fetch first / last message timestamps via get_msg
+ # RawStreamMsg has .time attribute (not .metadata.timestamp)
+ if info.state.first_seq > 0:
+ try:
+ first_msg = await js.get_msg(row["name"], seq=info.state.first_seq)
+ stream_data["live_first_ts"] = first_msg.time
+ except Exception as e:
+ logger.warning(
+ "get_msg first failed",
+ extra={"stream": row["name"], "err": type(e).__name__},
+ )
+ stream_data["live_first_ts"] = None
+ stream_data["first_ts_error"] = type(e).__name__
+
+ if info.state.last_seq > 0 and info.state.last_seq != info.state.first_seq:
+ try:
+ last_msg = await js.get_msg(row["name"], seq=info.state.last_seq)
+ stream_data["live_last_ts"] = last_msg.time
+ except Exception as e:
+ logger.warning(
+ "get_msg last failed",
+ extra={"stream": row["name"], "err": type(e).__name__},
+ )
+ stream_data["live_last_ts"] = None
+ stream_data["last_ts_error"] = type(e).__name__
+ elif info.state.last_seq == info.state.first_seq and info.state.first_seq > 0:
+ # Single message in stream
+ stream_data["live_last_ts"] = stream_data.get("live_first_ts")
+
+ except Exception as e:
+ logger.exception("Stream info failed", extra={"stream": row["name"]})
+ stream_data["error"] = f"unavailable: {type(e).__name__}"
+ else:
+ stream_data["error"] = "NATS unavailable"
+
+ streams.append(stream_data)
+
+ csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
+ response = templates.TemplateResponse(
+ request=request,
+ name="streams_list.html",
+ context={
+ "operator": operator,
+ "csrf_token": csrf_token,
+ "streams": streams,
+ },
+ )
+ csrf_protect.set_csrf_cookie(signed_token, response)
+ return response
+
+
+@router.post("/streams/{name}", response_class=HTMLResponse)
+async def streams_update(
+ request: Request,
+ name: str,
+ csrf_protect: CsrfProtect = Depends(),
+) -> Response:
+ """Update stream max_age_s."""
+ from central.gui.nats import get_js
+
+ templates = _get_templates()
+ pool = get_pool()
+ operator = request.state.operator
+
+ # Validate CSRF
+ await csrf_protect.validate_csrf(request)
+
+ form = await request.form()
+ max_age_s_str = form.get("max_age_s", "").strip()
+
+ errors: dict[str, str] = {}
+
+ # Parse max_age_s
+ try:
+ max_age_s = int(max_age_s_str)
+ except (ValueError, TypeError):
+ max_age_s = 0
+ errors[name] = "max_age_s must be a valid integer"
+
+ # Validate range: 1 hour to 5 years
+ MIN_AGE = 3600 # 1 hour
+ MAX_AGE = 5 * 365 * 24 * 3600 # 5 years (157680000)
+ if not errors and (max_age_s < MIN_AGE or max_age_s > MAX_AGE):
+ errors[name] = f"max_age_s must be between {MIN_AGE} (1 hour) and {MAX_AGE} (5 years)"
+
+ async with pool.acquire() as conn:
+ # Check stream exists
+ row = await conn.fetchrow(
+ "SELECT name, max_age_s FROM config.streams WHERE name = $1",
+ name,
+ )
+
+ if row is None:
+ return Response(status_code=404, content="Stream not found")
+
+ if errors:
+ # Re-render with errors
+ js = get_js()
+ rows = await conn.fetch(
+ """
+ SELECT name, max_age_s, max_bytes, managed_max_bytes, updated_at
+ FROM config.streams
+ ORDER BY name
+ """
+ )
+
+ streams = []
+ for r in rows:
+ stream_data = {
+ "name": r["name"],
+ "max_age_s": r["max_age_s"],
+ "max_bytes_cfg": r["max_bytes"],
+ "managed_max_bytes": r["managed_max_bytes"],
+ "live_bytes": None,
+ "live_messages": None,
+ "live_first_ts": None,
+ "live_last_ts": None,
+ "error": None,
+ }
+
+ if js is not None:
+ try:
+ info = await js.stream_info(r["name"])
+ stream_data["live_bytes"] = info.state.bytes
+ stream_data["live_messages"] = info.state.messages
+ stream_data["live_first_ts"] = info.state.first_ts
+ stream_data["live_last_ts"] = info.state.last_ts
+ except Exception:
+ stream_data["error"] = "unavailable"
+ else:
+ stream_data["error"] = "NATS unavailable"
+
+ streams.append(stream_data)
+
+ csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
+ response = templates.TemplateResponse(
+ request=request,
+ name="streams_list.html",
+ context={
+ "operator": operator,
+ "csrf_token": csrf_token,
+ "streams": streams,
+ "errors": errors,
+ },
+ )
+ csrf_protect.set_csrf_cookie(signed_token, response)
+ return response
+
+ old_max_age_s = row["max_age_s"]
+
+ # Update stream
+ await conn.execute(
+ """
+ UPDATE config.streams
+ SET max_age_s = $1, updated_at = now()
+ WHERE name = $2
+ """,
+ max_age_s,
+ name,
+ )
+
+ # Write audit log
+ await write_audit(
+ conn,
+ STREAM_UPDATE,
+ operator_id=operator.id,
+ target=name,
+ before={"max_age_s": old_max_age_s},
+ after={"max_age_s": max_age_s},
+ )
+
+ return RedirectResponse(url="/streams", status_code=302)
diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html
index 1d2e24b..61f1afa 100644
--- a/src/central/gui/templates/base.html
+++ b/src/central/gui/templates/base.html
@@ -17,6 +17,7 @@
{% if operator %}
Dashboard
Adapters
+ Streams
{{ operator.username }}
Change Password
diff --git a/src/central/gui/templates/streams_list.html b/src/central/gui/templates/streams_list.html
new file mode 100644
index 0000000..8ae99ed
--- /dev/null
+++ b/src/central/gui/templates/streams_list.html
@@ -0,0 +1,92 @@
+{% extends "base.html" %}
+
+{% block title %}Central — Streams{% endblock %}
+
+{% block content %}
+Streams
+
+
+{% for stream in streams %}
+
+
+ {{ stream.name }}
+ {% if stream.managed_max_bytes %}
+ Managed by supervisor
+ {% endif %}
+
+
+
+
Live Data:
+ {% if stream.error %}
+
({{ stream.error }})
+ {% else %}
+
+ - Messages: {{ stream.live_messages }}
+ - Size: {{ stream.live_bytes|filesizeformat }}
+ {% if stream.live_messages == 0 %}
+ - First message: (empty)
+ - Last message: (empty)
+ {% else %}
+ - First message:
+ {% if stream.live_first_ts %}
+ {{ stream.live_first_ts.isoformat() }}
+ {% elif stream.first_ts_error %}
+ (lookup failed: {{ stream.first_ts_error }})
+ {% else %}
+ (unknown)
+ {% endif %}
+
+ - Last message:
+ {% if stream.live_last_ts %}
+ {{ stream.live_last_ts.isoformat() }}
+ {% elif stream.last_ts_error %}
+ (lookup failed: {{ stream.last_ts_error }})
+ {% else %}
+ (unknown)
+ {% endif %}
+
+ {% endif %}
+
+ {% endif %}
+
+
+
+
Configuration:
+
+ - Max age: {{ (stream.max_age_s / 86400)|round(1) }} days ({{ stream.max_age_s }}s)
+ - Max bytes (current): {{ stream.max_bytes_cfg|filesizeformat }}
+
+
+
+ {% if errors and errors[stream.name] %}
+ {{ errors[stream.name] }}
+ {% endif %}
+
+
+
Set Retention:
+
+ {% set presets = [(1, '1 day'), (7, '7 days'), (14, '14 days'), (30, '30 days'), (365, '365 days')] %}
+ {% for days, label in presets %}
+ {% set preset_seconds = days * 86400 %}
+
+ {% endfor %}
+
+
+
+
+
+{% endfor %}
+
+{% endblock %}
diff --git a/tests/test_streams.py b/tests/test_streams.py
new file mode 100644
index 0000000..c2346fa
--- /dev/null
+++ b/tests/test_streams.py
@@ -0,0 +1,589 @@
+"""Tests for streams list and edit routes."""
+
+import os
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# Set required env vars before importing central modules
+os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test")
+os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab")
+os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222")
+
+
+class TestStreamsListUnauthenticated:
+ """Test streams list without authentication."""
+
+ @pytest.mark.asyncio
+ async def test_streams_list_unauthenticated_redirects(self):
+ """GET /streams without auth redirects to /login."""
+ from central.gui.routes import streams_list
+
+ # The middleware handles the redirect, so we test the route exists
+ # In practice, middleware returns 302 before route is called
+ assert streams_list is not None
+
+
+class TestStreamsListAuthenticated:
+ """Test streams list with authentication."""
+
+ @pytest.mark.asyncio
+ async def test_streams_list_returns_all_streams_with_timestamps(self):
+ """GET /streams authenticated returns 200 with all four streams and timestamps."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_FIRE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ {"name": "CENTRAL_META", "max_age_s": 86400, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ {"name": "CENTRAL_QUAKE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream with proper state fields
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 1024000
+ mock_stream_info.state.messages = 100
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 100
+ mock_js.stream_info.return_value = mock_stream_info
+
+ # Mock get_msg for timestamps (RawStreamMsg has .time attribute)
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+ assert len(context["streams"]) == 4
+ stream_names = [s["name"] for s in context["streams"]]
+ assert "CENTRAL_FIRE" in stream_names
+ assert "CENTRAL_META" in stream_names
+ assert "CENTRAL_QUAKE" in stream_names
+ assert "CENTRAL_WX" in stream_names
+
+ # Check that timestamps are populated
+ fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE")
+ assert fire_stream["live_bytes"] == 1024000
+ assert fire_stream["live_messages"] == 100
+ assert fire_stream["live_first_ts"] == test_ts
+ assert fire_stream["live_last_ts"] == test_ts
+
+
+class TestStreamsListNatsUnavailable:
+ """Test streams list when NATS is unavailable."""
+
+ @pytest.mark.asyncio
+ async def test_nats_unavailable_shows_unavailable(self):
+ """GET /streams when NATS unreachable shows (unavailable) for live data."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=None):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+ assert context["streams"][0]["error"] == "NATS unavailable"
+ assert context["streams"][0]["live_bytes"] is None
+
+
+class TestStreamsListPartialFailure:
+ """Test streams list with one stream's info raising."""
+
+ @pytest.mark.asyncio
+ async def test_stream_info_raises_shows_exception_type(self):
+ """GET /streams with stream_info error shows unavailable with exception type."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_FIRE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream - CENTRAL_FIRE raises ValueError, CENTRAL_WX works
+ mock_js = AsyncMock()
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+
+ def stream_info_side_effect(name):
+ if name == "CENTRAL_FIRE":
+ raise ValueError("Stream not found")
+ mock_info = MagicMock()
+ mock_info.state.bytes = 2048
+ mock_info.state.messages = 50
+ mock_info.state.first_seq = 1
+ mock_info.state.last_seq = 50
+ return mock_info
+
+ mock_js.stream_info.side_effect = stream_info_side_effect
+
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE")
+ wx_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_WX")
+
+ # Exception type should be in error message
+ assert "unavailable: ValueError" in fire_stream["error"]
+ assert wx_stream["error"] is None
+ assert wx_stream["live_bytes"] == 2048
+
+
+class TestStreamsListEmptyStream:
+ """Test streams list with empty stream (zero messages)."""
+
+ @pytest.mark.asyncio
+ async def test_empty_stream_no_get_msg_calls(self):
+ """Empty stream (first_seq == 0) renders correctly, no get_msg calls made."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_QUAKE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream with empty stream (first_seq = 0)
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 0
+ mock_stream_info.state.messages = 0
+ mock_stream_info.state.first_seq = 0
+ mock_stream_info.state.last_seq = 0
+ mock_js.stream_info.return_value = mock_stream_info
+ mock_js.get_msg = AsyncMock()
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ quake_stream = context["streams"][0]
+ assert quake_stream["live_messages"] == 0
+ assert quake_stream["live_first_ts"] is None
+ assert quake_stream["live_last_ts"] is None
+ assert quake_stream["error"] is None
+
+ # get_msg should NOT have been called for empty stream
+ mock_js.get_msg.assert_not_called()
+
+
+class TestStreamsListSingleMessage:
+ """Test streams list with single-message stream."""
+
+ @pytest.mark.asyncio
+ async def test_single_message_stream_reuses_timestamp(self):
+ """Single-message stream (first_seq == last_seq) calls get_msg once."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream with single message (first_seq == last_seq)
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 512
+ mock_stream_info.state.messages = 1
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 1
+ mock_js.stream_info.return_value = mock_stream_info
+
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ wx_stream = context["streams"][0]
+ assert wx_stream["live_messages"] == 1
+ assert wx_stream["live_first_ts"] == test_ts
+ assert wx_stream["live_last_ts"] == test_ts
+
+ # get_msg should have been called once (for first message only)
+ assert mock_js.get_msg.call_count == 1
+
+
+class TestStreamsListGetMsgFailure:
+ """Test streams list when get_msg fails for a message."""
+
+ @pytest.mark.asyncio
+ async def test_get_msg_first_fails_shows_lookup_error(self):
+ """get_msg raises for first message shows lookup failed, rest renders normally."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 2048
+ mock_stream_info.state.messages = 50
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 50
+ mock_js.stream_info.return_value = mock_stream_info
+
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+
+ # First call fails, second succeeds
+ def get_msg_side_effect(name, seq):
+ if seq == 1:
+ raise KeyError("Message not found")
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ return mock_msg
+
+ mock_js.get_msg.side_effect = get_msg_side_effect
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ wx_stream = context["streams"][0]
+ # Main stream info still works
+ assert wx_stream["live_bytes"] == 2048
+ assert wx_stream["live_messages"] == 50
+ assert wx_stream["error"] is None
+ # First timestamp failed
+ assert wx_stream["live_first_ts"] is None
+ assert wx_stream["first_ts_error"] == "KeyError"
+ # Last timestamp succeeded
+ assert wx_stream["live_last_ts"] == test_ts
+ assert wx_stream["last_ts_error"] is None
+
+
+class TestStreamsUpdate:
+ """Test stream update POST handler."""
+
+ @pytest.mark.asyncio
+ async def test_valid_update_redirects(self):
+ """POST /streams/CENTRAL_WX with valid max_age_s redirects and updates DB."""
+ from central.gui.routes import streams_update
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_form = MagicMock()
+ mock_form.get.return_value = "1209600" # 14 days
+ mock_request.form = AsyncMock(return_value=mock_form)
+
+ mock_conn = AsyncMock()
+ mock_conn.fetchrow.return_value = {"name": "CENTRAL_WX", "max_age_s": 604800}
+ mock_conn.execute = AsyncMock()
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_csrf = MagicMock()
+ mock_csrf.validate_csrf = AsyncMock()
+
+ captured_audit = {}
+
+ async def capture_audit(conn, action, operator_id=None, target=None, before=None, after=None):
+ captured_audit["action"] = action
+ captured_audit["target"] = target
+ captured_audit["before"] = before
+ captured_audit["after"] = after
+
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.routes.write_audit", side_effect=capture_audit):
+ result = await streams_update(mock_request, "CENTRAL_WX", mock_csrf)
+
+ assert result.status_code == 302
+ assert result.headers["location"] == "/streams"
+
+ # Verify audit
+ assert captured_audit["action"] == "stream.update"
+ assert captured_audit["target"] == "CENTRAL_WX"
+ assert captured_audit["before"]["max_age_s"] == 604800
+ assert captured_audit["after"]["max_age_s"] == 1209600
+
+ @pytest.mark.asyncio
+ async def test_too_small_max_age_shows_error(self):
+ """POST /streams/CENTRAL_WX with max_age_s=60 shows error, DB unchanged."""
+ from central.gui.routes import streams_update
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_form = MagicMock()
+ mock_form.get.return_value = "60" # 1 minute - too small
+ mock_request.form = AsyncMock(return_value=mock_form)
+
+ mock_conn = AsyncMock()
+ mock_conn.fetchrow.return_value = {"name": "CENTRAL_WX", "max_age_s": 604800}
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+ mock_conn.execute = AsyncMock()
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.validate_csrf = AsyncMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=None):
+ result = await streams_update(mock_request, "CENTRAL_WX", mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+ assert "CENTRAL_WX" in context["errors"]
+ assert "3600" in context["errors"]["CENTRAL_WX"] # mentions min value
+
+ @pytest.mark.asyncio
+ async def test_too_large_max_age_shows_error(self):
+ """POST /streams/CENTRAL_WX with max_age_s=999999999 shows error, DB unchanged."""
+ from central.gui.routes import streams_update
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_form = MagicMock()
+ mock_form.get.return_value = "999999999" # Way too large
+ mock_request.form = AsyncMock(return_value=mock_form)
+
+ mock_conn = AsyncMock()
+ mock_conn.fetchrow.return_value = {"name": "CENTRAL_WX", "max_age_s": 604800}
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+ mock_conn.execute = AsyncMock()
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.validate_csrf = AsyncMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=None):
+ result = await streams_update(mock_request, "CENTRAL_WX", mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+ assert "CENTRAL_WX" in context["errors"]
+
+ @pytest.mark.asyncio
+ async def test_nonexistent_stream_returns_404(self):
+ """POST /streams/nonexistent returns 404."""
+ from central.gui.routes import streams_update
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_form = MagicMock()
+ mock_form.get.return_value = "604800"
+ mock_request.form = AsyncMock(return_value=mock_form)
+
+ mock_conn = AsyncMock()
+ mock_conn.fetchrow.return_value = None # Stream not found
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_csrf = MagicMock()
+ mock_csrf.validate_csrf = AsyncMock()
+
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ result = await streams_update(mock_request, "nonexistent", mock_csrf)
+
+ assert result.status_code == 404
+
+
+class TestStreamsAudit:
+ """Test stream audit logging."""
+
+ @pytest.mark.asyncio
+ async def test_audit_captures_max_age_change(self):
+ """Audit row captures before/after max_age_s values."""
+ from central.gui.routes import streams_update
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_form = MagicMock()
+ mock_form.get.return_value = "1209600" # 14 days
+ mock_request.form = AsyncMock(return_value=mock_form)
+
+ mock_conn = AsyncMock()
+ mock_conn.fetchrow.return_value = {"name": "CENTRAL_QUAKE", "max_age_s": 604800} # 7 days
+ mock_conn.execute = AsyncMock()
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_csrf = MagicMock()
+ mock_csrf.validate_csrf = AsyncMock()
+
+ captured_audit = {}
+
+ async def capture_audit(conn, action, operator_id=None, target=None, before=None, after=None):
+ captured_audit["action"] = action
+ captured_audit["operator_id"] = operator_id
+ captured_audit["target"] = target
+ captured_audit["before"] = before
+ captured_audit["after"] = after
+
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.routes.write_audit", side_effect=capture_audit):
+ await streams_update(mock_request, "CENTRAL_QUAKE", mock_csrf)
+
+ assert captured_audit["action"] == "stream.update"
+ assert captured_audit["operator_id"] == 1
+ assert captured_audit["target"] == "CENTRAL_QUAKE"
+ assert captured_audit["before"] == {"max_age_s": 604800}
+ assert captured_audit["after"] == {"max_age_s": 1209600}