diff --git a/sql/migrations/015_add_adapters_last_error.sql b/sql/migrations/015_add_adapters_last_error.sql new file mode 100644 index 0000000..1fcab49 --- /dev/null +++ b/sql/migrations/015_add_adapters_last_error.sql @@ -0,0 +1,6 @@ +-- Migration: 015_add_adapters_last_error +-- Adds last_error column for adapter-side error reporting. +-- Populated by supervisor when an adapter fails to start or apply config. + +ALTER TABLE config.adapters +ADD COLUMN IF NOT EXISTS last_error TEXT; diff --git a/sql/migrations/016_add_wfigs_adapters.sql b/sql/migrations/016_add_wfigs_adapters.sql new file mode 100644 index 0000000..25d45f6 --- /dev/null +++ b/sql/migrations/016_add_wfigs_adapters.sql @@ -0,0 +1,37 @@ +-- Migration: 016_add_wfigs_adapters +-- Add WFIGS incident and perimeter adapters to config.adapters +-- Idempotent: uses ON CONFLICT DO NOTHING + +-- WFIGS Incidents adapter +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'wfigs_incidents', + false, -- Ships disabled; operator enables via GUI + 300, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.0, + 'south', 31.0, + 'east', -102.0, + 'west', -124.0 + ) + ) +) +ON CONFLICT (name) DO NOTHING; + +-- WFIGS Perimeters adapter +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'wfigs_perimeters', + false, -- Ships disabled; operator enables via GUI + 300, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.0, + 'south', 31.0, + 'east', -102.0, + 'west', -124.0 + ) + ) +) +ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/017_add_inciweb_adapter.sql b/sql/migrations/017_add_inciweb_adapter.sql new file mode 100644 index 0000000..2ffaac7 --- /dev/null +++ b/sql/migrations/017_add_inciweb_adapter.sql @@ -0,0 +1,19 @@ +-- Migration: 017_add_inciweb_adapter +-- Add InciWeb adapter to config.adapters +-- Idempotent: uses ON CONFLICT DO NOTHING + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'inciweb', + false, -- Ships disabled; operator enables via GUI + 600, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.0, + 'south', 31.0, + 'east', -102.0, + 'west', -124.0 + ) + ) +) +ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/018_add_swpc_adapters.sql b/sql/migrations/018_add_swpc_adapters.sql new file mode 100644 index 0000000..29a28d8 --- /dev/null +++ b/sql/migrations/018_add_swpc_adapters.sql @@ -0,0 +1,11 @@ +-- Migration: 018_add_swpc_adapters +-- Add NOAA SWPC space weather adapters to config.adapters. +-- All three ship disabled; operator enables individually via GUI. +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES + ('swpc_alerts', false, 300, '{}'::jsonb), + ('swpc_kindex', false, 600, '{}'::jsonb), + ('swpc_protons', false, 600, '{}'::jsonb) +ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/019_add_central_space_stream.sql b/sql/migrations/019_add_central_space_stream.sql new file mode 100644 index 0000000..f249227 --- /dev/null +++ b/sql/migrations/019_add_central_space_stream.sql @@ -0,0 +1,8 @@ +-- Migration: 019_add_central_space_stream +-- Seeds the CENTRAL_SPACE JetStream stream row for central.space.> subjects. +-- 7-day retention, 1 GiB max_bytes (clamped by supervisor recompute) -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE. +-- Idempotent: uses ON CONFLICT DO NOTHING. + +INSERT INTO config.streams (name, max_age_s, max_bytes) +VALUES ('CENTRAL_SPACE', 604800, 1073741824) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapter.py b/src/central/adapter.py index 276a9cf..0322d3e 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -34,6 +34,10 @@ class SourceAdapter(ABC): description: str settings_schema: type[BaseModel] requires_api_key: str | None = None + api_key_field: str | None = None + """Names the settings_schema field that holds an api_key alias reference, if any. + The GUI renders this field as a select populated from config.api_keys; + the wizard validates it against staged api_keys state.""" wizard_order: int | None = None default_cadence_s: int diff --git a/src/central/adapter_discovery.py b/src/central/adapter_discovery.py new file mode 100644 index 0000000..e26729f --- /dev/null +++ b/src/central/adapter_discovery.py @@ -0,0 +1,34 @@ +"""Adapter discovery utilities.""" + +import importlib +import logging +import pkgutil + +import central.adapters +from central.adapter import SourceAdapter + +logger = logging.getLogger(__name__) + + +def discover_adapters() -> dict[str, type[SourceAdapter]]: + """Auto-discover adapter classes from central.adapters package.""" + registry: dict[str, type[SourceAdapter]] = {} + for module_info in pkgutil.iter_modules(central.adapters.__path__): + try: + module = importlib.import_module(f"central.adapters.{module_info.name}") + except Exception as e: + logger.error( + "Failed to import adapter module", + extra={"module": module_info.name, "error": str(e)}, + ) + continue + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, SourceAdapter) + and attr is not SourceAdapter + and hasattr(attr, "name") + ): + registry[attr.name] = attr + return registry diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index 7538d96..c9b4efb 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -7,7 +7,7 @@ from collections.abc import AsyncIterator from datetime import datetime, timezone from io import StringIO from pathlib import Path -from typing import Any +from typing import Any, Literal import aiohttp from tenacity import ( @@ -54,7 +54,7 @@ SEVERITY_MAP = { class FIRMSSettings(BaseModel): """Settings schema for FIRMS adapter.""" api_key_alias: str = "firms" - satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + satellites: list[Literal["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT", "VIIRS_NOAA21_NRT"]] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] region: RegionConfig | None = None @@ -66,6 +66,7 @@ class FIRMSAdapter(SourceAdapter): description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." settings_schema = FIRMSSettings requires_api_key = "firms" + api_key_field = "api_key_alias" wizard_order = 2 default_cadence_s = 300 diff --git a/src/central/adapters/inciweb.py b/src/central/adapters/inciweb.py new file mode 100644 index 0000000..2ae0634 --- /dev/null +++ b/src/central/adapters/inciweb.py @@ -0,0 +1,477 @@ +"""InciWeb adapter for wildfire narrative updates.""" + +import html +import logging +import re +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from pathlib import Path +from typing import Any +from xml.etree import ElementTree as ET + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +# InciWeb RSS feed URL +INCIWEB_RSS_URL = "https://inciweb.wildfire.gov/incidents/rss.xml" + +# State name to 2-letter code mapping +STATE_NAME_TO_CODE = { + "alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR", + "california": "CA", "colorado": "CO", "connecticut": "CT", "delaware": "DE", + "florida": "FL", "georgia": "GA", "hawaii": "HI", "idaho": "ID", + "illinois": "IL", "indiana": "IN", "iowa": "IA", "kansas": "KS", + "kentucky": "KY", "louisiana": "LA", "maine": "ME", "maryland": "MD", + "massachusetts": "MA", "michigan": "MI", "minnesota": "MN", "mississippi": "MS", + "missouri": "MO", "montana": "MT", "nebraska": "NE", "nevada": "NV", + "new hampshire": "NH", "new jersey": "NJ", "new mexico": "NM", "new york": "NY", + "north carolina": "NC", "north dakota": "ND", "ohio": "OH", "oklahoma": "OK", + "oregon": "OR", "pennsylvania": "PA", "rhode island": "RI", "south carolina": "SC", + "south dakota": "SD", "tennessee": "TN", "texas": "TX", "utah": "UT", + "vermont": "VT", "virginia": "VA", "washington": "WA", "west virginia": "WV", + "wisconsin": "WI", "wyoming": "WY", "district of columbia": "DC", + "puerto rico": "PR", "guam": "GU", "virgin islands": "VI", + "american samoa": "AS", "northern mariana islands": "MP", +} + + +def parse_coordinates_from_description(description: str) -> tuple[float, float] | None: + """ + Parse latitude/longitude from InciWeb description text. + + Format: "Latitude: 47° 3 17 Longitude: 91° 38 6" + InciWeb uses unsigned values for US coordinates (west longitude implied). + Returns (lon, lat) tuple or None if not found. + """ + # Pattern for degree/minute/second format + lat_pattern = r"Latitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)" + lon_pattern = r"Longitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)" + + lat_match = re.search(lat_pattern, description) + lon_match = re.search(lon_pattern, description) + + if not lat_match or not lon_match: + return None + + try: + lat_deg = int(lat_match.group(1)) + lat_min = int(lat_match.group(2)) + lat_sec = float(lat_match.group(3)) + + lon_deg = int(lon_match.group(1)) + lon_min = int(lon_match.group(2)) + lon_sec = float(lon_match.group(3)) + + # Convert to decimal degrees + # Latitude: positive in northern hemisphere + if lat_deg >= 0: + lat = lat_deg + lat_min / 60 + lat_sec / 3600 + else: + lat = lat_deg - lat_min / 60 - lat_sec / 3600 + + # Longitude: InciWeb gives unsigned values for US west longitudes + # Make negative for western hemisphere (US coordinates) + lon = lon_deg + lon_min / 60 + lon_sec / 3600 + if lon > 0: + lon = -lon # US longitudes are west (negative) + + return (lon, lat) + except (ValueError, TypeError): + return None + + +def parse_state_from_description(description: str) -> str | None: + """ + Parse state name from InciWeb description text. + + Format: "State: Minnesota" or "State: New Mexico" + Returns 2-letter state code or None if not found. + + Design note: State is parsed from the description rather than the title + because InciWeb titles use unit code prefixes (e.g., "MNMNS Stewart Trail", + "CACNP Santa Rosa Island Fire") which are not reliable state indicators. + The description has a structured "State: " field that reliably + identifies the state for all incidents. + """ + pattern = r"State:\s*([A-Za-z\s]+?)(?:\n|---|$)" + match = re.search(pattern, description) + + if not match: + return None + + state_name = match.group(1).strip().lower() + return STATE_NAME_TO_CODE.get(state_name) + + +def strip_html(html_text: str) -> str: + """ + Strip HTML tags and decode entities to plain text. + """ + # Decode HTML entities (handles & < > etc.) + text = html.unescape(html_text) + + # Handle   specifically (not a standard Python html entity) + text = text.replace(" ", " ") + text = text.replace("\xa0", " ") # Non-breaking space character + + # Remove HTML tags + text = re.sub(r"<[^>]+>", "", text) + + # Normalize whitespace + text = re.sub(r"\s+", " ", text) + + return text.strip() + + +def point_in_bbox( + lon: float, + lat: float, + west: float, + south: float, + east: float, + north: float, +) -> bool: + """Check if a point is within a bounding box.""" + return west <= lon <= east and south <= lat <= north + + +class InciWebSettings(BaseModel): + """Settings schema for InciWeb adapter.""" + + region: RegionConfig | None = None + + +class InciWebAdapter(SourceAdapter): + """NIFC InciWeb wildfire narrative adapter.""" + + name = "inciweb" + display_name = "NIFC InciWeb — Wildfire Narrative" + description = ( + "Narrative wildfire updates from InciWeb. Editorial; lower precision " + "than WFIGS. Use as supplementary context." + ) + settings_schema = InciWebSettings + requires_api_key = None + api_key_field = None + wizard_order = None # Ships disabled + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + # Conditional fetch state + self._last_modified: str | None = None + self._etag: str | None = None + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def startup(self) -> None: + """Initialize HTTP session and SQLite connection.""" + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + + # Create table for dedup tracking + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + + logger.info( + "InciWeb adapter started", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + async def shutdown(self) -> None: + """Close HTTP session and SQLite connection.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("InciWeb adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + logger.info( + "InciWeb config updated", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def bump_last_seen(self, event_id: str) -> None: + """Bump the last_seen timestamp for an event.""" + if not self._db: + return + self._db.execute( + "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 14 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("InciWeb swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + """Compute NATS subject for an event.""" + state = event.geo.primary_region + if state and state.startswith("US-") and len(state) == 5: + state_code = state[3:].lower() + return f"central.fire.narrative.inciweb.{state_code}" + return "central.fire.narrative.inciweb.unknown" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_rss(self) -> list[dict[str, Any]]: + """Fetch and parse RSS feed from InciWeb.""" + if not self._session: + raise RuntimeError("Session not initialized") + + # Build request headers with conditional fetch support + headers = {"User-Agent": "Central/0.4"} + if self._last_modified: + headers["If-Modified-Since"] = self._last_modified + if self._etag: + headers["If-None-Match"] = self._etag + + async with self._session.get(INCIWEB_RSS_URL, headers=headers) as resp: + # Handle 304 Not Modified + if resp.status == 304: + logger.info("InciWeb not modified") + return [] + + resp.raise_for_status() + + # Capture conditional fetch headers for next request + self._last_modified = resp.headers.get("Last-Modified") + self._etag = resp.headers.get("ETag") + + content = await resp.text() + + # Parse RSS XML + items = [] + try: + root = ET.fromstring(content) + channel = root.find("channel") + if channel is None: + return [] + + for item_elem in channel.findall("item"): + item: dict[str, Any] = {} + + title = item_elem.find("title") + item["title"] = title.text if title is not None and title.text else "" + + link = item_elem.find("link") + item["link"] = link.text if link is not None and link.text else "" + + description = item_elem.find("description") + item["description"] = description.text if description is not None and description.text else "" + + pub_date = item_elem.find("pubDate") + item["pubDate"] = pub_date.text if pub_date is not None and pub_date.text else "" + + guid = item_elem.find("guid") + item["guid"] = guid.text if guid is not None and guid.text else "" + + # Check for dc:creator + creator = item_elem.find("{http://purl.org/dc/elements/1.1/}creator") + item["creator"] = creator.text if creator is not None and creator.text else "" + + items.append(item) + + except ET.ParseError as e: + logger.error("InciWeb RSS parse error", extra={"error": str(e)}) + raise + + logger.info( + "InciWeb fetch completed", + extra={"item_count": len(items)}, + ) + return items + + async def poll(self) -> AsyncIterator[Event]: + """Poll InciWeb for narrative updates.""" + if not self._db: + raise RuntimeError("Database not initialized") + + # Fetch RSS feed + try: + items = await self._fetch_rss() + except Exception as e: + logger.error("InciWeb fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + + for item in items: + guid = item.get("guid", "") + if not guid: + continue + + # Dedup: skip if already published + if self.is_published(guid): + self.bump_last_seen(guid) + continue + + description_html = item.get("description", "") + + # Parse coordinates from description + centroid = parse_coordinates_from_description(description_html) + + # Post-filter: skip if point outside region bbox + if self.region and centroid: + lon, lat = centroid + if not point_in_bbox( + lon, lat, + self.region.west, self.region.south, + self.region.east, self.region.north, + ): + continue + + # Parse state from description + state_code = parse_state_from_description(description_html) + + # Build regions + if state_code: + regions = [f"US-{state_code}"] + primary_region = f"US-{state_code}" + else: + regions = [] + primary_region = None + + # Parse pubDate (RFC 822 format) + pub_date_str = item.get("pubDate", "") + try: + event_time = parsedate_to_datetime(pub_date_str) + # Ensure UTC + if event_time.tzinfo is None: + event_time = event_time.replace(tzinfo=timezone.utc) + else: + event_time = event_time.astimezone(timezone.utc) + except (ValueError, TypeError): + event_time = datetime.now(timezone.utc) + + # Build geo + geo = Geo( + centroid=centroid, + bbox=(centroid[0], centroid[1], centroid[0], centroid[1]) if centroid else None, + regions=regions, + primary_region=primary_region, + ) + + # Strip HTML from description + description_plain = strip_html(description_html) + + # Build event + event = Event( + id=guid, + adapter=self.name, + category="fire.narrative.inciweb", + time=event_time, + severity=0, # Narrative; not authoritative + geo=geo, + data={ + "title": item.get("title", ""), + "description": description_plain, + "description_html": description_html, + "url": item.get("link", ""), + "guid": guid, + "raw": item, + }, + ) + + yield event + self.mark_published(guid) + events_yielded += 1 + + # Periodic cleanup of old entries + self.sweep_old_ids() + + logger.info( + "InciWeb poll completed", + extra={"events_yielded": events_yielded}, + ) diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index ce95d3a..8205a1f 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,7 +19,7 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter -from pydantic import BaseModel +from pydantic import BaseModel, Field from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore @@ -193,7 +193,11 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: class NWSSettings(BaseModel): """Settings schema for NWS adapter.""" - contact_email: str = "" + contact_email: str = Field( + default="", + pattern=r"^$|^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", + description="Contact email for NWS API User-Agent header", + ) region: RegionConfig | None = None diff --git a/src/central/adapters/swpc_alerts.py b/src/central/adapters/swpc_alerts.py new file mode 100644 index 0000000..3368824 --- /dev/null +++ b/src/central/adapters/swpc_alerts.py @@ -0,0 +1,186 @@ +"""NOAA SWPC space weather alerts adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_ALERTS_URL, + SWPCSettings, + parse_swpc_timestamp, + severity_from_alert_product_id, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCAlertsAdapter(SourceAdapter): + """NOAA SWPC space weather alerts adapter.""" + + name = "swpc_alerts" + display_name = "NOAA SWPC — Space Weather Alerts" + description = "Active NOAA SWPC space weather alerts, watches, warnings, and summaries." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC alerts adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC alerts adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC alerts config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC alerts swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + product_id = event.data.get("product_id") or "unknown" + return f"central.space.alert.{product_id.lower()}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_ALERTS_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC alerts fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC alerts fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + product_id = item.get("product_id") + issue_dt_raw = item.get("issue_datetime") + if not product_id or not issue_dt_raw: + continue + + event_id = f"{product_id}|{issue_dt_raw}" + if self.is_published(event_id): + continue + + issue_dt = parse_swpc_timestamp(issue_dt_raw, "alerts") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.alert", + time=issue_dt, + severity=severity_from_alert_product_id(product_id), + geo=Geo(), + data={ + "product_id": product_id, + "issue_datetime": issue_dt_raw, + "message": item.get("message", ""), + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC alerts poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_common.py b/src/central/adapters/swpc_common.py new file mode 100644 index 0000000..004f965 --- /dev/null +++ b/src/central/adapters/swpc_common.py @@ -0,0 +1,81 @@ +"""Shared utilities for NOAA SWPC space weather adapters.""" + +import re +from datetime import datetime, timezone + +from pydantic import BaseModel + +SWPC_ALERTS_URL = "https://services.swpc.noaa.gov/products/alerts.json" +SWPC_KINDEX_URL = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json" +SWPC_PROTONS_URL = "https://services.swpc.noaa.gov/json/goes/primary/integral-protons-1-day.json" + + +class SWPCSettings(BaseModel): + """Settings schema for SWPC adapters. No operator-tunable knobs today.""" + + +def parse_swpc_timestamp(raw: str | None, endpoint_kind: str) -> datetime | None: + """Normalize SWPC timestamp strings to UTC datetime. + + endpoint_kind shapes: + alerts -> "2026-05-19 05:14:59.780" (space-separated, no TZ; UTC per message body) + kindex -> "2026-05-12T00:00:00" (ISO without TZ; UTC by convention) + protons -> "2026-05-18T05:35:00Z" (ISO with Z) + """ + if not raw: + return None + if endpoint_kind == "alerts": + try: + dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f") + except ValueError: + dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S") + return dt.replace(tzinfo=timezone.utc) + if endpoint_kind == "kindex": + dt = datetime.fromisoformat(raw) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + if endpoint_kind == "protons": + raw_norm = raw[:-1] + "+00:00" if raw.endswith("Z") else raw + dt = datetime.fromisoformat(raw_norm) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + raise ValueError(f"unknown endpoint_kind: {endpoint_kind!r}") + + +def severity_from_kp(kp: float | int | None) -> int: + """Map planetary K-index value (0-9) to severity 0-4 via the G-scale. + + Kp 5 = G1 = severity 1, Kp 6 = G2 = severity 2, Kp 7 = G3 = severity 3, + Kp 8 = G4 = severity 4, Kp 9 = G5 = severity 4 (capped). + """ + if kp is None: + return 0 + if kp < 5: + return 0 + if kp < 6: + return 1 + if kp < 7: + return 2 + if kp < 8: + return 3 + return 4 + + +_ALERT_KP_PATTERN = re.compile(r"^K0([5-9])[AW]$") + + +def severity_from_alert_product_id(product_id: str | None) -> int: + """Best-effort severity for an alert from its product_id G-scale. + + Product IDs of form K0[5-9][AW] identify Kp-based geomagnetic storm + alerts and warnings (K05A=G1, K06A=G2, K07A=G3, K08A=G4, K09A=G5). + All other product IDs return 0. + """ + if not product_id: + return 0 + m = _ALERT_KP_PATTERN.match(product_id.upper()) + if not m: + return 0 + return severity_from_kp(int(m.group(1))) diff --git a/src/central/adapters/swpc_kindex.py b/src/central/adapters/swpc_kindex.py new file mode 100644 index 0000000..f05bcbb --- /dev/null +++ b/src/central/adapters/swpc_kindex.py @@ -0,0 +1,186 @@ +"""NOAA SWPC Planetary K-Index adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_KINDEX_URL, + SWPCSettings, + parse_swpc_timestamp, + severity_from_kp, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCKindexAdapter(SourceAdapter): + """NOAA SWPC planetary K-index adapter.""" + + name = "swpc_kindex" + display_name = "NOAA SWPC — Planetary K-Index" + description = "Planetary K-index measurements at 3-hour cadence from NOAA SWPC." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC kindex adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC kindex adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC kindex config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC kindex swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + return "central.space.kindex" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_KINDEX_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC kindex fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC kindex fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + time_tag = item.get("time_tag") + kp = item.get("Kp") + if not time_tag or kp is None: + continue + + event_id = time_tag + if self.is_published(event_id): + continue + + event_time = parse_swpc_timestamp(time_tag, "kindex") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.kindex", + time=event_time, + severity=severity_from_kp(kp), + geo=Geo(), + data={ + "time_tag": time_tag, + "Kp": kp, + "a_running": item.get("a_running"), + "station_count": item.get("station_count"), + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC kindex poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_protons.py b/src/central/adapters/swpc_protons.py new file mode 100644 index 0000000..1a3876e --- /dev/null +++ b/src/central/adapters/swpc_protons.py @@ -0,0 +1,185 @@ +"""NOAA SWPC GOES integral proton flux adapter.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.swpc_common import ( + SWPC_PROTONS_URL, + SWPCSettings, + parse_swpc_timestamp, +) +from central.config_models import AdapterConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + + +class SWPCProtonsAdapter(SourceAdapter): + """NOAA SWPC GOES integral proton flux adapter.""" + + name = "swpc_protons" + display_name = "NOAA SWPC — GOES Proton Flux" + description = "GOES primary satellite integral proton flux measurements (1-day window) from NOAA SWPC." + settings_schema = SWPCSettings + requires_api_key = None + api_key_field = None + wizard_order = None + default_cadence_s = 600 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + + async def startup(self) -> None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + self._db.commit() + logger.info("SWPC protons adapter started") + + async def shutdown(self) -> None: + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("SWPC protons adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + logger.info("SWPC protons config updated") + + def is_published(self, event_id: str) -> bool: + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("SWPC protons swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + return "central.space.proton_flux" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch(self) -> list[dict[str, Any]]: + if not self._session: + raise RuntimeError("Session not initialized") + async with self._session.get( + SWPC_PROTONS_URL, headers={"User-Agent": "Central/0.4"} + ) as resp: + resp.raise_for_status() + data = await resp.json() + logger.info("SWPC protons fetch completed", extra={"item_count": len(data)}) + return data + + async def poll(self) -> AsyncIterator[Event]: + if not self._db: + raise RuntimeError("Database not initialized") + + try: + items = await self._fetch() + except Exception as e: + logger.error("SWPC protons fetch failed", extra={"error": str(e)}) + raise + + events_yielded = 0 + for item in items: + time_tag = item.get("time_tag") + energy = item.get("energy") + if not time_tag or not energy: + continue + + event_id = f"{time_tag}|{energy}" + if self.is_published(event_id): + continue + + event_time = parse_swpc_timestamp(time_tag, "protons") or datetime.now(timezone.utc) + + event = Event( + id=event_id, + adapter=self.name, + category="space.proton_flux", + time=event_time, + severity=0, + geo=Geo(), + data={ + "time_tag": time_tag, + "satellite": item.get("satellite"), + "flux": item.get("flux"), + "energy": energy, + }, + ) + + yield event + self.mark_published(event_id) + events_yielded += 1 + + self.sweep_old_ids() + logger.info("SWPC protons poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index e73148f..63009ee 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -5,7 +5,7 @@ import sqlite3 from collections.abc import AsyncIterator from datetime import datetime, timezone from pathlib import Path -from typing import Any +from typing import Any, Literal import aiohttp from shapely.geometry import Point, box as shapely_box @@ -64,7 +64,7 @@ def magnitude_to_severity(mag: float) -> int: class USGSQuakeSettings(BaseModel): """Settings schema for USGS quake adapter.""" - feed: str = "all_hour" + feed: Literal["all_hour", "all_day", "all_week", "all_month"] = "all_hour" region: RegionConfig | None = None diff --git a/src/central/adapters/wfigs_common.py b/src/central/adapters/wfigs_common.py new file mode 100644 index 0000000..cdf2331 --- /dev/null +++ b/src/central/adapters/wfigs_common.py @@ -0,0 +1,242 @@ +"""Shared utilities for WFIGS (Wildland Fire Interagency Geospatial Services) adapters.""" + +import sqlite3 +from datetime import datetime, timezone +from typing import Any + +# WFIGS FeatureServer endpoints +WFIGS_INCIDENTS_URL = ( + "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" + "WFIGS_Incident_Locations_Current/FeatureServer/0/query" +) +WFIGS_PERIMETERS_URL = ( + "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" + "WFIGS_Interagency_Perimeters_Current/FeatureServer/0/query" +) + +# Fall-off sweep window: 14 days (matches WFIGS's longest fall-off: large fires) +FALLOFF_WINDOW_DAYS = 14 + +# Incident type code mappings (WFIGS uses 2-letter codes) +INCIDENT_TYPE_MAP = { + "WF": "wildfire", + "RX": "prescribed_fire", + "CX": "complex", + "FA": "false_alarm", +} + + +def normalize_state(state: str | None) -> str | None: + """Strip 'US-' prefix from POOState (ISO 3166-2 -> 2-letter).""" + if not state: + return None + if state.startswith("US-") and len(state) == 5: + return state[3:] + if len(state) == 2: + return state + return state # unknown shape, pass through + + +def normalize_incident_type(code: str | None) -> str: + """Map IncidentTypeCategory code to a readable name.""" + if not code: + return "unknown" + upper = code.upper() + if upper in INCIDENT_TYPE_MAP: + return INCIDENT_TYPE_MAP[upper] + return code.lower() + + +def severity_from_acres(acres: float | None) -> int: + """Map DailyAcres to severity level 0-4.""" + if acres is None or acres == 0: + return 0 + if acres < 10: + return 1 + if acres < 100: + return 2 + if acres < 1000: + return 3 + return 4 + + +def parse_wfigs_timestamp(epoch_ms: int | None) -> datetime | None: + """Parse WFIGS epoch milliseconds to UTC datetime.""" + if epoch_ms is None: + return None + return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc) + + +def build_regions(state: str | None, county: str | None) -> tuple[list[str], str | None]: + """ + Build geo.regions list and primary_region from POOState and POOCounty. + + Expects normalized 2-letter state codes (e.g., "MT" not "US-MT"). + Returns (regions, primary_region). + """ + if not state: + return [], None + + state_upper = state.upper() + if county: + # Normalize county: remove spaces, uppercase + county_normalized = county.replace(" ", "_").upper() + region = f"US-{state_upper}-{county_normalized}" + return [region], region + else: + region = f"US-{state_upper}" + return [region], region + + +def subject_suffix(state: str | None, county: str | None) -> str: + """ + Build subject suffix from state and county. + + Expects normalized 2-letter state codes. + Returns lowercase state.county (county with spaces→underscores). + Falls back to "unknown" if state is not available. + """ + if not state: + return "unknown" + + state_lower = state.lower() + if county: + county_lower = county.lower().replace(" ", "_") + return f"{state_lower}.{county_lower}" + return state_lower + + +def init_observed_table(db: sqlite3.Connection) -> None: + """Create the wfigs_observed table if it doesn't exist.""" + db.execute(""" + CREATE TABLE IF NOT EXISTS wfigs_observed ( + layer TEXT NOT NULL, + irwin_id TEXT NOT NULL, + last_observed_at TEXT NOT NULL, + state TEXT, + county TEXT, + PRIMARY KEY (layer, irwin_id) + ) + """) + db.commit() + + +def get_observed_guids(db: sqlite3.Connection, layer: str) -> dict[str, tuple[str, str | None, str | None]]: + """ + Get all observed IRWIN GUIDs for a layer. + + Returns dict mapping irwin_id -> (last_observed_at, state, county). + """ + cursor = db.execute( + "SELECT irwin_id, last_observed_at, state, county FROM wfigs_observed WHERE layer = ?", + (layer,), + ) + return {row[0]: (row[1], row[2], row[3]) for row in cursor.fetchall()} + + +def update_observed( + db: sqlite3.Connection, + layer: str, + current_guids: dict[str, tuple[str | None, str | None]], +) -> None: + """ + Update the observed table with current poll's GUIDs. + + current_guids: dict mapping irwin_id -> (state, county) + """ + now_iso = datetime.now(timezone.utc).isoformat() + + # Use INSERT OR REPLACE to upsert + for irwin_id, (state, county) in current_guids.items(): + db.execute( + """ + INSERT OR REPLACE INTO wfigs_observed (layer, irwin_id, last_observed_at, state, county) + VALUES (?, ?, ?, ?, ?) + """, + (layer, irwin_id, now_iso, state, county), + ) + db.commit() + + +def delete_observed(db: sqlite3.Connection, layer: str, irwin_ids: set[str]) -> None: + """Delete fallen-off GUIDs from the observed table.""" + for irwin_id in irwin_ids: + db.execute( + "DELETE FROM wfigs_observed WHERE layer = ? AND irwin_id = ?", + (layer, irwin_id), + ) + db.commit() + + +def cleanup_old_observed(db: sqlite3.Connection, layer: str, days: int = FALLOFF_WINDOW_DAYS) -> None: + """Remove observed entries older than the sweep window.""" + cutoff = datetime.now(timezone.utc).isoformat() + db.execute( + f""" + DELETE FROM wfigs_observed + WHERE layer = ? + AND datetime(last_observed_at) < datetime(?, '-{days} days') + """, + (layer, cutoff), + ) + db.commit() + + +def point_in_bbox( + lon: float, + lat: float, + west: float, + south: float, + east: float, + north: float, +) -> bool: + """Check if a point is within a bounding box.""" + return west <= lon <= east and south <= lat <= north + + +def polygon_intersects_bbox( + geometry: dict[str, Any], + west: float, + south: float, + east: float, + north: float, +) -> bool: + """ + Check if a GeoJSON geometry intersects a bounding box. + + Uses shapely for accurate polygon intersection. + """ + try: + from shapely.geometry import box, shape + + bbox_polygon = box(west, south, east, north) + geom = shape(geometry) + return bbox_polygon.intersects(geom) + except Exception: + # If shapely fails, fall back to centroid check + if geometry.get("type") == "Point": + coords = geometry.get("coordinates", []) + if len(coords) >= 2: + return point_in_bbox(coords[0], coords[1], west, south, east, north) + return True # Include if we can't determine + + +def extract_centroid(geometry: dict[str, Any]) -> tuple[float, float] | None: + """Extract centroid from GeoJSON geometry.""" + if not geometry: + return None + + geom_type = geometry.get("type") + coords = geometry.get("coordinates") + + if geom_type == "Point" and coords and len(coords) >= 2: + return (coords[0], coords[1]) + + # For polygons, use shapely to compute centroid + try: + from shapely.geometry import shape + geom = shape(geometry) + centroid = geom.centroid + return (centroid.x, centroid.y) + except Exception: + return None diff --git a/src/central/adapters/wfigs_incidents.py b/src/central/adapters/wfigs_incidents.py new file mode 100644 index 0000000..660d1de --- /dev/null +++ b/src/central/adapters/wfigs_incidents.py @@ -0,0 +1,383 @@ +"""WFIGS Incidents adapter for wildfire incident locations.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.wfigs_common import ( + WFIGS_INCIDENTS_URL, + build_regions, + cleanup_old_observed, + delete_observed, + extract_centroid, + get_observed_guids, + init_observed_table, + normalize_incident_type, + normalize_state, + parse_wfigs_timestamp, + point_in_bbox, + severity_from_acres, + subject_suffix, + update_observed, +) +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +LAYER_NAME = "incidents" + + +class WFIGSIncidentsSettings(BaseModel): + """Settings schema for WFIGS Incidents adapter.""" + + region: RegionConfig | None = None + + +class WFIGSIncidentsAdapter(SourceAdapter): + """NIFC WFIGS wildfire incidents adapter.""" + + name = "wfigs_incidents" + display_name = "NIFC WFIGS — Wildfire Incidents" + description = "Active wildfire incident locations from NIFC WFIGS." + settings_schema = WFIGSIncidentsSettings + requires_api_key = None + api_key_field = None + wizard_order = None # Not in setup wizard + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._last_poll_time: datetime | None = None + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def startup(self) -> None: + """Initialize HTTP session and SQLite connection.""" + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + + # Create tables for dedup and fall-off tracking + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + init_observed_table(self._db) + self._db.commit() + + logger.info( + "WFIGS incidents adapter started", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + async def shutdown(self) -> None: + """Close HTTP session and SQLite connection.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("WFIGS incidents adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + logger.info( + "WFIGS incidents config updated", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def bump_last_seen(self, event_id: str) -> None: + """Bump the last_seen timestamp for an event.""" + if not self._db: + return + self._db.execute( + "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 14 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("WFIGS incidents swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + """Compute NATS subject for an event.""" + # Removal events have a different subject pattern + if event.category.startswith("fire.incident.removed"): + state = event.data.get("state", "").lower() or "unknown" + return f"central.fire.incident.removed.{state}" + + # Regular incidents: central.fire.incident.. + # POOState is already normalized (2-letter code) + state = event.data.get("POOState") + county = event.data.get("POOCounty") + suffix = subject_suffix(state, county) + return f"central.fire.incident.{suffix}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_features(self) -> list[dict[str, Any]]: + """Fetch features from WFIGS FeatureServer.""" + if not self._session: + raise RuntimeError("Session not initialized") + + # Build query params + params: dict[str, str] = { + "outFields": "*", + "returnGeometry": "true", + "f": "geojson", + } + + # Time filter: only fetch modified since last poll + if self._last_poll_time: + iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") + params["where"] = f"ModifiedOnDateTime > timestamp '{iso_time}'" + else: + params["where"] = "1=1" + + # Bbox filter if region configured + if self.region: + bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" + params["geometry"] = bbox + params["geometryType"] = "esriGeometryEnvelope" + params["spatialRel"] = "esriSpatialRelIntersects" + params["inSR"] = "4326" + + async with self._session.get(WFIGS_INCIDENTS_URL, params=params) as resp: + resp.raise_for_status() + data = await resp.json() + + features = data.get("features", []) + logger.info( + "WFIGS incidents fetch completed", + extra={"feature_count": len(features)}, + ) + return features + + async def poll(self) -> AsyncIterator[Event]: + """Poll WFIGS for incident updates.""" + if not self._db: + raise RuntimeError("Database not initialized") + + # Fetch features from upstream + try: + features = await self._fetch_features() + except Exception as e: + logger.error("WFIGS incidents fetch failed", extra={"error": str(e)}) + raise + + # Get previous poll's observed GUIDs for fall-off detection + observed_before = get_observed_guids(self._db, LAYER_NAME) + + # Process features and track current GUIDs + current_guids: dict[str, tuple[str | None, str | None]] = {} + events_yielded = 0 + + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + irwin_id = props.get("IrwinID") + if not irwin_id: + continue + + # Extract location + centroid = extract_centroid(geometry) + + # Post-filter: skip if outside region bbox + if self.region and centroid: + lon, lat = centroid + if not point_in_bbox( + lon, lat, + self.region.west, self.region.south, + self.region.east, self.region.north, + ): + continue + + # Normalize at parse boundary + state_raw = props.get("POOState") + state = normalize_state(state_raw) + county = props.get("POOCounty") + incident_type_raw = props.get("IncidentTypeCategory") + incident_type = normalize_incident_type(incident_type_raw) + + # Track this GUID as observed (for fall-off detection) + # Store normalized state for consistency + current_guids[irwin_id] = (state, county) + + # Parse fields + discovery_time = parse_wfigs_timestamp(props.get("FireDiscoveryDateTime")) + daily_acres = props.get("DailyAcres") + + # Build regions (expects normalized 2-letter state code) + regions, primary_region = build_regions(state, county) + + # Build geo + if centroid: + geo = Geo( + centroid=centroid, + bbox=(centroid[0], centroid[1], centroid[0], centroid[1]), + regions=regions, + primary_region=primary_region, + ) + else: + geo = Geo(regions=regions, primary_region=primary_region) + + # Build event with normalized values in data + event = Event( + id=irwin_id, + adapter=self.name, + category=f"fire.incident.{incident_type}", + time=discovery_time or datetime.now(timezone.utc), + severity=severity_from_acres(daily_acres), + geo=geo, + data={ + "IrwinID": irwin_id, + "IncidentName": props.get("IncidentName"), + "IncidentTypeCategory": incident_type, + "IncidentTypeCategory_raw": incident_type_raw, + "DailyAcres": daily_acres, + "PercentContained": props.get("PercentContained"), + "FireDiscoveryDateTime": props.get("FireDiscoveryDateTime"), + "ModifiedOnDateTime": props.get("ModifiedOnDateTime"), + "POOState": state, + "POOState_raw": state_raw, + "POOCounty": county, + "raw": props, + }, + ) + + yield event + events_yielded += 1 + + # Detect fall-offs: GUIDs in previous but not current + fallen_off = set(observed_before.keys()) - set(current_guids.keys()) + + for irwin_id in fallen_off: + last_observed, state, county = observed_before[irwin_id] + now = datetime.now(timezone.utc) + + removal_event = Event( + id=f"{irwin_id}:removed:{now.isoformat()}", + adapter=self.name, + category="fire.incident.removed", + time=now, + severity=0, + geo=Geo(), + data={ + "irwin_id": irwin_id, + "last_observed_at": last_observed, + "state": state, + "county": county, + "reason": "fallen_off_current_service", + }, + ) + + yield removal_event + events_yielded += 1 + logger.info( + "WFIGS incident fall-off detected", + extra={"irwin_id": irwin_id, "state": state}, + ) + + # Update observed table + update_observed(self._db, LAYER_NAME, current_guids) + delete_observed(self._db, LAYER_NAME, fallen_off) + + # Periodic cleanup of old entries + cleanup_old_observed(self._db, LAYER_NAME) + self.sweep_old_ids() + + # Update last poll time + self._last_poll_time = datetime.now(timezone.utc) + + logger.info( + "WFIGS incidents poll completed", + extra={ + "events_yielded": events_yielded, + "current_observed": len(current_guids), + "fallen_off": len(fallen_off), + }, + ) diff --git a/src/central/adapters/wfigs_perimeters.py b/src/central/adapters/wfigs_perimeters.py new file mode 100644 index 0000000..669d635 --- /dev/null +++ b/src/central/adapters/wfigs_perimeters.py @@ -0,0 +1,397 @@ +"""WFIGS Perimeters adapter for wildfire perimeter polygons.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.wfigs_common import ( + WFIGS_PERIMETERS_URL, + build_regions, + cleanup_old_observed, + delete_observed, + extract_centroid, + get_observed_guids, + init_observed_table, + normalize_incident_type, + normalize_state, + parse_wfigs_timestamp, + polygon_intersects_bbox, + severity_from_acres, + subject_suffix, + update_observed, +) +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +LAYER_NAME = "perimeters" + + +class WFIGSPerimetersSettings(BaseModel): + """Settings schema for WFIGS Perimeters adapter.""" + + region: RegionConfig | None = None + + +class WFIGSPerimetersAdapter(SourceAdapter): + """NIFC WFIGS wildfire perimeters adapter.""" + + name = "wfigs_perimeters" + display_name = "NIFC WFIGS — Wildfire Perimeters" + description = "Active wildfire perimeter polygons from NIFC WFIGS." + settings_schema = WFIGSPerimetersSettings + requires_api_key = None + api_key_field = None + wizard_order = None # Not in setup wizard + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._last_poll_time: datetime | None = None + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def startup(self) -> None: + """Initialize HTTP session and SQLite connection.""" + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=120), # Longer timeout for large polygons + ) + self._db = sqlite3.connect(self._cursor_db_path) + + # Create tables for dedup and fall-off tracking + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + init_observed_table(self._db) + self._db.commit() + + logger.info( + "WFIGS perimeters adapter started", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + async def shutdown(self) -> None: + """Close HTTP session and SQLite connection.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("WFIGS perimeters adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + logger.info( + "WFIGS perimeters config updated", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def bump_last_seen(self, event_id: str) -> None: + """Bump the last_seen timestamp for an event.""" + if not self._db: + return + self._db.execute( + "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 14 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + """Compute NATS subject for an event.""" + # Removal events have a different subject pattern + if event.category.startswith("fire.perimeter.removed"): + state = event.data.get("state", "").lower() or "unknown" + return f"central.fire.perimeter.removed.{state}" + + # Regular perimeters: central.fire.perimeter.. + # POOState is already normalized (2-letter code) + state = event.data.get("POOState") + county = event.data.get("POOCounty") + suffix = subject_suffix(state, county) + return f"central.fire.perimeter.{suffix}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_features(self) -> list[dict[str, Any]]: + """Fetch features from WFIGS FeatureServer.""" + if not self._session: + raise RuntimeError("Session not initialized") + + # Build query params + params: dict[str, str] = { + "outFields": "*", + "returnGeometry": "true", + "f": "geojson", + } + + # Time filter: only fetch modified since last poll + # Note: perimeters use attr_ModifiedOnDateTime_dt field + if self._last_poll_time: + iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") + params["where"] = f"attr_ModifiedOnDateTime_dt > timestamp '{iso_time}'" + else: + params["where"] = "1=1" + + # Bbox filter if region configured + if self.region: + bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" + params["geometry"] = bbox + params["geometryType"] = "esriGeometryEnvelope" + params["spatialRel"] = "esriSpatialRelIntersects" + params["inSR"] = "4326" + + async with self._session.get(WFIGS_PERIMETERS_URL, params=params) as resp: + resp.raise_for_status() + data = await resp.json() + + features = data.get("features", []) + logger.info( + "WFIGS perimeters fetch completed", + extra={"feature_count": len(features)}, + ) + return features + + async def poll(self) -> AsyncIterator[Event]: + """Poll WFIGS for perimeter updates.""" + if not self._db: + raise RuntimeError("Database not initialized") + + # Fetch features from upstream + try: + features = await self._fetch_features() + except Exception as e: + logger.error("WFIGS perimeters fetch failed", extra={"error": str(e)}) + raise + + # Get previous poll's observed GUIDs for fall-off detection + observed_before = get_observed_guids(self._db, LAYER_NAME) + + # Process features and track current GUIDs + current_guids: dict[str, tuple[str | None, str | None]] = {} + events_yielded = 0 + + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + # WFIGS Perimeters use prefixed field names (attr_*, poly_*) + irwin_id = props.get("attr_IrwinID") or props.get("poly_IRWINID") + if not irwin_id: + continue + + # Post-filter: skip if geometry doesn't intersect region bbox + if self.region and geometry: + if not polygon_intersects_bbox( + geometry, + self.region.west, self.region.south, + self.region.east, self.region.north, + ): + continue + + # Normalize at parse boundary + state_raw = props.get("attr_POOState") + state = normalize_state(state_raw) + county = props.get("attr_POOCounty") + incident_type_raw = props.get("attr_IncidentTypeCategory") + incident_type = normalize_incident_type(incident_type_raw) + + # Track this GUID as observed (for fall-off detection) + # Store normalized state for consistency + current_guids[irwin_id] = (state, county) + + # Parse fields using prefixed names + discovery_time = parse_wfigs_timestamp(props.get("attr_FireDiscoveryDateTime")) + # Use poly_GISAcres or attr_IncidentSize for acreage + daily_acres = props.get("attr_IncidentSize") or props.get("poly_GISAcres") + + # Build regions (expects normalized 2-letter state code) + regions, primary_region = build_regions(state, county) + + # Extract centroid for geo + centroid = extract_centroid(geometry) + + # Build bbox from geometry if available + bbox = None + if geometry: + try: + from shapely.geometry import shape + geom = shape(geometry) + bounds = geom.bounds # (minx, miny, maxx, maxy) + bbox = (bounds[0], bounds[1], bounds[2], bounds[3]) + except Exception: + if centroid: + bbox = (centroid[0], centroid[1], centroid[0], centroid[1]) + + # Build geo + geo = Geo( + centroid=centroid, + bbox=bbox, + regions=regions, + primary_region=primary_region, + ) + + # Build event with geometry in data + # Use normalized field names in event data for consistency + event = Event( + id=irwin_id, + adapter=self.name, + category=f"fire.perimeter.{incident_type}", + time=discovery_time or datetime.now(timezone.utc), + severity=severity_from_acres(daily_acres), + geo=geo, + data={ + "IrwinID": irwin_id, + "IncidentName": props.get("attr_IncidentName") or props.get("poly_IncidentName"), + "IncidentTypeCategory": incident_type, + "IncidentTypeCategory_raw": incident_type_raw, + "DailyAcres": props.get("attr_IncidentSize"), + "GISAcres": props.get("poly_GISAcres"), + "PercentContained": props.get("attr_PercentContained"), + "FireDiscoveryDateTime": props.get("attr_FireDiscoveryDateTime"), + "ModifiedOnDateTime": props.get("attr_ModifiedOnDateTime_dt"), + "POOState": state, + "POOState_raw": state_raw, + "POOCounty": county, + "geometry": geometry, # Full GeoJSON polygon + "raw": props, + }, + ) + + yield event + events_yielded += 1 + + # Detect fall-offs: GUIDs in previous but not current + fallen_off = set(observed_before.keys()) - set(current_guids.keys()) + + for irwin_id in fallen_off: + last_observed, state, county = observed_before[irwin_id] + now = datetime.now(timezone.utc) + + removal_event = Event( + id=f"{irwin_id}:removed:{now.isoformat()}", + adapter=self.name, + category="fire.perimeter.removed", + time=now, + severity=0, + geo=Geo(), + data={ + "irwin_id": irwin_id, + "last_observed_at": last_observed, + "state": state, + "county": county, + "reason": "fallen_off_current_service", + }, + ) + + yield removal_event + events_yielded += 1 + logger.info( + "WFIGS perimeter fall-off detected", + extra={"irwin_id": irwin_id, "state": state}, + ) + + # Update observed table + update_observed(self._db, LAYER_NAME, current_guids) + delete_observed(self._db, LAYER_NAME, fallen_off) + + # Periodic cleanup of old entries + cleanup_old_observed(self._db, LAYER_NAME) + self.sweep_old_ids() + + # Update last poll time + self._last_poll_time = datetime.now(timezone.utc) + + logger.info( + "WFIGS perimeters poll completed", + extra={ + "events_yielded": events_yielded, + "current_observed": len(current_guids), + "fallen_off": len(fallen_off), + }, + ) diff --git a/src/central/archive.py b/src/central/archive.py index b64a187..c173805 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -25,6 +25,7 @@ STREAMS = [ ("CENTRAL_WX", "central.wx.>"), ("CENTRAL_FIRE", "central.fire.>"), ("CENTRAL_QUAKE", "central.quake.>"), + ("CENTRAL_SPACE", "central.space.>"), ] BATCH_SIZE = 100 diff --git a/src/central/config_models.py b/src/central/config_models.py index a5ba0d5..5516bc1 100644 --- a/src/central/config_models.py +++ b/src/central/config_models.py @@ -32,7 +32,7 @@ class AdapterConfig(BaseModel): name: str = Field(description="Unique adapter identifier") enabled: bool = Field(default=True, description="Whether adapter is active") - cadence_s: int = Field(description="Poll interval in seconds") + cadence_s: int = Field(ge=10, description="Poll interval in seconds") settings: dict[str, Any] = Field( default_factory=dict, description="Adapter-specific settings" ) diff --git a/src/central/config_store.py b/src/central/config_store.py index ac55e27..826f899 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -241,6 +241,14 @@ class ConfigStore: ) return result == "DELETE 1" + async def set_adapter_last_error(self, name: str, error: str | None) -> None: + """Set or clear the last_error field on an adapter row.""" + async with self._pool.acquire() as conn: + await conn.execute( + "UPDATE config.adapters SET last_error = $1 WHERE name = $2", + error, name, + ) + # ------------------------------------------------------------------------- # Change notifications # ------------------------------------------------------------------------- diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 71a302b..79703cb 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -247,18 +247,37 @@ def _create_app() -> FastAPI: except Exception: pass - # Import helper functions for valid values - from central.gui.routes import _get_valid_satellites, _get_valid_feeds + # Add field descriptors to adapters + from central.gui.routes import _adapter_classes + from central.gui.form_descriptors import describe_fields + adapter_classes = _adapter_classes() + wizard_adapters = sorted( + [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], + key=lambda nc: nc[1].wizard_order + ) + # Rebuild adapters with fields + enriched_adapters = [] + for name, cls in wizard_adapters: + adapter_data = next((a for a in adapters if a["name"] == name), None) + if adapter_data: + settings_dict = adapter_data.get("settings", {}) + fields = describe_fields(cls.settings_schema, settings_dict) + enriched_adapters.append({ + "name": name, + "display_name": cls.display_name, + "enabled": adapter_data.get("enabled", False), + "cadence_s": adapter_data.get("cadence_s", 300), + "settings": settings_dict, + "fields": fields, + }) response = templates.TemplateResponse( request=request, name="setup_adapters.html", context={ "csrf_token": csrf_token, - "adapters": adapters, + "adapters": enriched_adapters, "api_keys": api_keys, - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": error_msg, diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py new file mode 100644 index 0000000..ef7588e --- /dev/null +++ b/src/central/gui/form_descriptors.py @@ -0,0 +1,163 @@ +"""Form field descriptors for adapter settings. + +If a second nested settings type beyond RegionConfig appears, +refactor this helper to recurse over nested models. +""" + +from dataclasses import dataclass, field +from typing import Any, Literal, Union, get_args, get_origin + +from pydantic import BaseModel +from pydantic.fields import FieldInfo +from pydantic_core import PydanticUndefined + +from central.config_models import RegionConfig + + +@dataclass +class FieldDescriptor: + """Describes a form field for rendering.""" + name: str + label: str + widget: str # "text", "number", "checkbox", "csv", "select", "checkboxes", "region" + current_value: Any + default: Any + description: str + required: bool + options: list[str] | None = None # For select/checkboxes widgets + + +def _is_literal(tp: type) -> bool: + """Check if a type is a Literal type.""" + return get_origin(tp) is Literal + + +def _get_literal_values(tp: type) -> list[str]: + """Extract the literal values from a Literal type.""" + return list(get_args(tp)) + + +def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, list[str] | None]: + """Map a Python type to a widget type and optional options list. + + Returns: + Tuple of (widget_type, options_list_or_none) + """ + # Handle Optional/Union types + origin = get_origin(field_type) + args = get_args(field_type) + + # Check for Optional[X] (Union[X, None]) + if origin is Union or (origin is not None and type(None) in args): + # Get the non-None type + non_none_args = [a for a in args if a is not type(None)] + if non_none_args: + inner_type = non_none_args[0] + # Recursively determine widget for the inner type + return _type_to_widget_and_options(field_name, inner_type) + + # Check for Literal type (single select) + if _is_literal(field_type): + options = _get_literal_values(field_type) + return "select", [str(o) for o in options] + + # Direct type checks + if field_type is str: + return "text", None + if field_type is int: + return "number", None + if field_type is bool: + return "checkbox", None + if field_type is RegionConfig: + return "region", None + + # Check for list types + if origin is list: + inner_type = args[0] if args else None + + # list[Literal[...]] -> checkboxes + if inner_type is not None and _is_literal(inner_type): + options = _get_literal_values(inner_type) + return "checkboxes", [str(o) for o in options] + + # list[str] -> csv + if inner_type is str: + return "csv", None + + raise NotImplementedError( + f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" + ) + + # Check if it's a BaseModel subclass (nested model other than RegionConfig) + if isinstance(field_type, type) and issubclass(field_type, BaseModel): + raise NotImplementedError( + f"Field '{field_name}' has unsupported nested type: {field_type.__name__}. " + f"If a second nested type beyond RegionConfig is needed, " + f"refactor describe_fields to recurse over nested models." + ) + + raise NotImplementedError( + f"Field '{field_name}' has unsupported type: {field_type}" + ) + + +def _name_to_label(name: str) -> str: + """Convert field name to human-readable label.""" + return name.replace("_", " ").title() + + +def _is_undefined(value: Any) -> bool: + """Check if a value is Pydantic's undefined sentinel.""" + return value is PydanticUndefined + + +def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]: + """Generate field descriptors for a Pydantic model. + + Args: + model_cls: The Pydantic model class (e.g., NWSSettings) + current: Current settings values from the database + + Returns: + List of FieldDescriptor objects for rendering the form + """ + descriptors = [] + + for field_name, field_info in model_cls.model_fields.items(): + # Get the field type + field_type = field_info.annotation + + # Determine widget and options + widget, options = _type_to_widget_and_options(field_name, field_type) + + # Get current value, falling back to default + if field_name in current: + current_value = current[field_name] + elif not _is_undefined(field_info.default): + current_value = field_info.default + else: + current_value = None + + # Get default + default = field_info.default if not _is_undefined(field_info.default) else None + + # Get description + description = "" + if field_info.description: + description = field_info.description + + # Determine if required (no default and not Optional) + required = _is_undefined(field_info.default) and field_info.is_required() + + descriptors.append(FieldDescriptor( + name=field_name, + label=_name_to_label(field_name), + widget=widget, + current_value=current_value, + default=default, + description=description, + required=required, + options=options, + )) + + return descriptors diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index 3a415c2..b5c66f7 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -9,6 +9,7 @@ from typing import Any logger = logging.getLogger("central.gui.routes") + from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from central.bootstrap_config import get_settings @@ -43,12 +44,27 @@ from central.gui.audit import ( SYSTEM_UPDATE, write_audit, ) +from functools import cache + from central.gui.db import get_pool +from central.gui.form_descriptors import describe_fields, FieldDescriptor +from central.adapter_discovery import discover_adapters +from pydantic import ValidationError + +@cache +def _adapter_classes() -> dict: + """Cached adapter class discovery. + + GUI is a separate process from supervisor; walks pkgutil itself. + Python's import cache makes subsequent calls free. + """ + return discover_adapters() + router = APIRouter() # Streams to display on dashboard -DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"] # Email validation regex (simple but effective) ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") @@ -57,18 +73,6 @@ ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") -def _get_valid_satellites() -> list[str]: - """Get valid satellite identifiers from firms adapter.""" - from central.adapters.firms import SATELLITE_SHORT - return list(SATELLITE_SHORT.keys()) - - -def _get_valid_feeds() -> set[str]: - """Get valid feed values from usgs_quake adapter.""" - from central.adapters.usgs_quake import VALID_FEEDS - return VALID_FEEDS - - def _get_templates(): """Get templates instance (deferred import to avoid circular).""" from central.gui import templates @@ -631,18 +635,36 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: templates = _get_templates() pool = get_pool() + # Get wizard adapters (filtered by wizard_order) + adapter_classes = _adapter_classes() + wizard_adapters = sorted( + [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], + key=lambda nc: nc[1].wizard_order + ) + # Pre-fill from cookie state or DB defaults if state.adapters: adapters = [] - for name in ["firms", "nws", "usgs_quake"]: + for name, cls in wizard_adapters: if name in state.adapters: a = state.adapters[name] - adapters.append({ - "name": name, - "enabled": a["enabled"], - "cadence_s": a["cadence_s"], - "settings": a["settings"], - }) + settings_dict = a["settings"] + else: + settings_dict = {} + fields = describe_fields(cls.settings_schema, settings_dict) + # Swap widget for api_key_field to api_key_select + if cls.api_key_field is not None: + for f in fields: + if f.name == cls.api_key_field: + f.widget = "api_key_select" + adapters.append({ + "name": name, + "display_name": cls.display_name, + "enabled": a["enabled"] if name in state.adapters else False, + "cadence_s": a["cadence_s"] if name in state.adapters else 300, + "settings": settings_dict, + "fields": fields, + }) else: async with pool.acquire() as conn: rows = await conn.fetch( @@ -652,15 +674,33 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: ORDER BY name """ ) - adapters = [] - for row in rows: - settings_data = row["settings"] or {} - adapters.append({ - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": settings_data, - }) + db_adapters = {row["name"]: row for row in rows} + + adapters = [] + for name, cls in wizard_adapters: + if name in db_adapters: + row = db_adapters[name] + settings_dict = row["settings"] or {} + enabled = row["enabled"] + cadence_s = row["cadence_s"] + else: + settings_dict = {} + enabled = False + cadence_s = 300 + fields = describe_fields(cls.settings_schema, settings_dict) + # Swap widget for api_key_field to api_key_select + if cls.api_key_field is not None: + for f in fields: + if f.name == cls.api_key_field: + f.widget = "api_key_select" + adapters.append({ + "name": name, + "display_name": cls.display_name, + "enabled": enabled, + "cadence_s": cadence_s, + "settings": settings_dict, + "fields": fields, + }) # Get API keys from wizard state (not DB) api_keys = [{"alias": k["alias"]} for k in state.api_keys] @@ -685,8 +725,6 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: "csrf_token": csrf_token, "adapters": adapters, "api_keys": api_keys, - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": None, @@ -739,7 +777,14 @@ async def setup_adapters_submit(request: Request) -> Response: "settings": row["settings"] or {}, } - for adapter_name in ["firms", "nws", "usgs_quake"]: + # Get wizard adapters (filtered by wizard_order) + adapter_classes = _adapter_classes() + wizard_adapters = sorted( + [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], + key=lambda nc: nc[1].wizard_order + ) + + for adapter_name, adapter_cls in wizard_adapters: current = current_adapters.get(adapter_name, {"enabled": False, "cadence_s": 300, "settings": {}}) current_settings = current.get("settings", {}) new_settings = dict(current_settings) @@ -747,83 +792,108 @@ async def setup_adapters_submit(request: Request) -> Response: # Parse enabled enabled = f"{adapter_name}_enabled" in form - # Parse cadence + # Parse cadence using AdapterConfig field constraint cadence_str = form.get(f"{adapter_name}_cadence_s", "") try: cadence_s = int(cadence_str) - if cadence_s < 60 or cadence_s > 3600: - errors[f"{adapter_name}_cadence_s"] = "Cadence must be between 60 and 3600 seconds" + from central.config_models import AdapterConfig + min_cadence = AdapterConfig.model_fields["cadence_s"].metadata[0].ge + if cadence_s < min_cadence: + errors[f"{adapter_name}_cadence_s"] = ( + f"Input should be greater than or equal to {min_cadence}" + ) except ValueError: errors[f"{adapter_name}_cadence_s"] = "Cadence must be a valid integer" cadence_s = current.get("cadence_s", 300) - # Adapter-specific validation - if adapter_name == "nws": - contact_email = form.get(f"{adapter_name}_contact_email", "").strip() - if enabled: - if not contact_email: - errors[f"{adapter_name}_contact_email"] = "Contact email is required when enabled" - elif not EMAIL_REGEX.match(contact_email): - errors[f"{adapter_name}_contact_email"] = "Invalid email format" + # Generic field parsing using describe_fields + fields = describe_fields(adapter_cls.settings_schema, current_settings) + for field in fields: + form_key = f"{adapter_name}_{field.name}" + + if field.widget == "text": + value = form.get(form_key, "").strip() + new_settings[field.name] = value if value else current_settings.get(field.name) + + elif field.widget == "api_key_select": + # API key alias field - stored as text, validated post-loop + value = form.get(form_key, "").strip() + new_settings[field.name] = value if value else None + + elif field.widget == "number": + value_str = form.get(form_key, "").strip() + if value_str: + try: + new_settings[field.name] = int(value_str) + except ValueError: + errors[form_key] = f"{field.label} must be a valid number" else: - new_settings["contact_email"] = contact_email - else: - new_settings["contact_email"] = contact_email if contact_email else current_settings.get("contact_email") + new_settings[field.name] = current_settings.get(field.name) - elif adapter_name == "firms": - api_key_alias = form.get(f"{adapter_name}_api_key_alias", "").strip() - satellites = form.getlist(f"{adapter_name}_satellites") + elif field.widget == "checkbox": + new_settings[field.name] = form_key in form - if api_key_alias: - # Validate against wizard state keys - if not any(k["alias"] == api_key_alias for k in state.api_keys): - errors[f"{adapter_name}_api_key_alias"] = f"API key alias does not exist" + elif field.widget == "csv": + value = form.get(form_key, "").strip() + if value: + new_settings[field.name] = [v.strip() for v in value.split(",") if v.strip()] else: - new_settings["api_key_alias"] = api_key_alias - else: - new_settings["api_key_alias"] = None + new_settings[field.name] = [] - # Validate satellites - valid_sats = set(_get_valid_satellites()) - invalid_sats = [s for s in satellites if s not in valid_sats] - if invalid_sats: - errors[f"{adapter_name}_satellites"] = f"Invalid satellites: " + ", ".join(invalid_sats) - else: - new_settings["satellites"] = satellites + elif field.widget == "select": + value = form.get(form_key, "").strip() + if value and field.options and value not in field.options: + errors[form_key] = f"Invalid {field.label.lower()}" + else: + new_settings[field.name] = value - elif adapter_name == "usgs_quake": - feed = form.get(f"{adapter_name}_feed", "").strip() - valid_feeds = _get_valid_feeds() - if feed not in valid_feeds: - errors[f"{adapter_name}_feed"] = "Invalid feed" - else: - new_settings["feed"] = feed + elif field.widget == "checkboxes": + # Use getlist for checkbox groups - absence means empty list + values = form.getlist(form_key) + if field.options: + invalid = [v for v in values if v not in field.options] + if invalid: + errors[form_key] = f"Invalid values: {', '.join(invalid)}" + else: + new_settings[field.name] = values + else: + new_settings[field.name] = values - # Region validation (all adapters) - region_north_str = form.get(f"{adapter_name}_region_north", "").strip() - region_south_str = form.get(f"{adapter_name}_region_south", "").strip() - region_east_str = form.get(f"{adapter_name}_region_east", "").strip() - region_west_str = form.get(f"{adapter_name}_region_west", "").strip() + elif field.widget == "region": + # Region validation via RegionConfig model + from central.config_models import RegionConfig + region_north_str = form.get(f"{adapter_name}_{field.name}_north", "").strip() + region_south_str = form.get(f"{adapter_name}_{field.name}_south", "").strip() + region_east_str = form.get(f"{adapter_name}_{field.name}_east", "").strip() + region_west_str = form.get(f"{adapter_name}_{field.name}_west", "").strip() + try: + region_model = RegionConfig( + north=float(region_north_str), + south=float(region_south_str), + east=float(region_east_str), + west=float(region_west_str), + ) + new_settings[field.name] = region_model.model_dump() + except (ValueError, ValidationError) as e: + errors[f"{adapter_name}_{field.name}"] = str(e) + + # Run Pydantic validation on assembled settings to catch Literal violations etc. try: - region_north = float(region_north_str) - region_south = float(region_south_str) - region_east = float(region_east_str) - region_west = float(region_west_str) + adapter_cls.settings_schema(**new_settings) + except ValidationError as e: + for err in e.errors(): + loc = err["loc"][0] if err["loc"] else "unknown" + errors[f"{adapter_name}_{loc}"] = err["msg"] - if not (-90 <= region_south < region_north <= 90): - errors[f"{adapter_name}_region"] = "Invalid latitude: south < north, both -90 to 90" - elif not (-180 <= region_west < region_east <= 180): - errors[f"{adapter_name}_region"] = "Invalid longitude: west < east, both -180 to 180" - else: - new_settings["region"] = { - "north": region_north, - "south": region_south, - "east": region_east, - "west": region_west, - } - except ValueError: - errors[f"{adapter_name}_region"] = "Region coordinates must be valid numbers" + # Generic api_key_field validation against wizard state + if adapter_cls.api_key_field is not None: + field_value = new_settings.get(adapter_cls.api_key_field) + if field_value: + if not any(k["alias"] == field_value for k in state.api_keys): + errors[f"{adapter_name}_{adapter_cls.api_key_field}"] = ( + "API key alias does not exist" + ) new_adapters[adapter_name] = { "enabled": enabled, @@ -833,12 +903,23 @@ async def setup_adapters_submit(request: Request) -> Response: # If errors, re-render if errors: - adapters = [ - {"name": name, "enabled": new_adapters[name]["enabled"], - "cadence_s": new_adapters[name]["cadence_s"], - "settings": new_adapters[name]["settings"]} - for name in ["firms", "nws", "usgs_quake"] - ] + adapters = [] + for name, cls in wizard_adapters: + settings_dict = new_adapters[name]["settings"] + fields = describe_fields(cls.settings_schema, settings_dict) + # Swap widget for api_key_field to api_key_select + if cls.api_key_field is not None: + for f in fields: + if f.name == cls.api_key_field: + f.widget = "api_key_select" + adapters.append({ + "name": name, + "display_name": cls.display_name, + "enabled": new_adapters[name]["enabled"], + "cadence_s": new_adapters[name]["cadence_s"], + "settings": settings_dict, + "fields": fields, + }) api_keys = [{"alias": k["alias"]} for k in state.api_keys] if state.system: @@ -856,8 +937,6 @@ async def setup_adapters_submit(request: Request) -> Response: "csrf_token": csrf_token, "adapters": adapters, "api_keys": api_keys, - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": "Please fix the errors below.", @@ -898,10 +977,20 @@ async def setup_finish_form(request: Request) -> HTMLResponse: adapters = [] if state.adapters: - for name in ["firms", "nws", "usgs_quake"]: + adapter_classes = _adapter_classes() + wizard_adapters = sorted( + [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], + key=lambda nc: nc[1].wizard_order + ) + for name, cls in wizard_adapters: if name in state.adapters: a = state.adapters[name] - adapters.append({"name": name, "enabled": a["enabled"], "cadence_s": a["cadence_s"]}) + adapters.append({ + "name": name, + "display_name": cls.display_name, + "enabled": a["enabled"], + "cadence_s": a["cadence_s"], + }) csrf_token, signed_token = reuse_or_generate_pre_auth_csrf(request, settings.csrf_secret) response = templates.TemplateResponse( @@ -1229,27 +1318,45 @@ async def adapters_list( templates = _get_templates() pool = get_pool() operator = request.state.operator + adapter_classes = _adapter_classes() async with pool.acquire() as conn: rows = await conn.fetch( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at + SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error FROM config.adapters ORDER BY name """ ) - adapters = [] - for row in rows: - settings = row["settings"] or {} - adapters.append({ - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - }) + adapters = [] + for row in rows: + settings = row["settings"] or {} + adapter_cls = adapter_classes.get(row["name"]) + + # Check if required API key is missing + api_key_missing = False + requires_api_key_alias = None + if adapter_cls and adapter_cls.requires_api_key is not None: + requires_api_key_alias = adapter_cls.requires_api_key + has_key = await conn.fetchval( + "SELECT 1 FROM config.api_keys WHERE alias = $1", + requires_api_key_alias, + ) + api_key_missing = not has_key + + adapters.append({ + "name": row["name"], + "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + "last_error": row["last_error"], + "api_key_missing": api_key_missing, + "requires_api_key_alias": requires_api_key_alias, + }) csrf_token = request.state.csrf_token response = templates.TemplateResponse( @@ -1275,10 +1382,14 @@ async def adapters_edit_form( pool = get_pool() operator = request.state.operator + # Look up the adapter class + adapter_classes = _adapter_classes() + adapter_cls = adapter_classes.get(name) + async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at + SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error FROM config.adapters WHERE name = $1 """, @@ -1288,11 +1399,6 @@ async def adapters_edit_form( if row is None: return Response(status_code=404, content="Adapter not found") - # Get API keys for firms dropdown - api_keys = await conn.fetch( - "SELECT alias FROM config.api_keys ORDER BY alias" - ) - # Get map tile settings from config.system sys_row = await conn.fetchrow( "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" @@ -1301,15 +1407,48 @@ async def adapters_edit_form( tile_attribution = sys_row["map_attribution"] if sys_row else "© OpenStreetMap contributors" settings = row["settings"] or {} + + # Build adapter dict with class metadata adapter = { "name": row["name"], + "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], + "description": getattr(adapter_cls, "description", "") if adapter_cls else "", "enabled": row["enabled"], "cadence_s": row["cadence_s"], "settings": settings, "paused_at": row["paused_at"], "updated_at": row["updated_at"], + "last_error": row["last_error"], } + # Generate field descriptors if we have the adapter class + fields = [] + if adapter_cls and hasattr(adapter_cls, "settings_schema"): + fields = describe_fields(adapter_cls.settings_schema, settings) + # Swap widget for api_key_field to api_key_select + if adapter_cls.api_key_field is not None: + for f in fields: + if f.name == adapter_cls.api_key_field: + f.widget = "api_key_select" + + # Fetch API keys for api_key_select widget + api_keys = [] + async with pool.acquire() as conn: + api_key_rows = await conn.fetch("SELECT alias FROM config.api_keys ORDER BY alias") + api_keys = [{"alias": r["alias"]} for r in api_key_rows] + + # Check if required API key is missing + api_key_missing = False + requires_api_key_alias = None + if adapter_cls and adapter_cls.requires_api_key is not None: + requires_api_key_alias = adapter_cls.requires_api_key + async with pool.acquire() as conn: + has_key = await conn.fetchval( + "SELECT 1 FROM config.api_keys WHERE alias = $1", + requires_api_key_alias, + ) + api_key_missing = not has_key + csrf_token = request.state.csrf_token response = templates.TemplateResponse( request=request, @@ -1318,13 +1457,14 @@ async def adapters_edit_form( "operator": operator, "csrf_token": csrf_token, "adapter": adapter, + "fields": fields, + "api_keys": api_keys, "errors": None, "form_data": None, - "api_keys": [{"alias": k["alias"]} for k in api_keys], - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, + "api_key_missing": api_key_missing, + "requires_api_key_alias": requires_api_key_alias, }, ) return response @@ -1347,24 +1487,27 @@ async def adapters_edit_submit( if not form_csrf or form_csrf != request.state.csrf_token: raise CsrfValidationError("Invalid CSRF token") - # Parse form data - form = await request.form() + # Look up the adapter class + adapter_classes = _adapter_classes() + adapter_cls = adapter_classes.get(name) + + # Parse common form fields enabled = "enabled" in form cadence_s_str = form.get("cadence_s", "") - # Build form_data for re-render on error + errors: dict[str, str] = {} form_data: dict[str, Any] = { "enabled": enabled, "cadence_s": cadence_s_str, } - errors: dict[str, str] = {} - - # Validate cadence_s + # Validate cadence_s using AdapterConfig field constraint (ge=10) try: cadence_s = int(cadence_s_str) - if cadence_s < 60 or cadence_s > 3600: - errors["cadence_s"] = "Cadence must be between 60 and 3600 seconds" + from central.config_models import AdapterConfig + min_cadence = AdapterConfig.model_fields["cadence_s"].metadata[0].ge + if cadence_s < min_cadence: + errors["cadence_s"] = f"Input should be greater than or equal to {min_cadence}" except ValueError: errors["cadence_s"] = "Cadence must be a valid integer" cadence_s = 0 @@ -1373,7 +1516,7 @@ async def adapters_edit_submit( # Get current adapter state row = await conn.fetchrow( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at + SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error FROM config.adapters WHERE name = $1 """, @@ -1384,103 +1527,113 @@ async def adapters_edit_submit( return Response(status_code=404, content="Adapter not found") current_settings = row["settings"] or {} - new_settings = dict(current_settings) - # Adapter-specific validation and settings update - if name == "nws": - contact_email = form.get("contact_email", "").strip() - form_data["contact_email"] = contact_email - if not contact_email: - errors["contact_email"] = "Contact email is required" - elif not EMAIL_REGEX.match(contact_email): - errors["contact_email"] = "Invalid email format" + # Parse and validate settings via Pydantic if we have the adapter class + new_settings = {} + if adapter_cls and hasattr(adapter_cls, "settings_schema"): + schema = adapter_cls.settings_schema + fields = describe_fields(schema, current_settings) + + # Parse form values based on widget type + parsed_values = {} + for field in fields: + raw = form.get(field.name, "") + form_data[field.name] = raw + + if field.widget == "text": + parsed_values[field.name] = raw.strip() if raw else None + elif field.widget == "number": + try: + parsed_values[field.name] = int(raw) if raw else None + except ValueError: + errors[field.name] = f"{field.label} must be a number" + elif field.widget == "checkbox": + parsed_values[field.name] = field.name in form + elif field.widget == "csv": + if raw.strip(): + parsed_values[field.name] = [v.strip() for v in raw.split(",") if v.strip()] + else: + parsed_values[field.name] = [] + elif field.widget == "select": + value = raw.strip() if raw else None + if value and field.options and value not in field.options: + errors[field.name] = f"Invalid {field.label.lower()}" + else: + parsed_values[field.name] = value + elif field.widget == "checkboxes": + # Use getlist for checkbox groups + values = form.getlist(field.name) + form_data[field.name] = values # Override raw value + if field.options: + invalid = [v for v in values if v not in field.options] + if invalid: + errors[field.name] = f"Invalid values: {', '.join(invalid)}" + else: + parsed_values[field.name] = values + else: + parsed_values[field.name] = values + elif field.widget == "api_key_select": + # API key select - validate against existing keys + value = raw.strip() if raw else None + parsed_values[field.name] = value + elif field.widget == "region": + # Region handled separately below + pass + + # Handle region fields (common pattern) + region_north_str = form.get("region_north", "").strip() + region_south_str = form.get("region_south", "").strip() + region_east_str = form.get("region_east", "").strip() + region_west_str = form.get("region_west", "").strip() + + form_data["region_north"] = region_north_str + form_data["region_south"] = region_south_str + form_data["region_east"] = region_east_str + form_data["region_west"] = region_west_str + + # Check if any region field has a value + has_region = any([region_north_str, region_south_str, region_east_str, region_west_str]) + + if has_region: + try: + region_north = float(region_north_str) + region_south = float(region_south_str) + region_east = float(region_east_str) + region_west = float(region_west_str) + + if not (-90 <= region_south < region_north <= 90): + errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90" + elif not (-180 <= region_west < region_east <= 180): + errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180" + else: + parsed_values["region"] = { + "north": region_north, + "south": region_south, + "east": region_east, + "west": region_west, + } + except ValueError: + errors["region"] = "Region coordinates must be valid numbers" else: - new_settings["contact_email"] = contact_email + parsed_values["region"] = None - elif name == "firms": - api_key_alias = form.get("api_key_alias", "").strip() - satellites = form.getlist("satellites") - form_data["api_key_alias"] = api_key_alias - form_data["satellites"] = satellites - - # Validate api_key_alias if set - if api_key_alias: - key_exists = await conn.fetchrow( - "SELECT 1 FROM config.api_keys WHERE alias = $1", - api_key_alias, - ) - if not key_exists: - errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist" - else: - new_settings["api_key_alias"] = api_key_alias - else: - new_settings["api_key_alias"] = None - - # Validate satellites - valid_sats = set(_get_valid_satellites()) - invalid_sats = [s for s in satellites if s not in valid_sats] - if invalid_sats: - errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}" - else: - new_settings["satellites"] = satellites - - elif name == "usgs_quake": - feed = form.get("feed", "").strip() - form_data["feed"] = feed - valid_feeds = _get_valid_feeds() - if feed not in valid_feeds: - errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}" - else: - new_settings["feed"] = feed - - # Region validation (applies to all adapters) - region_north_str = form.get("region_north", "").strip() - region_south_str = form.get("region_south", "").strip() - region_east_str = form.get("region_east", "").strip() - region_west_str = form.get("region_west", "").strip() - - form_data["region_north"] = region_north_str - form_data["region_south"] = region_south_str - form_data["region_east"] = region_east_str - form_data["region_west"] = region_west_str - - try: - region_north = float(region_north_str) - region_south = float(region_south_str) - region_east = float(region_east_str) - region_west = float(region_west_str) - - # Validate latitude bounds - if not (-90 <= region_south < region_north <= 90): - errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90" - # Validate longitude bounds - elif not (-180 <= region_west < region_east <= 180): - errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180" - else: - new_settings["region"] = { - "north": region_north, - "south": region_south, - "east": region_east, - "west": region_west, - } - except ValueError: - errors["region"] = "Region coordinates must be valid numbers" + # Only validate with Pydantic if no parse errors + if not errors: + try: + # Filter out None values for optional fields without defaults + validated_data = {k: v for k, v in parsed_values.items() if v is not None} + validated = schema(**validated_data) + new_settings = validated.model_dump(mode="json") + except ValidationError as e: + for err in e.errors(): + field_name = err["loc"][0] if err["loc"] else "unknown" + errors[str(field_name)] = err["msg"] + else: + # No schema - just preserve existing settings + new_settings = dict(current_settings) # If there are errors, re-render the form if errors: - adapter = { - "name": row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": current_settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - } - - api_keys = await conn.fetch( - "SELECT alias FROM config.api_keys ORDER BY alias" - ) - # Get map tile settings for re-render sys_row = await conn.fetchrow( "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" @@ -1488,6 +1641,42 @@ async def adapters_edit_submit( tile_url = sys_row["map_tile_url"] if sys_row else "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png" tile_attribution = sys_row["map_attribution"] if sys_row else "© OpenStreetMap contributors" + adapter = { + "name": row["name"], + "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], + "description": getattr(adapter_cls, "description", "") if adapter_cls else "", + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": current_settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + "last_error": row["last_error"], + } + + fields = [] + if adapter_cls and hasattr(adapter_cls, "settings_schema"): + fields = describe_fields(adapter_cls.settings_schema, current_settings) + # Swap widget for api_key_field to api_key_select + if adapter_cls.api_key_field is not None: + for f in fields: + if f.name == adapter_cls.api_key_field: + f.widget = "api_key_select" + + # Fetch API keys for api_key_select widget + api_key_rows = await conn.fetch("SELECT alias FROM config.api_keys ORDER BY alias") + api_keys = [{"alias": r["alias"]} for r in api_key_rows] + + # Check if required API key is missing + api_key_missing = False + requires_api_key_alias = None + if adapter_cls and adapter_cls.requires_api_key is not None: + requires_api_key_alias = adapter_cls.requires_api_key + has_key = await conn.fetchval( + "SELECT 1 FROM config.api_keys WHERE alias = $1", + requires_api_key_alias, + ) + api_key_missing = not has_key + csrf_token = request.state.csrf_token response = templates.TemplateResponse( request=request, @@ -1496,13 +1685,14 @@ async def adapters_edit_submit( "operator": operator, "csrf_token": csrf_token, "adapter": adapter, + "fields": fields, + "api_keys": api_keys, "errors": errors, "form_data": form_data, - "api_keys": [{"alias": k["alias"]} for k in api_keys], - "valid_satellites": _get_valid_satellites(), - "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, + "api_key_missing": api_key_missing, + "requires_api_key_alias": requires_api_key_alias, }, status_code=200, ) diff --git a/src/central/gui/templates/adapters_edit.html b/src/central/gui/templates/adapters_edit.html index 939aa75..1b5a4b7 100644 --- a/src/central/gui/templates/adapters_edit.html +++ b/src/central/gui/templates/adapters_edit.html @@ -1,6 +1,6 @@ {% extends "base.html" %} -{% block title %}Central — Edit {{ adapter.name }}{% endblock %} +{% block title %}Central — Edit {{ adapter.display_name }}{% endblock %} {% block head %} @@ -10,35 +10,173 @@ {% endblock %} {% block content %} -

Edit Adapter: {{ adapter.name }}

+

{{ adapter.display_name }}

+

{{ adapter.description }}

+ +{% if adapter.paused_at %} +
+ ⏸️ Paused since {{ adapter.paused_at }} +
+{% endif %} + +{% if adapter.last_error %} +
+ Last Error: {{ adapter.last_error }} +
+{% endif %} + +{% if api_key_missing %} +
+ ⚠️ API Key Required: This adapter requires the {{ requires_api_key_alias }} API key to be configured before it can be enabled. + Configure API Keys +
+{% endif %}
- Universal Settings + Core Settings - + {% if errors and errors.cadence_s %} {{ errors.cadence_s }} {% endif %}
+ {% if fields %}
- Adapter-Specific Settings - {% include "adapters_edit_" + adapter.name + ".html" %} -
+ Adapter Settings + {% for field in fields %} + {% if field.widget == "region" %} + {# Region is rendered in a separate fieldset below #} + {% elif field.widget == "text" %} + + + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "number" %} + + + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "checkbox" %} + + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "csv" %} + + + Comma-separated values{% if field.description %} — {{ field.description }}{% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "select" %} + + + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "checkboxes" %} + + {% set current_values = form_data.getlist(field.name) if form_data and form_data.getlist else (field.current_value or []) %} + {% for opt in field.options %} + + {% endfor %} + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + + {% elif field.widget == "api_key_select" %} + + + {% if field.description %} + {{ field.description }} + {% endif %} + {% if errors and errors[field.name] %} + {{ errors[field.name] }} + {% endif %} + {% endif %} + {% endfor %} + + {% endif %} + + {% set has_region = namespace(value=false) %} + {% for field in fields %} + {% if field.widget == "region" %} + {% set has_region.value = true %} + {% endif %} + {% endfor %} + + {% if has_region.value %}
Region {% include "_region_picker.html" %}
+ {% endif %} Cancel diff --git a/src/central/gui/templates/adapters_edit_firms.html b/src/central/gui/templates/adapters_edit_firms.html deleted file mode 100644 index a2a339a..0000000 --- a/src/central/gui/templates/adapters_edit_firms.html +++ /dev/null @@ -1,21 +0,0 @@ - - -{% if errors and errors.api_key_alias %} -{{ errors.api_key_alias }} -{% endif %} - - -{% for sat in valid_satellites %} - -{% endfor %} -{% if errors and errors.satellites %} -{{ errors.satellites }} -{% endif %} diff --git a/src/central/gui/templates/adapters_edit_nws.html b/src/central/gui/templates/adapters_edit_nws.html deleted file mode 100644 index e655a41..0000000 --- a/src/central/gui/templates/adapters_edit_nws.html +++ /dev/null @@ -1,5 +0,0 @@ - - -{% if errors and errors.contact_email %} -{{ errors.contact_email }} -{% endif %} diff --git a/src/central/gui/templates/adapters_edit_usgs_quake.html b/src/central/gui/templates/adapters_edit_usgs_quake.html deleted file mode 100644 index 0c3b7ee..0000000 --- a/src/central/gui/templates/adapters_edit_usgs_quake.html +++ /dev/null @@ -1,9 +0,0 @@ - - -{% if errors and errors.feed %} -{{ errors.feed }} -{% endif %} diff --git a/src/central/gui/templates/adapters_list.html b/src/central/gui/templates/adapters_list.html index b97ae88..f3a8e04 100644 --- a/src/central/gui/templates/adapters_list.html +++ b/src/central/gui/templates/adapters_list.html @@ -17,7 +17,12 @@ {% for adapter in adapters %} - {{ adapter.name }} + + {{ adapter.display_name or adapter.name }} + {% if adapter.api_key_missing %} + ⚠️ API Key Missing + {% endif %} + {% if adapter.enabled %}Yes{% else %}No{% endif %} {{ adapter.cadence_s }}s {{ adapter.updated_at.strftime('%Y-%m-%d %H:%M') if adapter.updated_at else '—' }} diff --git a/src/central/gui/templates/setup_adapters.html b/src/central/gui/templates/setup_adapters.html index de3f8c2..e0cc977 100644 --- a/src/central/gui/templates/setup_adapters.html +++ b/src/central/gui/templates/setup_adapters.html @@ -29,7 +29,7 @@ {% for adapter in adapters %}
- {{ adapter.name }} + {{ adapter.display_name or adapter.name }}
{% endfor %} @@ -151,11 +209,12 @@