refactor(events): use shared helpers for /events.json, fix tests

- Refactor /events.json to use _parse_events_params and _fetch_events
  helpers, removing ~200 lines of duplicate query logic
- Delete smoke test (test_events_unauthenticated_redirects) that had
  no assertions
- Add TestCrossEndpointParity: verify /events.json and /events return
  identical results with same params, test category filter and cursor
  pagination on both endpoints
- Add TestErrorSemantics: verify /events.json returns 400 on bad params
  while /events returns 200 with error banner (intentional API vs HTML
  divergence)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-18 15:20:28 +00:00
commit c91cd10519
2 changed files with 189 additions and 227 deletions

View file

@ -2486,216 +2486,19 @@ async def events_json(request: Request):
params = request.query_params
# Parse and validate limit
limit_str = params.get("limit", "50")
try:
limit = int(limit_str)
except ValueError:
return JSONResponse(
{"error": f"Invalid limit value: {limit_str}"},
status_code=400,
)
# Parse and validate parameters using shared helper
parsed, error = _parse_events_params(params)
if error:
return JSONResponse({"error": error}, status_code=400)
if limit < 1 or limit > 200:
return JSONResponse(
{"error": "limit must be between 1 and 200"},
status_code=400,
)
# Parse adapter filter
adapter = params.get("adapter")
# Parse category filter
category = params.get("category")
# Parse since/until filters
since = None
until = None
since_str = params.get("since")
if since_str:
try:
since = datetime.fromisoformat(since_str.replace("Z", "+00:00"))
except ValueError:
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for since: {since_str}"},
status_code=400,
)
until_str = params.get("until")
if until_str:
try:
until = datetime.fromisoformat(until_str.replace("Z", "+00:00"))
except ValueError:
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for until: {until_str}"},
status_code=400,
)
# Validate since <= until
if since and until and since > until:
return JSONResponse(
{"error": "since must be before or equal to until"},
status_code=400,
)
# Parse region bbox
region_north = params.get("region_north")
region_south = params.get("region_south")
region_east = params.get("region_east")
region_west = params.get("region_west")
region_params = [region_north, region_south, region_east, region_west]
region_supplied = [p for p in region_params if p is not None]
if len(region_supplied) > 0 and len(region_supplied) < 4:
return JSONResponse(
{"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"},
status_code=400,
)
bbox = None
if len(region_supplied) == 4:
try:
bbox = {
"north": float(region_north),
"south": float(region_south),
"east": float(region_east),
"west": float(region_west),
}
except ValueError:
return JSONResponse(
{"error": "Region parameters must be valid numbers"},
status_code=400,
)
# Parse cursor
cursor_time = None
cursor_id = None
cursor_str = params.get("cursor")
if cursor_str:
try:
decoded = base64.b64decode(cursor_str).decode("utf-8")
parts = decoded.split("|", 1)
if len(parts) != 2:
raise ValueError("Invalid cursor format")
cursor_time = datetime.fromisoformat(parts[0])
cursor_id = parts[1]
except Exception:
return JSONResponse(
{"error": "Invalid cursor"},
status_code=400,
)
# Get database pool after validation
pool = get_pool()
# Build query
conditions = []
query_params = []
param_idx = 1
if adapter:
conditions.append(f"adapter = ${param_idx}")
query_params.append(adapter)
param_idx += 1
if category:
conditions.append(f"category = ${param_idx}")
query_params.append(category)
param_idx += 1
if since:
conditions.append(f"time >= ${param_idx}")
query_params.append(since)
param_idx += 1
if until:
conditions.append(f"time < ${param_idx}")
query_params.append(until)
param_idx += 1
if bbox:
conditions.append(
f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))"
)
query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]])
param_idx += 4
if cursor_time and cursor_id:
conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})")
query_params.append(cursor_time)
query_params.append(cursor_id)
param_idx += 2
where_clause = ""
if conditions:
where_clause = "WHERE " + " AND ".join(conditions)
# Fetch limit+1 to check for next page
query = f"""
SELECT
id,
time,
received,
adapter,
category,
payload->>'subject' as subject,
ST_AsGeoJSON(geom) as geometry,
payload as data,
regions
FROM public.events
{where_clause}
ORDER BY time DESC, id DESC
LIMIT ${param_idx}
"""
query_params.append(limit + 1)
try:
async with pool.acquire() as conn:
rows = await conn.fetch(query, *query_params)
except Exception as e:
logger.error(f"Database error in events_json: {e}")
return JSONResponse(
{"error": "Database error"},
status_code=500,
)
# Check if there is a next page
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
# Build response
events = []
for row in rows:
geometry = None
if row["geometry"]:
geometry = json.loads(row["geometry"])
events.append({
"id": row["id"],
"time": row["time"].isoformat(),
"received": row["received"].isoformat(),
"adapter": row["adapter"],
"category": row["category"],
"subject": row["subject"],
"geometry": geometry,
"data": dict(row["data"]) if row["data"] else {},
"regions": list(row["regions"]) if row["regions"] else [],
})
# Build next_cursor if there are more results
next_cursor = None
if has_next and events:
last_event = rows[-1]
cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}"
next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8")
# Fetch events using shared helper
result = await _fetch_events(parsed)
if result.error:
return JSONResponse({"error": result.error}, status_code=500)
return JSONResponse({
"events": events,
"next_cursor": next_cursor,
"events": result.events,
"next_cursor": result.next_cursor,
})

View file

@ -6,20 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.gui.routes import events_list, events_rows
class TestEventsFeedFrontendUnauthenticated:
"""Test events feed frontend without authentication."""
@pytest.mark.asyncio
async def test_events_unauthenticated_redirects(self):
"""GET /events without auth redirects to /login."""
# This test verifies the session middleware behavior
# In practice, the middleware redirects before the route is called
mock_request = MagicMock()
mock_request.state.operator = None
# The middleware would redirect, verified via integration tests
from central.gui.routes import events_list, events_rows, events_json
class TestEventsFeedFrontendAuthenticated:
@ -458,3 +445,175 @@ class TestDataGeometryAttribute:
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
event = context["events"][0]
assert event["geometry"] is None
class TestCrossEndpointParity:
"""Test that /events.json and /events return the same filtered results."""
@pytest.mark.asyncio
async def test_category_filter_both_endpoints(self):
"""Category filter works on both /events.json and /events."""
mock_events = [
{
"id": "weather_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Weather Alert",
"subject": "Weather Event",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
query_params = {"category": "Weather Alert"}
# Test /events.json
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = query_params
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
assert len(json_data["events"]) == 1
assert json_data["events"][0]["category"] == "Weather Alert"
# Test /events
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = query_params
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = mock_events
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert len(html_context["events"]) == 1
assert html_context["events"][0]["category"] == "Weather Alert"
@pytest.mark.asyncio
async def test_cursor_pagination_both_endpoints(self):
"""Cursor pagination works identically on both endpoints."""
first_page = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Alert",
"subject": f"Event {i}",
"geometry": None,
"data": {},
"regions": [],
}
for i in range(3)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = first_page
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = {"limit": "2"}
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
json_cursor = json_data["next_cursor"]
assert json_cursor is not None
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = {"limit": "2"}
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = first_page
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
html_cursor = html_context["next_cursor"]
assert json_cursor == html_cursor
class TestErrorSemantics:
"""Test error handling differences between JSON and HTML endpoints."""
@pytest.mark.asyncio
async def test_json_endpoint_returns_400_on_invalid_limit(self):
"""/events.json?limit=0 returns 400 JSON error."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {"limit": "0"}
response = await events_json(mock_request)
assert response.status_code == 400
data = json.loads(response.body)
assert "error" in data
@pytest.mark.asyncio
async def test_html_endpoint_returns_200_with_error_banner(self):
"""/events?limit=0 returns 200 HTML with error banner."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf"
mock_request.query_params = {"limit": "0"}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_error"] is not None
assert "limit" in context["filter_error"].lower()
assert context["events"] == []