Merge feature/2-c-inciweb: NIFC InciWeb wildfire narrative adapter

This commit is contained in:
Matt Johnson 2026-05-19 04:02:59 +00:00
commit 4c1fdb8649
3 changed files with 1095 additions and 0 deletions

View file

@ -0,0 +1,19 @@
-- Migration: 017_add_inciweb_adapter
-- Registers the InciWeb adapter as a row in config.adapters.
-- Idempotent: ON CONFLICT (name) DO NOTHING leaves an already-existing
-- 'inciweb' row untouched, so re-running the migration does not overwrite
-- whatever is stored there.
INSERT INTO config.adapters (name, enabled, cadence_s, settings)
VALUES (
    'inciweb',
    false, -- Ships disabled; operator enables via GUI
    600, -- Same value as the adapter's default_cadence_s (presumably seconds)
    -- Default region filter: a lon/lat bounding box in decimal degrees.
    -- The adapter drops items whose parsed point falls outside this box;
    -- the test suite refers to this box as "CONUS west".
    jsonb_build_object(
        'region', jsonb_build_object(
            'north', 49.0,
            'south', 31.0,
            'east', -102.0,
            'west', -124.0
        )
    )
)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,477 @@
"""InciWeb adapter for wildfire narrative updates."""
import html
import logging
import re
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET
import aiohttp
from pydantic import BaseModel
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.config_models import AdapterConfig, RegionConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
logger = logging.getLogger(__name__)
# InciWeb RSS feed URL (fetched by InciWebAdapter._fetch_rss).
INCIWEB_RSS_URL = "https://inciweb.wildfire.gov/incidents/rss.xml"
# Full state/territory name -> 2-letter postal code.
# Keys are lowercase because parse_state_from_description() lowercases the
# parsed name before the lookup. Covers the 50 states plus DC and five
# territories; any name not listed here resolves to None at the call site.
STATE_NAME_TO_CODE = {
    "alabama": "AL", "alaska": "AK", "arizona": "AZ", "arkansas": "AR",
    "california": "CA", "colorado": "CO", "connecticut": "CT", "delaware": "DE",
    "florida": "FL", "georgia": "GA", "hawaii": "HI", "idaho": "ID",
    "illinois": "IL", "indiana": "IN", "iowa": "IA", "kansas": "KS",
    "kentucky": "KY", "louisiana": "LA", "maine": "ME", "maryland": "MD",
    "massachusetts": "MA", "michigan": "MI", "minnesota": "MN", "mississippi": "MS",
    "missouri": "MO", "montana": "MT", "nebraska": "NE", "nevada": "NV",
    "new hampshire": "NH", "new jersey": "NJ", "new mexico": "NM", "new york": "NY",
    "north carolina": "NC", "north dakota": "ND", "ohio": "OH", "oklahoma": "OK",
    "oregon": "OR", "pennsylvania": "PA", "rhode island": "RI", "south carolina": "SC",
    "south dakota": "SD", "tennessee": "TN", "texas": "TX", "utah": "UT",
    "vermont": "VT", "virginia": "VA", "washington": "WA", "west virginia": "WV",
    "wisconsin": "WI", "wyoming": "WY", "district of columbia": "DC",
    "puerto rico": "PR", "guam": "GU", "virgin islands": "VI",
    "american samoa": "AS", "northern mariana islands": "MP",
}
def parse_coordinates_from_description(description: str) -> tuple[float, float] | None:
"""
Parse latitude/longitude from InciWeb description text.
Format: "Latitude: 47° 3 17 Longitude: 91° 38 6"
InciWeb uses unsigned values for US coordinates (west longitude implied).
Returns (lon, lat) tuple or None if not found.
"""
# Pattern for degree/minute/second format
lat_pattern = r"Latitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)"
lon_pattern = r"Longitude:\s*(-?\d+)°\s*(\d+)\s*(\d+(?:\.\d+)?)"
lat_match = re.search(lat_pattern, description)
lon_match = re.search(lon_pattern, description)
if not lat_match or not lon_match:
return None
try:
lat_deg = int(lat_match.group(1))
lat_min = int(lat_match.group(2))
lat_sec = float(lat_match.group(3))
lon_deg = int(lon_match.group(1))
lon_min = int(lon_match.group(2))
lon_sec = float(lon_match.group(3))
# Convert to decimal degrees
# Latitude: positive in northern hemisphere
if lat_deg >= 0:
lat = lat_deg + lat_min / 60 + lat_sec / 3600
else:
lat = lat_deg - lat_min / 60 - lat_sec / 3600
# Longitude: InciWeb gives unsigned values for US west longitudes
# Make negative for western hemisphere (US coordinates)
lon = lon_deg + lon_min / 60 + lon_sec / 3600
if lon > 0:
lon = -lon # US longitudes are west (negative)
return (lon, lat)
except (ValueError, TypeError):
return None
def parse_state_from_description(description: str) -> str | None:
    """
    Extract the 2-letter state code from an InciWeb description.

    The description carries a structured field such as "State: Minnesota"
    or "State: New Mexico", terminated by a newline, a "---" separator, or
    the end of the text.

    Why the description and not the title: titles start with a unit code
    (e.g. "MNMNS Stewart Trail", "CACNP Santa Rosa Island Fire"), which is
    not a reliable state indicator, whereas the "State: <name>" field is.

    Returns:
        The postal code from STATE_NAME_TO_CODE, or None when the field is
        absent or names something not in that table.
    """
    found = re.search(r"State:\s*([A-Za-z\s]+?)(?:\n|---|$)", description)
    if found is None:
        return None
    # Table keys are lowercase with no surrounding whitespace.
    lookup_key = found.group(1).strip().lower()
    return STATE_NAME_TO_CODE.get(lookup_key)
def strip_html(html_text: str) -> str:
    """
    Strip HTML tags and decode entities to plain text.

    Processing order:
      1. Decode entities (``&amp;``, ``&lt;``, ``&nbsp;`` ...).
      2. Replace a literal ``&nbsp;`` left over from doubly-escaped input
         (``&amp;nbsp;``) with a space.
      3. Replace block-level/line-break tags with a space so that text on
         either side does not fuse ("one<br>two" -> "one two").
      4. Drop every remaining (inline) tag without inserting a space, so
         "<strong>full</strong>-suppression" stays "full-suppression".
      5. Collapse all whitespace runs, including non-breaking spaces.

    Args:
        html_text: HTML fragment; an empty value yields an empty string.

    Returns:
        Plain text with single spaces and no leading/trailing whitespace.
    """
    if not html_text:
        return ""
    text = html.unescape(html_text)
    # html.unescape already turns "&nbsp;" into U+00A0; this catches the
    # doubly-escaped form, which is still a literal "&nbsp;" at this point.
    text = text.replace("&nbsp;", " ")
    text = text.replace("\xa0", " ")
    # Tags that imply a visual break become a space...
    text = re.sub(
        r"</?(?:br|p|div|li|ul|ol|tr|td|th|table|h[1-6]|hr|blockquote)\b[^>]*>",
        " ",
        text,
        flags=re.IGNORECASE,
    )
    # ...every other tag is removed in place.
    text = re.sub(r"<[^>]+>", "", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
def point_in_bbox(
    lon: float,
    lat: float,
    west: float,
    south: float,
    east: float,
    north: float,
) -> bool:
    """Return True when (lon, lat) lies inside the box, edges included."""
    lon_inside = west <= lon and lon <= east
    lat_inside = south <= lat and lat <= north
    return lon_inside and lat_inside
class InciWebSettings(BaseModel):
    """Settings schema for InciWeb adapter.

    Referenced by InciWebAdapter.settings_schema.
    """

    # Optional bounding box used to filter items by their parsed point.
    # None (the default) means no geographic filtering is applied.
    region: RegionConfig | None = None
class InciWebAdapter(SourceAdapter):
    """NIFC InciWeb wildfire narrative adapter.

    Polls the InciWeb RSS feed, turns each not-yet-published item into an
    Event, and records published GUIDs in a local SQLite table so the same
    item is not emitted twice.
    """

    # Adapter identifier; also stored in the ``adapter`` column of the dedup
    # table and on every Event this adapter emits.
    name = "inciweb"
    display_name = "NIFC InciWeb — Wildfire Narrative"
    description = (
        "Narrative wildfire updates from InciWeb. Editorial; lower precision "
        "than WFIGS. Use as supplementary context."
    )
    # Schema class describing the ``settings`` payload for this adapter.
    settings_schema = InciWebSettings
    # No API key is involved; the feed is fetched without credentials.
    requires_api_key = None
    api_key_field = None
    wizard_order = None  # Ships disabled
    # Same cadence as the migration row for this adapter (presumably seconds).
    default_cadence_s = 600
def __init__(
    self,
    config: AdapterConfig,
    config_store: ConfigStore,
    cursor_db_path: Path,
) -> None:
    """Store collaborators and read the optional region filter.

    No I/O happens here; the HTTP session and the SQLite connection are
    created later by startup().

    Args:
        config: Adapter configuration; only ``settings["region"]`` is read.
        config_store: Kept on the instance for later use.
        cursor_db_path: Location of the SQLite file used for dedup tracking.
    """
    self._config_store = config_store
    self._cursor_db_path = cursor_db_path

    # Resources opened in startup() and released in shutdown().
    self._session: aiohttp.ClientSession | None = None
    self._db: sqlite3.Connection | None = None

    # Validators remembered from the last 200 response, replayed as
    # If-Modified-Since / If-None-Match on the next request.
    self._last_modified: str | None = None
    self._etag: str | None = None

    # A missing or empty "region" setting disables geographic filtering.
    region_settings = config.settings.get("region")
    self.region: RegionConfig | None = (
        RegionConfig(**region_settings) if region_settings else None
    )
async def startup(self) -> None:
    """Open the HTTP session and the dedup database.

    Creates the ``published_ids`` table and its ``last_seen`` index when
    they do not exist yet, so a fresh database file is usable immediately.
    """
    request_timeout = aiohttp.ClientTimeout(total=60)
    self._session = aiohttp.ClientSession(timeout=request_timeout)

    db = sqlite3.connect(self._cursor_db_path)
    self._db = db
    # One row per (adapter, event_id); last_seen drives the periodic sweep.
    db.execute("""
        CREATE TABLE IF NOT EXISTS published_ids (
            adapter TEXT NOT NULL,
            event_id TEXT NOT NULL,
            first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (adapter, event_id)
        )
    """)
    db.execute("""
        CREATE INDEX IF NOT EXISTS published_ids_last_seen
        ON published_ids (last_seen)
    """)
    db.commit()

    region_snapshot = self.region.model_dump() if self.region else None
    logger.info("InciWeb adapter started", extra={"region": region_snapshot})
async def shutdown(self) -> None:
    """Close HTTP session and SQLite connection.

    Both handles are detached from the instance first, and the database is
    closed in a ``finally`` clause, so a failure while closing the HTTP
    session can no longer leave the SQLite connection open. Any such
    failure still propagates to the caller. Safe to call when startup()
    was never run.
    """
    session, self._session = self._session, None
    db, self._db = self._db, None
    try:
        if session:
            await session.close()
    finally:
        if db:
            db.close()
    logger.info("InciWeb adapter shut down")
async def apply_config(self, new_config: AdapterConfig) -> None:
    """Apply new configuration from hot-reload.

    Only ``settings["region"]`` is read. When the region actually changes,
    the cached conditional-request validators are dropped: otherwise the
    server could keep answering 304 Not Modified and items already in the
    feed would not be re-evaluated against the new bounding box until the
    feed itself changed.

    Args:
        new_config: The freshly loaded adapter configuration.
    """
    region_dict = new_config.settings.get("region")
    new_region = RegionConfig(**region_dict) if region_dict else None

    # NOTE(review): relies on RegionConfig comparing by value; if it only
    # compares by identity this merely forces one extra full fetch.
    if new_region != self.region:
        self._last_modified = None
        self._etag = None

    self.region = new_region
    logger.info(
        "InciWeb config updated",
        extra={"region": self.region.model_dump() if self.region else None},
    )
def is_published(self, event_id: str) -> bool:
    """Report whether this adapter already recorded ``event_id``.

    Returns False when the database has not been opened.
    """
    db = self._db
    if not db:
        return False
    row = db.execute(
        "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
        (self.name, event_id),
    ).fetchone()
    return row is not None
def mark_published(self, event_id: str) -> None:
    """Record ``event_id`` as published, or refresh it if already recorded.

    A first call inserts the row; later calls only move ``last_seen``
    forward. Does nothing when the database has not been opened.
    """
    db = self._db
    if not db:
        return
    upsert_sql = """
        INSERT INTO published_ids (adapter, event_id, first_seen, last_seen)
        VALUES (?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ON CONFLICT (adapter, event_id) DO UPDATE SET
        last_seen = CURRENT_TIMESTAMP
        """
    db.execute(upsert_sql, (self.name, event_id))
    db.commit()
def bump_last_seen(self, event_id: str) -> None:
    """Refresh ``last_seen`` for an already-recorded event.

    Keeps the row clear of the age-based sweep. Unknown ids are left
    alone (no row is created), and nothing happens when the database has
    not been opened.
    """
    db = self._db
    if not db:
        return
    params = (self.name, event_id)
    db.execute(
        "UPDATE published_ids SET last_seen = CURRENT_TIMESTAMP WHERE adapter = ? AND event_id = ?",
        params,
    )
    db.commit()
def sweep_old_ids(self) -> int:
    """Delete this adapter's dedup rows not seen for more than 14 days.

    Returns:
        Number of rows removed; 0 when the database has not been opened.
    """
    if not self._db:
        return 0
    deleted = self._db.execute(
        "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
        (self.name,),
    ).rowcount
    self._db.commit()
    # Stay quiet on the common nothing-to-delete path.
    if deleted > 0:
        logger.info("InciWeb swept old dedup entries", extra={"count": deleted})
    return deleted
def subject_for(self, event: Event) -> str:
    """Build the NATS subject for ``event``.

    The last token is the lowercase state code taken from a primary region
    of the exact form "US-XX"; anything else (missing region, other prefix,
    other length) maps to "unknown".
    """
    region = event.geo.primary_region
    has_us_state = bool(region) and region.startswith("US-") and len(region) == 5
    if not has_us_state:
        return "central.fire.narrative.inciweb.unknown"
    state_code = region[3:].lower()
    return f"central.fire.narrative.inciweb.{state_code}"
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=30),
    retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
)
async def _fetch_rss(self) -> list[dict[str, Any]]:
    """Fetch and parse RSS feed from InciWeb.

    Sends If-Modified-Since / If-None-Match when validators from an earlier
    response are known. The validators of the current response are stored
    only after the body has parsed successfully; storing them earlier would
    let a malformed body be followed by 304 responses, so that feed content
    would be skipped until the feed changed again.

    Returns:
        One dict per ``<item>`` with the string keys ``title``, ``link``,
        ``description``, ``pubDate``, ``guid`` and ``creator`` (missing or
        empty elements become ""). An empty list on 304 Not Modified or
        when the document has no ``<channel>``.

    Raises:
        RuntimeError: startup() has not created the HTTP session.
        aiohttp.ClientError: transport or HTTP-status failure (retried
            according to the decorator above).
        xml.etree.ElementTree.ParseError: the body is not well-formed XML.
    """
    if not self._session:
        raise RuntimeError("Session not initialized")

    # Build request headers with conditional fetch support
    headers = {"User-Agent": "Central/0.4"}
    if self._last_modified:
        headers["If-Modified-Since"] = self._last_modified
    if self._etag:
        headers["If-None-Match"] = self._etag

    async with self._session.get(INCIWEB_RSS_URL, headers=headers) as resp:
        if resp.status == 304:
            logger.info("InciWeb not modified")
            return []
        resp.raise_for_status()
        # Held in locals until the body is known to be usable.
        last_modified = resp.headers.get("Last-Modified")
        etag = resp.headers.get("ETag")
        content = await resp.text()

    # NOTE(review): xml.etree is not hardened against maliciously crafted
    # XML (see the security note in the stdlib docs). The feed comes from
    # a fixed HTTPS endpoint; revisit if the source ever becomes untrusted.
    try:
        root = ET.fromstring(content)
    except ET.ParseError as e:
        logger.error("InciWeb RSS parse error", extra={"error": str(e)})
        raise

    # Parse succeeded: it is now safe to ask for "changes since this one".
    self._last_modified = last_modified
    self._etag = etag

    channel = root.find("channel")
    if channel is None:
        return []

    # (output key, element path) in the order the keys appear in each item.
    fields = (
        ("title", "title"),
        ("link", "link"),
        ("description", "description"),
        ("pubDate", "pubDate"),
        ("guid", "guid"),
        ("creator", "{http://purl.org/dc/elements/1.1/}creator"),
    )
    items: list[dict[str, Any]] = []
    for item_elem in channel.findall("item"):
        # findtext() gives None for a missing element and "" for an empty
        # one; both are normalised to "".
        items.append({key: item_elem.findtext(path) or "" for key, path in fields})

    logger.info(
        "InciWeb fetch completed",
        extra={"item_count": len(items)},
    )
    return items
async def poll(self) -> AsyncIterator[Event]:
    """Fetch the feed once and yield an Event for every unseen item.

    Per item: skip entries without a GUID, skip (but refresh) GUIDs already
    recorded, skip items whose parsed point lies outside the configured
    region, then normalise and yield. An item is recorded as published only
    after the consumer resumes the generator past its ``yield``.

    Raises:
        RuntimeError: startup() has not opened the dedup database.
        Exception: whatever the feed fetch raised, after logging it.
    """
    if not self._db:
        raise RuntimeError("Database not initialized")

    try:
        feed_items = await self._fetch_rss()
    except Exception as e:
        logger.error("InciWeb fetch failed", extra={"error": str(e)})
        raise

    events_yielded = 0
    for entry in feed_items:
        guid = entry.get("guid", "")
        if not guid:
            continue

        # Already emitted earlier: keep the dedup row fresh and move on.
        if self.is_published(guid):
            self.bump_last_seen(guid)
            continue

        description_html = entry.get("description", "")
        centroid = parse_coordinates_from_description(description_html)

        # Region filter applies only when both a region and a point exist;
        # items without coordinates always pass.
        if self.region and centroid:
            lon, lat = centroid
            inside_region = point_in_bbox(
                lon, lat,
                self.region.west, self.region.south,
                self.region.east, self.region.north,
            )
            if not inside_region:
                continue

        state_code = parse_state_from_description(description_html)
        primary_region = f"US-{state_code}" if state_code else None
        regions = [f"US-{state_code}"] if state_code else []

        # pubDate is RFC 822; normalise to UTC, falling back to "now" when
        # it is missing or unparseable.
        pub_date_str = entry.get("pubDate", "")
        try:
            parsed_time = parsedate_to_datetime(pub_date_str)
            if parsed_time.tzinfo is None:
                event_time = parsed_time.replace(tzinfo=timezone.utc)
            else:
                event_time = parsed_time.astimezone(timezone.utc)
        except (ValueError, TypeError):
            event_time = datetime.now(timezone.utc)

        # A single point is expressed as a zero-area bounding box.
        point_bbox = (
            (centroid[0], centroid[1], centroid[0], centroid[1]) if centroid else None
        )
        geo = Geo(
            centroid=centroid,
            bbox=point_bbox,
            regions=regions,
            primary_region=primary_region,
        )

        payload = {
            "title": entry.get("title", ""),
            "description": strip_html(description_html),
            "description_html": description_html,
            "url": entry.get("link", ""),
            "guid": guid,
            "raw": entry,
        }
        yield Event(
            id=guid,
            adapter=self.name,
            category="fire.narrative.inciweb",
            time=event_time,
            severity=0,  # Narrative; not authoritative
            geo=geo,
            data=payload,
        )
        self.mark_published(guid)
        events_yielded += 1

    # Periodic cleanup of old entries
    self.sweep_old_ids()
    logger.info(
        "InciWeb poll completed",
        extra={"events_yielded": events_yielded},
    )

599
tests/test_inciweb.py Normal file
View file

@ -0,0 +1,599 @@
"""Tests for InciWeb adapter."""
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from central.config_models import AdapterConfig
from central.models import Event, Geo
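# Frozen RSS fixture. The first two items are snippets taken from InciWeb;
# the last two are synthetic test cases (an item with no coordinates and an
# unrecognised state, and a Florida item that falls outside the test bbox).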
SAMPLE_RSS_CONTENT = """<?xml version="1.0" encoding="utf-8"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" version="2.0" xml:base="http://inciweb.wildfire.gov/">
<channel>
<title>InciWeb</title>
<link>http://inciweb.wildfire.gov/</link>
<description>Inciweb Fire Incidents</description>
<language>en</language>
<item>
<title>MNMNS Stewart Trail</title>
<link>http://inciweb.wildfire.gov/incident-information/mnmns-stewart-trail</link>
<description>Last updated: 2026-05-18
---
The type of incident is Wildfire and involves the following unit(s) Minnesota Department of Natural Resources.
---
State: Minnesota
---
Coordinates:
Latitude: 47° 3 17 Longitude: 91° 38 6
---
NOTE: All fire perimeters and points are approximations.
---
Incident Overview: The Stewart Trail Fire was detected during the afternoon hours on Friday, May 15, 2026.&amp;nbsp;A temporary flight restriction (TFR) is in place.</description>
<pubDate>Fri, 15 May 2026 08:48:11 EDT</pubDate>
<dc:creator>llangeberg</dc:creator>
<guid isPermaLink="false">327828</guid>
</item>
<item>
<title>CACNP Santa Rosa Island Fire</title>
<link>http://inciweb.wildfire.gov/incident-information/cacnp-santa-rosa-island-fire</link>
<description>Last updated: 2026-05-18
---
The type of incident is Wildfire and involves the following unit(s) Channel Islands National Park.
---
State: California
---
Coordinates:
Latitude: 33° 55 2 Longitude: 120° 5 10
---
NOTE: All fire perimeters and points are approximations.
---
Incident Overview: On Friday, May 15, 2026, an aircraft flying over Santa Rosa Island in Channel Islands National Park reported a wildfire.&lt;br&gt;&lt;p&gt;This is a &lt;strong&gt;full-suppression&lt;/strong&gt; human-caused wildfire and is under investigation.&lt;/p&gt;&amp;nbsp;</description>
<pubDate>Sat, 16 May 2026 12:09:07 EDT</pubDate>
<dc:creator>mtheune</dc:creator>
<guid isPermaLink="false">327838</guid>
</item>
<item>
<title>Some Fire Without Coordinates</title>
<link>http://inciweb.wildfire.gov/incident-information/no-coords-fire</link>
<description>Last updated: 2026-05-18
---
The type of incident is Wildfire.
---
State: Unknown State
---
Incident Overview: This is a test incident without coordinates.</description>
<pubDate>Mon, 18 May 2026 09:00:00 EDT</pubDate>
<dc:creator>test</dc:creator>
<guid isPermaLink="false">999999</guid>
</item>
<item>
<title>Florida Fire Outside Bbox</title>
<link>http://inciweb.wildfire.gov/incident-information/florida-fire</link>
<description>Last updated: 2026-05-18
---
State: Florida
---
Coordinates:
Latitude: 26° 0 0 Longitude: 80° 0 0
---
Incident Overview: This fire is in Florida, outside the CONUS west bbox.</description>
<pubDate>Mon, 18 May 2026 10:00:00 EDT</pubDate>
<dc:creator>test</dc:creator>
<guid isPermaLink="false">888888</guid>
</item>
</channel>
</rss>"""
class TestInciWebHelpers:
    """Tests for InciWeb helper functions."""

    def test_parse_coordinates_from_description(self):
        """DMS coordinates convert to the exact decimal (lon, lat) pair."""
        from central.adapters.inciweb import parse_coordinates_from_description

        description = "Coordinates:\nLatitude: 47° 3 17 Longitude: 91° 38 6"
        result = parse_coordinates_from_description(description)
        assert result is not None
        lon, lat = result
        # Exact values rather than loose ranges, so a mis-scaled minute or
        # second term cannot slip through.
        assert lat == pytest.approx(47 + 3 / 60 + 17 / 3600)
        # Unsigned feed longitude is returned as west (negative).
        assert lon == pytest.approx(-(91 + 38 / 60 + 6 / 3600))

    def test_parse_coordinates_no_match(self):
        """No coordinates in description returns None."""
        from central.adapters.inciweb import parse_coordinates_from_description

        assert parse_coordinates_from_description("No coordinates here") is None

    def test_parse_state_from_description(self):
        """Parse state name and return 2-letter code."""
        from central.adapters.inciweb import parse_state_from_description

        description = "---\nState: Minnesota\n---"
        assert parse_state_from_description(description) == "MN"

    def test_parse_state_from_description_new_mexico(self):
        """Parse multi-word state name."""
        from central.adapters.inciweb import parse_state_from_description

        description = "State: New Mexico\n---"
        assert parse_state_from_description(description) == "NM"

    def test_parse_state_from_description_no_match(self):
        """A state name that is not in the lookup table returns None."""
        from central.adapters.inciweb import parse_state_from_description

        description = "State: Unknown State\n---"
        assert parse_state_from_description(description) is None

    def test_strip_html(self):
        """HTML tags are stripped, entities decoded."""
        from central.adapters.inciweb import strip_html

        # Named "markup" rather than "html" to avoid reading like the
        # stdlib module of the same name.
        markup = "This is &amp;nbsp;a <strong>test</strong> with <br>line breaks."
        result = strip_html(markup)
        assert "<" not in result
        assert ">" not in result
        assert "&nbsp;" not in result
        assert "&amp;" not in result
        assert "test" in result
class TestInciWebAdapter:
    """Tests for InciWeb adapter.

    HTTP is never touched: ``session.get`` is patched with an async context
    manager that hands back a fully specified mock response (see the
    ``_make_response`` / ``_as_get_context`` / ``_poll`` helpers below).
    """

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------
    @pytest.fixture
    def mock_config(self) -> AdapterConfig:
        """Config with the western bounding box (west=-124 .. east=-102)."""
        return AdapterConfig(
            name="inciweb",
            enabled=True,
            cadence_s=600,
            settings={
                "region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
            },
            updated_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def mock_config_no_region(self) -> AdapterConfig:
        """Config without a region, i.e. no geographic filtering."""
        return AdapterConfig(
            name="inciweb",
            enabled=True,
            cadence_s=600,
            settings={},
            updated_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def mock_config_store(self) -> MagicMock:
        return MagicMock()

    @pytest.fixture
    def cursor_db_path(self, tmp_path: Path) -> Path:
        return tmp_path / "cursors.db"

    # ------------------------------------------------------------------
    # Helpers (underscore-prefixed, so pytest does not collect them)
    # ------------------------------------------------------------------
    @staticmethod
    def _make_response(
        status: int = 200,
        body: str = "",
        headers: dict | None = None,
    ) -> AsyncMock:
        """Build a mock HTTP response with every attribute the adapter reads.

        ``status`` and ``headers`` are set explicitly: left as auto-created
        AsyncMock children, ``headers.get(...)`` would return an un-awaited
        coroutine that the adapter would then store as its ETag.
        """
        response = AsyncMock()
        response.status = status
        response.raise_for_status = MagicMock()
        response.text = AsyncMock(return_value=body)
        response.headers = headers if headers is not None else {}
        return response

    @staticmethod
    def _as_get_context(response: AsyncMock) -> AsyncMock:
        """Wrap ``response`` in the async context manager ``session.get`` returns.

        ``__aexit__`` returns False so an exception raised inside the
        ``async with`` body propagates instead of being swallowed by a
        truthy mock return value.
        """
        return AsyncMock(
            __aenter__=AsyncMock(return_value=response),
            __aexit__=AsyncMock(return_value=False),
        )

    @classmethod
    async def _poll(cls, adapter, response: AsyncMock) -> list[Event]:
        """Run one poll against ``response`` and collect the yielded events."""
        with patch.object(
            adapter._session, "get", return_value=cls._as_get_context(response)
        ):
            return [event async for event in adapter.poll()]

    # ------------------------------------------------------------------
    # Normalization
    # ------------------------------------------------------------------
    @pytest.mark.asyncio
    async def test_normalization_with_georss_point(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Items with coordinates parsed from the description are normalized."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll(adapter, self._make_response(body=SAMPLE_RSS_CONTENT))
        finally:
            await adapter.shutdown()

        # Bbox is west=-124, east=-102 (CONUS west)
        # Minnesota at -91 longitude is OUTSIDE bbox (east of -102)
        # California at -120 longitude is INSIDE bbox
        # Florida at -80 longitude is OUTSIDE bbox
        # Unknown state without coords passes through
        assert len(events) == 2

        ca_event = next(e for e in events if e.data["guid"] == "327838")
        assert ca_event.id == "327838"
        assert ca_event.adapter == "inciweb"
        assert ca_event.category == "fire.narrative.inciweb"
        assert ca_event.severity == 0
        assert ca_event.geo.primary_region == "US-CA"
        assert ca_event.geo.centroid is not None

    @pytest.mark.asyncio
    async def test_normalization_without_georss_point(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Items without coordinates have centroid=None."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll(adapter, self._make_response(body=SAMPLE_RSS_CONTENT))
        finally:
            await adapter.shutdown()

        # All 4 items pass (no region filter)
        assert len(events) == 4

        no_coords_event = next(e for e in events if e.data["guid"] == "999999")
        assert no_coords_event.geo.centroid is None
        assert no_coords_event.geo.regions == []
        assert no_coords_event.geo.primary_region is None

    def test_state_parse_from_title(self):
        """State parsing from the description (not the title) yields the code."""
        from central.adapters.inciweb import parse_state_from_description

        assert parse_state_from_description("State: California\n") == "CA"
        assert parse_state_from_description("State: Minnesota\n---") == "MN"
        # Multi-word name
        assert parse_state_from_description("State: New York\n") == "NY"
        # Name that is not in the lookup table
        assert parse_state_from_description("State: Narnia\n") is None

    @pytest.mark.asyncio
    async def test_html_stripping(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """HTML is stripped from description, raw preserved in description_html."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll(adapter, self._make_response(body=SAMPLE_RSS_CONTENT))
        finally:
            await adapter.shutdown()

        # California item has HTML tags in description
        ca_event = next(e for e in events if e.data["guid"] == "327838")
        plain = ca_event.data["description"]
        assert "<br>" not in plain
        assert "<p>" not in plain
        assert "<strong>" not in plain
        assert "&nbsp;" not in plain
        # Raw HTML should be preserved
        raw = ca_event.data["description_html"]
        assert "&lt;br&gt;" in raw or "<br>" in raw

    # ------------------------------------------------------------------
    # Subjects
    # ------------------------------------------------------------------
    def test_subject_for_with_state(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """subject_for returns correct subject with state."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config, mock_config_store, cursor_db_path)
        event = Event(
            id="test-id",
            adapter="inciweb",
            category="fire.narrative.inciweb",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(primary_region="US-CA"),
            data={"title": "Test Fire", "description": "Test"},
        )
        assert adapter.subject_for(event) == "central.fire.narrative.inciweb.ca"

    def test_subject_for_without_state(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """subject_for returns unknown when no state."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config, mock_config_store, cursor_db_path)
        event = Event(
            id="test-id",
            adapter="inciweb",
            category="fire.narrative.inciweb",
            time=datetime.now(timezone.utc),
            severity=0,
            geo=Geo(),
            data={"title": "Test Fire", "description": "Test"},
        )
        assert adapter.subject_for(event) == "central.fire.narrative.inciweb.unknown"

    # ------------------------------------------------------------------
    # Dedup, filtering, config
    # ------------------------------------------------------------------
    @pytest.mark.asyncio
    async def test_dedup_same_guid(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """is_published/mark_published provides dedup functionality."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            assert adapter.is_published("327828") is False
            adapter.mark_published("327828")
            assert adapter.is_published("327828") is True
        finally:
            await adapter.shutdown()

    @pytest.mark.asyncio
    async def test_bbox_filters_point_outside(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Items with coords outside bbox are filtered; items without coords pass."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll(adapter, self._make_response(body=SAMPLE_RSS_CONTENT))
        finally:
            await adapter.shutdown()

        guids = {e.data["guid"] for e in events}
        assert "888888" not in guids  # Florida, outside bbox
        assert "999999" in guids  # No coordinates: passes through

    @pytest.mark.asyncio
    async def test_apply_config_region_change(
        self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """apply_config updates region."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config, mock_config_store, cursor_db_path)
        assert adapter.region is not None
        assert adapter.region.north == 49.0

        new_config = AdapterConfig(
            name="inciweb",
            enabled=True,
            cadence_s=600,
            settings={
                "region": {"north": 50.0, "south": 35.0, "east": -100.0, "west": -120.0}
            },
            updated_at=datetime.now(timezone.utc),
        )
        await adapter.apply_config(new_config)
        assert adapter.region.north == 50.0
        assert adapter.region.south == 35.0

    @pytest.mark.asyncio
    async def test_dedup_in_poll_loop(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Dedup integration: second poll with same items yields zero events."""
        from central.adapters.inciweb import InciWebAdapter

        # Single-item RSS for clarity
        single_item_rss = (
            '<?xml version="1.0" encoding="utf-8"?>\n'
            '<rss xmlns:dc="http://purl.org/dc/elements/1.1/" version="2.0">\n'
            "<channel>\n"
            "<title>InciWeb</title>\n"
            "<item>\n"
            "<title>Test Fire</title>\n"
            "<link>http://inciweb.wildfire.gov/test</link>\n"
            "<description>State: California</description>\n"
            "<pubDate>Mon, 18 May 2026 09:00:00 EDT</pubDate>\n"
            '<guid isPermaLink="false">DEDUP-TEST-001</guid>\n'
            "</item>\n"
            "</channel>\n"
            "</rss>"
        )

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            # First poll: the item is new, so it is yielded and recorded.
            events_first = await self._poll(adapter, self._make_response(body=single_item_rss))
            assert len(events_first) == 1
            assert events_first[0].data["guid"] == "DEDUP-TEST-001"
            assert adapter.is_published("DEDUP-TEST-001") is True

            # Second poll: same item again (no validators were returned, so
            # it is a full 200 fetch) and dedup must suppress it.
            events_second = await self._poll(adapter, self._make_response(body=single_item_rss))
            assert len(events_second) == 0
        finally:
            await adapter.shutdown()

    # ------------------------------------------------------------------
    # Conditional fetch
    # ------------------------------------------------------------------
    @pytest.mark.asyncio
    async def test_conditional_304_yields_zero(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """HTTP 304 Not Modified returns empty list and yields zero events."""
        from central.adapters.inciweb import InciWebAdapter

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            events = await self._poll(adapter, self._make_response(status=304))
            assert len(events) == 0
        finally:
            await adapter.shutdown()

    @pytest.mark.asyncio
    async def test_conditional_headers_sent_after_first_poll(
        self, mock_config_no_region: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
    ):
        """Conditional fetch headers sent on second poll after first captures them."""
        from central.adapters.inciweb import InciWebAdapter

        # First response carries both validators and an item-less channel.
        first_response = self._make_response(
            body=(
                '<?xml version="1.0"?>\n'
                '<rss version="2.0"><channel><title>Test</title></channel></rss>'
            ),
            headers={
                "Last-Modified": "Tue, 19 May 2026 03:00:00 GMT",
                "ETag": "\"abc123\"",
            },
        )

        # Records the request headers of the second call and answers 304.
        captured_headers: dict[str, str] = {}

        def capture_get(*args, **kwargs):
            captured_headers.update(kwargs.get("headers", {}))
            return self._as_get_context(self._make_response(status=304))

        adapter = InciWebAdapter(mock_config_no_region, mock_config_store, cursor_db_path)
        await adapter.startup()
        try:
            await self._poll(adapter, first_response)
            assert adapter._last_modified == "Tue, 19 May 2026 03:00:00 GMT"
            assert adapter._etag == "\"abc123\""

            with patch.object(adapter._session, "get", side_effect=capture_get):
                second_events = [event async for event in adapter.poll()]

            assert second_events == []
            assert captured_headers.get("If-Modified-Since") == "Tue, 19 May 2026 03:00:00 GMT"
            assert captured_headers.get("If-None-Match") == "\"abc123\""
        finally:
            await adapter.shutdown()