Compare commits

..

No commits in common. "87f46e8b35e4de61c9538d9d2300920c3ad28206" and "78b6fcf150cb49a2e9808d66e36dd3df1bd1063f" have entirely different histories.

19 changed files with 432 additions and 2076 deletions

View file

@ -1,63 +1,5 @@
# Changelog
## v0.3.0 — Phase 1b (2026-05-18)
Operator console. FastAPI + Jinja2 + Pico + HTMX. Self-hosted,
Tailscale-gated by default, no application-level auth beyond
the operator session.
### Added
- Operator console (`central-gui` systemd service on port 8000)
- Login + session auth (argon2id, 90-day DB-backed sessions)
- Dashboard: events 24h by adapter, stream sizes,
last-poll-time per adapter
- Adapters list and edit page (cadence + per-adapter settings),
with Leaflet region picker and click-to-draw rectangles
- Streams view with retention chips (1d / 7d / 14d / 30d /
365d / custom)
- API keys management (list / add / rotate / delete,
encrypted at rest via `crypto.encrypt`, plaintext never
logged or stored)
- First-run wizard (5 steps: operator, system, keys, adapters,
finish) with deferred-commit pattern — no DB writes until
Finish runs as a single transaction
- Events feed page (`/events`) — paginated, filterable by
adapter / category / time range / map viewport, with
color-coded geometry overlay, click-to-popup, and
expandable row details showing full event payload
- Paginated events JSON API (`/events.json`) — cursor-based
pagination, same filter surface as the HTML feed
### Changed
- CSRF tokens are now session-bound (synchronizer token
pattern), replacing the previous fastapi-csrf-protect
library. Eliminates a rotation race that broke first-load
submissions
- First-run wizard is a single atomic transaction at Finish,
not per-step DB writes. Back navigation works; abandoned
wizards leave no orphan rows
### Fixed
- Adapter editor's JSONB double-encoding bug (write path
called `json.dumps` before asyncpg's codec, corrupting
the settings column)
- Dashboard polls card was reading from the wrong NATS
subject and using a durable consumer instead of
`get_last_msg`, leaking zombie consumers
- Browser-noise paths (/favicon.ico, /apple-touch-icon.png,
/robots.txt) return 204 directly, preventing parallel
requests from racing the CSRF cookie on first page load
- SubResource Integrity hashes for leaflet-draw assets
corrected (previous values were fabricated and silently
blocked by browsers)
### Infrastructure
- New `config.sessions` column: `csrf_token` (per-session
synchronizer)
- Composite index on `public.events (time DESC, id DESC)`
for cursor pagination
- `central-gui` systemd service
## v0.2.0 — Phase 1a (2026-05-16)
Three live data sources, configurable infrastructure, hot-reload

View file

@ -28,32 +28,6 @@ The Windows workstation (matt-desktop) has no Central repository clones.
The directory `C:\Users\mtthw\central_work\` is scratch space only and
should not be used for commits.
## Network and Service Bindings
### Bind Address
`central-gui` binds to `0.0.0.0` by design. Network gating is the
operator's responsibility (firewall, Tailscale, etc.), not the app's.
Do not switch to `127.0.0.1` or to a specific interface — operators
choose their bind via whatever network they want to expose the service on.
### NATS Listener Ports
The default `nats-server.conf` listens on more than just :4222:
| Port | Protocol | Used by Central? |
|------|----------|------------------|
| 4222 | NATS client | Yes (all) |
| 8080 | WebSocket | No (Phase 0 leftover) |
| 8222 | HTTP monitoring | No (manual ops only) |
| 1883 | MQTT | No (Phase 0 leftover) |
None of the unused ports cause active harm — they listen but no consumer
connects. Operators can remove them from `nats-server.conf` if they want
a tighter footprint. Documenting so future contributors don't grep for
"MQTT integration" and come up confused.
## Repository
| Property | Value |

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "central"
version = "0.3.0"
version = "0.1.0"
requires-python = ">=3.12,<3.13"
description = "Data hub spine — adapters, bus, archive."
readme = "README.md"

View file

@ -4,8 +4,6 @@ from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from central.config_models import AdapterConfig
@ -18,24 +16,9 @@ class SourceAdapter(ABC):
Adapters yield Events. The supervisor handles scheduling,
CloudEvents wrapping, publish, and metadata heartbeats.
Class attributes that subclasses must define:
name: Short identifier, e.g. "nws"
display_name: Human-readable name for GUI
description: Short description of the adapter
settings_schema: Pydantic model class for adapter settings
requires_api_key: Key alias if API key required, else None
wizard_order: Order in setup wizard (None = not in wizard)
default_cadence_s: Default polling interval in seconds
"""
name: str
display_name: str
description: str
settings_schema: type[BaseModel]
requires_api_key: str | None = None
wizard_order: int | None = None
default_cadence_s: int
name: str # short identifier, e.g. "nws"
@abstractmethod
async def poll(self) -> AsyncIterator[Event]:
@ -57,16 +40,6 @@ class SourceAdapter(ABC):
"""
...
@abstractmethod
def subject_for(self, event: Event) -> str:
"""
Compute the NATS subject for an event.
Each adapter knows its own subject hierarchy. The supervisor
calls this to determine where to publish each event.
"""
...
async def startup(self) -> None:
"""Optional lifecycle hook called before first poll."""
pass

View file

@ -18,8 +18,6 @@ from tenacity import (
)
from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
@ -51,23 +49,10 @@ SEVERITY_MAP = {
}
class FIRMSSettings(BaseModel):
"""Settings schema for FIRMS adapter."""
api_key_alias: str = "firms"
satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"]
region: RegionConfig | None = None
class FIRMSAdapter(SourceAdapter):
"""NASA FIRMS fire hotspot adapter."""
name = "firms"
display_name = "NASA FIRMS Fire Hotspots"
description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS."
settings_schema = FIRMSSettings
requires_api_key = "firms"
wizard_order = 2
default_cadence_s = 300
def __init__(
self,
@ -131,15 +116,6 @@ class FIRMSAdapter(SourceAdapter):
},
)
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains this structure.
"""
return f"central.{event.category}"
async def startup(self) -> None:
"""Initialize HTTP session, dedup tracker, and fetch API key."""
# Fetch API key
@ -441,3 +417,14 @@ class FIRMSAdapter(SourceAdapter):
},
)
def subject_for_fire_hotspot(ev: Event) -> str:
"""Compute the NATS subject for a fire hotspot event.
Subject format: central.fire.hotspot.<satellite>.<confidence>
The category already contains the satellite and confidence info,
so we just prefix with 'central.'.
"""
# category is "fire.hotspot.<satellite>.<confidence>"
return f"central.{ev.category}"

View file

@ -19,8 +19,6 @@ from tenacity import (
from central import __version__
from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
@ -191,22 +189,10 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]:
return sorted(regions)
class NWSSettings(BaseModel):
"""Settings schema for NWS adapter."""
contact_email: str = ""
region: RegionConfig | None = None
class NWSAdapter(SourceAdapter):
"""National Weather Service alerts adapter."""
name = "nws"
display_name = "NWS Weather Alerts"
description = "National Weather Service active alerts via api.weather.gov."
settings_schema = NWSSettings
requires_api_key = None
wizard_order = 1
default_cadence_s = 60
def __init__(
self,
@ -248,35 +234,6 @@ class NWSAdapter(SourceAdapter):
},
)
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for a weather alert.
Subject format: central.wx.alert.us.<state>.<type>.<code>
where type is 'county' or 'zone' based on primary_region format.
"""
prefix = "central.wx"
if event.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = event.geo.primary_region
# Parse US-<STATE>-<CODE> format
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool:
"""Check if feature geometry intersects configured region bbox.

View file

@ -17,8 +17,6 @@ from tenacity import (
)
from central.adapter import SourceAdapter
from pydantic import BaseModel
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
@ -62,22 +60,10 @@ def magnitude_to_severity(mag: float) -> int:
return 5
class USGSQuakeSettings(BaseModel):
"""Settings schema for USGS quake adapter."""
feed: str = "all_hour"
region: RegionConfig | None = None
class USGSQuakeAdapter(SourceAdapter):
"""USGS Earthquake Hazards Program adapter."""
name = "usgs_quake"
display_name = "USGS Earthquakes"
description = "USGS earthquake feed (configurable window)."
settings_schema = USGSQuakeSettings
requires_api_key = None
wizard_order = 3
default_cadence_s = 60
def __init__(
self,
@ -412,9 +398,3 @@ class USGSQuakeAdapter(SourceAdapter):
new_count += 1
logger.info("USGS quake yielded events", extra={"count": new_count})
def subject_for(self, event: Event) -> str:
"""Return NATS subject for quake event."""
return f"central.{event.category}"

View file

@ -2197,90 +2197,95 @@ async def api_keys_delete(
return RedirectResponse(url="/api-keys", status_code=302)
# --- Events query helper ---
class EventsQueryResult:
"""Result from events query."""
def __init__(self, events: list, next_cursor: str | None, error: str | None = None):
self.events = events
self.next_cursor = next_cursor
self.error = error
def _parse_events_params(params) -> tuple[dict | None, str | None]:
@router.get("/events.json")
async def events_json(request: Request):
"""
Parse and validate events query parameters.
Paginated, filterable JSON endpoint for events.
Query parameters (all optional):
adapter: filter by adapter name
category: filter by event category
since: ISO 8601 datetime - events where time >= since
until: ISO 8601 datetime - events where time < until
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
limit: page size (default 50, max 200)
cursor: opaque pagination cursor
Returns:
(parsed_params, error_message)
If error_message is not None, parsed_params is None.
{"events": [...], "next_cursor": string or null}
"""
from fastapi.responses import JSONResponse
params = request.query_params
# Parse and validate limit
limit_str = params.get("limit", "50")
try:
limit = int(limit_str)
except ValueError:
return None, f"Invalid limit value: {limit_str}"
return JSONResponse(
{"error": f"Invalid limit value: {limit_str}"},
status_code=400,
)
if limit < 1 or limit > 200:
return None, "limit must be between 1 and 200"
return JSONResponse(
{"error": "limit must be between 1 and 200"},
status_code=400,
)
# Parse adapter filter
adapter = params.get("adapter")
if adapter == "":
adapter = None
# Parse category filter
# Parse category filter
category = params.get("category")
if category == "":
category = None
# Parse since/until filters
since = None
until = None
since_str = params.get("since")
if since_str:
try:
since = datetime.fromisoformat(since_str.replace("Z", "+00:00"))
except ValueError:
return None, f"Invalid ISO 8601 datetime for since: {since_str}"
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for since: {since_str}"},
status_code=400,
)
until_str = params.get("until")
if until_str:
try:
until = datetime.fromisoformat(until_str.replace("Z", "+00:00"))
except ValueError:
return None, f"Invalid ISO 8601 datetime for until: {until_str}"
return JSONResponse(
{"error": f"Invalid ISO 8601 datetime for until: {until_str}"},
status_code=400,
)
# Validate since <= until
if since and until and since > until:
return None, "since must be before or equal to until"
return JSONResponse(
{"error": "since must be before or equal to until"},
status_code=400,
)
# Parse region bbox
region_north = params.get("region_north")
region_south = params.get("region_south")
region_east = params.get("region_east")
region_west = params.get("region_west")
# Treat empty strings as None
if region_north == "":
region_north = None
if region_south == "":
region_south = None
if region_east == "":
region_east = None
if region_west == "":
region_west = None
region_params = [region_north, region_south, region_east, region_west]
region_supplied = [p for p in region_params if p is not None]
if len(region_supplied) > 0 and len(region_supplied) < 4:
return None, "Region filter requires all four parameters: region_north, region_south, region_east, region_west"
return JSONResponse(
{"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"},
status_code=400,
)
bbox = None
if len(region_supplied) == 4:
try:
@ -2291,13 +2296,16 @@ def _parse_events_params(params) -> tuple[dict | None, str | None]:
"west": float(region_west),
}
except ValueError:
return None, "Region parameters must be valid numbers"
return JSONResponse(
{"error": "Region parameters must be valid numbers"},
status_code=400,
)
# Parse cursor
cursor_time = None
cursor_id = None
cursor_str = params.get("cursor")
if cursor_str:
try:
decoded = base64.b64decode(cursor_str).decode("utf-8")
@ -2307,82 +2315,59 @@ def _parse_events_params(params) -> tuple[dict | None, str | None]:
cursor_time = datetime.fromisoformat(parts[0])
cursor_id = parts[1]
except Exception:
return None, "Invalid cursor"
return {
"limit": limit,
"adapter": adapter,
"category": category,
"since": since,
"until": until,
"bbox": bbox,
"cursor_time": cursor_time,
"cursor_id": cursor_id,
}, None
async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
"""
Fetch events from database using parsed parameters.
Returns EventsQueryResult with events list, next_cursor, and optional error.
"""
return JSONResponse(
{"error": "Invalid cursor"},
status_code=400,
)
# Get database pool after validation
pool = get_pool()
limit = parsed_params["limit"]
adapter = parsed_params["adapter"]
category = parsed_params["category"]
since = parsed_params["since"]
until = parsed_params["until"]
bbox = parsed_params["bbox"]
cursor_time = parsed_params["cursor_time"]
cursor_id = parsed_params["cursor_id"]
# Build query
conditions = []
query_params = []
param_idx = 1
if adapter:
conditions.append(f"adapter = ${param_idx}")
query_params.append(adapter)
param_idx += 1
if category:
conditions.append(f"category = ${param_idx}")
query_params.append(category)
param_idx += 1
if since:
conditions.append(f"time >= ${param_idx}")
query_params.append(since)
param_idx += 1
if until:
conditions.append(f"time < ${param_idx}")
query_params.append(until)
param_idx += 1
if bbox:
conditions.append(
f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))"
)
query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]])
param_idx += 4
if cursor_time and cursor_id:
conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})")
query_params.append(cursor_time)
query_params.append(cursor_id)
param_idx += 2
where_clause = ""
if conditions:
where_clause = "WHERE " + " AND ".join(conditions)
# Fetch limit+1 to check for next page
query = f"""
SELECT
SELECT
id,
time,
received,
@ -2398,26 +2383,29 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
LIMIT ${param_idx}
"""
query_params.append(limit + 1)
try:
async with pool.acquire() as conn:
rows = await conn.fetch(query, *query_params)
except Exception as e:
logger.error(f"Database error in _fetch_events: {e}")
return EventsQueryResult([], None, "Database error")
logger.error(f"Database error in events_json: {e}")
return JSONResponse(
{"error": "Database error"},
status_code=500,
)
# Check if there is a next page
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
# Build response
events = []
for row in rows:
geometry = None
if row["geometry"]:
geometry = json.loads(row["geometry"])
events.append({
"id": row["id"],
"time": row["time"].isoformat(),
@ -2429,196 +2417,15 @@ async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
"data": dict(row["data"]) if row["data"] else {},
"regions": list(row["regions"]) if row["regions"] else [],
})
# Build next_cursor if there are more results
next_cursor = None
if has_next and events:
last_event = rows[-1]
cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}"
next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8")
return EventsQueryResult(events, next_cursor)
def _geometry_summary(geometry: dict | None) -> str:
"""Generate a human-readable summary of a geometry."""
if not geometry:
return "None"
geom_type = geometry.get("type", "Unknown")
if geom_type == "Point":
return "Point"
elif geom_type == "LineString":
coords = geometry.get("coordinates", [])
return f"Line ({len(coords)} pts)"
elif geom_type == "Polygon":
coords = geometry.get("coordinates", [[]])
if coords:
return f"Polygon ({len(coords[0])} pts)"
return "Polygon"
elif geom_type == "MultiPolygon":
coords = geometry.get("coordinates", [])
return f"MultiPolygon ({len(coords)} parts)"
else:
return geom_type
@router.get("/events.json")
async def events_json(request: Request):
"""
Paginated, filterable JSON endpoint for events.
Query parameters (all optional):
adapter: filter by adapter name
category: filter by event category
since: ISO 8601 datetime - events where time >= since
until: ISO 8601 datetime - events where time < until
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
limit: page size (default 50, max 200)
cursor: opaque pagination cursor
Returns:
{"events": [...], "next_cursor": string or null}
"""
from fastapi.responses import JSONResponse
params = request.query_params
# Parse and validate parameters using shared helper
parsed, error = _parse_events_params(params)
if error:
return JSONResponse({"error": error}, status_code=400)
# Fetch events using shared helper
result = await _fetch_events(parsed)
if result.error:
return JSONResponse({"error": result.error}, status_code=500)
return JSONResponse({
"events": result.events,
"next_cursor": result.next_cursor,
"events": events,
"next_cursor": next_cursor,
})
# --- Events feed frontend routes ---
@router.get("/events", response_class=HTMLResponse)
async def events_list(request: Request) -> HTMLResponse:
"""Events feed page with filter form, table, and map."""
templates = _get_templates()
operator = getattr(request.state, "operator", None)
csrf_token = getattr(request.state, "csrf_token", "")
params = request.query_params
# Parse parameters
parsed, error = _parse_events_params(params)
# Get system settings for map tiles
pool = get_pool()
async with pool.acquire() as conn:
system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system")
tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap"
# Prepare filter values for template
filter_values = {
"adapter": params.get("adapter", ""),
"category": params.get("category", ""),
"since": params.get("since", ""),
"until": params.get("until", ""),
"region_north": params.get("region_north", ""),
"region_south": params.get("region_south", ""),
"region_east": params.get("region_east", ""),
"region_west": params.get("region_west", ""),
"limit": params.get("limit", "50"),
}
events = []
next_cursor = None
if error:
# Validation error - show error banner but don't fail the page
pass
else:
# Fetch events
result = await _fetch_events(parsed)
if result.error:
error = result.error
else:
events = result.events
next_cursor = result.next_cursor
# Add geometry summary to each event
for event in events:
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
return templates.TemplateResponse(
request=request,
name="events_list.html",
context={
"operator": operator,
"csrf_token": csrf_token,
"events": events,
"next_cursor": next_cursor,
"filter_values": filter_values,
"filter_error": error,
"tile_url": tile_url,
"tile_attribution": tile_attribution,
},
)
@router.get("/events/rows", response_class=HTMLResponse)
async def events_rows(request: Request) -> HTMLResponse:
"""HTMX fragment: events table rows only (no page chrome)."""
templates = _get_templates()
params = request.query_params
# Parse parameters
parsed, error = _parse_events_params(params)
# Prepare filter values for template
filter_values = {
"adapter": params.get("adapter", ""),
"category": params.get("category", ""),
"since": params.get("since", ""),
"until": params.get("until", ""),
"region_north": params.get("region_north", ""),
"region_south": params.get("region_south", ""),
"region_east": params.get("region_east", ""),
"region_west": params.get("region_west", ""),
"limit": params.get("limit", "50"),
}
events = []
next_cursor = None
if error:
pass
else:
result = await _fetch_events(parsed)
if result.error:
error = result.error
else:
events = result.events
next_cursor = result.next_cursor
# Add geometry summary to each event
for event in events:
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
return templates.TemplateResponse(
request=request,
name="_events_rows.html",
context={
"events": events,
"next_cursor": next_cursor,
"filter_values": filter_values,
"filter_error": error,
},
)

View file

@ -1,73 +0,0 @@
{% if filter_error %}
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
<strong>Filter Error:</strong> {{ filter_error }}
</article>
{% endif %}
{% if events %}
<table class="events-table">
<thead>
<tr>
<th style="width: 2rem;"></th>
<th>Time</th>
<th>Adapter</th>
<th>Category</th>
<th>Geometry</th>
<th>Subject</th>
</tr>
</thead>
<tbody>
{% for event in events %}
<tr class="event-row" data-row-idx="{{ loop.index0 }}"
data-event-id="{{ event.id }}"
data-adapter="{{ event.adapter }}"
data-category="{{ event.category }}"
data-time="{{ event.time }}"
data-subject="{{ event.subject or '' }}"
{% if event.geometry %}data-geometry='{{ event.geometry | tojson }}'{% endif %}>
<td><button type="button" class="expand-row" aria-label="Expand">&#9656;</button></td>
<td>{{ event.time }}</td>
<td>{{ event.adapter }}</td>
<td>{{ event.category }}</td>
<td>{{ event.geometry_summary }}</td>
<td>{{ event.subject or '—' }}</td>
</tr>
<tr class="event-detail" hidden>
<td colspan="6">
<dl class="event-detail-list">
<dt>Event ID</dt>
<dd><code>{{ event.id }}</code></dd>
<dt>Received</dt>
<dd>{{ event.received }}</dd>
{% if event.regions %}
<dt>Regions</dt>
<dd>{{ event.regions | join(", ") }}</dd>
{% endif %}
<dt>Data</dt>
<dd><pre class="event-data-pre">{{ event.data | tojson(indent=2) }}</pre></dd>
</dl>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<div class="pagination-info">
<span>Showing {{ events | length }} event{{ 's' if events | length != 1 else '' }}.</span>
{% if next_cursor %}
<a href="/events?cursor={{ next_cursor }}{% if filter_values.adapter %}&amp;adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&amp;category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&amp;since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&amp;until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&amp;region_north={{ filter_values.region_north }}&amp;region_south={{ filter_values.region_south }}&amp;region_east={{ filter_values.region_east }}&amp;region_west={{ filter_values.region_west }}{% endif %}&amp;limit={{ filter_values.limit }}"
role="button"
hx-get="/events/rows?cursor={{ next_cursor }}{% if filter_values.adapter %}&amp;adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&amp;category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&amp;since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&amp;until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&amp;region_north={{ filter_values.region_north }}&amp;region_south={{ filter_values.region_south }}&amp;region_east={{ filter_values.region_east }}&amp;region_west={{ filter_values.region_west }}{% endif %}&amp;limit={{ filter_values.limit }}"
hx-target="#events-rows"
hx-push-url="true">
Next &rarr;
</a>
{% else %}
<span><em>End of results</em></span>
{% endif %}
</div>
{% else %}
<article>
<p>No events match the filters.</p>
</article>
{% endif %}

View file

@ -17,7 +17,6 @@
{% if operator %}
<li><a href="/">Dashboard</a></li>
<li><a href="/adapters">Adapters</a></li>
<li><a href="/events">Events</a></li>
<li><a href="/streams">Streams</a></li>
<li><a href="/api-keys">API Keys</a></li>
<li>{{ operator.username }}</li>

View file

@ -1,437 +0,0 @@
{% extends "base.html" %}
{% block title %}Events - Central{% endblock %}
{% block head %}
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
#events-map {
height: 400px;
margin-bottom: 0.5rem;
border-radius: var(--pico-border-radius);
}
.map-controls {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
}
.map-legend {
display: flex;
gap: 1rem;
font-size: 0.85rem;
}
.map-legend-item {
display: flex;
align-items: center;
gap: 0.25rem;
}
.map-legend-swatch {
width: 16px;
height: 16px;
border-radius: 3px;
border: 1px solid rgba(0,0,0,0.2);
}
#fit-to-results {
padding: 0.25rem 0.75rem;
font-size: 0.85rem;
}
.events-table {
font-size: 0.9rem;
}
.events-table td {
vertical-align: middle;
}
.events-table tr.event-row:hover {
background-color: var(--pico-primary-focus);
cursor: pointer;
}
.events-table tr.event-row.highlighted {
background-color: var(--pico-primary-background);
}
.expand-row {
background: none;
border: none;
padding: 0.25rem;
cursor: pointer;
font-size: 0.9rem;
color: var(--pico-color);
}
.expand-row:hover {
color: var(--pico-primary);
}
.event-detail td {
background-color: var(--pico-card-background-color);
padding: 1rem;
}
.event-detail-list {
display: grid;
grid-template-columns: auto 1fr;
gap: 0.5rem 1rem;
margin: 0;
}
.event-detail-list dt {
font-weight: 600;
color: var(--pico-muted-color);
}
.event-detail-list dd {
margin: 0;
}
.event-data-pre {
max-height: 300px;
overflow: auto;
font-size: 0.8rem;
background: var(--pico-code-background-color);
padding: 0.5rem;
border-radius: var(--pico-border-radius);
margin: 0;
}
.filter-form .grid {
margin-bottom: 0.5rem;
}
.filter-form label {
margin-bottom: 0.25rem;
}
.filter-form input, .filter-form select {
margin-bottom: 0.5rem;
}
.pagination-info {
margin-top: 1rem;
display: flex;
justify-content: space-between;
align-items: center;
}
</style>
{% endblock %}
{% block content %}
<h1>Events</h1>
{% if filter_error %}
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
<strong>Filter Error:</strong> {{ filter_error }}
</article>
{% endif %}
<details open>
<summary>Filters</summary>
<form id="filter-form" class="filter-form" action="/events" method="get"
hx-get="/events/rows" hx-target="#events-rows" hx-push-url="true">
<div class="grid">
<div>
<label for="adapter">Adapter</label>
<select id="adapter" name="adapter">
<option value="">All</option>
<option value="nws" {% if filter_values.adapter == 'nws' %}selected{% endif %}>nws</option>
<option value="firms" {% if filter_values.adapter == 'firms' %}selected{% endif %}>firms</option>
<option value="usgs_quake" {% if filter_values.adapter == 'usgs_quake' %}selected{% endif %}>usgs_quake</option>
</select>
</div>
<div>
<label for="category">Category</label>
<input type="text" id="category" name="category" placeholder="Exact match"
value="{{ filter_values.category }}">
</div>
<div>
<label for="since">From</label>
<input type="datetime-local" id="since" name="since"
value="{{ filter_values.since }}">
</div>
<div>
<label for="until">To</label>
<input type="datetime-local" id="until" name="until"
value="{{ filter_values.until }}">
</div>
</div>
<!-- Hidden region inputs (managed by map viewport) -->
<input type="hidden" id="region_north" name="region_north" value="{{ filter_values.region_north }}">
<input type="hidden" id="region_south" name="region_south" value="{{ filter_values.region_south }}">
<input type="hidden" id="region_east" name="region_east" value="{{ filter_values.region_east }}">
<input type="hidden" id="region_west" name="region_west" value="{{ filter_values.region_west }}">
<input type="hidden" name="limit" value="{{ filter_values.limit }}">
<div style="display: flex; gap: 0.5rem; margin-top: 0.5rem;">
<button type="submit">Apply</button>
<a href="/events" role="button" class="outline">Clear Filters</a>
</div>
</form>
</details>
<div id="events-map"></div>
<div class="map-controls">
<div class="map-legend">
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #f59e0b;"></div>
<span>NWS (Weather)</span>
</div>
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #dc2626;"></div>
<span>FIRMS (Fire)</span>
</div>
<div class="map-legend-item">
<div class="map-legend-swatch" style="background-color: #7c3aed;"></div>
<span>USGS (Quake)</span>
</div>
</div>
<button type="button" id="fit-to-results" class="outline secondary">Fit map to results</button>
</div>
<div id="events-rows">
{% include "_events_rows.html" %}
</div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
(function() {
var tileUrl = {{ tile_url | tojson }};
var tileAttr = {{ tile_attribution | tojson }};
// Adapter color mapping
var ADAPTER_COLORS = {
"nws": "#f59e0b",
"firms": "#dc2626",
"usgs_quake": "#7c3aed"
};
function getAdapterColor(adapter) {
return ADAPTER_COLORS[adapter] || "#3388ff";
}
// Initialize map
var map = L.map("events-map").setView([39, -98], 4);
L.tileLayer(tileUrl, {
attribution: tileAttr,
maxZoom: 18
}).addTo(map);
// Layer group for event geometries
var eventLayerGroup = L.layerGroup().addTo(map);
var highlightedRow = null;
var highlightedLayer = null;
var isInitialLoad = true;
var programmaticMove = false;
// Region input elements
var northInput = document.getElementById("region_north");
var southInput = document.getElementById("region_south");
var eastInput = document.getElementById("region_east");
var westInput = document.getElementById("region_west");
// Viewport-driven filter with debounce
var viewportDebounceTimer = null;
map.on("moveend", function() {
if (programmaticMove) {
programmaticMove = false;
return;
}
if (viewportDebounceTimer) clearTimeout(viewportDebounceTimer);
viewportDebounceTimer = setTimeout(applyViewportFilter, 400);
});
function applyViewportFilter() {
var bounds = map.getBounds();
northInput.value = bounds.getNorth().toFixed(4);
southInput.value = bounds.getSouth().toFixed(4);
eastInput.value = bounds.getEast().toFixed(4);
westInput.value = bounds.getWest().toFixed(4);
htmx.trigger(document.getElementById("filter-form"), "submit");
}
function buildPopup(row) {
var adapter = row.dataset.adapter || "";
var category = row.dataset.category || "";
var time = row.dataset.time ? new Date(row.dataset.time).toLocaleString() : "";
var subject = row.dataset.subject || "";
var eventId = row.dataset.eventId || "";
var html = "<strong>" + adapter + "</strong><br>" +
category + "<br>" +
"<small>" + time + "</small>";
if (subject) {
html += "<br><em>" + subject + "</em>";
}
html += '<br><a href="#" data-show-row="' + eventId + '">View details &#9662;</a>';
return html;
}
function rebindEventLayers() {
eventLayerGroup.clearLayers();
var rows = document.querySelectorAll("#events-rows tr.event-row[data-geometry]");
rows.forEach(function(row) {
var geomStr = row.dataset.geometry;
if (!geomStr || geomStr === "") return;
try {
var geom = JSON.parse(geomStr);
if (!geom) return;
var adapter = row.dataset.adapter || "";
var color = getAdapterColor(adapter);
var layer = L.geoJSON(geom, {
style: {
color: color,
weight: 2,
fillColor: color,
fillOpacity: 0.25
},
pointToLayer: function(feature, latlng) {
return L.circleMarker(latlng, {
radius: 8,
color: color,
weight: 2,
fillColor: color,
fillOpacity: 0.25
});
}
});
layer.bindPopup(buildPopup(row));
layer.on("click", function() {
highlightRow(row, layer, color);
});
layer.addTo(eventLayerGroup);
// Store reference for row highlighting
row._mapLayer = layer;
row._mapColor = color;
} catch (e) {
console.error("Error parsing geometry:", e);
}
});
}
function highlightRow(row, layer, originalColor) {
// Reset previous highlight
if (highlightedRow) {
highlightedRow.classList.remove("highlighted");
}
if (highlightedLayer && highlightedLayer._originalColor) {
highlightedLayer.setStyle({
color: highlightedLayer._originalColor,
weight: 2,
fillColor: highlightedLayer._originalColor,
fillOpacity: 0.25
});
}
// Set new highlight
row.classList.add("highlighted");
highlightedRow = row;
if (layer) {
layer._originalColor = originalColor;
layer.setStyle({
color: "#ff3333",
weight: 4,
fillColor: "#ff3333",
fillOpacity: 0.4
});
highlightedLayer = layer;
}
}
function fitToAllLayers() {
var layers = eventLayerGroup.getLayers();
if (layers.length === 0) return;
programmaticMove = true;
var group = L.featureGroup(layers);
map.fitBounds(group.getBounds(), { padding: [20, 20] });
}
// Row click handler (event delegation)
document.addEventListener("click", function(e) {
// Expand/collapse handler
if (e.target.classList.contains("expand-row")) {
var row = e.target.closest("tr");
var detail = row.nextElementSibling;
if (detail && detail.classList.contains("event-detail")) {
detail.hidden = !detail.hidden;
e.target.innerHTML = detail.hidden ? "&#9656;" : "&#9662;";
}
return;
}
// Popup "View details" link handler
if (e.target.matches("[data-show-row]")) {
e.preventDefault();
var eventId = e.target.dataset.showRow;
var targetRow = document.querySelector(
'tr[data-event-id="' + CSS.escape(eventId) + '"]'
);
if (!targetRow) return;
targetRow.scrollIntoView({ behavior: "smooth", block: "center" });
var detail = targetRow.nextElementSibling;
if (detail && detail.classList.contains("event-detail")) {
detail.hidden = false;
var button = targetRow.querySelector(".expand-row");
if (button) button.innerHTML = "&#9662;";
}
return;
}
// Row click to highlight (no auto-pan - user controls viewport)
var row = e.target.closest("tr.event-row");
if (row && row._mapLayer) {
highlightRow(row, row._mapLayer, row._mapColor);
// Map stays where user put it
}
});
// Row hover handlers (event delegation)
document.addEventListener("mouseenter", function(e) {
var row = e.target.closest && e.target.closest("tr.event-row");
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
row._mapLayer.setStyle({
color: row._mapColor,
weight: 3,
fillColor: row._mapColor,
fillOpacity: 0.25
});
}
}, true);
document.addEventListener("mouseleave", function(e) {
var row = e.target.closest && e.target.closest("tr.event-row");
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
row._mapLayer.setStyle({
color: row._mapColor,
weight: 2,
fillColor: row._mapColor,
fillOpacity: 0.25
});
}
}, true);
// Fit to results button
document.getElementById("fit-to-results").addEventListener("click", fitToAllLayers);
// Initial load - bind layers and fit bounds
rebindEventLayers(); // Initial load only
if (false) { // DISABLED: map never auto-fits
fitToAllLayers();
isInitialLoad = false;
}
// Re-bind layers after HTMX swap (but do NOT fit bounds)
document.body.addEventListener("htmx:afterSwap", function(evt) {
if (evt.detail.target.id === "events-rows") {
// rebindEventLayers(); // DISABLED: map shows all events, only table filters
// Do NOT call fitToAllLayers - preserve user viewport
}
});
// Fix map rendering after container shows
setTimeout(function() { map.invalidateSize(); }, 100);
})();
</script>
{% endblock %}

View file

@ -32,3 +32,48 @@ class Event(BaseModel):
data: dict[str, Any] # adapter-specific payload
def subject_for_event(ev: Event) -> str:
"""
Compute the NATS subject for an event based on its category.
Dispatch by category prefix:
- fire.*: returns central.<category> directly
- wx.*: uses weather alert subject logic
Weather alert subjects:
central.wx.alert.us.<state_lower>.county.<county_lower>
or
central.wx.alert.us.<state_lower>.zone.<zone_lower>
based on whether the primary_region encodes a county or a zone.
Fire hotspot subjects:
central.fire.hotspot.<satellite>.<confidence>
"""
# Fire events: subject is just central.<category>
if ev.category.startswith("fire."):
return f"central.{ev.category}"
# Weather events: use geo-based subject logic
prefix = "central.wx"
if ev.geo.primary_region is None:
return f"{prefix}.alert.us.unknown"
region = ev.geo.primary_region
# Parse US-<STATE>-<CODE> format
# County codes are like "Ada", "Canyon" (names)
# Zone codes start with "Z" like "Z033"
parts = region.split("-")
if len(parts) < 3 or parts[0] != "US":
return f"{prefix}.alert.us.unknown"
state = parts[1].lower()
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
# Zone code like Z033
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
else:
# County name
return f"{prefix}.alert.us.{state}.county.{code.lower()}"

View file

@ -13,40 +13,24 @@ from typing import Any
import nats
from nats.js import JetStreamContext
import importlib
import pkgutil
from central.adapter import SourceAdapter
from central.adapters.nws import NWSAdapter
from central.adapters.firms import FIRMSAdapter
from central.adapters.usgs_quake import USGSQuakeAdapter
from central.cloudevents_wire import wrap_event
from central.config_models import AdapterConfig
from central.config_source import ConfigSource, DbConfigSource
from central.config_store import ConfigStore
from central.bootstrap_config import get_settings
from central.models import subject_for_event
from central.stream_manager import StreamManager
import central.adapters
def discover_adapters() -> dict[str, type[SourceAdapter]]:
"""Auto-discover adapter classes from central.adapters package."""
registry: dict[str, type[SourceAdapter]] = {}
for module_info in pkgutil.iter_modules(central.adapters.__path__):
try:
module = importlib.import_module(f"central.adapters.{module_info.name}")
except Exception as e:
logger.error(
"Failed to import adapter module",
extra={"module": module_info.name, "error": str(e)},
)
continue
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, SourceAdapter)
and attr is not SourceAdapter
and hasattr(attr, "name")
):
registry[attr.name] = attr
return registry
# Adapter registry - add new adapters here
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = {
"nws": NWSAdapter,
"firms": FIRMSAdapter,
"usgs_quake": USGSQuakeAdapter,
}
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
@ -130,7 +114,6 @@ class Supervisor:
self._config_store = config_store
self._nats_url = nats_url
self._cloudevents_config = cloudevents_config
self._adapters = discover_adapters()
self._nc: nats.NATS | None = None
self._js: JetStreamContext | None = None
self._stream_manager: StreamManager | None = None
@ -178,7 +161,7 @@ class Supervisor:
def _create_adapter(self, config: AdapterConfig) -> SourceAdapter:
"""Create an adapter instance based on config name."""
cls = self._adapters.get(config.name)
cls = _ADAPTER_REGISTRY.get(config.name)
if cls is None:
raise ValueError(f"Unknown adapter type: {config.name}")
return cls(
@ -249,7 +232,7 @@ class Supervisor:
# Build CloudEvent (uses defaults if no config provided)
envelope, msg_id = wrap_event(event, self._cloudevents_config)
subject = state.adapter.subject_for(event)
subject = subject_for_event(event)
# Publish
await self._publish_event(subject, envelope, msg_id)

View file

@ -1,686 +0,0 @@
"""Tests for events feed frontend routes."""
import json
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.gui.routes import events_list, events_rows, events_json
class TestEventsFeedFrontendAuthenticated:
"""Test events feed frontend with authentication."""
@pytest.mark.asyncio
async def test_events_no_filters_returns_html(self):
"""GET /events authenticated, no filters returns HTML with events."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {}
mock_events = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Weather Alert",
"subject": f"Test Alert {i}",
"geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None,
"data": {},
"regions": [],
}
for i in range(5)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
call_args = mock_templates.TemplateResponse.call_args
context = call_args.kwargs.get("context", call_args[1].get("context"))
assert "events" in context
assert context["filter_error"] is None
@pytest.mark.asyncio
async def test_events_adapter_filter(self):
"""GET /events?adapter=nws returns only nws events."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"adapter": "nws"}
mock_events = [
{
"id": "nws_event_1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "NWS Alert",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_values"]["adapter"] == "nws"
@pytest.mark.asyncio
async def test_events_since_until_filter(self):
"""GET /events?since=...&until=... filters by time window."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {
"since": "2026-05-17T00:00:00",
"until": "2026-05-17T12:00:00",
}
mock_events = [
{
"id": "in_range",
"time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "In Range",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
# Verify filter was actually parsed and passed to template
mock_templates.TemplateResponse.assert_called_once()
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
context = call_kwargs.get("context", call_kwargs)
assert context["filter_values"]["since"] == "2026-05-17T00:00:00"
assert context["filter_values"]["until"] == "2026-05-17T12:00:00"
@pytest.mark.asyncio
async def test_events_region_filter(self):
"""GET /events with full region bbox filters by location."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {
"region_north": "49.5",
"region_south": "31",
"region_east": "-102",
"region_west": "-124.5",
}
mock_events = [
{
"id": "in_bbox",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "In BBox",
"geometry": '{"type": "Point", "coordinates": [-120, 40]}',
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
# Verify region filter was actually parsed and passed to template
mock_templates.TemplateResponse.assert_called_once()
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
context = call_kwargs.get("context", call_kwargs)
assert context["filter_values"]["region_north"] == "49.5"
assert context["filter_values"]["region_south"] == "31"
assert context["filter_values"]["region_east"] == "-102"
assert context["filter_values"]["region_west"] == "-124.5"
@pytest.mark.asyncio
async def test_events_partial_region_shows_error_banner(self):
"""GET /events with partial region shows error banner, not 400."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"region_north": "49"}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
# Should be 200, not 400
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_error"] is not None
assert "region" in context["filter_error"].lower()
# Events should be empty due to validation error
assert context["events"] == []
@pytest.mark.asyncio
async def test_events_with_limit_shows_next_button(self):
"""GET /events?limit=5 shows Next button when more events exist."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf_token"
mock_request.query_params = {"limit": "5"}
# Return 6 events (limit+1) to trigger pagination
mock_events = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Alert",
"subject": f"Event {i}",
"geometry": None,
"data": {},
"regions": [],
}
for i in range(6)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["next_cursor"] is not None
assert len(context["events"]) == 5 # Should be trimmed to limit
class TestEventsRowsFragment:
"""Test /events/rows HTMX fragment."""
@pytest.mark.asyncio
async def test_events_rows_returns_fragment(self):
"""GET /events/rows returns table fragment, not full page."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {"limit": "5"}
mock_events = [
{
"id": "event_1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "Event 1",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
assert result.status_code == 200
# Verify it uses the fragment template
call_args = mock_templates.TemplateResponse.call_args
assert call_args.kwargs.get("name") == "_events_rows.html"
class TestGeometrySummary:
"""Test geometry summary function."""
def test_geometry_summary_polygon(self):
"""Polygon geometry shows point count."""
from central.gui.routes import _geometry_summary
geom = {
"type": "Polygon",
"coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]
}
summary = _geometry_summary(geom)
assert "Polygon" in summary
assert "5 pts" in summary
def test_geometry_summary_point(self):
"""Point geometry shows 'Point'."""
from central.gui.routes import _geometry_summary
geom = {"type": "Point", "coordinates": [-122.4, 37.8]}
summary = _geometry_summary(geom)
assert summary == "Point"
def test_geometry_summary_linestring(self):
"""LineString geometry shows point count."""
from central.gui.routes import _geometry_summary
geom = {
"type": "LineString",
"coordinates": [[-122, 37], [-121, 38], [-120, 39]]
}
summary = _geometry_summary(geom)
assert "Line" in summary
assert "3 pts" in summary
def test_geometry_summary_none(self):
"""None geometry shows 'None'."""
from central.gui.routes import _geometry_summary
summary = _geometry_summary(None)
assert summary == "None"
class TestDataGeometryAttribute:
"""Test that rows have valid geometry data attributes."""
@pytest.mark.asyncio
async def test_event_with_geometry_has_valid_json(self):
"""Events with geometry have parseable JSON in data-geometry."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {}
mock_events = [
{
"id": "geom_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "With Geometry",
"geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}',
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
event = context["events"][0]
# Geometry should be parsed dict, not string
assert isinstance(event["geometry"], dict)
assert event["geometry"]["type"] == "Polygon"
@pytest.mark.asyncio
async def test_event_without_geometry_has_none(self):
"""Events without geometry have None for geometry field."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {}
mock_events = [
{
"id": "no_geom_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Alert",
"subject": "No Geometry",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_rows(mock_request)
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
event = context["events"][0]
assert event["geometry"] is None
class TestCrossEndpointParity:
"""Test that /events.json and /events return the same filtered results."""
@pytest.mark.asyncio
async def test_category_filter_both_endpoints(self):
"""Category filter works on both /events.json and /events."""
mock_events = [
{
"id": "weather_event",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "nws",
"category": "Weather Alert",
"subject": "Weather Event",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
query_params = {"category": "Weather Alert"}
# Test /events.json
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = query_params
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
assert len(json_data["events"]) == 1
assert json_data["events"][0]["category"] == "Weather Alert"
# Test /events
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = query_params
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = mock_events
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert len(html_context["events"]) == 1
assert html_context["events"][0]["category"] == "Weather Alert"
@pytest.mark.asyncio
async def test_cursor_pagination_both_endpoints(self):
"""Cursor pagination works identically on both endpoints."""
first_page = [
{
"id": f"event_{i}",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
"adapter": "nws",
"category": "Alert",
"subject": f"Event {i}",
"geometry": None,
"data": {},
"regions": [],
}
for i in range(3)
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = first_page
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
json_request = MagicMock()
json_request.state.operator = MagicMock(id=1, username="admin")
json_request.query_params = {"limit": "2"}
with patch("central.gui.routes.get_pool", return_value=mock_pool):
json_response = await events_json(json_request)
json_data = json.loads(json_response.body)
json_cursor = json_data["next_cursor"]
assert json_cursor is not None
html_request = MagicMock()
html_request.state.operator = MagicMock(id=1, username="admin")
html_request.state.csrf_token = "test_csrf"
html_request.query_params = {"limit": "2"}
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
mock_conn.fetch.return_value = first_page
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
await events_list(html_request)
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
html_cursor = html_context["next_cursor"]
assert json_cursor == html_cursor
class TestErrorSemantics:
"""Test error handling differences between JSON and HTML endpoints."""
@pytest.mark.asyncio
async def test_json_endpoint_returns_400_on_invalid_limit(self):
"""/events.json?limit=0 returns 400 JSON error."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.query_params = {"limit": "0"}
response = await events_json(mock_request)
assert response.status_code == 400
data = json.loads(response.body)
assert "error" in data
@pytest.mark.asyncio
async def test_html_endpoint_returns_200_with_error_banner(self):
"""/events?limit=0 returns 200 HTML with error banner."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf"
mock_request.query_params = {"limit": "0"}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
assert context["filter_error"] is not None
assert "limit" in context["filter_error"].lower()
assert context["events"] == []
class TestEventRowDataAttributes:
"""Test that _events_rows.html renders required data attributes."""
@pytest.mark.asyncio
async def test_row_renders_data_adapter_attribute(self):
"""Event rows include data-adapter attribute for color coding."""
mock_request = MagicMock()
mock_request.state.operator = MagicMock(id=1, username="admin")
mock_request.state.csrf_token = "test_csrf"
mock_request.query_params = {}
mock_events = [
{
"id": "test1",
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
"adapter": "usgs_quake",
"category": "quake.event",
"subject": "M4.2 Earthquake",
"geometry": None,
"data": {},
"regions": [],
},
]
mock_conn = AsyncMock()
mock_conn.fetch.return_value = mock_events
mock_conn.fetchrow.return_value = {
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"map_attribution": "OpenStreetMap",
}
mock_pool = MagicMock()
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
mock_templates = MagicMock()
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
with patch("central.gui.routes._get_templates", return_value=mock_templates):
with patch("central.gui.routes.get_pool", return_value=mock_pool):
result = await events_list(mock_request)
assert result.status_code == 200
mock_templates.TemplateResponse.assert_called_once()
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
# The template receives events with adapter field for data-adapter attribute
assert len(context["events"]) == 1
assert context["events"][0]["adapter"] == "usgs_quake"
assert context["events"][0]["category"] == "quake.event"
assert context["events"][0]["subject"] == "M4.2 Earthquake"

View file

@ -10,6 +10,7 @@ from central.adapters.firms import (
FIRMSAdapter,
CONFIDENCE_MAP,
SATELLITE_SHORT,
subject_for_fire_hotspot,
)
from central.config_models import AdapterConfig, RegionConfig
from central.models import Event, Geo
@ -284,14 +285,7 @@ class TestDeduplication:
class TestSubjectGeneration:
"""Test subject generation for fire hotspots."""
@pytest.mark.asyncio
async def test_subject_format(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
def test_subject_format(self):
event = Event(
id="test",
adapter="firms",
@ -302,17 +296,10 @@ class TestSubjectGeneration:
data={},
)
subject = adapter.subject_for(event)
subject = subject_for_fire_hotspot(event)
assert subject == "central.fire.hotspot.viirs_snpp.high"
@pytest.mark.asyncio
async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = FIRMSAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
def test_subject_nominal_confidence(self):
event = Event(
id="test",
adapter="firms",
@ -323,7 +310,7 @@ class TestSubjectGeneration:
data={},
)
subject = adapter.subject_for(event)
subject = subject_for_fire_hotspot(event)
assert subject == "central.fire.hotspot.viirs_noaa20.nominal"

View file

@ -4,7 +4,7 @@ from datetime import datetime, timezone
import pytest
from central.models import Event, Geo
from central.models import Event, Geo, subject_for_event
from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config
from central.cloudevents_wire import wrap_event
@ -57,6 +57,47 @@ def sample_config() -> Config:
)
class TestSubjectForEvent:
"""Tests for subject_for_event helper."""
def test_county_subject(self, sample_event: Event) -> None:
"""County codes produce county subject."""
subject = subject_for_event(sample_event)
assert subject == "central.wx.alert.us.id.county.ada"
def test_zone_subject(self, sample_geo: Geo) -> None:
"""Zone codes produce zone subject."""
geo = Geo(
centroid=sample_geo.centroid,
bbox=sample_geo.bbox,
regions=["US-ID-Z033"],
primary_region="US-ID-Z033",
)
event = Event(
id="test-zone",
adapter="nws",
category="wx.alert.winter_storm_warning",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.id.zone.z033"
def test_unknown_subject(self, sample_event: Event) -> None:
"""Missing primary_region produces unknown subject."""
geo = Geo(regions=[], primary_region=None)
event = Event(
id="test-unknown",
adapter="nws",
category="wx.alert.test",
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
geo=geo,
data={},
)
subject = subject_for_event(event)
assert subject == "central.wx.alert.us.unknown"
class TestCloudEventsWire:
"""Tests for CloudEvents wire format."""

View file

@ -17,6 +17,7 @@ from central.adapters.nws import (
SEVERITY_MAP,
)
from central.config_models import AdapterConfig
from central.models import subject_for_event
# Sample NWS GeoJSON features for testing
@ -271,7 +272,7 @@ class TestSubjectDerivation:
def test_county_subject(self, adapter: NWSAdapter) -> None:
event = adapter._normalize_feature(SAMPLE_FEATURE_ID)
assert event is not None
subject = adapter.subject_for(event)
subject = subject_for_event(event)
# Primary region should be alphabetically first
# Could be county or zone depending on sort order
assert subject.startswith("central.wx.alert.us.id.")
@ -293,7 +294,7 @@ class TestSubjectDerivation:
}
event = adapter._normalize_feature(feature)
assert event is not None
subject = adapter.subject_for(event)
subject = subject_for_event(event)
assert "zone" in subject

View file

@ -200,77 +200,76 @@ class TestEnableDisableEnableIntegration:
supervisor._js = mock_nats.jetstream()
# Patch NWSAdapter to use our mock
# Inject mock adapter into supervisor's registry
supervisor._adapters["nws"] = MockNWSAdapter
# Start supervisor (starts adapter)
await supervisor._start_adapter(initial_config)
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
# Start supervisor (starts adapter)
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
# Simulate completed poll 5 minutes ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
saved_last_poll = state.last_completed_poll
# Simulate completed poll 5 minutes ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
saved_last_poll = state.last_completed_poll
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws")
assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll."
)
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws")
assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll."
)
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Re-enable adapter
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws")
# Re-enable adapter
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws")
# Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# The loop should detect that last_poll + cadence is in the past
# and poll immediately. Let's verify by checking the wait time logic.
now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
wait_time = max(0, next_poll_at - now.timestamp())
# The loop should detect that last_poll + cadence is in the past
# and poll immediately. Let's verify by checking the wait time logic.
now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
wait_time = max(0, next_poll_at - now.timestamp())
# last_poll was 5 minutes ago, cadence is 60s
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
# wait_time should be 0 (poll immediately)
assert wait_time == 0, (
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
f"last_poll was {saved_last_poll}, now is {now}"
)
# last_poll was 5 minutes ago, cadence is 60s
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
# wait_time should be 0 (poll immediately)
assert wait_time == 0, (
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
f"last_poll was {saved_last_poll}, now is {now}"
)
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio
async def test_enable_disable_enable_gap_shorter_than_cadence(
@ -309,76 +308,75 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
# Inject mock adapter into supervisor's registry
supervisor._adapters["nws"] = MockNWSAdapter
# Start adapter
await supervisor._start_adapter(initial_config)
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
# Start adapter
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws")
assert state is not None
state = supervisor._adapter_states.get("nws")
assert state is not None
# Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
saved_last_poll = state.last_completed_poll
# Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
saved_last_poll = state.last_completed_poll
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws")
assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll."
)
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
# On unfixed code, state will be NONE because pop() removes it
# On fixed code, state still exists with is_running=False
state = supervisor._adapter_states.get("nws")
assert state is not None, (
"State was removed on stop! This violates the rate-limit guarantee. "
"State should be preserved to maintain last_completed_poll."
)
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Re-enable adapter (simulate 20 seconds later, but we're just
# checking the rate limit logic)
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws")
# Re-enable adapter (simulate 20 seconds later, but we're just
# checking the rate limit logic)
reenabled_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(reenabled_config)
await supervisor._on_config_change("adapters", "nws")
# Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# Verify restarted with preserved last_completed_poll
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
assert state.last_completed_poll == saved_last_poll
# The loop should detect that last_poll + cadence is still in the future
# and wait until then.
now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60
wait_time = max(0, next_poll_at - now.timestamp())
# The loop should detect that last_poll + cadence is still in the future
# and wait until then.
now = datetime.now(timezone.utc)
next_poll_at = saved_last_poll.timestamp() + 60
wait_time = max(0, next_poll_at - now.timestamp())
# last_poll was ~10 seconds ago, cadence is 60s
# wait_time should be ~50s (60 - 10 = 50)
assert 45 < wait_time < 55, (
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
f"Rate limit violated: poll would happen before last_poll + cadence"
)
# last_poll was ~10 seconds ago, cadence is 60s
# wait_time should be ~50s (60 - 10 = 50)
assert 45 < wait_time < 55, (
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
f"Rate limit violated: poll would happen before last_poll + cadence"
)
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio
async def test_enable_disable_delete_readd_fresh_state(
@ -416,61 +414,60 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
# Inject mock adapter into supervisor's registry
supervisor._adapters["nws"] = MockNWSAdapter
# Start adapter
await supervisor._start_adapter(initial_config)
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
# Start adapter
await supervisor._start_adapter(initial_config)
state = supervisor._adapter_states.get("nws")
assert state is not None
state = supervisor._adapter_states.get("nws")
assert state is not None
# Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
# Simulate completed poll 10 seconds ago
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# Disable adapter
disabled_config = AdapterConfig(
name="nws",
enabled=False,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(disabled_config)
await supervisor._on_config_change("adapters", "nws")
# DELETE adapter from DB (remove from config source)
config_source.set_adapter(None, name="nws")
await supervisor._on_config_change("adapters", "nws")
# DELETE adapter from DB (remove from config source)
config_source.set_adapter(None, name="nws")
await supervisor._on_config_change("adapters", "nws")
# Verify adapter fully removed
assert "nws" not in supervisor._adapter_states
# Verify adapter fully removed
assert "nws" not in supervisor._adapter_states
# Re-add adapter with same name
new_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(new_config)
await supervisor._on_config_change("adapters", "nws")
# Re-add adapter with same name
new_config = AdapterConfig(
name="nws",
enabled=True,
cadence_s=60,
settings={"states": ["ID"], "contact_email": "test@test.com"},
paused_at=None,
updated_at=datetime.now(timezone.utc),
)
config_source.set_adapter(new_config)
await supervisor._on_config_change("adapters", "nws")
# Verify new adapter started fresh
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
# last_completed_poll should be None (fresh adapter)
assert state.last_completed_poll is None, (
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
f"Preserved state not cleared on delete."
)
# Verify new adapter started fresh
state = supervisor._adapter_states.get("nws")
assert state is not None
assert adapter_is_running(state)
# last_completed_poll should be None (fresh adapter)
assert state.last_completed_poll is None, (
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
f"Preserved state not cleared on delete."
)
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio
async def test_stop_preserves_state_start_reuses_it(
@ -500,35 +497,34 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
# Inject mock adapter into supervisor's registry
supervisor._adapters["nws"] = MockNWSAdapter
# Start adapter
await supervisor._start_adapter(config)
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
# Start adapter
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
saved_poll = state.last_completed_poll
state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
saved_poll = state.last_completed_poll
# Stop adapter
await supervisor._stop_adapter("nws")
# Stop adapter
await supervisor._stop_adapter("nws")
# State should still exist
assert "nws" in supervisor._adapter_states
state = supervisor._adapter_states["nws"]
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_poll
# State should still exist
assert "nws" in supervisor._adapter_states
state = supervisor._adapter_states["nws"]
assert not adapter_is_running(state)
assert state.last_completed_poll == saved_poll
# Restart adapter
await supervisor._start_adapter(config)
# Restart adapter
await supervisor._start_adapter(config)
# Should reuse existing state
state = supervisor._adapter_states.get("nws")
assert adapter_is_running(state)
assert state.last_completed_poll == saved_poll
# Should reuse existing state
state = supervisor._adapter_states.get("nws")
assert adapter_is_running(state)
assert state.last_completed_poll == saved_poll
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
# Cleanup
supervisor._shutdown_event.set()
await cleanup_adapter(supervisor, "nws")
@pytest.mark.asyncio
async def test_remove_adapter_clears_state(
@ -558,15 +554,14 @@ class TestEnableDisableEnableIntegration:
supervisor._nc = mock_nats
supervisor._js = mock_nats.jetstream()
# Inject mock adapter into supervisor's registry
supervisor._adapters["nws"] = MockNWSAdapter
await supervisor._start_adapter(config)
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
await supervisor._start_adapter(config)
state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc)
state = supervisor._adapter_states.get("nws")
state.last_completed_poll = datetime.now(timezone.utc)
# Remove adapter
await cleanup_adapter(supervisor, "nws")
# Remove adapter
await cleanup_adapter(supervisor, "nws")
# State should be gone
assert "nws" not in supervisor._adapter_states
# State should be gone
assert "nws" not in supervisor._adapter_states

View file

@ -480,122 +480,3 @@ class TestApplyConfig:
assert adapter._feed == "all_day"
await adapter.shutdown()
class TestSubjectFor:
"""Test subject_for method for all magnitude tiers."""
@pytest.mark.asyncio
async def test_subject_minor(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-minor",
adapter="usgs_quake",
category="quake.event.minor",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.minor"
@pytest.mark.asyncio
async def test_subject_light(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-light",
adapter="usgs_quake",
category="quake.event.light",
time=datetime.now(timezone.utc),
severity=1,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.light"
@pytest.mark.asyncio
async def test_subject_moderate(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-moderate",
adapter="usgs_quake",
category="quake.event.moderate",
time=datetime.now(timezone.utc),
severity=2,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.moderate"
@pytest.mark.asyncio
async def test_subject_strong(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-strong",
adapter="usgs_quake",
category="quake.event.strong",
time=datetime.now(timezone.utc),
severity=3,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.strong"
@pytest.mark.asyncio
async def test_subject_major(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-major",
adapter="usgs_quake",
category="quake.event.major",
time=datetime.now(timezone.utc),
severity=4,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.major"
@pytest.mark.asyncio
async def test_subject_great(self, temp_db_path, mock_config_store):
config = make_adapter_config()
adapter = USGSQuakeAdapter(
config=config,
config_store=mock_config_store,
cursor_db_path=temp_db_path,
)
event = Event(
id="test-great",
adapter="usgs_quake",
category="quake.event.great",
time=datetime.now(timezone.utc),
severity=5,
geo=Geo(centroid=(-116.0, 45.0)),
data={},
)
assert adapter.subject_for(event) == "central.quake.event.great"