diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b5711a..9d6b237 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,63 +1,5 @@ # Changelog -## v0.3.0 — Phase 1b (2026-05-18) - -Operator console. FastAPI + Jinja2 + Pico + HTMX. Self-hosted, -Tailscale-gated by default, no application-level auth beyond -the operator session. - -### Added -- Operator console (`central-gui` systemd service on port 8000) -- Login + session auth (argon2id, 90-day DB-backed sessions) -- Dashboard: events 24h by adapter, stream sizes, - last-poll-time per adapter -- Adapters list and edit page (cadence + per-adapter settings), - with Leaflet region picker and click-to-draw rectangles -- Streams view with retention chips (1d / 7d / 14d / 30d / - 365d / custom) -- API keys management (list / add / rotate / delete, - encrypted at rest via `crypto.encrypt`, plaintext never - logged or stored) -- First-run wizard (5 steps: operator, system, keys, adapters, - finish) with deferred-commit pattern — no DB writes until - Finish runs as a single transaction -- Events feed page (`/events`) — paginated, filterable by - adapter / category / time range / map viewport, with - color-coded geometry overlay, click-to-popup, and - expandable row details showing full event payload -- Paginated events JSON API (`/events.json`) — cursor-based - pagination, same filter surface as the HTML feed - -### Changed -- CSRF tokens are now session-bound (synchronizer token - pattern), replacing the previous fastapi-csrf-protect - library. Eliminates a rotation race that broke first-load - submissions -- First-run wizard is a single atomic transaction at Finish, - not per-step DB writes. Back navigation works; abandoned - wizards leave no orphan rows - -### Fixed -- Adapter editor's JSONB double-encoding bug (write path - called `json.dumps` before asyncpg's codec, corrupting - the settings column) -- Dashboard polls card was reading from the wrong NATS - subject and using a durable consumer instead of - `get_last_msg`, leaking zombie consumers -- Browser-noise paths (/favicon.ico, /apple-touch-icon.png, - /robots.txt) return 204 directly, preventing parallel - requests from racing the CSRF cookie on first page load -- SubResource Integrity hashes for leaflet-draw assets - corrected (previous values were fabricated and silently - blocked by browsers) - -### Infrastructure -- New `config.sessions` column: `csrf_token` (per-session - synchronizer) -- Composite index on `public.events (time DESC, id DESC)` - for cursor pagination -- `central-gui` systemd service - ## v0.2.0 — Phase 1a (2026-05-16) Three live data sources, configurable infrastructure, hot-reload diff --git a/docs/environment.md b/docs/environment.md index f2afce4..9659443 100644 --- a/docs/environment.md +++ b/docs/environment.md @@ -28,32 +28,6 @@ The Windows workstation (matt-desktop) has no Central repository clones. The directory `C:\Users\mtthw\central_work\` is scratch space only and should not be used for commits. - -## Network and Service Bindings - -### Bind Address - -`central-gui` binds to `0.0.0.0` by design. Network gating is the -operator's responsibility (firewall, Tailscale, etc.), not the app's. -Do not switch to `127.0.0.1` or to a specific interface — operators -choose their bind via whatever network they want to expose the service on. - -### NATS Listener Ports - -The default `nats-server.conf` listens on more than just :4222: - -| Port | Protocol | Used by Central? | -|------|----------|------------------| -| 4222 | NATS client | Yes (all) | -| 8080 | WebSocket | No (Phase 0 leftover) | -| 8222 | HTTP monitoring | No (manual ops only) | -| 1883 | MQTT | No (Phase 0 leftover) | - -None of the unused ports cause active harm — they listen but no consumer -connects. Operators can remove them from `nats-server.conf` if they want -a tighter footprint. Documenting so future contributors don't grep for -"MQTT integration" and come up confused. - ## Repository | Property | Value | diff --git a/pyproject.toml b/pyproject.toml index 2922e9f..c101844 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "central" -version = "0.3.0" +version = "0.1.0" requires-python = ">=3.12,<3.13" description = "Data hub spine — adapters, bus, archive." readme = "README.md" diff --git a/src/central/adapter.py b/src/central/adapter.py index 276a9cf..2037eef 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -4,8 +4,6 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from typing import TYPE_CHECKING -from pydantic import BaseModel - if TYPE_CHECKING: from central.config_models import AdapterConfig @@ -18,24 +16,9 @@ class SourceAdapter(ABC): Adapters yield Events. The supervisor handles scheduling, CloudEvents wrapping, publish, and metadata heartbeats. - - Class attributes that subclasses must define: - name: Short identifier, e.g. "nws" - display_name: Human-readable name for GUI - description: Short description of the adapter - settings_schema: Pydantic model class for adapter settings - requires_api_key: Key alias if API key required, else None - wizard_order: Order in setup wizard (None = not in wizard) - default_cadence_s: Default polling interval in seconds """ - name: str - display_name: str - description: str - settings_schema: type[BaseModel] - requires_api_key: str | None = None - wizard_order: int | None = None - default_cadence_s: int + name: str # short identifier, e.g. "nws" @abstractmethod async def poll(self) -> AsyncIterator[Event]: @@ -57,16 +40,6 @@ class SourceAdapter(ABC): """ ... - @abstractmethod - def subject_for(self, event: Event) -> str: - """ - Compute the NATS subject for an event. - - Each adapter knows its own subject hierarchy. The supervisor - calls this to determine where to publish each event. - """ - ... - async def startup(self) -> None: """Optional lifecycle hook called before first poll.""" pass diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 7538d96..3f746fa 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -18,8 +18,6 @@ from tenacity import ( ) from central.adapter import SourceAdapter -from pydantic import BaseModel - from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -51,23 +49,10 @@ SEVERITY_MAP = { } -class FIRMSSettings(BaseModel): - """Settings schema for FIRMS adapter.""" - api_key_alias: str = "firms" - satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] - region: RegionConfig | None = None - - class FIRMSAdapter(SourceAdapter): """NASA FIRMS fire hotspot adapter.""" name = "firms" - display_name = "NASA FIRMS Fire Hotspots" - description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." - settings_schema = FIRMSSettings - requires_api_key = "firms" - wizard_order = 2 - default_cadence_s = 300 def __init__( self, @@ -131,15 +116,6 @@ class FIRMSAdapter(SourceAdapter): }, ) - def subject_for(self, event: Event) -> str: - """Compute NATS subject for a fire hotspot event. - - Subject format: central.fire.hotspot.. - The category already contains this structure. - """ - return f"central.{event.category}" - - async def startup(self) -> None: """Initialize HTTP session, dedup tracker, and fetch API key.""" # Fetch API key @@ -441,3 +417,14 @@ class FIRMSAdapter(SourceAdapter): }, ) + +def subject_for_fire_hotspot(ev: Event) -> str: + """Compute the NATS subject for a fire hotspot event. + + Subject format: central.fire.hotspot.. + + The category already contains the satellite and confidence info, + so we just prefix with 'central.'. + """ + # category is "fire.hotspot.." + return f"central.{ev.category}" diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index ce95d3a..129584f 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,8 +19,6 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter -from pydantic import BaseModel - from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -191,22 +189,10 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: return sorted(regions) -class NWSSettings(BaseModel): - """Settings schema for NWS adapter.""" - contact_email: str = "" - region: RegionConfig | None = None - - class NWSAdapter(SourceAdapter): """National Weather Service alerts adapter.""" name = "nws" - display_name = "NWS Weather Alerts" - description = "National Weather Service active alerts via api.weather.gov." - settings_schema = NWSSettings - requires_api_key = None - wizard_order = 1 - default_cadence_s = 60 def __init__( self, @@ -248,35 +234,6 @@ class NWSAdapter(SourceAdapter): }, ) - def subject_for(self, event: Event) -> str: - """Compute NATS subject for a weather alert. - - Subject format: central.wx.alert.us... - where type is 'county' or 'zone' based on primary_region format. - """ - prefix = "central.wx" - - if event.geo.primary_region is None: - return f"{prefix}.alert.us.unknown" - - region = event.geo.primary_region - - # Parse US-- format - parts = region.split("-") - if len(parts) < 3 or parts[0] != "US": - return f"{prefix}.alert.us.unknown" - - state = parts[1].lower() - code = "-".join(parts[2:]) # Handle multi-part names - - if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): - # Zone code like Z033 - return f"{prefix}.alert.us.{state}.zone.{code.lower()}" - else: - # County name - return f"{prefix}.alert.us.{state}.county.{code.lower()}" - - def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: """Check if feature geometry intersects configured region bbox. diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index e73148f..601c52b 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -17,8 +17,6 @@ from tenacity import ( ) from central.adapter import SourceAdapter -from pydantic import BaseModel - from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore from central.models import Event, Geo @@ -62,22 +60,10 @@ def magnitude_to_severity(mag: float) -> int: return 5 -class USGSQuakeSettings(BaseModel): - """Settings schema for USGS quake adapter.""" - feed: str = "all_hour" - region: RegionConfig | None = None - - class USGSQuakeAdapter(SourceAdapter): """USGS Earthquake Hazards Program adapter.""" name = "usgs_quake" - display_name = "USGS Earthquakes" - description = "USGS earthquake feed (configurable window)." - settings_schema = USGSQuakeSettings - requires_api_key = None - wizard_order = 3 - default_cadence_s = 60 def __init__( self, @@ -412,9 +398,3 @@ class USGSQuakeAdapter(SourceAdapter): new_count += 1 logger.info("USGS quake yielded events", extra={"count": new_count}) - - def subject_for(self, event: Event) -> str: - """Return NATS subject for quake event.""" - return f"central.{event.category}" - - diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 3a415c2..f86b6e1 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -2197,90 +2197,95 @@ async def api_keys_delete( return RedirectResponse(url="/api-keys", status_code=302) - - -# --- Events query helper --- - -class EventsQueryResult: - """Result from events query.""" - def __init__(self, events: list, next_cursor: str | None, error: str | None = None): - self.events = events - self.next_cursor = next_cursor - self.error = error - - -def _parse_events_params(params) -> tuple[dict | None, str | None]: +@router.get("/events.json") +async def events_json(request: Request): """ - Parse and validate events query parameters. - + Paginated, filterable JSON endpoint for events. + + Query parameters (all optional): + adapter: filter by adapter name + category: filter by event category + since: ISO 8601 datetime - events where time >= since + until: ISO 8601 datetime - events where time < until + region_north, region_south, region_east, region_west: bbox filter (all four required if any) + limit: page size (default 50, max 200) + cursor: opaque pagination cursor + Returns: - (parsed_params, error_message) - If error_message is not None, parsed_params is None. + {"events": [...], "next_cursor": string or null} """ + from fastapi.responses import JSONResponse + + params = request.query_params + # Parse and validate limit limit_str = params.get("limit", "50") try: limit = int(limit_str) except ValueError: - return None, f"Invalid limit value: {limit_str}" - + return JSONResponse( + {"error": f"Invalid limit value: {limit_str}"}, + status_code=400, + ) + if limit < 1 or limit > 200: - return None, "limit must be between 1 and 200" - + return JSONResponse( + {"error": "limit must be between 1 and 200"}, + status_code=400, + ) + # Parse adapter filter adapter = params.get("adapter") - if adapter == "": - adapter = None - - # Parse category filter + + # Parse category filter category = params.get("category") - if category == "": - category = None - + # Parse since/until filters since = None until = None - + since_str = params.get("since") if since_str: try: since = datetime.fromisoformat(since_str.replace("Z", "+00:00")) except ValueError: - return None, f"Invalid ISO 8601 datetime for since: {since_str}" - + return JSONResponse( + {"error": f"Invalid ISO 8601 datetime for since: {since_str}"}, + status_code=400, + ) + until_str = params.get("until") if until_str: try: until = datetime.fromisoformat(until_str.replace("Z", "+00:00")) except ValueError: - return None, f"Invalid ISO 8601 datetime for until: {until_str}" - + return JSONResponse( + {"error": f"Invalid ISO 8601 datetime for until: {until_str}"}, + status_code=400, + ) + # Validate since <= until if since and until and since > until: - return None, "since must be before or equal to until" - + return JSONResponse( + {"error": "since must be before or equal to until"}, + status_code=400, + ) + # Parse region bbox region_north = params.get("region_north") region_south = params.get("region_south") region_east = params.get("region_east") region_west = params.get("region_west") - - # Treat empty strings as None - if region_north == "": - region_north = None - if region_south == "": - region_south = None - if region_east == "": - region_east = None - if region_west == "": - region_west = None - + region_params = [region_north, region_south, region_east, region_west] region_supplied = [p for p in region_params if p is not None] - + if len(region_supplied) > 0 and len(region_supplied) < 4: - return None, "Region filter requires all four parameters: region_north, region_south, region_east, region_west" - + return JSONResponse( + {"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"}, + status_code=400, + ) + bbox = None if len(region_supplied) == 4: try: @@ -2291,13 +2296,16 @@ def _parse_events_params(params) -> tuple[dict | None, str | None]: "west": float(region_west), } except ValueError: - return None, "Region parameters must be valid numbers" - + return JSONResponse( + {"error": "Region parameters must be valid numbers"}, + status_code=400, + ) + # Parse cursor cursor_time = None cursor_id = None cursor_str = params.get("cursor") - + if cursor_str: try: decoded = base64.b64decode(cursor_str).decode("utf-8") @@ -2307,82 +2315,59 @@ def _parse_events_params(params) -> tuple[dict | None, str | None]: cursor_time = datetime.fromisoformat(parts[0]) cursor_id = parts[1] except Exception: - return None, "Invalid cursor" - - return { - "limit": limit, - "adapter": adapter, - "category": category, - "since": since, - "until": until, - "bbox": bbox, - "cursor_time": cursor_time, - "cursor_id": cursor_id, - }, None - - -async def _fetch_events(parsed_params: dict) -> EventsQueryResult: - """ - Fetch events from database using parsed parameters. - - Returns EventsQueryResult with events list, next_cursor, and optional error. - """ + return JSONResponse( + {"error": "Invalid cursor"}, + status_code=400, + ) + + # Get database pool after validation pool = get_pool() - - limit = parsed_params["limit"] - adapter = parsed_params["adapter"] - category = parsed_params["category"] - since = parsed_params["since"] - until = parsed_params["until"] - bbox = parsed_params["bbox"] - cursor_time = parsed_params["cursor_time"] - cursor_id = parsed_params["cursor_id"] - + # Build query conditions = [] query_params = [] param_idx = 1 - + if adapter: conditions.append(f"adapter = ${param_idx}") query_params.append(adapter) param_idx += 1 - + if category: conditions.append(f"category = ${param_idx}") query_params.append(category) param_idx += 1 - + if since: conditions.append(f"time >= ${param_idx}") query_params.append(since) param_idx += 1 - + if until: conditions.append(f"time < ${param_idx}") query_params.append(until) param_idx += 1 - + if bbox: conditions.append( f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))" ) query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]]) param_idx += 4 - + if cursor_time and cursor_id: conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})") query_params.append(cursor_time) query_params.append(cursor_id) param_idx += 2 - + where_clause = "" if conditions: where_clause = "WHERE " + " AND ".join(conditions) - + # Fetch limit+1 to check for next page query = f""" - SELECT + SELECT id, time, received, @@ -2398,26 +2383,29 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult: LIMIT ${param_idx} """ query_params.append(limit + 1) - + try: async with pool.acquire() as conn: rows = await conn.fetch(query, *query_params) except Exception as e: - logger.error(f"Database error in _fetch_events: {e}") - return EventsQueryResult([], None, "Database error") - + logger.error(f"Database error in events_json: {e}") + return JSONResponse( + {"error": "Database error"}, + status_code=500, + ) + # Check if there is a next page has_next = len(rows) > limit if has_next: rows = rows[:limit] - + # Build response events = [] for row in rows: geometry = None if row["geometry"]: geometry = json.loads(row["geometry"]) - + events.append({ "id": row["id"], "time": row["time"].isoformat(), @@ -2429,196 +2417,15 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult: "data": dict(row["data"]) if row["data"] else {}, "regions": list(row["regions"]) if row["regions"] else [], }) - + # Build next_cursor if there are more results next_cursor = None if has_next and events: last_event = rows[-1] cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}" next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8") - - return EventsQueryResult(events, next_cursor) - - -def _geometry_summary(geometry: dict | None) -> str: - """Generate a human-readable summary of a geometry.""" - if not geometry: - return "None" - - geom_type = geometry.get("type", "Unknown") - - if geom_type == "Point": - return "Point" - elif geom_type == "LineString": - coords = geometry.get("coordinates", []) - return f"Line ({len(coords)} pts)" - elif geom_type == "Polygon": - coords = geometry.get("coordinates", [[]]) - if coords: - return f"Polygon ({len(coords[0])} pts)" - return "Polygon" - elif geom_type == "MultiPolygon": - coords = geometry.get("coordinates", []) - return f"MultiPolygon ({len(coords)} parts)" - else: - return geom_type - - - -@router.get("/events.json") -async def events_json(request: Request): - """ - Paginated, filterable JSON endpoint for events. - - Query parameters (all optional): - adapter: filter by adapter name - category: filter by event category - since: ISO 8601 datetime - events where time >= since - until: ISO 8601 datetime - events where time < until - region_north, region_south, region_east, region_west: bbox filter (all four required if any) - limit: page size (default 50, max 200) - cursor: opaque pagination cursor - - Returns: - {"events": [...], "next_cursor": string or null} - """ - from fastapi.responses import JSONResponse - - params = request.query_params - - # Parse and validate parameters using shared helper - parsed, error = _parse_events_params(params) - if error: - return JSONResponse({"error": error}, status_code=400) - - # Fetch events using shared helper - result = await _fetch_events(parsed) - if result.error: - return JSONResponse({"error": result.error}, status_code=500) - + return JSONResponse({ - "events": result.events, - "next_cursor": result.next_cursor, + "events": events, + "next_cursor": next_cursor, }) - - -# --- Events feed frontend routes --- - -@router.get("/events", response_class=HTMLResponse) -async def events_list(request: Request) -> HTMLResponse: - """Events feed page with filter form, table, and map.""" - templates = _get_templates() - operator = getattr(request.state, "operator", None) - csrf_token = getattr(request.state, "csrf_token", "") - - params = request.query_params - - # Parse parameters - parsed, error = _parse_events_params(params) - - # Get system settings for map tiles - pool = get_pool() - async with pool.acquire() as conn: - system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system") - - tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png" - tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap" - - # Prepare filter values for template - filter_values = { - "adapter": params.get("adapter", ""), - "category": params.get("category", ""), - "since": params.get("since", ""), - "until": params.get("until", ""), - "region_north": params.get("region_north", ""), - "region_south": params.get("region_south", ""), - "region_east": params.get("region_east", ""), - "region_west": params.get("region_west", ""), - "limit": params.get("limit", "50"), - } - - events = [] - next_cursor = None - - if error: - # Validation error - show error banner but don't fail the page - pass - else: - # Fetch events - result = await _fetch_events(parsed) - if result.error: - error = result.error - else: - events = result.events - next_cursor = result.next_cursor - - # Add geometry summary to each event - for event in events: - event["geometry_summary"] = _geometry_summary(event.get("geometry")) - - return templates.TemplateResponse( - request=request, - name="events_list.html", - context={ - "operator": operator, - "csrf_token": csrf_token, - "events": events, - "next_cursor": next_cursor, - "filter_values": filter_values, - "filter_error": error, - "tile_url": tile_url, - "tile_attribution": tile_attribution, - }, - ) - - -@router.get("/events/rows", response_class=HTMLResponse) -async def events_rows(request: Request) -> HTMLResponse: - """HTMX fragment: events table rows only (no page chrome).""" - templates = _get_templates() - - params = request.query_params - - # Parse parameters - parsed, error = _parse_events_params(params) - - # Prepare filter values for template - filter_values = { - "adapter": params.get("adapter", ""), - "category": params.get("category", ""), - "since": params.get("since", ""), - "until": params.get("until", ""), - "region_north": params.get("region_north", ""), - "region_south": params.get("region_south", ""), - "region_east": params.get("region_east", ""), - "region_west": params.get("region_west", ""), - "limit": params.get("limit", "50"), - } - - events = [] - next_cursor = None - - if error: - pass - else: - result = await _fetch_events(parsed) - if result.error: - error = result.error - else: - events = result.events - next_cursor = result.next_cursor - - # Add geometry summary to each event - for event in events: - event["geometry_summary"] = _geometry_summary(event.get("geometry")) - - return templates.TemplateResponse( - request=request, - name="_events_rows.html", - context={ - "events": events, - "next_cursor": next_cursor, - "filter_values": filter_values, - "filter_error": error, - }, - ) diff --git a/src/central/gui/templates/_events_rows.html b/src/central/gui/templates/_events_rows.html deleted file mode 100644 index c7f126c..0000000 --- a/src/central/gui/templates/_events_rows.html +++ /dev/null @@ -1,73 +0,0 @@ -{% if filter_error %} -
- Filter Error: {{ filter_error }} -
-{% endif %} - -{% if events %} - - - - - - - - - - - - - {% for event in events %} - - - - - - - - - - - - {% endfor %} - -
TimeAdapterCategoryGeometrySubject
{{ event.time }}{{ event.adapter }}{{ event.category }}{{ event.geometry_summary }}{{ event.subject or '—' }}
- -
- Showing {{ events | length }} event{{ 's' if events | length != 1 else '' }}. - {% if next_cursor %} - - Next → - - {% else %} - End of results - {% endif %} -
-{% else %} -
-

No events match the filters.

-
-{% endif %} diff --git a/src/central/gui/templates/base.html b/src/central/gui/templates/base.html index a7a667d..0cd7baa 100644 --- a/src/central/gui/templates/base.html +++ b/src/central/gui/templates/base.html @@ -17,7 +17,6 @@ {% if operator %}
  • Dashboard
  • Adapters
  • -
  • Events
  • Streams
  • API Keys
  • {{ operator.username }}
  • diff --git a/src/central/gui/templates/events_list.html b/src/central/gui/templates/events_list.html deleted file mode 100644 index 5c00b94..0000000 --- a/src/central/gui/templates/events_list.html +++ /dev/null @@ -1,437 +0,0 @@ -{% extends "base.html" %} - -{% block title %}Events - Central{% endblock %} - -{% block head %} - - -{% endblock %} - -{% block content %} -

    Events

    - -{% if filter_error %} -
    - Filter Error: {{ filter_error }} -
    -{% endif %} - -
    - Filters -
    - -
    -
    - - -
    -
    - - -
    -
    - - -
    -
    - - -
    -
    - - - - - - - - - -
    - - Clear Filters -
    -
    -
    - -
    -
    -
    -
    -
    - NWS (Weather) -
    -
    -
    - FIRMS (Fire) -
    -
    -
    - USGS (Quake) -
    -
    - -
    - -
    - {% include "_events_rows.html" %} -
    - - - - -{% endblock %} diff --git a/src/central/models.py b/src/central/models.py index c7a2159..17145ad 100644 --- a/src/central/models.py +++ b/src/central/models.py @@ -32,3 +32,48 @@ class Event(BaseModel): data: dict[str, Any] # adapter-specific payload +def subject_for_event(ev: Event) -> str: + """ + Compute the NATS subject for an event based on its category. + + Dispatch by category prefix: + - fire.*: returns central. directly + - wx.*: uses weather alert subject logic + + Weather alert subjects: + central.wx.alert.us..county. + or + central.wx.alert.us..zone. + based on whether the primary_region encodes a county or a zone. + + Fire hotspot subjects: + central.fire.hotspot.. + """ + # Fire events: subject is just central. + if ev.category.startswith("fire."): + return f"central.{ev.category}" + + # Weather events: use geo-based subject logic + prefix = "central.wx" + + if ev.geo.primary_region is None: + return f"{prefix}.alert.us.unknown" + + region = ev.geo.primary_region + + # Parse US-- format + # County codes are like "Ada", "Canyon" (names) + # Zone codes start with "Z" like "Z033" + parts = region.split("-") + if len(parts) < 3 or parts[0] != "US": + return f"{prefix}.alert.us.unknown" + + state = parts[1].lower() + code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington" + + if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit(): + # Zone code like Z033 + return f"{prefix}.alert.us.{state}.zone.{code.lower()}" + else: + # County name + return f"{prefix}.alert.us.{state}.county.{code.lower()}" diff --git a/src/central/supervisor.py b/src/central/supervisor.py index da00f0e..652ce44 100644 --- a/src/central/supervisor.py +++ b/src/central/supervisor.py @@ -13,40 +13,24 @@ from typing import Any import nats from nats.js import JetStreamContext -import importlib -import pkgutil - from central.adapter import SourceAdapter +from central.adapters.nws import NWSAdapter +from central.adapters.firms import FIRMSAdapter +from central.adapters.usgs_quake import USGSQuakeAdapter from central.cloudevents_wire import wrap_event from central.config_models import AdapterConfig from central.config_source import ConfigSource, DbConfigSource from central.config_store import ConfigStore from central.bootstrap_config import get_settings +from central.models import subject_for_event from central.stream_manager import StreamManager -import central.adapters -def discover_adapters() -> dict[str, type[SourceAdapter]]: - """Auto-discover adapter classes from central.adapters package.""" - registry: dict[str, type[SourceAdapter]] = {} - for module_info in pkgutil.iter_modules(central.adapters.__path__): - try: - module = importlib.import_module(f"central.adapters.{module_info.name}") - except Exception as e: - logger.error( - "Failed to import adapter module", - extra={"module": module_info.name, "error": str(e)}, - ) - continue - for attr_name in dir(module): - attr = getattr(module, attr_name) - if ( - isinstance(attr, type) - and issubclass(attr, SourceAdapter) - and attr is not SourceAdapter - and hasattr(attr, "name") - ): - registry[attr.name] = attr - return registry +# Adapter registry - add new adapters here +_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { + "nws": NWSAdapter, + "firms": FIRMSAdapter, + "usgs_quake": USGSQuakeAdapter, +} CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") @@ -130,7 +114,6 @@ class Supervisor: self._config_store = config_store self._nats_url = nats_url self._cloudevents_config = cloudevents_config - self._adapters = discover_adapters() self._nc: nats.NATS | None = None self._js: JetStreamContext | None = None self._stream_manager: StreamManager | None = None @@ -178,7 +161,7 @@ class Supervisor: def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: """Create an adapter instance based on config name.""" - cls = self._adapters.get(config.name) + cls = _ADAPTER_REGISTRY.get(config.name) if cls is None: raise ValueError(f"Unknown adapter type: {config.name}") return cls( @@ -249,7 +232,7 @@ class Supervisor: # Build CloudEvent (uses defaults if no config provided) envelope, msg_id = wrap_event(event, self._cloudevents_config) - subject = state.adapter.subject_for(event) + subject = subject_for_event(event) # Publish await self._publish_event(subject, envelope, msg_id) diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py deleted file mode 100644 index 2ec1700..0000000 --- a/tests/test_events_feed_frontend.py +++ /dev/null @@ -1,686 +0,0 @@ -"""Tests for events feed frontend routes.""" - -import json -from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -from central.gui.routes import events_list, events_rows, events_json - - -class TestEventsFeedFrontendAuthenticated: - """Test events feed frontend with authentication.""" - - @pytest.mark.asyncio - async def test_events_no_filters_returns_html(self): - """GET /events authenticated, no filters returns HTML with events.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = {} - - mock_events = [ - { - "id": f"event_{i}", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "adapter": "nws", - "category": "Weather Alert", - "subject": f"Test Alert {i}", - "geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None, - "data": {}, - "regions": [], - } - for i in range(5) - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - call_args = mock_templates.TemplateResponse.call_args - context = call_args.kwargs.get("context", call_args[1].get("context")) - assert "events" in context - assert context["filter_error"] is None - - @pytest.mark.asyncio - async def test_events_adapter_filter(self): - """GET /events?adapter=nws returns only nws events.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = {"adapter": "nws"} - - mock_events = [ - { - "id": "nws_event_1", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "NWS Alert", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - assert context["filter_values"]["adapter"] == "nws" - - @pytest.mark.asyncio - async def test_events_since_until_filter(self): - """GET /events?since=...&until=... filters by time window.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = { - "since": "2026-05-17T00:00:00", - "until": "2026-05-17T12:00:00", - } - - mock_events = [ - { - "id": "in_range", - "time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "In Range", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - # Verify filter was actually parsed and passed to template - mock_templates.TemplateResponse.assert_called_once() - call_kwargs = mock_templates.TemplateResponse.call_args.kwargs - context = call_kwargs.get("context", call_kwargs) - assert context["filter_values"]["since"] == "2026-05-17T00:00:00" - assert context["filter_values"]["until"] == "2026-05-17T12:00:00" - - @pytest.mark.asyncio - async def test_events_region_filter(self): - """GET /events with full region bbox filters by location.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = { - "region_north": "49.5", - "region_south": "31", - "region_east": "-102", - "region_west": "-124.5", - } - - mock_events = [ - { - "id": "in_bbox", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "In BBox", - "geometry": '{"type": "Point", "coordinates": [-120, 40]}', - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - # Verify region filter was actually parsed and passed to template - mock_templates.TemplateResponse.assert_called_once() - call_kwargs = mock_templates.TemplateResponse.call_args.kwargs - context = call_kwargs.get("context", call_kwargs) - assert context["filter_values"]["region_north"] == "49.5" - assert context["filter_values"]["region_south"] == "31" - assert context["filter_values"]["region_east"] == "-102" - assert context["filter_values"]["region_west"] == "-124.5" - - @pytest.mark.asyncio - async def test_events_partial_region_shows_error_banner(self): - """GET /events with partial region shows error banner, not 400.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = {"region_north": "49"} - - mock_conn = AsyncMock() - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - # Should be 200, not 400 - assert result.status_code == 200 - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - assert context["filter_error"] is not None - assert "region" in context["filter_error"].lower() - # Events should be empty due to validation error - assert context["events"] == [] - - @pytest.mark.asyncio - async def test_events_with_limit_shows_next_button(self): - """GET /events?limit=5 shows Next button when more events exist.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf_token" - mock_request.query_params = {"limit": "5"} - - # Return 6 events (limit+1) to trigger pagination - mock_events = [ - { - "id": f"event_{i}", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "adapter": "nws", - "category": "Alert", - "subject": f"Event {i}", - "geometry": None, - "data": {}, - "regions": [], - } - for i in range(6) - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - assert context["next_cursor"] is not None - assert len(context["events"]) == 5 # Should be trimmed to limit - - -class TestEventsRowsFragment: - """Test /events/rows HTMX fragment.""" - - @pytest.mark.asyncio - async def test_events_rows_returns_fragment(self): - """GET /events/rows returns table fragment, not full page.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.query_params = {"limit": "5"} - - mock_events = [ - { - "id": "event_1", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "Event 1", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_rows(mock_request) - - assert result.status_code == 200 - # Verify it uses the fragment template - call_args = mock_templates.TemplateResponse.call_args - assert call_args.kwargs.get("name") == "_events_rows.html" - - -class TestGeometrySummary: - """Test geometry summary function.""" - - def test_geometry_summary_polygon(self): - """Polygon geometry shows point count.""" - from central.gui.routes import _geometry_summary - - geom = { - "type": "Polygon", - "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]] - } - summary = _geometry_summary(geom) - assert "Polygon" in summary - assert "5 pts" in summary - - def test_geometry_summary_point(self): - """Point geometry shows 'Point'.""" - from central.gui.routes import _geometry_summary - - geom = {"type": "Point", "coordinates": [-122.4, 37.8]} - summary = _geometry_summary(geom) - assert summary == "Point" - - def test_geometry_summary_linestring(self): - """LineString geometry shows point count.""" - from central.gui.routes import _geometry_summary - - geom = { - "type": "LineString", - "coordinates": [[-122, 37], [-121, 38], [-120, 39]] - } - summary = _geometry_summary(geom) - assert "Line" in summary - assert "3 pts" in summary - - def test_geometry_summary_none(self): - """None geometry shows 'None'.""" - from central.gui.routes import _geometry_summary - - summary = _geometry_summary(None) - assert summary == "None" - - -class TestDataGeometryAttribute: - """Test that rows have valid geometry data attributes.""" - - @pytest.mark.asyncio - async def test_event_with_geometry_has_valid_json(self): - """Events with geometry have parseable JSON in data-geometry.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.query_params = {} - - mock_events = [ - { - "id": "geom_event", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "With Geometry", - "geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}', - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_rows(mock_request) - - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - event = context["events"][0] - # Geometry should be parsed dict, not string - assert isinstance(event["geometry"], dict) - assert event["geometry"]["type"] == "Polygon" - - @pytest.mark.asyncio - async def test_event_without_geometry_has_none(self): - """Events without geometry have None for geometry field.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.query_params = {} - - mock_events = [ - { - "id": "no_geom_event", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Alert", - "subject": "No Geometry", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_rows(mock_request) - - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - event = context["events"][0] - assert event["geometry"] is None - - -class TestCrossEndpointParity: - """Test that /events.json and /events return the same filtered results.""" - - @pytest.mark.asyncio - async def test_category_filter_both_endpoints(self): - """Category filter works on both /events.json and /events.""" - mock_events = [ - { - "id": "weather_event", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "nws", - "category": "Weather Alert", - "subject": "Weather Event", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - query_params = {"category": "Weather Alert"} - - # Test /events.json - json_request = MagicMock() - json_request.state.operator = MagicMock(id=1, username="admin") - json_request.query_params = query_params - - with patch("central.gui.routes.get_pool", return_value=mock_pool): - json_response = await events_json(json_request) - - json_data = json.loads(json_response.body) - assert len(json_data["events"]) == 1 - assert json_data["events"][0]["category"] == "Weather Alert" - - # Test /events - html_request = MagicMock() - html_request.state.operator = MagicMock(id=1, username="admin") - html_request.state.csrf_token = "test_csrf" - html_request.query_params = query_params - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - mock_conn.fetch.return_value = mock_events - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - await events_list(html_request) - - html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - assert len(html_context["events"]) == 1 - assert html_context["events"][0]["category"] == "Weather Alert" - - @pytest.mark.asyncio - async def test_cursor_pagination_both_endpoints(self): - """Cursor pagination works identically on both endpoints.""" - first_page = [ - { - "id": f"event_{i}", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i), - "adapter": "nws", - "category": "Alert", - "subject": f"Event {i}", - "geometry": None, - "data": {}, - "regions": [], - } - for i in range(3) - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = first_page - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - json_request = MagicMock() - json_request.state.operator = MagicMock(id=1, username="admin") - json_request.query_params = {"limit": "2"} - - with patch("central.gui.routes.get_pool", return_value=mock_pool): - json_response = await events_json(json_request) - - json_data = json.loads(json_response.body) - json_cursor = json_data["next_cursor"] - assert json_cursor is not None - - html_request = MagicMock() - html_request.state.operator = MagicMock(id=1, username="admin") - html_request.state.csrf_token = "test_csrf" - html_request.query_params = {"limit": "2"} - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - mock_conn.fetch.return_value = first_page - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - await events_list(html_request) - - html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - html_cursor = html_context["next_cursor"] - - assert json_cursor == html_cursor - - -class TestErrorSemantics: - """Test error handling differences between JSON and HTML endpoints.""" - - @pytest.mark.asyncio - async def test_json_endpoint_returns_400_on_invalid_limit(self): - """/events.json?limit=0 returns 400 JSON error.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.query_params = {"limit": "0"} - - response = await events_json(mock_request) - - assert response.status_code == 400 - data = json.loads(response.body) - assert "error" in data - - @pytest.mark.asyncio - async def test_html_endpoint_returns_200_with_error_banner(self): - """/events?limit=0 returns 200 HTML with error banner.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf" - mock_request.query_params = {"limit": "0"} - - mock_conn = AsyncMock() - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - assert context["filter_error"] is not None - assert "limit" in context["filter_error"].lower() - assert context["events"] == [] - - -class TestEventRowDataAttributes: - """Test that _events_rows.html renders required data attributes.""" - - @pytest.mark.asyncio - async def test_row_renders_data_adapter_attribute(self): - """Event rows include data-adapter attribute for color coding.""" - mock_request = MagicMock() - mock_request.state.operator = MagicMock(id=1, username="admin") - mock_request.state.csrf_token = "test_csrf" - mock_request.query_params = {} - - mock_events = [ - { - "id": "test1", - "time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc), - "adapter": "usgs_quake", - "category": "quake.event", - "subject": "M4.2 Earthquake", - "geometry": None, - "data": {}, - "regions": [], - }, - ] - - mock_conn = AsyncMock() - mock_conn.fetch.return_value = mock_events - mock_conn.fetchrow.return_value = { - "map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png", - "map_attribution": "OpenStreetMap", - } - - mock_pool = MagicMock() - mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) - mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None) - - mock_templates = MagicMock() - mock_templates.TemplateResponse.return_value = MagicMock(status_code=200) - - with patch("central.gui.routes._get_templates", return_value=mock_templates): - with patch("central.gui.routes.get_pool", return_value=mock_pool): - result = await events_list(mock_request) - - assert result.status_code == 200 - mock_templates.TemplateResponse.assert_called_once() - context = mock_templates.TemplateResponse.call_args.kwargs.get("context") - # The template receives events with adapter field for data-adapter attribute - assert len(context["events"]) == 1 - assert context["events"][0]["adapter"] == "usgs_quake" - assert context["events"][0]["category"] == "quake.event" - assert context["events"][0]["subject"] == "M4.2 Earthquake" diff --git a/tests/test_firms.py b/tests/test_firms.py index bfe629a..9e51331 100644 --- a/tests/test_firms.py +++ b/tests/test_firms.py @@ -10,6 +10,7 @@ from central.adapters.firms import ( FIRMSAdapter, CONFIDENCE_MAP, SATELLITE_SHORT, + subject_for_fire_hotspot, ) from central.config_models import AdapterConfig, RegionConfig from central.models import Event, Geo @@ -284,14 +285,7 @@ class TestDeduplication: class TestSubjectGeneration: """Test subject generation for fire hotspots.""" - @pytest.mark.asyncio - async def test_subject_format(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = FIRMSAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) + def test_subject_format(self): event = Event( id="test", adapter="firms", @@ -302,17 +296,10 @@ class TestSubjectGeneration: data={}, ) - subject = adapter.subject_for(event) + subject = subject_for_fire_hotspot(event) assert subject == "central.fire.hotspot.viirs_snpp.high" - @pytest.mark.asyncio - async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = FIRMSAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) + def test_subject_nominal_confidence(self): event = Event( id="test", adapter="firms", @@ -323,7 +310,7 @@ class TestSubjectGeneration: data={}, ) - subject = adapter.subject_for(event) + subject = subject_for_fire_hotspot(event) assert subject == "central.fire.hotspot.viirs_noaa20.nominal" diff --git a/tests/test_models.py b/tests/test_models.py index 7ed5c54..c010f39 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone import pytest -from central.models import Event, Geo +from central.models import Event, Geo, subject_for_event from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config from central.cloudevents_wire import wrap_event @@ -57,6 +57,47 @@ def sample_config() -> Config: ) +class TestSubjectForEvent: + """Tests for subject_for_event helper.""" + + def test_county_subject(self, sample_event: Event) -> None: + """County codes produce county subject.""" + subject = subject_for_event(sample_event) + assert subject == "central.wx.alert.us.id.county.ada" + + def test_zone_subject(self, sample_geo: Geo) -> None: + """Zone codes produce zone subject.""" + geo = Geo( + centroid=sample_geo.centroid, + bbox=sample_geo.bbox, + regions=["US-ID-Z033"], + primary_region="US-ID-Z033", + ) + event = Event( + id="test-zone", + adapter="nws", + category="wx.alert.winter_storm_warning", + time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), + geo=geo, + data={}, + ) + subject = subject_for_event(event) + assert subject == "central.wx.alert.us.id.zone.z033" + + def test_unknown_subject(self, sample_event: Event) -> None: + """Missing primary_region produces unknown subject.""" + geo = Geo(regions=[], primary_region=None) + event = Event( + id="test-unknown", + adapter="nws", + category="wx.alert.test", + time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), + geo=geo, + data={}, + ) + subject = subject_for_event(event) + assert subject == "central.wx.alert.us.unknown" + class TestCloudEventsWire: """Tests for CloudEvents wire format.""" diff --git a/tests/test_nws_normalization.py b/tests/test_nws_normalization.py index 706ce1d..53a7e49 100644 --- a/tests/test_nws_normalization.py +++ b/tests/test_nws_normalization.py @@ -17,6 +17,7 @@ from central.adapters.nws import ( SEVERITY_MAP, ) from central.config_models import AdapterConfig +from central.models import subject_for_event # Sample NWS GeoJSON features for testing @@ -271,7 +272,7 @@ class TestSubjectDerivation: def test_county_subject(self, adapter: NWSAdapter) -> None: event = adapter._normalize_feature(SAMPLE_FEATURE_ID) assert event is not None - subject = adapter.subject_for(event) + subject = subject_for_event(event) # Primary region should be alphabetically first # Could be county or zone depending on sort order assert subject.startswith("central.wx.alert.us.id.") @@ -293,7 +294,7 @@ class TestSubjectDerivation: } event = adapter._normalize_feature(feature) assert event is not None - subject = adapter.subject_for(event) + subject = subject_for_event(event) assert "zone" in subject diff --git a/tests/test_supervisor_integration.py b/tests/test_supervisor_integration.py index e318752..cf1c65b 100644 --- a/tests/test_supervisor_integration.py +++ b/tests/test_supervisor_integration.py @@ -200,77 +200,76 @@ class TestEnableDisableEnableIntegration: supervisor._js = mock_nats.jetstream() # Patch NWSAdapter to use our mock - # Inject mock adapter into supervisor's registry - supervisor._adapters["nws"] = MockNWSAdapter - # Start supervisor (starts adapter) - await supervisor._start_adapter(initial_config) + with patch("central.supervisor.NWSAdapter", MockNWSAdapter): + # Start supervisor (starts adapter) + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) - # Simulate completed poll 5 minutes ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 5 minutes ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is in the past - # and poll immediately. Let's verify by checking the wait time logic. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is in the past + # and poll immediately. Let's verify by checking the wait time logic. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was 5 minutes ago, cadence is 60s - # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago - # wait_time should be 0 (poll immediately) - assert wait_time == 0, ( - f"Expected immediate poll (wait=0), got wait={wait_time}s. " - f"last_poll was {saved_last_poll}, now is {now}" - ) + # last_poll was 5 minutes ago, cadence is 60s + # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago + # wait_time should be 0 (poll immediately) + assert wait_time == 0, ( + f"Expected immediate poll (wait=0), got wait={wait_time}s. " + f"last_poll was {saved_last_poll}, now is {now}" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_enable_gap_shorter_than_cadence( @@ -309,76 +308,75 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - # Inject mock adapter into supervisor's registry - supervisor._adapters["nws"] = MockNWSAdapter - # Start adapter - await supervisor._start_adapter(initial_config) + with patch("central.supervisor.NWSAdapter", MockNWSAdapter): + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - saved_last_poll = state.last_completed_poll + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + saved_last_poll = state.last_completed_poll - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify stopped but state preserved (THIS IS THE KEY CHECK) - # On unfixed code, state will be NONE because pop() removes it - # On fixed code, state still exists with is_running=False - state = supervisor._adapter_states.get("nws") - assert state is not None, ( - "State was removed on stop! This violates the rate-limit guarantee. " - "State should be preserved to maintain last_completed_poll." - ) - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify stopped but state preserved (THIS IS THE KEY CHECK) + # On unfixed code, state will be NONE because pop() removes it + # On fixed code, state still exists with is_running=False + state = supervisor._adapter_states.get("nws") + assert state is not None, ( + "State was removed on stop! This violates the rate-limit guarantee. " + "State should be preserved to maintain last_completed_poll." + ) + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # Re-enable adapter (simulate 20 seconds later, but we're just - # checking the rate limit logic) - reenabled_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(reenabled_config) - await supervisor._on_config_change("adapters", "nws") + # Re-enable adapter (simulate 20 seconds later, but we're just + # checking the rate limit logic) + reenabled_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(reenabled_config) + await supervisor._on_config_change("adapters", "nws") - # Verify restarted with preserved last_completed_poll - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - assert state.last_completed_poll == saved_last_poll + # Verify restarted with preserved last_completed_poll + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + assert state.last_completed_poll == saved_last_poll - # The loop should detect that last_poll + cadence is still in the future - # and wait until then. - now = datetime.now(timezone.utc) - next_poll_at = saved_last_poll.timestamp() + 60 - wait_time = max(0, next_poll_at - now.timestamp()) + # The loop should detect that last_poll + cadence is still in the future + # and wait until then. + now = datetime.now(timezone.utc) + next_poll_at = saved_last_poll.timestamp() + 60 + wait_time = max(0, next_poll_at - now.timestamp()) - # last_poll was ~10 seconds ago, cadence is 60s - # wait_time should be ~50s (60 - 10 = 50) - assert 45 < wait_time < 55, ( - f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " - f"Rate limit violated: poll would happen before last_poll + cadence" - ) + # last_poll was ~10 seconds ago, cadence is 60s + # wait_time should be ~50s (60 - 10 = 50) + assert 45 < wait_time < 55, ( + f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " + f"Rate limit violated: poll would happen before last_poll + cadence" + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_enable_disable_delete_readd_fresh_state( @@ -416,61 +414,60 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - # Inject mock adapter into supervisor's registry - supervisor._adapters["nws"] = MockNWSAdapter - # Start adapter - await supervisor._start_adapter(initial_config) + with patch("central.supervisor.NWSAdapter", MockNWSAdapter): + # Start adapter + await supervisor._start_adapter(initial_config) - state = supervisor._adapter_states.get("nws") - assert state is not None + state = supervisor._adapter_states.get("nws") + assert state is not None - # Simulate completed poll 10 seconds ago - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) + # Simulate completed poll 10 seconds ago + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) - # Disable adapter - disabled_config = AdapterConfig( - name="nws", - enabled=False, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(disabled_config) - await supervisor._on_config_change("adapters", "nws") + # Disable adapter + disabled_config = AdapterConfig( + name="nws", + enabled=False, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(disabled_config) + await supervisor._on_config_change("adapters", "nws") - # DELETE adapter from DB (remove from config source) - config_source.set_adapter(None, name="nws") - await supervisor._on_config_change("adapters", "nws") + # DELETE adapter from DB (remove from config source) + config_source.set_adapter(None, name="nws") + await supervisor._on_config_change("adapters", "nws") - # Verify adapter fully removed - assert "nws" not in supervisor._adapter_states + # Verify adapter fully removed + assert "nws" not in supervisor._adapter_states - # Re-add adapter with same name - new_config = AdapterConfig( - name="nws", - enabled=True, - cadence_s=60, - settings={"states": ["ID"], "contact_email": "test@test.com"}, - paused_at=None, - updated_at=datetime.now(timezone.utc), - ) - config_source.set_adapter(new_config) - await supervisor._on_config_change("adapters", "nws") + # Re-add adapter with same name + new_config = AdapterConfig( + name="nws", + enabled=True, + cadence_s=60, + settings={"states": ["ID"], "contact_email": "test@test.com"}, + paused_at=None, + updated_at=datetime.now(timezone.utc), + ) + config_source.set_adapter(new_config) + await supervisor._on_config_change("adapters", "nws") - # Verify new adapter started fresh - state = supervisor._adapter_states.get("nws") - assert state is not None - assert adapter_is_running(state) - # last_completed_poll should be None (fresh adapter) - assert state.last_completed_poll is None, ( - f"Expected None (fresh adapter), got {state.last_completed_poll}. " - f"Preserved state not cleared on delete." - ) + # Verify new adapter started fresh + state = supervisor._adapter_states.get("nws") + assert state is not None + assert adapter_is_running(state) + # last_completed_poll should be None (fresh adapter) + assert state.last_completed_poll is None, ( + f"Expected None (fresh adapter), got {state.last_completed_poll}. " + f"Preserved state not cleared on delete." + ) - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_stop_preserves_state_start_reuses_it( @@ -500,35 +497,34 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - # Inject mock adapter into supervisor's registry - supervisor._adapters["nws"] = MockNWSAdapter - # Start adapter - await supervisor._start_adapter(config) + with patch("central.supervisor.NWSAdapter", MockNWSAdapter): + # Start adapter + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) - saved_poll = state.last_completed_poll + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) + saved_poll = state.last_completed_poll - # Stop adapter - await supervisor._stop_adapter("nws") + # Stop adapter + await supervisor._stop_adapter("nws") - # State should still exist - assert "nws" in supervisor._adapter_states - state = supervisor._adapter_states["nws"] - assert not adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # State should still exist + assert "nws" in supervisor._adapter_states + state = supervisor._adapter_states["nws"] + assert not adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Restart adapter - await supervisor._start_adapter(config) + # Restart adapter + await supervisor._start_adapter(config) - # Should reuse existing state - state = supervisor._adapter_states.get("nws") - assert adapter_is_running(state) - assert state.last_completed_poll == saved_poll + # Should reuse existing state + state = supervisor._adapter_states.get("nws") + assert adapter_is_running(state) + assert state.last_completed_poll == saved_poll - # Cleanup - supervisor._shutdown_event.set() - await cleanup_adapter(supervisor, "nws") + # Cleanup + supervisor._shutdown_event.set() + await cleanup_adapter(supervisor, "nws") @pytest.mark.asyncio async def test_remove_adapter_clears_state( @@ -558,15 +554,14 @@ class TestEnableDisableEnableIntegration: supervisor._nc = mock_nats supervisor._js = mock_nats.jetstream() - # Inject mock adapter into supervisor's registry - supervisor._adapters["nws"] = MockNWSAdapter - await supervisor._start_adapter(config) + with patch("central.supervisor.NWSAdapter", MockNWSAdapter): + await supervisor._start_adapter(config) - state = supervisor._adapter_states.get("nws") - state.last_completed_poll = datetime.now(timezone.utc) + state = supervisor._adapter_states.get("nws") + state.last_completed_poll = datetime.now(timezone.utc) - # Remove adapter - await cleanup_adapter(supervisor, "nws") + # Remove adapter + await cleanup_adapter(supervisor, "nws") - # State should be gone - assert "nws" not in supervisor._adapter_states + # State should be gone + assert "nws" not in supervisor._adapter_states diff --git a/tests/test_usgs_quake.py b/tests/test_usgs_quake.py index 690f7da..24c6f73 100644 --- a/tests/test_usgs_quake.py +++ b/tests/test_usgs_quake.py @@ -480,122 +480,3 @@ class TestApplyConfig: assert adapter._feed == "all_day" await adapter.shutdown() - - - -class TestSubjectFor: - """Test subject_for method for all magnitude tiers.""" - - @pytest.mark.asyncio - async def test_subject_minor(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-minor", - adapter="usgs_quake", - category="quake.event.minor", - time=datetime.now(timezone.utc), - severity=0, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.minor" - - @pytest.mark.asyncio - async def test_subject_light(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-light", - adapter="usgs_quake", - category="quake.event.light", - time=datetime.now(timezone.utc), - severity=1, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.light" - - @pytest.mark.asyncio - async def test_subject_moderate(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-moderate", - adapter="usgs_quake", - category="quake.event.moderate", - time=datetime.now(timezone.utc), - severity=2, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.moderate" - - @pytest.mark.asyncio - async def test_subject_strong(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-strong", - adapter="usgs_quake", - category="quake.event.strong", - time=datetime.now(timezone.utc), - severity=3, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.strong" - - @pytest.mark.asyncio - async def test_subject_major(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-major", - adapter="usgs_quake", - category="quake.event.major", - time=datetime.now(timezone.utc), - severity=4, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.major" - - @pytest.mark.asyncio - async def test_subject_great(self, temp_db_path, mock_config_store): - config = make_adapter_config() - adapter = USGSQuakeAdapter( - config=config, - config_store=mock_config_store, - cursor_db_path=temp_db_path, - ) - event = Event( - id="test-great", - adapter="usgs_quake", - category="quake.event.great", - time=datetime.now(timezone.utc), - severity=5, - geo=Geo(centroid=(-116.0, 45.0)), - data={}, - ) - assert adapter.subject_for(event) == "central.quake.event.great"