feat(api): add paginated events feed JSON endpoint (#25)

GET /events.json with cursor-based pagination and filtering:
- Filter by adapter, category, since/until, region bbox
- Cursor pagination via (time DESC, id DESC) ordering
- Returns events with GeoJSON geometry parsed as objects
- Validation returns 400 with clear error messages

Migration 014 adds composite index for efficient pagination.

Tests: 17 new tests covering filters, pagination, validation.

Co-authored-by: Matt Johnson <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
malice 2026-05-17 22:31:00 -06:00 committed by GitHub
commit 246cd75051
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 771 additions and 0 deletions

View file

@ -1,8 +1,10 @@
"""Route handlers for Central GUI."""
import base64
import json
import logging
import re
from datetime import datetime
from typing import Any
logger = logging.getLogger("central.gui.routes")
@ -2232,3 +2234,237 @@ async def api_keys_delete(
)
return RedirectResponse(url="/api-keys", status_code=302)
@router.get("/events.json")
async def events_json(request: Request):
"""
Paginated, filterable JSON endpoint for events.
Query parameters (all optional):
adapter: filter by adapter name
category: filter by event category
since: ISO 8601 datetime - events where time >= since
until: ISO 8601 datetime - events where time < until
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
limit: page size (default 50, max 200)
cursor: opaque pagination cursor
Returns:
{"events": [...], "next_cursor": string or null}
"""
from fastapi.responses import JSONResponse
params = request.query_params
# Parse and validate limit
limit_str = params.get("limit", "50")
try:
limit = int(limit_str)
except ValueError:
return JSONResponse(
{"error": f"Invalid limit value: {limit_str}"},
status_code=400,
)
if limit < 1 or limit > 200:
return JSONResponse(
{"error": "limit must be between 1 and 200"},
status_code=400,
)
# Parse adapter filter
adapter = params.get("adapter")
# Parse category filter
category = params.get("category")
# Parse since/until filters
since = None
until = None
since_str = params.get("since")
if since_str:
try:
since = datetime.fromisoformat(since_str.replace("Z", "+00:00"))
except ValueError:
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for since: {since_str}"},
status_code=400,
)
until_str = params.get("until")
if until_str:
try:
until = datetime.fromisoformat(until_str.replace("Z", "+00:00"))
except ValueError:
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for until: {until_str}"},
status_code=400,
)
# Validate since <= until
if since and until and since > until:
return JSONResponse(
{"error": "since must be before or equal to until"},
status_code=400,
)
# Parse region bbox
region_north = params.get("region_north")
region_south = params.get("region_south")
region_east = params.get("region_east")
region_west = params.get("region_west")
region_params = [region_north, region_south, region_east, region_west]
region_supplied = [p for p in region_params if p is not None]
if len(region_supplied) > 0 and len(region_supplied) < 4:
return JSONResponse(
{"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"},
status_code=400,
)
bbox = None
if len(region_supplied) == 4:
try:
bbox = {
"north": float(region_north),
"south": float(region_south),
"east": float(region_east),
"west": float(region_west),
}
except ValueError:
return JSONResponse(
{"error": "Region parameters must be valid numbers"},
status_code=400,
)
# Parse cursor
cursor_time = None
cursor_id = None
cursor_str = params.get("cursor")
if cursor_str:
try:
decoded = base64.b64decode(cursor_str).decode("utf-8")
parts = decoded.split("|", 1)
if len(parts) != 2:
raise ValueError("Invalid cursor format")
cursor_time = datetime.fromisoformat(parts[0])
cursor_id = parts[1]
except Exception:
return JSONResponse(
{"error": "Invalid cursor"},
status_code=400,
)
# Get database pool after validation
pool = get_pool()
# Build query
conditions = []
query_params = []
param_idx = 1
if adapter:
conditions.append(f"adapter = ${param_idx}")
query_params.append(adapter)
param_idx += 1
if category:
conditions.append(f"category = ${param_idx}")
query_params.append(category)
param_idx += 1
if since:
conditions.append(f"time >= ${param_idx}")
query_params.append(since)
param_idx += 1
if until:
conditions.append(f"time < ${param_idx}")
query_params.append(until)
param_idx += 1
if bbox:
conditions.append(
f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))"
)
query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]])
param_idx += 4
if cursor_time and cursor_id:
conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})")
query_params.append(cursor_time)
query_params.append(cursor_id)
param_idx += 2
where_clause = ""
if conditions:
where_clause = "WHERE " + " AND ".join(conditions)
# Fetch limit+1 to check for next page
query = f"""
SELECT
id,
time,
received,
adapter,
category,
payload->>'subject' as subject,
ST_AsGeoJSON(geom) as geometry,
payload as data,
regions
FROM public.events
{where_clause}
ORDER BY time DESC, id DESC
LIMIT ${param_idx}
"""
query_params.append(limit + 1)
try:
async with pool.acquire() as conn:
rows = await conn.fetch(query, *query_params)
except Exception as e:
logger.error(f"Database error in events_json: {e}")
return JSONResponse(
{"error": "Database error"},
status_code=500,
)
# Check if there is a next page
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
# Build response
events = []
for row in rows:
geometry = None
if row["geometry"]:
geometry = json.loads(row["geometry"])
events.append({
"id": row["id"],
"time": row["time"].isoformat(),
"received": row["received"].isoformat(),
"adapter": row["adapter"],
"category": row["category"],
"subject": row["subject"],
"geometry": geometry,
"data": dict(row["data"]) if row["data"] else {},
"regions": list(row["regions"]) if row["regions"] else [],
})
# Build next_cursor if there are more results
next_cursor = None
if has_next and events:
last_event = rows[-1]
cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}"
next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8")
return JSONResponse({
"events": events,
"next_cursor": next_cursor,
})