From 16e31efd7333f29b6bf5389b9b004608160d3747 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Sun, 17 May 2026 23:59:31 +0000 Subject: [PATCH] fix(gui): use get_msg().time for stream timestamps, fix badge layout - nats-py StreamState doesn't expose first_ts/last_ts - Fetch timestamps via js.get_msg(stream, seq=N).time instead - Handle edge cases: empty streams, single-message streams, get_msg failures - Fix badge overlap using flex layout instead of float:right - Change label from "Max bytes (config)" to "Max bytes (current)" Co-Authored-By: Claude Opus 4.5 --- src/central/gui/routes.py | 46 +++- src/central/gui/templates/streams_list.html | 35 ++- tests/test_streams.py | 229 +++++++++++++++++++- 3 files changed, 287 insertions(+), 23 deletions(-) diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 5a055e5..10c6b2e 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1,9 +1,12 @@ """Route handlers for Central GUI.""" import json +import logging import re from typing import Any +logger = logging.getLogger("central.gui.routes") + from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi_csrf_protect import CsrfProtect @@ -911,8 +914,12 @@ async def streams_list( "managed_max_bytes": row["managed_max_bytes"], "live_bytes": None, "live_messages": None, + "live_first_seq": None, + "live_last_seq": None, "live_first_ts": None, "live_last_ts": None, + "first_ts_error": None, + "last_ts_error": None, "error": None, } @@ -922,10 +929,41 @@ async def streams_list( info = await js.stream_info(row["name"]) stream_data["live_bytes"] = info.state.bytes stream_data["live_messages"] = info.state.messages - stream_data["live_first_ts"] = info.state.first_ts - stream_data["live_last_ts"] = info.state.last_ts - except Exception: - stream_data["error"] = "unavailable" + stream_data["live_first_seq"] = info.state.first_seq + stream_data["live_last_seq"] = info.state.last_seq + + # Fetch first / last message timestamps via get_msg + # RawStreamMsg has .time attribute (not .metadata.timestamp) + if info.state.first_seq > 0: + try: + first_msg = await js.get_msg(row["name"], seq=info.state.first_seq) + stream_data["live_first_ts"] = first_msg.time + except Exception as e: + logger.warning( + "get_msg first failed", + extra={"stream": row["name"], "err": type(e).__name__}, + ) + stream_data["live_first_ts"] = None + stream_data["first_ts_error"] = type(e).__name__ + + if info.state.last_seq > 0 and info.state.last_seq != info.state.first_seq: + try: + last_msg = await js.get_msg(row["name"], seq=info.state.last_seq) + stream_data["live_last_ts"] = last_msg.time + except Exception as e: + logger.warning( + "get_msg last failed", + extra={"stream": row["name"], "err": type(e).__name__}, + ) + stream_data["live_last_ts"] = None + stream_data["last_ts_error"] = type(e).__name__ + elif info.state.last_seq == info.state.first_seq and info.state.first_seq > 0: + # Single message in stream + stream_data["live_last_ts"] = stream_data.get("live_first_ts") + + except Exception as e: + logger.exception("Stream info failed", extra={"stream": row["name"]}) + stream_data["error"] = f"unavailable: {type(e).__name__}" else: stream_data["error"] = "NATS unavailable" diff --git a/src/central/gui/templates/streams_list.html b/src/central/gui/templates/streams_list.html index ab5735a..8ae99ed 100644 --- a/src/central/gui/templates/streams_list.html +++ b/src/central/gui/templates/streams_list.html @@ -8,10 +8,10 @@
{% for stream in streams %}
-
+
{{ stream.name }} {% if stream.managed_max_bytes %} - Managed by supervisor + Managed by supervisor {% endif %}
@@ -21,10 +21,31 @@ ({{ stream.error }}) {% else %}
    -
  • Size: {{ stream.live_bytes|default(0)|filesizeformat if stream.live_bytes is not none else '(unavailable)' }}
  • -
  • Messages: {{ stream.live_messages if stream.live_messages is not none else '(unavailable)' }}
  • -
  • First message: {{ stream.live_first_ts.isoformat() if stream.live_first_ts else '(none)' }}
  • -
  • Last message: {{ stream.live_last_ts.isoformat() if stream.live_last_ts else '(none)' }}
  • +
  • Messages: {{ stream.live_messages }}
  • +
  • Size: {{ stream.live_bytes|filesizeformat }}
  • + {% if stream.live_messages == 0 %} +
  • First message: (empty)
  • +
  • Last message: (empty)
  • + {% else %} +
  • First message: + {% if stream.live_first_ts %} + {{ stream.live_first_ts.isoformat() }} + {% elif stream.first_ts_error %} + (lookup failed: {{ stream.first_ts_error }}) + {% else %} + (unknown) + {% endif %} +
  • +
  • Last message: + {% if stream.live_last_ts %} + {{ stream.live_last_ts.isoformat() }} + {% elif stream.last_ts_error %} + (lookup failed: {{ stream.last_ts_error }}) + {% else %} + (unknown) + {% endif %} +
  • + {% endif %}
{% endif %}
@@ -33,7 +54,7 @@ Configuration: diff --git a/tests/test_streams.py b/tests/test_streams.py index b12f4ee..c2346fa 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,6 +1,7 @@ """Tests for streams list and edit routes.""" import os +from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -28,8 +29,8 @@ class TestStreamsListAuthenticated: """Test streams list with authentication.""" @pytest.mark.asyncio - async def test_streams_list_returns_all_streams(self): - """GET /streams authenticated returns 200 with all four streams.""" + async def test_streams_list_returns_all_streams_with_timestamps(self): + """GET /streams authenticated returns 200 with all four streams and timestamps.""" from central.gui.routes import streams_list mock_request = MagicMock() @@ -55,15 +56,21 @@ class TestStreamsListAuthenticated: mock_csrf.generate_csrf_tokens.return_value = ("token", "signed") mock_csrf.set_csrf_cookie = MagicMock() - # Mock JetStream + # Mock JetStream with proper state fields mock_js = AsyncMock() mock_stream_info = MagicMock() mock_stream_info.state.bytes = 1024000 mock_stream_info.state.messages = 100 - mock_stream_info.state.first_ts = None - mock_stream_info.state.last_ts = None + mock_stream_info.state.first_seq = 1 + mock_stream_info.state.last_seq = 100 mock_js.stream_info.return_value = mock_stream_info + # Mock get_msg for timestamps (RawStreamMsg has .time attribute) + test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc) + mock_msg = MagicMock() + mock_msg.time = test_ts + mock_js.get_msg.return_value = mock_msg + with patch("central.gui.routes._get_templates", return_value=mock_templates): with patch("central.gui.routes.get_pool", return_value=mock_pool): with patch("central.gui.nats.get_js", return_value=mock_js): @@ -78,6 +85,13 @@ class TestStreamsListAuthenticated: assert "CENTRAL_QUAKE" in stream_names assert "CENTRAL_WX" in stream_names + # Check that timestamps are populated + fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE") + assert fire_stream["live_bytes"] == 1024000 + assert fire_stream["live_messages"] == 100 + assert fire_stream["live_first_ts"] == test_ts + assert fire_stream["live_last_ts"] == test_ts + class TestStreamsListNatsUnavailable: """Test streams list when NATS is unavailable.""" @@ -122,8 +136,8 @@ class TestStreamsListPartialFailure: """Test streams list with one stream's info raising.""" @pytest.mark.asyncio - async def test_one_stream_unavailable_others_render(self): - """GET /streams with one stream error shows unavailable for that stream only.""" + async def test_stream_info_raises_shows_exception_type(self): + """GET /streams with stream_info error shows unavailable with exception type.""" from central.gui.routes import streams_list mock_request = MagicMock() @@ -147,21 +161,26 @@ class TestStreamsListPartialFailure: mock_csrf.generate_csrf_tokens.return_value = ("token", "signed") mock_csrf.set_csrf_cookie = MagicMock() - # Mock JetStream - CENTRAL_FIRE raises, CENTRAL_WX works + # Mock JetStream - CENTRAL_FIRE raises ValueError, CENTRAL_WX works mock_js = AsyncMock() + test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc) def stream_info_side_effect(name): if name == "CENTRAL_FIRE": - raise Exception("Stream not found") + raise ValueError("Stream not found") mock_info = MagicMock() mock_info.state.bytes = 2048 mock_info.state.messages = 50 - mock_info.state.first_ts = None - mock_info.state.last_ts = None + mock_info.state.first_seq = 1 + mock_info.state.last_seq = 50 return mock_info mock_js.stream_info.side_effect = stream_info_side_effect + mock_msg = MagicMock() + mock_msg.time = test_ts + mock_js.get_msg.return_value = mock_msg + with patch("central.gui.routes._get_templates", return_value=mock_templates): with patch("central.gui.routes.get_pool", return_value=mock_pool): with patch("central.gui.nats.get_js", return_value=mock_js): @@ -173,11 +192,197 @@ class TestStreamsListPartialFailure: fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE") wx_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_WX") - assert fire_stream["error"] == "unavailable" + # Exception type should be in error message + assert "unavailable: ValueError" in fire_stream["error"] assert wx_stream["error"] is None assert wx_stream["live_bytes"] == 2048 +class TestStreamsListEmptyStream: + """Test streams list with empty stream (zero messages).""" + + @pytest.mark.asyncio + async def test_empty_stream_no_get_msg_calls(self): + """Empty stream (first_seq == 0) renders correctly, no get_msg calls made.""" + from central.gui.routes import streams_list + + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="testop") + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = [ + {"name": "CENTRAL_QUAKE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None}, + ] + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + mock_csrf = MagicMock() + mock_csrf.generate_csrf_tokens.return_value = ("token", "signed") + mock_csrf.set_csrf_cookie = MagicMock() + + # Mock JetStream with empty stream (first_seq = 0) + mock_js = AsyncMock() + mock_stream_info = MagicMock() + mock_stream_info.state.bytes = 0 + mock_stream_info.state.messages = 0 + mock_stream_info.state.first_seq = 0 + mock_stream_info.state.last_seq = 0 + mock_js.stream_info.return_value = mock_stream_info + mock_js.get_msg = AsyncMock() + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + with patch("central.gui.nats.get_js", return_value=mock_js): + result = await streams_list(mock_request, mock_csrf) + + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + + quake_stream = context["streams"][0] + assert quake_stream["live_messages"] == 0 + assert quake_stream["live_first_ts"] is None + assert quake_stream["live_last_ts"] is None + assert quake_stream["error"] is None + + # get_msg should NOT have been called for empty stream + mock_js.get_msg.assert_not_called() + + +class TestStreamsListSingleMessage: + """Test streams list with single-message stream.""" + + @pytest.mark.asyncio + async def test_single_message_stream_reuses_timestamp(self): + """Single-message stream (first_seq == last_seq) calls get_msg once.""" + from central.gui.routes import streams_list + + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="testop") + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = [ + {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None}, + ] + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + mock_csrf = MagicMock() + mock_csrf.generate_csrf_tokens.return_value = ("token", "signed") + mock_csrf.set_csrf_cookie = MagicMock() + + # Mock JetStream with single message (first_seq == last_seq) + mock_js = AsyncMock() + mock_stream_info = MagicMock() + mock_stream_info.state.bytes = 512 + mock_stream_info.state.messages = 1 + mock_stream_info.state.first_seq = 1 + mock_stream_info.state.last_seq = 1 + mock_js.stream_info.return_value = mock_stream_info + + test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc) + mock_msg = MagicMock() + mock_msg.time = test_ts + mock_js.get_msg.return_value = mock_msg + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + with patch("central.gui.nats.get_js", return_value=mock_js): + result = await streams_list(mock_request, mock_csrf) + + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + + wx_stream = context["streams"][0] + assert wx_stream["live_messages"] == 1 + assert wx_stream["live_first_ts"] == test_ts + assert wx_stream["live_last_ts"] == test_ts + + # get_msg should have been called once (for first message only) + assert mock_js.get_msg.call_count == 1 + + +class TestStreamsListGetMsgFailure: + """Test streams list when get_msg fails for a message.""" + + @pytest.mark.asyncio + async def test_get_msg_first_fails_shows_lookup_error(self): + """get_msg raises for first message shows lookup failed, rest renders normally.""" + from central.gui.routes import streams_list + + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="testop") + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = [ + {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None}, + ] + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_response = MagicMock() + mock_templates.TemplateResponse.return_value = mock_response + + mock_csrf = MagicMock() + mock_csrf.generate_csrf_tokens.return_value = ("token", "signed") + mock_csrf.set_csrf_cookie = MagicMock() + + # Mock JetStream + mock_js = AsyncMock() + mock_stream_info = MagicMock() + mock_stream_info.state.bytes = 2048 + mock_stream_info.state.messages = 50 + mock_stream_info.state.first_seq = 1 + mock_stream_info.state.last_seq = 50 + mock_js.stream_info.return_value = mock_stream_info + + test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc) + + # First call fails, second succeeds + def get_msg_side_effect(name, seq): + if seq == 1: + raise KeyError("Message not found") + mock_msg = MagicMock() + mock_msg.time = test_ts + return mock_msg + + mock_js.get_msg.side_effect = get_msg_side_effect + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + with patch("central.gui.nats.get_js", return_value=mock_js): + result = await streams_list(mock_request, mock_csrf) + + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + + wx_stream = context["streams"][0] + # Main stream info still works + assert wx_stream["live_bytes"] == 2048 + assert wx_stream["live_messages"] == 50 + assert wx_stream["error"] is None + # First timestamp failed + assert wx_stream["live_first_ts"] is None + assert wx_stream["first_ts_error"] == "KeyError" + # Last timestamp succeeded + assert wx_stream["live_last_ts"] == test_ts + assert wx_stream["last_ts_error"] is None + + class TestStreamsUpdate: """Test stream update POST handler."""