mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
* feat(gui): implement first-run setup wizard (1b-8)

  Add a 5-step setup wizard that replaces the single-step /setup:
  1. Create Operator - create initial operator account
  2. System Settings - configure map tile URL and attribution
  3. API Keys - optionally add API keys for adapters
  4. Configure Adapters - enable/disable adapters with region picker
  5. Finish Setup - review and complete setup

  Key changes:
  - Update middleware to handle wizard URL structure and step routing
  - Add wizard routes for each step with proper auth checks
  - Create new templates using base_wizard.html for consistent styling
  - Add audit events for system.update and setup.complete
  - Update tests for new middleware behavior

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(gui): handle CSRF errors on wizard paths

  Update csrf_exception_handler to re-render wizard forms with error
  message instead of redirecting to /login when CSRF validation fails.
  - /setup/operator: re-render with error
  - /setup/system: re-render with current system values + error
  - /setup/keys: re-render with current keys list + error
  - /setup/adapters: re-render with current adapter config + error
  - /setup/finish: re-render with summary data + error
  - /setup: redirect to /setup (middleware routes to appropriate step)

  Add error display to setup_keys.html and setup_finish.html templates.
  Add 7 new CSRF handler tests for wizard paths.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(gui): region picker render + click-to-draw

  Bug A: Maps render blank on /setup/adapters for FIRMS and USGS because
  Leaflet computed zero dimensions before container layout settled.
  Fix: add setTimeout invalidateSize() after map creation.

  Bug B: No click-to-draw functionality - only drag corners.
  Fix: add L.Control.Draw for rectangle drawing with CREATED event
  handler to replace existing rectangle.

  Both fixes applied to:
  - setup_adapters.html (wizard inline JS)
  - _region_picker.html (standalone edit page)

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(gui): handle revisiting /setup/operator after operator created

  When an operator already exists, /setup/operator now shows a
  confirmation page instead of the create form. This prevents:
  - Unique constraint violations on duplicate username
  - Silent creation of duplicate operators

  GET /setup/operator: queries config.operators; if any exist, renders
  confirmation state with existing_operator context.
  POST /setup/operator: checks operator count before INSERT; if non-zero,
  renders confirmation state without inserting.

  Template updated with conditional to show "Operator Already Configured"
  message when existing_operator is set.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(csrf): replace fastapi-csrf-protect with session-bound CSRF

  Fixes CSRF race condition where every GET rotated the CSRF token,
  causing POST failures when users had multiple tabs or slow connections.

  Changes:
  - Remove fastapi-csrf-protect dependency
  - Add session-bound CSRF tokens stored in config.sessions table
  - Add pre-auth CSRF for unauthenticated routes (/login, /setup/operator)
  - Add csrf.py module for pre-auth token generation/validation
  - Update routes to use new CSRF token handling
  - Add migration 013 to add csrf_token column to sessions

  The session-bound approach ensures CSRF tokens remain stable for the
  duration of a session, eliminating the race condition.

  Note: Route tests (test_wizard.py, test_adapters.py, etc.) need
  refactoring to mock get_settings() instead of CsrfProtect dependency.
  Core auth/CSRF handler tests pass (74 tests).

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(csrf): update test suite for session-bound CSRF tokens

  - Add CSRF fixtures to conftest.py for pre-auth and session CSRF
  - Update test_wizard.py: use bypass_pre_auth_csrf and patch_route_settings
  - Update test_adapters.py: set request.state.csrf_token and form mock data
  - Update test_api_keys.py: add CSRF token to form data for POST routes
  - Update test_streams.py: change return_value to side_effect for CSRF support
  - Update test_region_picker.py: add CSRF token handling
  - Update test_config_store.py: set CENTRAL_CSRF_SECRET env var in fixture

  All 285 tests now pass with session-bound CSRF validation.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Matt Johnson <mj@k7zvx.com>
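The session-bound CSRF approach described in the commit message can be illustrated with a minimal sketch. This is not the project's csrf.py: the in-memory dictionary stands in for the config.sessions table, and the names create_session and validate_csrf are hypothetical. The point it shows is that a token issued once per session stays valid across repeated GETs, so a form rendered in one tab is not invalidated by a page load in another.

```python
import hmac
import secrets

# Hypothetical in-memory stand-in for the config.sessions table
# (session id -> CSRF token). The real project stores this in the database.
_sessions: dict[str, str] = {}


def create_session() -> tuple[str, str]:
    """Create a session and issue its CSRF token exactly once."""
    session_id = secrets.token_urlsafe(32)
    csrf_token = secrets.token_urlsafe(32)
    _sessions[session_id] = csrf_token
    return session_id, csrf_token


def validate_csrf(session_id: str, submitted: str) -> bool:
    """Compare the submitted token with the one bound to the session."""
    expected = _sessions.get(session_id)
    if expected is None:
        return False
    # Constant-time comparison; encode so non-ASCII input cannot raise.
    return hmac.compare_digest(expected.encode(), submitted.encode())
```

Because validation never rotates the token, the same token can be checked any number of times during the session, which is the property the route tests rely on when they set request.state.csrf_token and submit the same value in the form data.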
391 lines
16 KiB
Python
"""Tests for API keys management routes."""

import os
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from central.gui.routes import (
    api_keys_list,
    api_keys_new,
    api_keys_create,
    api_keys_edit,
    api_keys_rotate,
    api_keys_delete,
)


class TestApiKeysListUnauthenticated:
    """Test API keys list without authentication."""

    @pytest.mark.asyncio
    async def test_api_keys_list_unauthenticated_redirects(self):
        """GET /api-keys without auth redirects to /login."""
        # This test verifies the session middleware behavior
        # In practice, the middleware redirects before the route is called
        # We verify the route requires operator in request.state
        mock_request = MagicMock()
        mock_request.state.operator = None

        # The middleware would redirect, but if it didn't,
        # the route would fail trying to access operator.id
        # This is tested via the session middleware tests


class TestApiKeysListAuthenticated:
    """Test API keys list with authentication."""

    @pytest.mark.asyncio
    async def test_api_keys_list_returns_all_keys_with_usage(self):
        """GET /api-keys authenticated returns 200 with all keys and usage info."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        # Mock database
        mock_conn = AsyncMock()

        # Keys query result
        mock_conn.fetch.side_effect = [
            # First call: keys
            [
                {
                    "alias": "firms",
                    "created_at": datetime(2026, 5, 16, 20, 0, tzinfo=timezone.utc),
                    "rotated_at": None,
                    "last_used_at": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
                },
                {
                    "alias": "test_key",
                    "created_at": datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc),
                    "rotated_at": None,
                    "last_used_at": None,
                },
            ],
            # Second call: adapters using firms
            [{"name": "firms"}],
            # Third call: adapters using test_key
            [],
        ]

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                result = await api_keys_list(mock_request)

        # Check template was called with correct context
        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))

        assert len(context["keys"]) == 2
        firms_key = next(k for k in context["keys"] if k["alias"] == "firms")
        assert firms_key["used_by"] == ["firms"]

        test_key = next(k for k in context["keys"] if k["alias"] == "test_key")
        assert test_key["used_by"] == []


class TestApiKeysCreate:
    """Test API key creation."""

    @pytest.mark.asyncio
    async def test_create_valid_key_redirects(self):
        """POST /api-keys with valid data creates key and redirects."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "test1", "plaintext_key": "secret-api-key-123"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_conn = AsyncMock()
        # Check existing - not found
        mock_conn.fetchrow.side_effect = [
            None,  # No existing key
            {"created_at": datetime(2026, 5, 18, 12, 0, tzinfo=timezone.utc)},  # Insert result
        ]
        mock_conn.execute = AsyncMock()

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes.get_pool", return_value=mock_pool):
            with patch("central.crypto.encrypt", return_value=b"encrypted_data"):
                with patch("central.gui.routes.write_audit", new_callable=AsyncMock):
                    result = await api_keys_create(mock_request)

        assert result.status_code == 302
        assert result.headers["location"] == "/api-keys"

    @pytest.mark.asyncio
    async def test_create_existing_alias_shows_error(self):
        """POST /api-keys with existing alias shows error."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "firms", "plaintext_key": "secret-key"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        mock_conn = AsyncMock()
        # Key already exists
        mock_conn.fetchrow.return_value = {"alias": "firms"}

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                with patch("central.crypto.encrypt", return_value=b"encrypted"):
                    result = await api_keys_create(mock_request)

        # Should re-render form with error
        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))
        assert "errors" in context
        assert "alias" in context["errors"]

    @pytest.mark.asyncio
    async def test_create_empty_alias_shows_error(self):
        """POST /api-keys with empty alias shows error."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "", "plaintext_key": "secret-key"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        mock_conn = AsyncMock()
        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                result = await api_keys_create(mock_request)

        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))
        assert context["errors"]["alias"] == "Alias is required"

    @pytest.mark.asyncio
    async def test_create_invalid_alias_chars_shows_error(self):
        """POST /api-keys with invalid alias chars shows error."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        # Test with space
        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "test key", "plaintext_key": "secret-key"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        mock_conn = AsyncMock()
        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                result = await api_keys_create(mock_request)

        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))
        assert "letters, numbers, and underscores" in context["errors"]["alias"]

    @pytest.mark.asyncio
    async def test_create_hyphen_in_alias_shows_error(self):
        """POST /api-keys with hyphen in alias shows error."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "test-key", "plaintext_key": "secret-key"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        mock_conn = AsyncMock()
        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                result = await api_keys_create(mock_request)

        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))
        assert "alias" in context["errors"]


class TestApiKeysRotate:
    """Test API key rotation."""

    @pytest.mark.asyncio
    async def test_rotate_updates_key_and_timestamp(self):
        """POST /api-keys/{alias} rotates key and updates rotated_at."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "new_plaintext_key": "new-secret-key-456"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_conn = AsyncMock()
        old_time = datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc)
        new_time = datetime(2026, 5, 18, 12, 0, tzinfo=timezone.utc)

        mock_conn.fetchrow.side_effect = [
            # First: get existing key
            {
                "alias": "test1",
                "created_at": datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc),
                "rotated_at": old_time,
                "last_used_at": None,
            },
            # Second: update result
            {"rotated_at": new_time},
        ]

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes.get_pool", return_value=mock_pool):
            with patch("central.crypto.encrypt", return_value=b"new_encrypted"):
                with patch("central.gui.routes.write_audit", new_callable=AsyncMock) as mock_audit:
                    result = await api_keys_rotate(mock_request, "test1")

        assert result.status_code == 302
        # Check audit was called with no plaintext
        mock_audit.assert_called_once()
        audit_call = mock_audit.call_args
        assert "new-secret-key" not in str(audit_call)


class TestApiKeysDelete:
    """Test API key deletion."""

    @pytest.mark.asyncio
    async def test_delete_with_references_shows_error(self):
        """POST /api-keys/{alias}/delete with references shows error."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")
        mock_request.state.csrf_token = "test_csrf_token"
        mock_request.form = AsyncMock(return_value={"csrf_token": "test_csrf_token"})

        mock_templates = MagicMock()
        mock_templates.TemplateResponse.return_value = MagicMock()

        mock_conn = AsyncMock()
        mock_conn.fetchrow.return_value = {
            "alias": "firms",
            "created_at": datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc),
            "rotated_at": None,
            "last_used_at": None,
        }
        # Adapters using the key
        mock_conn.fetch.return_value = [{"name": "firms"}]

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes._get_templates", return_value=mock_templates):
            with patch("central.gui.routes.get_pool", return_value=mock_pool):
                result = await api_keys_delete(mock_request, "firms")

        # Should re-render with error
        call_args = mock_templates.TemplateResponse.call_args
        context = call_args.kwargs.get("context", call_args[1].get("context"))
        assert "error" in context
        assert "firms" in context["error"]

    @pytest.mark.asyncio
    async def test_delete_without_references_succeeds(self):
        """POST /api-keys/{alias}/delete without references deletes and redirects."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")
        mock_request.state.csrf_token = "test_csrf_token"
        mock_request.form = AsyncMock(return_value={"csrf_token": "test_csrf_token"})

        mock_conn = AsyncMock()
        mock_conn.fetchrow.return_value = {
            "alias": "test1",
            "created_at": datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc),
            "rotated_at": None,
            "last_used_at": None,
        }
        # No adapters using the key
        mock_conn.fetch.return_value = []
        mock_conn.execute = AsyncMock()

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes.get_pool", return_value=mock_pool):
            with patch("central.gui.routes.write_audit", new_callable=AsyncMock) as mock_audit:
                result = await api_keys_delete(mock_request, "test1")

        assert result.status_code == 302
        assert result.headers["location"] == "/api-keys"
        mock_conn.execute.assert_called_once()


class TestApiKeysAuditNoPlaintext:
    """Test that audit logs never contain plaintext keys."""

    @pytest.mark.asyncio
    async def test_create_audit_has_no_plaintext(self):
        """Verify create audit log doesn't contain plaintext."""
        mock_request = MagicMock()
        mock_request.state.operator = MagicMock(id=1, username="admin")

        mock_request.state.csrf_token = "test_csrf_token"
        form_data = {"csrf_token": "test_csrf_token", "alias": "newkey", "plaintext_key": "super-secret-value"}
        mock_request.form = AsyncMock(return_value=form_data)

        mock_conn = AsyncMock()
        mock_conn.fetchrow.side_effect = [
            None,
            {"created_at": datetime(2026, 5, 18, 12, 0, tzinfo=timezone.utc)},
        ]

        mock_pool = MagicMock()
        mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
        mock_pool.acquire.return_value.__aexit__.return_value = None

        with patch("central.gui.routes.get_pool", return_value=mock_pool):
            with patch("central.crypto.encrypt", return_value=b"encrypted"):
                with patch("central.gui.routes.write_audit", new_callable=AsyncMock) as mock_audit:
                    await api_keys_create(mock_request)

        # Check audit call arguments
        call_kwargs = mock_audit.call_args.kwargs
        before = call_kwargs.get("before")
        after = call_kwargs.get("after")

        # Plaintext should not appear
        assert before is None or "super-secret-value" not in str(before)
        assert "super-secret-value" not in str(after)
        # encrypted value should not appear either
        assert "encrypted" not in str(after)
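
Every test in this file repeats the same three lines to build a mock connection pool. The sketch below shows why that pattern works and how it could be factored into a helper. The names make_mock_pool and fetch_aliases, and the SQL string, are hypothetical and are not part of central.gui.routes. It relies on MagicMock supporting the async context-manager protocol (Python 3.8+), so `async with pool.acquire() as conn` yields the configured connection.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_pool(conn):
    """Build a pool whose `async with pool.acquire()` yields `conn`."""
    pool = MagicMock()
    pool.acquire.return_value.__aenter__.return_value = conn
    # A falsy __aexit__ result means exceptions inside the block propagate.
    pool.acquire.return_value.__aexit__.return_value = None
    return pool


async def fetch_aliases(pool):
    """Hypothetical route-like function that reads aliases via the pool."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT alias FROM api_keys")
    return [row["alias"] for row in rows]


# AsyncMock makes conn.fetch awaitable and lets us script its result.
conn = AsyncMock()
conn.fetch.return_value = [{"alias": "firms"}, {"alias": "test_key"}]
aliases = asyncio.run(fetch_aliases(make_mock_pool(conn)))
```

A helper like this could live in conftest.py as a fixture, but whether that fits the project's test layout is a judgment call for the maintainers.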