mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
Compare commits
7 commits
78b6fcf150
...
87f46e8b35
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87f46e8b35 | ||
|
|
4ee3d8bd14 | ||
|
|
4573bf6ee2 | ||
|
38b23f2a25 |
|||
|
dbe627dee4 |
|||
|
3de81f392a |
|||
|
55e68d038f |
19 changed files with 2076 additions and 432 deletions
58
CHANGELOG.md
58
CHANGELOG.md
|
|
@ -1,5 +1,63 @@
|
|||
# Changelog
|
||||
|
||||
## v0.3.0 — Phase 1b (2026-05-18)
|
||||
|
||||
Operator console. FastAPI + Jinja2 + Pico + HTMX. Self-hosted,
|
||||
Tailscale-gated by default, no application-level auth beyond
|
||||
the operator session.
|
||||
|
||||
### Added
|
||||
- Operator console (`central-gui` systemd service on port 8000)
|
||||
- Login + session auth (argon2id, 90-day DB-backed sessions)
|
||||
- Dashboard: events 24h by adapter, stream sizes,
|
||||
last-poll-time per adapter
|
||||
- Adapters list and edit page (cadence + per-adapter settings),
|
||||
with Leaflet region picker and click-to-draw rectangles
|
||||
- Streams view with retention chips (1d / 7d / 14d / 30d /
|
||||
365d / custom)
|
||||
- API keys management (list / add / rotate / delete,
|
||||
encrypted at rest via `crypto.encrypt`, plaintext never
|
||||
logged or stored)
|
||||
- First-run wizard (5 steps: operator, system, keys, adapters,
|
||||
finish) with deferred-commit pattern — no DB writes until
|
||||
Finish runs as a single transaction
|
||||
- Events feed page (`/events`) — paginated, filterable by
|
||||
adapter / category / time range / map viewport, with
|
||||
color-coded geometry overlay, click-to-popup, and
|
||||
expandable row details showing full event payload
|
||||
- Paginated events JSON API (`/events.json`) — cursor-based
|
||||
pagination, same filter surface as the HTML feed
|
||||
|
||||
### Changed
|
||||
- CSRF tokens are now session-bound (synchronizer token
|
||||
pattern), replacing the previous fastapi-csrf-protect
|
||||
library. Eliminates a rotation race that broke first-load
|
||||
submissions
|
||||
- First-run wizard is a single atomic transaction at Finish,
|
||||
not per-step DB writes. Back navigation works; abandoned
|
||||
wizards leave no orphan rows
|
||||
|
||||
### Fixed
|
||||
- Adapter editor's JSONB double-encoding bug (write path
|
||||
called `json.dumps` before asyncpg's codec, corrupting
|
||||
the settings column)
|
||||
- Dashboard polls card was reading from the wrong NATS
|
||||
subject and using a durable consumer instead of
|
||||
`get_last_msg`, leaking zombie consumers
|
||||
- Browser-noise paths (/favicon.ico, /apple-touch-icon.png,
|
||||
/robots.txt) return 204 directly, preventing parallel
|
||||
requests from racing the CSRF cookie on first page load
|
||||
- SubResource Integrity hashes for leaflet-draw assets
|
||||
corrected (previous values were fabricated and silently
|
||||
blocked by browsers)
|
||||
|
||||
### Infrastructure
|
||||
- New `config.sessions` column: `csrf_token` (per-session
|
||||
synchronizer)
|
||||
- Composite index on `public.events (time DESC, id DESC)`
|
||||
for cursor pagination
|
||||
- `central-gui` systemd service
|
||||
|
||||
## v0.2.0 — Phase 1a (2026-05-16)
|
||||
|
||||
Three live data sources, configurable infrastructure, hot-reload
|
||||
|
|
|
|||
|
|
@ -28,6 +28,32 @@ The Windows workstation (matt-desktop) has no Central repository clones.
|
|||
The directory `C:\Users\mtthw\central_work\` is scratch space only and
|
||||
should not be used for commits.
|
||||
|
||||
|
||||
## Network and Service Bindings
|
||||
|
||||
### Bind Address
|
||||
|
||||
`central-gui` binds to `0.0.0.0` by design. Network gating is the
|
||||
operator's responsibility (firewall, Tailscale, etc.), not the app's.
|
||||
Do not switch to `127.0.0.1` or to a specific interface — operators
|
||||
choose their bind via whatever network they want to expose the service on.
|
||||
|
||||
### NATS Listener Ports
|
||||
|
||||
The default `nats-server.conf` listens on more than just :4222:
|
||||
|
||||
| Port | Protocol | Used by Central? |
|
||||
|------|----------|------------------|
|
||||
| 4222 | NATS client | Yes (all) |
|
||||
| 8080 | WebSocket | No (Phase 0 leftover) |
|
||||
| 8222 | HTTP monitoring | No (manual ops only) |
|
||||
| 1883 | MQTT | No (Phase 0 leftover) |
|
||||
|
||||
None of the unused ports cause active harm — they listen but no consumer
|
||||
connects. Operators can remove them from `nats-server.conf` if they want
|
||||
a tighter footprint. Documenting so future contributors don't grep for
|
||||
"MQTT integration" and come up confused.
|
||||
|
||||
## Repository
|
||||
|
||||
| Property | Value |
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
|||
|
||||
[project]
|
||||
name = "central"
|
||||
version = "0.1.0"
|
||||
version = "0.3.0"
|
||||
requires-python = ">=3.12,<3.13"
|
||||
description = "Data hub spine — adapters, bus, archive."
|
||||
readme = "README.md"
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ from abc import ABC, abstractmethod
|
|||
from collections.abc import AsyncIterator
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from central.config_models import AdapterConfig
|
||||
|
||||
|
|
@ -16,9 +18,24 @@ class SourceAdapter(ABC):
|
|||
|
||||
Adapters yield Events. The supervisor handles scheduling,
|
||||
CloudEvents wrapping, publish, and metadata heartbeats.
|
||||
|
||||
Class attributes that subclasses must define:
|
||||
name: Short identifier, e.g. "nws"
|
||||
display_name: Human-readable name for GUI
|
||||
description: Short description of the adapter
|
||||
settings_schema: Pydantic model class for adapter settings
|
||||
requires_api_key: Key alias if API key required, else None
|
||||
wizard_order: Order in setup wizard (None = not in wizard)
|
||||
default_cadence_s: Default polling interval in seconds
|
||||
"""
|
||||
|
||||
name: str # short identifier, e.g. "nws"
|
||||
name: str
|
||||
display_name: str
|
||||
description: str
|
||||
settings_schema: type[BaseModel]
|
||||
requires_api_key: str | None = None
|
||||
wizard_order: int | None = None
|
||||
default_cadence_s: int
|
||||
|
||||
@abstractmethod
|
||||
async def poll(self) -> AsyncIterator[Event]:
|
||||
|
|
@ -40,6 +57,16 @@ class SourceAdapter(ABC):
|
|||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def subject_for(self, event: Event) -> str:
|
||||
"""
|
||||
Compute the NATS subject for an event.
|
||||
|
||||
Each adapter knows its own subject hierarchy. The supervisor
|
||||
calls this to determine where to publish each event.
|
||||
"""
|
||||
...
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Optional lifecycle hook called before first poll."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ from tenacity import (
|
|||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -49,10 +51,23 @@ SEVERITY_MAP = {
|
|||
}
|
||||
|
||||
|
||||
class FIRMSSettings(BaseModel):
|
||||
"""Settings schema for FIRMS adapter."""
|
||||
api_key_alias: str = "firms"
|
||||
satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"]
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class FIRMSAdapter(SourceAdapter):
|
||||
"""NASA FIRMS fire hotspot adapter."""
|
||||
|
||||
name = "firms"
|
||||
display_name = "NASA FIRMS Fire Hotspots"
|
||||
description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS."
|
||||
settings_schema = FIRMSSettings
|
||||
requires_api_key = "firms"
|
||||
wizard_order = 2
|
||||
default_cadence_s = 300
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -116,6 +131,15 @@ class FIRMSAdapter(SourceAdapter):
|
|||
},
|
||||
)
|
||||
|
||||
def subject_for(self, event: Event) -> str:
|
||||
"""Compute NATS subject for a fire hotspot event.
|
||||
|
||||
Subject format: central.fire.hotspot.<satellite>.<confidence>
|
||||
The category already contains this structure.
|
||||
"""
|
||||
return f"central.{event.category}"
|
||||
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Initialize HTTP session, dedup tracker, and fetch API key."""
|
||||
# Fetch API key
|
||||
|
|
@ -417,14 +441,3 @@ class FIRMSAdapter(SourceAdapter):
|
|||
},
|
||||
)
|
||||
|
||||
|
||||
def subject_for_fire_hotspot(ev: Event) -> str:
|
||||
"""Compute the NATS subject for a fire hotspot event.
|
||||
|
||||
Subject format: central.fire.hotspot.<satellite>.<confidence>
|
||||
|
||||
The category already contains the satellite and confidence info,
|
||||
so we just prefix with 'central.'.
|
||||
"""
|
||||
# category is "fire.hotspot.<satellite>.<confidence>"
|
||||
return f"central.{ev.category}"
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ from tenacity import (
|
|||
|
||||
from central import __version__
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -189,10 +191,22 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]:
|
|||
return sorted(regions)
|
||||
|
||||
|
||||
class NWSSettings(BaseModel):
|
||||
"""Settings schema for NWS adapter."""
|
||||
contact_email: str = ""
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class NWSAdapter(SourceAdapter):
|
||||
"""National Weather Service alerts adapter."""
|
||||
|
||||
name = "nws"
|
||||
display_name = "NWS Weather Alerts"
|
||||
description = "National Weather Service active alerts via api.weather.gov."
|
||||
settings_schema = NWSSettings
|
||||
requires_api_key = None
|
||||
wizard_order = 1
|
||||
default_cadence_s = 60
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -234,6 +248,35 @@ class NWSAdapter(SourceAdapter):
|
|||
},
|
||||
)
|
||||
|
||||
def subject_for(self, event: Event) -> str:
|
||||
"""Compute NATS subject for a weather alert.
|
||||
|
||||
Subject format: central.wx.alert.us.<state>.<type>.<code>
|
||||
where type is 'county' or 'zone' based on primary_region format.
|
||||
"""
|
||||
prefix = "central.wx"
|
||||
|
||||
if event.geo.primary_region is None:
|
||||
return f"{prefix}.alert.us.unknown"
|
||||
|
||||
region = event.geo.primary_region
|
||||
|
||||
# Parse US-<STATE>-<CODE> format
|
||||
parts = region.split("-")
|
||||
if len(parts) < 3 or parts[0] != "US":
|
||||
return f"{prefix}.alert.us.unknown"
|
||||
|
||||
state = parts[1].lower()
|
||||
code = "-".join(parts[2:]) # Handle multi-part names
|
||||
|
||||
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
|
||||
# Zone code like Z033
|
||||
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
|
||||
else:
|
||||
# County name
|
||||
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
|
||||
|
||||
|
||||
def _geometry_intersects_region(self, geometry: dict[str, Any] | None) -> bool:
|
||||
"""Check if feature geometry intersects configured region bbox.
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ from tenacity import (
|
|||
)
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from pydantic import BaseModel
|
||||
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.config_store import ConfigStore
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -60,10 +62,22 @@ def magnitude_to_severity(mag: float) -> int:
|
|||
return 5
|
||||
|
||||
|
||||
class USGSQuakeSettings(BaseModel):
|
||||
"""Settings schema for USGS quake adapter."""
|
||||
feed: str = "all_hour"
|
||||
region: RegionConfig | None = None
|
||||
|
||||
|
||||
class USGSQuakeAdapter(SourceAdapter):
|
||||
"""USGS Earthquake Hazards Program adapter."""
|
||||
|
||||
name = "usgs_quake"
|
||||
display_name = "USGS Earthquakes"
|
||||
description = "USGS earthquake feed (configurable window)."
|
||||
settings_schema = USGSQuakeSettings
|
||||
requires_api_key = None
|
||||
wizard_order = 3
|
||||
default_cadence_s = 60
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
|
@ -398,3 +412,9 @@ class USGSQuakeAdapter(SourceAdapter):
|
|||
new_count += 1
|
||||
|
||||
logger.info("USGS quake yielded events", extra={"count": new_count})
|
||||
|
||||
def subject_for(self, event: Event) -> str:
|
||||
"""Return NATS subject for quake event."""
|
||||
return f"central.{event.category}"
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -2197,95 +2197,90 @@ async def api_keys_delete(
|
|||
return RedirectResponse(url="/api-keys", status_code=302)
|
||||
|
||||
|
||||
@router.get("/events.json")
|
||||
async def events_json(request: Request):
|
||||
|
||||
|
||||
# --- Events query helper ---
|
||||
|
||||
class EventsQueryResult:
|
||||
"""Result from events query."""
|
||||
def __init__(self, events: list, next_cursor: str | None, error: str | None = None):
|
||||
self.events = events
|
||||
self.next_cursor = next_cursor
|
||||
self.error = error
|
||||
|
||||
|
||||
def _parse_events_params(params) -> tuple[dict | None, str | None]:
|
||||
"""
|
||||
Paginated, filterable JSON endpoint for events.
|
||||
|
||||
Query parameters (all optional):
|
||||
adapter: filter by adapter name
|
||||
category: filter by event category
|
||||
since: ISO 8601 datetime - events where time >= since
|
||||
until: ISO 8601 datetime - events where time < until
|
||||
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
|
||||
limit: page size (default 50, max 200)
|
||||
cursor: opaque pagination cursor
|
||||
|
||||
Parse and validate events query parameters.
|
||||
|
||||
Returns:
|
||||
{"events": [...], "next_cursor": string or null}
|
||||
(parsed_params, error_message)
|
||||
If error_message is not None, parsed_params is None.
|
||||
"""
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
params = request.query_params
|
||||
|
||||
# Parse and validate limit
|
||||
limit_str = params.get("limit", "50")
|
||||
try:
|
||||
limit = int(limit_str)
|
||||
except ValueError:
|
||||
return JSONResponse(
|
||||
{"error": f"Invalid limit value: {limit_str}"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, f"Invalid limit value: {limit_str}"
|
||||
|
||||
if limit < 1 or limit > 200:
|
||||
return JSONResponse(
|
||||
{"error": "limit must be between 1 and 200"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, "limit must be between 1 and 200"
|
||||
|
||||
# Parse adapter filter
|
||||
adapter = params.get("adapter")
|
||||
|
||||
# Parse category filter
|
||||
if adapter == "":
|
||||
adapter = None
|
||||
|
||||
# Parse category filter
|
||||
category = params.get("category")
|
||||
|
||||
if category == "":
|
||||
category = None
|
||||
|
||||
# Parse since/until filters
|
||||
since = None
|
||||
until = None
|
||||
|
||||
|
||||
since_str = params.get("since")
|
||||
if since_str:
|
||||
try:
|
||||
since = datetime.fromisoformat(since_str.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return JSONResponse(
|
||||
{"error": f"Invalid ISO 8601 datetime for since: {since_str}"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, f"Invalid ISO 8601 datetime for since: {since_str}"
|
||||
|
||||
until_str = params.get("until")
|
||||
if until_str:
|
||||
try:
|
||||
until = datetime.fromisoformat(until_str.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return JSONResponse(
|
||||
{"error": f"Invalid ISO 8601 datetime for until: {until_str}"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, f"Invalid ISO 8601 datetime for until: {until_str}"
|
||||
|
||||
# Validate since <= until
|
||||
if since and until and since > until:
|
||||
return JSONResponse(
|
||||
{"error": "since must be before or equal to until"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, "since must be before or equal to until"
|
||||
|
||||
# Parse region bbox
|
||||
region_north = params.get("region_north")
|
||||
region_south = params.get("region_south")
|
||||
region_east = params.get("region_east")
|
||||
region_west = params.get("region_west")
|
||||
|
||||
|
||||
# Treat empty strings as None
|
||||
if region_north == "":
|
||||
region_north = None
|
||||
if region_south == "":
|
||||
region_south = None
|
||||
if region_east == "":
|
||||
region_east = None
|
||||
if region_west == "":
|
||||
region_west = None
|
||||
|
||||
region_params = [region_north, region_south, region_east, region_west]
|
||||
region_supplied = [p for p in region_params if p is not None]
|
||||
|
||||
|
||||
if len(region_supplied) > 0 and len(region_supplied) < 4:
|
||||
return JSONResponse(
|
||||
{"error": "Region filter requires all four parameters: region_north, region_south, region_east, region_west"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, "Region filter requires all four parameters: region_north, region_south, region_east, region_west"
|
||||
|
||||
bbox = None
|
||||
if len(region_supplied) == 4:
|
||||
try:
|
||||
|
|
@ -2296,16 +2291,13 @@ async def events_json(request: Request):
|
|||
"west": float(region_west),
|
||||
}
|
||||
except ValueError:
|
||||
return JSONResponse(
|
||||
{"error": "Region parameters must be valid numbers"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
return None, "Region parameters must be valid numbers"
|
||||
|
||||
# Parse cursor
|
||||
cursor_time = None
|
||||
cursor_id = None
|
||||
cursor_str = params.get("cursor")
|
||||
|
||||
|
||||
if cursor_str:
|
||||
try:
|
||||
decoded = base64.b64decode(cursor_str).decode("utf-8")
|
||||
|
|
@ -2315,59 +2307,82 @@ async def events_json(request: Request):
|
|||
cursor_time = datetime.fromisoformat(parts[0])
|
||||
cursor_id = parts[1]
|
||||
except Exception:
|
||||
return JSONResponse(
|
||||
{"error": "Invalid cursor"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
# Get database pool after validation
|
||||
return None, "Invalid cursor"
|
||||
|
||||
return {
|
||||
"limit": limit,
|
||||
"adapter": adapter,
|
||||
"category": category,
|
||||
"since": since,
|
||||
"until": until,
|
||||
"bbox": bbox,
|
||||
"cursor_time": cursor_time,
|
||||
"cursor_id": cursor_id,
|
||||
}, None
|
||||
|
||||
|
||||
async def _fetch_events(parsed_params: dict) -> EventsQueryResult:
|
||||
"""
|
||||
Fetch events from database using parsed parameters.
|
||||
|
||||
Returns EventsQueryResult with events list, next_cursor, and optional error.
|
||||
"""
|
||||
pool = get_pool()
|
||||
|
||||
|
||||
limit = parsed_params["limit"]
|
||||
adapter = parsed_params["adapter"]
|
||||
category = parsed_params["category"]
|
||||
since = parsed_params["since"]
|
||||
until = parsed_params["until"]
|
||||
bbox = parsed_params["bbox"]
|
||||
cursor_time = parsed_params["cursor_time"]
|
||||
cursor_id = parsed_params["cursor_id"]
|
||||
|
||||
# Build query
|
||||
conditions = []
|
||||
query_params = []
|
||||
param_idx = 1
|
||||
|
||||
|
||||
if adapter:
|
||||
conditions.append(f"adapter = ${param_idx}")
|
||||
query_params.append(adapter)
|
||||
param_idx += 1
|
||||
|
||||
|
||||
if category:
|
||||
conditions.append(f"category = ${param_idx}")
|
||||
query_params.append(category)
|
||||
param_idx += 1
|
||||
|
||||
|
||||
if since:
|
||||
conditions.append(f"time >= ${param_idx}")
|
||||
query_params.append(since)
|
||||
param_idx += 1
|
||||
|
||||
|
||||
if until:
|
||||
conditions.append(f"time < ${param_idx}")
|
||||
query_params.append(until)
|
||||
param_idx += 1
|
||||
|
||||
|
||||
if bbox:
|
||||
conditions.append(
|
||||
f"ST_Intersects(geom, ST_MakeEnvelope(${param_idx}, ${param_idx+1}, ${param_idx+2}, ${param_idx+3}, 4326))"
|
||||
)
|
||||
query_params.extend([bbox["west"], bbox["south"], bbox["east"], bbox["north"]])
|
||||
param_idx += 4
|
||||
|
||||
|
||||
if cursor_time and cursor_id:
|
||||
conditions.append(f"(time, id) < (${param_idx}, ${param_idx+1})")
|
||||
query_params.append(cursor_time)
|
||||
query_params.append(cursor_id)
|
||||
param_idx += 2
|
||||
|
||||
|
||||
where_clause = ""
|
||||
if conditions:
|
||||
where_clause = "WHERE " + " AND ".join(conditions)
|
||||
|
||||
|
||||
# Fetch limit+1 to check for next page
|
||||
query = f"""
|
||||
SELECT
|
||||
SELECT
|
||||
id,
|
||||
time,
|
||||
received,
|
||||
|
|
@ -2383,29 +2398,26 @@ async def events_json(request: Request):
|
|||
LIMIT ${param_idx}
|
||||
"""
|
||||
query_params.append(limit + 1)
|
||||
|
||||
|
||||
try:
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(query, *query_params)
|
||||
except Exception as e:
|
||||
logger.error(f"Database error in events_json: {e}")
|
||||
return JSONResponse(
|
||||
{"error": "Database error"},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
logger.error(f"Database error in _fetch_events: {e}")
|
||||
return EventsQueryResult([], None, "Database error")
|
||||
|
||||
# Check if there is a next page
|
||||
has_next = len(rows) > limit
|
||||
if has_next:
|
||||
rows = rows[:limit]
|
||||
|
||||
|
||||
# Build response
|
||||
events = []
|
||||
for row in rows:
|
||||
geometry = None
|
||||
if row["geometry"]:
|
||||
geometry = json.loads(row["geometry"])
|
||||
|
||||
|
||||
events.append({
|
||||
"id": row["id"],
|
||||
"time": row["time"].isoformat(),
|
||||
|
|
@ -2417,15 +2429,196 @@ async def events_json(request: Request):
|
|||
"data": dict(row["data"]) if row["data"] else {},
|
||||
"regions": list(row["regions"]) if row["regions"] else [],
|
||||
})
|
||||
|
||||
|
||||
# Build next_cursor if there are more results
|
||||
next_cursor = None
|
||||
if has_next and events:
|
||||
last_event = rows[-1]
|
||||
cursor_data = f"{last_event['time'].isoformat()}|{last_event['id']}"
|
||||
next_cursor = base64.b64encode(cursor_data.encode("utf-8")).decode("utf-8")
|
||||
|
||||
|
||||
return EventsQueryResult(events, next_cursor)
|
||||
|
||||
|
||||
def _geometry_summary(geometry: dict | None) -> str:
|
||||
"""Generate a human-readable summary of a geometry."""
|
||||
if not geometry:
|
||||
return "None"
|
||||
|
||||
geom_type = geometry.get("type", "Unknown")
|
||||
|
||||
if geom_type == "Point":
|
||||
return "Point"
|
||||
elif geom_type == "LineString":
|
||||
coords = geometry.get("coordinates", [])
|
||||
return f"Line ({len(coords)} pts)"
|
||||
elif geom_type == "Polygon":
|
||||
coords = geometry.get("coordinates", [[]])
|
||||
if coords:
|
||||
return f"Polygon ({len(coords[0])} pts)"
|
||||
return "Polygon"
|
||||
elif geom_type == "MultiPolygon":
|
||||
coords = geometry.get("coordinates", [])
|
||||
return f"MultiPolygon ({len(coords)} parts)"
|
||||
else:
|
||||
return geom_type
|
||||
|
||||
|
||||
|
||||
@router.get("/events.json")
|
||||
async def events_json(request: Request):
|
||||
"""
|
||||
Paginated, filterable JSON endpoint for events.
|
||||
|
||||
Query parameters (all optional):
|
||||
adapter: filter by adapter name
|
||||
category: filter by event category
|
||||
since: ISO 8601 datetime - events where time >= since
|
||||
until: ISO 8601 datetime - events where time < until
|
||||
region_north, region_south, region_east, region_west: bbox filter (all four required if any)
|
||||
limit: page size (default 50, max 200)
|
||||
cursor: opaque pagination cursor
|
||||
|
||||
Returns:
|
||||
{"events": [...], "next_cursor": string or null}
|
||||
"""
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
params = request.query_params
|
||||
|
||||
# Parse and validate parameters using shared helper
|
||||
parsed, error = _parse_events_params(params)
|
||||
if error:
|
||||
return JSONResponse({"error": error}, status_code=400)
|
||||
|
||||
# Fetch events using shared helper
|
||||
result = await _fetch_events(parsed)
|
||||
if result.error:
|
||||
return JSONResponse({"error": result.error}, status_code=500)
|
||||
|
||||
return JSONResponse({
|
||||
"events": events,
|
||||
"next_cursor": next_cursor,
|
||||
"events": result.events,
|
||||
"next_cursor": result.next_cursor,
|
||||
})
|
||||
|
||||
|
||||
# --- Events feed frontend routes ---
|
||||
|
||||
@router.get("/events", response_class=HTMLResponse)
|
||||
async def events_list(request: Request) -> HTMLResponse:
|
||||
"""Events feed page with filter form, table, and map."""
|
||||
templates = _get_templates()
|
||||
operator = getattr(request.state, "operator", None)
|
||||
csrf_token = getattr(request.state, "csrf_token", "")
|
||||
|
||||
params = request.query_params
|
||||
|
||||
# Parse parameters
|
||||
parsed, error = _parse_events_params(params)
|
||||
|
||||
# Get system settings for map tiles
|
||||
pool = get_pool()
|
||||
async with pool.acquire() as conn:
|
||||
system_row = await conn.fetchrow("SELECT map_tile_url, map_attribution FROM config.system")
|
||||
|
||||
tile_url = system_row["map_tile_url"] if system_row else "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
|
||||
tile_attribution = system_row["map_attribution"] if system_row else "OpenStreetMap"
|
||||
|
||||
# Prepare filter values for template
|
||||
filter_values = {
|
||||
"adapter": params.get("adapter", ""),
|
||||
"category": params.get("category", ""),
|
||||
"since": params.get("since", ""),
|
||||
"until": params.get("until", ""),
|
||||
"region_north": params.get("region_north", ""),
|
||||
"region_south": params.get("region_south", ""),
|
||||
"region_east": params.get("region_east", ""),
|
||||
"region_west": params.get("region_west", ""),
|
||||
"limit": params.get("limit", "50"),
|
||||
}
|
||||
|
||||
events = []
|
||||
next_cursor = None
|
||||
|
||||
if error:
|
||||
# Validation error - show error banner but don't fail the page
|
||||
pass
|
||||
else:
|
||||
# Fetch events
|
||||
result = await _fetch_events(parsed)
|
||||
if result.error:
|
||||
error = result.error
|
||||
else:
|
||||
events = result.events
|
||||
next_cursor = result.next_cursor
|
||||
|
||||
# Add geometry summary to each event
|
||||
for event in events:
|
||||
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="events_list.html",
|
||||
context={
|
||||
"operator": operator,
|
||||
"csrf_token": csrf_token,
|
||||
"events": events,
|
||||
"next_cursor": next_cursor,
|
||||
"filter_values": filter_values,
|
||||
"filter_error": error,
|
||||
"tile_url": tile_url,
|
||||
"tile_attribution": tile_attribution,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/events/rows", response_class=HTMLResponse)
|
||||
async def events_rows(request: Request) -> HTMLResponse:
|
||||
"""HTMX fragment: events table rows only (no page chrome)."""
|
||||
templates = _get_templates()
|
||||
|
||||
params = request.query_params
|
||||
|
||||
# Parse parameters
|
||||
parsed, error = _parse_events_params(params)
|
||||
|
||||
# Prepare filter values for template
|
||||
filter_values = {
|
||||
"adapter": params.get("adapter", ""),
|
||||
"category": params.get("category", ""),
|
||||
"since": params.get("since", ""),
|
||||
"until": params.get("until", ""),
|
||||
"region_north": params.get("region_north", ""),
|
||||
"region_south": params.get("region_south", ""),
|
||||
"region_east": params.get("region_east", ""),
|
||||
"region_west": params.get("region_west", ""),
|
||||
"limit": params.get("limit", "50"),
|
||||
}
|
||||
|
||||
events = []
|
||||
next_cursor = None
|
||||
|
||||
if error:
|
||||
pass
|
||||
else:
|
||||
result = await _fetch_events(parsed)
|
||||
if result.error:
|
||||
error = result.error
|
||||
else:
|
||||
events = result.events
|
||||
next_cursor = result.next_cursor
|
||||
|
||||
# Add geometry summary to each event
|
||||
for event in events:
|
||||
event["geometry_summary"] = _geometry_summary(event.get("geometry"))
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request=request,
|
||||
name="_events_rows.html",
|
||||
context={
|
||||
"events": events,
|
||||
"next_cursor": next_cursor,
|
||||
"filter_values": filter_values,
|
||||
"filter_error": error,
|
||||
},
|
||||
)
|
||||
|
|
|
|||
73
src/central/gui/templates/_events_rows.html
Normal file
73
src/central/gui/templates/_events_rows.html
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
{% if filter_error %}
|
||||
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
|
||||
<strong>Filter Error:</strong> {{ filter_error }}
|
||||
</article>
|
||||
{% endif %}
|
||||
|
||||
{% if events %}
|
||||
<table class="events-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th style="width: 2rem;"></th>
|
||||
<th>Time</th>
|
||||
<th>Adapter</th>
|
||||
<th>Category</th>
|
||||
<th>Geometry</th>
|
||||
<th>Subject</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for event in events %}
|
||||
<tr class="event-row" data-row-idx="{{ loop.index0 }}"
|
||||
data-event-id="{{ event.id }}"
|
||||
data-adapter="{{ event.adapter }}"
|
||||
data-category="{{ event.category }}"
|
||||
data-time="{{ event.time }}"
|
||||
data-subject="{{ event.subject or '' }}"
|
||||
{% if event.geometry %}data-geometry='{{ event.geometry | tojson }}'{% endif %}>
|
||||
<td><button type="button" class="expand-row" aria-label="Expand">▸</button></td>
|
||||
<td>{{ event.time }}</td>
|
||||
<td>{{ event.adapter }}</td>
|
||||
<td>{{ event.category }}</td>
|
||||
<td>{{ event.geometry_summary }}</td>
|
||||
<td>{{ event.subject or '—' }}</td>
|
||||
</tr>
|
||||
<tr class="event-detail" hidden>
|
||||
<td colspan="6">
|
||||
<dl class="event-detail-list">
|
||||
<dt>Event ID</dt>
|
||||
<dd><code>{{ event.id }}</code></dd>
|
||||
<dt>Received</dt>
|
||||
<dd>{{ event.received }}</dd>
|
||||
{% if event.regions %}
|
||||
<dt>Regions</dt>
|
||||
<dd>{{ event.regions | join(", ") }}</dd>
|
||||
{% endif %}
|
||||
<dt>Data</dt>
|
||||
<dd><pre class="event-data-pre">{{ event.data | tojson(indent=2) }}</pre></dd>
|
||||
</dl>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
<div class="pagination-info">
|
||||
<span>Showing {{ events | length }} event{{ 's' if events | length != 1 else '' }}.</span>
|
||||
{% if next_cursor %}
|
||||
<a href="/events?cursor={{ next_cursor }}{% if filter_values.adapter %}&adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&region_north={{ filter_values.region_north }}&region_south={{ filter_values.region_south }}&region_east={{ filter_values.region_east }}&region_west={{ filter_values.region_west }}{% endif %}&limit={{ filter_values.limit }}"
|
||||
role="button"
|
||||
hx-get="/events/rows?cursor={{ next_cursor }}{% if filter_values.adapter %}&adapter={{ filter_values.adapter }}{% endif %}{% if filter_values.category %}&category={{ filter_values.category | urlencode }}{% endif %}{% if filter_values.since %}&since={{ filter_values.since }}{% endif %}{% if filter_values.until %}&until={{ filter_values.until }}{% endif %}{% if filter_values.region_north %}&region_north={{ filter_values.region_north }}&region_south={{ filter_values.region_south }}&region_east={{ filter_values.region_east }}&region_west={{ filter_values.region_west }}{% endif %}&limit={{ filter_values.limit }}"
|
||||
hx-target="#events-rows"
|
||||
hx-push-url="true">
|
||||
Next →
|
||||
</a>
|
||||
{% else %}
|
||||
<span><em>End of results</em></span>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% else %}
|
||||
<article>
|
||||
<p>No events match the filters.</p>
|
||||
</article>
|
||||
{% endif %}
|
||||
|
|
@ -17,6 +17,7 @@
|
|||
{% if operator %}
|
||||
<li><a href="/">Dashboard</a></li>
|
||||
<li><a href="/adapters">Adapters</a></li>
|
||||
<li><a href="/events">Events</a></li>
|
||||
<li><a href="/streams">Streams</a></li>
|
||||
<li><a href="/api-keys">API Keys</a></li>
|
||||
<li>{{ operator.username }}</li>
|
||||
|
|
|
|||
437
src/central/gui/templates/events_list.html
Normal file
437
src/central/gui/templates/events_list.html
Normal file
|
|
@ -0,0 +1,437 @@
|
|||
{% extends "base.html" %}
|
||||
|
||||
{% block title %}Events - Central{% endblock %}
|
||||
|
||||
{% block head %}
|
||||
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
|
||||
<style>
|
||||
#events-map {
|
||||
height: 400px;
|
||||
margin-bottom: 0.5rem;
|
||||
border-radius: var(--pico-border-radius);
|
||||
}
|
||||
.map-controls {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
margin-bottom: 1rem;
|
||||
}
|
||||
.map-legend {
|
||||
display: flex;
|
||||
gap: 1rem;
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
.map-legend-item {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 0.25rem;
|
||||
}
|
||||
.map-legend-swatch {
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
border-radius: 3px;
|
||||
border: 1px solid rgba(0,0,0,0.2);
|
||||
}
|
||||
#fit-to-results {
|
||||
padding: 0.25rem 0.75rem;
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
.events-table {
|
||||
font-size: 0.9rem;
|
||||
}
|
||||
.events-table td {
|
||||
vertical-align: middle;
|
||||
}
|
||||
.events-table tr.event-row:hover {
|
||||
background-color: var(--pico-primary-focus);
|
||||
cursor: pointer;
|
||||
}
|
||||
.events-table tr.event-row.highlighted {
|
||||
background-color: var(--pico-primary-background);
|
||||
}
|
||||
.expand-row {
|
||||
background: none;
|
||||
border: none;
|
||||
padding: 0.25rem;
|
||||
cursor: pointer;
|
||||
font-size: 0.9rem;
|
||||
color: var(--pico-color);
|
||||
}
|
||||
.expand-row:hover {
|
||||
color: var(--pico-primary);
|
||||
}
|
||||
.event-detail td {
|
||||
background-color: var(--pico-card-background-color);
|
||||
padding: 1rem;
|
||||
}
|
||||
.event-detail-list {
|
||||
display: grid;
|
||||
grid-template-columns: auto 1fr;
|
||||
gap: 0.5rem 1rem;
|
||||
margin: 0;
|
||||
}
|
||||
.event-detail-list dt {
|
||||
font-weight: 600;
|
||||
color: var(--pico-muted-color);
|
||||
}
|
||||
.event-detail-list dd {
|
||||
margin: 0;
|
||||
}
|
||||
.event-data-pre {
|
||||
max-height: 300px;
|
||||
overflow: auto;
|
||||
font-size: 0.8rem;
|
||||
background: var(--pico-code-background-color);
|
||||
padding: 0.5rem;
|
||||
border-radius: var(--pico-border-radius);
|
||||
margin: 0;
|
||||
}
|
||||
.filter-form .grid {
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
.filter-form label {
|
||||
margin-bottom: 0.25rem;
|
||||
}
|
||||
.filter-form input, .filter-form select {
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
.pagination-info {
|
||||
margin-top: 1rem;
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
}
|
||||
</style>
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Events</h1>
|
||||
|
||||
{% if filter_error %}
|
||||
<article aria-label="Filter Error" style="background-color: var(--pico-del-color); padding: 1rem; margin-bottom: 1rem;">
|
||||
<strong>Filter Error:</strong> {{ filter_error }}
|
||||
</article>
|
||||
{% endif %}
|
||||
|
||||
<details open>
|
||||
<summary>Filters</summary>
|
||||
<form id="filter-form" class="filter-form" action="/events" method="get"
|
||||
hx-get="/events/rows" hx-target="#events-rows" hx-push-url="true">
|
||||
|
||||
<div class="grid">
|
||||
<div>
|
||||
<label for="adapter">Adapter</label>
|
||||
<select id="adapter" name="adapter">
|
||||
<option value="">All</option>
|
||||
<option value="nws" {% if filter_values.adapter == 'nws' %}selected{% endif %}>nws</option>
|
||||
<option value="firms" {% if filter_values.adapter == 'firms' %}selected{% endif %}>firms</option>
|
||||
<option value="usgs_quake" {% if filter_values.adapter == 'usgs_quake' %}selected{% endif %}>usgs_quake</option>
|
||||
</select>
|
||||
</div>
|
||||
<div>
|
||||
<label for="category">Category</label>
|
||||
<input type="text" id="category" name="category" placeholder="Exact match"
|
||||
value="{{ filter_values.category }}">
|
||||
</div>
|
||||
<div>
|
||||
<label for="since">From</label>
|
||||
<input type="datetime-local" id="since" name="since"
|
||||
value="{{ filter_values.since }}">
|
||||
</div>
|
||||
<div>
|
||||
<label for="until">To</label>
|
||||
<input type="datetime-local" id="until" name="until"
|
||||
value="{{ filter_values.until }}">
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Hidden region inputs (managed by map viewport) -->
|
||||
<input type="hidden" id="region_north" name="region_north" value="{{ filter_values.region_north }}">
|
||||
<input type="hidden" id="region_south" name="region_south" value="{{ filter_values.region_south }}">
|
||||
<input type="hidden" id="region_east" name="region_east" value="{{ filter_values.region_east }}">
|
||||
<input type="hidden" id="region_west" name="region_west" value="{{ filter_values.region_west }}">
|
||||
|
||||
<input type="hidden" name="limit" value="{{ filter_values.limit }}">
|
||||
|
||||
<div style="display: flex; gap: 0.5rem; margin-top: 0.5rem;">
|
||||
<button type="submit">Apply</button>
|
||||
<a href="/events" role="button" class="outline">Clear Filters</a>
|
||||
</div>
|
||||
</form>
|
||||
</details>
|
||||
|
||||
<div id="events-map"></div>
|
||||
<div class="map-controls">
|
||||
<div class="map-legend">
|
||||
<div class="map-legend-item">
|
||||
<div class="map-legend-swatch" style="background-color: #f59e0b;"></div>
|
||||
<span>NWS (Weather)</span>
|
||||
</div>
|
||||
<div class="map-legend-item">
|
||||
<div class="map-legend-swatch" style="background-color: #dc2626;"></div>
|
||||
<span>FIRMS (Fire)</span>
|
||||
</div>
|
||||
<div class="map-legend-item">
|
||||
<div class="map-legend-swatch" style="background-color: #7c3aed;"></div>
|
||||
<span>USGS (Quake)</span>
|
||||
</div>
|
||||
</div>
|
||||
<button type="button" id="fit-to-results" class="outline secondary">Fit map to results</button>
|
||||
</div>
|
||||
|
||||
<div id="events-rows">
|
||||
{% include "_events_rows.html" %}
|
||||
</div>
|
||||
|
||||
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
|
||||
<script>
|
||||
(function() {
|
||||
var tileUrl = {{ tile_url | tojson }};
|
||||
var tileAttr = {{ tile_attribution | tojson }};
|
||||
|
||||
// Adapter color mapping
|
||||
var ADAPTER_COLORS = {
|
||||
"nws": "#f59e0b",
|
||||
"firms": "#dc2626",
|
||||
"usgs_quake": "#7c3aed"
|
||||
};
|
||||
|
||||
function getAdapterColor(adapter) {
|
||||
return ADAPTER_COLORS[adapter] || "#3388ff";
|
||||
}
|
||||
|
||||
// Initialize map
|
||||
var map = L.map("events-map").setView([39, -98], 4);
|
||||
|
||||
L.tileLayer(tileUrl, {
|
||||
attribution: tileAttr,
|
||||
maxZoom: 18
|
||||
}).addTo(map);
|
||||
|
||||
// Layer group for event geometries
|
||||
var eventLayerGroup = L.layerGroup().addTo(map);
|
||||
var highlightedRow = null;
|
||||
var highlightedLayer = null;
|
||||
var isInitialLoad = true;
|
||||
var programmaticMove = false;
|
||||
|
||||
// Region input elements
|
||||
var northInput = document.getElementById("region_north");
|
||||
var southInput = document.getElementById("region_south");
|
||||
var eastInput = document.getElementById("region_east");
|
||||
var westInput = document.getElementById("region_west");
|
||||
|
||||
// Viewport-driven filter with debounce
|
||||
var viewportDebounceTimer = null;
|
||||
|
||||
map.on("moveend", function() {
|
||||
if (programmaticMove) {
|
||||
programmaticMove = false;
|
||||
return;
|
||||
}
|
||||
if (viewportDebounceTimer) clearTimeout(viewportDebounceTimer);
|
||||
viewportDebounceTimer = setTimeout(applyViewportFilter, 400);
|
||||
});
|
||||
|
||||
function applyViewportFilter() {
|
||||
var bounds = map.getBounds();
|
||||
northInput.value = bounds.getNorth().toFixed(4);
|
||||
southInput.value = bounds.getSouth().toFixed(4);
|
||||
eastInput.value = bounds.getEast().toFixed(4);
|
||||
westInput.value = bounds.getWest().toFixed(4);
|
||||
htmx.trigger(document.getElementById("filter-form"), "submit");
|
||||
}
|
||||
|
||||
function buildPopup(row) {
|
||||
var adapter = row.dataset.adapter || "";
|
||||
var category = row.dataset.category || "";
|
||||
var time = row.dataset.time ? new Date(row.dataset.time).toLocaleString() : "";
|
||||
var subject = row.dataset.subject || "";
|
||||
var eventId = row.dataset.eventId || "";
|
||||
|
||||
var html = "<strong>" + adapter + "</strong><br>" +
|
||||
category + "<br>" +
|
||||
"<small>" + time + "</small>";
|
||||
if (subject) {
|
||||
html += "<br><em>" + subject + "</em>";
|
||||
}
|
||||
html += '<br><a href="#" data-show-row="' + eventId + '">View details ▾</a>';
|
||||
return html;
|
||||
}
|
||||
|
||||
function rebindEventLayers() {
|
||||
eventLayerGroup.clearLayers();
|
||||
|
||||
var rows = document.querySelectorAll("#events-rows tr.event-row[data-geometry]");
|
||||
|
||||
rows.forEach(function(row) {
|
||||
var geomStr = row.dataset.geometry;
|
||||
if (!geomStr || geomStr === "") return;
|
||||
|
||||
try {
|
||||
var geom = JSON.parse(geomStr);
|
||||
if (!geom) return;
|
||||
|
||||
var adapter = row.dataset.adapter || "";
|
||||
var color = getAdapterColor(adapter);
|
||||
|
||||
var layer = L.geoJSON(geom, {
|
||||
style: {
|
||||
color: color,
|
||||
weight: 2,
|
||||
fillColor: color,
|
||||
fillOpacity: 0.25
|
||||
},
|
||||
pointToLayer: function(feature, latlng) {
|
||||
return L.circleMarker(latlng, {
|
||||
radius: 8,
|
||||
color: color,
|
||||
weight: 2,
|
||||
fillColor: color,
|
||||
fillOpacity: 0.25
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
layer.bindPopup(buildPopup(row));
|
||||
layer.on("click", function() {
|
||||
highlightRow(row, layer, color);
|
||||
});
|
||||
|
||||
layer.addTo(eventLayerGroup);
|
||||
|
||||
// Store reference for row highlighting
|
||||
row._mapLayer = layer;
|
||||
row._mapColor = color;
|
||||
} catch (e) {
|
||||
console.error("Error parsing geometry:", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function highlightRow(row, layer, originalColor) {
|
||||
// Reset previous highlight
|
||||
if (highlightedRow) {
|
||||
highlightedRow.classList.remove("highlighted");
|
||||
}
|
||||
if (highlightedLayer && highlightedLayer._originalColor) {
|
||||
highlightedLayer.setStyle({
|
||||
color: highlightedLayer._originalColor,
|
||||
weight: 2,
|
||||
fillColor: highlightedLayer._originalColor,
|
||||
fillOpacity: 0.25
|
||||
});
|
||||
}
|
||||
|
||||
// Set new highlight
|
||||
row.classList.add("highlighted");
|
||||
highlightedRow = row;
|
||||
|
||||
if (layer) {
|
||||
layer._originalColor = originalColor;
|
||||
layer.setStyle({
|
||||
color: "#ff3333",
|
||||
weight: 4,
|
||||
fillColor: "#ff3333",
|
||||
fillOpacity: 0.4
|
||||
});
|
||||
highlightedLayer = layer;
|
||||
}
|
||||
}
|
||||
|
||||
function fitToAllLayers() {
|
||||
var layers = eventLayerGroup.getLayers();
|
||||
if (layers.length === 0) return;
|
||||
programmaticMove = true;
|
||||
var group = L.featureGroup(layers);
|
||||
map.fitBounds(group.getBounds(), { padding: [20, 20] });
|
||||
}
|
||||
|
||||
// Row click handler (event delegation)
|
||||
document.addEventListener("click", function(e) {
|
||||
// Expand/collapse handler
|
||||
if (e.target.classList.contains("expand-row")) {
|
||||
var row = e.target.closest("tr");
|
||||
var detail = row.nextElementSibling;
|
||||
if (detail && detail.classList.contains("event-detail")) {
|
||||
detail.hidden = !detail.hidden;
|
||||
e.target.innerHTML = detail.hidden ? "▸" : "▾";
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Popup "View details" link handler
|
||||
if (e.target.matches("[data-show-row]")) {
|
||||
e.preventDefault();
|
||||
var eventId = e.target.dataset.showRow;
|
||||
var targetRow = document.querySelector(
|
||||
'tr[data-event-id="' + CSS.escape(eventId) + '"]'
|
||||
);
|
||||
if (!targetRow) return;
|
||||
targetRow.scrollIntoView({ behavior: "smooth", block: "center" });
|
||||
var detail = targetRow.nextElementSibling;
|
||||
if (detail && detail.classList.contains("event-detail")) {
|
||||
detail.hidden = false;
|
||||
var button = targetRow.querySelector(".expand-row");
|
||||
if (button) button.innerHTML = "▾";
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Row click to highlight (no auto-pan - user controls viewport)
|
||||
var row = e.target.closest("tr.event-row");
|
||||
if (row && row._mapLayer) {
|
||||
highlightRow(row, row._mapLayer, row._mapColor);
|
||||
// Map stays where user put it
|
||||
}
|
||||
});
|
||||
|
||||
// Row hover handlers (event delegation)
|
||||
document.addEventListener("mouseenter", function(e) {
|
||||
var row = e.target.closest && e.target.closest("tr.event-row");
|
||||
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
|
||||
row._mapLayer.setStyle({
|
||||
color: row._mapColor,
|
||||
weight: 3,
|
||||
fillColor: row._mapColor,
|
||||
fillOpacity: 0.25
|
||||
});
|
||||
}
|
||||
}, true);
|
||||
|
||||
document.addEventListener("mouseleave", function(e) {
|
||||
var row = e.target.closest && e.target.closest("tr.event-row");
|
||||
if (row && row._mapLayer && row._mapLayer !== highlightedLayer) {
|
||||
row._mapLayer.setStyle({
|
||||
color: row._mapColor,
|
||||
weight: 2,
|
||||
fillColor: row._mapColor,
|
||||
fillOpacity: 0.25
|
||||
});
|
||||
}
|
||||
}, true);
|
||||
|
||||
// Fit to results button
|
||||
document.getElementById("fit-to-results").addEventListener("click", fitToAllLayers);
|
||||
|
||||
// Initial load - bind layers and fit bounds
|
||||
rebindEventLayers(); // Initial load only
|
||||
if (false) { // DISABLED: map never auto-fits
|
||||
fitToAllLayers();
|
||||
isInitialLoad = false;
|
||||
}
|
||||
|
||||
// Re-bind layers after HTMX swap (but do NOT fit bounds)
|
||||
document.body.addEventListener("htmx:afterSwap", function(evt) {
|
||||
if (evt.detail.target.id === "events-rows") {
|
||||
// rebindEventLayers(); // DISABLED: map shows all events, only table filters
|
||||
// Do NOT call fitToAllLayers - preserve user viewport
|
||||
}
|
||||
});
|
||||
|
||||
// Fix map rendering after container shows
|
||||
setTimeout(function() { map.invalidateSize(); }, 100);
|
||||
})();
|
||||
</script>
|
||||
|
||||
{% endblock %}
|
||||
|
|
@ -32,48 +32,3 @@ class Event(BaseModel):
|
|||
data: dict[str, Any] # adapter-specific payload
|
||||
|
||||
|
||||
def subject_for_event(ev: Event) -> str:
|
||||
"""
|
||||
Compute the NATS subject for an event based on its category.
|
||||
|
||||
Dispatch by category prefix:
|
||||
- fire.*: returns central.<category> directly
|
||||
- wx.*: uses weather alert subject logic
|
||||
|
||||
Weather alert subjects:
|
||||
central.wx.alert.us.<state_lower>.county.<county_lower>
|
||||
or
|
||||
central.wx.alert.us.<state_lower>.zone.<zone_lower>
|
||||
based on whether the primary_region encodes a county or a zone.
|
||||
|
||||
Fire hotspot subjects:
|
||||
central.fire.hotspot.<satellite>.<confidence>
|
||||
"""
|
||||
# Fire events: subject is just central.<category>
|
||||
if ev.category.startswith("fire."):
|
||||
return f"central.{ev.category}"
|
||||
|
||||
# Weather events: use geo-based subject logic
|
||||
prefix = "central.wx"
|
||||
|
||||
if ev.geo.primary_region is None:
|
||||
return f"{prefix}.alert.us.unknown"
|
||||
|
||||
region = ev.geo.primary_region
|
||||
|
||||
# Parse US-<STATE>-<CODE> format
|
||||
# County codes are like "Ada", "Canyon" (names)
|
||||
# Zone codes start with "Z" like "Z033"
|
||||
parts = region.split("-")
|
||||
if len(parts) < 3 or parts[0] != "US":
|
||||
return f"{prefix}.alert.us.unknown"
|
||||
|
||||
state = parts[1].lower()
|
||||
code = "-".join(parts[2:]) # Handle multi-part names like "Payette-Washington"
|
||||
|
||||
if code.startswith("Z") and len(code) >= 2 and code[1:].isdigit():
|
||||
# Zone code like Z033
|
||||
return f"{prefix}.alert.us.{state}.zone.{code.lower()}"
|
||||
else:
|
||||
# County name
|
||||
return f"{prefix}.alert.us.{state}.county.{code.lower()}"
|
||||
|
|
|
|||
|
|
@ -13,24 +13,40 @@ from typing import Any
|
|||
import nats
|
||||
from nats.js import JetStreamContext
|
||||
|
||||
import importlib
|
||||
import pkgutil
|
||||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapters.nws import NWSAdapter
|
||||
from central.adapters.firms import FIRMSAdapter
|
||||
from central.adapters.usgs_quake import USGSQuakeAdapter
|
||||
from central.cloudevents_wire import wrap_event
|
||||
from central.config_models import AdapterConfig
|
||||
from central.config_source import ConfigSource, DbConfigSource
|
||||
from central.config_store import ConfigStore
|
||||
from central.bootstrap_config import get_settings
|
||||
from central.models import subject_for_event
|
||||
from central.stream_manager import StreamManager
|
||||
import central.adapters
|
||||
|
||||
# Adapter registry - add new adapters here
|
||||
_ADAPTER_REGISTRY: dict[str, type[SourceAdapter]] = {
|
||||
"nws": NWSAdapter,
|
||||
"firms": FIRMSAdapter,
|
||||
"usgs_quake": USGSQuakeAdapter,
|
||||
}
|
||||
def discover_adapters() -> dict[str, type[SourceAdapter]]:
|
||||
"""Auto-discover adapter classes from central.adapters package."""
|
||||
registry: dict[str, type[SourceAdapter]] = {}
|
||||
for module_info in pkgutil.iter_modules(central.adapters.__path__):
|
||||
try:
|
||||
module = importlib.import_module(f"central.adapters.{module_info.name}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to import adapter module",
|
||||
extra={"module": module_info.name, "error": str(e)},
|
||||
)
|
||||
continue
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
if (
|
||||
isinstance(attr, type)
|
||||
and issubclass(attr, SourceAdapter)
|
||||
and attr is not SourceAdapter
|
||||
and hasattr(attr, "name")
|
||||
):
|
||||
registry[attr.name] = attr
|
||||
return registry
|
||||
|
||||
CURSOR_DB_PATH = Path("/var/lib/central/cursors.db")
|
||||
|
||||
|
|
@ -114,6 +130,7 @@ class Supervisor:
|
|||
self._config_store = config_store
|
||||
self._nats_url = nats_url
|
||||
self._cloudevents_config = cloudevents_config
|
||||
self._adapters = discover_adapters()
|
||||
self._nc: nats.NATS | None = None
|
||||
self._js: JetStreamContext | None = None
|
||||
self._stream_manager: StreamManager | None = None
|
||||
|
|
@ -161,7 +178,7 @@ class Supervisor:
|
|||
|
||||
def _create_adapter(self, config: AdapterConfig) -> SourceAdapter:
|
||||
"""Create an adapter instance based on config name."""
|
||||
cls = _ADAPTER_REGISTRY.get(config.name)
|
||||
cls = self._adapters.get(config.name)
|
||||
if cls is None:
|
||||
raise ValueError(f"Unknown adapter type: {config.name}")
|
||||
return cls(
|
||||
|
|
@ -232,7 +249,7 @@ class Supervisor:
|
|||
# Build CloudEvent (uses defaults if no config provided)
|
||||
envelope, msg_id = wrap_event(event, self._cloudevents_config)
|
||||
|
||||
subject = subject_for_event(event)
|
||||
subject = state.adapter.subject_for(event)
|
||||
|
||||
# Publish
|
||||
await self._publish_event(subject, envelope, msg_id)
|
||||
|
|
|
|||
686
tests/test_events_feed_frontend.py
Normal file
686
tests/test_events_feed_frontend.py
Normal file
|
|
@ -0,0 +1,686 @@
|
|||
"""Tests for events feed frontend routes."""
|
||||
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from central.gui.routes import events_list, events_rows, events_json
|
||||
|
||||
|
||||
class TestEventsFeedFrontendAuthenticated:
|
||||
"""Test events feed frontend with authentication."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_no_filters_returns_html(self):
|
||||
"""GET /events authenticated, no filters returns HTML with events."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": f"event_{i}",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"adapter": "nws",
|
||||
"category": "Weather Alert",
|
||||
"subject": f"Test Alert {i}",
|
||||
"geometry": '{"type": "Point", "coordinates": [-122.4, 37.8]}' if i % 2 == 0 else None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
}
|
||||
for i in range(5)
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
context = call_args.kwargs.get("context", call_args[1].get("context"))
|
||||
assert "events" in context
|
||||
assert context["filter_error"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_adapter_filter(self):
|
||||
"""GET /events?adapter=nws returns only nws events."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {"adapter": "nws"}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "nws_event_1",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "NWS Alert",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
assert context["filter_values"]["adapter"] == "nws"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_since_until_filter(self):
|
||||
"""GET /events?since=...&until=... filters by time window."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {
|
||||
"since": "2026-05-17T00:00:00",
|
||||
"until": "2026-05-17T12:00:00",
|
||||
}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "in_range",
|
||||
"time": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 6, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "In Range",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
# Verify filter was actually parsed and passed to template
|
||||
mock_templates.TemplateResponse.assert_called_once()
|
||||
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
|
||||
context = call_kwargs.get("context", call_kwargs)
|
||||
assert context["filter_values"]["since"] == "2026-05-17T00:00:00"
|
||||
assert context["filter_values"]["until"] == "2026-05-17T12:00:00"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_region_filter(self):
|
||||
"""GET /events with full region bbox filters by location."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {
|
||||
"region_north": "49.5",
|
||||
"region_south": "31",
|
||||
"region_east": "-102",
|
||||
"region_west": "-124.5",
|
||||
}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "in_bbox",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "In BBox",
|
||||
"geometry": '{"type": "Point", "coordinates": [-120, 40]}',
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
# Verify region filter was actually parsed and passed to template
|
||||
mock_templates.TemplateResponse.assert_called_once()
|
||||
call_kwargs = mock_templates.TemplateResponse.call_args.kwargs
|
||||
context = call_kwargs.get("context", call_kwargs)
|
||||
assert context["filter_values"]["region_north"] == "49.5"
|
||||
assert context["filter_values"]["region_south"] == "31"
|
||||
assert context["filter_values"]["region_east"] == "-102"
|
||||
assert context["filter_values"]["region_west"] == "-124.5"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_partial_region_shows_error_banner(self):
|
||||
"""GET /events with partial region shows error banner, not 400."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {"region_north": "49"}
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
# Should be 200, not 400
|
||||
assert result.status_code == 200
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
assert context["filter_error"] is not None
|
||||
assert "region" in context["filter_error"].lower()
|
||||
# Events should be empty due to validation error
|
||||
assert context["events"] == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_with_limit_shows_next_button(self):
|
||||
"""GET /events?limit=5 shows Next button when more events exist."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf_token"
|
||||
mock_request.query_params = {"limit": "5"}
|
||||
|
||||
# Return 6 events (limit+1) to trigger pagination
|
||||
mock_events = [
|
||||
{
|
||||
"id": f"event_{i}",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": f"Event {i}",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
}
|
||||
for i in range(6)
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
assert context["next_cursor"] is not None
|
||||
assert len(context["events"]) == 5 # Should be trimmed to limit
|
||||
|
||||
|
||||
class TestEventsRowsFragment:
|
||||
"""Test /events/rows HTMX fragment."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_events_rows_returns_fragment(self):
|
||||
"""GET /events/rows returns table fragment, not full page."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.query_params = {"limit": "5"}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "event_1",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "Event 1",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_rows(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
# Verify it uses the fragment template
|
||||
call_args = mock_templates.TemplateResponse.call_args
|
||||
assert call_args.kwargs.get("name") == "_events_rows.html"
|
||||
|
||||
|
||||
class TestGeometrySummary:
|
||||
"""Test geometry summary function."""
|
||||
|
||||
def test_geometry_summary_polygon(self):
|
||||
"""Polygon geometry shows point count."""
|
||||
from central.gui.routes import _geometry_summary
|
||||
|
||||
geom = {
|
||||
"type": "Polygon",
|
||||
"coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]
|
||||
}
|
||||
summary = _geometry_summary(geom)
|
||||
assert "Polygon" in summary
|
||||
assert "5 pts" in summary
|
||||
|
||||
def test_geometry_summary_point(self):
|
||||
"""Point geometry shows 'Point'."""
|
||||
from central.gui.routes import _geometry_summary
|
||||
|
||||
geom = {"type": "Point", "coordinates": [-122.4, 37.8]}
|
||||
summary = _geometry_summary(geom)
|
||||
assert summary == "Point"
|
||||
|
||||
def test_geometry_summary_linestring(self):
|
||||
"""LineString geometry shows point count."""
|
||||
from central.gui.routes import _geometry_summary
|
||||
|
||||
geom = {
|
||||
"type": "LineString",
|
||||
"coordinates": [[-122, 37], [-121, 38], [-120, 39]]
|
||||
}
|
||||
summary = _geometry_summary(geom)
|
||||
assert "Line" in summary
|
||||
assert "3 pts" in summary
|
||||
|
||||
def test_geometry_summary_none(self):
|
||||
"""None geometry shows 'None'."""
|
||||
from central.gui.routes import _geometry_summary
|
||||
|
||||
summary = _geometry_summary(None)
|
||||
assert summary == "None"
|
||||
|
||||
|
||||
class TestDataGeometryAttribute:
|
||||
"""Test that rows have valid geometry data attributes."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_with_geometry_has_valid_json(self):
|
||||
"""Events with geometry have parseable JSON in data-geometry."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.query_params = {}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "geom_event",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "With Geometry",
|
||||
"geometry": '{"type": "Polygon", "coordinates": [[[-122, 37], [-122, 38], [-121, 38], [-121, 37], [-122, 37]]]}',
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_rows(mock_request)
|
||||
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
event = context["events"][0]
|
||||
# Geometry should be parsed dict, not string
|
||||
assert isinstance(event["geometry"], dict)
|
||||
assert event["geometry"]["type"] == "Polygon"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_without_geometry_has_none(self):
|
||||
"""Events without geometry have None for geometry field."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.query_params = {}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "no_geom_event",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": "No Geometry",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_rows(mock_request)
|
||||
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
event = context["events"][0]
|
||||
assert event["geometry"] is None
|
||||
|
||||
|
||||
class TestCrossEndpointParity:
|
||||
"""Test that /events.json and /events return the same filtered results."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_category_filter_both_endpoints(self):
|
||||
"""Category filter works on both /events.json and /events."""
|
||||
mock_events = [
|
||||
{
|
||||
"id": "weather_event",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "nws",
|
||||
"category": "Weather Alert",
|
||||
"subject": "Weather Event",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
query_params = {"category": "Weather Alert"}
|
||||
|
||||
# Test /events.json
|
||||
json_request = MagicMock()
|
||||
json_request.state.operator = MagicMock(id=1, username="admin")
|
||||
json_request.query_params = query_params
|
||||
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
json_response = await events_json(json_request)
|
||||
|
||||
json_data = json.loads(json_response.body)
|
||||
assert len(json_data["events"]) == 1
|
||||
assert json_data["events"][0]["category"] == "Weather Alert"
|
||||
|
||||
# Test /events
|
||||
html_request = MagicMock()
|
||||
html_request.state.operator = MagicMock(id=1, username="admin")
|
||||
html_request.state.csrf_token = "test_csrf"
|
||||
html_request.query_params = query_params
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
await events_list(html_request)
|
||||
|
||||
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
assert len(html_context["events"]) == 1
|
||||
assert html_context["events"][0]["category"] == "Weather Alert"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cursor_pagination_both_endpoints(self):
|
||||
"""Cursor pagination works identically on both endpoints."""
|
||||
first_page = [
|
||||
{
|
||||
"id": f"event_{i}",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc) - timedelta(hours=i),
|
||||
"adapter": "nws",
|
||||
"category": "Alert",
|
||||
"subject": f"Event {i}",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
}
|
||||
for i in range(3)
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = first_page
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
json_request = MagicMock()
|
||||
json_request.state.operator = MagicMock(id=1, username="admin")
|
||||
json_request.query_params = {"limit": "2"}
|
||||
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
json_response = await events_json(json_request)
|
||||
|
||||
json_data = json.loads(json_response.body)
|
||||
json_cursor = json_data["next_cursor"]
|
||||
assert json_cursor is not None
|
||||
|
||||
html_request = MagicMock()
|
||||
html_request.state.operator = MagicMock(id=1, username="admin")
|
||||
html_request.state.csrf_token = "test_csrf"
|
||||
html_request.query_params = {"limit": "2"}
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
mock_conn.fetch.return_value = first_page
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
await events_list(html_request)
|
||||
|
||||
html_context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
html_cursor = html_context["next_cursor"]
|
||||
|
||||
assert json_cursor == html_cursor
|
||||
|
||||
|
||||
class TestErrorSemantics:
|
||||
"""Test error handling differences between JSON and HTML endpoints."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_json_endpoint_returns_400_on_invalid_limit(self):
|
||||
"""/events.json?limit=0 returns 400 JSON error."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.query_params = {"limit": "0"}
|
||||
|
||||
response = await events_json(mock_request)
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.body)
|
||||
assert "error" in data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_html_endpoint_returns_200_with_error_banner(self):
|
||||
"""/events?limit=0 returns 200 HTML with error banner."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf"
|
||||
mock_request.query_params = {"limit": "0"}
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
assert context["filter_error"] is not None
|
||||
assert "limit" in context["filter_error"].lower()
|
||||
assert context["events"] == []
|
||||
|
||||
|
||||
class TestEventRowDataAttributes:
|
||||
"""Test that _events_rows.html renders required data attributes."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_row_renders_data_adapter_attribute(self):
|
||||
"""Event rows include data-adapter attribute for color coding."""
|
||||
mock_request = MagicMock()
|
||||
mock_request.state.operator = MagicMock(id=1, username="admin")
|
||||
mock_request.state.csrf_token = "test_csrf"
|
||||
mock_request.query_params = {}
|
||||
|
||||
mock_events = [
|
||||
{
|
||||
"id": "test1",
|
||||
"time": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"received": datetime(2026, 5, 17, 12, 0, tzinfo=timezone.utc),
|
||||
"adapter": "usgs_quake",
|
||||
"category": "quake.event",
|
||||
"subject": "M4.2 Earthquake",
|
||||
"geometry": None,
|
||||
"data": {},
|
||||
"regions": [],
|
||||
},
|
||||
]
|
||||
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.fetch.return_value = mock_events
|
||||
mock_conn.fetchrow.return_value = {
|
||||
"map_tile_url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
|
||||
"map_attribution": "OpenStreetMap",
|
||||
}
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_pool.acquire.return_value.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_templates = MagicMock()
|
||||
mock_templates.TemplateResponse.return_value = MagicMock(status_code=200)
|
||||
|
||||
with patch("central.gui.routes._get_templates", return_value=mock_templates):
|
||||
with patch("central.gui.routes.get_pool", return_value=mock_pool):
|
||||
result = await events_list(mock_request)
|
||||
|
||||
assert result.status_code == 200
|
||||
mock_templates.TemplateResponse.assert_called_once()
|
||||
context = mock_templates.TemplateResponse.call_args.kwargs.get("context")
|
||||
# The template receives events with adapter field for data-adapter attribute
|
||||
assert len(context["events"]) == 1
|
||||
assert context["events"][0]["adapter"] == "usgs_quake"
|
||||
assert context["events"][0]["category"] == "quake.event"
|
||||
assert context["events"][0]["subject"] == "M4.2 Earthquake"
|
||||
|
|
@ -10,7 +10,6 @@ from central.adapters.firms import (
|
|||
FIRMSAdapter,
|
||||
CONFIDENCE_MAP,
|
||||
SATELLITE_SHORT,
|
||||
subject_for_fire_hotspot,
|
||||
)
|
||||
from central.config_models import AdapterConfig, RegionConfig
|
||||
from central.models import Event, Geo
|
||||
|
|
@ -285,7 +284,14 @@ class TestDeduplication:
|
|||
class TestSubjectGeneration:
|
||||
"""Test subject generation for fire hotspots."""
|
||||
|
||||
def test_subject_format(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_format(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = FIRMSAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test",
|
||||
adapter="firms",
|
||||
|
|
@ -296,10 +302,17 @@ class TestSubjectGeneration:
|
|||
data={},
|
||||
)
|
||||
|
||||
subject = subject_for_fire_hotspot(event)
|
||||
subject = adapter.subject_for(event)
|
||||
assert subject == "central.fire.hotspot.viirs_snpp.high"
|
||||
|
||||
def test_subject_nominal_confidence(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_nominal_confidence(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = FIRMSAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test",
|
||||
adapter="firms",
|
||||
|
|
@ -310,7 +323,7 @@ class TestSubjectGeneration:
|
|||
data={},
|
||||
)
|
||||
|
||||
subject = subject_for_fire_hotspot(event)
|
||||
subject = adapter.subject_for(event)
|
||||
assert subject == "central.fire.hotspot.viirs_noaa20.nominal"
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from datetime import datetime, timezone
|
|||
|
||||
import pytest
|
||||
|
||||
from central.models import Event, Geo, subject_for_event
|
||||
from central.models import Event, Geo
|
||||
from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config
|
||||
from central.cloudevents_wire import wrap_event
|
||||
|
||||
|
|
@ -57,47 +57,6 @@ def sample_config() -> Config:
|
|||
)
|
||||
|
||||
|
||||
class TestSubjectForEvent:
|
||||
"""Tests for subject_for_event helper."""
|
||||
|
||||
def test_county_subject(self, sample_event: Event) -> None:
|
||||
"""County codes produce county subject."""
|
||||
subject = subject_for_event(sample_event)
|
||||
assert subject == "central.wx.alert.us.id.county.ada"
|
||||
|
||||
def test_zone_subject(self, sample_geo: Geo) -> None:
|
||||
"""Zone codes produce zone subject."""
|
||||
geo = Geo(
|
||||
centroid=sample_geo.centroid,
|
||||
bbox=sample_geo.bbox,
|
||||
regions=["US-ID-Z033"],
|
||||
primary_region="US-ID-Z033",
|
||||
)
|
||||
event = Event(
|
||||
id="test-zone",
|
||||
adapter="nws",
|
||||
category="wx.alert.winter_storm_warning",
|
||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||
geo=geo,
|
||||
data={},
|
||||
)
|
||||
subject = subject_for_event(event)
|
||||
assert subject == "central.wx.alert.us.id.zone.z033"
|
||||
|
||||
def test_unknown_subject(self, sample_event: Event) -> None:
|
||||
"""Missing primary_region produces unknown subject."""
|
||||
geo = Geo(regions=[], primary_region=None)
|
||||
event = Event(
|
||||
id="test-unknown",
|
||||
adapter="nws",
|
||||
category="wx.alert.test",
|
||||
time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
|
||||
geo=geo,
|
||||
data={},
|
||||
)
|
||||
subject = subject_for_event(event)
|
||||
assert subject == "central.wx.alert.us.unknown"
|
||||
|
||||
|
||||
class TestCloudEventsWire:
|
||||
"""Tests for CloudEvents wire format."""
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ from central.adapters.nws import (
|
|||
SEVERITY_MAP,
|
||||
)
|
||||
from central.config_models import AdapterConfig
|
||||
from central.models import subject_for_event
|
||||
|
||||
|
||||
# Sample NWS GeoJSON features for testing
|
||||
|
|
@ -272,7 +271,7 @@ class TestSubjectDerivation:
|
|||
def test_county_subject(self, adapter: NWSAdapter) -> None:
|
||||
event = adapter._normalize_feature(SAMPLE_FEATURE_ID)
|
||||
assert event is not None
|
||||
subject = subject_for_event(event)
|
||||
subject = adapter.subject_for(event)
|
||||
# Primary region should be alphabetically first
|
||||
# Could be county or zone depending on sort order
|
||||
assert subject.startswith("central.wx.alert.us.id.")
|
||||
|
|
@ -294,7 +293,7 @@ class TestSubjectDerivation:
|
|||
}
|
||||
event = adapter._normalize_feature(feature)
|
||||
assert event is not None
|
||||
subject = subject_for_event(event)
|
||||
subject = adapter.subject_for(event)
|
||||
assert "zone" in subject
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -200,76 +200,77 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
# Patch NWSAdapter to use our mock
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start supervisor (starts adapter)
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start supervisor (starts adapter)
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
|
||||
# Simulate completed poll 5 minutes ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
# Simulate completed poll 5 minutes ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# Re-enable adapter
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-enable adapter
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# The loop should detect that last_poll + cadence is in the past
|
||||
# and poll immediately. Let's verify by checking the wait time logic.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
# The loop should detect that last_poll + cadence is in the past
|
||||
# and poll immediately. Let's verify by checking the wait time logic.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60 # cadence = 60s
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
|
||||
# last_poll was 5 minutes ago, cadence is 60s
|
||||
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
|
||||
# wait_time should be 0 (poll immediately)
|
||||
assert wait_time == 0, (
|
||||
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
|
||||
f"last_poll was {saved_last_poll}, now is {now}"
|
||||
)
|
||||
# last_poll was 5 minutes ago, cadence is 60s
|
||||
# next_poll_at = 5_minutes_ago + 60s = 4_minutes_ago
|
||||
# wait_time should be 0 (poll immediately)
|
||||
assert wait_time == 0, (
|
||||
f"Expected immediate poll (wait=0), got wait={wait_time}s. "
|
||||
f"last_poll was {saved_last_poll}, now is {now}"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_enable_gap_shorter_than_cadence(
|
||||
|
|
@ -308,75 +309,76 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
saved_last_poll = state.last_completed_poll
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify stopped but state preserved (THIS IS THE KEY CHECK)
|
||||
# On unfixed code, state will be NONE because pop() removes it
|
||||
# On fixed code, state still exists with is_running=False
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None, (
|
||||
"State was removed on stop! This violates the rate-limit guarantee. "
|
||||
"State should be preserved to maintain last_completed_poll."
|
||||
)
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# Re-enable adapter (simulate 20 seconds later, but we're just
|
||||
# checking the rate limit logic)
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-enable adapter (simulate 20 seconds later, but we're just
|
||||
# checking the rate limit logic)
|
||||
reenabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(reenabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
# Verify restarted with preserved last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_last_poll
|
||||
|
||||
# The loop should detect that last_poll + cadence is still in the future
|
||||
# and wait until then.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
# The loop should detect that last_poll + cadence is still in the future
|
||||
# and wait until then.
|
||||
now = datetime.now(timezone.utc)
|
||||
next_poll_at = saved_last_poll.timestamp() + 60
|
||||
wait_time = max(0, next_poll_at - now.timestamp())
|
||||
|
||||
# last_poll was ~10 seconds ago, cadence is 60s
|
||||
# wait_time should be ~50s (60 - 10 = 50)
|
||||
assert 45 < wait_time < 55, (
|
||||
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
|
||||
f"Rate limit violated: poll would happen before last_poll + cadence"
|
||||
)
|
||||
# last_poll was ~10 seconds ago, cadence is 60s
|
||||
# wait_time should be ~50s (60 - 10 = 50)
|
||||
assert 45 < wait_time < 55, (
|
||||
f"Expected ~50s wait (respecting rate limit), got wait={wait_time}s. "
|
||||
f"Rate limit violated: poll would happen before last_poll + cadence"
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_delete_readd_fresh_state(
|
||||
|
|
@ -414,60 +416,61 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(initial_config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
# Simulate completed poll 10 seconds ago
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=10)
|
||||
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Disable adapter
|
||||
disabled_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=False,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(disabled_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# DELETE adapter from DB (remove from config source)
|
||||
config_source.set_adapter(None, name="nws")
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# DELETE adapter from DB (remove from config source)
|
||||
config_source.set_adapter(None, name="nws")
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify adapter fully removed
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
# Verify adapter fully removed
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
|
||||
# Re-add adapter with same name
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
# Re-add adapter with same name
|
||||
new_config = AdapterConfig(
|
||||
name="nws",
|
||||
enabled=True,
|
||||
cadence_s=60,
|
||||
settings={"states": ["ID"], "contact_email": "test@test.com"},
|
||||
paused_at=None,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
config_source.set_adapter(new_config)
|
||||
await supervisor._on_config_change("adapters", "nws")
|
||||
|
||||
# Verify new adapter started fresh
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
# last_completed_poll should be None (fresh adapter)
|
||||
assert state.last_completed_poll is None, (
|
||||
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
|
||||
f"Preserved state not cleared on delete."
|
||||
)
|
||||
# Verify new adapter started fresh
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert state is not None
|
||||
assert adapter_is_running(state)
|
||||
# last_completed_poll should be None (fresh adapter)
|
||||
assert state.last_completed_poll is None, (
|
||||
f"Expected None (fresh adapter), got {state.last_completed_poll}. "
|
||||
f"Preserved state not cleared on delete."
|
||||
)
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_preserves_state_start_reuses_it(
|
||||
|
|
@ -497,34 +500,35 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
# Start adapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
|
||||
saved_poll = state.last_completed_poll
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc) - timedelta(seconds=30)
|
||||
saved_poll = state.last_completed_poll
|
||||
|
||||
# Stop adapter
|
||||
await supervisor._stop_adapter("nws")
|
||||
# Stop adapter
|
||||
await supervisor._stop_adapter("nws")
|
||||
|
||||
# State should still exist
|
||||
assert "nws" in supervisor._adapter_states
|
||||
state = supervisor._adapter_states["nws"]
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
# State should still exist
|
||||
assert "nws" in supervisor._adapter_states
|
||||
state = supervisor._adapter_states["nws"]
|
||||
assert not adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
|
||||
# Restart adapter
|
||||
await supervisor._start_adapter(config)
|
||||
# Restart adapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
# Should reuse existing state
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
# Should reuse existing state
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
assert adapter_is_running(state)
|
||||
assert state.last_completed_poll == saved_poll
|
||||
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Cleanup
|
||||
supervisor._shutdown_event.set()
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_remove_adapter_clears_state(
|
||||
|
|
@ -554,14 +558,15 @@ class TestEnableDisableEnableIntegration:
|
|||
supervisor._nc = mock_nats
|
||||
supervisor._js = mock_nats.jetstream()
|
||||
|
||||
with patch("central.supervisor.NWSAdapter", MockNWSAdapter):
|
||||
await supervisor._start_adapter(config)
|
||||
# Inject mock adapter into supervisor's registry
|
||||
supervisor._adapters["nws"] = MockNWSAdapter
|
||||
await supervisor._start_adapter(config)
|
||||
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc)
|
||||
state = supervisor._adapter_states.get("nws")
|
||||
state.last_completed_poll = datetime.now(timezone.utc)
|
||||
|
||||
# Remove adapter
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
# Remove adapter
|
||||
await cleanup_adapter(supervisor, "nws")
|
||||
|
||||
# State should be gone
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
# State should be gone
|
||||
assert "nws" not in supervisor._adapter_states
|
||||
|
|
|
|||
|
|
@ -480,3 +480,122 @@ class TestApplyConfig:
|
|||
assert adapter._feed == "all_day"
|
||||
|
||||
await adapter.shutdown()
|
||||
|
||||
|
||||
|
||||
class TestSubjectFor:
|
||||
"""Test subject_for method for all magnitude tiers."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_minor(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-minor",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.minor",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=0,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.minor"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_light(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-light",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.light",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=1,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.light"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_moderate(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-moderate",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.moderate",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=2,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.moderate"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_strong(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-strong",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.strong",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=3,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.strong"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_major(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-major",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.major",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=4,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.major"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subject_great(self, temp_db_path, mock_config_store):
|
||||
config = make_adapter_config()
|
||||
adapter = USGSQuakeAdapter(
|
||||
config=config,
|
||||
config_store=mock_config_store,
|
||||
cursor_db_path=temp_db_path,
|
||||
)
|
||||
event = Event(
|
||||
id="test-great",
|
||||
adapter="usgs_quake",
|
||||
category="quake.event.great",
|
||||
time=datetime.now(timezone.utc),
|
||||
severity=5,
|
||||
geo=Geo(centroid=(-116.0, 45.0)),
|
||||
data={},
|
||||
)
|
||||
assert adapter.subject_for(event) == "central.quake.event.great"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue