fix(gui): handle revisiting /setup/operator after operator created

When an operator already exists, /setup/operator now shows a
confirmation page instead of the create form. This prevents:
- Unique constraint violations on duplicate username
- Silent creation of duplicate operators

GET /setup/operator: queries config.operators; if any exist,
renders confirmation state with existing_operator context.

POST /setup/operator: checks operator count before INSERT; if
non-zero, renders confirmation state without inserting.

Template updated with conditional to show "Operator Already
Configured" message when existing_operator is set.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zvx-echo6 2026-05-17 20:08:50 -06:00
commit 84044a4d45
3 changed files with 145 additions and 3 deletions

View file

@ -266,11 +266,27 @@ async def setup_operator_form(
) -> HTMLResponse:
"""Render the setup operator form (step 1)."""
templates = _get_templates()
pool = get_pool()
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
# Check if operator already exists
existing_operator = None
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT username FROM config.operators ORDER BY id LIMIT 1"
)
if row:
existing_operator = {"username": row["username"]}
response = templates.TemplateResponse(
request=request,
name="setup_operator.html",
context={"csrf_token": csrf_token, "error": None, "form_data": None},
context={
"csrf_token": csrf_token,
"error": None,
"form_data": None,
"existing_operator": existing_operator,
},
)
csrf_protect.set_csrf_cookie(signed_token, response)
return response
@ -291,6 +307,28 @@ async def setup_operator_submit(
# Validate CSRF
await csrf_protect.validate_csrf(request)
# Check if operator already exists (single-operator-per-install design)
async with pool.acquire() as conn:
count = await conn.fetchval("SELECT count(*) FROM config.operators")
if count > 0:
# Operator already exists — render confirmation page
existing = await conn.fetchrow(
"SELECT username FROM config.operators ORDER BY id LIMIT 1"
)
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
response = templates.TemplateResponse(
request=request,
name="setup_operator.html",
context={
"csrf_token": csrf_token,
"error": None,
"form_data": None,
"existing_operator": {"username": existing["username"]},
},
)
csrf_protect.set_csrf_cookie(signed_token, response)
return response
# Validate input
error = None
if password != confirm_password:
@ -310,6 +348,7 @@ async def setup_operator_submit(
"csrf_token": csrf_token,
"error": error,
"form_data": {"username": username},
"existing_operator": None,
},
status_code=200,
)

View file

@ -7,6 +7,17 @@
{% include "_wizard_header.html" %}
{% endwith %}
{% if existing_operator %}
<article>
<header>
<h1>Operator Already Configured</h1>
</header>
<p>The operator account <strong>{{ existing_operator.username }}</strong> has been created.</p>
<div style="display: flex; gap: 1rem; margin-top: 1rem;">
<a href="/setup/system" role="button">Next &rarr;</a>
</div>
</article>
{% else %}
<article>
<header>
<h1>Create Operator Account</h1>
@ -42,4 +53,5 @@
<button type="submit">Create Operator &rarr;</button>
</form>
</article>
{% endif %}
{% endblock %}

View file

@ -74,24 +74,64 @@ class TestSetupOperatorForm:
@pytest.mark.asyncio
async def test_get_returns_form(self):
"""GET /setup/operator returns the form."""
"""GET /setup/operator returns the form when no operator exists."""
mock_request = MagicMock()
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock()
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = None # No operator exists
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
mock_pool.acquire.return_value.__aexit__.return_value = None
mock_csrf = MagicMock()
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
mock_csrf.set_csrf_cookie = MagicMock()
with patch("central.gui.routes._get_templates", return_value=mock_templates):
result = await setup_operator_form(mock_request, mock_csrf)
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await setup_operator_form(mock_request, mock_csrf)
mock_templates.TemplateResponse.assert_called_once()
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["csrf_token"] == "token"
assert context["error"] is None
assert context["existing_operator"] is None
@pytest.mark.asyncio
async def test_get_returns_confirmation_when_operator_exists(self):
"""GET /setup/operator shows confirmation when operator already exists."""
mock_request = MagicMock()
mock_templates = MagicMock()
mock_response = MagicMock()
mock_response.body = b"Operator Already Configured"
mock_templates.TemplateResponse.return_value = mock_response
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {"username": "admin"} # Operator exists
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
mock_pool.acquire.return_value.__aexit__.return_value = None
mock_csrf = MagicMock()
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
mock_csrf.set_csrf_cookie = MagicMock()
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await setup_operator_form(mock_request, mock_csrf)
mock_templates.TemplateResponse.assert_called_once()
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["existing_operator"] == {"username": "admin"}
assert context["error"] is None
class TestSetupOperatorSubmit:
@ -104,7 +144,12 @@ class TestSetupOperatorSubmit:
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock()
mock_conn = AsyncMock()
mock_conn.fetchval.return_value = 0 # No existing operators
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
mock_pool.acquire.return_value.__aexit__.return_value = None
mock_csrf = MagicMock()
mock_csrf.validate_csrf = AsyncMock()
@ -131,6 +176,7 @@ class TestSetupOperatorSubmit:
mock_request = MagicMock()
mock_conn = AsyncMock()
mock_conn.fetchval.return_value = 0 # No existing operators
mock_conn.fetchrow.side_effect = [
{"id": 1}, # INSERT RETURNING id
{"session_lifetime_days": 90}, # system settings
@ -159,6 +205,51 @@ class TestSetupOperatorSubmit:
assert result.status_code == 302
assert result.headers["location"] == "/setup/system"
@pytest.mark.asyncio
async def test_post_when_operator_exists_shows_confirmation(self):
"""POST when operator exists returns 200 with confirmation, no insert."""
mock_request = MagicMock()
mock_templates = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_templates.TemplateResponse.return_value = mock_response
mock_conn = AsyncMock()
mock_conn.fetchval.return_value = 1 # Operator already exists
mock_conn.fetchrow.return_value = {"username": "existing_admin"}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__.return_value = mock_conn
mock_pool.acquire.return_value.__aexit__.return_value = None
mock_csrf = MagicMock()
mock_csrf.validate_csrf = AsyncMock()
mock_csrf.generate_csrf_tokens.return_value = ("token", "signed")
mock_csrf.set_csrf_cookie = MagicMock()
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
with patch("central.gui.routes.write_audit", new_callable=AsyncMock) as mock_audit:
result = await setup_operator_submit(
mock_request,
username="newadmin",
password="password123",
confirm_password="password123",
csrf_protect=mock_csrf,
)
# Should return 200, not 500 or redirect
assert result.status_code == 200
# Should render confirmation state
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert context["existing_operator"] == {"username": "existing_admin"}
# Should NOT call write_audit (no insert happened)
mock_audit.assert_not_called()
class TestSetupSystemForm:
"""Test system settings form (step 2)."""