Compare commits

...

7 commits

Author SHA1 Message Date
Matt Johnson
87f46e8b35 Merge refactor/a1-self-describing-adapters: self-describing adapter pattern
- Add display_name, description, settings_schema (Pydantic), requires_api_key, wizard_order, default_cadence_s to SourceAdapter ABC
- Implement in NWSAdapter, FIRMSAdapter, USGSQuakeAdapter
- Auto-discovery via pkgutil.iter_modules
- Fix quake stream bug (events now route to CENTRAL_QUAKE)
- 308 tests pass, live verified on CT104

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-18 22:49:42 +00:00
Matt Johnson
4ee3d8bd14 fix(adapters): complete self-describing adapter attributes
- Replace settings_schema classmethod with Pydantic model class attribute
- Add display_name, description, requires_api_key, wizard_order, default_cadence_s
- Remove stream_name from adapters (JetStream routes by subject filter)
- Define NWSSettings, FIRMSSettings, USGSQuakeSettings Pydantic models
- Make discover_adapters() public with error handling
- Move adapter registry to Supervisor instance (self._adapters)
- Add subject_for tests for all 6 quake magnitude tiers
- Fix test_supervisor_integration to use injected mock adapters

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-18 22:33:19 +00:00
Matt Johnson
4573bf6ee2 refactor(adapters): self-describing adapter pattern with auto-discovery
- Add stream_name, subject_for(), and settings_schema() to SourceAdapter ABC
- Implement all three methods in NWSAdapter, FIRMSAdapter, USGSQuakeAdapter
- Replace manual _ADAPTER_REGISTRY with pkgutil.iter_modules auto-discovery
- Remove subject_for_event from models.py (each adapter owns its subject logic)
- Update supervisor to use adapter.subject_for(event) instead of helper
- Fix quake events going to wrong stream (was publishing to CENTRAL_WX)
- Update test files to use adapter methods

This fixes the quake stream bug where events were published to
central.wx.alert.us.unknown instead of central.quake.event.<tier>.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-18 22:14:12 +00:00
38b23f2a25
release: bump version to 0.3.0 (#30)
Pyproject was stuck at 0.1.0 since Phase 0; bumping to match
the v0.3.0 release tag landing alongside this.

Co-authored-by: Matt Johnson <mj@k7zvx.com>
2026-05-18 14:29:28 -06:00
dbe627dee4
docs: add v0.3.0 changelog entry and network bindings reference (#29)
CHANGELOG.md:
- v0.3.0 Phase 1b entry covering operator console, events feed,
  wizard, session auth, and infrastructure changes

docs/environment.md:
- New "Network and Service Bindings" section documenting:
  - central-gui binds 0.0.0.0 by design (network gating is ops)
  - NATS listener ports table (4222/8080/8222/1883)

Co-authored-by: Matt Johnson <mj@k7zvx.com>
2026-05-18 14:26:09 -06:00
3de81f392a
1b-9c: Events feed UX iteration — colors, popups, viewport filter, expandable rows (#28)
* feat: events feed UX iteration - colors, popups, viewport filter

A. Color-code polygons by adapter (NWS amber, FIRMS red, USGS violet)
B. Click popup on polygons showing time + adapter + category + subject
C. Map viewport drives spatial filter - pan/zoom updates table via HTMX
D. Add legend showing adapter color mapping
E. Remove draw-bbox control, region inputs now hidden (auto-managed)

Template changes:
- _events_rows.html: add data-adapter, data-category, data-time, data-subject
- events_list.html: ADAPTER_COLORS mapping, bindPopup, moveend handler

Test: verify template renders adapter/category/subject for JS consumption

* fix: remove isoformat() call on already-formatted time string

* feat: full events feed UX iteration

A. Color-code polygons by adapter with legend
B. Click popup on polygons with "View details" link
C. Viewport-driven spatial filter - pan/zoom updates table via HTMX
   Map never auto-fits after initial load (user controls viewport)
D. Expandable row details showing full event data payload

Changes:
- _events_rows.html: add data-event-id, expand button, detail row
- events_list.html: eventLayerGroup pattern, buildPopup, rebindEventLayers
  Fit to results button, expand/collapse handlers, CSS.escape for IDs

* fix: add programmaticMove flag to prevent viewport refresh loop

Suppress moveend handler during fitBounds/setView calls to prevent
feedback loop: fitBounds -> moveend -> applyViewportFilter -> HTMX
swap -> repeat.

* fix: map never auto-fits - user controls viewport

- Disable initial fitToAllLayers on page load
- Remove fitBounds/setView from row click handler
- Map only moves when user pans/zooms
- Table filters based on visible viewport

* fix: map shows all events always, only table filters

Map polygons are drawn once on load and never cleared/redrawn.
HTMX swap only updates the table, not the map layers.
User viewport is fully preserved.

* fix: use htmx.trigger instead of dispatchEvent for HTMX swap

dispatchEvent(submit) was triggering native form submission (full page
reload). htmx.trigger() properly triggers HTMX swap.

Also re-enable initial rebindEventLayers so polygons load on first render.

---------

Co-authored-by: Matt Johnson <mj@k7zvx.com>
2026-05-18 14:19:27 -06:00
55e68d038f
feat(gui): add events feed frontend with map and filters (1b-9b) (#26)
* feat(gui): add events feed frontend with map and filters

GET /events: Full page with filter form, table, and Leaflet map
GET /events/rows: HTMX fragment for table updates

Features:
- Filterable by adapter, category, time range, region bbox
- Cursor-based pagination with Next button
- Leaflet map showing event geometries
- Click/hover row highlights geometry on map
- Draw rectangle on map to filter by region
- Validation errors shown as banner, not 400
- Events link added to nav between Adapters and Streams

Refactored events query into shared helper for JSON and HTML routes.

Tests: 14 new tests covering filters, fragments, geometry handling.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(events): use shared helpers for /events.json, fix tests

- Refactor /events.json to use _parse_events_params and _fetch_events
  helpers, removing ~200 lines of duplicate query logic
- Delete smoke test (test_events_unauthenticated_redirects) that had
  no assertions
- Add TestCrossEndpointParity: verify /events.json and /events return
  identical results with same params, test category filter and cursor
  pagination on both endpoints
- Add TestErrorSemantics: verify /events.json returns 400 on bad params
  while /events returns 200 with error banner (intentional API vs HTML
  divergence)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test: add real assertions to since/until and region filter tests

Replace trivial status_code==200 assertions with checks that verify
the filter values were actually parsed and passed to the template.
These tests now fail if the handler ignores the filter parameters.

* fix: remove double-escaping from data-geometry attribute

tojson already produces HTML-attribute-safe JSON. The extra |e filter
was double-escaping, causing JSON.parse to fail in the browser JS.
Switch to single-quoted attribute to avoid conflicts with JSON double
quotes.

---------

Co-authored-by: Matt Johnson <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-18 11:23:38 -06:00
19 changed files with 2076 additions and 432 deletions

View file

@ -1,5 +1,63 @@
# Changelog # Changelog
## v0.3.0 — Phase 1b (2026-05-18)
Operator console. FastAPI + Jinja2 + Pico + HTMX. Self-hosted,
Tailscale-gated by default, no application-level auth beyond
the operator session.
### Added
- Operator console (`central-gui` systemd service on port 8000)
- Login + session auth (argon2id, 90-day DB-backed sessions)
- Dashboard: events 24h by adapter, stream sizes,
last-poll-time per adapter
- Adapters list and edit page (cadence + per-adapter settings),
with Leaflet region picker and click-to-draw rectangles
- Streams view with retention chips (1d / 7d / 14d / 30d /
365d / custom)
- API keys management (list / add / rotate / delete,
encrypted at rest via `crypto.encrypt`, plaintext never
logged or stored)
- First-run wizard (5 steps: operator, system, keys, adapters,
finish) with deferred-commit pattern — no DB writes until
Finish runs as a single transaction
- Events feed page (`/events`) — paginated, filterable by
adapter / category / time range / map viewport, with
color-coded geometry overlay, click-to-popup, and
expandable row details showing full event payload
- Paginated events JSON API (`/events.json`) — cursor-based
pagination, same filter surface as the HTML feed
### Changed
- CSRF tokens are now session-bound (synchronizer token
pattern), replacing the previous fastapi-csrf-protect
library. Eliminates a rotation race that broke first-load
submissions
- First-run wizard is a single atomic transaction at Finish,
not per-step DB writes. Back navigation works; abandoned
wizards leave no orphan rows
### Fixed
- Adapter editor's JSONB double-encoding bug (write path
called `json.dumps` before asyncpg's codec, corrupting
the settings column)
- Dashboard polls card was reading from the wrong NATS
subject and using a durable consumer instead of
`get_last_msg`, leaking zombie consumers
- Browser-noise paths (/favicon.ico, /apple-touch-icon.png,
/robots.txt) return 204 directly, preventing parallel
requests from racing the CSRF cookie on first page load
- SubResource Integrity hashes for leaflet-draw assets
corrected (previous values were fabricated and silently
blocked by browsers)
### Infrastructure
- New `config.sessions` column: `csrf_token` (per-session
synchronizer)
- Composite index on `public.events (time DESC, id DESC)`
for cursor pagination
- `central-gui` systemd service
## v0.2.0 — Phase 1a (2026-05-16) ## v0.2.0 — Phase 1a (2026-05-16)
Three live data sources, configurable infrastructure, hot-reload Three live data sources, configurable infrastructure, hot-reload

View file

@ -28,6 +28,32 @@ The Windows workstation (matt-desktop) has no Central repository clones.
The directory `C:\Users\mtthw\central_work\` is scratch space only and The directory `C:\Users\mtthw\central_work\` is scratch space only and
should not be used for commits. should not be used for commits.
## Network and Service Bindings
### Bind Address
`central-gui` binds to `0.0.0.0` by design. Network gating is the
operator's responsibility (firewall, Tailscale, etc.), not the app's.
Do not switch to `127.0.0.1` or to a specific interface — operators
choose their bind via whatever network they want to expose the service on.
### NATS Listener Ports
The default `nats-server.conf` listens on more than just :4222:
| Port | Protocol | Used by Central? |
|------|----------|------------------|
| 4222 | NATS client | Yes (all) |
| 8080 | WebSocket | No (Phase 0 leftover) |
| 8222 | HTTP monitoring | No (manual ops only) |
| 1883 | MQTT | No (Phase 0 leftover) |
None of the unused ports cause active harm — they listen but no consumer
connects. Operators can remove them from `nats-server.conf` if they want
a tighter footprint. Documenting so future contributors don't grep for
"MQTT integration" and come up confused.
## Repository ## Repository
| Property | Value | | Property | Value |

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "central" name = "central"
version = "0.1.0" version = "0.3.0"
requires-python = ">=3.12,<3.13" requires-python = ">=3.12,<3.13"
description = "Data hub spine — adapters, bus, archive." description = "Data hub spine — adapters, bus, archive."
readme = "README.md" readme = "README.md"

View file

@ -4,6 +4,8 @@ from abc import ABC, abstractmethod
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING: if TYPE_CHECKING:
from central.config_models import AdapterConfig from central.config_models import AdapterConfig
@ -16,9 +18,24 @@ class SourceAdapter(ABC):
Adapters yield Events. The supervisor handles scheduling, Adapters yield Events. The supervisor handles scheduling,
CloudEvents wrapping, publish, and metadata heartbeats. CloudEvents wrapping, publish, and metadata heartbeats.
Class attributes that subclasses must define:
name: Short identifier, e.g. "nws"
display_name: Human-readable name for GUI
description: Short description of the adapter
settings_schema: Pydantic model class for adapter settings
requires_api_key: Key alias if API key required, else None
wizard_order: Order in setup wizard (None = not in wizard)
default_cadence_s: Default polling interval in seconds
""" """
name: str # short identifier, e.g. "nws" name: str
display_name: str
description: str
settings_schema: type[BaseModel]
requires_api_key: str | None = None
wizard_order: int | None = None
default_cadence_s: int
@abstractmethod @abstractmethod
async def poll(self) -> AsyncIterator[Event]: async def poll(self) -> AsyncIterator[Event]:
@ -40,6 +57,16 @@ class SourceAdapter(ABC):
""" """
... ...
@abstractmethod
def subject_for(self, event: Event) -> str:
"""
Compute the NATS subject for an event.
Each adapter knows its own subject hierarchy. The supervisor
calls this to determine where to publish each event.
"""
...
async def startup(self) -> None: async def startup(self) -> None:
"""Optional lifecycle hook called before first poll.""" """Optional lifecycle hook called before first poll."""
pass pass

View file

@ -18,6 +18,8 @@ from tenacity import (
) )
from central.adapter import SourceAdapter from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.models import Event, Geo from central.models import Event, Geo
@ -49,10 +51,23 @@ SEVERITY_MAP = {
} }
class FIRMSSettings(BaseModel):
"""Settings schema for FIRMS adapter."""
api_key_alias: str = "firms"
satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"]
region: RegionConfig | None = None
class FIRMSAdapter(SourceAdapter): class FIRMSAdapter(SourceAdapter):
"""NASA FIRMS fire hotspot adapter.""" """NASA FIRMS fire hotspot adapter."""
name = "firms" name = "firms"
display_name = "NASA FIRMS Fire Hotspots"
description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS."
settings_schema = FIRMSSettings
requires_api_key = "firms"
wizard_order = 2
default_cadence_s = 300
def __init__( def __init__(
self, self,
@ -116,6 +131,15 @@ class FIRMSAdapter(SourceAdapter):
}, },
) )
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains this structure.
"""
return f"central.{event.category}"
async def startup(self) -> None: async def startup(self) -> None:
"""Initialize HTTP session, dedup tracker, and fetch API key.""" """Initialize HTTP session, dedup tracker, and fetch API key."""
# Fetch API key # Fetch API key
@ -417,14 +441,3 @@ class FIRMSAdapter(SourceAdapter):
}, },
) )
def subject_for_fire_hotspot(ev: Event) -> str:
"""Compute the NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains the satellite and confidence info,
so we just prefix with 'central.'.
"""
# category is "fire.hotspot.<satellite>.<confidence>"
return f"central.{ev.category}"

View file

@ -19,6 +19,8 @@ from tenacity import (
from central import __version__ from central import __version__
from central.adapter import SourceAdapter from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.models import Event, Geo from central.models import Event, Geo
@ -189,10 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]:
return sorted(regions) return sorted(regions)
class NWSSettings(BaseModel):
"""Settings schema for NWS adapter."""
contact_email: str = ""
region: RegionConfig | None = None
class NWSAdapter(SourceAdapter): class NWSAdapter(SourceAdapter):
"""National Weather Service alerts adapter.""" """National Weather Service alerts adapter."""
name = "nws" name = "nws"
display_name = "NWS Weather Alerts"
description = "National Weather Service active alerts via api.weather.gov."
settings_schema = NWSSettings
requires_api_key = None
wizard_order = 1
default_cadence_s = 60
def __init__( def __init__(
self, self,
@ -234,6 +248,35 @@ class NWSAdapter(SourceAdapter):
}, },
) )
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a weather alert.
Subject format: central.wx.alert.us.<state>.<type>.<code>
where type is 'county' or 'zone' based on primary_region format.
"""
prefix = "central.wx"
if event.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = event.geo.primary_region
# Parse US-<STATE>-<CODE> format
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool: def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool:
"""Check if feature geometry intersects configured region bbox. """Check if feature geometry intersects configured region bbox.

View file

@ -17,6 +17,8 @@ from tenacity import (
) )
from central.adapter import SourceAdapter from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.models import Event, Geo from central.models import Event, Geo
@ -60,10 +62,22 @@ def magnitude_to_severity(mag: float) -> int:
return 5 return 5
class USGSQuakeSettings(BaseModel):
"""Settings schema for USGS quake adapter."""
feed: str = "all_hour"
region: RegionConfig | None = None
class USGSQuakeAdapter(SourceAdapter): class USGSQuakeAdapter(SourceAdapter):
"""USGS Earthquake Hazards Program adapter.""" """USGS Earthquake Hazards Program adapter."""
name = "usgs_quake" name = "usgs_quake"
display_name = "USGS Earthquakes"
description = "USGS earthquake feed (configurable window)."
settings_schema = USGSQuakeSettings
requires_api_key = None
wizard_order = 3
default_cadence_s = 60
def __init__( def __init__(
self, self,
@ -398,3 +412,9 @@ class USGSQuakeAdapter(SourceAdapter):
new_count += 1 new_count += 1
logger.info("USGS quake yielded events", extra={"count": new_count}) logger.info("USGS quake yielded events", extra={"count": new_count})
def subject_for(self, event: Event) -> str:
"""Return NATS subject for quake event."""
return f"central.{event.category}"

View file

@ -2197,95 +2197,90 @@ async def api_keys_delete(
return RedirectResponse(url="/api-keys", status_code=302) return RedirectResponse(url="/api-keys", status_code=302)
@router.get("/events.json")
async def events_json(request: Request):
# --- Events query helper ---
class EventsQueryResult:
"""Result from events query."""
def __init__(self, events: list, next_cursor: str | None, error: str | None = None):
self.events = events
self.next_cursor = next_cursor
self.error = error
def _parse_events_params(params) -> tuple[dict | None, str | None]:
""" """
Paginated, filterable JSON endpoint for events. Parse and validate events query parameters.
Query parameters (all optional):
adapter: filter by adapter name
category: filter by event category
since: ISO 8601 datetime - events where time >= since
until: ISO 8601 datetime - events where time < until
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
limit: page size (default 50, max 200)
cursor: opaque pagination cursor
Returns: Returns:
{"events": [...], "next_cursor": string or null} (parsed_params, error_message)
If error_message is not None, parsed_params is None.
""" """
from fastapi.responses import JSONResponse
params = request.query_params
# Parse and validate limit # Parse and validate limit
limit_str = params.get("limit", "50") limit_str = params.get("limit", "50")
try: try:
limit = int(limit_str) limit = int(limit_str)
except ValueError: except ValueError:
return JSONResponse( return None, f"Invalid limit value: {limit_str}"
{"error": f"Invalid limit value: {limit_str}"},
status_code=400,
)
if limit < 1 or limit > 200: if limit < 1 or limit > 200:
return JSONResponse( return None, "limit must be between 1 and 200"
{"error": "limit must be between 1 and 200"},
status_code=400,
)
# Parse adapter filter # Parse adapter filter
adapter = params.get("adapter") adapter = params.get("adapter")
if adapter == "":
# Parse category filter adapter = None
# Parse category filter
category = params.get("category") category = params.get("category")
if category == "":
category = None
# Parse since/until filters # Parse since/until filters
since = None since = None
until = None until = None
since_str = params.get("since") since_str = params.get("since")
if since_str: if since_str:
try: try:
since = datetime.fromisoformat(since_str.replace("Z", "+00:00")) since = datetime.fromisoformat(since_str.replace("Z", "+00:00"))
except ValueError: except ValueError:
return JSONResponse( return None, f"Invalid ISO 8601 datetime for since: {since_str}"
{"error": f"Invalid ISO 8601 datetime for since: {since_str}"},
status_code=400,
)
until_str = params.get("until") until_str = params.get("until")
if until_str: if until_str:
try: try:
until = datetime.fromisoformat(until_str.replace("Z", "+00:00")) until = datetime.fromisoformat(until_str.replace("Z", "+00:00"))
except ValueError: except ValueError:
return JSONResponse( return None, f"Invalid ISO 8601 datetime for until: {until_str}"
{"error": f"Invalid ISO 8601 datetime for until: {until_str}"},
status_code=400,
)
# Validate since <= until # Validate since <= until
if since and until and since > until: if since and until and since > until:
return JSONResponse( return None, "since must be before or equal to until"
{"error": "since must be before or equal to until"},
status_code=400,
)
# Parse region bbox # Parse region bbox
region_north = params.get("region_north") region_north = params.get("region_north")
region_south = params.get("region_south") region_south = params.get("region_south")
region_east = params.get("region_east") region_east = params.get("region_east")
region_west = params.get("region_west") region_west = params.get("region_west")
# Treat empty strings as None
if region_north == "":
region_north = None
if region_south == "":
region_south = None
if region_east == "":
region_east = None
if region_west == "":
region_west = None
region_params = [region_north, region_south, region_east, region_west] region_params = [region_north, region_south, region_east, region_west]
region_supplied = [p for p in region_params if p is not None] region_supplied = [p for p in region_params if p is not None]
if len(region_supplied) > 0 and len(region_supplied) < 4: if len(region_supplied) > 0 and len(region_supplied) < 4:
return JSONResponse( return None, "Region filter requires all four parameters: region_north, region_south, region_east, region_west"
{"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"},
status_code=400,
)
bbox = None bbox = None
if len(region_supplied) == 4: if len(region_supplied) == 4:
try: try:
@ -2296,16 +2291,13 @@ async def events_json(request: Request):
"west": float(region_west), "west": float(region_west),
} }
except ValueError: except ValueError:
return JSONResponse( return None, "Region parameters must be valid numbers"
{"error": "Region parameters must be valid numbers"},
status_code=400,
)
# Parse cursor # Parse cursor
cursor_time = None cursor_time = None
cursor_id = None cursor_id = None
cursor_str = params.get("cursor") cursor_str = params.get("cursor")
if cursor_str: if cursor_str:
try: try:
decoded = base64.b64decode(cursor_str).decode("utf-8") decoded = base64.b64decode(cursor_str).decode("utf-8")
@ -2315,59 +2307,82 @@ async def events_json(request: Request):
cursor_time = datetime.fromisoformat(parts[0]) cursor_time = datetime.fromisoformat(parts[0])
cursor_id = parts[1] cursor_id = parts[1]
except Exception: except Exception:
return JSONResponse( return None, "Invalid cursor"
{"error": "Invalid cursor"},
status_code=400, return {
) "limit": limit,
"adapter": adapter,
# Get database pool after validation "category": category,
"since": since,
"until": until,
"bbox": bbox,
"cursor_time": cursor_time,
"cursor_id": cursor_id,
}, None
async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
"""
Fetch events from database using parsed parameters.
Returns EventsQueryResult with events list, next_cursor, and optional error.
"""
pool = get_pool() pool = get_pool()
limit = parsed_params["limit"]
adapter = parsed_params["adapter"]
category = parsed_params["category"]
since = parsed_params["since"]
until = parsed_params["until"]
bbox = parsed_params["bbox"]
cursor_time = parsed_params["cursor_time"]
cursor_id = parsed_params["cursor_id"]
# Build query # Build query
conditions = [] conditions = []
query_params = [] query_params = []
param_idx = 1 param_idx = 1
if adapter: if adapter:
conditions.append(f"adapter = ${param_idx}") conditions.append(f"adapter = ${param_idx}")
query_params.append(adapter) query_params.append(adapter)
param_idx += 1 param_idx += 1
if category: if category:
conditions.append(f"category = ${param_idx}") conditions.append(f"category = ${param_idx}")
query_params.append(category) query_params.append(category)
param_idx += 1 param_idx += 1
if since: if since:
conditions.append(f"time >= ${param_idx}") conditions.append(f"time >= ${param_idx}")
query_params.append(since) query_params.append(since)
param_idx += 1 param_idx += 1
if until: if until:
conditions.append(f"time < ${param_idx}") conditions.append(f"time < ${param_idx}")
query_params.append(until) query_params.append(until)
param_idx += 1 param_idx += 1
if bbox: if bbox:
conditions.append( conditions.append(
f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))" f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))"
) )
query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]]) query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]])
param_idx += 4 param_idx += 4
if cursor_time and cursor_id: if cursor_time and cursor_id:
conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})") conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})")
query_params.append(cursor_time) query_params.append(cursor_time)
query_params.append(cursor_id) query_params.append(cursor_id)
param_idx += 2 param_idx += 2
where_clause = "" where_clause = ""
if conditions: if conditions:
where_clause = "WHERE " + " AND ".join(conditions) where_clause = "WHERE " + " AND ".join(conditions)
# Fetch limit+1 to check for next page # Fetch limit+1 to check for next page
query = f""" query = f"""
SELECT SELECT
id, id,
time, time,
received, received,
@ -2383,29 +2398,26 @@ async def events_json(request: Request):
LIMIT ${param_idx} LIMIT ${param_idx}
""" """
query_params.append(limit + 1) query_params.append(limit + 1)
try: try:
async with pool.acquire() as conn: async with pool.acquire() as conn:
rows = await conn.fetch(query, *query_params) rows = await conn.fetch(query, *query_params)
except Exception as e: except Exception as e:
logger.error(f"Database error in events_json: {e}") logger.error(f"Database error in _fetch_events: {e}")
return JSONResponse( return EventsQueryResult([], None, "Database error")
{"error": "Database error"},
status_code=500,
)
# Check if there is a next page # Check if there is a next page
has_next = len(rows) > limit has_next = len(rows) > limit
if has_next: if has_next:
rows = rows[:limit] rows = rows[:limit]
# Build response # Build response
events = [] events = []
for row in rows: for row in rows:
geometry = None geometry = None
if row["geometry"]: if row["geometry"]:
geometry = json.loads(row["geometry"]) geometry = json.loads(row["geometry"])
events.append({ events.append({
"id": row["id"], "id": row["id"],
"time": row["time"].isoformat(), "time": row["time"].isoformat(),
@ -2417,15 +2429,196 @@ async def events_json(request: Request):
"data": dict(row["data"]) if row["data"] else {}, "data": dict(row["data"]) if row["data"] else {},
"regions": list(row["regions"]) if row["regions"] else [], "regions": list(row["regions"]) if row["regions"] else [],
}) })
# Build next_cursor if there are more results # Build next_cursor if there are more results
next_cursor = None next_cursor = None
if has_next and events: if has_next and events:
last_event = rows[-1] last_event = rows[-1]
cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}" cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}"
next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8") next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8")
return EventsQueryResult(events, next_cursor)
def _geometry_summary(geometry: dict | None) -> str:
"""Generate a human-readable summary of a geometry."""
if not geometry:
return "None"
geom_type = geometry.get("type", "Unknown")
if geom_type == "Point":
return "Point"
elif geom_type == "LineString":
coords = geometry.get("coordinates", [])
return f"Line ({len(coords)} pts)"
elif geom_type == "Polygon":
coords = geometry.get("coordinates", [[]])
if coords:
return f"Polygon ({len(coords[0])} pts)"
return "Polygon"
elif geom_type == "MultiPolygon":
coords = geometry.get("coordinates", [])
return f"MultiPolygon ({len(coords)} parts)"
else:
return geom_type
@router.get("/events.json")
async def events_json(request: Request):
"""
Paginated, filterable JSON endpoint for events.
Query parameters (all optional):
adapter: filter by adapter name
category: filter by event category
since: ISO 8601 datetime - events where time >= since
until: ISO 8601 datetime - events where time < until
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
limit: page size (default 50, max 200)
cursor: opaque pagination cursor
Returns:
{"events": [...], "next_cursor": string or null}
"""
from fastapi.responses import JSONResponse
params = request.query_params
# Parse and validate parameters using shared helper
parsed, error = _parse_events_params(params)
if error:
return JSONResponse({"error": error}, status_code=400)
# Fetch events using shared helper
result = await _fetch_events(parsed)
if result.error:
return JSONResponse({"error": result.error}, status_code=500)
return JSONResponse({ return JSONResponse({
"events": events, "events": result.events,
"next_cursor": next_cursor, "next_cursor": result.next_cursor,
}) })
# --- Events feed frontend routes ---
@router.get("/events", response_class=HTMLResponse)
async def events_list(request: Request) -> HTMLResponse:
"""Events feed page with filter form, table, and map."""
templates = _get_templates()
operator = getattr(request.state, "operator", None)
csrf_token = getattr(request.state, "csrf_token", "")
params = request.query_params
# Parse parameters
parsed, error = _parse_events_params(params)
# Get system settings for map tiles
pool = get_pool()
async with pool.acquire() as conn:
system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system")
tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap"
# Prepare filter values for template
filter_values = {
"adapter": params.get("adapter", ""),
"category": params.get("category", ""),
"since": params.get("since", ""),
"until": params.get("until", ""),
"region_north": params.get("region_north", ""),
"region_south": params.get("region_south", ""),
"region_east": params.get("region_east", ""),
"region_west": params.get("region_west", ""),
"limit": params.get("limit", "50"),
}
events = []
next_cursor = None
if error:
# Validation error - show error banner but don't fail the page
pass
else:
# Fetch events
result = await _fetch_events(parsed)
if result.error:
error = result.error
else:
events = result.events
next_cursor = result.next_cursor
# Add geometry summary to each event
for event in events:
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
return templates.TemplateResponse(
request=request,
name="events_list.html",
context={
"operator": operator,
"csrf_token": csrf_token,
"events": events,
"next_cursor": next_cursor,
"filter_values": filter_values,
"filter_error": error,
"tile_url": tile_url,
"tile_attribution": tile_attribution,
},
)
@router.get("/events/rows", response_class=HTMLResponse)
async def events_rows(request: Request) -> HTMLResponse:
"""HTMX fragment: events table rows only (no page chrome)."""
templates = _get_templates()
params = request.query_params
# Parse parameters
parsed, error = _parse_events_params(params)
# Prepare filter values for template
filter_values = {
"adapter": params.get("adapter", ""),
"category": params.get("category", ""),
"since": params.get("since", ""),
"until": params.get("until", ""),
"region_north": params.get("region_north", ""),
"region_south": params.get("region_south", ""),
"region_east": params.get("region_east", ""),
"region_west": params.get("region_west", ""),
"limit": params.get("limit", "50"),
}
events = []
next_cursor = None
if error:
pass
else:
result = await _fetch_events(parsed)
if result.error:
error = result.error
else:
events = result.events
next_cursor = result.next_cursor
# Add geometry summary to each event
for event in events:
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
return templates.TemplateResponse(
request=request,
name="_events_rows.html",
context={
"events": events,
"next_cursor": next_cursor,
"filter_values": filter_values,
"filter_error": error,
},
)

View file

@ -0,0 +1,73 @@
{% if filter_error %}
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
<strong>Filter Error:</strong> {{ filter_error }}
</article>
{% endif %}
{% if events %}
<table class="events-table">
<thead>
<tr>
<th style="width: 2rem;"></th>
<th>Time</th>
<th>Adapter</th>
<th>Category</th>
<th>Geometry</th>
<th>Subject</th>
</tr>
</thead>
<tbody>
{% for event in events %}
<tr class="event-row" data-row-idx="{{ loop.index0 }}"
data-event-id="{{ event.id }}"
data-adapter="{{ event.adapter }}"
data-category="{{ event.category }}"
data-time="{{ event.time }}"
data-subject="{{ event.subject or '' }}"
{% if event.geometry %}data-geometry='{{ event.geometry | tojson }}'{% endif %}>
<td><button type="button" class="expand-row" aria-label="Expand">&#9656;</button></td>
<td>{{ event.time }}</td>
<td>{{ event.adapter }}</td>
<td>{{ event.category }}</td>
<td>{{ event.geometry_summary }}</td>
<td>{{ event.subject or '—' }}</td>
</tr>
<tr class="event-detail" hidden>
<td colspan="6">
<dl class="event-detail-list">
<dt>Event ID</dt>
<dd><code>{{ event.id }}</code></dd>
<dt>Received</dt>
<dd>{{ event.received }}</dd>
{% if event.regions %}
<dt>Regions</dt>
<dd>{{ event.regions | join(", ") }}</dd>
{% endif %}
<dt>Data</dt>
<dd><pre class="event-data-pre">{{ event.data | tojson(indent=2) }}</pre></dd>
</dl>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<div class="pagination-info">
<span>Showing {{ events | length }} event{{ 's' if events | length != 1 else '' }}.</span>
{% if next_cursor %}
<a href="/events?cursor={{ next_cursor }}{% if filter_values.adapter %}&amp;adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&amp;category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&amp;since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&amp;until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&amp;region_north={{ filter_values.region_north }}&amp;region_south={{ filter_values.region_south }}&amp;region_east={{ filter_values.region_east }}&amp;region_west={{ filter_values.region_west }}{% endif %}&amp;limit={{ filter_values.limit }}"
role="button"
hx-get="/events/rows?cursor={{ next_cursor }}{% if filter_values.adapter %}&amp;adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&amp;category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&amp;since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&amp;until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&amp;region_north={{ filter_values.region_north }}&amp;region_south={{ filter_values.region_south }}&amp;region_east={{ filter_values.region_east }}&amp;region_west={{ filter_values.region_west }}{% endif %}&amp;limit={{ filter_values.limit }}"
hx-target="#events-rows"
hx-push-url="true">
Next &rarr;
</a>
{% else %}
<span><em>End of results</em></span>
{% endif %}
</div>
{% else %}
<article>
<p>No events match the filters.</p>
</article>
{% endif %}

View file

@ -17,6 +17,7 @@
{% if operator %} {% if operator %}
<li><a href="/">Dashboard</a></li> <li><a href="/">Dashboard</a></li>
<li><a href="/adapters">Adapters</a></li> <li><a href="/adapters">Adapters</a></li>
<li><a href="/events">Events</a></li>
<li><a href="/streams">Streams</a></li> <li><a href="/streams">Streams</a></li>
<li><a href="/api-keys">API Keys</a></li> <li><a href="/api-keys">API Keys</a></li>
<li>{{ operator.username }}</li> <li>{{ operator.username }}</li>

View file

@ -0,0 +1,437 @@
{% extends "base.html" %}
{% block title %}Events - Central{% endblock %}
{% block head %}
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
#events-map {
height: 400px;
margin-bottom: 0.5rem;
border-radius: var(--pico-border-radius);
}
.map-controls {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
}
.map-legend {
display: flex;
gap: 1rem;
font-size: 0.85rem;
}
.map-legend-item {
display: flex;
align-items: center;
gap: 0.25rem;
}
.map-legend-swatch {
width: 16px;
height: 16px;
border-radius: 3px;
border: 1px solid rgba(0,0,0,0.2);
}
#fit-to-results {
padding: 0.25rem 0.75rem;
font-size: 0.85rem;
}
.events-table {
font-size: 0.9rem;
}
.events-table td {
vertical-align: middle;
}
.events-table tr.event-row:hover {
background-color: var(--pico-primary-focus);
cursor: pointer;
}
.events-table tr.event-row.highlighted {
background-color: var(--pico-primary-background);
}
.expand-row {
background: none;
border: none;
padding: 0.25rem;
cursor: pointer;
font-size: 0.9rem;
color: var(--pico-color);
}
.expand-row:hover {
color: var(--pico-primary);
}
.event-detail td {
background-color: var(--pico-card-background-color);
padding: 1rem;
}
.event-detail-list {
display: grid;
grid-template-columns: auto 1fr;
gap: 0.5rem 1rem;
margin: 0;
}
.event-detail-list dt {
font-weight: 600;
color: var(--pico-muted-color);
}
.event-detail-list dd {
margin: 0;
}
.event-data-pre {
max-height: 300px;
overflow: auto;
font-size: 0.8rem;
background: var(--pico-code-background-color);
padding: 0.5rem;
border-radius: var(--pico-border-radius);
margin: 0;
}
.filter-form .grid {
margin-bottom: 0.5rem;
}
.filter-form label {
margin-bottom: 0.25rem;
}
.filter-form input, .filter-form select {
margin-bottom: 0.5rem;
}
.pagination-info {
margin-top: 1rem;
display: flex;
justify-content: space-between;
align-items: center;
}
</style>
{% endblock %}
{% block content %}
<h1>Events</h1>
{% if filter_error %}
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
<strong>Filter Error:</strong> {{ filter_error }}
</article>
{% endif %}
<details open>
<summary>Filters</summary>
<form id="filter-form" class="filter-form" action="/events" method="get"
hx-get="/events/rows" hx-target="#events-rows" hx-push-url="true">
<div class="grid">
<div>
<label for="adapter">Adapter</label>
<select id="adapter" name="adapter">
<option value="">All</option>
<option value="nws" {% if filter_values.adapter == 'nws' %}selected{% endif %}>nws</option>
<option value="firms" {% if filter_values.adapter == 'firms' %}selected{% endif %}>firms</option>
<option value="usgs_quake" {% if filter_values.adapter == 'usgs_quake' %}selected{% endif %}>usgs_quake</option>
</select>
</div>
<div>
<label for="category">Category</label>
<input type="text" id="category" name="category" placeholder="Exact match"
value="{{ filter_values.category }}">
</div>
<div>
<label for="since">From</label>
<input type="datetime-local" id="since" name="since"
value="{{ filter_values.since }}">
</div>
<div>
<label for="until">To</label>
<input type="datetime-local" id="until" name="until"
value="{{ filter_values.until }}">
</div>
</div>
<!-- Hidden region inputs (managed by map viewport) -->
<input type="hidden" id="region_north" name="region_north" value="{{ filter_values.region_north }}">
<input type="hidden" id="region_south" name="region_south" value="{{ filter_values.region_south }}">
<input type="hidden" id="region_east" name="region_east" value="{{ filter_values.region_east }}">
<input type="hidden" id="region_west" name="region_west" value="{{ filter_values.region_west }}">
<input type="hidden" name="limit" value="{{ filter_values.limit }}">
<div style="display: flex; gap: 0.5rem; margin-top: 0.5rem;">
<button type="submit">Apply</button>
<a href="/events" role="button" class="outline">Clear Filters</a>
</div>
</form>
</details>
<div id="events-map"></div>
<div class="map-controls">
<div class="map-legend">
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #f59e0b;"></div>
<span>NWS (Weather)</span>
</div>
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #dc2626;"></div>
<span>FIRMS (Fire)</span>
</div>
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #7c3aed;"></div>
<span>USGS (Quake)</span>
</div>
</div>
<button type="button" id="fit-to-results" class="outline secondary">Fit map to results</button>
</div>
<div id="events-rows">
{% include "_events_rows.html" %}
</div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
(function() {
var tileUrl = {{ tile_url | tojson }};
var tileAttr = {{ tile_attribution | tojson }};
// Adapter color mapping
var ADAPTER_COLORS = {
"nws": "#f59e0b",
"firms": "#dc2626",
"usgs_quake": "#7c3aed"
};
function getAdapterColor(adapter) {
return ADAPTER_COLORS[adapter] || "#3388ff";
}
// Initialize map
var map = L.map("events-map").setView([39, -98], 4);
L.tileLayer(tileUrl, {
attribution: tileAttr,
maxZoom: 18
}).addTo(map);
// Layer group for event geometries
var eventLayerGroup = L.layerGroup().addTo(map);
var highlightedRow = null;
var highlightedLayer = null;
var isInitialLoad = true;
var programmaticMove = false;
// Region input elements
var northInput = document.getElementById("region_north");
var southInput = document.getElementById("region_south");
var eastInput = document.getElementById("region_east");
var westInput = document.getElementById("region_west");
// Viewport-driven filter with debounce
var viewportDebounceTimer = null;
map.on("moveend", function() {
if (programmaticMove) {
programmaticMove = false;
return;
}
if (viewportDebounceTimer) clearTimeout(viewportDebounceTimer);
viewportDebounceTimer = setTimeout(applyViewportFilter, 400);
});
function applyViewportFilter() {
var bounds = map.getBounds();
northInput.value = bounds.getNorth().toFixed(4);
southInput.value = bounds.getSouth().toFixed(4);
eastInput.value = bounds.getEast().toFixed(4);
westInput.value = bounds.getWest().toFixed(4);
htmx.trigger(document.getElementById("filter-form"), "submit");
}
function buildPopup(row) {
var adapter = row.dataset.adapter || "";
var category = row.dataset.category || "";
var time = row.dataset.time ? new Date(row.dataset.time).toLocaleString() : "";
var subject = row.dataset.subject || "";
var eventId = row.dataset.eventId || "";
var html = "<strong>" + adapter + "</strong><br>" +
category + "<br>" +
"<small>" + time + "</small>";
if (subject) {
html += "<br><em>" + subject + "</em>";
}
html += '<br><a href="#" data-show-row="' + eventId + '">View details &#9662;</a>';
return html;
}
function rebindEventLayers() {
eventLayerGroup.clearLayers();
var rows = document.querySelectorAll("#events-rows tr.event-row[data-geometry]");
rows.forEach(function(row) {
var geomStr = row.dataset.geometry;
if (!geomStr || geomStr === "") return;
try {
var geom = JSON.parse(geomStr);
if (!geom) return;
var adapter = row.dataset.adapter || "";
var color = getAdapterColor(adapter);
var layer = L.geoJSON(geom, {
style: {
color: color,
weight: 2,
fillColor: color,
fillOpacity: 0.25
},
pointToLayer: function(feature, latlng) {
return L.circleMarker(latlng, {
radius: 8,
color: color,
weight: 2,
fillColor: color,
fillOpacity: 0.25
});
}
});
layer.bindPopup(buildPopup(row));
layer.on("click", function() {
highlightRow(row, layer, color);
});
layer.addTo(eventLayerGroup);
// Store reference for row highlighting
row._mapLayer = layer;
row._mapColor = color;
} catch (e) {
console.error("Error parsing geometry:", e);
}
});
}
function highlightRow(row, layer, originalColor) {
// Reset previous highlight
if (highlightedRow) {
highlightedRow.classList.remove("highlighted");
}
if (highlightedLayer && highlightedLayer._originalColor) {
highlightedLayer.setStyle({
color: highlightedLayer._originalColor,
weight: 2,
fillColor: highlightedLayer._originalColor,
fillOpacity: 0.25
});
}
// Set new highlight
row.classList.add("highlighted");
highlightedRow = row;
if (layer) {
layer._originalColor = originalColor;
layer.setStyle({
color: "#ff3333",
weight: 4,
fillColor: "#ff3333",
fillOpacity: 0.4
});
highlightedLayer = layer;
}
}
function fitToAllLayers() {
var layers = eventLayerGroup.getLayers();
if (layers.length === 0) return;
programmaticMove = true;
var group = L.featureGroup(layers);
map.fitBounds(group.getBounds(), { padding: [20, 20] });
}
// Row click handler (event delegation)
document.addEventListener("click", function(e) {
// Expand/collapse handler
if (e.target.classList.contains("expand-row")) {
var row = e.target.closest("tr");
var detail = row.nextElementSibling;
if (detail && detail.classList.contains("event-detail")) {
detail.hidden = !detail.hidden;
e.target.innerHTML = detail.hidden ? "&#9656;" : "&#9662;";
}
return;
}
// Popup "View details" link handler
if (e.target.matches("[data-show-row]")) {
e.preventDefault();
var eventId = e.target.dataset.showRow;
var targetRow = document.querySelector(
'tr[data-event-id="' + CSS.escape(eventId) + '"]'
);
if (!targetRow) return;
targetRow.scrollIntoView({ behavior: "smooth", block: "center" });
var detail = targetRow.nextElementSibling;
if (detail && detail.classList.contains("event-detail")) {
detail.hidden = false;
var button = targetRow.querySelector(".expand-row");
if (button) button.innerHTML = "&#9662;";
}
return;
}
// Row click to highlight (no auto-pan - user controls viewport)
var row = e.target.closest("tr.event-row");
if (row && row._mapLayer) {
highlightRow(row, row._mapLayer, row._mapColor);
// Map stays where user put it
}
});
// Row hover handlers (event delegation)
document.addEventListener("mouseenter", function(e) {
var row = e.target.closest && e.target.closest("tr.event-row");
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
row._mapLayer.setStyle({
color: row._mapColor,
weight: 3,
fillColor: row._mapColor,
fillOpacity: 0.25
});
}
}, true);
document.addEventListener("mouseleave", function(e) {
var row = e.target.closest && e.target.closest("tr.event-row");
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
row._mapLayer.setStyle({
color: row._mapColor,
weight: 2,
fillColor: row._mapColor,
fillOpacity: 0.25
});
}
}, true);
// Fit to results button
document.getElementById("fit-to-results").addEventListener("click", fitToAllLayers);
// Initial load - bind layers and fit bounds
rebindEventLayers(); // Initial load only
if (false) { // DISABLED: map never auto-fits
fitToAllLayers();
isInitialLoad = false;
}
// Re-bind layers after HTMX swap (but do NOT fit bounds)
document.body.addEventListener("htmx:afterSwap", function(evt) {
if (evt.detail.target.id === "events-rows") {
// rebindEventLayers(); // DISABLED: map shows all events, only table filters
// Do NOT call fitToAllLayers - preserve user viewport
}
});
// Fix map rendering after container shows
setTimeout(function() { map.invalidateSize(); }, 100);
})();
</script>
{% endblock %}

View file

@ -32,48 +32,3 @@ class Event(BaseModel):
data: dict[str, Any] # adapter-specific payload data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event) -> str:
"""
Compute the NATS subject for an event based on its category.
Dispatch by category prefix:
- fire.*: returns central.<category> directly
- wx.*: uses weather alert subject logic
Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower>
or
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence>
"""
# Fire events: subject is just central.<category>
if ev.category.startswith("fire."):
return f"central.{ev.category}"
# Weather events: use geo-based subject logic
prefix = "central.wx"
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
# Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
# Zone codes start with "Z" like "Z033"
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"

View file

@ -13,24 +13,40 @@ from typing import Any
import nats import nats
from nats.js import JetStreamContext from nats.js import JetStreamContext
import importlib
import pkgutil
from central.adapter import SourceAdapter from central.adapter import SourceAdapter
from central.adapters.nws import NWSAdapter
from central.adapters.firms import FIRMSAdapter
from central.adapters.usgs_quake import USGSQuakeAdapter
from central.cloudevents_wire import wrap_event from central.cloudevents_wire import wrap_event
from central.config_models import AdapterConfig from central.config_models import AdapterConfig
from central.config_source import ConfigSource, DbConfigSource from central.config_source import ConfigSource, DbConfigSource
from central.config_store import ConfigStore from central.config_store import ConfigStore
from central.bootstrap_config import get_settings from central.bootstrap_config import get_settings
from central.models import subject_for_event
from central.stream_manager import StreamManager from central.stream_manager import StreamManager
import central.adapters
# Adapter registry - add new adapters here def discover_adapters() -> dict[str, type[SourceAdapter]]:
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = { """Auto-discover adapter classes from central.adapters package."""
"nws": NWSAdapter, registry: dict[str, type[SourceAdapter]] = {}
"firms": FIRMSAdapter, for module_info in pkgutil.iter_modules(central.adapters.__path__):
"usgs_quake": USGSQuakeAdapter, try:
} module = importlib.import_module(f"central.adapters.{module_info.name}")
except Exception as e:
logger.error(
"Failed to import adapter module",
extra={"module": module_info.name, "error": str(e)},
)
continue
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, SourceAdapter)
and attr is not SourceAdapter
and hasattr(attr, "name")
):
registry[attr.name] = attr
return registry
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db") CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
@ -114,6 +130,7 @@ class Supervisor:
self._config_store = config_store self._config_store = config_store
self._nats_url = nats_url self._nats_url = nats_url
self._cloudevents_config = cloudevents_config self._cloudevents_config = cloudevents_config
self._adapters = discover_adapters()
self._nc: nats.NATS | None = None self._nc: nats.NATS | None = None
self._js: JetStreamContext | None = None self._js: JetStreamContext | None = None
self._stream_manager: StreamManager | None = None self._stream_manager: StreamManager | None = None
@ -161,7 +178,7 @@ class Supervisor:
def _create_adapter(self, config: AdapterConfig) -> SourceAdapter: def _create_adapter(self, config: AdapterConfig) -> SourceAdapter:
"""Create an adapter instance based on config name.""" """Create an adapter instance based on config name."""
cls = _ADAPTER_REGISTRY.get(config.name) cls = self._adapters.get(config.name)
if cls is None: if cls is None:
raise ValueError(f"Unknown adapter type: {config.name}") raise ValueError(f"Unknown adapter type: {config.name}")
return cls( return cls(
@ -232,7 +249,7 @@ class Supervisor:
# Build CloudEvent (uses defaults if no config provided) # Build CloudEvent (uses defaults if no config provided)
envelope, msg_id = wrap_event(event, self._cloudevents_config) envelope, msg_id = wrap_event(event, self._cloudevents_config)
subject = subject_for_event(event) subject = state.adapter.subject_for(event)
# Publish # Publish
await self._publish_event(subject, envelope, msg_id) await self._publish_event(subject, envelope, msg_id)

View file

@ -0,0 +1,686 @@
"""Tests for events feed frontend routes."""
import json
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.gui.routes import events_list, events_rows, events_json
class TestEventsFeedFrontendAuthenticated:
"""Test events feed frontend with authentication."""
@pytest.mark.asyncio
async def test_events_no_filters_returns_html(self):
"""GET /events authenticated, no filters returns HTML with events."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {}
mock_events = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Weather Alert",
"subject": f"Test Alert {i}",
"geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None,
"data": {},
"regions": [],
}
for i in range(5)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert "events" in context
assert context["filter_error"] is None
@pytest.mark.asyncio
async def test_events_adapter_filter(self):
"""GET /events?adapter=nws returns only nws events."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"adapter": "nws"}
mock_events = [
{
"id": "nws_event_1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "NWS Alert",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_values"]["adapter"] == "nws"
@pytest.mark.asyncio
async def test_events_since_until_filter(self):
"""GET /events?since=...&until=... filters by time window."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {
"since": "2026-05-17T00:00:00",
"until": "2026-05-17T12:00:00",
}
mock_events = [
{
"id": "in_range",
"time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "In Range",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
# Verify filter was actually parsed and passed to template
mock_templates.TemplateResponse.assert_called_once()
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
context = call_kwargs.get("context", call_kwargs)
assert context["filter_values"]["since"] == "2026-05-17T00:00:00"
assert context["filter_values"]["until"] == "2026-05-17T12:00:00"
@pytest.mark.asyncio
async def test_events_region_filter(self):
"""GET /events with full region bbox filters by location."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {
"region_north": "49.5",
"region_south": "31",
"region_east": "-102",
"region_west": "-124.5",
}
mock_events = [
{
"id": "in_bbox",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "In BBox",
"geometry": '{"type": "Point", "coordinates": [-120, 40]}',
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
# Verify region filter was actually parsed and passed to template
mock_templates.TemplateResponse.assert_called_once()
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
context = call_kwargs.get("context", call_kwargs)
assert context["filter_values"]["region_north"] == "49.5"
assert context["filter_values"]["region_south"] == "31"
assert context["filter_values"]["region_east"] == "-102"
assert context["filter_values"]["region_west"] == "-124.5"
@pytest.mark.asyncio
async def test_events_partial_region_shows_error_banner(self):
"""GET /events with partial region shows error banner, not 400."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"region_north": "49"}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
# Should be 200, not 400
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_error"] is not None
assert "region" in context["filter_error"].lower()
# Events should be empty due to validation error
assert context["events"] == []
@pytest.mark.asyncio
async def test_events_with_limit_shows_next_button(self):
"""GET /events?limit=5 shows Next button when more events exist."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"limit": "5"}
# Return 6 events (limit+1) to trigger pagination
mock_events = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Alert",
"subject": f"Event {i}",
"geometry": None,
"data": {},
"regions": [],
}
for i in range(6)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["next_cursor"] is not None
assert len(context["events"]) == 5 # Should be trimmed to limit
class TestEventsRowsFragment:
"""Test /events/rows HTMX fragment."""
@pytest.mark.asyncio
async def test_events_rows_returns_fragment(self):
"""GET /events/rows returns table fragment, not full page."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {"limit": "5"}
mock_events = [
{
"id": "event_1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "Event 1",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
assert result.status_code == 200
# Verify it uses the fragment template
call_args = mock_templates.TemplateResponse.call_args
assert call_args.kwargs.get("name") == "_events_rows.html"
class TestGeometrySummary:
"""Test geometry summary function."""
def test_geometry_summary_polygon(self):
"""Polygon geometry shows point count."""
from central.gui.routes import _geometry_summary
geom = {
"type": "Polygon",
"coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]
}
summary = _geometry_summary(geom)
assert "Polygon" in summary
assert "5 pts" in summary
def test_geometry_summary_point(self):
"""Point geometry shows 'Point'."""
from central.gui.routes import _geometry_summary
geom = {"type": "Point", "coordinates": [-122.4, 37.8]}
summary = _geometry_summary(geom)
assert summary == "Point"
def test_geometry_summary_linestring(self):
"""LineString geometry shows point count."""
from central.gui.routes import _geometry_summary
geom = {
"type": "LineString",
"coordinates": [[-122, 37], [-121, 38], [-120, 39]]
}
summary = _geometry_summary(geom)
assert "Line" in summary
assert "3 pts" in summary
def test_geometry_summary_none(self):
"""None geometry shows 'None'."""
from central.gui.routes import _geometry_summary
summary = _geometry_summary(None)
assert summary == "None"
class TestDataGeometryAttribute:
"""Test that rows have valid geometry data attributes."""
@pytest.mark.asyncio
async def test_event_with_geometry_has_valid_json(self):
"""Events with geometry have parseable JSON in data-geometry."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {}
mock_events = [
{
"id": "geom_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "With Geometry",
"geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}',
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
event = context["events"][0]
# Geometry should be parsed dict, not string
assert isinstance(event["geometry"], dict)
assert event["geometry"]["type"] == "Polygon"
@pytest.mark.asyncio
async def test_event_without_geometry_has_none(self):
"""Events without geometry have None for geometry field."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {}
mock_events = [
{
"id": "no_geom_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "No Geometry",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
event = context["events"][0]
assert event["geometry"] is None
class TestCrossEndpointParity:
"""Test that /events.json and /events return the same filtered results."""
@pytest.mark.asyncio
async def test_category_filter_both_endpoints(self):
"""Category filter works on both /events.json and /events."""
mock_events = [
{
"id": "weather_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Weather Alert",
"subject": "Weather Event",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
query_params = {"category": "Weather Alert"}
# Test /events.json
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = query_params
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
assert len(json_data["events"]) == 1
assert json_data["events"][0]["category"] == "Weather Alert"
# Test /events
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = query_params
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = mock_events
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert len(html_context["events"]) == 1
assert html_context["events"][0]["category"] == "Weather Alert"
@pytest.mark.asyncio
async def test_cursor_pagination_both_endpoints(self):
"""Cursor pagination works identically on both endpoints."""
first_page = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Alert",
"subject": f"Event {i}",
"geometry": None,
"data": {},
"regions": [],
}
for i in range(3)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = first_page
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = {"limit": "2"}
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
json_cursor = json_data["next_cursor"]
assert json_cursor is not None
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = {"limit": "2"}
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = first_page
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
html_cursor = html_context["next_cursor"]
assert json_cursor == html_cursor
class TestErrorSemantics:
"""Test error handling differences between JSON and HTML endpoints."""
@pytest.mark.asyncio
async def test_json_endpoint_returns_400_on_invalid_limit(self):
"""/events.json?limit=0 returns 400 JSON error."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {"limit": "0"}
response = await events_json(mock_request)
assert response.status_code == 400
data = json.loads(response.body)
assert "error" in data
@pytest.mark.asyncio
async def test_html_endpoint_returns_200_with_error_banner(self):
"""/events?limit=0 returns 200 HTML with error banner."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf"
mock_request.query_params = {"limit": "0"}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_error"] is not None
assert "limit" in context["filter_error"].lower()
assert context["events"] == []
class TestEventRowDataAttributes:
"""Test that _events_rows.html renders required data attributes."""
@pytest.mark.asyncio
async def test_row_renders_data_adapter_attribute(self):
"""Event rows include data-adapter attribute for color coding."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf"
mock_request.query_params = {}
mock_events = [
{
"id": "test1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "usgs_quake",
"category": "quake.event",
"subject": "M4.2 Earthquake",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
mock_templates.TemplateResponse.assert_called_once()
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
# The template receives events with adapter field for data-adapter attribute
assert len(context["events"]) == 1
assert context["events"][0]["adapter"] == "usgs_quake"
assert context["events"][0]["category"] == "quake.event"
assert context["events"][0]["subject"] == "M4.2 Earthquake"

View file

@ -10,7 +10,6 @@ from central.adapters.firms import (
FIRMSAdapter, FIRMSAdapter,
CONFIDENCE_MAP, CONFIDENCE_MAP,
SATELLITE_SHORT, SATELLITE_SHORT,
subject_for_fire_hotspot,
) )
from central.config_models import AdapterConfig, RegionConfig from central.config_models import AdapterConfig, RegionConfig
from central.models import Event, Geo from central.models import Event, Geo
@ -285,7 +284,14 @@ class TestDeduplication:
class TestSubjectGeneration: class TestSubjectGeneration:
"""Test subject generation for fire hotspots.""" """Test subject generation for fire hotspots."""
def test_subject_format(self): @pytest.mark.asyncio
async def test_subject_format(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event( event = Event(
id="test", id="test",
adapter="firms", adapter="firms",
@ -296,10 +302,17 @@ class TestSubjectGeneration:
data={}, data={},
) )
subject = subject_for_fire_hotspot(event) subject = adapter.subject_for(event)
assert subject == "central.fire.hotspot.viirs_snpp.high" assert subject == "central.fire.hotspot.viirs_snpp.high"
def test_subject_nominal_confidence(self): @pytest.mark.asyncio
async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event( event = Event(
id="test", id="test",
adapter="firms", adapter="firms",
@ -310,7 +323,7 @@ class TestSubjectGeneration:
data={}, data={},
) )
subject = subject_for_fire_hotspot(event) subject = adapter.subject_for(event)
assert subject == "central.fire.hotspot.viirs_noaa20.nominal" assert subject == "central.fire.hotspot.viirs_noaa20.nominal"

View file

@ -4,7 +4,7 @@ from datetime import datetime, timezone
import pytest import pytest
from central.models import Event, Geo, subject_for_event from central.models import Event, Geo
from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config
from central.cloudevents_wire import wrap_event from central.cloudevents_wire import wrap_event
@ -57,47 +57,6 @@ def sample_config() -> Config:
) )
class TestSubjectForEvent:
"""Tests for subject_for_event helper."""
def test_county_subject(self, sample_event: Event) -> None:
"""County codes produce county subject."""
subject = subject_for_event(sample_event)
assert subject == "central.wx.alert.us.id.county.ada"
def test_zone_subject(self, sample_geo: Geo) -> None:
"""Zone codes produce zone subject."""
geo = Geo(
centroid=sample_geo.centroid,
bbox=sample_geo.bbox,
regions=["US-ID-Z033"],
primary_region="US-ID-Z033",
)
event = Event(
id="test-zone",
adapter="nws",
category="wx.alert.winter_storm_warning",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.id.zone.z033"
def test_unknown_subject(self, sample_event: Event) -> None:
"""Missing primary_region produces unknown subject."""
geo = Geo(regions=[], primary_region=None)
event = Event(
id="test-unknown",
adapter="nws",
category="wx.alert.test",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.unknown"
class TestCloudEventsWire: class TestCloudEventsWire:
"""Tests for CloudEvents wire format.""" """Tests for CloudEvents wire format."""

View file

@ -17,7 +17,6 @@ from central.adapters.nws import (
SEVERITY_MAP, SEVERITY_MAP,
) )
from central.config_models import AdapterConfig from central.config_models import AdapterConfig
from central.models import subject_for_event
# Sample NWS GeoJSON features for testing # Sample NWS GeoJSON features for testing
@ -272,7 +271,7 @@ class TestSubjectDerivation:
def test_county_subject(self, adapter: NWSAdapter) -> None: def test_county_subject(self, adapter: NWSAdapter) -> None:
event = adapter._normalize_feature(SAMPLE_FEATURE_ID) event = adapter._normalize_feature(SAMPLE_FEATURE_ID)
assert event is not None assert event is not None
subject = subject_for_event(event) subject = adapter.subject_for(event)
# Primary region should be alphabetically first # Primary region should be alphabetically first
# Could be county or zone depending on sort order # Could be county or zone depending on sort order
assert subject.startswith("central.wx.alert.us.id.") assert subject.startswith("central.wx.alert.us.id.")
@ -294,7 +293,7 @@ class TestSubjectDerivation:
} }
event = adapter._normalize_feature(feature) event = adapter._normalize_feature(feature)
assert event is not None assert event is not None
subject = subject_for_event(event) subject = adapter.subject_for(event)
assert "zone" in subject assert "zone" in subject

View file

@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration:
supervisor._js = mock_nats.jetstream() supervisor._js = mock_nats.jetstream()
# Patch NWSAdapter to use our mock # Patch NWSAdapter to use our mock
with patch("central.supervisor.NWSAdapter", MockNWSAdapter): # Inject mock adapter into supervisor's registry
# Start supervisor (starts adapter) supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(initial_config) # Start supervisor (starts adapter)
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
assert adapter_is_running(state) assert adapter_is_running(state)
# Simulate completed poll 5 minutes ago # Simulate completed poll 5 minutes ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5) state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
saved_last_poll = state.last_completed_poll saved_last_poll = state.last_completed_poll
# Disable adapter # Disable adapter
disabled_config = AdapterConfig( disabled_config = AdapterConfig(
name="nws", name="nws",
enabled=False, enabled=False,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(disabled_config) config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify stopped but state preserved (THIS IS THE KEY CHECK) # Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it # On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False # On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None, ( assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. " "State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll." "State should be preserved to maintain last_completed_poll."
) )
assert not adapter_is_running(state) assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll assert state.last_completed_poll == saved_last_poll
# Re-enable adapter # Re-enable adapter
reenabled_config = AdapterConfig( reenabled_config = AdapterConfig(
name="nws", name="nws",
enabled=True, enabled=True,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(reenabled_config) config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify restarted with preserved last_completed_poll # Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
assert adapter_is_running(state) assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll assert state.last_completed_poll == saved_last_poll
# The loop should detect that last_poll + cadence is in the past # The loop should detect that last_poll + cadence is in the past
# and poll immediately. Let's verify by checking the wait time logic. # and poll immediately. Let's verify by checking the wait time logic.
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
wait_time = max(0, next_poll_at - now.timestamp()) wait_time = max(0, next_poll_at - now.timestamp())
# last_poll was 5 minutes ago, cadence is 60s # last_poll was 5 minutes ago, cadence is 60s
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago # next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
# wait_time should be 0 (poll immediately) # wait_time should be 0 (poll immediately)
assert wait_time == 0, ( assert wait_time == 0, (
f"Expected immediate poll (wait=0), got wait={wait_time}s. " f"Expected immediate poll (wait=0), got wait={wait_time}s. "
f"last_poll was {saved_last_poll}, now is {now}" f"last_poll was {saved_last_poll}, now is {now}"
) )
# Cleanup # Cleanup
supervisor._shutdown_event.set() supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws") await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_enable_disable_enable_gap_shorter_than_cadence( async def test_enable_disable_enable_gap_shorter_than_cadence(
@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream() supervisor._js = mock_nats.jetstream()
with patch("central.supervisor.NWSAdapter", MockNWSAdapter): # Inject mock adapter into supervisor's registry
# Start adapter supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(initial_config) # Start adapter
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
# Simulate completed poll 10 seconds ago # Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
saved_last_poll = state.last_completed_poll saved_last_poll = state.last_completed_poll
# Disable adapter # Disable adapter
disabled_config = AdapterConfig( disabled_config = AdapterConfig(
name="nws", name="nws",
enabled=False, enabled=False,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(disabled_config) config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify stopped but state preserved (THIS IS THE KEY CHECK) # Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it # On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False # On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None, ( assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. " "State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll." "State should be preserved to maintain last_completed_poll."
) )
assert not adapter_is_running(state) assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll assert state.last_completed_poll == saved_last_poll
# Re-enable adapter (simulate 20 seconds later, but we're just # Re-enable adapter (simulate 20 seconds later, but we're just
# checking the rate limit logic) # checking the rate limit logic)
reenabled_config = AdapterConfig( reenabled_config = AdapterConfig(
name="nws", name="nws",
enabled=True, enabled=True,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(reenabled_config) config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify restarted with preserved last_completed_poll # Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
assert adapter_is_running(state) assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll assert state.last_completed_poll == saved_last_poll
# The loop should detect that last_poll + cadence is still in the future # The loop should detect that last_poll + cadence is still in the future
# and wait until then. # and wait until then.
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60 next_poll_at = saved_last_poll.timestamp() + 60
wait_time = max(0, next_poll_at - now.timestamp()) wait_time = max(0, next_poll_at - now.timestamp())
# last_poll was ~10 seconds ago, cadence is 60s # last_poll was ~10 seconds ago, cadence is 60s
# wait_time should be ~50s (60 - 10 = 50) # wait_time should be ~50s (60 - 10 = 50)
assert 45 < wait_time < 55, ( assert 45 < wait_time < 55, (
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. " f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
f"Rate limit violated: poll would happen before last_poll + cadence" f"Rate limit violated: poll would happen before last_poll + cadence"
) )
# Cleanup # Cleanup
supervisor._shutdown_event.set() supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws") await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_enable_disable_delete_readd_fresh_state( async def test_enable_disable_delete_readd_fresh_state(
@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream() supervisor._js = mock_nats.jetstream()
with patch("central.supervisor.NWSAdapter", MockNWSAdapter): # Inject mock adapter into supervisor's registry
# Start adapter supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(initial_config) # Start adapter
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
# Simulate completed poll 10 seconds ago # Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10) state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
# Disable adapter # Disable adapter
disabled_config = AdapterConfig( disabled_config = AdapterConfig(
name="nws", name="nws",
enabled=False, enabled=False,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(disabled_config) config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# DELETE adapter from DB (remove from config source) # DELETE adapter from DB (remove from config source)
config_source.set_adapter(None, name="nws") config_source.set_adapter(None, name="nws")
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify adapter fully removed # Verify adapter fully removed
assert "nws" not in supervisor._adapter_states assert "nws" not in supervisor._adapter_states
# Re-add adapter with same name # Re-add adapter with same name
new_config = AdapterConfig( new_config = AdapterConfig(
name="nws", name="nws",
enabled=True, enabled=True,
cadence_s=60, cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"}, settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None, paused_at=None,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
) )
config_source.set_adapter(new_config) config_source.set_adapter(new_config)
await supervisor._on_config_change("adapters", "nws") await supervisor._on_config_change("adapters", "nws")
# Verify new adapter started fresh # Verify new adapter started fresh
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert state is not None assert state is not None
assert adapter_is_running(state) assert adapter_is_running(state)
# last_completed_poll should be None (fresh adapter) # last_completed_poll should be None (fresh adapter)
assert state.last_completed_poll is None, ( assert state.last_completed_poll is None, (
f"Expected None (fresh adapter), got {state.last_completed_poll}. " f"Expected None (fresh adapter), got {state.last_completed_poll}. "
f"Preserved state not cleared on delete." f"Preserved state not cleared on delete."
) )
# Cleanup # Cleanup
supervisor._shutdown_event.set() supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws") await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stop_preserves_state_start_reuses_it( async def test_stop_preserves_state_start_reuses_it(
@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream() supervisor._js = mock_nats.jetstream()
with patch("central.supervisor.NWSAdapter", MockNWSAdapter): # Inject mock adapter into supervisor's registry
# Start adapter supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(config) # Start adapter
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30) state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
saved_poll = state.last_completed_poll saved_poll = state.last_completed_poll
# Stop adapter # Stop adapter
await supervisor._stop_adapter("nws") await supervisor._stop_adapter("nws")
# State should still exist # State should still exist
assert "nws" in supervisor._adapter_states assert "nws" in supervisor._adapter_states
state = supervisor._adapter_states["nws"] state = supervisor._adapter_states["nws"]
assert not adapter_is_running(state) assert not adapter_is_running(state)
assert state.last_completed_poll == saved_poll assert state.last_completed_poll == saved_poll
# Restart adapter # Restart adapter
await supervisor._start_adapter(config) await supervisor._start_adapter(config)
# Should reuse existing state # Should reuse existing state
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
assert adapter_is_running(state) assert adapter_is_running(state)
assert state.last_completed_poll == saved_poll assert state.last_completed_poll == saved_poll
# Cleanup # Cleanup
supervisor._shutdown_event.set() supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws") await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_remove_adapter_clears_state( async def test_remove_adapter_clears_state(
@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream() supervisor._js = mock_nats.jetstream()
with patch("central.supervisor.NWSAdapter", MockNWSAdapter): # Inject mock adapter into supervisor's registry
await supervisor._start_adapter(config) supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws") state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc) state.last_completed_poll = datetime.now(timezone.utc)
# Remove adapter # Remove adapter
await cleanup_adapter(supervisor, "nws") await cleanup_adapter(supervisor, "nws")
# State should be gone # State should be gone
assert "nws" not in supervisor._adapter_states assert "nws" not in supervisor._adapter_states

View file

@ -480,3 +480,122 @@ class TestApplyConfig:
assert adapter._feed == "all_day" assert adapter._feed == "all_day"
await adapter.shutdown() await adapter.shutdown()
class TestSubjectFor:
"""Test subject_for method for all magnitude tiers."""
@pytest.mark.asyncio
async def test_subject_minor(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-minor",
adapter="usgs_quake",
category="quake.event.minor",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.minor"
@pytest.mark.asyncio
async def test_subject_light(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-light",
adapter="usgs_quake",
category="quake.event.light",
time=datetime.now(timezone.utc),
severity=1,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.light"
@pytest.mark.asyncio
async def test_subject_moderate(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-moderate",
adapter="usgs_quake",
category="quake.event.moderate",
time=datetime.now(timezone.utc),
severity=2,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.moderate"
@pytest.mark.asyncio
async def test_subject_strong(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-strong",
adapter="usgs_quake",
category="quake.event.strong",
time=datetime.now(timezone.utc),
severity=3,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.strong"
@pytest.mark.asyncio
async def test_subject_major(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-major",
adapter="usgs_quake",
category="quake.event.major",
time=datetime.now(timezone.utc),
severity=4,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.major"
@pytest.mark.asyncio
async def test_subject_great(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-great",
adapter="usgs_quake",
category="quake.event.great",
time=datetime.now(timezone.utc),
severity=5,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.great"