fix(csrf): replace fastapi-csrf-protect with session-bound CSRF

Fixes CSRF race condition where every GET rotated the CSRF token,
causing POST failures when users had multiple tabs or slow connections.

Changes:
- Remove fastapi-csrf-protect dependency
- Add session-bound CSRF tokens stored in config.sessions table
- Add pre-auth CSRF for unauthenticated routes (/login, /setup/operator)
- Add csrf.py module for pre-auth token generation/validation
- Update routes to use new CSRF token handling
- Add migration 013 to add csrf_token column to sessions

The session-bound approach ensures CSRF tokens remain stable for the
duration of a session, eliminating the race condition.

Note: Route tests (test_wizard.py, test_adapters.py, etc.) need
refactoring to mock get_settings() instead of CsrfProtect dependency.
Core auth/CSRF handler tests pass (74 tests).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-18 03:16:37 +00:00
commit c317c9ab01
11 changed files with 410 additions and 208 deletions

View file

@ -14,23 +14,18 @@ class TestCsrfExceptionHandlerRegistered:
"""Verify CSRF exception handler is properly registered."""
def test_csrf_exception_handler_is_registered(self):
"""The app has a CsrfProtectError exception handler registered."""
"""The app has a CsrfValidationError exception handler registered."""
from central.gui import app
from fastapi_csrf_protect.exceptions import CsrfProtectError
from central.gui.auth import CsrfValidationError
assert CsrfProtectError in app.exception_handlers, \
"CsrfProtectError handler should be registered"
assert CsrfValidationError in app.exception_handlers, \
"CsrfValidationError handler should be registered"
def test_csrf_subclasses_are_caught(self):
"""MissingTokenError and TokenValidationError inherit from CsrfProtectError."""
from fastapi_csrf_protect.exceptions import (
CsrfProtectError,
MissingTokenError,
TokenValidationError,
)
def test_csrf_validation_error_is_exception(self):
"""CsrfValidationError is a proper Exception subclass."""
from central.gui.auth import CsrfValidationError
assert issubclass(MissingTokenError, CsrfProtectError)
assert issubclass(TokenValidationError, CsrfProtectError)
assert issubclass(CsrfValidationError, Exception)
class TestCsrfExceptionHandlerBehavior:
@ -40,10 +35,10 @@ class TestCsrfExceptionHandlerBehavior:
"""CSRF handler checks request path for /login."""
import inspect
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import CsrfProtectError
from central.gui.auth import CsrfValidationError
app = _create_app()
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
# Verify handler source contains /login path check
source = inspect.getsource(handler)
@ -54,17 +49,16 @@ class TestCsrfExceptionHandlerBehavior:
async def test_logout_csrf_error_redirects_to_login(self):
"""CSRF error on /logout should redirect to /login."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
from fastapi.responses import RedirectResponse
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/logout"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
result = await handler(mock_request, exc)
@ -75,17 +69,16 @@ class TestCsrfExceptionHandlerBehavior:
async def test_adapters_csrf_error_redirects_to_adapters(self):
"""CSRF error on /adapters/{name} should redirect to /adapters."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
from fastapi.responses import RedirectResponse
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/adapters/nws"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
result = await handler(mock_request, exc)
@ -94,16 +87,16 @@ class TestCsrfExceptionHandlerBehavior:
class TestCsrfHandlerNoTraceback:
"""Verify exception handler doesn't expose Python internals."""
"""Verify exception handler does not expose Python internals."""
def test_handler_exists_and_is_async(self):
"""The CSRF handler should be an async function."""
import inspect
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import CsrfProtectError
from central.gui.auth import CsrfValidationError
app = _create_app()
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
assert handler is not None
assert inspect.iscoroutinefunction(handler)
@ -116,17 +109,15 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_operator_csrf_error_renders_form_with_error(self):
"""CSRF error on /setup/operator re-renders form with error message."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from fastapi.responses import HTMLResponse
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup/operator"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
result = await handler(mock_request, exc)
@ -140,16 +131,15 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_system_csrf_error_renders_form_with_error(self):
"""CSRF error on /setup/system re-renders form with error message."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup/system"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
with patch("central.gui.db.get_pool", return_value=None):
result = await handler(mock_request, exc)
@ -163,16 +153,15 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_keys_csrf_error_renders_form_with_error(self):
"""CSRF error on /setup/keys re-renders form with error message."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup/keys"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
with patch("central.gui.db.get_pool", return_value=None):
result = await handler(mock_request, exc)
@ -186,16 +175,15 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_adapters_csrf_error_renders_form_with_error(self):
"""CSRF error on /setup/adapters re-renders form with error message."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup/adapters"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
with patch("central.gui.db.get_pool", return_value=None):
result = await handler(mock_request, exc)
@ -209,16 +197,15 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_finish_csrf_error_renders_form_with_error(self):
"""CSRF error on /setup/finish re-renders form with error message."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup/finish"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
with patch("central.gui.db.get_pool", return_value=None):
result = await handler(mock_request, exc)
@ -232,17 +219,16 @@ class TestCsrfHandlerWizardPaths:
async def test_setup_base_csrf_error_redirects_to_setup(self):
"""CSRF error on /setup redirects to /setup (middleware routes to step)."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
from fastapi.responses import RedirectResponse
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/setup"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
result = await handler(mock_request, exc)
@ -253,16 +239,15 @@ class TestCsrfHandlerWizardPaths:
async def test_login_csrf_error_still_works(self):
"""CSRF error on /login still renders login form with error (regression test)."""
from central.gui import _create_app
from fastapi_csrf_protect.exceptions import TokenValidationError
from central.gui.auth import CsrfValidationError
app = _create_app()
from fastapi_csrf_protect.exceptions import CsrfProtectError
handler = app.exception_handlers.get(CsrfProtectError)
handler = app.exception_handlers.get(CsrfValidationError)
mock_request = MagicMock()
mock_request.url.path = "/login"
exc = TokenValidationError("Invalid token")
exc = CsrfValidationError("Invalid token")
result = await handler(mock_request, exc)