mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
fix(gui): dashboard polls card + CSRF exception handler
Fix A - /dashboard/polls:
- Use get_last_msg instead of pull_subscribe (no durable consumers)
- Fix subject filter: central.meta.adapter.{name}.status
- Parse correct fields: ts and ok from status message
- Handle NotFoundError gracefully when no status exists
Fix B - CSRF exception handler:
- Add global CsrfProtectError handler in __init__.py
- Return friendly "session expired" message instead of 500
- Re-render forms with error or redirect to /login
- Update templates to display error messages
Tests:
- Add get_last_msg mocking tests for polls
- Add regression test verifying no pull_subscribe
- Add CSRF handler tests
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
4368c83613
commit
9396e5dbe8
7 changed files with 283 additions and 74 deletions
|
|
@ -1,47 +1,36 @@
|
|||
"""Tests for dashboard routes."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Set required env vars before importing central modules
|
||||
os.environ.setdefault("CENTRAL_DB_DSN", "postgresql://test:test@localhost/test")
|
||||
os.environ.setdefault("CENTRAL_CSRF_SECRET", "testsecret12345678901234567890ab")
|
||||
os.environ.setdefault("CENTRAL_NATS_URL", "nats://localhost:4222")
|
||||
|
||||
|
||||
class TestFormatBytes:
|
||||
"""Test _format_bytes helper."""
|
||||
|
||||
def test_format_bytes_bytes(self):
|
||||
"""Bytes are shown as B."""
|
||||
from central.gui.routes import _format_bytes
|
||||
assert _format_bytes(100) == "100 B"
|
||||
|
||||
def test_format_bytes_kilobytes(self):
|
||||
"""KB formatting."""
|
||||
from central.gui.routes import _format_bytes
|
||||
assert _format_bytes(1024) == "1.0 KB"
|
||||
|
||||
def test_format_bytes_megabytes(self):
|
||||
"""MB formatting."""
|
||||
from central.gui.routes import _format_bytes
|
||||
assert _format_bytes(1048576) == "1.0 MB"
|
||||
|
||||
def test_format_bytes_gigabytes(self):
|
||||
"""GB formatting."""
|
||||
from central.gui.routes import _format_bytes
|
||||
assert _format_bytes(1073741824) == "1.0 GB"
|
||||
|
||||
|
||||
class TestDashboardEventsSQL:
|
||||
"""Test events query construction."""
|
||||
|
||||
def test_events_query_has_24h_filter(self):
|
||||
"""Events query filters by received > NOW() - 24h."""
|
||||
# We can't easily test the full route without mocking,
|
||||
# but we can verify the query logic by inspecting the source
|
||||
import inspect
|
||||
from central.gui.routes import dashboard_events
|
||||
source = inspect.getsource(dashboard_events)
|
||||
|
|
@ -50,25 +39,17 @@ class TestDashboardEventsSQL:
|
|||
|
||||
|
||||
class TestDashboardStreamsGracefulDegradation:
|
||||
"""Test streams endpoint graceful degradation."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nats_unavailable_returns_error_message(self):
|
||||
"""When NATS is unavailable, streams returns error message not 500."""
|
||||
from central.gui.routes import dashboard_streams
|
||||
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.nats.get_js", return_value=None):
|
||||
result = await dashboard_streams(mock_request)
|
||||
|
||||
# Should call template with error context
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
assert context["error"] == "NATS unavailable"
|
||||
|
|
@ -76,32 +57,23 @@ class TestDashboardStreamsGracefulDegradation:
|
|||
|
||||
|
||||
class TestDashboardPollsGracefulDegradation:
|
||||
"""Test polls endpoint graceful degradation."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nats_unavailable_shows_all_adapters_with_error(self):
|
||||
"""When NATS is unavailable, polls shows adapters with error message."""
|
||||
from central.gui.routes import dashboard_polls
|
||||
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = [{"name": "nws"}, {"name": "firms"}]
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
with patch("central.gui.nats.get_js", return_value=None):
|
||||
result = await dashboard_polls(mock_request)
|
||||
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
assert context["error"] == "NATS unavailable"
|
||||
|
|
@ -109,14 +81,106 @@ class TestDashboardPollsGracefulDegradation:
|
|||
assert context["adapters"][0]["error"] == "NATS unavailable"
|
||||
|
||||
|
||||
class TestDashboardStreamsIsolation:
|
||||
"""Test stream failure isolation."""
|
||||
class TestDashboardPollsGetLastMsg:
|
||||
@pytest.mark.asyncio
|
||||
async def test_polls_returns_timestamp_from_status_message(self):
|
||||
from central.gui.routes import dashboard_polls
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = [{"name": "nws"}]
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_msg = MagicMock()
|
||||
mock_msg.data = json.dumps({"ok": True, "ts": "2026-05-17T12:34:56Z"}).encode()
|
||||
mock_js = AsyncMock()
|
||||
mock_js.get_last_msg = AsyncMock(return_value=mock_msg)
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
with patch("central.gui.nats.get_js", return_value=mock_js):
|
||||
result = await dashboard_polls(mock_request)
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
assert len(context["adapters"]) == 1
|
||||
adapter = context["adapters"][0]
|
||||
assert adapter["name"] == "nws"
|
||||
assert adapter["last_poll"] == "2026-05-17T12:34:56Z"
|
||||
assert adapter["status"] == "\u2713"
|
||||
assert adapter["error"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_single_stream_failure_doesnt_crash_card(self):
|
||||
"""A single stream failure shows error for that stream only."""
|
||||
from central.gui.routes import dashboard_streams
|
||||
async def test_polls_handles_not_found_error_gracefully(self):
|
||||
from central.gui.routes import dashboard_polls
|
||||
from nats.js.errors import NotFoundError
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = [{"name": "nws"}]
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_js = AsyncMock()
|
||||
mock_js.get_last_msg = AsyncMock(side_effect=NotFoundError())
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
with patch("central.gui.nats.get_js", return_value=mock_js):
|
||||
result = await dashboard_polls(mock_request)
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
adapter = context["adapters"][0]
|
||||
assert adapter["last_poll"] is None
|
||||
assert adapter["status"] is None
|
||||
assert adapter["error"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_polls_shows_failure_status_when_ok_is_false(self):
|
||||
from central.gui.routes import dashboard_polls
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = [{"name": "nws"}]
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_msg = MagicMock()
|
||||
mock_msg.data = json.dumps({"ok": False, "ts": "2026-05-17T12:34:56Z", "error": "Connection timeout"}).encode()
|
||||
mock_js = AsyncMock()
|
||||
mock_js.get_last_msg = AsyncMock(return_value=mock_msg)
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
with patch("central.gui.nats.get_js", return_value=mock_js):
|
||||
result = await dashboard_polls(mock_request)
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
adapter = context["adapters"][0]
|
||||
assert adapter["status"] == "\u2717"
|
||||
assert adapter["error"] == "Connection timeout"
|
||||
|
||||
|
||||
class TestDashboardPollsNoSubscribe:
|
||||
def test_polls_does_not_use_pull_subscribe(self):
|
||||
import inspect
|
||||
from central.gui.routes import dashboard_polls
|
||||
source = inspect.getsource(dashboard_polls)
|
||||
assert "pull_subscribe" not in source
|
||||
assert "get_last_msg" in source
|
||||
assert "central.meta.adapter." in source
|
||||
|
||||
|
||||
class TestDashboardStreamsIsolation:
|
||||
@pytest.mark.asyncio
|
||||
async def test_single_stream_failure_doesnt_crash_card(self):
|
||||
from central.gui.routes import dashboard_streams
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock()
|
||||
|
||||
|
|
@ -132,27 +196,18 @@ class TestDashboardStreamsIsolation:
|
|||
|
||||
mock_js = AsyncMock()
|
||||
mock_js.stream_info.side_effect = mock_stream_info
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_response = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = mock_response
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.nats.get_js", return_value=mock_js):
|
||||
result = await dashboard_streams(mock_request)
|
||||
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
|
||||
streams = context["streams"]
|
||||
# Should have 4 streams
|
||||
assert len(streams) == 4
|
||||
|
||||
# CENTRAL_FIRE should have error
|
||||
fire_stream = next(s for s in streams if s["name"] == "CENTRAL_FIRE")
|
||||
assert fire_stream.get("error") == "unavailable"
|
||||
|
||||
# CENTRAL_WX should be fine
|
||||
wx_stream = next(s for s in streams if s["name"] == "CENTRAL_WX")
|
||||
assert wx_stream.get("error") is None
|
||||
assert wx_stream["messages"] == 100
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue