diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py
index 5a055e5..10c6b2e 100644
--- a/src/central/gui/routes.py
+++ b/src/central/gui/routes.py
@@ -1,9 +1,12 @@
"""Route handlers for Central GUI."""
import json
+import logging
import re
from typing import Any
+logger = logging.getLogger("central.gui.routes")
+
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi_csrf_protect import CsrfProtect
@@ -911,8 +914,12 @@ async def streams_list(
"managed_max_bytes": row["managed_max_bytes"],
"live_bytes": None,
"live_messages": None,
+ "live_first_seq": None,
+ "live_last_seq": None,
"live_first_ts": None,
"live_last_ts": None,
+ "first_ts_error": None,
+ "last_ts_error": None,
"error": None,
}
@@ -922,10 +929,41 @@ async def streams_list(
info = await js.stream_info(row["name"])
stream_data["live_bytes"] = info.state.bytes
stream_data["live_messages"] = info.state.messages
- stream_data["live_first_ts"] = info.state.first_ts
- stream_data["live_last_ts"] = info.state.last_ts
- except Exception:
- stream_data["error"] = "unavailable"
+ stream_data["live_first_seq"] = info.state.first_seq
+ stream_data["live_last_seq"] = info.state.last_seq
+
+ # Fetch first / last message timestamps via get_msg
+ # RawStreamMsg has .time attribute (not .metadata.timestamp)
+ if info.state.first_seq > 0:
+ try:
+ first_msg = await js.get_msg(row["name"], seq=info.state.first_seq)
+ stream_data["live_first_ts"] = first_msg.time
+ except Exception as e:
+ logger.warning(
+ "get_msg first failed",
+ extra={"stream": row["name"], "err": type(e).__name__},
+ )
+ stream_data["live_first_ts"] = None
+ stream_data["first_ts_error"] = type(e).__name__
+
+ if info.state.last_seq > 0 and info.state.last_seq != info.state.first_seq:
+ try:
+ last_msg = await js.get_msg(row["name"], seq=info.state.last_seq)
+ stream_data["live_last_ts"] = last_msg.time
+ except Exception as e:
+ logger.warning(
+ "get_msg last failed",
+ extra={"stream": row["name"], "err": type(e).__name__},
+ )
+ stream_data["live_last_ts"] = None
+ stream_data["last_ts_error"] = type(e).__name__
+ elif info.state.last_seq == info.state.first_seq and info.state.first_seq > 0:
+ # Single message in stream
+ stream_data["live_last_ts"] = stream_data.get("live_first_ts")
+
+ except Exception as e:
+ logger.exception("Stream info failed", extra={"stream": row["name"]})
+ stream_data["error"] = f"unavailable: {type(e).__name__}"
else:
stream_data["error"] = "NATS unavailable"
diff --git a/src/central/gui/templates/streams_list.html b/src/central/gui/templates/streams_list.html
index ab5735a..8ae99ed 100644
--- a/src/central/gui/templates/streams_list.html
+++ b/src/central/gui/templates/streams_list.html
@@ -8,10 +8,10 @@
{% for stream in streams %}
-
+
{{ stream.name }}
{% if stream.managed_max_bytes %}
- Managed by supervisor
+ Managed by supervisor
{% endif %}
@@ -21,10 +21,31 @@
({{ stream.error }})
{% else %}
- - Size: {{ stream.live_bytes|default(0)|filesizeformat if stream.live_bytes is not none else '(unavailable)' }}
- - Messages: {{ stream.live_messages if stream.live_messages is not none else '(unavailable)' }}
- - First message: {{ stream.live_first_ts.isoformat() if stream.live_first_ts else '(none)' }}
- - Last message: {{ stream.live_last_ts.isoformat() if stream.live_last_ts else '(none)' }}
+ - Messages: {{ stream.live_messages }}
+ - Size: {{ stream.live_bytes|filesizeformat }}
+ {% if stream.live_messages == 0 %}
+ - First message: (empty)
+ - Last message: (empty)
+ {% else %}
+ - First message:
+ {% if stream.live_first_ts %}
+ {{ stream.live_first_ts.isoformat() }}
+ {% elif stream.first_ts_error %}
+ (lookup failed: {{ stream.first_ts_error }})
+ {% else %}
+ (unknown)
+ {% endif %}
+
+ - Last message:
+ {% if stream.live_last_ts %}
+ {{ stream.live_last_ts.isoformat() }}
+ {% elif stream.last_ts_error %}
+ (lookup failed: {{ stream.last_ts_error }})
+ {% else %}
+ (unknown)
+ {% endif %}
+
+ {% endif %}
{% endif %}
@@ -33,7 +54,7 @@
Configuration:
- Max age: {{ (stream.max_age_s / 86400)|round(1) }} days ({{ stream.max_age_s }}s)
- - Max bytes (config): {{ stream.max_bytes_cfg|filesizeformat }}
+ - Max bytes (current): {{ stream.max_bytes_cfg|filesizeformat }}
diff --git a/tests/test_streams.py b/tests/test_streams.py
index b12f4ee..c2346fa 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -1,6 +1,7 @@
"""Tests for streams list and edit routes."""
import os
+from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -28,8 +29,8 @@ class TestStreamsListAuthenticated:
"""Test streams list with authentication."""
@pytest.mark.asyncio
- async def test_streams_list_returns_all_streams(self):
- """GET /streams authenticated returns 200 with all four streams."""
+ async def test_streams_list_returns_all_streams_with_timestamps(self):
+ """GET /streams authenticated returns 200 with all four streams and timestamps."""
from central.gui.routes import streams_list
mock_request = MagicMock()
@@ -55,15 +56,21 @@ class TestStreamsListAuthenticated:
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
mock_csrf.set_csrf_cookie = MagicMock()
- # Mock JetStream
+ # Mock JetStream with proper state fields
mock_js = AsyncMock()
mock_stream_info = MagicMock()
mock_stream_info.state.bytes = 1024000
mock_stream_info.state.messages = 100
- mock_stream_info.state.first_ts = None
- mock_stream_info.state.last_ts = None
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 100
mock_js.stream_info.return_value = mock_stream_info
+ # Mock get_msg for timestamps (RawStreamMsg has .time attribute)
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
with patch("central.gui.nats.get_js", return_value=mock_js):
@@ -78,6 +85,13 @@ class TestStreamsListAuthenticated:
assert "CENTRAL_QUAKE" in stream_names
assert "CENTRAL_WX" in stream_names
+ # Check that timestamps are populated
+ fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE")
+ assert fire_stream["live_bytes"] == 1024000
+ assert fire_stream["live_messages"] == 100
+ assert fire_stream["live_first_ts"] == test_ts
+ assert fire_stream["live_last_ts"] == test_ts
+
class TestStreamsListNatsUnavailable:
"""Test streams list when NATS is unavailable."""
@@ -122,8 +136,8 @@ class TestStreamsListPartialFailure:
"""Test streams list with one stream's info raising."""
@pytest.mark.asyncio
- async def test_one_stream_unavailable_others_render(self):
- """GET /streams with one stream error shows unavailable for that stream only."""
+ async def test_stream_info_raises_shows_exception_type(self):
+ """GET /streams with stream_info error shows unavailable with exception type."""
from central.gui.routes import streams_list
mock_request = MagicMock()
@@ -147,21 +161,26 @@ class TestStreamsListPartialFailure:
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
mock_csrf.set_csrf_cookie = MagicMock()
- # Mock JetStream - CENTRAL_FIRE raises, CENTRAL_WX works
+ # Mock JetStream - CENTRAL_FIRE raises ValueError, CENTRAL_WX works
mock_js = AsyncMock()
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
def stream_info_side_effect(name):
if name == "CENTRAL_FIRE":
- raise Exception("Stream not found")
+ raise ValueError("Stream not found")
mock_info = MagicMock()
mock_info.state.bytes = 2048
mock_info.state.messages = 50
- mock_info.state.first_ts = None
- mock_info.state.last_ts = None
+ mock_info.state.first_seq = 1
+ mock_info.state.last_seq = 50
return mock_info
mock_js.stream_info.side_effect = stream_info_side_effect
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
with patch("central.gui.nats.get_js", return_value=mock_js):
@@ -173,11 +192,197 @@ class TestStreamsListPartialFailure:
fire_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_FIRE")
wx_stream = next(s for s in context["streams"] if s["name"] == "CENTRAL_WX")
- assert fire_stream["error"] == "unavailable"
+ # Exception type should be in error message
+ assert "unavailable: ValueError" in fire_stream["error"]
assert wx_stream["error"] is None
assert wx_stream["live_bytes"] == 2048
+class TestStreamsListEmptyStream:
+ """Test streams list with empty stream (zero messages)."""
+
+ @pytest.mark.asyncio
+ async def test_empty_stream_no_get_msg_calls(self):
+ """Empty stream (first_seq == 0) renders correctly, no get_msg calls made."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_QUAKE", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream with empty stream (first_seq = 0)
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 0
+ mock_stream_info.state.messages = 0
+ mock_stream_info.state.first_seq = 0
+ mock_stream_info.state.last_seq = 0
+ mock_js.stream_info.return_value = mock_stream_info
+ mock_js.get_msg = AsyncMock()
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ quake_stream = context["streams"][0]
+ assert quake_stream["live_messages"] == 0
+ assert quake_stream["live_first_ts"] is None
+ assert quake_stream["live_last_ts"] is None
+ assert quake_stream["error"] is None
+
+ # get_msg should NOT have been called for empty stream
+ mock_js.get_msg.assert_not_called()
+
+
+class TestStreamsListSingleMessage:
+ """Test streams list with single-message stream."""
+
+ @pytest.mark.asyncio
+ async def test_single_message_stream_reuses_timestamp(self):
+ """Single-message stream (first_seq == last_seq) calls get_msg once."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream with single message (first_seq == last_seq)
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 512
+ mock_stream_info.state.messages = 1
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 1
+ mock_js.stream_info.return_value = mock_stream_info
+
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ mock_js.get_msg.return_value = mock_msg
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ wx_stream = context["streams"][0]
+ assert wx_stream["live_messages"] == 1
+ assert wx_stream["live_first_ts"] == test_ts
+ assert wx_stream["live_last_ts"] == test_ts
+
+ # get_msg should have been called once (for first message only)
+ assert mock_js.get_msg.call_count == 1
+
+
+class TestStreamsListGetMsgFailure:
+ """Test streams list when get_msg fails for a message."""
+
+ @pytest.mark.asyncio
+ async def test_get_msg_first_fails_shows_lookup_error(self):
+ """get_msg raises for first message shows lookup failed, rest renders normally."""
+ from central.gui.routes import streams_list
+
+ mock_request = MagicMock()
+ mock_request.state.operator = MagicMock(id=1, username="testop")
+
+ mock_conn = AsyncMock()
+ mock_conn.fetch.return_value = [
+ {"name": "CENTRAL_WX", "max_age_s": 604800, "max_bytes": 1073741824, "managed_max_bytes": True, "updated_at": None},
+ ]
+
+ mock_pool = MagicMock()
+ mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
+
+ mock_templates = MagicMock()
+ mock_response = MagicMock()
+ mock_templates.TemplateResponse.return_value = mock_response
+
+ mock_csrf = MagicMock()
+ mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
+ mock_csrf.set_csrf_cookie = MagicMock()
+
+ # Mock JetStream
+ mock_js = AsyncMock()
+ mock_stream_info = MagicMock()
+ mock_stream_info.state.bytes = 2048
+ mock_stream_info.state.messages = 50
+ mock_stream_info.state.first_seq = 1
+ mock_stream_info.state.last_seq = 50
+ mock_js.stream_info.return_value = mock_stream_info
+
+ test_ts = datetime(2026, 5, 17, 12, 0, 0, tzinfo=timezone.utc)
+
+ # First call fails, second succeeds
+ def get_msg_side_effect(name, seq):
+ if seq == 1:
+ raise KeyError("Message not found")
+ mock_msg = MagicMock()
+ mock_msg.time = test_ts
+ return mock_msg
+
+ mock_js.get_msg.side_effect = get_msg_side_effect
+
+ with patch("central.gui.routes._get_templates", return_value=mock_templates):
+ with patch("central.gui.routes.get_pool", return_value=mock_pool):
+ with patch("central.gui.nats.get_js", return_value=mock_js):
+ result = await streams_list(mock_request, mock_csrf)
+
+ call_args = mock_templates.TemplateResponse.call_args
+ context = call_args.kwargs.get("context", call_args[1].get("context"))
+
+ wx_stream = context["streams"][0]
+ # Main stream info still works
+ assert wx_stream["live_bytes"] == 2048
+ assert wx_stream["live_messages"] == 50
+ assert wx_stream["error"] is None
+ # First timestamp failed
+ assert wx_stream["live_first_ts"] is None
+ assert wx_stream["first_ts_error"] == "KeyError"
+ # Last timestamp succeeded
+ assert wx_stream["live_last_ts"] == test_ts
+ assert wx_stream["last_ts_error"] is None
+
+
class TestStreamsUpdate:
"""Test stream update POST handler."""