feat(2-B): add NIFC WFIGS adapters for incidents and perimeters

Two new adapters for wildfire data from NIFC WFIGS:
- wfigs_incidents: Active fire incident locations
- wfigs_perimeters: Active fire perimeter polygons

Features:
- IRWIN GUID dedup via is_published/mark_published
- Fall-off detection with removal events when fires exit current
- Bbox post-filtering with shapely polygon intersection
- Severity mapping from DailyAcres (0-4 scale)
- Subject hierarchy: central.fire.<layer>.<state>.<county>

Ships disabled by default; operators enable via GUI.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-19 02:47:26 +00:00
commit e0ffe686ec
5 changed files with 1452 additions and 0 deletions

View file

@ -0,0 +1,37 @@
-- Migration: 016_add_wfigs_adapters
-- Add WFIGS incident and perimeter adapters to config.adapters
-- Idempotent: uses ON CONFLICT DO NOTHING
-- WFIGS Incidents adapter
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'wfigs_incidents',
false, -- Ships disabled; operator enables via GUI
300,
jsonb_build_object(
'region', jsonb_build_object(
'north', 49.0,
'south', 31.0,
'east', -102.0,
'west', -124.0
)
)
)
ON CONFLICT (name) DO NOTHING;
-- WFIGS Perimeters adapter
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
'wfigs_perimeters',
false, -- Ships disabled; operator enables via GUI
300,
jsonb_build_object(
'region', jsonb_build_object(
'north', 49.0,
'south', 31.0,
'east', -102.0,
'west', -124.0
)
)
)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,211 @@
"""Shared utilities for WFIGS (Wildland Fire Interagency Geospatial Services) adapters."""
import sqlite3
from datetime import datetime, timezone
from typing import Any
# WFIGS FeatureServer endpoints
WFIGS_INCIDENTS_URL = (
"https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/"
"WFIGS_Incident_Locations_Current/FeatureServer/0/query"
)
WFIGS_PERIMETERS_URL = (
"https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/"
"WFIGS_Interagency_Perimeters_Current/FeatureServer/0/query"
)
# Fall-off sweep window: 14 days (matches WFIGS's longest fall-off: large fires)
FALLOFF_WINDOW_DAYS = 14
def severity_from_acres(acres: float | None) -> int:
"""Map DailyAcres to severity level 0-4."""
if acres is None or acres == 0:
return 0
if acres < 10:
return 1
if acres < 100:
return 2
if acres < 1000:
return 3
return 4
def parse_wfigs_timestamp(epoch_ms: int | None) -> datetime | None:
"""Parse WFIGS epoch milliseconds to UTC datetime."""
if epoch_ms is None:
return None
return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
def build_regions(state: str | None, county: str | None) -> tuple[list[str], str | None]:
"""
Build geo.regions list and primary_region from POOState and POOCounty.
Returns (regions, primary_region).
"""
if not state:
return [], None
state_upper = state.upper()
if county:
# Normalize county: remove spaces, uppercase
county_normalized = county.replace(" ", "_").upper()
region = f"US-{state_upper}-{county_normalized}"
return [region], region
else:
region = f"US-{state_upper}"
return [region], region
def subject_suffix(state: str | None, county: str | None) -> str:
"""
Build subject suffix from state and county.
Returns lowercase state.county (county with spacesunderscores).
Falls back to "unknown" if state is not available.
"""
if not state:
return "unknown"
state_lower = state.lower()
if county:
county_lower = county.lower().replace(" ", "_")
return f"{state_lower}.{county_lower}"
return state_lower
def init_observed_table(db: sqlite3.Connection) -> None:
"""Create the wfigs_observed table if it doesn't exist."""
db.execute("""
CREATE TABLE IF NOT EXISTS wfigs_observed (
layer TEXT NOT NULL,
irwin_id TEXT NOT NULL,
last_observed_at TEXT NOT NULL,
state TEXT,
county TEXT,
PRIMARY KEY (layer, irwin_id)
)
""")
db.commit()
def get_observed_guids(db: sqlite3.Connection, layer: str) -> dict[str, tuple[str, str | None, str | None]]:
"""
Get all observed IRWIN GUIDs for a layer.
Returns dict mapping irwin_id -> (last_observed_at, state, county).
"""
cursor = db.execute(
"SELECT irwin_id, last_observed_at, state, county FROM wfigs_observed WHERE layer = ?",
(layer,),
)
return {row[0]: (row[1], row[2], row[3]) for row in cursor.fetchall()}
def update_observed(
db: sqlite3.Connection,
layer: str,
current_guids: dict[str, tuple[str | None, str | None]],
) -> None:
"""
Update the observed table with current poll's GUIDs.
current_guids: dict mapping irwin_id -> (state, county)
"""
now_iso = datetime.now(timezone.utc).isoformat()
# Use INSERT OR REPLACE to upsert
for irwin_id, (state, county) in current_guids.items():
db.execute(
"""
INSERT OR REPLACE INTO wfigs_observed (layer, irwin_id, last_observed_at, state, county)
VALUES (?, ?, ?, ?, ?)
""",
(layer, irwin_id, now_iso, state, county),
)
db.commit()
def delete_observed(db: sqlite3.Connection, layer: str, irwin_ids: set[str]) -> None:
"""Delete fallen-off GUIDs from the observed table."""
for irwin_id in irwin_ids:
db.execute(
"DELETE FROM wfigs_observed WHERE layer = ? AND irwin_id = ?",
(layer, irwin_id),
)
db.commit()
def cleanup_old_observed(db: sqlite3.Connection, layer: str, days: int = FALLOFF_WINDOW_DAYS) -> None:
"""Remove observed entries older than the sweep window."""
cutoff = datetime.now(timezone.utc).isoformat()
db.execute(
f"""
DELETE FROM wfigs_observed
WHERE layer = ?
AND datetime(last_observed_at) < datetime(?, '-{days} days')
""",
(layer, cutoff),
)
db.commit()
def point_in_bbox(
lon: float,
lat: float,
west: float,
south: float,
east: float,
north: float,
) -> bool:
"""Check if a point is within a bounding box."""
return west <= lon <= east and south <= lat <= north
def polygon_intersects_bbox(
geometry: dict[str, Any],
west: float,
south: float,
east: float,
north: float,
) -> bool:
"""
Check if a GeoJSON geometry intersects a bounding box.
Uses shapely for accurate polygon intersection.
"""
try:
from shapely.geometry import box, shape
bbox_polygon = box(west, south, east, north)
geom = shape(geometry)
return bbox_polygon.intersects(geom)
except Exception:
# If shapely fails, fall back to centroid check
if geometry.get("type") == "Point":
coords = geometry.get("coordinates", [])
if len(coords) >= 2:
return point_in_bbox(coords[0], coords[1], west, south, east, north)
return True # Include if we can't determine
def extract_centroid(geometry: dict[str, Any]) -> tuple[float, float] | None:
"""Extract centroid from GeoJSON geometry."""
if not geometry:
return None
geom_type = geometry.get("type")
coords = geometry.get("coordinates")
if geom_type == "Point" and coords and len(coords) >= 2:
return (coords[0], coords[1])
# For polygons, use shapely to compute centroid
try:
from shapely.geometry import shape
geom = shape(geometry)
centroid = geom.centroid
return (centroid.x, centroid.y)
except Exception:
return None

View file

@ -0,0 +1,373 @@
"""WFIGS Incidents adapter for wildfire incident locations."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from pydantic import BaseModel
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.wfigs_common import (
WFIGS_INCIDENTS_URL,
build_regions,
cleanup_old_observed,
delete_observed,
extract_centroid,
get_observed_guids,
init_observed_table,
parse_wfigs_timestamp,
point_in_bbox,
severity_from_acres,
subject_suffix,
update_observed,
)
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
LAYER_NAME = "incidents"
class WFIGSIncidentsSettings(BaseModel):
"""Settings schema for WFIGS Incidents adapter."""
region: RegionConfig | None = None
class WFIGSIncidentsAdapter(SourceAdapter):
"""NIFC WFIGS wildfire incidents adapter."""
name = "wfigs_incidents"
display_name = "NIFC WFIGS — Wildfire Incidents"
description = "Active wildfire incident locations from NIFC WFIGS."
settings_schema = WFIGSIncidentsSettings
requires_api_key = None
api_key_field = None
wizard_order = None # Not in setup wizard
default_cadence_s = 300
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
self._last_poll_time: datetime | None = None
# Parse region from settings
region_dict = config.settings.get("region")
if region_dict:
self.region: RegionConfig | None = RegionConfig(**region_dict)
else:
self.region = None
async def startup(self) -> None:
"""Initialize HTTP session and SQLite connection."""
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
)
self._db = sqlite3.connect(self._cursor_db_path)
# Create tables for dedup and fall-off tracking
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
init_observed_table(self._db)
self._db.commit()
logger.info(
"WFIGS incidents adapter started",
extra={"region": self.region.model_dump() if self.region else None},
)
async def shutdown(self) -> None:
"""Close HTTP session and SQLite connection."""
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("WFIGS incidents adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
"""Apply new configuration from hot-reload."""
region_dict = new_config.settings.get("region")
if region_dict:
self.region = RegionConfig(**region_dict)
else:
self.region = None
logger.info(
"WFIGS incidents config updated",
extra={"region": self.region.model_dump() if self.region else None},
)
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def bump_last_seen(self, event_id: str) -> None:
"""Bump the last_seen timestamp for an event."""
if not self._db:
return
self._db.execute(
"UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 14 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("WFIGS incidents swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for an event."""
# Removal events have a different subject pattern
if event.category.startswith("fire.incident.removed"):
state = event.data.get("state", "").lower() or "unknown"
return f"central.fire.incident.removed.{state}"
# Regular incidents: central.fire.incident.<state>.<county>
state = event.data.get("POOState")
county = event.data.get("POOCounty")
suffix = subject_suffix(state, county)
return f"central.fire.incident.{suffix}"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch_features(self) -> list[dict[str, Any]]:
"""Fetch features from WFIGS FeatureServer."""
if not self._session:
raise RuntimeError("Session not initialized")
# Build query params
params: dict[str, str] = {
"outFields": "*",
"returnGeometry": "true",
"f": "geojson",
}
# Time filter: only fetch modified since last poll
if self._last_poll_time:
iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S")
params["where"] = f"ModifiedOnDateTime > timestamp '{iso_time}'"
else:
params["where"] = "1=1"
# Bbox filter if region configured
if self.region:
bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}"
params["geometry"] = bbox
params["geometryType"] = "esriGeometryEnvelope"
params["spatialRel"] = "esriSpatialRelIntersects"
params["inSR"] = "4326"
async with self._session.get(WFIGS_INCIDENTS_URL, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
features = data.get("features", [])
logger.info(
"WFIGS incidents fetch completed",
extra={"feature_count": len(features)},
)
return features
async def poll(self) -> AsyncIterator[Event]:
"""Poll WFIGS for incident updates."""
if not self._db:
raise RuntimeError("Database not initialized")
# Fetch features from upstream
try:
features = await self._fetch_features()
except Exception as e:
logger.error("WFIGS incidents fetch failed", extra={"error": str(e)})
raise
# Get previous poll's observed GUIDs for fall-off detection
observed_before = get_observed_guids(self._db, LAYER_NAME)
# Process features and track current GUIDs
current_guids: dict[str, tuple[str | None, str | None]] = {}
events_yielded = 0
for feature in features:
props = feature.get("properties", {})
geometry = feature.get("geometry")
irwin_id = props.get("IrwinID")
if not irwin_id:
continue
# Extract location
centroid = extract_centroid(geometry)
# Post-filter: skip if outside region bbox
if self.region and centroid:
lon, lat = centroid
if not point_in_bbox(
lon, lat,
self.region.west, self.region.south,
self.region.east, self.region.north,
):
continue
# Track this GUID as observed (for fall-off detection)
state = props.get("POOState")
county = props.get("POOCounty")
current_guids[irwin_id] = (state, county)
# Parse fields
incident_type = props.get("IncidentTypeCategory", "unknown").lower()
discovery_time = parse_wfigs_timestamp(props.get("FireDiscoveryDateTime"))
daily_acres = props.get("DailyAcres")
# Build regions
regions, primary_region = build_regions(state, county)
# Build geo
if centroid:
geo = Geo(
centroid=centroid,
bbox=(centroid[0], centroid[1], centroid[0], centroid[1]),
regions=regions,
primary_region=primary_region,
)
else:
geo = Geo(regions=regions, primary_region=primary_region)
# Build event
event = Event(
id=irwin_id,
adapter=self.name,
category=f"fire.incident.{incident_type}",
time=discovery_time or datetime.now(timezone.utc),
severity=severity_from_acres(daily_acres),
geo=geo,
data={
"IrwinID": irwin_id,
"IncidentName": props.get("IncidentName"),
"IncidentTypeCategory": props.get("IncidentTypeCategory"),
"DailyAcres": daily_acres,
"PercentContained": props.get("PercentContained"),
"FireDiscoveryDateTime": props.get("FireDiscoveryDateTime"),
"ModifiedOnDateTime": props.get("ModifiedOnDateTime"),
"POOState": state,
"POOCounty": county,
"raw": props,
},
)
yield event
events_yielded += 1
# Detect fall-offs: GUIDs in previous but not current
fallen_off = set(observed_before.keys()) - set(current_guids.keys())
for irwin_id in fallen_off:
last_observed, state, county = observed_before[irwin_id]
now = datetime.now(timezone.utc)
removal_event = Event(
id=f"{irwin_id}:removed:{now.isoformat()}",
adapter=self.name,
category="fire.incident.removed",
time=now,
severity=0,
geo=Geo(),
data={
"irwin_id": irwin_id,
"last_observed_at": last_observed,
"state": state,
"county": county,
"reason": "fallen_off_current_service",
},
)
yield removal_event
events_yielded += 1
logger.info(
"WFIGS incident fall-off detected",
extra={"irwin_id": irwin_id, "state": state},
)
# Update observed table
update_observed(self._db, LAYER_NAME, current_guids)
delete_observed(self._db, LAYER_NAME, fallen_off)
# Periodic cleanup of old entries
cleanup_old_observed(self._db, LAYER_NAME)
self.sweep_old_ids()
# Update last poll time
self._last_poll_time = datetime.now(timezone.utc)
logger.info(
"WFIGS incidents poll completed",
extra={
"events_yielded": events_yielded,
"current_observed": len(current_guids),
"fallen_off": len(fallen_off),
},
)

View file

@ -0,0 +1,387 @@
"""WFIGS Perimeters adapter for wildfire perimeter polygons."""
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from pydantic import BaseModel
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.wfigs_common import (
WFIGS_PERIMETERS_URL,
build_regions,
cleanup_old_observed,
delete_observed,
extract_centroid,
get_observed_guids,
init_observed_table,
parse_wfigs_timestamp,
polygon_intersects_bbox,
severity_from_acres,
subject_suffix,
update_observed,
)
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
LAYER_NAME = "perimeters"
class WFIGSPerimetersSettings(BaseModel):
"""Settings schema for WFIGS Perimeters adapter."""
region: RegionConfig | None = None
class WFIGSPerimetersAdapter(SourceAdapter):
"""NIFC WFIGS wildfire perimeters adapter."""
name = "wfigs_perimeters"
display_name = "NIFC WFIGS — Wildfire Perimeters"
description = "Active wildfire perimeter polygons from NIFC WFIGS."
settings_schema = WFIGSPerimetersSettings
requires_api_key = None
api_key_field = None
wizard_order = None # Not in setup wizard
default_cadence_s = 300
def __init__(
self,
config: AdapterConfig,
config_store: ConfigStore,
cursor_db_path: Path,
) -> None:
self._config_store = config_store
self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None
self._last_poll_time: datetime | None = None
# Parse region from settings
region_dict = config.settings.get("region")
if region_dict:
self.region: RegionConfig | None = RegionConfig(**region_dict)
else:
self.region = None
async def startup(self) -> None:
"""Initialize HTTP session and SQLite connection."""
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=120), # Longer timeout for large polygons
)
self._db = sqlite3.connect(self._cursor_db_path)
# Create tables for dedup and fall-off tracking
self._db.execute("""
CREATE TABLE IF NOT EXISTS published_ids (
adapter TEXT NOT NULL,
event_id TEXT NOT NULL,
first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (adapter, event_id)
)
""")
self._db.execute("""
CREATE INDEX IF NOT EXISTS published_ids_last_seen
ON published_ids (last_seen)
""")
init_observed_table(self._db)
self._db.commit()
logger.info(
"WFIGS perimeters adapter started",
extra={"region": self.region.model_dump() if self.region else None},
)
async def shutdown(self) -> None:
"""Close HTTP session and SQLite connection."""
if self._session:
await self._session.close()
self._session = None
if self._db:
self._db.close()
self._db = None
logger.info("WFIGS perimeters adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
"""Apply new configuration from hot-reload."""
region_dict = new_config.settings.get("region")
if region_dict:
self.region = RegionConfig(**region_dict)
else:
self.region = None
logger.info(
"WFIGS perimeters config updated",
extra={"region": self.region.model_dump() if self.region else None},
)
def is_published(self, event_id: str) -> bool:
"""Check if an event has already been published."""
if not self._db:
return False
cur = self._db.execute(
"SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
return cur.fetchone() is not None
def mark_published(self, event_id: str) -> None:
"""Mark an event as published."""
if not self._db:
return
self._db.execute(
"""
INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT (adapter, event_id) DO UPDATE SET
last_seen = CURRENT_TIMESTAMP
""",
(self.name, event_id),
)
self._db.commit()
def bump_last_seen(self, event_id: str) -> None:
"""Bump the last_seen timestamp for an event."""
if not self._db:
return
self._db.execute(
"UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?",
(self.name, event_id),
)
self._db.commit()
def sweep_old_ids(self) -> int:
"""Remove published_ids older than 14 days. Returns count deleted."""
if not self._db:
return 0
cur = self._db.execute(
"DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
(self.name,),
)
self._db.commit()
count = cur.rowcount
if count > 0:
logger.info("WFIGS perimeters swept old dedup entries", extra={"count": count})
return count
def subject_for(self, event: Event) -> str:
"""Compute NATS subject for an event."""
# Removal events have a different subject pattern
if event.category.startswith("fire.perimeter.removed"):
state = event.data.get("state", "").lower() or "unknown"
return f"central.fire.perimeter.removed.{state}"
# Regular perimeters: central.fire.perimeter.<state>.<county>
state = event.data.get("POOState")
county = event.data.get("POOCounty")
suffix = subject_suffix(state, county)
return f"central.fire.perimeter.{suffix}"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=30),
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch_features(self) -> list[dict[str, Any]]:
"""Fetch features from WFIGS FeatureServer."""
if not self._session:
raise RuntimeError("Session not initialized")
# Build query params
params: dict[str, str] = {
"outFields": "*",
"returnGeometry": "true",
"f": "geojson",
}
# Time filter: only fetch modified since last poll
# Note: perimeters use attr_ModifiedOnDateTime_dt field
if self._last_poll_time:
iso_time = self._last_poll_time.strftime("%Y-%m-%d %H:%M:%S")
params["where"] = f"attr_ModifiedOnDateTime_dt > timestamp '{iso_time}'"
else:
params["where"] = "1=1"
# Bbox filter if region configured
if self.region:
bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}"
params["geometry"] = bbox
params["geometryType"] = "esriGeometryEnvelope"
params["spatialRel"] = "esriSpatialRelIntersects"
params["inSR"] = "4326"
async with self._session.get(WFIGS_PERIMETERS_URL, params=params) as resp:
resp.raise_for_status()
data = await resp.json()
features = data.get("features", [])
logger.info(
"WFIGS perimeters fetch completed",
extra={"feature_count": len(features)},
)
return features
async def poll(self) -> AsyncIterator[Event]:
"""Poll WFIGS for perimeter updates."""
if not self._db:
raise RuntimeError("Database not initialized")
# Fetch features from upstream
try:
features = await self._fetch_features()
except Exception as e:
logger.error("WFIGS perimeters fetch failed", extra={"error": str(e)})
raise
# Get previous poll's observed GUIDs for fall-off detection
observed_before = get_observed_guids(self._db, LAYER_NAME)
# Process features and track current GUIDs
current_guids: dict[str, tuple[str | None, str | None]] = {}
events_yielded = 0
for feature in features:
props = feature.get("properties", {})
geometry = feature.get("geometry")
# WFIGS Perimeters use prefixed field names (attr_*, poly_*)
irwin_id = props.get("attr_IrwinID") or props.get("poly_IRWINID")
if not irwin_id:
continue
# Post-filter: skip if geometry doesn't intersect region bbox
if self.region and geometry:
if not polygon_intersects_bbox(
geometry,
self.region.west, self.region.south,
self.region.east, self.region.north,
):
continue
# Track this GUID as observed (for fall-off detection)
state = props.get("attr_POOState")
county = props.get("attr_POOCounty")
current_guids[irwin_id] = (state, county)
# Parse fields using prefixed names
incident_type = props.get("attr_IncidentTypeCategory", "unknown").lower()
discovery_time = parse_wfigs_timestamp(props.get("attr_FireDiscoveryDateTime"))
# Use poly_GISAcres or attr_IncidentSize for acreage
daily_acres = props.get("attr_IncidentSize") or props.get("poly_GISAcres")
# Build regions
regions, primary_region = build_regions(state, county)
# Extract centroid for geo
centroid = extract_centroid(geometry)
# Build bbox from geometry if available
bbox = None
if geometry:
try:
from shapely.geometry import shape
geom = shape(geometry)
bounds = geom.bounds # (minx, miny, maxx, maxy)
bbox = (bounds[0], bounds[1], bounds[2], bounds[3])
except Exception:
if centroid:
bbox = (centroid[0], centroid[1], centroid[0], centroid[1])
# Build geo
geo = Geo(
centroid=centroid,
bbox=bbox,
regions=regions,
primary_region=primary_region,
)
# Build event with geometry in data
# Use normalized field names in event data for consistency
event = Event(
id=irwin_id,
adapter=self.name,
category=f"fire.perimeter.{incident_type}",
time=discovery_time or datetime.now(timezone.utc),
severity=severity_from_acres(daily_acres),
geo=geo,
data={
"IrwinID": irwin_id,
"IncidentName": props.get("attr_IncidentName") or props.get("poly_IncidentName"),
"IncidentTypeCategory": props.get("attr_IncidentTypeCategory"),
"DailyAcres": props.get("attr_IncidentSize"),
"GISAcres": props.get("poly_GISAcres"),
"PercentContained": props.get("attr_PercentContained"),
"FireDiscoveryDateTime": props.get("attr_FireDiscoveryDateTime"),
"ModifiedOnDateTime": props.get("attr_ModifiedOnDateTime_dt"),
"POOState": state,
"POOCounty": county,
"geometry": geometry, # Full GeoJSON polygon
"raw": props,
},
)
yield event
events_yielded += 1
# Detect fall-offs: GUIDs in previous but not current
fallen_off = set(observed_before.keys()) - set(current_guids.keys())
for irwin_id in fallen_off:
last_observed, state, county = observed_before[irwin_id]
now = datetime.now(timezone.utc)
removal_event = Event(
id=f"{irwin_id}:removed:{now.isoformat()}",
adapter=self.name,
category="fire.perimeter.removed",
time=now,
severity=0,
geo=Geo(),
data={
"irwin_id": irwin_id,
"last_observed_at": last_observed,
"state": state,
"county": county,
"reason": "fallen_off_current_service",
},
)
yield removal_event
events_yielded += 1
logger.info(
"WFIGS perimeter fall-off detected",
extra={"irwin_id": irwin_id, "state": state},
)
# Update observed table
update_observed(self._db, LAYER_NAME, current_guids)
delete_observed(self._db, LAYER_NAME, fallen_off)
# Periodic cleanup of old entries
cleanup_old_observed(self._db, LAYER_NAME)
self.sweep_old_ids()
# Update last poll time
self._last_poll_time = datetime.now(timezone.utc)
logger.info(
"WFIGS perimeters poll completed",
extra={
"events_yielded": events_yielded,
"current_observed": len(current_guids),
"fallen_off": len(fallen_off),
},
)

444
tests/test_wfigs.py Normal file
View file

@ -0,0 +1,444 @@
"""Tests for WFIGS adapters."""
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.config_models import AdapterConfig, RegionConfig
from central.models import Event, Geo
# Sample GeoJSON response with incidents
SAMPLE_INCIDENTS_RESPONSE = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [-116.5, 43.5]},
"properties": {
"IrwinID": "GUID-001-BOISE",
"IncidentName": "Test Fire 1",
"IncidentTypeCategory": "Wildfire",
"DailyAcres": 150,
"PercentContained": 25,
"FireDiscoveryDateTime": 1716000000000,
"ModifiedOnDateTime": 1716100000000,
"POOState": "ID",
"POOCounty": "Ada",
},
},
{
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [-117.0, 44.0]},
"properties": {
"IrwinID": "GUID-002-CANYON",
"IncidentName": "Test Fire 2",
"IncidentTypeCategory": "PrescribedFire",
"DailyAcres": 5,
"PercentContained": 100,
"FireDiscoveryDateTime": 1716200000000,
"ModifiedOnDateTime": 1716300000000,
"POOState": "ID",
"POOCounty": "Canyon",
},
},
{
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [-80.0, 26.0]},
"properties": {
"IrwinID": "GUID-003-FLORIDA",
"IncidentName": "Florida Fire",
"IncidentTypeCategory": "Wildfire",
"DailyAcres": 50,
"PercentContained": 0,
"FireDiscoveryDateTime": 1716400000000,
"ModifiedOnDateTime": 1716500000000,
"POOState": "FL",
"POOCounty": "Miami-Dade",
},
},
],
}
# Perimeters API uses prefixed field names (attr_*, poly_*)
SAMPLE_PERIMETERS_RESPONSE = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[-116.6, 43.4],
[-116.4, 43.4],
[-116.4, 43.6],
[-116.6, 43.6],
[-116.6, 43.4],
]],
},
"properties": {
"attr_IrwinID": "GUID-001-BOISE",
"attr_IncidentName": "Test Fire 1",
"attr_IncidentTypeCategory": "Wildfire",
"attr_IncidentSize": 150,
"poly_GISAcres": 148.5,
"attr_PercentContained": 25,
"attr_FireDiscoveryDateTime": 1716000000000,
"attr_ModifiedOnDateTime_dt": 1716100000000,
"attr_POOState": "ID",
"attr_POOCounty": "Ada",
},
},
],
}
class TestWFIGSCommon:
"""Tests for WFIGS common utilities."""
def test_severity_from_acres_none(self):
from central.adapters.wfigs_common import severity_from_acres
assert severity_from_acres(None) == 0
assert severity_from_acres(0) == 0
def test_severity_from_acres_small(self):
from central.adapters.wfigs_common import severity_from_acres
assert severity_from_acres(5) == 1
assert severity_from_acres(9.9) == 1
def test_severity_from_acres_medium(self):
from central.adapters.wfigs_common import severity_from_acres
assert severity_from_acres(10) == 2
assert severity_from_acres(99) == 2
def test_severity_from_acres_large(self):
from central.adapters.wfigs_common import severity_from_acres
assert severity_from_acres(100) == 3
assert severity_from_acres(999) == 3
def test_severity_from_acres_very_large(self):
from central.adapters.wfigs_common import severity_from_acres
assert severity_from_acres(1000) == 4
assert severity_from_acres(100000) == 4
def test_parse_wfigs_timestamp(self):
from central.adapters.wfigs_common import parse_wfigs_timestamp
ts = parse_wfigs_timestamp(1716000000000)
assert ts is not None
assert ts.tzinfo == timezone.utc
assert ts.year == 2024
def test_parse_wfigs_timestamp_none(self):
from central.adapters.wfigs_common import parse_wfigs_timestamp
assert parse_wfigs_timestamp(None) is None
def test_build_regions_full(self):
from central.adapters.wfigs_common import build_regions
regions, primary = build_regions("ID", "Ada")
assert regions == ["US-ID-ADA"]
assert primary == "US-ID-ADA"
def test_build_regions_state_only(self):
from central.adapters.wfigs_common import build_regions
regions, primary = build_regions("ID", None)
assert regions == ["US-ID"]
assert primary == "US-ID"
def test_build_regions_none(self):
from central.adapters.wfigs_common import build_regions
regions, primary = build_regions(None, None)
assert regions == []
assert primary is None
def test_subject_suffix(self):
from central.adapters.wfigs_common import subject_suffix
assert subject_suffix("ID", "Ada") == "id.ada"
assert subject_suffix("ID", "Ada County") == "id.ada_county"
assert subject_suffix("ID", None) == "id"
assert subject_suffix(None, None) == "unknown"
def test_point_in_bbox(self):
from central.adapters.wfigs_common import point_in_bbox
assert point_in_bbox(-116.5, 43.5, -124, 31, -102, 49) is True
assert point_in_bbox(-80.0, 26.0, -124, 31, -102, 49) is False
class TestWFIGSIncidentsAdapter:
"""Tests for WFIGS Incidents adapter."""
@pytest.fixture
def mock_config(self) -> AdapterConfig:
return AdapterConfig(
name="wfigs_incidents",
enabled=True,
cadence_s=300,
settings={
"region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
},
updated_at=datetime.now(timezone.utc),
)
@pytest.fixture
def mock_config_store(self) -> MagicMock:
return MagicMock()
@pytest.fixture
def cursor_db_path(self, tmp_path: Path) -> Path:
return tmp_path / "cursors.db"
@pytest.mark.asyncio
async def test_normalization_incidents(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
"""Incidents are correctly normalized to Events."""
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
await adapter.startup()
mock_response = AsyncMock()
mock_response.raise_for_status = MagicMock()
mock_response.json = AsyncMock(return_value=SAMPLE_INCIDENTS_RESPONSE)
with patch.object(adapter._session, "get", return_value=AsyncMock(__aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock())):
events = [e async for e in adapter.poll()]
await adapter.shutdown()
# Should have 2 events (Florida filtered out by bbox)
assert len(events) == 2
event = events[0]
assert event.id == "GUID-001-BOISE"
assert event.adapter == "wfigs_incidents"
assert event.category == "fire.incident.wildfire"
assert event.severity == 3 # 150 acres = severity 3 (100-999 range)
assert event.geo.primary_region == "US-ID-ADA"
assert event.data["IrwinID"] == "GUID-001-BOISE"
@pytest.mark.asyncio
async def test_is_published_dedup(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
"""is_published/mark_published provides dedup functionality."""
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
await adapter.startup()
# Initially not published
assert adapter.is_published("test-id") is False
# Mark as published
adapter.mark_published("test-id")
# Now it should be published
assert adapter.is_published("test-id") is True
await adapter.shutdown()
@pytest.mark.asyncio
async def test_fall_off_emits_removal(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
"""Fall-off detection emits removal events."""
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
await adapter.startup()
# First poll with 2 incidents
mock_response1 = AsyncMock()
mock_response1.raise_for_status = MagicMock()
mock_response1.json = AsyncMock(return_value=SAMPLE_INCIDENTS_RESPONSE)
# Second poll with only 1 incident (GUID-002 fell off)
reduced_response = {
"type": "FeatureCollection",
"features": [SAMPLE_INCIDENTS_RESPONSE["features"][0]],
}
mock_response2 = AsyncMock()
mock_response2.raise_for_status = MagicMock()
mock_response2.json = AsyncMock(return_value=reduced_response)
with patch.object(adapter._session, "get", return_value=AsyncMock(__aenter__=AsyncMock(return_value=mock_response1), __aexit__=AsyncMock())):
events1 = [e async for e in adapter.poll()]
with patch.object(adapter._session, "get", return_value=AsyncMock(__aenter__=AsyncMock(return_value=mock_response2), __aexit__=AsyncMock())):
events2 = [e async for e in adapter.poll()]
await adapter.shutdown()
# First poll: 2 incident events
assert len(events1) == 2
# Second poll: 1 incident (seen again) + 1 removal for GUID-002
# The incident event is yielded (supervisor does dedup via is_published)
# The removal is yielded for GUID-002
removal_events = [e for e in events2 if e.category == "fire.incident.removed"]
assert len(removal_events) == 1
assert removal_events[0].data["irwin_id"] == "GUID-002-CANYON"
def test_subject_for_incidents(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
event = Event(
id="test-id",
adapter="wfigs_incidents",
category="fire.incident.wildfire",
time=datetime.now(timezone.utc),
severity=2,
geo=Geo(primary_region="US-ID-ADA"),
data={"POOState": "ID", "POOCounty": "Ada"},
)
subject = adapter.subject_for(event)
assert subject == "central.fire.incident.id.ada"
def test_subject_for_removal(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
event = Event(
id="test-id:removed:2024-01-01",
adapter="wfigs_incidents",
category="fire.incident.removed",
time=datetime.now(timezone.utc),
severity=0,
geo=Geo(),
data={"irwin_id": "test-id", "state": "ID"},
)
subject = adapter.subject_for(event)
assert subject == "central.fire.incident.removed.id"
@pytest.mark.asyncio
async def test_bbox_post_filter(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
"""Features outside bbox are filtered out."""
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
await adapter.startup()
mock_response = AsyncMock()
mock_response.raise_for_status = MagicMock()
mock_response.json = AsyncMock(return_value=SAMPLE_INCIDENTS_RESPONSE)
with patch.object(adapter._session, "get", return_value=AsyncMock(__aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock())):
events = [e async for e in adapter.poll()]
await adapter.shutdown()
# Florida incident should be filtered out
assert len(events) == 2
irwin_ids = {e.id for e in events}
assert "GUID-003-FLORIDA" not in irwin_ids
@pytest.mark.asyncio
async def test_apply_config_region_change(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
assert adapter.region.north == 49.0
new_config = AdapterConfig(
name="wfigs_incidents",
enabled=True,
cadence_s=300,
settings={
"region": {"north": 50.0, "south": 35.0, "east": -100.0, "west": -120.0}
},
updated_at=datetime.now(timezone.utc),
)
await adapter.apply_config(new_config)
assert adapter.region.north == 50.0
assert adapter.region.south == 35.0
class TestWFIGSPerimetersAdapter:
"""Tests for WFIGS Perimeters adapter."""
@pytest.fixture
def mock_config(self) -> AdapterConfig:
return AdapterConfig(
name="wfigs_perimeters",
enabled=True,
cadence_s=300,
settings={
"region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
},
updated_at=datetime.now(timezone.utc),
)
@pytest.fixture
def mock_config_store(self) -> MagicMock:
return MagicMock()
@pytest.fixture
def cursor_db_path(self, tmp_path: Path) -> Path:
return tmp_path / "cursors.db"
@pytest.mark.asyncio
async def test_normalization_perimeters(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
"""Perimeters are correctly normalized to Events with geometry."""
from central.adapters.wfigs_perimeters import WFIGSPerimetersAdapter
adapter = WFIGSPerimetersAdapter(mock_config, mock_config_store, cursor_db_path)
await adapter.startup()
mock_response = AsyncMock()
mock_response.raise_for_status = MagicMock()
mock_response.json = AsyncMock(return_value=SAMPLE_PERIMETERS_RESPONSE)
with patch.object(adapter._session, "get", return_value=AsyncMock(__aenter__=AsyncMock(return_value=mock_response), __aexit__=AsyncMock())):
events = [e async for e in adapter.poll()]
await adapter.shutdown()
assert len(events) == 1
event = events[0]
assert event.id == "GUID-001-BOISE"
assert event.adapter == "wfigs_perimeters"
assert event.category == "fire.perimeter.wildfire"
assert event.geo.primary_region == "US-ID-ADA"
assert "geometry" in event.data
assert event.data["geometry"]["type"] == "Polygon"
def test_subject_for_perimeters(
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
from central.adapters.wfigs_perimeters import WFIGSPerimetersAdapter
adapter = WFIGSPerimetersAdapter(mock_config, mock_config_store, cursor_db_path)
event = Event(
id="test-id",
adapter="wfigs_perimeters",
category="fire.perimeter.wildfire",
time=datetime.now(timezone.utc),
severity=2,
geo=Geo(primary_region="US-ID-ADA"),
data={"POOState": "ID", "POOCounty": "Ada", "geometry": {}},
)
subject = adapter.subject_for(event)
assert subject == "central.fire.perimeter.id.ada"