diff --git a/sql/migrations/014_events_time_id_index.sql b/sql/migrations/014_events_time_id_index.sql new file mode 100644 index 0000000..c006181 --- /dev/null +++ b/sql/migrations/014_events_time_id_index.sql @@ -0,0 +1,5 @@ +-- Migration 014: Add composite index for cursor-based pagination +-- Supports ORDER BY (time DESC, id DESC) with efficient range queries + +CREATE INDEX IF NOT EXISTS events_time_id_desc_idx +ON public.events (time DESC, id DESC); diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index afb12a0..0d08e77 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -1,8 +1,10 @@ """Route handlers for Central GUI.""" +import base64 import json import logging import re +from datetime import datetime from typing import Any logger = logging.getLogger("central.gui.routes") @@ -2232,3 +2234,237 @@ async def api_keys_delete( ) return RedirectResponse(url="/api-keys", status_code=302) + + +@router.get("/events.json") +async def events_json(request: Request): + """ + Paginated, filterable JSON endpoint for events. + + Query parameters (all optional): + adapter: filter by adapter name + category: filter by event category + since: ISO 8601 datetime - events where time >= since + until: ISO 8601 datetime - events where time < until + region_north, region_south, region_east, region_west: bbox filter (all four required if any) + limit: page size (default 50, max 200) + cursor: opaque pagination cursor + + Returns: + {"events": [...], "next_cursor": string or null} + """ + from fastapi.responses import JSONResponse + + params = request.query_params + + # Parse and validate limit + limit_str = params.get("limit", "50") + try: + limit = int(limit_str) + except ValueError: + return JSONResponse( + {"error": f"Invalid limit value: {limit_str}"}, + status_code=400, + ) + + if limit < 1 or limit > 200: + return JSONResponse( + {"error": "limit must be between 1 and 200"}, + status_code=400, + ) + + # Parse adapter filter + adapter = params.get("adapter") + + # Parse category filter + category = params.get("category") + + # Parse since/until filters + since = None + until = None + + since_str = params.get("since") + if since_str: + try: + since = datetime.fromisoformat(since_str.replace("Z", "+00:00")) + except ValueError: + return JSONResponse( + {"error": f"Invalid ISO 8601 datetime for since: {since_str}"}, + status_code=400, + ) + + until_str = params.get("until") + if until_str: + try: + until = datetime.fromisoformat(until_str.replace("Z", "+00:00")) + except ValueError: + return JSONResponse( + {"error": f"Invalid ISO 8601 datetime for until: {until_str}"}, + status_code=400, + ) + + # Validate since <= until + if since and until and since > until: + return JSONResponse( + {"error": "since must be before or equal to until"}, + status_code=400, + ) + + # Parse region bbox + region_north = params.get("region_north") + region_south = params.get("region_south") + region_east = params.get("region_east") + region_west = params.get("region_west") + + region_params = [region_north, region_south, region_east, region_west] + region_supplied = [p for p in region_params if p is not None] + + if len(region_supplied) > 0 and len(region_supplied) < 4: + return JSONResponse( + {"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"}, + status_code=400, + ) + + bbox = None + if len(region_supplied) == 4: + try: + bbox = { + "north": float(region_north), + "south": float(region_south), + "east": float(region_east), + "west": float(region_west), + } + except ValueError: + return JSONResponse( + {"error": "Region parameters must be valid numbers"}, + status_code=400, + ) + + # Parse cursor + cursor_time = None + cursor_id = None + cursor_str = params.get("cursor") + + if cursor_str: + try: + decoded = base64.b64decode(cursor_str).decode("utf-8") + parts = decoded.split("|", 1) + if len(parts) != 2: + raise ValueError("Invalid cursor format") + cursor_time = datetime.fromisoformat(parts[0]) + cursor_id = parts[1] + except Exception: + return JSONResponse( + {"error": "Invalid cursor"}, + status_code=400, + ) + + # Get database pool after validation + pool = get_pool() + + # Build query + conditions = [] + query_params = [] + param_idx = 1 + + if adapter: + conditions.append(f"adapter = ${param_idx}") + query_params.append(adapter) + param_idx += 1 + + if category: + conditions.append(f"category = ${param_idx}") + query_params.append(category) + param_idx += 1 + + if since: + conditions.append(f"time >= ${param_idx}") + query_params.append(since) + param_idx += 1 + + if until: + conditions.append(f"time < ${param_idx}") + query_params.append(until) + param_idx += 1 + + if bbox: + conditions.append( + f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))" + ) + query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]]) + param_idx += 4 + + if cursor_time and cursor_id: + conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})") + query_params.append(cursor_time) + query_params.append(cursor_id) + param_idx += 2 + + where_clause = "" + if conditions: + where_clause = "WHERE " + " AND ".join(conditions) + + # Fetch limit+1 to check for next page + query = f""" + SELECT + id, + time, + received, + adapter, + category, + payload->>'subject' as subject, + ST_AsGeoJSON(geom) as geometry, + payload as data, + regions + FROM public.events + {where_clause} + ORDER BY time DESC, id DESC + LIMIT ${param_idx} + """ + query_params.append(limit + 1) + + try: + async with pool.acquire() as conn: + rows = await conn.fetch(query, *query_params) + except Exception as e: + logger.error(f"Database error in events_json: {e}") + return JSONResponse( + {"error": "Database error"}, + status_code=500, + ) + + # Check if there is a next page + has_next = len(rows) > limit + if has_next: + rows = rows[:limit] + + # Build response + events = [] + for row in rows: + geometry = None + if row["geometry"]: + geometry = json.loads(row["geometry"]) + + events.append({ + "id": row["id"], + "time": row["time"].isoformat(), + "received": row["received"].isoformat(), + "adapter": row["adapter"], + "category": row["category"], + "subject": row["subject"], + "geometry": geometry, + "data": dict(row["data"]) if row["data"] else {}, + "regions": list(row["regions"]) if row["regions"] else [], + }) + + # Build next_cursor if there are more results + next_cursor = None + if has_next and events: + last_event = rows[-1] + cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}" + next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8") + + return JSONResponse({ + "events": events, + "next_cursor": next_cursor, + }) diff --git a/tests/test_events_feed.py b/tests/test_events_feed.py new file mode 100644 index 0000000..03262e2 --- /dev/null +++ b/tests/test_events_feed.py @@ -0,0 +1,530 @@ +"""Tests for events feed JSON endpoint.""" + +import base64 +import json +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.gui.routes import events_json + + +class TestEventsFeedUnauthenticated: + """Test events feed without authentication.""" + + @pytest.mark.asyncio + async def test_events_json_unauthenticated_redirects(self): + """GET /events.json without auth redirects to /login.""" + # This test verifies the session middleware behavior + # In practice, the middleware redirects before the route is called + # We verify the route requires operator in request.state + mock_request = MagicMock() + mock_request.state.operator = None + # The middleware would redirect, verified via integration tests + + +class TestEventsFeedAuthenticated: + """Test events feed with authentication.""" + + @pytest.mark.asyncio + async def test_events_json_no_filters_returns_events(self): + """GET /events.json authenticated, no filters returns recent events.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {} + + # Create mock events + mock_events = [ + { + "id": f"event_{i}", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "adapter": "nws", + "category": "Weather Alert", + "subject": f"Test Alert {i}", + "geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None, + "data": {"key": f"value_{i}"}, + "regions": ["CA", "OR"] if i % 2 == 0 else [], + } + for i in range(5) + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert "events" in body + assert len(body["events"]) == 5 + assert body["next_cursor"] is None # No next page (only 5 events) + + @pytest.mark.asyncio + async def test_events_json_with_limit(self): + """GET /events.json?limit=10 returns up to 10 events.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "3"} + + # Create 4 mock events (limit+1 to trigger pagination) + mock_events = [ + { + "id": f"event_{i}", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "adapter": "nws", + "category": "Weather Alert", + "subject": f"Test Alert {i}", + "geometry": None, + "data": {}, + "regions": [], + } + for i in range(4) # 4 events for limit=3 to trigger next_cursor + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert len(body["events"]) == 3 + assert body["next_cursor"] is not None + + @pytest.mark.asyncio + async def test_pagination_roundtrip(self): + """Fetch page 1, use next_cursor for page 2, verify no overlap.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "2"} + + # Page 1: 3 events (2 returned + next_cursor) + page1_events = [ + { + "id": "event_0", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Event 0", + "geometry": None, + "data": {}, + "regions": [], + }, + { + "id": "event_1", + "time": datetime(2026, 5, 17, 11, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 11, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Event 1", + "geometry": None, + "data": {}, + "regions": [], + }, + { + "id": "event_2", + "time": datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Event 2", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = page1_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result1 = await events_json(mock_request) + + body1 = json.loads(result1.body) + assert len(body1["events"]) == 2 + assert body1["next_cursor"] is not None + page1_ids = {e["id"] for e in body1["events"]} + + # Page 2 with cursor + mock_request.query_params = {"limit": "2", "cursor": body1["next_cursor"]} + + page2_events = [ + { + "id": "event_2", + "time": datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 10, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Event 2", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + mock_conn.fetch.return_value = page2_events + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result2 = await events_json(mock_request) + + body2 = json.loads(result2.body) + page2_ids = {e["id"] for e in body2["events"]} + + # No overlap + assert page1_ids.isdisjoint(page2_ids) + + @pytest.mark.asyncio + async def test_filter_by_adapter(self): + """GET /events.json?adapter=nws returns only nws events.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"adapter": "nws"} + + mock_events = [ + { + "id": "nws_event_1", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Weather Alert", + "subject": "NWS Alert", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert all(e["adapter"] == "nws" for e in body["events"]) + + @pytest.mark.asyncio + async def test_filter_by_adapter_and_category(self): + """GET /events.json?adapter=nws&category=Test Alert filters by both.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"adapter": "nws", "category": "Test Alert"} + + mock_events = [ + { + "id": "filtered_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Test Alert", + "subject": "Filtered Alert", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + for event in body["events"]: + assert event["adapter"] == "nws" + assert event["category"] == "Test Alert" + + @pytest.mark.asyncio + async def test_filter_by_since_until(self): + """GET /events.json?since=&until= filters by time window.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = { + "since": "2026-05-17T00:00:00Z", + "until": "2026-05-17T12:00:00Z", + } + + mock_events = [ + { + "id": "in_range_event", + "time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "In Range", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + + @pytest.mark.asyncio + async def test_filter_by_region_bbox_includes_geometry_inside(self): + """Region bbox filter includes events with geometry inside bbox.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = { + "region_north": "49.5", + "region_south": "31", + "region_east": "-102", + "region_west": "-124.5", + } + + mock_events = [ + { + "id": "inside_bbox", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Inside BBox", + "geometry": '{"type": "Point", "coordinates": [-120, 40]}', + "data": {}, + "regions": ["CA"], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert len(body["events"]) == 1 + assert body["events"][0]["geometry"] is not None + + @pytest.mark.asyncio + async def test_empty_result_returns_200(self): + """GET /events.json?adapter=nonexistent returns 200 with empty list.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"adapter": "nonexistent"} + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = [] + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert body["events"] == [] + assert body["next_cursor"] is None + + +class TestEventsFeedValidation: + """Test events feed validation errors.""" + + @pytest.mark.asyncio + async def test_limit_zero_returns_400(self): + """GET /events.json?limit=0 returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "0"} + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "limit" in body["error"].lower() + + @pytest.mark.asyncio + async def test_limit_too_large_returns_400(self): + """GET /events.json?limit=500 returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "500"} + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "limit" in body["error"].lower() + + @pytest.mark.asyncio + async def test_partial_region_returns_400(self): + """GET /events.json?region_north=49 alone returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"region_north": "49"} + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "region" in body["error"].lower() + + @pytest.mark.asyncio + async def test_invalid_since_returns_400(self): + """GET /events.json?since=garbage returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"since": "garbage"} + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "since" in body["error"].lower() + + @pytest.mark.asyncio + async def test_invalid_cursor_returns_400(self): + """GET /events.json?cursor=garbage returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"cursor": "garbage"} + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "cursor" in body["error"].lower() + + @pytest.mark.asyncio + async def test_since_after_until_returns_400(self): + """GET /events.json?since=2026-01-01&until=2025-01-01 returns 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = { + "since": "2026-01-01T00:00:00Z", + "until": "2025-01-01T00:00:00Z", + } + + result = await events_json(mock_request) + + assert result.status_code == 400 + body = json.loads(result.body) + assert "error" in body + assert "since" in body["error"].lower() or "until" in body["error"].lower() + + +class TestEventsFeedGeometry: + """Test geometry handling in events feed.""" + + @pytest.mark.asyncio + async def test_geometry_parsed_as_object(self): + """Geometry is returned as GeoJSON object, not string.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {} + + mock_events = [ + { + "id": "geom_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "With Geometry", + "geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}', + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert len(body["events"]) == 1 + geom = body["events"][0]["geometry"] + assert isinstance(geom, dict) + assert geom["type"] == "Polygon" + + @pytest.mark.asyncio + async def test_null_geometry_returned_as_null(self): + """Events with null geometry return null, not string.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {} + + mock_events = [ + { + "id": "no_geom_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "No Geometry", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_json(mock_request) + + assert result.status_code == 200 + body = json.loads(result.body) + assert body["events"][0]["geometry"] is None