diff --git a/sql/migrations/015_add_adapters_last_error.sql b/sql/migrations/015_add_adapters_last_error.sql deleted file mode 100644 index 1fcab49..0000000 --- a/sql/migrations/015_add_adapters_last_error.sql +++ /dev/null @@ -1,6 +0,0 @@ --- Migration: 015_add_adapters_last_error --- Adds last_error column for adapter-side error reporting. --- Populated by supervisor when an adapter fails to start or apply config. - -ALTER TABLE config.adapters -ADD COLUMN IF NOT EXISTS last_error TEXT; diff --git a/sql/migrations/016_add_wfigs_adapters.sql b/sql/migrations/016_add_wfigs_adapters.sql deleted file mode 100644 index 25d45f6..0000000 --- a/sql/migrations/016_add_wfigs_adapters.sql +++ /dev/null @@ -1,37 +0,0 @@ --- Migration: 016_add_wfigs_adapters --- Add WFIGS incident and perimeter adapters to config.adapters --- Idempotent: uses ON CONFLICT DO NOTHING - --- WFIGS Incidents adapter -INSERT INTO config.adapters (name, enabled, cadence_s, settings) -VALUES ( - 'wfigs_incidents', - false, -- Ships disabled; operator enables via GUI - 300, - jsonb_build_object( - 'region', jsonb_build_object( - 'north', 49.0, - 'south', 31.0, - 'east', -102.0, - 'west', -124.0 - ) - ) -) -ON CONFLICT (name) DO NOTHING; - --- WFIGS Perimeters adapter -INSERT INTO config.adapters (name, enabled, cadence_s, settings) -VALUES ( - 'wfigs_perimeters', - false, -- Ships disabled; operator enables via GUI - 300, - jsonb_build_object( - 'region', jsonb_build_object( - 'north', 49.0, - 'south', 31.0, - 'east', -102.0, - 'west', -124.0 - ) - ) -) -ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/017_add_inciweb_adapter.sql b/sql/migrations/017_add_inciweb_adapter.sql deleted file mode 100644 index 2ffaac7..0000000 --- a/sql/migrations/017_add_inciweb_adapter.sql +++ /dev/null @@ -1,19 +0,0 @@ --- Migration: 017_add_inciweb_adapter --- Add InciWeb adapter to config.adapters --- Idempotent: uses ON CONFLICT DO NOTHING - -INSERT INTO config.adapters (name, enabled, cadence_s, settings) -VALUES ( - 'inciweb', - false, -- Ships disabled; operator enables via GUI - 600, - jsonb_build_object( - 'region', jsonb_build_object( - 'north', 49.0, - 'south', 31.0, - 'east', -102.0, - 'west', -124.0 - ) - ) -) -ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/018_add_swpc_adapters.sql b/sql/migrations/018_add_swpc_adapters.sql deleted file mode 100644 index 29a28d8..0000000 --- a/sql/migrations/018_add_swpc_adapters.sql +++ /dev/null @@ -1,11 +0,0 @@ --- Migration: 018_add_swpc_adapters --- Add NOAA SWPC space weather adapters to config.adapters. --- All three ship disabled; operator enables individually via GUI. --- Idempotent: uses ON CONFLICT DO NOTHING. - -INSERT INTO config.adapters (name, enabled, cadence_s, settings) -VALUES - ('swpc_alerts', false, 300, '{}'::jsonb), - ('swpc_kindex', false, 600, '{}'::jsonb), - ('swpc_protons', false, 600, '{}'::jsonb) -ON CONFLICT (name) DO NOTHING; diff --git a/sql/migrations/019_add_central_space_stream.sql b/sql/migrations/019_add_central_space_stream.sql deleted file mode 100644 index f249227..0000000 --- a/sql/migrations/019_add_central_space_stream.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Migration: 019_add_central_space_stream --- Seeds the CENTRAL_SPACE JetStream stream row for central.space.> subjects. --- 7-day retention, 1 GiB max_bytes (clamped by supervisor recompute) -- mirrors CENTRAL_FIRE / CENTRAL_QUAKE. --- Idempotent: uses ON CONFLICT DO NOTHING. - -INSERT INTO config.streams (name, max_age_s, max_bytes) -VALUES ('CENTRAL_SPACE', 604800, 1073741824) -ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapter.py b/src/central/adapter.py index 0322d3e..276a9cf 100644 --- a/src/central/adapter.py +++ b/src/central/adapter.py @@ -34,10 +34,6 @@ class SourceAdapter(ABC): description: str settings_schema: type[BaseModel] requires_api_key: str | None = None - api_key_field: str | None = None - """Names the settings_schema field that holds an api_key alias reference, if any. - The GUI renders this field as a select populated from config.api_keys; - the wizard validates it against staged api_keys state.""" wizard_order: int | None = None default_cadence_s: int diff --git a/src/central/adapter_discovery.py b/src/central/adapter_discovery.py deleted file mode 100644 index e26729f..0000000 --- a/src/central/adapter_discovery.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Adapter discovery utilities.""" - -import importlib -import logging -import pkgutil - -import central.adapters -from central.adapter import SourceAdapter - -logger = logging.getLogger(__name__) - - -def discover_adapters() -> dict[str, type[SourceAdapter]]: - """Auto-discover adapter classes from central.adapters package.""" - registry: dict[str, type[SourceAdapter]] = {} - for module_info in pkgutil.iter_modules(central.adapters.__path__): - try: - module = importlib.import_module(f"central.adapters.{module_info.name}") - except Exception as e: - logger.error( - "Failed to import adapter module", - extra={"module": module_info.name, "error": str(e)}, - ) - continue - for attr_name in dir(module): - attr = getattr(module, attr_name) - if ( - isinstance(attr, type) - and issubclass(attr, SourceAdapter) - and attr is not SourceAdapter - and hasattr(attr, "name") - ): - registry[attr.name] = attr - return registry diff --git a/src/central/adapters/firms.py b/src/central/adapters/firms.py index c9b4efb..7538d96 100644 --- a/src/central/adapters/firms.py +++ b/src/central/adapters/firms.py @@ -7,7 +7,7 @@ from collections.abc import AsyncIterator from datetime import datetime, timezone from io import StringIO from pathlib import Path -from typing import Any, Literal +from typing import Any import aiohttp from tenacity import ( @@ -54,7 +54,7 @@ SEVERITY_MAP = { class FIRMSSettings(BaseModel): """Settings schema for FIRMS adapter.""" api_key_alias: str = "firms" - satellites: list[Literal["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT", "VIIRS_NOAA21_NRT"]] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] + satellites: list[str] = ["VIIRS_SNPP_NRT", "VIIRS_NOAA20_NRT"] region: RegionConfig | None = None @@ -66,7 +66,6 @@ class FIRMSAdapter(SourceAdapter): description = "Near-real-time satellite-detected fire hotspots from NASA FIRMS." settings_schema = FIRMSSettings requires_api_key = "firms" - api_key_field = "api_key_alias" wizard_order = 2 default_cadence_s = 300 diff --git a/src/central/adapters/inciweb.py b/src/central/adapters/inciweb.py deleted file mode 100644 index 2ae0634..0000000 --- a/src/central/adapters/inciweb.py +++ /dev/null @@ -1,477 +0,0 @@ -"""InciWeb adapter for wildfire narrative updates.""" - -import html -import logging -import re -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from email.utils import parsedate_to_datetime -from pathlib import Path -from typing import Any -from xml.etree import ElementTree as ET - -import aiohttp -from pydantic import BaseModel -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.config_models import AdapterConfig, RegionConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - -# InciWeb RSS feed URL -INCIWEB_RSS_URL = "https://inciweb.wildfire.gov/incidents/rss.xml" - -# State name to 2-letter code mapping -STATE_NAME_TO_CODE = { - "alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR", - "california": "CA", "colorado": "CO", "connecticut": "CT", "delaware": "DE", - "florida": "FL", "georgia": "GA", "hawaii": "HI", "idaho": "ID", - "illinois": "IL", "indiana": "IN", "iowa": "IA", "kansas": "KS", - "kentucky": "KY", "louisiana": "LA", "maine": "ME", "maryland": "MD", - "massachusetts": "MA", "michigan": "MI", "minnesota": "MN", "mississippi": "MS", - "missouri": "MO", "montana": "MT", "nebraska": "NE", "nevada": "NV", - "new hampshire": "NH", "new jersey": "NJ", "new mexico": "NM", "new york": "NY", - "north carolina": "NC", "north dakota": "ND", "ohio": "OH", "oklahoma": "OK", - "oregon": "OR", "pennsylvania": "PA", "rhode island": "RI", "south carolina": "SC", - "south dakota": "SD", "tennessee": "TN", "texas": "TX", "utah": "UT", - "vermont": "VT", "virginia": "VA", "washington": "WA", "west virginia": "WV", - "wisconsin": "WI", "wyoming": "WY", "district of columbia": "DC", - "puerto rico": "PR", "guam": "GU", "virgin islands": "VI", - "american samoa": "AS", "northern mariana islands": "MP", -} - - -def parse_coordinates_from_description(description: str) -> tuple[float, float] | None: - """ - Parse latitude/longitude from InciWeb description text. - - Format: "Latitude: 47° 3 17 Longitude: 91° 38 6" - InciWeb uses unsigned values for US coordinates (west longitude implied). - Returns (lon, lat) tuple or None if not found. - """ - # Pattern for degree/minute/second format - lat_pattern = r"Latitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)" - lon_pattern = r"Longitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)" - - lat_match = re.search(lat_pattern, description) - lon_match = re.search(lon_pattern, description) - - if not lat_match or not lon_match: - return None - - try: - lat_deg = int(lat_match.group(1)) - lat_min = int(lat_match.group(2)) - lat_sec = float(lat_match.group(3)) - - lon_deg = int(lon_match.group(1)) - lon_min = int(lon_match.group(2)) - lon_sec = float(lon_match.group(3)) - - # Convert to decimal degrees - # Latitude: positive in northern hemisphere - if lat_deg >= 0: - lat = lat_deg + lat_min / 60 + lat_sec / 3600 - else: - lat = lat_deg - lat_min / 60 - lat_sec / 3600 - - # Longitude: InciWeb gives unsigned values for US west longitudes - # Make negative for western hemisphere (US coordinates) - lon = lon_deg + lon_min / 60 + lon_sec / 3600 - if lon > 0: - lon = -lon # US longitudes are west (negative) - - return (lon, lat) - except (ValueError, TypeError): - return None - - -def parse_state_from_description(description: str) -> str | None: - """ - Parse state name from InciWeb description text. - - Format: "State: Minnesota" or "State: New Mexico" - Returns 2-letter state code or None if not found. - - Design note: State is parsed from the description rather than the title - because InciWeb titles use unit code prefixes (e.g., "MNMNS Stewart Trail", - "CACNP Santa Rosa Island Fire") which are not reliable state indicators. - The description has a structured "State: " field that reliably - identifies the state for all incidents. - """ - pattern = r"State:\s*([A-Za-z\s]+?)(?:\n|---|$)" - match = re.search(pattern, description) - - if not match: - return None - - state_name = match.group(1).strip().lower() - return STATE_NAME_TO_CODE.get(state_name) - - -def strip_html(html_text: str) -> str: - """ - Strip HTML tags and decode entities to plain text. - """ - # Decode HTML entities (handles & < > etc.) - text = html.unescape(html_text) - - # Handle   specifically (not a standard Python html entity) - text = text.replace(" ", " ") - text = text.replace("\xa0", " ") # Non-breaking space character - - # Remove HTML tags - text = re.sub(r"<[^>]+>", "", text) - - # Normalize whitespace - text = re.sub(r"\s+", " ", text) - - return text.strip() - - -def point_in_bbox( - lon: float, - lat: float, - west: float, - south: float, - east: float, - north: float, -) -> bool: - """Check if a point is within a bounding box.""" - return west <= lon <= east and south <= lat <= north - - -class InciWebSettings(BaseModel): - """Settings schema for InciWeb adapter.""" - - region: RegionConfig | None = None - - -class InciWebAdapter(SourceAdapter): - """NIFC InciWeb wildfire narrative adapter.""" - - name = "inciweb" - display_name = "NIFC InciWeb — Wildfire Narrative" - description = ( - "Narrative wildfire updates from InciWeb. Editorial; lower precision " - "than WFIGS. Use as supplementary context." - ) - settings_schema = InciWebSettings - requires_api_key = None - api_key_field = None - wizard_order = None # Ships disabled - default_cadence_s = 600 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - - # Conditional fetch state - self._last_modified: str | None = None - self._etag: str | None = None - - # Parse region from settings - region_dict = config.settings.get("region") - if region_dict: - self.region: RegionConfig | None = RegionConfig(**region_dict) - else: - self.region = None - - async def startup(self) -> None: - """Initialize HTTP session and SQLite connection.""" - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - - # Create table for dedup tracking - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - self._db.commit() - - logger.info( - "InciWeb adapter started", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - async def shutdown(self) -> None: - """Close HTTP session and SQLite connection.""" - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("InciWeb adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - """Apply new configuration from hot-reload.""" - region_dict = new_config.settings.get("region") - if region_dict: - self.region = RegionConfig(**region_dict) - else: - self.region = None - logger.info( - "InciWeb config updated", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def bump_last_seen(self, event_id: str) -> None: - """Bump the last_seen timestamp for an event.""" - if not self._db: - return - self._db.execute( - "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("InciWeb swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - """Compute NATS subject for an event.""" - state = event.geo.primary_region - if state and state.startswith("US-") and len(state) == 5: - state_code = state[3:].lower() - return f"central.fire.narrative.inciweb.{state_code}" - return "central.fire.narrative.inciweb.unknown" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch_rss(self) -> list[dict[str, Any]]: - """Fetch and parse RSS feed from InciWeb.""" - if not self._session: - raise RuntimeError("Session not initialized") - - # Build request headers with conditional fetch support - headers = {"User-Agent": "Central/0.4"} - if self._last_modified: - headers["If-Modified-Since"] = self._last_modified - if self._etag: - headers["If-None-Match"] = self._etag - - async with self._session.get(INCIWEB_RSS_URL, headers=headers) as resp: - # Handle 304 Not Modified - if resp.status == 304: - logger.info("InciWeb not modified") - return [] - - resp.raise_for_status() - - # Capture conditional fetch headers for next request - self._last_modified = resp.headers.get("Last-Modified") - self._etag = resp.headers.get("ETag") - - content = await resp.text() - - # Parse RSS XML - items = [] - try: - root = ET.fromstring(content) - channel = root.find("channel") - if channel is None: - return [] - - for item_elem in channel.findall("item"): - item: dict[str, Any] = {} - - title = item_elem.find("title") - item["title"] = title.text if title is not None and title.text else "" - - link = item_elem.find("link") - item["link"] = link.text if link is not None and link.text else "" - - description = item_elem.find("description") - item["description"] = description.text if description is not None and description.text else "" - - pub_date = item_elem.find("pubDate") - item["pubDate"] = pub_date.text if pub_date is not None and pub_date.text else "" - - guid = item_elem.find("guid") - item["guid"] = guid.text if guid is not None and guid.text else "" - - # Check for dc:creator - creator = item_elem.find("{http://purl.org/dc/elements/1.1/}creator") - item["creator"] = creator.text if creator is not None and creator.text else "" - - items.append(item) - - except ET.ParseError as e: - logger.error("InciWeb RSS parse error", extra={"error": str(e)}) - raise - - logger.info( - "InciWeb fetch completed", - extra={"item_count": len(items)}, - ) - return items - - async def poll(self) -> AsyncIterator[Event]: - """Poll InciWeb for narrative updates.""" - if not self._db: - raise RuntimeError("Database not initialized") - - # Fetch RSS feed - try: - items = await self._fetch_rss() - except Exception as e: - logger.error("InciWeb fetch failed", extra={"error": str(e)}) - raise - - events_yielded = 0 - - for item in items: - guid = item.get("guid", "") - if not guid: - continue - - # Dedup: skip if already published - if self.is_published(guid): - self.bump_last_seen(guid) - continue - - description_html = item.get("description", "") - - # Parse coordinates from description - centroid = parse_coordinates_from_description(description_html) - - # Post-filter: skip if point outside region bbox - if self.region and centroid: - lon, lat = centroid - if not point_in_bbox( - lon, lat, - self.region.west, self.region.south, - self.region.east, self.region.north, - ): - continue - - # Parse state from description - state_code = parse_state_from_description(description_html) - - # Build regions - if state_code: - regions = [f"US-{state_code}"] - primary_region = f"US-{state_code}" - else: - regions = [] - primary_region = None - - # Parse pubDate (RFC 822 format) - pub_date_str = item.get("pubDate", "") - try: - event_time = parsedate_to_datetime(pub_date_str) - # Ensure UTC - if event_time.tzinfo is None: - event_time = event_time.replace(tzinfo=timezone.utc) - else: - event_time = event_time.astimezone(timezone.utc) - except (ValueError, TypeError): - event_time = datetime.now(timezone.utc) - - # Build geo - geo = Geo( - centroid=centroid, - bbox=(centroid[0], centroid[1], centroid[0], centroid[1]) if centroid else None, - regions=regions, - primary_region=primary_region, - ) - - # Strip HTML from description - description_plain = strip_html(description_html) - - # Build event - event = Event( - id=guid, - adapter=self.name, - category="fire.narrative.inciweb", - time=event_time, - severity=0, # Narrative; not authoritative - geo=geo, - data={ - "title": item.get("title", ""), - "description": description_plain, - "description_html": description_html, - "url": item.get("link", ""), - "guid": guid, - "raw": item, - }, - ) - - yield event - self.mark_published(guid) - events_yielded += 1 - - # Periodic cleanup of old entries - self.sweep_old_ids() - - logger.info( - "InciWeb poll completed", - extra={"events_yielded": events_yielded}, - ) diff --git a/src/central/adapters/nws.py b/src/central/adapters/nws.py index 8205a1f..ce95d3a 100644 --- a/src/central/adapters/nws.py +++ b/src/central/adapters/nws.py @@ -19,7 +19,7 @@ from tenacity import ( from central import __version__ from central.adapter import SourceAdapter -from pydantic import BaseModel, Field +from pydantic import BaseModel from central.config_models import AdapterConfig, RegionConfig from central.config_store import ConfigStore @@ -193,11 +193,7 @@ def _build_regions(same_codes: list[str], ugc_codes: list[str]) -> list[str]: class NWSSettings(BaseModel): """Settings schema for NWS adapter.""" - contact_email: str = Field( - default="", - pattern=r"^$|^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", - description="Contact email for NWS API User-Agent header", - ) + contact_email: str = "" region: RegionConfig | None = None diff --git a/src/central/adapters/swpc_alerts.py b/src/central/adapters/swpc_alerts.py deleted file mode 100644 index 3368824..0000000 --- a/src/central/adapters/swpc_alerts.py +++ /dev/null @@ -1,186 +0,0 @@ -"""NOAA SWPC space weather alerts adapter.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import aiohttp -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.adapters.swpc_common import ( - SWPC_ALERTS_URL, - SWPCSettings, - parse_swpc_timestamp, - severity_from_alert_product_id, -) -from central.config_models import AdapterConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - - -class SWPCAlertsAdapter(SourceAdapter): - """NOAA SWPC space weather alerts adapter.""" - - name = "swpc_alerts" - display_name = "NOAA SWPC — Space Weather Alerts" - description = "Active NOAA SWPC space weather alerts, watches, warnings, and summaries." - settings_schema = SWPCSettings - requires_api_key = None - api_key_field = None - wizard_order = None - default_cadence_s = 300 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - - async def startup(self) -> None: - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - self._db.commit() - logger.info("SWPC alerts adapter started") - - async def shutdown(self) -> None: - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("SWPC alerts adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - logger.info("SWPC alerts config updated") - - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC alerts swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - product_id = event.data.get("product_id") or "unknown" - return f"central.space.alert.{product_id.lower()}" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch(self) -> list[dict[str, Any]]: - if not self._session: - raise RuntimeError("Session not initialized") - async with self._session.get( - SWPC_ALERTS_URL, headers={"User-Agent": "Central/0.4"} - ) as resp: - resp.raise_for_status() - data = await resp.json() - logger.info("SWPC alerts fetch completed", extra={"item_count": len(data)}) - return data - - async def poll(self) -> AsyncIterator[Event]: - if not self._db: - raise RuntimeError("Database not initialized") - - try: - items = await self._fetch() - except Exception as e: - logger.error("SWPC alerts fetch failed", extra={"error": str(e)}) - raise - - events_yielded = 0 - for item in items: - product_id = item.get("product_id") - issue_dt_raw = item.get("issue_datetime") - if not product_id or not issue_dt_raw: - continue - - event_id = f"{product_id}|{issue_dt_raw}" - if self.is_published(event_id): - continue - - issue_dt = parse_swpc_timestamp(issue_dt_raw, "alerts") or datetime.now(timezone.utc) - - event = Event( - id=event_id, - adapter=self.name, - category="space.alert", - time=issue_dt, - severity=severity_from_alert_product_id(product_id), - geo=Geo(), - data={ - "product_id": product_id, - "issue_datetime": issue_dt_raw, - "message": item.get("message", ""), - }, - ) - - yield event - self.mark_published(event_id) - events_yielded += 1 - - self.sweep_old_ids() - logger.info("SWPC alerts poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_common.py b/src/central/adapters/swpc_common.py deleted file mode 100644 index 004f965..0000000 --- a/src/central/adapters/swpc_common.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Shared utilities for NOAA SWPC space weather adapters.""" - -import re -from datetime import datetime, timezone - -from pydantic import BaseModel - -SWPC_ALERTS_URL = "https://services.swpc.noaa.gov/products/alerts.json" -SWPC_KINDEX_URL = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index.json" -SWPC_PROTONS_URL = "https://services.swpc.noaa.gov/json/goes/primary/integral-protons-1-day.json" - - -class SWPCSettings(BaseModel): - """Settings schema for SWPC adapters. No operator-tunable knobs today.""" - - -def parse_swpc_timestamp(raw: str | None, endpoint_kind: str) -> datetime | None: - """Normalize SWPC timestamp strings to UTC datetime. - - endpoint_kind shapes: - alerts -> "2026-05-19 05:14:59.780" (space-separated, no TZ; UTC per message body) - kindex -> "2026-05-12T00:00:00" (ISO without TZ; UTC by convention) - protons -> "2026-05-18T05:35:00Z" (ISO with Z) - """ - if not raw: - return None - if endpoint_kind == "alerts": - try: - dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f") - except ValueError: - dt = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S") - return dt.replace(tzinfo=timezone.utc) - if endpoint_kind == "kindex": - dt = datetime.fromisoformat(raw) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt.astimezone(timezone.utc) - if endpoint_kind == "protons": - raw_norm = raw[:-1] + "+00:00" if raw.endswith("Z") else raw - dt = datetime.fromisoformat(raw_norm) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt.astimezone(timezone.utc) - raise ValueError(f"unknown endpoint_kind: {endpoint_kind!r}") - - -def severity_from_kp(kp: float | int | None) -> int: - """Map planetary K-index value (0-9) to severity 0-4 via the G-scale. - - Kp 5 = G1 = severity 1, Kp 6 = G2 = severity 2, Kp 7 = G3 = severity 3, - Kp 8 = G4 = severity 4, Kp 9 = G5 = severity 4 (capped). - """ - if kp is None: - return 0 - if kp < 5: - return 0 - if kp < 6: - return 1 - if kp < 7: - return 2 - if kp < 8: - return 3 - return 4 - - -_ALERT_KP_PATTERN = re.compile(r"^K0([5-9])[AW]$") - - -def severity_from_alert_product_id(product_id: str | None) -> int: - """Best-effort severity for an alert from its product_id G-scale. - - Product IDs of form K0[5-9][AW] identify Kp-based geomagnetic storm - alerts and warnings (K05A=G1, K06A=G2, K07A=G3, K08A=G4, K09A=G5). - All other product IDs return 0. - """ - if not product_id: - return 0 - m = _ALERT_KP_PATTERN.match(product_id.upper()) - if not m: - return 0 - return severity_from_kp(int(m.group(1))) diff --git a/src/central/adapters/swpc_kindex.py b/src/central/adapters/swpc_kindex.py deleted file mode 100644 index f05bcbb..0000000 --- a/src/central/adapters/swpc_kindex.py +++ /dev/null @@ -1,186 +0,0 @@ -"""NOAA SWPC Planetary K-Index adapter.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import aiohttp -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.adapters.swpc_common import ( - SWPC_KINDEX_URL, - SWPCSettings, - parse_swpc_timestamp, - severity_from_kp, -) -from central.config_models import AdapterConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - - -class SWPCKindexAdapter(SourceAdapter): - """NOAA SWPC planetary K-index adapter.""" - - name = "swpc_kindex" - display_name = "NOAA SWPC — Planetary K-Index" - description = "Planetary K-index measurements at 3-hour cadence from NOAA SWPC." - settings_schema = SWPCSettings - requires_api_key = None - api_key_field = None - wizard_order = None - default_cadence_s = 600 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - - async def startup(self) -> None: - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - self._db.commit() - logger.info("SWPC kindex adapter started") - - async def shutdown(self) -> None: - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("SWPC kindex adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - logger.info("SWPC kindex config updated") - - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC kindex swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - return "central.space.kindex" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch(self) -> list[dict[str, Any]]: - if not self._session: - raise RuntimeError("Session not initialized") - async with self._session.get( - SWPC_KINDEX_URL, headers={"User-Agent": "Central/0.4"} - ) as resp: - resp.raise_for_status() - data = await resp.json() - logger.info("SWPC kindex fetch completed", extra={"item_count": len(data)}) - return data - - async def poll(self) -> AsyncIterator[Event]: - if not self._db: - raise RuntimeError("Database not initialized") - - try: - items = await self._fetch() - except Exception as e: - logger.error("SWPC kindex fetch failed", extra={"error": str(e)}) - raise - - events_yielded = 0 - for item in items: - time_tag = item.get("time_tag") - kp = item.get("Kp") - if not time_tag or kp is None: - continue - - event_id = time_tag - if self.is_published(event_id): - continue - - event_time = parse_swpc_timestamp(time_tag, "kindex") or datetime.now(timezone.utc) - - event = Event( - id=event_id, - adapter=self.name, - category="space.kindex", - time=event_time, - severity=severity_from_kp(kp), - geo=Geo(), - data={ - "time_tag": time_tag, - "Kp": kp, - "a_running": item.get("a_running"), - "station_count": item.get("station_count"), - }, - ) - - yield event - self.mark_published(event_id) - events_yielded += 1 - - self.sweep_old_ids() - logger.info("SWPC kindex poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/swpc_protons.py b/src/central/adapters/swpc_protons.py deleted file mode 100644 index 1a3876e..0000000 --- a/src/central/adapters/swpc_protons.py +++ /dev/null @@ -1,185 +0,0 @@ -"""NOAA SWPC GOES integral proton flux adapter.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import aiohttp -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.adapters.swpc_common import ( - SWPC_PROTONS_URL, - SWPCSettings, - parse_swpc_timestamp, -) -from central.config_models import AdapterConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - - -class SWPCProtonsAdapter(SourceAdapter): - """NOAA SWPC GOES integral proton flux adapter.""" - - name = "swpc_protons" - display_name = "NOAA SWPC — GOES Proton Flux" - description = "GOES primary satellite integral proton flux measurements (1-day window) from NOAA SWPC." - settings_schema = SWPCSettings - requires_api_key = None - api_key_field = None - wizard_order = None - default_cadence_s = 600 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - - async def startup(self) -> None: - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - self._db.commit() - logger.info("SWPC protons adapter started") - - async def shutdown(self) -> None: - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("SWPC protons adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - logger.info("SWPC protons config updated") - - def is_published(self, event_id: str) -> bool: - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("SWPC protons swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - return "central.space.proton_flux" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch(self) -> list[dict[str, Any]]: - if not self._session: - raise RuntimeError("Session not initialized") - async with self._session.get( - SWPC_PROTONS_URL, headers={"User-Agent": "Central/0.4"} - ) as resp: - resp.raise_for_status() - data = await resp.json() - logger.info("SWPC protons fetch completed", extra={"item_count": len(data)}) - return data - - async def poll(self) -> AsyncIterator[Event]: - if not self._db: - raise RuntimeError("Database not initialized") - - try: - items = await self._fetch() - except Exception as e: - logger.error("SWPC protons fetch failed", extra={"error": str(e)}) - raise - - events_yielded = 0 - for item in items: - time_tag = item.get("time_tag") - energy = item.get("energy") - if not time_tag or not energy: - continue - - event_id = f"{time_tag}|{energy}" - if self.is_published(event_id): - continue - - event_time = parse_swpc_timestamp(time_tag, "protons") or datetime.now(timezone.utc) - - event = Event( - id=event_id, - adapter=self.name, - category="space.proton_flux", - time=event_time, - severity=0, - geo=Geo(), - data={ - "time_tag": time_tag, - "satellite": item.get("satellite"), - "flux": item.get("flux"), - "energy": energy, - }, - ) - - yield event - self.mark_published(event_id) - events_yielded += 1 - - self.sweep_old_ids() - logger.info("SWPC protons poll completed", extra={"events_yielded": events_yielded}) diff --git a/src/central/adapters/usgs_quake.py b/src/central/adapters/usgs_quake.py index 63009ee..e73148f 100644 --- a/src/central/adapters/usgs_quake.py +++ b/src/central/adapters/usgs_quake.py @@ -5,7 +5,7 @@ import sqlite3 from collections.abc import AsyncIterator from datetime import datetime, timezone from pathlib import Path -from typing import Any, Literal +from typing import Any import aiohttp from shapely.geometry import Point, box as shapely_box @@ -64,7 +64,7 @@ def magnitude_to_severity(mag: float) -> int: class USGSQuakeSettings(BaseModel): """Settings schema for USGS quake adapter.""" - feed: Literal["all_hour", "all_day", "all_week", "all_month"] = "all_hour" + feed: str = "all_hour" region: RegionConfig | None = None diff --git a/src/central/adapters/wfigs_common.py b/src/central/adapters/wfigs_common.py deleted file mode 100644 index cdf2331..0000000 --- a/src/central/adapters/wfigs_common.py +++ /dev/null @@ -1,242 +0,0 @@ -"""Shared utilities for WFIGS (Wildland Fire Interagency Geospatial Services) adapters.""" - -import sqlite3 -from datetime import datetime, timezone -from typing import Any - -# WFIGS FeatureServer endpoints -WFIGS_INCIDENTS_URL = ( - "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" - "WFIGS_Incident_Locations_Current/FeatureServer/0/query" -) -WFIGS_PERIMETERS_URL = ( - "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" - "WFIGS_Interagency_Perimeters_Current/FeatureServer/0/query" -) - -# Fall-off sweep window: 14 days (matches WFIGS's longest fall-off: large fires) -FALLOFF_WINDOW_DAYS = 14 - -# Incident type code mappings (WFIGS uses 2-letter codes) -INCIDENT_TYPE_MAP = { - "WF": "wildfire", - "RX": "prescribed_fire", - "CX": "complex", - "FA": "false_alarm", -} - - -def normalize_state(state: str | None) -> str | None: - """Strip 'US-' prefix from POOState (ISO 3166-2 -> 2-letter).""" - if not state: - return None - if state.startswith("US-") and len(state) == 5: - return state[3:] - if len(state) == 2: - return state - return state # unknown shape, pass through - - -def normalize_incident_type(code: str | None) -> str: - """Map IncidentTypeCategory code to a readable name.""" - if not code: - return "unknown" - upper = code.upper() - if upper in INCIDENT_TYPE_MAP: - return INCIDENT_TYPE_MAP[upper] - return code.lower() - - -def severity_from_acres(acres: float | None) -> int: - """Map DailyAcres to severity level 0-4.""" - if acres is None or acres == 0: - return 0 - if acres < 10: - return 1 - if acres < 100: - return 2 - if acres < 1000: - return 3 - return 4 - - -def parse_wfigs_timestamp(epoch_ms: int | None) -> datetime | None: - """Parse WFIGS epoch milliseconds to UTC datetime.""" - if epoch_ms is None: - return None - return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc) - - -def build_regions(state: str | None, county: str | None) -> tuple[list[str], str | None]: - """ - Build geo.regions list and primary_region from POOState and POOCounty. - - Expects normalized 2-letter state codes (e.g., "MT" not "US-MT"). - Returns (regions, primary_region). - """ - if not state: - return [], None - - state_upper = state.upper() - if county: - # Normalize county: remove spaces, uppercase - county_normalized = county.replace(" ", "_").upper() - region = f"US-{state_upper}-{county_normalized}" - return [region], region - else: - region = f"US-{state_upper}" - return [region], region - - -def subject_suffix(state: str | None, county: str | None) -> str: - """ - Build subject suffix from state and county. - - Expects normalized 2-letter state codes. - Returns lowercase state.county (county with spaces→underscores). - Falls back to "unknown" if state is not available. - """ - if not state: - return "unknown" - - state_lower = state.lower() - if county: - county_lower = county.lower().replace(" ", "_") - return f"{state_lower}.{county_lower}" - return state_lower - - -def init_observed_table(db: sqlite3.Connection) -> None: - """Create the wfigs_observed table if it doesn't exist.""" - db.execute(""" - CREATE TABLE IF NOT EXISTS wfigs_observed ( - layer TEXT NOT NULL, - irwin_id TEXT NOT NULL, - last_observed_at TEXT NOT NULL, - state TEXT, - county TEXT, - PRIMARY KEY (layer, irwin_id) - ) - """) - db.commit() - - -def get_observed_guids(db: sqlite3.Connection, layer: str) -> dict[str, tuple[str, str | None, str | None]]: - """ - Get all observed IRWIN GUIDs for a layer. - - Returns dict mapping irwin_id -> (last_observed_at, state, county). - """ - cursor = db.execute( - "SELECT irwin_id, last_observed_at, state, county FROM wfigs_observed WHERE layer = ?", - (layer,), - ) - return {row[0]: (row[1], row[2], row[3]) for row in cursor.fetchall()} - - -def update_observed( - db: sqlite3.Connection, - layer: str, - current_guids: dict[str, tuple[str | None, str | None]], -) -> None: - """ - Update the observed table with current poll's GUIDs. - - current_guids: dict mapping irwin_id -> (state, county) - """ - now_iso = datetime.now(timezone.utc).isoformat() - - # Use INSERT OR REPLACE to upsert - for irwin_id, (state, county) in current_guids.items(): - db.execute( - """ - INSERT OR REPLACE INTO wfigs_observed (layer, irwin_id, last_observed_at, state, county) - VALUES (?, ?, ?, ?, ?) - """, - (layer, irwin_id, now_iso, state, county), - ) - db.commit() - - -def delete_observed(db: sqlite3.Connection, layer: str, irwin_ids: set[str]) -> None: - """Delete fallen-off GUIDs from the observed table.""" - for irwin_id in irwin_ids: - db.execute( - "DELETE FROM wfigs_observed WHERE layer = ? AND irwin_id = ?", - (layer, irwin_id), - ) - db.commit() - - -def cleanup_old_observed(db: sqlite3.Connection, layer: str, days: int = FALLOFF_WINDOW_DAYS) -> None: - """Remove observed entries older than the sweep window.""" - cutoff = datetime.now(timezone.utc).isoformat() - db.execute( - f""" - DELETE FROM wfigs_observed - WHERE layer = ? - AND datetime(last_observed_at) < datetime(?, '-{days} days') - """, - (layer, cutoff), - ) - db.commit() - - -def point_in_bbox( - lon: float, - lat: float, - west: float, - south: float, - east: float, - north: float, -) -> bool: - """Check if a point is within a bounding box.""" - return west <= lon <= east and south <= lat <= north - - -def polygon_intersects_bbox( - geometry: dict[str, Any], - west: float, - south: float, - east: float, - north: float, -) -> bool: - """ - Check if a GeoJSON geometry intersects a bounding box. - - Uses shapely for accurate polygon intersection. - """ - try: - from shapely.geometry import box, shape - - bbox_polygon = box(west, south, east, north) - geom = shape(geometry) - return bbox_polygon.intersects(geom) - except Exception: - # If shapely fails, fall back to centroid check - if geometry.get("type") == "Point": - coords = geometry.get("coordinates", []) - if len(coords) >= 2: - return point_in_bbox(coords[0], coords[1], west, south, east, north) - return True # Include if we can't determine - - -def extract_centroid(geometry: dict[str, Any]) -> tuple[float, float] | None: - """Extract centroid from GeoJSON geometry.""" - if not geometry: - return None - - geom_type = geometry.get("type") - coords = geometry.get("coordinates") - - if geom_type == "Point" and coords and len(coords) >= 2: - return (coords[0], coords[1]) - - # For polygons, use shapely to compute centroid - try: - from shapely.geometry import shape - geom = shape(geometry) - centroid = geom.centroid - return (centroid.x, centroid.y) - except Exception: - return None diff --git a/src/central/adapters/wfigs_incidents.py b/src/central/adapters/wfigs_incidents.py deleted file mode 100644 index 660d1de..0000000 --- a/src/central/adapters/wfigs_incidents.py +++ /dev/null @@ -1,383 +0,0 @@ -"""WFIGS Incidents adapter for wildfire incident locations.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import aiohttp -from pydantic import BaseModel -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.adapters.wfigs_common import ( - WFIGS_INCIDENTS_URL, - build_regions, - cleanup_old_observed, - delete_observed, - extract_centroid, - get_observed_guids, - init_observed_table, - normalize_incident_type, - normalize_state, - parse_wfigs_timestamp, - point_in_bbox, - severity_from_acres, - subject_suffix, - update_observed, -) -from central.config_models import AdapterConfig, RegionConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - -LAYER_NAME = "incidents" - - -class WFIGSIncidentsSettings(BaseModel): - """Settings schema for WFIGS Incidents adapter.""" - - region: RegionConfig | None = None - - -class WFIGSIncidentsAdapter(SourceAdapter): - """NIFC WFIGS wildfire incidents adapter.""" - - name = "wfigs_incidents" - display_name = "NIFC WFIGS — Wildfire Incidents" - description = "Active wildfire incident locations from NIFC WFIGS." - settings_schema = WFIGSIncidentsSettings - requires_api_key = None - api_key_field = None - wizard_order = None # Not in setup wizard - default_cadence_s = 300 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - self._last_poll_time: datetime | None = None - - # Parse region from settings - region_dict = config.settings.get("region") - if region_dict: - self.region: RegionConfig | None = RegionConfig(**region_dict) - else: - self.region = None - - async def startup(self) -> None: - """Initialize HTTP session and SQLite connection.""" - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=60), - ) - self._db = sqlite3.connect(self._cursor_db_path) - - # Create tables for dedup and fall-off tracking - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - init_observed_table(self._db) - self._db.commit() - - logger.info( - "WFIGS incidents adapter started", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - async def shutdown(self) -> None: - """Close HTTP session and SQLite connection.""" - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("WFIGS incidents adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - """Apply new configuration from hot-reload.""" - region_dict = new_config.settings.get("region") - if region_dict: - self.region = RegionConfig(**region_dict) - else: - self.region = None - logger.info( - "WFIGS incidents config updated", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def bump_last_seen(self, event_id: str) -> None: - """Bump the last_seen timestamp for an event.""" - if not self._db: - return - self._db.execute( - "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("WFIGS incidents swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - """Compute NATS subject for an event.""" - # Removal events have a different subject pattern - if event.category.startswith("fire.incident.removed"): - state = event.data.get("state", "").lower() or "unknown" - return f"central.fire.incident.removed.{state}" - - # Regular incidents: central.fire.incident.. - # POOState is already normalized (2-letter code) - state = event.data.get("POOState") - county = event.data.get("POOCounty") - suffix = subject_suffix(state, county) - return f"central.fire.incident.{suffix}" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch_features(self) -> list[dict[str, Any]]: - """Fetch features from WFIGS FeatureServer.""" - if not self._session: - raise RuntimeError("Session not initialized") - - # Build query params - params: dict[str, str] = { - "outFields": "*", - "returnGeometry": "true", - "f": "geojson", - } - - # Time filter: only fetch modified since last poll - if self._last_poll_time: - iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") - params["where"] = f"ModifiedOnDateTime > timestamp '{iso_time}'" - else: - params["where"] = "1=1" - - # Bbox filter if region configured - if self.region: - bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" - params["geometry"] = bbox - params["geometryType"] = "esriGeometryEnvelope" - params["spatialRel"] = "esriSpatialRelIntersects" - params["inSR"] = "4326" - - async with self._session.get(WFIGS_INCIDENTS_URL, params=params) as resp: - resp.raise_for_status() - data = await resp.json() - - features = data.get("features", []) - logger.info( - "WFIGS incidents fetch completed", - extra={"feature_count": len(features)}, - ) - return features - - async def poll(self) -> AsyncIterator[Event]: - """Poll WFIGS for incident updates.""" - if not self._db: - raise RuntimeError("Database not initialized") - - # Fetch features from upstream - try: - features = await self._fetch_features() - except Exception as e: - logger.error("WFIGS incidents fetch failed", extra={"error": str(e)}) - raise - - # Get previous poll's observed GUIDs for fall-off detection - observed_before = get_observed_guids(self._db, LAYER_NAME) - - # Process features and track current GUIDs - current_guids: dict[str, tuple[str | None, str | None]] = {} - events_yielded = 0 - - for feature in features: - props = feature.get("properties", {}) - geometry = feature.get("geometry") - - irwin_id = props.get("IrwinID") - if not irwin_id: - continue - - # Extract location - centroid = extract_centroid(geometry) - - # Post-filter: skip if outside region bbox - if self.region and centroid: - lon, lat = centroid - if not point_in_bbox( - lon, lat, - self.region.west, self.region.south, - self.region.east, self.region.north, - ): - continue - - # Normalize at parse boundary - state_raw = props.get("POOState") - state = normalize_state(state_raw) - county = props.get("POOCounty") - incident_type_raw = props.get("IncidentTypeCategory") - incident_type = normalize_incident_type(incident_type_raw) - - # Track this GUID as observed (for fall-off detection) - # Store normalized state for consistency - current_guids[irwin_id] = (state, county) - - # Parse fields - discovery_time = parse_wfigs_timestamp(props.get("FireDiscoveryDateTime")) - daily_acres = props.get("DailyAcres") - - # Build regions (expects normalized 2-letter state code) - regions, primary_region = build_regions(state, county) - - # Build geo - if centroid: - geo = Geo( - centroid=centroid, - bbox=(centroid[0], centroid[1], centroid[0], centroid[1]), - regions=regions, - primary_region=primary_region, - ) - else: - geo = Geo(regions=regions, primary_region=primary_region) - - # Build event with normalized values in data - event = Event( - id=irwin_id, - adapter=self.name, - category=f"fire.incident.{incident_type}", - time=discovery_time or datetime.now(timezone.utc), - severity=severity_from_acres(daily_acres), - geo=geo, - data={ - "IrwinID": irwin_id, - "IncidentName": props.get("IncidentName"), - "IncidentTypeCategory": incident_type, - "IncidentTypeCategory_raw": incident_type_raw, - "DailyAcres": daily_acres, - "PercentContained": props.get("PercentContained"), - "FireDiscoveryDateTime": props.get("FireDiscoveryDateTime"), - "ModifiedOnDateTime": props.get("ModifiedOnDateTime"), - "POOState": state, - "POOState_raw": state_raw, - "POOCounty": county, - "raw": props, - }, - ) - - yield event - events_yielded += 1 - - # Detect fall-offs: GUIDs in previous but not current - fallen_off = set(observed_before.keys()) - set(current_guids.keys()) - - for irwin_id in fallen_off: - last_observed, state, county = observed_before[irwin_id] - now = datetime.now(timezone.utc) - - removal_event = Event( - id=f"{irwin_id}:removed:{now.isoformat()}", - adapter=self.name, - category="fire.incident.removed", - time=now, - severity=0, - geo=Geo(), - data={ - "irwin_id": irwin_id, - "last_observed_at": last_observed, - "state": state, - "county": county, - "reason": "fallen_off_current_service", - }, - ) - - yield removal_event - events_yielded += 1 - logger.info( - "WFIGS incident fall-off detected", - extra={"irwin_id": irwin_id, "state": state}, - ) - - # Update observed table - update_observed(self._db, LAYER_NAME, current_guids) - delete_observed(self._db, LAYER_NAME, fallen_off) - - # Periodic cleanup of old entries - cleanup_old_observed(self._db, LAYER_NAME) - self.sweep_old_ids() - - # Update last poll time - self._last_poll_time = datetime.now(timezone.utc) - - logger.info( - "WFIGS incidents poll completed", - extra={ - "events_yielded": events_yielded, - "current_observed": len(current_guids), - "fallen_off": len(fallen_off), - }, - ) diff --git a/src/central/adapters/wfigs_perimeters.py b/src/central/adapters/wfigs_perimeters.py deleted file mode 100644 index 669d635..0000000 --- a/src/central/adapters/wfigs_perimeters.py +++ /dev/null @@ -1,397 +0,0 @@ -"""WFIGS Perimeters adapter for wildfire perimeter polygons.""" - -import logging -import sqlite3 -from collections.abc import AsyncIterator -from datetime import datetime, timezone -from pathlib import Path -from typing import Any - -import aiohttp -from pydantic import BaseModel -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential_jitter, -) - -from central.adapter import SourceAdapter -from central.adapters.wfigs_common import ( - WFIGS_PERIMETERS_URL, - build_regions, - cleanup_old_observed, - delete_observed, - extract_centroid, - get_observed_guids, - init_observed_table, - normalize_incident_type, - normalize_state, - parse_wfigs_timestamp, - polygon_intersects_bbox, - severity_from_acres, - subject_suffix, - update_observed, -) -from central.config_models import AdapterConfig, RegionConfig -from central.config_store import ConfigStore -from central.models import Event, Geo - -logger = logging.getLogger(__name__) - -LAYER_NAME = "perimeters" - - -class WFIGSPerimetersSettings(BaseModel): - """Settings schema for WFIGS Perimeters adapter.""" - - region: RegionConfig | None = None - - -class WFIGSPerimetersAdapter(SourceAdapter): - """NIFC WFIGS wildfire perimeters adapter.""" - - name = "wfigs_perimeters" - display_name = "NIFC WFIGS — Wildfire Perimeters" - description = "Active wildfire perimeter polygons from NIFC WFIGS." - settings_schema = WFIGSPerimetersSettings - requires_api_key = None - api_key_field = None - wizard_order = None # Not in setup wizard - default_cadence_s = 300 - - def __init__( - self, - config: AdapterConfig, - config_store: ConfigStore, - cursor_db_path: Path, - ) -> None: - self._config_store = config_store - self._cursor_db_path = cursor_db_path - self._session: aiohttp.ClientSession | None = None - self._db: sqlite3.Connection | None = None - self._last_poll_time: datetime | None = None - - # Parse region from settings - region_dict = config.settings.get("region") - if region_dict: - self.region: RegionConfig | None = RegionConfig(**region_dict) - else: - self.region = None - - async def startup(self) -> None: - """Initialize HTTP session and SQLite connection.""" - self._session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=120), # Longer timeout for large polygons - ) - self._db = sqlite3.connect(self._cursor_db_path) - - # Create tables for dedup and fall-off tracking - self._db.execute(""" - CREATE TABLE IF NOT EXISTS published_ids ( - adapter TEXT NOT NULL, - event_id TEXT NOT NULL, - first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (adapter, event_id) - ) - """) - self._db.execute(""" - CREATE INDEX IF NOT EXISTS published_ids_last_seen - ON published_ids (last_seen) - """) - init_observed_table(self._db) - self._db.commit() - - logger.info( - "WFIGS perimeters adapter started", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - async def shutdown(self) -> None: - """Close HTTP session and SQLite connection.""" - if self._session: - await self._session.close() - self._session = None - if self._db: - self._db.close() - self._db = None - logger.info("WFIGS perimeters adapter shut down") - - async def apply_config(self, new_config: AdapterConfig) -> None: - """Apply new configuration from hot-reload.""" - region_dict = new_config.settings.get("region") - if region_dict: - self.region = RegionConfig(**region_dict) - else: - self.region = None - logger.info( - "WFIGS perimeters config updated", - extra={"region": self.region.model_dump() if self.region else None}, - ) - - def is_published(self, event_id: str) -> bool: - """Check if an event has already been published.""" - if not self._db: - return False - cur = self._db.execute( - "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - return cur.fetchone() is not None - - def mark_published(self, event_id: str) -> None: - """Mark an event as published.""" - if not self._db: - return - self._db.execute( - """ - INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) - VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (adapter, event_id) DO UPDATE SET - last_seen = CURRENT_TIMESTAMP - """, - (self.name, event_id), - ) - self._db.commit() - - def bump_last_seen(self, event_id: str) -> None: - """Bump the last_seen timestamp for an event.""" - if not self._db: - return - self._db.execute( - "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", - (self.name, event_id), - ) - self._db.commit() - - def sweep_old_ids(self) -> int: - """Remove published_ids older than 14 days. Returns count deleted.""" - if not self._db: - return 0 - cur = self._db.execute( - "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')", - (self.name,), - ) - self._db.commit() - count = cur.rowcount - if count > 0: - logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count}) - return count - - def subject_for(self, event: Event) -> str: - """Compute NATS subject for an event.""" - # Removal events have a different subject pattern - if event.category.startswith("fire.perimeter.removed"): - state = event.data.get("state", "").lower() or "unknown" - return f"central.fire.perimeter.removed.{state}" - - # Regular perimeters: central.fire.perimeter.. - # POOState is already normalized (2-letter code) - state = event.data.get("POOState") - county = event.data.get("POOCounty") - suffix = subject_suffix(state, county) - return f"central.fire.perimeter.{suffix}" - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=1, max=30), - retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), - ) - async def _fetch_features(self) -> list[dict[str, Any]]: - """Fetch features from WFIGS FeatureServer.""" - if not self._session: - raise RuntimeError("Session not initialized") - - # Build query params - params: dict[str, str] = { - "outFields": "*", - "returnGeometry": "true", - "f": "geojson", - } - - # Time filter: only fetch modified since last poll - # Note: perimeters use attr_ModifiedOnDateTime_dt field - if self._last_poll_time: - iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") - params["where"] = f"attr_ModifiedOnDateTime_dt > timestamp '{iso_time}'" - else: - params["where"] = "1=1" - - # Bbox filter if region configured - if self.region: - bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" - params["geometry"] = bbox - params["geometryType"] = "esriGeometryEnvelope" - params["spatialRel"] = "esriSpatialRelIntersects" - params["inSR"] = "4326" - - async with self._session.get(WFIGS_PERIMETERS_URL, params=params) as resp: - resp.raise_for_status() - data = await resp.json() - - features = data.get("features", []) - logger.info( - "WFIGS perimeters fetch completed", - extra={"feature_count": len(features)}, - ) - return features - - async def poll(self) -> AsyncIterator[Event]: - """Poll WFIGS for perimeter updates.""" - if not self._db: - raise RuntimeError("Database not initialized") - - # Fetch features from upstream - try: - features = await self._fetch_features() - except Exception as e: - logger.error("WFIGS perimeters fetch failed", extra={"error": str(e)}) - raise - - # Get previous poll's observed GUIDs for fall-off detection - observed_before = get_observed_guids(self._db, LAYER_NAME) - - # Process features and track current GUIDs - current_guids: dict[str, tuple[str | None, str | None]] = {} - events_yielded = 0 - - for feature in features: - props = feature.get("properties", {}) - geometry = feature.get("geometry") - - # WFIGS Perimeters use prefixed field names (attr_*, poly_*) - irwin_id = props.get("attr_IrwinID") or props.get("poly_IRWINID") - if not irwin_id: - continue - - # Post-filter: skip if geometry doesn't intersect region bbox - if self.region and geometry: - if not polygon_intersects_bbox( - geometry, - self.region.west, self.region.south, - self.region.east, self.region.north, - ): - continue - - # Normalize at parse boundary - state_raw = props.get("attr_POOState") - state = normalize_state(state_raw) - county = props.get("attr_POOCounty") - incident_type_raw = props.get("attr_IncidentTypeCategory") - incident_type = normalize_incident_type(incident_type_raw) - - # Track this GUID as observed (for fall-off detection) - # Store normalized state for consistency - current_guids[irwin_id] = (state, county) - - # Parse fields using prefixed names - discovery_time = parse_wfigs_timestamp(props.get("attr_FireDiscoveryDateTime")) - # Use poly_GISAcres or attr_IncidentSize for acreage - daily_acres = props.get("attr_IncidentSize") or props.get("poly_GISAcres") - - # Build regions (expects normalized 2-letter state code) - regions, primary_region = build_regions(state, county) - - # Extract centroid for geo - centroid = extract_centroid(geometry) - - # Build bbox from geometry if available - bbox = None - if geometry: - try: - from shapely.geometry import shape - geom = shape(geometry) - bounds = geom.bounds # (minx, miny, maxx, maxy) - bbox = (bounds[0], bounds[1], bounds[2], bounds[3]) - except Exception: - if centroid: - bbox = (centroid[0], centroid[1], centroid[0], centroid[1]) - - # Build geo - geo = Geo( - centroid=centroid, - bbox=bbox, - regions=regions, - primary_region=primary_region, - ) - - # Build event with geometry in data - # Use normalized field names in event data for consistency - event = Event( - id=irwin_id, - adapter=self.name, - category=f"fire.perimeter.{incident_type}", - time=discovery_time or datetime.now(timezone.utc), - severity=severity_from_acres(daily_acres), - geo=geo, - data={ - "IrwinID": irwin_id, - "IncidentName": props.get("attr_IncidentName") or props.get("poly_IncidentName"), - "IncidentTypeCategory": incident_type, - "IncidentTypeCategory_raw": incident_type_raw, - "DailyAcres": props.get("attr_IncidentSize"), - "GISAcres": props.get("poly_GISAcres"), - "PercentContained": props.get("attr_PercentContained"), - "FireDiscoveryDateTime": props.get("attr_FireDiscoveryDateTime"), - "ModifiedOnDateTime": props.get("attr_ModifiedOnDateTime_dt"), - "POOState": state, - "POOState_raw": state_raw, - "POOCounty": county, - "geometry": geometry, # Full GeoJSON polygon - "raw": props, - }, - ) - - yield event - events_yielded += 1 - - # Detect fall-offs: GUIDs in previous but not current - fallen_off = set(observed_before.keys()) - set(current_guids.keys()) - - for irwin_id in fallen_off: - last_observed, state, county = observed_before[irwin_id] - now = datetime.now(timezone.utc) - - removal_event = Event( - id=f"{irwin_id}:removed:{now.isoformat()}", - adapter=self.name, - category="fire.perimeter.removed", - time=now, - severity=0, - geo=Geo(), - data={ - "irwin_id": irwin_id, - "last_observed_at": last_observed, - "state": state, - "county": county, - "reason": "fallen_off_current_service", - }, - ) - - yield removal_event - events_yielded += 1 - logger.info( - "WFIGS perimeter fall-off detected", - extra={"irwin_id": irwin_id, "state": state}, - ) - - # Update observed table - update_observed(self._db, LAYER_NAME, current_guids) - delete_observed(self._db, LAYER_NAME, fallen_off) - - # Periodic cleanup of old entries - cleanup_old_observed(self._db, LAYER_NAME) - self.sweep_old_ids() - - # Update last poll time - self._last_poll_time = datetime.now(timezone.utc) - - logger.info( - "WFIGS perimeters poll completed", - extra={ - "events_yielded": events_yielded, - "current_observed": len(current_guids), - "fallen_off": len(fallen_off), - }, - ) diff --git a/src/central/archive.py b/src/central/archive.py index c173805..b64a187 100644 --- a/src/central/archive.py +++ b/src/central/archive.py @@ -25,7 +25,6 @@ STREAMS = [ ("CENTRAL_WX", "central.wx.>"), ("CENTRAL_FIRE", "central.fire.>"), ("CENTRAL_QUAKE", "central.quake.>"), - ("CENTRAL_SPACE", "central.space.>"), ] BATCH_SIZE = 100 diff --git a/src/central/config_models.py b/src/central/config_models.py index 5516bc1..a5ba0d5 100644 --- a/src/central/config_models.py +++ b/src/central/config_models.py @@ -32,7 +32,7 @@ class AdapterConfig(BaseModel): name: str = Field(description="Unique adapter identifier") enabled: bool = Field(default=True, description="Whether adapter is active") - cadence_s: int = Field(ge=10, description="Poll interval in seconds") + cadence_s: int = Field(description="Poll interval in seconds") settings: dict[str, Any] = Field( default_factory=dict, description="Adapter-specific settings" ) diff --git a/src/central/config_store.py b/src/central/config_store.py index 826f899..ac55e27 100644 --- a/src/central/config_store.py +++ b/src/central/config_store.py @@ -241,14 +241,6 @@ class ConfigStore: ) return result == "DELETE 1" - async def set_adapter_last_error(self, name: str, error: str | None) -> None: - """Set or clear the last_error field on an adapter row.""" - async with self._pool.acquire() as conn: - await conn.execute( - "UPDATE config.adapters SET last_error = $1 WHERE name = $2", - error, name, - ) - # ------------------------------------------------------------------------- # Change notifications # ------------------------------------------------------------------------- diff --git a/src/central/gui/__init__.py b/src/central/gui/__init__.py index 79703cb..71a302b 100644 --- a/src/central/gui/__init__.py +++ b/src/central/gui/__init__.py @@ -247,37 +247,18 @@ def _create_app() -> FastAPI: except Exception: pass - # Add field descriptors to adapters - from central.gui.routes import _adapter_classes - from central.gui.form_descriptors import describe_fields - adapter_classes = _adapter_classes() - wizard_adapters = sorted( - [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], - key=lambda nc: nc[1].wizard_order - ) - # Rebuild adapters with fields - enriched_adapters = [] - for name, cls in wizard_adapters: - adapter_data = next((a for a in adapters if a["name"] == name), None) - if adapter_data: - settings_dict = adapter_data.get("settings", {}) - fields = describe_fields(cls.settings_schema, settings_dict) - enriched_adapters.append({ - "name": name, - "display_name": cls.display_name, - "enabled": adapter_data.get("enabled", False), - "cadence_s": adapter_data.get("cadence_s", 300), - "settings": settings_dict, - "fields": fields, - }) + # Import helper functions for valid values + from central.gui.routes import _get_valid_satellites, _get_valid_feeds response = templates.TemplateResponse( request=request, name="setup_adapters.html", context={ "csrf_token": csrf_token, - "adapters": enriched_adapters, + "adapters": adapters, "api_keys": api_keys, + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": error_msg, diff --git a/src/central/gui/form_descriptors.py b/src/central/gui/form_descriptors.py deleted file mode 100644 index ef7588e..0000000 --- a/src/central/gui/form_descriptors.py +++ /dev/null @@ -1,163 +0,0 @@ -"""Form field descriptors for adapter settings. - -If a second nested settings type beyond RegionConfig appears, -refactor this helper to recurse over nested models. -""" - -from dataclasses import dataclass, field -from typing import Any, Literal, Union, get_args, get_origin - -from pydantic import BaseModel -from pydantic.fields import FieldInfo -from pydantic_core import PydanticUndefined - -from central.config_models import RegionConfig - - -@dataclass -class FieldDescriptor: - """Describes a form field for rendering.""" - name: str - label: str - widget: str # "text", "number", "checkbox", "csv", "select", "checkboxes", "region" - current_value: Any - default: Any - description: str - required: bool - options: list[str] | None = None # For select/checkboxes widgets - - -def _is_literal(tp: type) -> bool: - """Check if a type is a Literal type.""" - return get_origin(tp) is Literal - - -def _get_literal_values(tp: type) -> list[str]: - """Extract the literal values from a Literal type.""" - return list(get_args(tp)) - - -def _type_to_widget_and_options(field_name: str, field_type: type) -> tuple[str, list[str] | None]: - """Map a Python type to a widget type and optional options list. - - Returns: - Tuple of (widget_type, options_list_or_none) - """ - # Handle Optional/Union types - origin = get_origin(field_type) - args = get_args(field_type) - - # Check for Optional[X] (Union[X, None]) - if origin is Union or (origin is not None and type(None) in args): - # Get the non-None type - non_none_args = [a for a in args if a is not type(None)] - if non_none_args: - inner_type = non_none_args[0] - # Recursively determine widget for the inner type - return _type_to_widget_and_options(field_name, inner_type) - - # Check for Literal type (single select) - if _is_literal(field_type): - options = _get_literal_values(field_type) - return "select", [str(o) for o in options] - - # Direct type checks - if field_type is str: - return "text", None - if field_type is int: - return "number", None - if field_type is bool: - return "checkbox", None - if field_type is RegionConfig: - return "region", None - - # Check for list types - if origin is list: - inner_type = args[0] if args else None - - # list[Literal[...]] -> checkboxes - if inner_type is not None and _is_literal(inner_type): - options = _get_literal_values(inner_type) - return "checkboxes", [str(o) for o in options] - - # list[str] -> csv - if inner_type is str: - return "csv", None - - raise NotImplementedError( - f"Field '{field_name}' has unsupported list type: list[{inner_type.__name__ if inner_type else '?'}]" - ) - - # Check if it's a BaseModel subclass (nested model other than RegionConfig) - if isinstance(field_type, type) and issubclass(field_type, BaseModel): - raise NotImplementedError( - f"Field '{field_name}' has unsupported nested type: {field_type.__name__}. " - f"If a second nested type beyond RegionConfig is needed, " - f"refactor describe_fields to recurse over nested models." - ) - - raise NotImplementedError( - f"Field '{field_name}' has unsupported type: {field_type}" - ) - - -def _name_to_label(name: str) -> str: - """Convert field name to human-readable label.""" - return name.replace("_", " ").title() - - -def _is_undefined(value: Any) -> bool: - """Check if a value is Pydantic's undefined sentinel.""" - return value is PydanticUndefined - - -def describe_fields(model_cls: type[BaseModel], current: dict) -> list[FieldDescriptor]: - """Generate field descriptors for a Pydantic model. - - Args: - model_cls: The Pydantic model class (e.g., NWSSettings) - current: Current settings values from the database - - Returns: - List of FieldDescriptor objects for rendering the form - """ - descriptors = [] - - for field_name, field_info in model_cls.model_fields.items(): - # Get the field type - field_type = field_info.annotation - - # Determine widget and options - widget, options = _type_to_widget_and_options(field_name, field_type) - - # Get current value, falling back to default - if field_name in current: - current_value = current[field_name] - elif not _is_undefined(field_info.default): - current_value = field_info.default - else: - current_value = None - - # Get default - default = field_info.default if not _is_undefined(field_info.default) else None - - # Get description - description = "" - if field_info.description: - description = field_info.description - - # Determine if required (no default and not Optional) - required = _is_undefined(field_info.default) and field_info.is_required() - - descriptors.append(FieldDescriptor( - name=field_name, - label=_name_to_label(field_name), - widget=widget, - current_value=current_value, - default=default, - description=description, - required=required, - options=options, - )) - - return descriptors diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py index b5c66f7..3a415c2 100644 --- a/src/central/gui/routes.py +++ b/src/central/gui/routes.py @@ -9,7 +9,6 @@ from typing import Any logger = logging.getLogger("central.gui.routes") - from fastapi import APIRouter, Depends, Form, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from central.bootstrap_config import get_settings @@ -44,27 +43,12 @@ from central.gui.audit import ( SYSTEM_UPDATE, write_audit, ) -from functools import cache - from central.gui.db import get_pool -from central.gui.form_descriptors import describe_fields, FieldDescriptor -from central.adapter_discovery import discover_adapters -from pydantic import ValidationError - -@cache -def _adapter_classes() -> dict: - """Cached adapter class discovery. - - GUI is a separate process from supervisor; walks pkgutil itself. - Python's import cache makes subsequent calls free. - """ - return discover_adapters() - router = APIRouter() # Streams to display on dashboard -DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_SPACE", "CENTRAL_META"] +DASHBOARD_STREAMS = ["CENTRAL_WX", "CENTRAL_FIRE", "CENTRAL_QUAKE", "CENTRAL_META"] # Email validation regex (simple but effective) ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") @@ -73,6 +57,18 @@ ALIAS_REGEX = re.compile(r"^[a-zA-Z0-9_]+$") EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") +def _get_valid_satellites() -> list[str]: + """Get valid satellite identifiers from firms adapter.""" + from central.adapters.firms import SATELLITE_SHORT + return list(SATELLITE_SHORT.keys()) + + +def _get_valid_feeds() -> set[str]: + """Get valid feed values from usgs_quake adapter.""" + from central.adapters.usgs_quake import VALID_FEEDS + return VALID_FEEDS + + def _get_templates(): """Get templates instance (deferred import to avoid circular).""" from central.gui import templates @@ -635,36 +631,18 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: templates = _get_templates() pool = get_pool() - # Get wizard adapters (filtered by wizard_order) - adapter_classes = _adapter_classes() - wizard_adapters = sorted( - [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], - key=lambda nc: nc[1].wizard_order - ) - # Pre-fill from cookie state or DB defaults if state.adapters: adapters = [] - for name, cls in wizard_adapters: + for name in ["firms", "nws", "usgs_quake"]: if name in state.adapters: a = state.adapters[name] - settings_dict = a["settings"] - else: - settings_dict = {} - fields = describe_fields(cls.settings_schema, settings_dict) - # Swap widget for api_key_field to api_key_select - if cls.api_key_field is not None: - for f in fields: - if f.name == cls.api_key_field: - f.widget = "api_key_select" - adapters.append({ - "name": name, - "display_name": cls.display_name, - "enabled": a["enabled"] if name in state.adapters else False, - "cadence_s": a["cadence_s"] if name in state.adapters else 300, - "settings": settings_dict, - "fields": fields, - }) + adapters.append({ + "name": name, + "enabled": a["enabled"], + "cadence_s": a["cadence_s"], + "settings": a["settings"], + }) else: async with pool.acquire() as conn: rows = await conn.fetch( @@ -674,33 +652,15 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: ORDER BY name """ ) - db_adapters = {row["name"]: row for row in rows} - - adapters = [] - for name, cls in wizard_adapters: - if name in db_adapters: - row = db_adapters[name] - settings_dict = row["settings"] or {} - enabled = row["enabled"] - cadence_s = row["cadence_s"] - else: - settings_dict = {} - enabled = False - cadence_s = 300 - fields = describe_fields(cls.settings_schema, settings_dict) - # Swap widget for api_key_field to api_key_select - if cls.api_key_field is not None: - for f in fields: - if f.name == cls.api_key_field: - f.widget = "api_key_select" - adapters.append({ - "name": name, - "display_name": cls.display_name, - "enabled": enabled, - "cadence_s": cadence_s, - "settings": settings_dict, - "fields": fields, - }) + adapters = [] + for row in rows: + settings_data = row["settings"] or {} + adapters.append({ + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": settings_data, + }) # Get API keys from wizard state (not DB) api_keys = [{"alias": k["alias"]} for k in state.api_keys] @@ -725,6 +685,8 @@ async def setup_adapters_form(request: Request) -> HTMLResponse: "csrf_token": csrf_token, "adapters": adapters, "api_keys": api_keys, + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": None, @@ -777,14 +739,7 @@ async def setup_adapters_submit(request: Request) -> Response: "settings": row["settings"] or {}, } - # Get wizard adapters (filtered by wizard_order) - adapter_classes = _adapter_classes() - wizard_adapters = sorted( - [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], - key=lambda nc: nc[1].wizard_order - ) - - for adapter_name, adapter_cls in wizard_adapters: + for adapter_name in ["firms", "nws", "usgs_quake"]: current = current_adapters.get(adapter_name, {"enabled": False, "cadence_s": 300, "settings": {}}) current_settings = current.get("settings", {}) new_settings = dict(current_settings) @@ -792,108 +747,83 @@ async def setup_adapters_submit(request: Request) -> Response: # Parse enabled enabled = f"{adapter_name}_enabled" in form - # Parse cadence using AdapterConfig field constraint + # Parse cadence cadence_str = form.get(f"{adapter_name}_cadence_s", "") try: cadence_s = int(cadence_str) - from central.config_models import AdapterConfig - min_cadence = AdapterConfig.model_fields["cadence_s"].metadata[0].ge - if cadence_s < min_cadence: - errors[f"{adapter_name}_cadence_s"] = ( - f"Input should be greater than or equal to {min_cadence}" - ) + if cadence_s < 60 or cadence_s > 3600: + errors[f"{adapter_name}_cadence_s"] = "Cadence must be between 60 and 3600 seconds" except ValueError: errors[f"{adapter_name}_cadence_s"] = "Cadence must be a valid integer" cadence_s = current.get("cadence_s", 300) - # Generic field parsing using describe_fields - fields = describe_fields(adapter_cls.settings_schema, current_settings) - for field in fields: - form_key = f"{adapter_name}_{field.name}" - - if field.widget == "text": - value = form.get(form_key, "").strip() - new_settings[field.name] = value if value else current_settings.get(field.name) - - elif field.widget == "api_key_select": - # API key alias field - stored as text, validated post-loop - value = form.get(form_key, "").strip() - new_settings[field.name] = value if value else None - - elif field.widget == "number": - value_str = form.get(form_key, "").strip() - if value_str: - try: - new_settings[field.name] = int(value_str) - except ValueError: - errors[form_key] = f"{field.label} must be a valid number" + # Adapter-specific validation + if adapter_name == "nws": + contact_email = form.get(f"{adapter_name}_contact_email", "").strip() + if enabled: + if not contact_email: + errors[f"{adapter_name}_contact_email"] = "Contact email is required when enabled" + elif not EMAIL_REGEX.match(contact_email): + errors[f"{adapter_name}_contact_email"] = "Invalid email format" else: - new_settings[field.name] = current_settings.get(field.name) + new_settings["contact_email"] = contact_email + else: + new_settings["contact_email"] = contact_email if contact_email else current_settings.get("contact_email") - elif field.widget == "checkbox": - new_settings[field.name] = form_key in form + elif adapter_name == "firms": + api_key_alias = form.get(f"{adapter_name}_api_key_alias", "").strip() + satellites = form.getlist(f"{adapter_name}_satellites") - elif field.widget == "csv": - value = form.get(form_key, "").strip() - if value: - new_settings[field.name] = [v.strip() for v in value.split(",") if v.strip()] + if api_key_alias: + # Validate against wizard state keys + if not any(k["alias"] == api_key_alias for k in state.api_keys): + errors[f"{adapter_name}_api_key_alias"] = f"API key alias does not exist" else: - new_settings[field.name] = [] + new_settings["api_key_alias"] = api_key_alias + else: + new_settings["api_key_alias"] = None - elif field.widget == "select": - value = form.get(form_key, "").strip() - if value and field.options and value not in field.options: - errors[form_key] = f"Invalid {field.label.lower()}" - else: - new_settings[field.name] = value + # Validate satellites + valid_sats = set(_get_valid_satellites()) + invalid_sats = [s for s in satellites if s not in valid_sats] + if invalid_sats: + errors[f"{adapter_name}_satellites"] = f"Invalid satellites: " + ", ".join(invalid_sats) + else: + new_settings["satellites"] = satellites - elif field.widget == "checkboxes": - # Use getlist for checkbox groups - absence means empty list - values = form.getlist(form_key) - if field.options: - invalid = [v for v in values if v not in field.options] - if invalid: - errors[form_key] = f"Invalid values: {', '.join(invalid)}" - else: - new_settings[field.name] = values - else: - new_settings[field.name] = values + elif adapter_name == "usgs_quake": + feed = form.get(f"{adapter_name}_feed", "").strip() + valid_feeds = _get_valid_feeds() + if feed not in valid_feeds: + errors[f"{adapter_name}_feed"] = "Invalid feed" + else: + new_settings["feed"] = feed - elif field.widget == "region": - # Region validation via RegionConfig model - from central.config_models import RegionConfig - region_north_str = form.get(f"{adapter_name}_{field.name}_north", "").strip() - region_south_str = form.get(f"{adapter_name}_{field.name}_south", "").strip() - region_east_str = form.get(f"{adapter_name}_{field.name}_east", "").strip() - region_west_str = form.get(f"{adapter_name}_{field.name}_west", "").strip() + # Region validation (all adapters) + region_north_str = form.get(f"{adapter_name}_region_north", "").strip() + region_south_str = form.get(f"{adapter_name}_region_south", "").strip() + region_east_str = form.get(f"{adapter_name}_region_east", "").strip() + region_west_str = form.get(f"{adapter_name}_region_west", "").strip() - try: - region_model = RegionConfig( - north=float(region_north_str), - south=float(region_south_str), - east=float(region_east_str), - west=float(region_west_str), - ) - new_settings[field.name] = region_model.model_dump() - except (ValueError, ValidationError) as e: - errors[f"{adapter_name}_{field.name}"] = str(e) - - # Run Pydantic validation on assembled settings to catch Literal violations etc. try: - adapter_cls.settings_schema(**new_settings) - except ValidationError as e: - for err in e.errors(): - loc = err["loc"][0] if err["loc"] else "unknown" - errors[f"{adapter_name}_{loc}"] = err["msg"] + region_north = float(region_north_str) + region_south = float(region_south_str) + region_east = float(region_east_str) + region_west = float(region_west_str) - # Generic api_key_field validation against wizard state - if adapter_cls.api_key_field is not None: - field_value = new_settings.get(adapter_cls.api_key_field) - if field_value: - if not any(k["alias"] == field_value for k in state.api_keys): - errors[f"{adapter_name}_{adapter_cls.api_key_field}"] = ( - "API key alias does not exist" - ) + if not (-90 <= region_south < region_north <= 90): + errors[f"{adapter_name}_region"] = "Invalid latitude: south < north, both -90 to 90" + elif not (-180 <= region_west < region_east <= 180): + errors[f"{adapter_name}_region"] = "Invalid longitude: west < east, both -180 to 180" + else: + new_settings["region"] = { + "north": region_north, + "south": region_south, + "east": region_east, + "west": region_west, + } + except ValueError: + errors[f"{adapter_name}_region"] = "Region coordinates must be valid numbers" new_adapters[adapter_name] = { "enabled": enabled, @@ -903,23 +833,12 @@ async def setup_adapters_submit(request: Request) -> Response: # If errors, re-render if errors: - adapters = [] - for name, cls in wizard_adapters: - settings_dict = new_adapters[name]["settings"] - fields = describe_fields(cls.settings_schema, settings_dict) - # Swap widget for api_key_field to api_key_select - if cls.api_key_field is not None: - for f in fields: - if f.name == cls.api_key_field: - f.widget = "api_key_select" - adapters.append({ - "name": name, - "display_name": cls.display_name, - "enabled": new_adapters[name]["enabled"], - "cadence_s": new_adapters[name]["cadence_s"], - "settings": settings_dict, - "fields": fields, - }) + adapters = [ + {"name": name, "enabled": new_adapters[name]["enabled"], + "cadence_s": new_adapters[name]["cadence_s"], + "settings": new_adapters[name]["settings"]} + for name in ["firms", "nws", "usgs_quake"] + ] api_keys = [{"alias": k["alias"]} for k in state.api_keys] if state.system: @@ -937,6 +856,8 @@ async def setup_adapters_submit(request: Request) -> Response: "csrf_token": csrf_token, "adapters": adapters, "api_keys": api_keys, + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, "error": "Please fix the errors below.", @@ -977,20 +898,10 @@ async def setup_finish_form(request: Request) -> HTMLResponse: adapters = [] if state.adapters: - adapter_classes = _adapter_classes() - wizard_adapters = sorted( - [(name, cls) for name, cls in adapter_classes.items() if cls.wizard_order is not None], - key=lambda nc: nc[1].wizard_order - ) - for name, cls in wizard_adapters: + for name in ["firms", "nws", "usgs_quake"]: if name in state.adapters: a = state.adapters[name] - adapters.append({ - "name": name, - "display_name": cls.display_name, - "enabled": a["enabled"], - "cadence_s": a["cadence_s"], - }) + adapters.append({"name": name, "enabled": a["enabled"], "cadence_s": a["cadence_s"]}) csrf_token, signed_token = reuse_or_generate_pre_auth_csrf(request, settings.csrf_secret) response = templates.TemplateResponse( @@ -1318,45 +1229,27 @@ async def adapters_list( templates = _get_templates() pool = get_pool() operator = request.state.operator - adapter_classes = _adapter_classes() async with pool.acquire() as conn: rows = await conn.fetch( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error + SELECT name, enabled, cadence_s, settings, paused_at, updated_at FROM config.adapters ORDER BY name """ ) - adapters = [] - for row in rows: - settings = row["settings"] or {} - adapter_cls = adapter_classes.get(row["name"]) - - # Check if required API key is missing - api_key_missing = False - requires_api_key_alias = None - if adapter_cls and adapter_cls.requires_api_key is not None: - requires_api_key_alias = adapter_cls.requires_api_key - has_key = await conn.fetchval( - "SELECT 1 FROM config.api_keys WHERE alias = $1", - requires_api_key_alias, - ) - api_key_missing = not has_key - - adapters.append({ - "name": row["name"], - "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - "last_error": row["last_error"], - "api_key_missing": api_key_missing, - "requires_api_key_alias": requires_api_key_alias, - }) + adapters = [] + for row in rows: + settings = row["settings"] or {} + adapters.append({ + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + }) csrf_token = request.state.csrf_token response = templates.TemplateResponse( @@ -1382,14 +1275,10 @@ async def adapters_edit_form( pool = get_pool() operator = request.state.operator - # Look up the adapter class - adapter_classes = _adapter_classes() - adapter_cls = adapter_classes.get(name) - async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error + SELECT name, enabled, cadence_s, settings, paused_at, updated_at FROM config.adapters WHERE name = $1 """, @@ -1399,6 +1288,11 @@ async def adapters_edit_form( if row is None: return Response(status_code=404, content="Adapter not found") + # Get API keys for firms dropdown + api_keys = await conn.fetch( + "SELECT alias FROM config.api_keys ORDER BY alias" + ) + # Get map tile settings from config.system sys_row = await conn.fetchrow( "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" @@ -1407,48 +1301,15 @@ async def adapters_edit_form( tile_attribution = sys_row["map_attribution"] if sys_row else "© OpenStreetMap contributors" settings = row["settings"] or {} - - # Build adapter dict with class metadata adapter = { "name": row["name"], - "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], - "description": getattr(adapter_cls, "description", "") if adapter_cls else "", "enabled": row["enabled"], "cadence_s": row["cadence_s"], "settings": settings, "paused_at": row["paused_at"], "updated_at": row["updated_at"], - "last_error": row["last_error"], } - # Generate field descriptors if we have the adapter class - fields = [] - if adapter_cls and hasattr(adapter_cls, "settings_schema"): - fields = describe_fields(adapter_cls.settings_schema, settings) - # Swap widget for api_key_field to api_key_select - if adapter_cls.api_key_field is not None: - for f in fields: - if f.name == adapter_cls.api_key_field: - f.widget = "api_key_select" - - # Fetch API keys for api_key_select widget - api_keys = [] - async with pool.acquire() as conn: - api_key_rows = await conn.fetch("SELECT alias FROM config.api_keys ORDER BY alias") - api_keys = [{"alias": r["alias"]} for r in api_key_rows] - - # Check if required API key is missing - api_key_missing = False - requires_api_key_alias = None - if adapter_cls and adapter_cls.requires_api_key is not None: - requires_api_key_alias = adapter_cls.requires_api_key - async with pool.acquire() as conn: - has_key = await conn.fetchval( - "SELECT 1 FROM config.api_keys WHERE alias = $1", - requires_api_key_alias, - ) - api_key_missing = not has_key - csrf_token = request.state.csrf_token response = templates.TemplateResponse( request=request, @@ -1457,14 +1318,13 @@ async def adapters_edit_form( "operator": operator, "csrf_token": csrf_token, "adapter": adapter, - "fields": fields, - "api_keys": api_keys, "errors": None, "form_data": None, + "api_keys": [{"alias": k["alias"]} for k in api_keys], + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, - "api_key_missing": api_key_missing, - "requires_api_key_alias": requires_api_key_alias, }, ) return response @@ -1487,27 +1347,24 @@ async def adapters_edit_submit( if not form_csrf or form_csrf != request.state.csrf_token: raise CsrfValidationError("Invalid CSRF token") - # Look up the adapter class - adapter_classes = _adapter_classes() - adapter_cls = adapter_classes.get(name) - - # Parse common form fields + # Parse form data + form = await request.form() enabled = "enabled" in form cadence_s_str = form.get("cadence_s", "") - errors: dict[str, str] = {} + # Build form_data for re-render on error form_data: dict[str, Any] = { "enabled": enabled, "cadence_s": cadence_s_str, } - # Validate cadence_s using AdapterConfig field constraint (ge=10) + errors: dict[str, str] = {} + + # Validate cadence_s try: cadence_s = int(cadence_s_str) - from central.config_models import AdapterConfig - min_cadence = AdapterConfig.model_fields["cadence_s"].metadata[0].ge - if cadence_s < min_cadence: - errors["cadence_s"] = f"Input should be greater than or equal to {min_cadence}" + if cadence_s < 60 or cadence_s > 3600: + errors["cadence_s"] = "Cadence must be between 60 and 3600 seconds" except ValueError: errors["cadence_s"] = "Cadence must be a valid integer" cadence_s = 0 @@ -1516,7 +1373,7 @@ async def adapters_edit_submit( # Get current adapter state row = await conn.fetchrow( """ - SELECT name, enabled, cadence_s, settings, paused_at, updated_at, last_error + SELECT name, enabled, cadence_s, settings, paused_at, updated_at FROM config.adapters WHERE name = $1 """, @@ -1527,113 +1384,103 @@ async def adapters_edit_submit( return Response(status_code=404, content="Adapter not found") current_settings = row["settings"] or {} + new_settings = dict(current_settings) - # Parse and validate settings via Pydantic if we have the adapter class - new_settings = {} - if adapter_cls and hasattr(adapter_cls, "settings_schema"): - schema = adapter_cls.settings_schema - fields = describe_fields(schema, current_settings) - - # Parse form values based on widget type - parsed_values = {} - for field in fields: - raw = form.get(field.name, "") - form_data[field.name] = raw - - if field.widget == "text": - parsed_values[field.name] = raw.strip() if raw else None - elif field.widget == "number": - try: - parsed_values[field.name] = int(raw) if raw else None - except ValueError: - errors[field.name] = f"{field.label} must be a number" - elif field.widget == "checkbox": - parsed_values[field.name] = field.name in form - elif field.widget == "csv": - if raw.strip(): - parsed_values[field.name] = [v.strip() for v in raw.split(",") if v.strip()] - else: - parsed_values[field.name] = [] - elif field.widget == "select": - value = raw.strip() if raw else None - if value and field.options and value not in field.options: - errors[field.name] = f"Invalid {field.label.lower()}" - else: - parsed_values[field.name] = value - elif field.widget == "checkboxes": - # Use getlist for checkbox groups - values = form.getlist(field.name) - form_data[field.name] = values # Override raw value - if field.options: - invalid = [v for v in values if v not in field.options] - if invalid: - errors[field.name] = f"Invalid values: {', '.join(invalid)}" - else: - parsed_values[field.name] = values - else: - parsed_values[field.name] = values - elif field.widget == "api_key_select": - # API key select - validate against existing keys - value = raw.strip() if raw else None - parsed_values[field.name] = value - elif field.widget == "region": - # Region handled separately below - pass - - # Handle region fields (common pattern) - region_north_str = form.get("region_north", "").strip() - region_south_str = form.get("region_south", "").strip() - region_east_str = form.get("region_east", "").strip() - region_west_str = form.get("region_west", "").strip() - - form_data["region_north"] = region_north_str - form_data["region_south"] = region_south_str - form_data["region_east"] = region_east_str - form_data["region_west"] = region_west_str - - # Check if any region field has a value - has_region = any([region_north_str, region_south_str, region_east_str, region_west_str]) - - if has_region: - try: - region_north = float(region_north_str) - region_south = float(region_south_str) - region_east = float(region_east_str) - region_west = float(region_west_str) - - if not (-90 <= region_south < region_north <= 90): - errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90" - elif not (-180 <= region_west < region_east <= 180): - errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180" - else: - parsed_values["region"] = { - "north": region_north, - "south": region_south, - "east": region_east, - "west": region_west, - } - except ValueError: - errors["region"] = "Region coordinates must be valid numbers" + # Adapter-specific validation and settings update + if name == "nws": + contact_email = form.get("contact_email", "").strip() + form_data["contact_email"] = contact_email + if not contact_email: + errors["contact_email"] = "Contact email is required" + elif not EMAIL_REGEX.match(contact_email): + errors["contact_email"] = "Invalid email format" else: - parsed_values["region"] = None + new_settings["contact_email"] = contact_email - # Only validate with Pydantic if no parse errors - if not errors: - try: - # Filter out None values for optional fields without defaults - validated_data = {k: v for k, v in parsed_values.items() if v is not None} - validated = schema(**validated_data) - new_settings = validated.model_dump(mode="json") - except ValidationError as e: - for err in e.errors(): - field_name = err["loc"][0] if err["loc"] else "unknown" - errors[str(field_name)] = err["msg"] - else: - # No schema - just preserve existing settings - new_settings = dict(current_settings) + elif name == "firms": + api_key_alias = form.get("api_key_alias", "").strip() + satellites = form.getlist("satellites") + form_data["api_key_alias"] = api_key_alias + form_data["satellites"] = satellites + + # Validate api_key_alias if set + if api_key_alias: + key_exists = await conn.fetchrow( + "SELECT 1 FROM config.api_keys WHERE alias = $1", + api_key_alias, + ) + if not key_exists: + errors["api_key_alias"] = f"API key alias '{api_key_alias}' does not exist" + else: + new_settings["api_key_alias"] = api_key_alias + else: + new_settings["api_key_alias"] = None + + # Validate satellites + valid_sats = set(_get_valid_satellites()) + invalid_sats = [s for s in satellites if s not in valid_sats] + if invalid_sats: + errors["satellites"] = f"Invalid satellites: {', '.join(invalid_sats)}" + else: + new_settings["satellites"] = satellites + + elif name == "usgs_quake": + feed = form.get("feed", "").strip() + form_data["feed"] = feed + valid_feeds = _get_valid_feeds() + if feed not in valid_feeds: + errors["feed"] = f"Invalid feed. Must be one of: {', '.join(sorted(valid_feeds))}" + else: + new_settings["feed"] = feed + + # Region validation (applies to all adapters) + region_north_str = form.get("region_north", "").strip() + region_south_str = form.get("region_south", "").strip() + region_east_str = form.get("region_east", "").strip() + region_west_str = form.get("region_west", "").strip() + + form_data["region_north"] = region_north_str + form_data["region_south"] = region_south_str + form_data["region_east"] = region_east_str + form_data["region_west"] = region_west_str + + try: + region_north = float(region_north_str) + region_south = float(region_south_str) + region_east = float(region_east_str) + region_west = float(region_west_str) + + # Validate latitude bounds + if not (-90 <= region_south < region_north <= 90): + errors["region"] = "Invalid latitude: south must be less than north, both between -90 and 90" + # Validate longitude bounds + elif not (-180 <= region_west < region_east <= 180): + errors["region"] = "Invalid longitude: west must be less than east, both between -180 and 180" + else: + new_settings["region"] = { + "north": region_north, + "south": region_south, + "east": region_east, + "west": region_west, + } + except ValueError: + errors["region"] = "Region coordinates must be valid numbers" # If there are errors, re-render the form if errors: + adapter = { + "name": row["name"], + "enabled": row["enabled"], + "cadence_s": row["cadence_s"], + "settings": current_settings, + "paused_at": row["paused_at"], + "updated_at": row["updated_at"], + } + + api_keys = await conn.fetch( + "SELECT alias FROM config.api_keys ORDER BY alias" + ) + # Get map tile settings for re-render sys_row = await conn.fetchrow( "SELECT map_tile_url, map_attribution FROM config.system WHERE id = true" @@ -1641,42 +1488,6 @@ async def adapters_edit_submit( tile_url = sys_row["map_tile_url"] if sys_row else "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png" tile_attribution = sys_row["map_attribution"] if sys_row else "© OpenStreetMap contributors" - adapter = { - "name": row["name"], - "display_name": getattr(adapter_cls, "display_name", row["name"]) if adapter_cls else row["name"], - "description": getattr(adapter_cls, "description", "") if adapter_cls else "", - "enabled": row["enabled"], - "cadence_s": row["cadence_s"], - "settings": current_settings, - "paused_at": row["paused_at"], - "updated_at": row["updated_at"], - "last_error": row["last_error"], - } - - fields = [] - if adapter_cls and hasattr(adapter_cls, "settings_schema"): - fields = describe_fields(adapter_cls.settings_schema, current_settings) - # Swap widget for api_key_field to api_key_select - if adapter_cls.api_key_field is not None: - for f in fields: - if f.name == adapter_cls.api_key_field: - f.widget = "api_key_select" - - # Fetch API keys for api_key_select widget - api_key_rows = await conn.fetch("SELECT alias FROM config.api_keys ORDER BY alias") - api_keys = [{"alias": r["alias"]} for r in api_key_rows] - - # Check if required API key is missing - api_key_missing = False - requires_api_key_alias = None - if adapter_cls and adapter_cls.requires_api_key is not None: - requires_api_key_alias = adapter_cls.requires_api_key - has_key = await conn.fetchval( - "SELECT 1 FROM config.api_keys WHERE alias = $1", - requires_api_key_alias, - ) - api_key_missing = not has_key - csrf_token = request.state.csrf_token response = templates.TemplateResponse( request=request, @@ -1685,14 +1496,13 @@ async def adapters_edit_submit( "operator": operator, "csrf_token": csrf_token, "adapter": adapter, - "fields": fields, - "api_keys": api_keys, "errors": errors, "form_data": form_data, + "api_keys": [{"alias": k["alias"]} for k in api_keys], + "valid_satellites": _get_valid_satellites(), + "valid_feeds": sorted(_get_valid_feeds()), "tile_url": tile_url, "tile_attribution": tile_attribution, - "api_key_missing": api_key_missing, - "requires_api_key_alias": requires_api_key_alias, }, status_code=200, ) diff --git a/src/central/gui/templates/adapters_edit.html b/src/central/gui/templates/adapters_edit.html index 1b5a4b7..939aa75 100644 --- a/src/central/gui/templates/adapters_edit.html +++ b/src/central/gui/templates/adapters_edit.html @@ -1,6 +1,6 @@ {% extends "base.html" %} -{% block title %}Central — Edit {{ adapter.display_name }}{% endblock %} +{% block title %}Central — Edit {{ adapter.name }}{% endblock %} {% block head %} @@ -10,173 +10,35 @@ {% endblock %} {% block content %} -

{{ adapter.display_name }}

-

{{ adapter.description }}

- -{% if adapter.paused_at %} -
- ⏸️ Paused since {{ adapter.paused_at }} -
-{% endif %} - -{% if adapter.last_error %} -
- Last Error: {{ adapter.last_error }} -
-{% endif %} - -{% if api_key_missing %} -
- ⚠️ API Key Required: This adapter requires the {{ requires_api_key_alias }} API key to be configured before it can be enabled. - Configure API Keys -
-{% endif %} +

Edit Adapter: {{ adapter.name }}

- Core Settings + Universal Settings - + {% if errors and errors.cadence_s %} {{ errors.cadence_s }} {% endif %}
- {% if fields %}
- Adapter Settings - - {% for field in fields %} - {% if field.widget == "region" %} - {# Region is rendered in a separate fieldset below #} - {% elif field.widget == "text" %} - - - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "number" %} - - - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "checkbox" %} - - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "csv" %} - - - Comma-separated values{% if field.description %} — {{ field.description }}{% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "select" %} - - - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "checkboxes" %} - - {% set current_values = form_data.getlist(field.name) if form_data and form_data.getlist else (field.current_value or []) %} - {% for opt in field.options %} - - {% endfor %} - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - - {% elif field.widget == "api_key_select" %} - - - {% if field.description %} - {{ field.description }} - {% endif %} - {% if errors and errors[field.name] %} - {{ errors[field.name] }} - {% endif %} - {% endif %} - {% endfor %} + Adapter-Specific Settings + {% include "adapters_edit_" + adapter.name + ".html" %}
- {% endif %} - {% set has_region = namespace(value=false) %} - {% for field in fields %} - {% if field.widget == "region" %} - {% set has_region.value = true %} - {% endif %} - {% endfor %} - - {% if has_region.value %}
Region {% include "_region_picker.html" %}
- {% endif %} Cancel diff --git a/src/central/gui/templates/adapters_edit_firms.html b/src/central/gui/templates/adapters_edit_firms.html new file mode 100644 index 0000000..a2a339a --- /dev/null +++ b/src/central/gui/templates/adapters_edit_firms.html @@ -0,0 +1,21 @@ + + +{% if errors and errors.api_key_alias %} +{{ errors.api_key_alias }} +{% endif %} + + +{% for sat in valid_satellites %} + +{% endfor %} +{% if errors and errors.satellites %} +{{ errors.satellites }} +{% endif %} diff --git a/src/central/gui/templates/adapters_edit_nws.html b/src/central/gui/templates/adapters_edit_nws.html new file mode 100644 index 0000000..e655a41 --- /dev/null +++ b/src/central/gui/templates/adapters_edit_nws.html @@ -0,0 +1,5 @@ + + +{% if errors and errors.contact_email %} +{{ errors.contact_email }} +{% endif %} diff --git a/src/central/gui/templates/adapters_edit_usgs_quake.html b/src/central/gui/templates/adapters_edit_usgs_quake.html new file mode 100644 index 0000000..0c3b7ee --- /dev/null +++ b/src/central/gui/templates/adapters_edit_usgs_quake.html @@ -0,0 +1,9 @@ + + +{% if errors and errors.feed %} +{{ errors.feed }} +{% endif %} diff --git a/src/central/gui/templates/adapters_list.html b/src/central/gui/templates/adapters_list.html index f3a8e04..b97ae88 100644 --- a/src/central/gui/templates/adapters_list.html +++ b/src/central/gui/templates/adapters_list.html @@ -17,12 +17,7 @@ {% for adapter in adapters %} - - {{ adapter.display_name or adapter.name }} - {% if adapter.api_key_missing %} - ⚠️ API Key Missing - {% endif %} - + {{ adapter.name }} {% if adapter.enabled %}Yes{% else %}No{% endif %} {{ adapter.cadence_s }}s {{ adapter.updated_at.strftime('%Y-%m-%d %H:%M') if adapter.updated_at else '—' }} diff --git a/src/central/gui/templates/setup_adapters.html b/src/central/gui/templates/setup_adapters.html index e0cc977..de3f8c2 100644 --- a/src/central/gui/templates/setup_adapters.html +++ b/src/central/gui/templates/setup_adapters.html @@ -29,7 +29,7 @@ {% for adapter in adapters %}
- {{ adapter.display_name or adapter.name }} + {{ adapter.name }}
{% endfor %} @@ -209,12 +151,11 @@