diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6b237..9b5711a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,63 @@ # Changelog +## v0.3.0 — Phase 1b (2026-05-18) + +Operator console. FastAPI + Jinja2 + Pico + HTMX. Self-hosted, +Tailscale-gated by default, no application-level auth beyond +the operator session. + +### Added +- Operator console (`central-gui` systemd service on port 8000) +- Login + session auth (argon2id, 90-day DB-backed sessions) +- Dashboard: events 24h by adapter, stream sizes, + last-poll-time per adapter +- Adapters list and edit page (cadence + per-adapter settings), + with Leaflet region picker and click-to-draw rectangles +- Streams view with retention chips (1d / 7d / 14d / 30d / + 365d / custom) +- API keys management (list / add / rotate / delete, + encrypted at rest via `crypto.encrypt`, plaintext never + logged or stored) +- First-run wizard (5 steps: operator, system, keys, adapters, + finish) with deferred-commit pattern — no DB writes until + Finish runs as a single transaction +- Events feed page (`/events`) — paginated, filterable by + adapter / category / time range / map viewport, with + color-coded geometry overlay, click-to-popup, and + expandable row details showing full event payload +- Paginated events JSON API (`/events.json`) — cursor-based + pagination, same filter surface as the HTML feed + +### Changed +- CSRF tokens are now session-bound (synchronizer token + pattern), replacing the previous fastapi-csrf-protect + library. Eliminates a rotation race that broke first-load + submissions +- First-run wizard is a single atomic transaction at Finish, + not per-step DB writes. Back navigation works; abandoned + wizards leave no orphan rows + +### Fixed +- Adapter editor's JSONB double-encoding bug (write path + called `json.dumps` before asyncpg's codec, corrupting + the settings column) +- Dashboard polls card was reading from the wrong NATS + subject and using a durable consumer instead of + `get_last_msg`, leaking zombie consumers +- Browser-noise paths (/favicon.ico, /apple-touch-icon.png, + /robots.txt) return 204 directly, preventing parallel + requests from racing the CSRF cookie on first page load +- SubResource Integrity hashes for leaflet-draw assets + corrected (previous values were fabricated and silently + blocked by browsers) + +### Infrastructure +- New `config.sessions` column: `csrf_token` (per-session + synchronizer) +- Composite index on `public.events (time DESC, id DESC)` + for cursor pagination +- `central-gui` systemd service + ## v0.2.0 — Phase 1a (2026-05-16) Three live data sources, configurable infrastructure, hot-reload diff --git a/docs/environment.md b/docs/environment.md index 9659443..f2afce4 100644 --- a/docs/environment.md +++ b/docs/environment.md @@ -28,6 +28,32 @@ The Windows workstation (matt-desktop) has no Central repository clones. The directory `C:\Users\mtthw\central_work\` is scratch space only and should not be used for commits. + +## Network and Service Bindings + +### Bind Address + +`central-gui` binds to `0.0.0.0` by design. Network gating is the +operator's responsibility (firewall, Tailscale, etc.), not the app's. +Do not switch to `127.0.0.1` or to a specific interface — operators +choose their bind via whatever network they want to expose the service on. + +### NATS Listener Ports + +The default `nats-server.conf` listens on more than just :4222: + +| Port | Protocol | Used by Central? | +|------|----------|------------------| +| 4222 | NATS client | Yes (all) | +| 8080 | WebSocket | No (Phase 0 leftover) | +| 8222 | HTTP monitoring | No (manual ops only) | +| 1883 | MQTT | No (Phase 0 leftover) | + +None of the unused ports cause active harm — they listen but no consumer +connects. Operators can remove them from `nats-server.conf` if they want +a tighter footprint. Documenting so future contributors don't grep for +"MQTT integration" and come up confused. + ## Repository | Property | Value | diff --git a/pyproject.toml b/pyproject.toml index c101844..2922e9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "central" -version = "0.1.0" +version = "0.3.0" requires-python = ">=3.12,<3.13" description = "Data hub spine — adapters, bus, archive." readme = "README.md" diff --git a/src/central/adapter.py b/src/central/adapter.py index 2037eef..276a9cf 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -4,6 +4,8 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from typing import TYPE_CHECKING +from pydantic import BaseModel + if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -16,9 +18,24 @@ class SourceAdapter(ABC): Adapters yield Events. The supervisor handles scheduling, CloudEvents wrapping, publish, and metadata heartbeats. + + Class attributes that subclasses must define: + name: Short identifier, e.g. "nws" + display_name: Human-readable name for GUI + description: Short description of the adapter + settings_schema: Pydantic model class for adapter settings + requires_api_key: Key alias if API key required, else None + wizard_order: Order in setup wizard (None = not in wizard) + default_cadence_s: Default polling interval in seconds """ - name: str # short identifier, e.g. "nws" + name: str + display_name: str + description: str + settings_schema: type[BaseModel] + requires_api_key: str | None = None + wizard_order: int | None = None + default_cadence_s: int @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -40,6 +57,16 @@ class SourceAdapter(ABC): """ ... + @abstractmethod + def subject_for(self, event: Event) -> str: + """ + Compute the NATS subject for an event. + + Each adapter knows its own subject hierarchy. The supervisor + calls this to determine where to publish each event. + """ + ... + async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 3f746fa..7538d96 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -18,6 +18,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -49,10 +51,23 @@ SEVERITY_MAP = { } +class FIRMSSettings(BaseModel): + """Settings schema for FIRMS adapter.""" + api_key_alias: str = "firms" + satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + region: RegionConfig | None = None + + class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" + display_name = "NASA FIRMS Fire Hotspots" + description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." + settings_schema = FIRMSSettings + requires_api_key = "firms" + wizard_order = 2 + default_cadence_s = 300 def __init__( self, @@ -116,6 +131,15 @@ class FIRMSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a fire hotspot event. + + Subject format: central.fire.hotspot.. + The category already contains this structure. + """ + return f"central.{event.category}" + + async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" # Fetch API key @@ -417,14 +441,3 @@ class FIRMSAdapter(SourceAdapter): }, ) - -def subject_for_fire_hotspot(ev: Event) -> str: - """Compute the NATS subject for a fire hotspot event. - - Subject format: central.fire.hotspot.. - - The category already contains the satellite and confidence info, - so we just prefix with 'central.'. - """ - # category is "fire.hotspot.." - return f"central.{ev.category}" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 129584f..ce95d3a 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,6 +19,8 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -189,10 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: return sorted(regions) +class NWSSettings(BaseModel): + """Settings schema for NWS adapter.""" + contact_email: str = "" + region: RegionConfig | None = None + + class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" + display_name = "NWS Weather Alerts" + description = "National Weather Service active alerts via api.weather.gov." + settings_schema = NWSSettings + requires_api_key = None + wizard_order = 1 + default_cadence_s = 60 def __init__( self, @@ -234,6 +248,35 @@ class NWSAdapter(SourceAdapter): }, ) + def subject_for(self, event: Event) -> str: + """Compute NATS subject for a weather alert. + + Subject format: central.wx.alert.us... + where type is 'county' or 'zone' based on primary_region format. + """ + prefix = "central.wx" + + if event.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = event.geo.primary_region + + # Parse US-- format + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" + + def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 601c52b..e73148f 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -17,6 +17,8 @@ from tenacity import ( ) from central.adapter import SourceAdapter +from pydantic import BaseModel + from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -60,10 +62,22 @@ def magnitude_to_severity(mag: float) -> int: return 5 +class USGSQuakeSettings(BaseModel): + """Settings schema for USGS quake adapter.""" + feed: str = "all_hour" + region: RegionConfig | None = None + + class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" + display_name = "USGS Earthquakes" + description = "USGS earthquake feed (configurable window)." + settings_schema = USGSQuakeSettings + requires_api_key = None + wizard_order = 3 + default_cadence_s = 60 def __init__( self, @@ -398,3 +412,9 @@ class USGSQuakeAdapter(SourceAdapter): new_count += 1 logger.info("USGS quake yielded events", extra={"count": new_count}) + + def subject_for(self, event: Event) -> str: + """Return NATS subject for quake event.""" + return f"central.{event.category}" + + diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index f86b6e1..3a415c2 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2197,95 +2197,90 @@ async def api_keys_delete( return RedirectResponse(url="/api-keys", status_code=302) -@router.get("/events.json") -async def events_json(request: Request): + + +# --- Events query helper --- + +class EventsQueryResult: + """Result from events query.""" + def __init__(self, events: list, next_cursor: str | None, error: str | None = None): + self.events = events + self.next_cursor = next_cursor + self.error = error + + +def _parse_events_params(params) -> tuple[dict | None, str | None]: """ - Paginated, filterable JSON endpoint for events. - - Query parameters (all optional): - adapter: filter by adapter name - category: filter by event category - since: ISO 8601 datetime - events where time >= since - until: ISO 8601 datetime - events where time < until - region_north, region_south, region_east, region_west: bbox filter (all four required if any) - limit: page size (default 50, max 200) - cursor: opaque pagination cursor - + Parse and validate events query parameters. + Returns: - {"events": [...], "next_cursor": string or null} + (parsed_params, error_message) + If error_message is not None, parsed_params is None. """ - from fastapi.responses import JSONResponse - - params = request.query_params - # Parse and validate limit limit_str = params.get("limit", "50") try: limit = int(limit_str) except ValueError: - return JSONResponse( - {"error": f"Invalid limit value: {limit_str}"}, - status_code=400, - ) - + return None, f"Invalid limit value: {limit_str}" + if limit < 1 or limit > 200: - return JSONResponse( - {"error": "limit must be between 1 and 200"}, - status_code=400, - ) - + return None, "limit must be between 1 and 200" + # Parse adapter filter adapter = params.get("adapter") - - # Parse category filter + if adapter == "": + adapter = None + + # Parse category filter category = params.get("category") - + if category == "": + category = None + # Parse since/until filters since = None until = None - + since_str = params.get("since") if since_str: try: since = datetime.fromisoformat(since_str.replace("Z", "+00:00")) except ValueError: - return JSONResponse( - {"error": f"Invalid ISO 8601 datetime for since: {since_str}"}, - status_code=400, - ) - + return None, f"Invalid ISO 8601 datetime for since: {since_str}" + until_str = params.get("until") if until_str: try: until = datetime.fromisoformat(until_str.replace("Z", "+00:00")) except ValueError: - return JSONResponse( - {"error": f"Invalid ISO 8601 datetime for until: {until_str}"}, - status_code=400, - ) - + return None, f"Invalid ISO 8601 datetime for until: {until_str}" + # Validate since <= until if since and until and since > until: - return JSONResponse( - {"error": "since must be before or equal to until"}, - status_code=400, - ) - + return None, "since must be before or equal to until" + # Parse region bbox region_north = params.get("region_north") region_south = params.get("region_south") region_east = params.get("region_east") region_west = params.get("region_west") - + + # Treat empty strings as None + if region_north == "": + region_north = None + if region_south == "": + region_south = None + if region_east == "": + region_east = None + if region_west == "": + region_west = None + region_params = [region_north, region_south, region_east, region_west] region_supplied = [p for p in region_params if p is not None] - + if len(region_supplied) > 0 and len(region_supplied) < 4: - return JSONResponse( - {"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"}, - status_code=400, - ) - + return None, "Region filter requires all four parameters: region_north, region_south, region_east, region_west" + bbox = None if len(region_supplied) == 4: try: @@ -2296,16 +2291,13 @@ async def events_json(request: Request): "west": float(region_west), } except ValueError: - return JSONResponse( - {"error": "Region parameters must be valid numbers"}, - status_code=400, - ) - + return None, "Region parameters must be valid numbers" + # Parse cursor cursor_time = None cursor_id = None cursor_str = params.get("cursor") - + if cursor_str: try: decoded = base64.b64decode(cursor_str).decode("utf-8") @@ -2315,59 +2307,82 @@ async def events_json(request: Request): cursor_time = datetime.fromisoformat(parts[0]) cursor_id = parts[1] except Exception: - return JSONResponse( - {"error": "Invalid cursor"}, - status_code=400, - ) - - # Get database pool after validation + return None, "Invalid cursor" + + return { + "limit": limit, + "adapter": adapter, + "category": category, + "since": since, + "until": until, + "bbox": bbox, + "cursor_time": cursor_time, + "cursor_id": cursor_id, + }, None + + +async def _fetch_events(parsed_params: dict) -> EventsQueryResult: + """ + Fetch events from database using parsed parameters. + + Returns EventsQueryResult with events list, next_cursor, and optional error. + """ pool = get_pool() - + + limit = parsed_params["limit"] + adapter = parsed_params["adapter"] + category = parsed_params["category"] + since = parsed_params["since"] + until = parsed_params["until"] + bbox = parsed_params["bbox"] + cursor_time = parsed_params["cursor_time"] + cursor_id = parsed_params["cursor_id"] + # Build query conditions = [] query_params = [] param_idx = 1 - + if adapter: conditions.append(f"adapter = ${param_idx}") query_params.append(adapter) param_idx += 1 - + if category: conditions.append(f"category = ${param_idx}") query_params.append(category) param_idx += 1 - + if since: conditions.append(f"time >= ${param_idx}") query_params.append(since) param_idx += 1 - + if until: conditions.append(f"time < ${param_idx}") query_params.append(until) param_idx += 1 - + if bbox: conditions.append( f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))" ) query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]]) param_idx += 4 - + if cursor_time and cursor_id: conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})") query_params.append(cursor_time) query_params.append(cursor_id) param_idx += 2 - + where_clause = "" if conditions: where_clause = "WHERE " + " AND ".join(conditions) - + # Fetch limit+1 to check for next page query = f""" - SELECT + SELECT id, time, received, @@ -2383,29 +2398,26 @@ async def events_json(request: Request): LIMIT ${param_idx} """ query_params.append(limit + 1) - + try: async with pool.acquire() as conn: rows = await conn.fetch(query, *query_params) except Exception as e: - logger.error(f"Database error in events_json: {e}") - return JSONResponse( - {"error": "Database error"}, - status_code=500, - ) - + logger.error(f"Database error in _fetch_events: {e}") + return EventsQueryResult([], None, "Database error") + # Check if there is a next page has_next = len(rows) > limit if has_next: rows = rows[:limit] - + # Build response events = [] for row in rows: geometry = None if row["geometry"]: geometry = json.loads(row["geometry"]) - + events.append({ "id": row["id"], "time": row["time"].isoformat(), @@ -2417,15 +2429,196 @@ async def events_json(request: Request): "data": dict(row["data"]) if row["data"] else {}, "regions": list(row["regions"]) if row["regions"] else [], }) - + # Build next_cursor if there are more results next_cursor = None if has_next and events: last_event = rows[-1] cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}" next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8") - + + return EventsQueryResult(events, next_cursor) + + +def _geometry_summary(geometry: dict | None) -> str: + """Generate a human-readable summary of a geometry.""" + if not geometry: + return "None" + + geom_type = geometry.get("type", "Unknown") + + if geom_type == "Point": + return "Point" + elif geom_type == "LineString": + coords = geometry.get("coordinates", []) + return f"Line ({len(coords)} pts)" + elif geom_type == "Polygon": + coords = geometry.get("coordinates", [[]]) + if coords: + return f"Polygon ({len(coords[0])} pts)" + return "Polygon" + elif geom_type == "MultiPolygon": + coords = geometry.get("coordinates", []) + return f"MultiPolygon ({len(coords)} parts)" + else: + return geom_type + + + +@router.get("/events.json") +async def events_json(request: Request): + """ + Paginated, filterable JSON endpoint for events. + + Query parameters (all optional): + adapter: filter by adapter name + category: filter by event category + since: ISO 8601 datetime - events where time >= since + until: ISO 8601 datetime - events where time < until + region_north, region_south, region_east, region_west: bbox filter (all four required if any) + limit: page size (default 50, max 200) + cursor: opaque pagination cursor + + Returns: + {"events": [...], "next_cursor": string or null} + """ + from fastapi.responses import JSONResponse + + params = request.query_params + + # Parse and validate parameters using shared helper + parsed, error = _parse_events_params(params) + if error: + return JSONResponse({"error": error}, status_code=400) + + # Fetch events using shared helper + result = await _fetch_events(parsed) + if result.error: + return JSONResponse({"error": result.error}, status_code=500) + return JSONResponse({ - "events": events, - "next_cursor": next_cursor, + "events": result.events, + "next_cursor": result.next_cursor, }) + + +# --- Events feed frontend routes --- + +@router.get("/events", response_class=HTMLResponse) +async def events_list(request: Request) -> HTMLResponse: + """Events feed page with filter form, table, and map.""" + templates = _get_templates() + operator = getattr(request.state, "operator", None) + csrf_token = getattr(request.state, "csrf_token", "") + + params = request.query_params + + # Parse parameters + parsed, error = _parse_events_params(params) + + # Get system settings for map tiles + pool = get_pool() + async with pool.acquire() as conn: + system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system") + + tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png" + tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap" + + # Prepare filter values for template + filter_values = { + "adapter": params.get("adapter", ""), + "category": params.get("category", ""), + "since": params.get("since", ""), + "until": params.get("until", ""), + "region_north": params.get("region_north", ""), + "region_south": params.get("region_south", ""), + "region_east": params.get("region_east", ""), + "region_west": params.get("region_west", ""), + "limit": params.get("limit", "50"), + } + + events = [] + next_cursor = None + + if error: + # Validation error - show error banner but don't fail the page + pass + else: + # Fetch events + result = await _fetch_events(parsed) + if result.error: + error = result.error + else: + events = result.events + next_cursor = result.next_cursor + + # Add geometry summary to each event + for event in events: + event["geometry_summary"] = _geometry_summary(event.get("geometry")) + + return templates.TemplateResponse( + request=request, + name="events_list.html", + context={ + "operator": operator, + "csrf_token": csrf_token, + "events": events, + "next_cursor": next_cursor, + "filter_values": filter_values, + "filter_error": error, + "tile_url": tile_url, + "tile_attribution": tile_attribution, + }, + ) + + +@router.get("/events/rows", response_class=HTMLResponse) +async def events_rows(request: Request) -> HTMLResponse: + """HTMX fragment: events table rows only (no page chrome).""" + templates = _get_templates() + + params = request.query_params + + # Parse parameters + parsed, error = _parse_events_params(params) + + # Prepare filter values for template + filter_values = { + "adapter": params.get("adapter", ""), + "category": params.get("category", ""), + "since": params.get("since", ""), + "until": params.get("until", ""), + "region_north": params.get("region_north", ""), + "region_south": params.get("region_south", ""), + "region_east": params.get("region_east", ""), + "region_west": params.get("region_west", ""), + "limit": params.get("limit", "50"), + } + + events = [] + next_cursor = None + + if error: + pass + else: + result = await _fetch_events(parsed) + if result.error: + error = result.error + else: + events = result.events + next_cursor = result.next_cursor + + # Add geometry summary to each event + for event in events: + event["geometry_summary"] = _geometry_summary(event.get("geometry")) + + return templates.TemplateResponse( + request=request, + name="_events_rows.html", + context={ + "events": events, + "next_cursor": next_cursor, + "filter_values": filter_values, + "filter_error": error, + }, + ) diff --git a/src/central/gui/templates/_events_rows.html b/src/central/gui/templates/_events_rows.html new file mode 100644 index 0000000..c7f126c --- /dev/null +++ b/src/central/gui/templates/_events_rows.html @@ -0,0 +1,73 @@ +{% if filter_error %} +
+ Filter Error: {{ filter_error }} +
+{% endif %} + +{% if events %} + + + + + + + + + + + + + {% for event in events %} + + + + + + + + + + + + {% endfor %} + +
TimeAdapterCategoryGeometrySubject
{{ event.time }}{{ event.adapter }}{{ event.category }}{{ event.geometry_summary }}{{ event.subject or '—' }}
+ +
+ Showing {{ events | length }} event{{ 's' if events | length != 1 else '' }}. + {% if next_cursor %} + + Next → + + {% else %} + End of results + {% endif %} +
+{% else %} +
+

No events match the filters.

+
+{% endif %} diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index 0cd7baa..a7a667d 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -17,6 +17,7 @@ {% if operator %}
  • Dashboard
  • Adapters
  • +
  • Events
  • Streams
  • API Keys
  • {{ operator.username }}
  • diff --git a/src/central/gui/templates/events_list.html b/src/central/gui/templates/events_list.html new file mode 100644 index 0000000..5c00b94 --- /dev/null +++ b/src/central/gui/templates/events_list.html @@ -0,0 +1,437 @@ +{% extends "base.html" %} + +{% block title %}Events - Central{% endblock %} + +{% block head %} + + +{% endblock %} + +{% block content %} +

    Events

    + +{% if filter_error %} +
    + Filter Error: {{ filter_error }} +
    +{% endif %} + +
    + Filters +
    + +
    +
    + + +
    +
    + + +
    +
    + + +
    +
    + + +
    +
    + + + + + + + + + +
    + + Clear Filters +
    +
    +
    + +
    +
    +
    +
    +
    + NWS (Weather) +
    +
    +
    + FIRMS (Fire) +
    +
    +
    + USGS (Quake) +
    +
    + +
    + +
    + {% include "_events_rows.html" %} +
    + + + + +{% endblock %} diff --git a/src/central/models.py b/src/central/models.py index 17145ad..c7a2159 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -32,48 +32,3 @@ class Event(BaseModel): data: dict[str, Any] # adapter-specific payload -def subject_for_event(ev: Event) -> str: - """ - Compute the NATS subject for an event based on its category. - - Dispatch by category prefix: - - fire.*: returns central. directly - - wx.*: uses weather alert subject logic - - Weather alert subjects: - central.wx.alert.us..county. - or - central.wx.alert.us..zone. - based on whether the primary_region encodes a county or a zone. - - Fire hotspot subjects: - central.fire.hotspot.. - """ - # Fire events: subject is just central. - if ev.category.startswith("fire."): - return f"central.{ev.category}" - - # Weather events: use geo-based subject logic - prefix = "central.wx" - - if ev.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = ev.geo.primary_region - - # Parse US-- format - # County codes are like "Ada", "Canyon" (names) - # Zone codes start with "Z" like "Z033" - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index 652ce44..da00f0e 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -13,24 +13,40 @@ from typing import Any import nats from nats.js import JetStreamContext +import importlib +import pkgutil + from central.adapter import SourceAdapter -from central.adapters.nws import NWSAdapter -from central.adapters.firms import FIRMSAdapter -from central.adapters.usgs_quake import USGSQuakeAdapter from central.cloudevents_wire import wrap_event from central.config_models import AdapterConfig from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings -from central.models import subject_for_event from central.stream_manager import StreamManager +import central.adapters -# Adapter registry - add new adapters here -_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { - "nws": NWSAdapter, - "firms": FIRMSAdapter, - "usgs_quake": USGSQuakeAdapter, -} +def discover_adapters() -> dict[str, type[SourceAdapter]]: + """Auto-discover adapter classes from central.adapters package.""" + registry: dict[str, type[SourceAdapter]] = {} + for module_info in pkgutil.iter_modules(central.adapters.__path__): + try: + module = importlib.import_module(f"central.adapters.{module_info.name}") + except Exception as e: + logger.error( + "Failed to import adapter module", + extra={"module": module_info.name, "error": str(e)}, + ) + continue + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, SourceAdapter) + and attr is not SourceAdapter + and hasattr(attr, "name") + ): + registry[attr.name] = attr + return registry CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -114,6 +130,7 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config + self._adapters = discover_adapters() self._nc: nats.NATS | None = None self._js: JetStreamContext | None = None self._stream_manager: StreamManager | None = None @@ -161,7 +178,7 @@ class Supervisor: def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: """Create an adapter instance based on config name.""" - cls = _ADAPTER_REGISTRY.get(config.name) + cls = self._adapters.get(config.name) if cls is None: raise ValueError(f"Unknown adapter type: {config.name}") return cls( @@ -232,7 +249,7 @@ class Supervisor: # Build CloudEvent (uses defaults if no config provided) envelope, msg_id = wrap_event(event, self._cloudevents_config) - subject = subject_for_event(event) + subject = state.adapter.subject_for(event) # Publish await self._publish_event(subject, envelope, msg_id) diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py new file mode 100644 index 0000000..2ec1700 --- /dev/null +++ b/tests/test_events_feed_frontend.py @@ -0,0 +1,686 @@ +"""Tests for events feed frontend routes.""" + +import json +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from central.gui.routes import events_list, events_rows, events_json + + +class TestEventsFeedFrontendAuthenticated: + """Test events feed frontend with authentication.""" + + @pytest.mark.asyncio + async def test_events_no_filters_returns_html(self): + """GET /events authenticated, no filters returns HTML with events.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = {} + + mock_events = [ + { + "id": f"event_{i}", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "adapter": "nws", + "category": "Weather Alert", + "subject": f"Test Alert {i}", + "geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None, + "data": {}, + "regions": [], + } + for i in range(5) + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + call_args = mock_templates.TemplateResponse.call_args + context = call_args.kwargs.get("context", call_args[1].get("context")) + assert "events" in context + assert context["filter_error"] is None + + @pytest.mark.asyncio + async def test_events_adapter_filter(self): + """GET /events?adapter=nws returns only nws events.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = {"adapter": "nws"} + + mock_events = [ + { + "id": "nws_event_1", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "NWS Alert", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + assert context["filter_values"]["adapter"] == "nws" + + @pytest.mark.asyncio + async def test_events_since_until_filter(self): + """GET /events?since=...&until=... filters by time window.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = { + "since": "2026-05-17T00:00:00", + "until": "2026-05-17T12:00:00", + } + + mock_events = [ + { + "id": "in_range", + "time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "In Range", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + # Verify filter was actually parsed and passed to template + mock_templates.TemplateResponse.assert_called_once() + call_kwargs = mock_templates.TemplateResponse.call_args.kwargs + context = call_kwargs.get("context", call_kwargs) + assert context["filter_values"]["since"] == "2026-05-17T00:00:00" + assert context["filter_values"]["until"] == "2026-05-17T12:00:00" + + @pytest.mark.asyncio + async def test_events_region_filter(self): + """GET /events with full region bbox filters by location.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = { + "region_north": "49.5", + "region_south": "31", + "region_east": "-102", + "region_west": "-124.5", + } + + mock_events = [ + { + "id": "in_bbox", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "In BBox", + "geometry": '{"type": "Point", "coordinates": [-120, 40]}', + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + # Verify region filter was actually parsed and passed to template + mock_templates.TemplateResponse.assert_called_once() + call_kwargs = mock_templates.TemplateResponse.call_args.kwargs + context = call_kwargs.get("context", call_kwargs) + assert context["filter_values"]["region_north"] == "49.5" + assert context["filter_values"]["region_south"] == "31" + assert context["filter_values"]["region_east"] == "-102" + assert context["filter_values"]["region_west"] == "-124.5" + + @pytest.mark.asyncio + async def test_events_partial_region_shows_error_banner(self): + """GET /events with partial region shows error banner, not 400.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = {"region_north": "49"} + + mock_conn = AsyncMock() + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + # Should be 200, not 400 + assert result.status_code == 200 + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + assert context["filter_error"] is not None + assert "region" in context["filter_error"].lower() + # Events should be empty due to validation error + assert context["events"] == [] + + @pytest.mark.asyncio + async def test_events_with_limit_shows_next_button(self): + """GET /events?limit=5 shows Next button when more events exist.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf_token" + mock_request.query_params = {"limit": "5"} + + # Return 6 events (limit+1) to trigger pagination + mock_events = [ + { + "id": f"event_{i}", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "adapter": "nws", + "category": "Alert", + "subject": f"Event {i}", + "geometry": None, + "data": {}, + "regions": [], + } + for i in range(6) + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + assert context["next_cursor"] is not None + assert len(context["events"]) == 5 # Should be trimmed to limit + + +class TestEventsRowsFragment: + """Test /events/rows HTMX fragment.""" + + @pytest.mark.asyncio + async def test_events_rows_returns_fragment(self): + """GET /events/rows returns table fragment, not full page.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "5"} + + mock_events = [ + { + "id": "event_1", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "Event 1", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_rows(mock_request) + + assert result.status_code == 200 + # Verify it uses the fragment template + call_args = mock_templates.TemplateResponse.call_args + assert call_args.kwargs.get("name") == "_events_rows.html" + + +class TestGeometrySummary: + """Test geometry summary function.""" + + def test_geometry_summary_polygon(self): + """Polygon geometry shows point count.""" + from central.gui.routes import _geometry_summary + + geom = { + "type": "Polygon", + "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]] + } + summary = _geometry_summary(geom) + assert "Polygon" in summary + assert "5 pts" in summary + + def test_geometry_summary_point(self): + """Point geometry shows 'Point'.""" + from central.gui.routes import _geometry_summary + + geom = {"type": "Point", "coordinates": [-122.4, 37.8]} + summary = _geometry_summary(geom) + assert summary == "Point" + + def test_geometry_summary_linestring(self): + """LineString geometry shows point count.""" + from central.gui.routes import _geometry_summary + + geom = { + "type": "LineString", + "coordinates": [[-122, 37], [-121, 38], [-120, 39]] + } + summary = _geometry_summary(geom) + assert "Line" in summary + assert "3 pts" in summary + + def test_geometry_summary_none(self): + """None geometry shows 'None'.""" + from central.gui.routes import _geometry_summary + + summary = _geometry_summary(None) + assert summary == "None" + + +class TestDataGeometryAttribute: + """Test that rows have valid geometry data attributes.""" + + @pytest.mark.asyncio + async def test_event_with_geometry_has_valid_json(self): + """Events with geometry have parseable JSON in data-geometry.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {} + + mock_events = [ + { + "id": "geom_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "With Geometry", + "geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}', + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_rows(mock_request) + + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + event = context["events"][0] + # Geometry should be parsed dict, not string + assert isinstance(event["geometry"], dict) + assert event["geometry"]["type"] == "Polygon" + + @pytest.mark.asyncio + async def test_event_without_geometry_has_none(self): + """Events without geometry have None for geometry field.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {} + + mock_events = [ + { + "id": "no_geom_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Alert", + "subject": "No Geometry", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_rows(mock_request) + + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + event = context["events"][0] + assert event["geometry"] is None + + +class TestCrossEndpointParity: + """Test that /events.json and /events return the same filtered results.""" + + @pytest.mark.asyncio + async def test_category_filter_both_endpoints(self): + """Category filter works on both /events.json and /events.""" + mock_events = [ + { + "id": "weather_event", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "nws", + "category": "Weather Alert", + "subject": "Weather Event", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + query_params = {"category": "Weather Alert"} + + # Test /events.json + json_request = MagicMock() + json_request.state.operator = MagicMock(id=1, username="admin") + json_request.query_params = query_params + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + json_response = await events_json(json_request) + + json_data = json.loads(json_response.body) + assert len(json_data["events"]) == 1 + assert json_data["events"][0]["category"] == "Weather Alert" + + # Test /events + html_request = MagicMock() + html_request.state.operator = MagicMock(id=1, username="admin") + html_request.state.csrf_token = "test_csrf" + html_request.query_params = query_params + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + mock_conn.fetch.return_value = mock_events + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + await events_list(html_request) + + html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + assert len(html_context["events"]) == 1 + assert html_context["events"][0]["category"] == "Weather Alert" + + @pytest.mark.asyncio + async def test_cursor_pagination_both_endpoints(self): + """Cursor pagination works identically on both endpoints.""" + first_page = [ + { + "id": f"event_{i}", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), + "adapter": "nws", + "category": "Alert", + "subject": f"Event {i}", + "geometry": None, + "data": {}, + "regions": [], + } + for i in range(3) + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = first_page + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + json_request = MagicMock() + json_request.state.operator = MagicMock(id=1, username="admin") + json_request.query_params = {"limit": "2"} + + with patch("central.gui.routes.get_pool", return_value=mock_pool): + json_response = await events_json(json_request) + + json_data = json.loads(json_response.body) + json_cursor = json_data["next_cursor"] + assert json_cursor is not None + + html_request = MagicMock() + html_request.state.operator = MagicMock(id=1, username="admin") + html_request.state.csrf_token = "test_csrf" + html_request.query_params = {"limit": "2"} + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + mock_conn.fetch.return_value = first_page + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + await events_list(html_request) + + html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + html_cursor = html_context["next_cursor"] + + assert json_cursor == html_cursor + + +class TestErrorSemantics: + """Test error handling differences between JSON and HTML endpoints.""" + + @pytest.mark.asyncio + async def test_json_endpoint_returns_400_on_invalid_limit(self): + """/events.json?limit=0 returns 400 JSON error.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.query_params = {"limit": "0"} + + response = await events_json(mock_request) + + assert response.status_code == 400 + data = json.loads(response.body) + assert "error" in data + + @pytest.mark.asyncio + async def test_html_endpoint_returns_200_with_error_banner(self): + """/events?limit=0 returns 200 HTML with error banner.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf" + mock_request.query_params = {"limit": "0"} + + mock_conn = AsyncMock() + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + assert context["filter_error"] is not None + assert "limit" in context["filter_error"].lower() + assert context["events"] == [] + + +class TestEventRowDataAttributes: + """Test that _events_rows.html renders required data attributes.""" + + @pytest.mark.asyncio + async def test_row_renders_data_adapter_attribute(self): + """Event rows include data-adapter attribute for color coding.""" + mock_request = MagicMock() + mock_request.state.operator = MagicMock(id=1, username="admin") + mock_request.state.csrf_token = "test_csrf" + mock_request.query_params = {} + + mock_events = [ + { + "id": "test1", + "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), + "adapter": "usgs_quake", + "category": "quake.event", + "subject": "M4.2 Earthquake", + "geometry": None, + "data": {}, + "regions": [], + }, + ] + + mock_conn = AsyncMock() + mock_conn.fetch.return_value = mock_events + mock_conn.fetchrow.return_value = { + "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", + "map_attribution": "OpenStreetMap", + } + + mock_pool = MagicMock() + mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) + + mock_templates = MagicMock() + mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) + + with patch("central.gui.routes._get_templates", return_value=mock_templates): + with patch("central.gui.routes.get_pool", return_value=mock_pool): + result = await events_list(mock_request) + + assert result.status_code == 200 + mock_templates.TemplateResponse.assert_called_once() + context = mock_templates.TemplateResponse.call_args.kwargs.get("context") + # The template receives events with adapter field for data-adapter attribute + assert len(context["events"]) == 1 + assert context["events"][0]["adapter"] == "usgs_quake" + assert context["events"][0]["category"] == "quake.event" + assert context["events"][0]["subject"] == "M4.2 Earthquake" diff --git a/tests/test_firms.py b/tests/test_firms.py index 9e51331..bfe629a 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -10,7 +10,6 @@ from central.adapters.firms import ( FIRMSAdapter, CONFIDENCE_MAP, SATELLITE_SHORT, - subject_for_fire_hotspot, ) from central.config_models import AdapterConfig, RegionConfig from central.models import Event, Geo @@ -285,7 +284,14 @@ class TestDeduplication: class TestSubjectGeneration: """Test subject generation for fire hotspots.""" - def test_subject_format(self): + @pytest.mark.asyncio + async def test_subject_format(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -296,10 +302,17 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_snpp.high" - def test_subject_nominal_confidence(self): + @pytest.mark.asyncio + async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = FIRMSAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) event = Event( id="test", adapter="firms", @@ -310,7 +323,7 @@ class TestSubjectGeneration: data={}, ) - subject = subject_for_fire_hotspot(event) + subject = adapter.subject_for(event) assert subject == "central.fire.hotspot.viirs_noaa20.nominal" diff --git a/tests/test_models.py b/tests/test_models.py index c010f39..7ed5c54 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import pytest -from central.models import Event, Geo, subject_for_event +from central.models import Event, Geo from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config from central.cloudevents_wire import wrap_event @@ -57,47 +57,6 @@ def sample_config() -> Config: ) -class TestSubjectForEvent: - """Tests for subject_for_event helper.""" - - def test_county_subject(self, sample_event: Event) -> None: - """County codes produce county subject.""" - subject = subject_for_event(sample_event) - assert subject == "central.wx.alert.us.id.county.ada" - - def test_zone_subject(self, sample_geo: Geo) -> None: - """Zone codes produce zone subject.""" - geo = Geo( - centroid=sample_geo.centroid, - bbox=sample_geo.bbox, - regions=["US-ID-Z033"], - primary_region="US-ID-Z033", - ) - event = Event( - id="test-zone", - adapter="nws", - category="wx.alert.winter_storm_warning", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.id.zone.z033" - - def test_unknown_subject(self, sample_event: Event) -> None: - """Missing primary_region produces unknown subject.""" - geo = Geo(regions=[], primary_region=None) - event = Event( - id="test-unknown", - adapter="nws", - category="wx.alert.test", - time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), - geo=geo, - data={}, - ) - subject = subject_for_event(event) - assert subject == "central.wx.alert.us.unknown" - class TestCloudEventsWire: """Tests for CloudEvents wire format.""" diff --git a/tests/test_nws_normalization.py b/tests/test_nws_normalization.py index 53a7e49..706ce1d 100644 --- a/tests/test_nws_normalization.py +++ b/tests/test_nws_normalization.py @@ -17,7 +17,6 @@ from central.adapters.nws import ( SEVERITY_MAP, ) from central.config_models import AdapterConfig -from central.models import subject_for_event # Sample NWS GeoJSON features for testing @@ -272,7 +271,7 @@ class TestSubjectDerivation: def test_county_subject(self, adapter: NWSAdapter) -> None: event = adapter._normalize_feature(SAMPLE_FEATURE_ID) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) # Primary region should be alphabetically first # Could be county or zone depending on sort order assert subject.startswith("central.wx.alert.us.id.") @@ -294,7 +293,7 @@ class TestSubjectDerivation: } event = adapter._normalize_feature(feature) assert event is not None - subject = subject_for_event(event) + subject = adapter.subject_for(event) assert "zone" in subject diff --git a/tests/test_supervisor_integration.py b/tests/test_supervisor_integration.py index cf1c65b..e318752 100644 --- a/tests/test_supervisor_integration.py +++ b/tests/test_supervisor_integration.py @@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration: supervisor._js = mock_nats.jetstream() # Patch NWSAdapter to use our mock - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start supervisor (starts adapter) - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start supervisor (starts adapter) + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) - # Simulate completed poll 5 minutes ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 5 minutes ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is in the past - # and poll immediately. Let's verify by checking the wait time logic. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is in the past + # and poll immediately. Let's verify by checking the wait time logic. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was 5 minutes ago, cadence is 60s - # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago - # wait_time should be 0 (poll immediately) - assert wait_time == 0, ( - f"Expected immediate poll (wait=0), got wait={wait_time}s. " - f"last_poll was {saved_last_poll}, now is {now}" - ) + # last_poll was 5 minutes ago, cadence is 60s + # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago + # wait_time should be 0 (poll immediately) + assert wait_time == 0, ( + f"Expected immediate poll (wait=0), got wait={wait_time}s. " + f"last_poll was {saved_last_poll}, now is {now}" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_enable_gap_shorter_than_cadence( @@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter (simulate 20 seconds later, but we're just - # checking the rate limit logic) - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter (simulate 20 seconds later, but we're just + # checking the rate limit logic) + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is still in the future - # and wait until then. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is still in the future + # and wait until then. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was ~10 seconds ago, cadence is 60s - # wait_time should be ~50s (60 - 10 = 50) - assert 45 < wait_time < 55, ( - f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " - f"Rate limit violated: poll would happen before last_poll + cadence" - ) + # last_poll was ~10 seconds ago, cadence is 60s + # wait_time should be ~50s (60 - 10 = 50) + assert 45 < wait_time < 55, ( + f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " + f"Rate limit violated: poll would happen before last_poll + cadence" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_delete_readd_fresh_state( @@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(initial_config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # DELETE adapter from DB (remove from config source) - config_source.set_adapter(None, name="nws") - await supervisor._on_config_change("adapters", "nws") + # DELETE adapter from DB (remove from config source) + config_source.set_adapter(None, name="nws") + await supervisor._on_config_change("adapters", "nws") - # Verify adapter fully removed - assert "nws" not in supervisor._adapter_states + # Verify adapter fully removed + assert "nws" not in supervisor._adapter_states - # Re-add adapter with same name - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - await supervisor._on_config_change("adapters", "nws") + # Re-add adapter with same name + new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + await supervisor._on_config_change("adapters", "nws") - # Verify new adapter started fresh - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - # last_completed_poll should be None (fresh adapter) - assert state.last_completed_poll is None, ( - f"Expected None (fresh adapter), got {state.last_completed_poll}. " - f"Preserved state not cleared on delete." - ) + # Verify new adapter started fresh + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + # last_completed_poll should be None (fresh adapter) + assert state.last_completed_poll is None, ( + f"Expected None (fresh adapter), got {state.last_completed_poll}. " + f"Preserved state not cleared on delete." + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_stop_preserves_state_start_reuses_it( @@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - # Start adapter - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + # Start adapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) - saved_poll = state.last_completed_poll + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) + saved_poll = state.last_completed_poll - # Stop adapter - await supervisor._stop_adapter("nws") + # Stop adapter + await supervisor._stop_adapter("nws") - # State should still exist - assert "nws" in supervisor._adapter_states - state = supervisor._adapter_states["nws"] - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # State should still exist + assert "nws" in supervisor._adapter_states + state = supervisor._adapter_states["nws"] + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Restart adapter - await supervisor._start_adapter(config) + # Restart adapter + await supervisor._start_adapter(config) - # Should reuse existing state - state = supervisor._adapter_states.get("nws") - assert adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # Should reuse existing state + state = supervisor._adapter_states.get("nws") + assert adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_remove_adapter_clears_state( @@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - with patch("central.supervisor.NWSAdapter", MockNWSAdapter): - await supervisor._start_adapter(config) + # Inject mock adapter into supervisor's registry + supervisor._adapters["nws"] = MockNWSAdapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - # Remove adapter - await cleanup_adapter(supervisor, "nws") + # Remove adapter + await cleanup_adapter(supervisor, "nws") - # State should be gone - assert "nws" not in supervisor._adapter_states + # State should be gone + assert "nws" not in supervisor._adapter_states diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py index 24c6f73..690f7da 100644 --- a/tests/test_usgs_quake.py +++ b/tests/test_usgs_quake.py @@ -480,3 +480,122 @@ class TestApplyConfig: assert adapter._feed == "all_day" await adapter.shutdown() + + + +class TestSubjectFor: + """Test subject_for method for all magnitude tiers.""" + + @pytest.mark.asyncio + async def test_subject_minor(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-minor", + adapter="usgs_quake", + category="quake.event.minor", + time=datetime.now(timezone.utc), + severity=0, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.minor" + + @pytest.mark.asyncio + async def test_subject_light(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-light", + adapter="usgs_quake", + category="quake.event.light", + time=datetime.now(timezone.utc), + severity=1, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.light" + + @pytest.mark.asyncio + async def test_subject_moderate(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-moderate", + adapter="usgs_quake", + category="quake.event.moderate", + time=datetime.now(timezone.utc), + severity=2, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.moderate" + + @pytest.mark.asyncio + async def test_subject_strong(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-strong", + adapter="usgs_quake", + category="quake.event.strong", + time=datetime.now(timezone.utc), + severity=3, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.strong" + + @pytest.mark.asyncio + async def test_subject_major(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-major", + adapter="usgs_quake", + category="quake.event.major", + time=datetime.now(timezone.utc), + severity=4, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.major" + + @pytest.mark.asyncio + async def test_subject_great(self, temp_db_path, mock_config_store): + config = make_adapter_config() + adapter = USGSQuakeAdapter( + config=config, + config_store=mock_config_store, + cursor_db_path=temp_db_path, + ) + event = Event( + id="test-great", + adapter="usgs_quake", + category="quake.event.great", + time=datetime.now(timezone.utc), + severity=5, + geo=Geo(centroid=(-116.0, 45.0)), + data={}, + ) + assert adapter.subject_for(event) == "central.quake.event.great"