From e0ffe686ec1bddaf361af4638c48e87d8fffa4c5 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Tue, 19 May 2026 02:47:26 +0000 Subject: [PATCH] feat(2-B): add NIFC WFIGS adapters for incidents and perimeters Two new adapters for wildfire data from NIFC WFIGS: - wfigs_incidents: Active fire incident locations - wfigs_perimeters: Active fire perimeter polygons Features: - IRWIN GUID dedup via is_published/mark_published - Fall-off detection with removal events when fires exit the Current service - Bbox post-filtering with shapely polygon intersection - Severity mapping from DailyAcres (0-4 scale) - Subject hierarchy: central.fire.{incident|perimeter}.<state>.<county> Ships disabled by default; operators enable via GUI. Co-Authored-By: Claude Opus 4.5 --- sql/migrations/016_add_wfigs_adapters.sql | 37 ++ src/central/adapters/wfigs_common.py | 211 ++++++++++ src/central/adapters/wfigs_incidents.py | 373 ++++++++++++++++++ src/central/adapters/wfigs_perimeters.py | 387 +++++++++++++++++++ tests/test_wfigs.py | 444 ++++++++++++++++++++++ 5 files changed, 1452 insertions(+) create mode 100644 sql/migrations/016_add_wfigs_adapters.sql create mode 100644 src/central/adapters/wfigs_common.py create mode 100644 src/central/adapters/wfigs_incidents.py create mode 100644 src/central/adapters/wfigs_perimeters.py create mode 100644 tests/test_wfigs.py diff --git a/sql/migrations/016_add_wfigs_adapters.sql b/sql/migrations/016_add_wfigs_adapters.sql new file mode 100644 index 0000000..25d45f6 --- /dev/null +++ b/sql/migrations/016_add_wfigs_adapters.sql @@ -0,0 +1,37 @@ +-- Migration: 016_add_wfigs_adapters +-- Add WFIGS incident and perimeter adapters to config.adapters +-- Idempotent: uses ON CONFLICT DO NOTHING + +-- WFIGS Incidents adapter +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'wfigs_incidents', + false, -- Ships disabled; operator enables via GUI + 300, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.0, + 'south', 31.0, + 'east', -102.0, + 'west', -124.0 + ) + ) 
+) +ON CONFLICT (name) DO NOTHING; + +-- WFIGS Perimeters adapter +INSERT INTO config.adapters (name, enabled, cadence_s, settings) +VALUES ( + 'wfigs_perimeters', + false, -- Ships disabled; operator enables via GUI + 300, + jsonb_build_object( + 'region', jsonb_build_object( + 'north', 49.0, + 'south', 31.0, + 'east', -102.0, + 'west', -124.0 + ) + ) +) +ON CONFLICT (name) DO NOTHING; diff --git a/src/central/adapters/wfigs_common.py b/src/central/adapters/wfigs_common.py new file mode 100644 index 0000000..874f3eb --- /dev/null +++ b/src/central/adapters/wfigs_common.py @@ -0,0 +1,211 @@ +"""Shared utilities for WFIGS (Wildland Fire Interagency Geospatial Services) adapters.""" + +import sqlite3 +from datetime import datetime, timezone +from typing import Any + +# WFIGS FeatureServer endpoints +WFIGS_INCIDENTS_URL = ( + "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" + "WFIGS_Incident_Locations_Current/FeatureServer/0/query" +) +WFIGS_PERIMETERS_URL = ( + "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" + "WFIGS_Interagency_Perimeters_Current/FeatureServer/0/query" +) + +# Fall-off sweep window: 14 days (matches WFIGS's longest fall-off: large fires) +FALLOFF_WINDOW_DAYS = 14 + + +def severity_from_acres(acres: float | None) -> int: + """Map DailyAcres to severity level 0-4.""" + if acres is None or acres == 0: + return 0 + if acres < 10: + return 1 + if acres < 100: + return 2 + if acres < 1000: + return 3 + return 4 + + +def parse_wfigs_timestamp(epoch_ms: int | None) -> datetime | None: + """Parse WFIGS epoch milliseconds to UTC datetime.""" + if epoch_ms is None: + return None + return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc) + + +def build_regions(state: str | None, county: str | None) -> tuple[list[str], str | None]: + """ + Build geo.regions list and primary_region from POOState and POOCounty. + + Returns (regions, primary_region). 
+ """ + if not state: + return [], None + + state_upper = state.upper() + if county: + # Normalize county: remove spaces, uppercase + county_normalized = county.replace(" ", "_").upper() + region = f"US-{state_upper}-{county_normalized}" + return [region], region + else: + region = f"US-{state_upper}" + return [region], region + + +def subject_suffix(state: str | None, county: str | None) -> str: + """ + Build subject suffix from state and county. + + Returns lowercase state.county (county with spaces→underscores). + Falls back to "unknown" if state is not available. + """ + if not state: + return "unknown" + + state_lower = state.lower() + if county: + county_lower = county.lower().replace(" ", "_") + return f"{state_lower}.{county_lower}" + return state_lower + + +def init_observed_table(db: sqlite3.Connection) -> None: + """Create the wfigs_observed table if it doesn't exist.""" + db.execute(""" + CREATE TABLE IF NOT EXISTS wfigs_observed ( + layer TEXT NOT NULL, + irwin_id TEXT NOT NULL, + last_observed_at TEXT NOT NULL, + state TEXT, + county TEXT, + PRIMARY KEY (layer, irwin_id) + ) + """) + db.commit() + + +def get_observed_guids(db: sqlite3.Connection, layer: str) -> dict[str, tuple[str, str | None, str | None]]: + """ + Get all observed IRWIN GUIDs for a layer. + + Returns dict mapping irwin_id -> (last_observed_at, state, county). + """ + cursor = db.execute( + "SELECT irwin_id, last_observed_at, state, county FROM wfigs_observed WHERE layer = ?", + (layer,), + ) + return {row[0]: (row[1], row[2], row[3]) for row in cursor.fetchall()} + + +def update_observed( + db: sqlite3.Connection, + layer: str, + current_guids: dict[str, tuple[str | None, str | None]], +) -> None: + """ + Update the observed table with current poll's GUIDs. 
+ + current_guids: dict mapping irwin_id -> (state, county) + """ + now_iso = datetime.now(timezone.utc).isoformat() + + # Use INSERT OR REPLACE to upsert + for irwin_id, (state, county) in current_guids.items(): + db.execute( + """ + INSERT OR REPLACE INTO wfigs_observed (layer, irwin_id, last_observed_at, state, county) + VALUES (?, ?, ?, ?, ?) + """, + (layer, irwin_id, now_iso, state, county), + ) + db.commit() + + +def delete_observed(db: sqlite3.Connection, layer: str, irwin_ids: set[str]) -> None: + """Delete fallen-off GUIDs from the observed table.""" + for irwin_id in irwin_ids: + db.execute( + "DELETE FROM wfigs_observed WHERE layer = ? AND irwin_id = ?", + (layer, irwin_id), + ) + db.commit() + + +def cleanup_old_observed(db: sqlite3.Connection, layer: str, days: int = FALLOFF_WINDOW_DAYS) -> None: + """Remove observed entries older than the sweep window.""" + cutoff = datetime.now(timezone.utc).isoformat() + db.execute( + f""" + DELETE FROM wfigs_observed + WHERE layer = ? + AND datetime(last_observed_at) < datetime(?, '-{days} days') + """, + (layer, cutoff), + ) + db.commit() + + +def point_in_bbox( + lon: float, + lat: float, + west: float, + south: float, + east: float, + north: float, +) -> bool: + """Check if a point is within a bounding box.""" + return west <= lon <= east and south <= lat <= north + + +def polygon_intersects_bbox( + geometry: dict[str, Any], + west: float, + south: float, + east: float, + north: float, +) -> bool: + """ + Check if a GeoJSON geometry intersects a bounding box. + + Uses shapely for accurate polygon intersection. 
+ """ + try: + from shapely.geometry import box, shape + + bbox_polygon = box(west, south, east, north) + geom = shape(geometry) + return bbox_polygon.intersects(geom) + except Exception: + # If shapely fails, fall back to centroid check + if geometry.get("type") == "Point": + coords = geometry.get("coordinates", []) + if len(coords) >= 2: + return point_in_bbox(coords[0], coords[1], west, south, east, north) + return True # Include if we can't determine + + +def extract_centroid(geometry: dict[str, Any]) -> tuple[float, float] | None: + """Extract centroid from GeoJSON geometry.""" + if not geometry: + return None + + geom_type = geometry.get("type") + coords = geometry.get("coordinates") + + if geom_type == "Point" and coords and len(coords) >= 2: + return (coords[0], coords[1]) + + # For polygons, use shapely to compute centroid + try: + from shapely.geometry import shape + geom = shape(geometry) + centroid = geom.centroid + return (centroid.x, centroid.y) + except Exception: + return None diff --git a/src/central/adapters/wfigs_incidents.py b/src/central/adapters/wfigs_incidents.py new file mode 100644 index 0000000..ba584f1 --- /dev/null +++ b/src/central/adapters/wfigs_incidents.py @@ -0,0 +1,373 @@ +"""WFIGS Incidents adapter for wildfire incident locations.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.wfigs_common import ( + WFIGS_INCIDENTS_URL, + build_regions, + cleanup_old_observed, + delete_observed, + extract_centroid, + get_observed_guids, + init_observed_table, + parse_wfigs_timestamp, + point_in_bbox, + severity_from_acres, + subject_suffix, + update_observed, +) +from central.config_models import 
AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +LAYER_NAME = "incidents" + + +class WFIGSIncidentsSettings(BaseModel): + """Settings schema for WFIGS Incidents adapter.""" + + region: RegionConfig | None = None + + +class WFIGSIncidentsAdapter(SourceAdapter): + """NIFC WFIGS wildfire incidents adapter.""" + + name = "wfigs_incidents" + display_name = "NIFC WFIGS — Wildfire Incidents" + description = "Active wildfire incident locations from NIFC WFIGS." + settings_schema = WFIGSIncidentsSettings + requires_api_key = None + api_key_field = None + wizard_order = None # Not in setup wizard + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._last_poll_time: datetime | None = None + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def startup(self) -> None: + """Initialize HTTP session and SQLite connection.""" + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=60), + ) + self._db = sqlite3.connect(self._cursor_db_path) + + # Create tables for dedup and fall-off tracking + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + init_observed_table(self._db) + self._db.commit() + + 
logger.info( + "WFIGS incidents adapter started", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + async def shutdown(self) -> None: + """Close HTTP session and SQLite connection.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("WFIGS incidents adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + region_dict = new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + logger.info( + "WFIGS incidents config updated", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def bump_last_seen(self, event_id: str) -> None: + """Bump the last_seen timestamp for an event.""" + if not self._db: + return + self._db.execute( + "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 14 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? 
AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("WFIGS incidents swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + """Compute NATS subject for an event.""" + # Removal events have a different subject pattern + if event.category.startswith("fire.incident.removed"): + state = event.data.get("state", "").lower() or "unknown" + return f"central.fire.incident.removed.{state}" + + # Regular incidents: central.fire.incident.<state>.<county> + state = event.data.get("POOState") + county = event.data.get("POOCounty") + suffix = subject_suffix(state, county) + return f"central.fire.incident.{suffix}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_features(self) -> list[dict[str, Any]]: + """Fetch features from WFIGS FeatureServer.""" + if not self._session: + raise RuntimeError("Session not initialized") + + # Build query params + params: dict[str, str] = { + "outFields": "*", + "returnGeometry": "true", + "f": "geojson", + } + + # Time filter: only fetch modified since last poll + if self._last_poll_time: + iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") + params["where"] = f"ModifiedOnDateTime > timestamp '{iso_time}'" + else: + params["where"] = "1=1" + + # Bbox filter if region configured + if self.region: + bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" + params["geometry"] = bbox + params["geometryType"] = "esriGeometryEnvelope" + params["spatialRel"] = "esriSpatialRelIntersects" + params["inSR"] = "4326" + + async with self._session.get(WFIGS_INCIDENTS_URL, params=params) as resp: + resp.raise_for_status() + data = await resp.json() + + features = data.get("features", []) + logger.info( + "WFIGS incidents fetch completed", + 
extra={"feature_count": len(features)}, + ) + return features + + async def poll(self) -> AsyncIterator[Event]: + """Poll WFIGS for incident updates.""" + if not self._db: + raise RuntimeError("Database not initialized") + + # Fetch features from upstream + try: + features = await self._fetch_features() + except Exception as e: + logger.error("WFIGS incidents fetch failed", extra={"error": str(e)}) + raise + + # Get previous poll's observed GUIDs for fall-off detection + observed_before = get_observed_guids(self._db, LAYER_NAME) + + # Process features and track current GUIDs + current_guids: dict[str, tuple[str | None, str | None]] = {} + events_yielded = 0 + + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + irwin_id = props.get("IrwinID") + if not irwin_id: + continue + + # Extract location + centroid = extract_centroid(geometry) + + # Post-filter: skip if outside region bbox + if self.region and centroid: + lon, lat = centroid + if not point_in_bbox( + lon, lat, + self.region.west, self.region.south, + self.region.east, self.region.north, + ): + continue + + # Track this GUID as observed (for fall-off detection) + state = props.get("POOState") + county = props.get("POOCounty") + current_guids[irwin_id] = (state, county) + + # Parse fields + incident_type = props.get("IncidentTypeCategory", "unknown").lower() + discovery_time = parse_wfigs_timestamp(props.get("FireDiscoveryDateTime")) + daily_acres = props.get("DailyAcres") + + # Build regions + regions, primary_region = build_regions(state, county) + + # Build geo + if centroid: + geo = Geo( + centroid=centroid, + bbox=(centroid[0], centroid[1], centroid[0], centroid[1]), + regions=regions, + primary_region=primary_region, + ) + else: + geo = Geo(regions=regions, primary_region=primary_region) + + # Build event + event = Event( + id=irwin_id, + adapter=self.name, + category=f"fire.incident.{incident_type}", + time=discovery_time or 
datetime.now(timezone.utc), + severity=severity_from_acres(daily_acres), + geo=geo, + data={ + "IrwinID": irwin_id, + "IncidentName": props.get("IncidentName"), + "IncidentTypeCategory": props.get("IncidentTypeCategory"), + "DailyAcres": daily_acres, + "PercentContained": props.get("PercentContained"), + "FireDiscoveryDateTime": props.get("FireDiscoveryDateTime"), + "ModifiedOnDateTime": props.get("ModifiedOnDateTime"), + "POOState": state, + "POOCounty": county, + "raw": props, + }, + ) + + yield event + events_yielded += 1 + + # Detect fall-offs: GUIDs in previous but not current + fallen_off = set(observed_before.keys()) - set(current_guids.keys()) + + for irwin_id in fallen_off: + last_observed, state, county = observed_before[irwin_id] + now = datetime.now(timezone.utc) + + removal_event = Event( + id=f"{irwin_id}:removed:{now.isoformat()}", + adapter=self.name, + category="fire.incident.removed", + time=now, + severity=0, + geo=Geo(), + data={ + "irwin_id": irwin_id, + "last_observed_at": last_observed, + "state": state, + "county": county, + "reason": "fallen_off_current_service", + }, + ) + + yield removal_event + events_yielded += 1 + logger.info( + "WFIGS incident fall-off detected", + extra={"irwin_id": irwin_id, "state": state}, + ) + + # Update observed table + update_observed(self._db, LAYER_NAME, current_guids) + delete_observed(self._db, LAYER_NAME, fallen_off) + + # Periodic cleanup of old entries + cleanup_old_observed(self._db, LAYER_NAME) + self.sweep_old_ids() + + # Update last poll time + self._last_poll_time = datetime.now(timezone.utc) + + logger.info( + "WFIGS incidents poll completed", + extra={ + "events_yielded": events_yielded, + "current_observed": len(current_guids), + "fallen_off": len(fallen_off), + }, + ) diff --git a/src/central/adapters/wfigs_perimeters.py b/src/central/adapters/wfigs_perimeters.py new file mode 100644 index 0000000..ae4d8b8 --- /dev/null +++ b/src/central/adapters/wfigs_perimeters.py @@ -0,0 +1,387 @@ +"""WFIGS 
Perimeters adapter for wildfire perimeter polygons.""" + +import logging +import sqlite3 +from collections.abc import AsyncIterator +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp +from pydantic import BaseModel +from tenacity import ( + retry, + retry_if_exception_type, + stop_after_attempt, + wait_exponential_jitter, +) + +from central.adapter import SourceAdapter +from central.adapters.wfigs_common import ( + WFIGS_PERIMETERS_URL, + build_regions, + cleanup_old_observed, + delete_observed, + extract_centroid, + get_observed_guids, + init_observed_table, + parse_wfigs_timestamp, + polygon_intersects_bbox, + severity_from_acres, + subject_suffix, + update_observed, +) +from central.config_models import AdapterConfig, RegionConfig +from central.config_store import ConfigStore +from central.models import Event, Geo + +logger = logging.getLogger(__name__) + +LAYER_NAME = "perimeters" + + +class WFIGSPerimetersSettings(BaseModel): + """Settings schema for WFIGS Perimeters adapter.""" + + region: RegionConfig | None = None + + +class WFIGSPerimetersAdapter(SourceAdapter): + """NIFC WFIGS wildfire perimeters adapter.""" + + name = "wfigs_perimeters" + display_name = "NIFC WFIGS — Wildfire Perimeters" + description = "Active wildfire perimeter polygons from NIFC WFIGS." 
+ settings_schema = WFIGSPerimetersSettings + requires_api_key = None + api_key_field = None + wizard_order = None # Not in setup wizard + default_cadence_s = 300 + + def __init__( + self, + config: AdapterConfig, + config_store: ConfigStore, + cursor_db_path: Path, + ) -> None: + self._config_store = config_store + self._cursor_db_path = cursor_db_path + self._session: aiohttp.ClientSession | None = None + self._db: sqlite3.Connection | None = None + self._last_poll_time: datetime | None = None + + # Parse region from settings + region_dict = config.settings.get("region") + if region_dict: + self.region: RegionConfig | None = RegionConfig(**region_dict) + else: + self.region = None + + async def startup(self) -> None: + """Initialize HTTP session and SQLite connection.""" + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=120), # Longer timeout for large polygons + ) + self._db = sqlite3.connect(self._cursor_db_path) + + # Create tables for dedup and fall-off tracking + self._db.execute(""" + CREATE TABLE IF NOT EXISTS published_ids ( + adapter TEXT NOT NULL, + event_id TEXT NOT NULL, + first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (adapter, event_id) + ) + """) + self._db.execute(""" + CREATE INDEX IF NOT EXISTS published_ids_last_seen + ON published_ids (last_seen) + """) + init_observed_table(self._db) + self._db.commit() + + logger.info( + "WFIGS perimeters adapter started", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + async def shutdown(self) -> None: + """Close HTTP session and SQLite connection.""" + if self._session: + await self._session.close() + self._session = None + if self._db: + self._db.close() + self._db = None + logger.info("WFIGS perimeters adapter shut down") + + async def apply_config(self, new_config: AdapterConfig) -> None: + """Apply new configuration from hot-reload.""" + region_dict = 
new_config.settings.get("region") + if region_dict: + self.region = RegionConfig(**region_dict) + else: + self.region = None + logger.info( + "WFIGS perimeters config updated", + extra={"region": self.region.model_dump() if self.region else None}, + ) + + def is_published(self, event_id: str) -> bool: + """Check if an event has already been published.""" + if not self._db: + return False + cur = self._db.execute( + "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + return cur.fetchone() is not None + + def mark_published(self, event_id: str) -> None: + """Mark an event as published.""" + if not self._db: + return + self._db.execute( + """ + INSERT INTO published_ids (adapter, event_id, first_seen, last_seen) + VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (adapter, event_id) DO UPDATE SET + last_seen = CURRENT_TIMESTAMP + """, + (self.name, event_id), + ) + self._db.commit() + + def bump_last_seen(self, event_id: str) -> None: + """Bump the last_seen timestamp for an event.""" + if not self._db: + return + self._db.execute( + "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?", + (self.name, event_id), + ) + self._db.commit() + + def sweep_old_ids(self) -> int: + """Remove published_ids older than 14 days. Returns count deleted.""" + if not self._db: + return 0 + cur = self._db.execute( + "DELETE FROM published_ids WHERE adapter = ? 
AND last_seen < datetime('now', '-14 days')", + (self.name,), + ) + self._db.commit() + count = cur.rowcount + if count > 0: + logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count}) + return count + + def subject_for(self, event: Event) -> str: + """Compute NATS subject for an event.""" + # Removal events have a different subject pattern + if event.category.startswith("fire.perimeter.removed"): + state = event.data.get("state", "").lower() or "unknown" + return f"central.fire.perimeter.removed.{state}" + + # Regular perimeters: central.fire.perimeter.<state>.<county> + state = event.data.get("POOState") + county = event.data.get("POOCounty") + suffix = subject_suffix(state, county) + return f"central.fire.perimeter.{suffix}" + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=1, max=30), + retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), + ) + async def _fetch_features(self) -> list[dict[str, Any]]: + """Fetch features from WFIGS FeatureServer.""" + if not self._session: + raise RuntimeError("Session not initialized") + + # Build query params + params: dict[str, str] = { + "outFields": "*", + "returnGeometry": "true", + "f": "geojson", + } + + # Time filter: only fetch modified since last poll + # Note: perimeters use attr_ModifiedOnDateTime_dt field + if self._last_poll_time: + iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S") + params["where"] = f"attr_ModifiedOnDateTime_dt > timestamp '{iso_time}'" + else: + params["where"] = "1=1" + + # Bbox filter if region configured + if self.region: + bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" + params["geometry"] = bbox + params["geometryType"] = "esriGeometryEnvelope" + params["spatialRel"] = "esriSpatialRelIntersects" + params["inSR"] = "4326" + + async with self._session.get(WFIGS_PERIMETERS_URL, params=params) as resp: + resp.raise_for_status() + data = await resp.json() + + features = data.get("features", 
[]) + logger.info( + "WFIGS perimeters fetch completed", + extra={"feature_count": len(features)}, + ) + return features + + async def poll(self) -> AsyncIterator[Event]: + """Poll WFIGS for perimeter updates.""" + if not self._db: + raise RuntimeError("Database not initialized") + + # Fetch features from upstream + try: + features = await self._fetch_features() + except Exception as e: + logger.error("WFIGS perimeters fetch failed", extra={"error": str(e)}) + raise + + # Get previous poll's observed GUIDs for fall-off detection + observed_before = get_observed_guids(self._db, LAYER_NAME) + + # Process features and track current GUIDs + current_guids: dict[str, tuple[str | None, str | None]] = {} + events_yielded = 0 + + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry") + + # WFIGS Perimeters use prefixed field names (attr_*, poly_*) + irwin_id = props.get("attr_IrwinID") or props.get("poly_IRWINID") + if not irwin_id: + continue + + # Post-filter: skip if geometry doesn't intersect region bbox + if self.region and geometry: + if not polygon_intersects_bbox( + geometry, + self.region.west, self.region.south, + self.region.east, self.region.north, + ): + continue + + # Track this GUID as observed (for fall-off detection) + state = props.get("attr_POOState") + county = props.get("attr_POOCounty") + current_guids[irwin_id] = (state, county) + + # Parse fields using prefixed names + incident_type = props.get("attr_IncidentTypeCategory", "unknown").lower() + discovery_time = parse_wfigs_timestamp(props.get("attr_FireDiscoveryDateTime")) + # Use poly_GISAcres or attr_IncidentSize for acreage + daily_acres = props.get("attr_IncidentSize") or props.get("poly_GISAcres") + + # Build regions + regions, primary_region = build_regions(state, county) + + # Extract centroid for geo + centroid = extract_centroid(geometry) + + # Build bbox from geometry if available + bbox = None + if geometry: + try: + from shapely.geometry 
@staticmethod
def _bbox_from_geometry(
    geometry: dict[str, Any] | None,
    centroid: Any,
) -> tuple[float, float, float, float] | None:
    """Return ``(minx, miny, maxx, maxy)`` for a GeoJSON geometry, if any.

    Falls back to a degenerate box around ``centroid`` when the geometry
    cannot be converted, and to ``None`` when there is nothing to use.
    """
    if not geometry:
        return None
    try:
        from shapely.geometry import shape

        min_x, min_y, max_x, max_y = shape(geometry).bounds
        return (min_x, min_y, max_x, max_y)
    except Exception:
        # Deliberate best effort: a malformed polygon must not abort the
        # whole poll, so degrade to the centroid instead of raising.
        if centroid:
            return (centroid[0], centroid[1], centroid[0], centroid[1])
        return None

def _event_from_feature(
    self,
    irwin_id: str,
    props: dict[str, Any],
    geometry: dict[str, Any] | None,
    state: str | None,
    county: str | None,
) -> Event:
    """Normalize one WFIGS perimeter feature into an ``Event``.

    The perimeter layer uses prefixed property names (``attr_*`` and
    ``poly_*``); the event data exposes them under unprefixed keys and
    keeps the untouched properties under ``raw``.
    """
    # The key can be present with a null value, so `or` is used instead of
    # a .get() default before lower-casing.
    incident_type = (props.get("attr_IncidentTypeCategory") or "unknown").lower()
    discovery_time = parse_wfigs_timestamp(props.get("attr_FireDiscoveryDateTime"))
    # Prefer the reported incident size, fall back to the GIS-computed acres.
    daily_acres = props.get("attr_IncidentSize") or props.get("poly_GISAcres")

    regions, primary_region = build_regions(state, county)
    centroid = extract_centroid(geometry)

    geo = Geo(
        centroid=centroid,
        bbox=self._bbox_from_geometry(geometry, centroid),
        regions=regions,
        primary_region=primary_region,
    )

    return Event(
        id=irwin_id,
        adapter=self.name,
        category=f"fire.perimeter.{incident_type}",
        time=discovery_time or datetime.now(timezone.utc),
        severity=severity_from_acres(daily_acres),
        geo=geo,
        data={
            "IrwinID": irwin_id,
            "IncidentName": props.get("attr_IncidentName") or props.get("poly_IncidentName"),
            "IncidentTypeCategory": props.get("attr_IncidentTypeCategory"),
            "DailyAcres": props.get("attr_IncidentSize"),
            "GISAcres": props.get("poly_GISAcres"),
            "PercentContained": props.get("attr_PercentContained"),
            "FireDiscoveryDateTime": props.get("attr_FireDiscoveryDateTime"),
            "ModifiedOnDateTime": props.get("attr_ModifiedOnDateTime_dt"),
            "POOState": state,
            "POOCounty": county,
            "geometry": geometry,  # Full GeoJSON polygon
            "raw": props,
        },
    )

async def poll(self) -> AsyncIterator[Event]:
    """Poll WFIGS once and yield perimeter and removal events.

    Steps:
      1. Fetch features via ``_fetch_features``.
      2. Yield one ``fire.perimeter.<type>`` event per feature that has
         an IRWIN id and, when a region is configured and the feature
         has geometry, intersects that region.
      3. Yield one ``fire.perimeter.removed`` event per GUID recorded by
         the previous poll that is absent from this one.
      4. Persist the observed set, prune old entries and record the poll
         time.

    Step 4 runs only once the caller has consumed every event.

    Raises:
        RuntimeError: If the database is not initialized.
        Exception: Anything raised by ``_fetch_features`` is logged and
            re-raised unchanged.
    """
    if not self._db:
        raise RuntimeError("Database not initialized")

    try:
        features = await self._fetch_features()
    except Exception as e:
        logger.error("WFIGS perimeters fetch failed", extra={"error": str(e)})
        raise

    # GUIDs seen by the previous poll, used for fall-off detection below.
    observed_before = get_observed_guids(self._db, LAYER_NAME)

    current_guids: dict[str, tuple[str | None, str | None]] = {}
    events_yielded = 0

    for feature in features:
        # "properties" may be null in the payload; treat it as empty.
        props = feature.get("properties") or {}
        geometry = feature.get("geometry")

        irwin_id = props.get("attr_IrwinID") or props.get("poly_IRWINID")
        if not irwin_id:
            continue

        # Post-filter: skip geometry that does not intersect the region.
        if self.region and geometry and not polygon_intersects_bbox(
            geometry,
            self.region.west, self.region.south,
            self.region.east, self.region.north,
        ):
            continue

        state = props.get("attr_POOState")
        county = props.get("attr_POOCounty")
        current_guids[irwin_id] = (state, county)

        yield self._event_from_feature(irwin_id, props, geometry, state, county)
        events_yielded += 1

    # Fall-offs: observed by the previous poll but missing from this one.
    fallen_off = set(observed_before) - set(current_guids)

    # Sorted so removal events come out in a deterministic order.
    for irwin_id in sorted(fallen_off):
        last_observed, state, county = observed_before[irwin_id]
        now = datetime.now(timezone.utc)

        yield Event(
            id=f"{irwin_id}:removed:{now.isoformat()}",
            adapter=self.name,
            category="fire.perimeter.removed",
            time=now,
            severity=0,
            geo=Geo(),
            data={
                "irwin_id": irwin_id,
                "last_observed_at": last_observed,
                "state": state,
                "county": county,
                "reason": "fallen_off_current_service",
            },
        )
        events_yielded += 1
        logger.info(
            "WFIGS perimeter fall-off detected",
            extra={"irwin_id": irwin_id, "state": state},
        )

    # Persist the new snapshot and drop the GUIDs that fell off.
    update_observed(self._db, LAYER_NAME, current_guids)
    delete_observed(self._db, LAYER_NAME, fallen_off)

    # Periodic cleanup of old entries.
    cleanup_old_observed(self._db, LAYER_NAME)
    self.sweep_old_ids()

    self._last_poll_time = datetime.now(timezone.utc)

    logger.info(
        "WFIGS perimeters poll completed",
        extra={
            "events_yielded": events_yielded,
            "current_observed": len(current_guids),
            "fallen_off": len(fallen_off),
        },
    )
class TestWFIGSCommon:
    """Unit tests for the helpers in ``central.adapters.wfigs_common``."""

    def test_severity_from_acres_none(self):
        """Missing or zero acreage maps to severity 0."""
        from central.adapters.wfigs_common import severity_from_acres

        for acres in (None, 0):
            assert severity_from_acres(acres) == 0

    def test_severity_from_acres_small(self):
        """Acreage below 10 maps to severity 1."""
        from central.adapters.wfigs_common import severity_from_acres

        for acres in (5, 9.9):
            assert severity_from_acres(acres) == 1

    def test_severity_from_acres_medium(self):
        """Acreage from 10 up to 99 maps to severity 2."""
        from central.adapters.wfigs_common import severity_from_acres

        for acres in (10, 99):
            assert severity_from_acres(acres) == 2

    def test_severity_from_acres_large(self):
        """Acreage from 100 up to 999 maps to severity 3."""
        from central.adapters.wfigs_common import severity_from_acres

        for acres in (100, 999):
            assert severity_from_acres(acres) == 3

    def test_severity_from_acres_very_large(self):
        """Acreage of 1000 and above maps to severity 4."""
        from central.adapters.wfigs_common import severity_from_acres

        for acres in (1000, 100000):
            assert severity_from_acres(acres) == 4

    def test_parse_wfigs_timestamp(self):
        """An epoch-milliseconds value becomes a UTC-aware datetime."""
        from central.adapters.wfigs_common import parse_wfigs_timestamp

        parsed = parse_wfigs_timestamp(1716000000000)

        assert parsed is not None
        assert parsed.tzinfo == timezone.utc
        assert parsed.year == 2024

    def test_parse_wfigs_timestamp_none(self):
        """``None`` is passed through as ``None``."""
        from central.adapters.wfigs_common import parse_wfigs_timestamp

        result = parse_wfigs_timestamp(None)
        assert result is None

    def test_build_regions_full(self):
        """State plus county produce a single county-level region."""
        from central.adapters.wfigs_common import build_regions

        regions, primary = build_regions("ID", "Ada")

        assert regions == ["US-ID-ADA"]
        assert primary == "US-ID-ADA"

    def test_build_regions_state_only(self):
        """A state without a county produces a state-level region."""
        from central.adapters.wfigs_common import build_regions

        regions, primary = build_regions("ID", None)

        assert regions == ["US-ID"]
        assert primary == "US-ID"

    def test_build_regions_none(self):
        """No state and no county produce no regions at all."""
        from central.adapters.wfigs_common import build_regions

        regions, primary = build_regions(None, None)

        assert regions == []
        assert primary is None

    def test_subject_suffix(self):
        """Suffixes are lower-cased, underscore-joined, or ``unknown``."""
        from central.adapters.wfigs_common import subject_suffix

        expectations = (
            (("ID", "Ada"), "id.ada"),
            (("ID", "Ada County"), "id.ada_county"),
            (("ID", None), "id"),
            ((None, None), "unknown"),
        )
        for (state, county), expected in expectations:
            assert subject_suffix(state, county) == expected

    def test_point_in_bbox(self):
        """A point inside the box is accepted, one outside is rejected."""
        from central.adapters.wfigs_common import point_in_bbox

        west, south, east, north = -124, 31, -102, 49

        inside = point_in_bbox(-116.5, 43.5, west, south, east, north)
        outside = point_in_bbox(-80.0, 26.0, west, south, east, north)

        assert inside is True
        assert outside is False
class TestWFIGSIncidentsAdapter:
    """Tests for WFIGS Incidents adapter."""

    @staticmethod
    def _mock_get(payload: dict) -> AsyncMock:
        """Build a stand-in for the ``session.get(...)`` async context manager.

        Entering the context yields a response whose ``json()`` resolves to
        ``payload``. ``__aexit__`` resolves to ``False`` explicitly: a bare
        ``AsyncMock()`` resolves to a truthy mock, which would make
        ``async with`` swallow any exception raised inside the block.
        """
        response = AsyncMock()
        response.raise_for_status = MagicMock()
        response.json = AsyncMock(return_value=payload)
        return AsyncMock(
            __aenter__=AsyncMock(return_value=response),
            __aexit__=AsyncMock(return_value=False),
        )

    async def _poll_once(self, adapter, payload: dict) -> list:
        """Run one ``adapter.poll()`` against ``payload`` and collect events."""
        with patch.object(adapter._session, "get", return_value=self._mock_get(payload)):
            return [event async for event in adapter.poll()]

    @pytest.fixture
    def mock_config(self) -> AdapterConfig:
        return AdapterConfig(
            name="wfigs_incidents",
            enabled=True,
            cadence_s=300,
            settings={
                "region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
            },
            updated_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def mock_config_store(self) -> MagicMock:
        return MagicMock()

    @pytest.fixture
    def cursor_db_path(self, tmp_path: Path) -> Path:
        return tmp_path / "cursors.db"

    @pytest.mark.asyncio
    async def test_normalization_incidents(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Incidents are correctly normalized to Events."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll_once(adapter, SAMPLE_INCIDENTS_RESPONSE)
        finally:
            # Always release the session, even when poll() raises.
            await adapter.shutdown()

        # Should have 2 events (Florida filtered out by bbox)
        assert len(events) == 2

        event = events[0]
        assert event.id == "GUID-001-BOISE"
        assert event.adapter == "wfigs_incidents"
        assert event.category == "fire.incident.wildfire"
        assert event.severity == 3  # 150 acres = severity 3 (100-999 range)
        assert event.geo.primary_region == "US-ID-ADA"
        assert event.data["IrwinID"] == "GUID-001-BOISE"

    @pytest.mark.asyncio
    async def test_is_published_dedup(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """is_published/mark_published provides dedup functionality."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            # Initially not published
            assert adapter.is_published("test-id") is False

            # Mark as published
            adapter.mark_published("test-id")

            # Now it should be published
            assert adapter.is_published("test-id") is True
        finally:
            await adapter.shutdown()

    @pytest.mark.asyncio
    async def test_fall_off_emits_removal(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Fall-off detection emits removal events."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()

        # Second poll only carries the first incident (GUID-002 fell off).
        reduced_response = {
            "type": "FeatureCollection",
            "features": [SAMPLE_INCIDENTS_RESPONSE["features"][0]],
        }

        try:
            events1 = await self._poll_once(adapter, SAMPLE_INCIDENTS_RESPONSE)
            events2 = await self._poll_once(adapter, reduced_response)
        finally:
            await adapter.shutdown()

        # First poll: 2 incident events (the Florida one is outside the bbox)
        assert len(events1) == 2

        # Second poll: 1 incident (seen again) + 1 removal for GUID-002
        # The incident event is yielded (supervisor does dedup via is_published)
        # The removal is yielded for GUID-002
        removal_events = [e for e in events2 if e.category == "fire.incident.removed"]
        assert len(removal_events) == 1
        assert removal_events[0].data["irwin_id"] == "GUID-002-CANYON"

    def test_subject_for_incidents(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Regular incidents are routed by state and county."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)

        event = Event(
            id="test-id",
            adapter="wfigs_incidents",
            category="fire.incident.wildfire",
            time=datetime.now(timezone.utc),
            severity=2,
            geo=Geo(primary_region="US-ID-ADA"),
            data={"POOState": "ID", "POOCounty": "Ada"},
        )

        subject = adapter.subject_for(event)
        assert subject == "central.fire.incident.id.ada"

    def test_subject_for_removal(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Removal events are routed to the per-state removal subject."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)

        event = Event(
            id="test-id:removed:2024-01-01",
            adapter="wfigs_incidents",
            category="fire.incident.removed",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(),
            data={"irwin_id": "test-id", "state": "ID"},
        )

        subject = adapter.subject_for(event)
        assert subject == "central.fire.incident.removed.id"

    @pytest.mark.asyncio
    async def test_bbox_post_filter(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Features outside bbox are filtered out."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll_once(adapter, SAMPLE_INCIDENTS_RESPONSE)
        finally:
            await adapter.shutdown()

        # Florida incident should be filtered out
        assert len(events) == 2
        irwin_ids = {e.id for e in events}
        assert "GUID-003-FLORIDA" not in irwin_ids

    @pytest.mark.asyncio
    async def test_apply_config_region_change(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """apply_config replaces the adapter's region with the new settings."""
        from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

        adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)

        assert adapter.region.north == 49.0

        new_config = AdapterConfig(
            name="wfigs_incidents",
            enabled=True,
            cadence_s=300,
            settings={
                "region": {"north": 50.0, "south": 35.0, "east": -100.0, "west": -120.0}
            },
            updated_at=datetime.now(timezone.utc),
        )
        await adapter.apply_config(new_config)

        assert adapter.region.north == 50.0
        assert adapter.region.south == 35.0
class TestWFIGSPerimetersAdapter:
    """Tests for WFIGS Perimeters adapter."""

    @staticmethod
    def _mock_get(payload: dict) -> AsyncMock:
        """Build a stand-in for the ``session.get(...)`` async context manager.

        Entering the context yields a response whose ``json()`` resolves to
        ``payload``. ``__aexit__`` resolves to ``False`` explicitly: a bare
        ``AsyncMock()`` resolves to a truthy mock, which would make
        ``async with`` swallow any exception raised inside the block.
        """
        response = AsyncMock()
        response.raise_for_status = MagicMock()
        response.json = AsyncMock(return_value=payload)
        return AsyncMock(
            __aenter__=AsyncMock(return_value=response),
            __aexit__=AsyncMock(return_value=False),
        )

    @pytest.fixture
    def mock_config(self) -> AdapterConfig:
        return AdapterConfig(
            name="wfigs_perimeters",
            enabled=True,
            cadence_s=300,
            settings={
                "region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
            },
            updated_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def mock_config_store(self) -> MagicMock:
        return MagicMock()

    @pytest.fixture
    def cursor_db_path(self, tmp_path: Path) -> Path:
        return tmp_path / "cursors.db"

    @pytest.mark.asyncio
    async def test_normalization_perimeters(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Perimeters are correctly normalized to Events with geometry."""
        from central.adapters.wfigs_perimeters import WFIGSPerimetersAdapter

        adapter = WFIGSPerimetersAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            with patch.object(
                adapter._session,
                "get",
                return_value=self._mock_get(SAMPLE_PERIMETERS_RESPONSE),
            ):
                events = [e async for e in adapter.poll()]
        finally:
            # Always release the session, even when poll() raises.
            await adapter.shutdown()

        assert len(events) == 1

        event = events[0]
        assert event.id == "GUID-001-BOISE"
        assert event.adapter == "wfigs_perimeters"
        assert event.category == "fire.perimeter.wildfire"
        assert event.geo.primary_region == "US-ID-ADA"
        assert "geometry" in event.data
        assert event.data["geometry"]["type"] == "Polygon"

    def test_subject_for_perimeters(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Regular perimeters are routed by state and county."""
        from central.adapters.wfigs_perimeters import WFIGSPerimetersAdapter

        adapter = WFIGSPerimetersAdapter(mock_config, mock_config_store, cursor_db_path)

        event = Event(
            id="test-id",
            adapter="wfigs_perimeters",
            category="fire.perimeter.wildfire",
            time=datetime.now(timezone.utc),
            severity=2,
            geo=Geo(primary_region="US-ID-ADA"),
            data={"POOState": "ID", "POOCounty": "Ada", "geometry": {}},
        )

        subject = adapter.subject_for(event)
        assert subject == "central.fire.perimeter.id.ada"

    def test_subject_for_removal(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Removal events are routed to the per-state removal subject."""
        from central.adapters.wfigs_perimeters import WFIGSPerimetersAdapter

        adapter = WFIGSPerimetersAdapter(mock_config, mock_config_store, cursor_db_path)

        event = Event(
            id="test-id:removed:2024-01-01",
            adapter="wfigs_perimeters",
            category="fire.perimeter.removed",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(),
            data={"irwin_id": "test-id", "state": "ID"},
        )

        subject = adapter.subject_for(event)
        assert subject == "central.fire.perimeter.removed.id"