diff --git a/src/central/adapters/wfigs_incidents.py b/src/central/adapters/wfigs_incidents.py index f11b80c..fb1b786 100644 --- a/src/central/adapters/wfigs_incidents.py +++ b/src/central/adapters/wfigs_incidents.py @@ -2,6 +2,7 @@ import logging import sqlite3 +import time from collections.abc import AsyncIterator from datetime import datetime, timezone from pathlib import Path @@ -18,7 +19,6 @@ from tenacity import ( from central.adapter import SourceAdapter from central.adapters.wfigs_common import ( - WFIGS_INCIDENTS_URL, build_regions, cleanup_old_observed, delete_observed, @@ -41,11 +41,42 @@ logger = logging.getLogger(__name__) LAYER_NAME = "incidents" +# v0.10.4: switched from the `_Current` view to the parent `WFIGS_Incident_Locations` +# endpoint. The Current view excludes IMT-managed BLM fires once they transition +# to Type 3 IC / ICS-209 reporting (e.g. Blue Ridge: 14k acres, modified upstream +# within the hour, but absent from _Current). The parent endpoint has the +# IMT-managed fires; we filter to active wildfires server-side and cap recency +# client-side. The wfigs_perimeters adapter stays on `_Current` (perimeters have +# a different lifecycle and Blue Ridge isn't in either perimeter layer). +WFIGS_INCIDENTS_URL = ( + "https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/" + "WFIGS_Incident_Locations/FeatureServer/0/query" +) + +# Client-side recency cutoff: drop features whose ModifiedOnDateTime_dt is older +# than this many seconds. Server-side `ModifiedOnDateTime_dt > N` combined with +# any other predicate returns "Unable to perform query" on this layer, so the +# floor is enforced after the JSON parse. Computed fresh each poll -- this is +# NOT a persisted cursor (avoids the v0.10.2.1 silent-zero failure mode). +_RECENCY_CUTOFF_S = 30 * 86400 + +# Server-side cap. The endpoint also accepts orderByFields=ModifiedOnDateTime_dt +# DESC, so the 300 we get back are the 300 most-recently-touched WF records in +# the configured POOState (or globally if state is unset). Idaho currently has +# ~30 within the 30-day client-side window. +_RESULT_RECORD_COUNT = 300 + class WFIGSIncidentsSettings(BaseModel): """Settings schema for WFIGS Incidents adapter.""" region: RegionConfig | None = None + # v0.10.4: ISO 3166-2 POOState code (e.g. "US-ID") for the server-side + # POOState filter. None disables the predicate -- the adapter then sees + # every state's WF records up to the resultRecordCount cap and relies on + # the existing client-side `region` bbox to scope. Operators normally + # set this to match the bbox state for a tighter upstream call. + state: str | None = None class WFIGSIncidentsAdapter(SourceAdapter): @@ -80,6 +111,8 @@ class WFIGSIncidentsAdapter(SourceAdapter): self.region: RegionConfig | None = RegionConfig(**region_dict) else: self.region = None + # v0.10.4: POOState code (e.g. "US-ID") for the server-side filter. + self.state: str | None = config.settings.get("state") async def startup(self) -> None: """Initialize HTTP session and SQLite connection.""" @@ -127,6 +160,7 @@ class WFIGSIncidentsAdapter(SourceAdapter): self.region = RegionConfig(**region_dict) else: self.region = None + self.state = new_config.settings.get("state") logger.info( "WFIGS incidents config updated", extra={"region": self.region.model_dump() if self.region else None}, @@ -152,29 +186,39 @@ class WFIGSIncidentsAdapter(SourceAdapter): retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)), ) async def _fetch_features(self) -> list[dict[str, Any]]: - """Fetch features from WFIGS FeatureServer.""" + """Fetch features from WFIGS FeatureServer. + + v0.10.4: switched from the `_Current` view to the parent endpoint with + a server-side active-wildfire filter (`IncidentTypeCategory='WF' AND + FireOutDateTime IS NULL [AND POOState='']`), capped at the 300 + most-recently-touched records via `orderByFields=ModifiedOnDateTime_dt + DESC + resultRecordCount=300`. A client-side recency cutoff drops + anything older than ``_RECENCY_CUTOFF_S``. The cutoff is recomputed + fresh each poll -- this is NOT a persisted cursor (see v0.10.2.1). + """ if not self._session: raise RuntimeError("Session not initialized") - # Build query params + # Server-side WHERE: active wildfires only, optional POOState scope. + # The state code is plumbed from settings (no hardcoded codes); when + # unset, the predicate is omitted and the call returns every state's + # active WF records up to the result cap. + where_parts = ["IncidentTypeCategory='WF'", "FireOutDateTime IS NULL"] + if self.state: + # POOState literal is a 2-segment ISO-3166-2 code ("US-ID"), no quotes + # required inside it; escape any embedded single quotes defensively. + safe_state = self.state.replace("'", "''") + where_parts.append(f"POOState='{safe_state}'") params: dict[str, str] = { "outFields": "*", "returnGeometry": "true", "f": "geojson", + "where": " AND ".join(where_parts), + "orderByFields": "ModifiedOnDateTime_dt DESC", + "resultRecordCount": str(_RESULT_RECORD_COUNT), } - # v0.10.2.1: full-page fetch every poll. The previous incremental - # `ModifiedOnDateTime > timestamp ''` clause silently - # returned 0 features because the upstream layer renamed the column - # to `ModifiedOnDateTime_dt` (epoch ms) and our where-clause both - # used the old name AND compared against a SQL timestamp literal. - # ArcGIS treated the clause as not-matching; the fall-off detector - # then tombstoned every previously-observed IRWINID on poll #2. - # `wfigs_observed` + `published_ids` already de-duplicate the full - # page, so re-fetching every poll is correct and idempotent. - params["where"] = "1=1" - - # Bbox filter if region configured + # Bbox filter if region configured (defense-in-depth alongside POOState). if self.region: bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}" params["geometry"] = bbox @@ -187,9 +231,24 @@ class WFIGSIncidentsAdapter(SourceAdapter): data = await resp.json() features = data.get("features", []) + raw_count = len(features) + + # Client-side recency floor. Server-side `ModifiedOnDateTime_dt > N` + # combined with any other predicate is rejected on this layer, so the + # 30-day cutoff is enforced here after the JSON parse. Computed fresh + # each call -- never persisted. + cutoff_ms = (int(time.time()) - _RECENCY_CUTOFF_S) * 1000 + features = [ + f for f in features + if (f.get("properties", {}).get("ModifiedOnDateTime_dt") or 0) > cutoff_ms + ] logger.info( "WFIGS incidents fetch completed", - extra={"feature_count": len(features)}, + extra={ + "feature_count_raw": raw_count, + "feature_count_after_recency_filter": len(features), + "recency_cutoff_s": _RECENCY_CUTOFF_S, + }, ) return features diff --git a/tests/test_wfigs.py b/tests/test_wfigs.py index 254a5f8..14c52ab 100644 --- a/tests/test_wfigs.py +++ b/tests/test_wfigs.py @@ -1,5 +1,6 @@ """Tests for WFIGS adapters.""" +import time from datetime import datetime, timezone from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch @@ -10,8 +11,13 @@ from central.config_models import AdapterConfig from central.models import Event, Geo -# Sample GeoJSON response with incidents using real WFIGS format +# Sample GeoJSON response with incidents using real WFIGS format. +# v0.10.4: ModifiedOnDateTime_dt is now the real upstream field name (the +# pre-v0.10.2.1 ``ModifiedOnDateTime`` field was renamed by NIFC); set to +# "now-ish" via the module-load timestamp so these features pass the +# v0.10.4 client-side recency filter on every test run. # Note: POOState comes as ISO 3166-2 ("US-MT"), IncidentTypeCategory as codes ("WF") +_FIXTURE_NOW_MS = int(time.time() * 1000) SAMPLE_INCIDENTS_RESPONSE = { "type": "FeatureCollection", "features": [ @@ -25,7 +31,7 @@ SAMPLE_INCIDENTS_RESPONSE = { "DailyAcres": 150, "PercentContained": 25, "FireDiscoveryDateTime": 1716000000000, - "ModifiedOnDateTime": 1716100000000, + "ModifiedOnDateTime_dt": _FIXTURE_NOW_MS, "POOState": "US-MT", # Real format: ISO 3166-2 "POOCounty": "Glacier", }, @@ -40,7 +46,7 @@ SAMPLE_INCIDENTS_RESPONSE = { "DailyAcres": 5, "PercentContained": 100, "FireDiscoveryDateTime": 1716200000000, - "ModifiedOnDateTime": 1716300000000, + "ModifiedOnDateTime_dt": _FIXTURE_NOW_MS, "POOState": "US-ID", "POOCounty": "Owyhee", }, @@ -55,7 +61,7 @@ SAMPLE_INCIDENTS_RESPONSE = { "DailyAcres": 50, "PercentContained": 0, "FireDiscoveryDateTime": 1716400000000, - "ModifiedOnDateTime": 1716500000000, + "ModifiedOnDateTime_dt": _FIXTURE_NOW_MS, "POOState": "US-FL", "POOCounty": "Miami-Dade", }, @@ -238,7 +244,11 @@ class TestWFIGSIncidentsAdapter: enabled=True, cadence_s=300, settings={ - "region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0} + "region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}, + # v0.10.4: POOState scope for the server-side filter. The test + # ships Idaho to match the production config; the assertion in + # test_where_clause_filters_active_wf_only expects this value. + "state": "US-ID", }, updated_at=datetime.now(timezone.utc), ) @@ -316,18 +326,21 @@ class TestWFIGSIncidentsAdapter: await adapter.shutdown() @pytest.mark.asyncio - async def test_where_clause_is_1_eq_1_on_every_poll( + async def test_where_clause_filters_active_wf_only( self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path ): - """v0.10.2.1 regression guard: every poll sends ``where=1=1``. + """v0.10.4 regression guard: every poll sends the active-WF filter. - The pre-v0.10.2.1 adapter sent ``where=ModifiedOnDateTime > timestamp 'X'`` - on every poll after the first -- a clause that silently returned 0 - features because the upstream layer renamed the column to - ``ModifiedOnDateTime_dt``. That made the fall-off detector tombstone - every previously-observed IRWINID on poll #2 (the Summit Creek bug). - Both the first poll and every poll thereafter must now send the - unconditional full-page query. + v0.10.2.1 used ``where=1=1`` against the ``_Current`` endpoint. That + endpoint excludes IMT-managed BLM fires (Blue Ridge, Sailor Cap, etc.) + once they hit Type 3 IC reporting. v0.10.4 switched to the parent + ``WFIGS_Incident_Locations`` endpoint with a server-side active-WF + filter that surfaces those IMT-managed fires. The bug where-clause + ``ModifiedOnDateTime[_dt] > timestamp 'X'`` from pre-v0.10.2.1 is + also banned from the wire (the v0.10.2.1 silent-zero failure mode): + ``ModifiedOnDateTime_dt`` may only appear in ``orderByFields`` and in + the client-side python recency filter, never as a server-side + boolean predicate inside ``where``. """ from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter @@ -349,16 +362,26 @@ class TestWFIGSIncidentsAdapter: await adapter.shutdown() + expected_where = ( + "IncidentTypeCategory='WF' AND FireOutDateTime IS NULL " + "AND POOState='US-ID'" + ) assert len(captured) == 2, "expected one HTTP call per poll" for i, params in enumerate(captured): - assert params.get("where") == "1=1", ( - f"poll #{i+1} sent where={params.get('where')!r}; expected '1=1'" + assert params.get("where") == expected_where, ( + f"poll #{i+1} sent where={params.get('where')!r}; " + f"expected {expected_where!r}" + ) + assert params.get("orderByFields") == "ModifiedOnDateTime_dt DESC" + assert params.get("resultRecordCount") == "300" + # The where clause specifically must NOT contain a time-predicate + # like `ModifiedOnDateTime[_dt] > N` -- that's the v0.10.2.1 + # silent-zero failure shape. (`_dt` IS allowed inside orderByFields + # and in the post-fetch python filter; just not as a where predicate.) + assert "ModifiedOnDateTime" not in params["where"], ( + f"poll #{i+1} smuggled ModifiedOnDateTime back into where clause: " + f"{params['where']!r}" ) - # Regression guard against ANY incremental time clause sneaking back. - for v in params.values(): - assert "ModifiedOnDateTime" not in str(v), ( - f"poll #{i+1} param value referenced ModifiedOnDateTime: {v!r}" - ) @pytest.mark.asyncio async def test_no_last_poll_time_attribute( @@ -373,6 +396,102 @@ class TestWFIGSIncidentsAdapter: "incremental where-clause; do not re-add" ) + def test_endpoint_url_is_not_current_view(self): + """v0.10.4 regression guard: the URL constant points to the parent + ``WFIGS_Incident_Locations`` endpoint, NOT ``WFIGS_Incident_Locations_Current``. + + The ``_Current`` view excludes IMT-managed BLM fires (Type 3 IC and up). + Blue Ridge -- a 14k-acre Owyhee County WF actively modified within + the hour -- is absent from ``_Current`` but present in the parent + endpoint. Reverting this constant would silently lose that class of + fire from Central's coverage. + """ + from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter, WFIGS_INCIDENTS_URL + + assert WFIGS_INCIDENTS_URL.endswith( + "/WFIGS_Incident_Locations/FeatureServer/0/query" + ), f"unexpected endpoint suffix: {WFIGS_INCIDENTS_URL!r}" + assert "_Current/" not in WFIGS_INCIDENTS_URL, ( + f"adapter reverted to the _Current view: {WFIGS_INCIDENTS_URL!r}" + ) + # Cross-check that the adapter class actually uses the constant we + # just asserted on (catches an accidental local override inside the + # class body that bypasses the module-level constant). + assert WFIGSIncidentsAdapter.__module__.endswith("wfigs_incidents") + + @pytest.mark.asyncio + async def test_client_side_recency_filter_drops_stale_features( + self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path + ): + """v0.10.4 regression guard: the post-fetch recency filter drops + features whose ``ModifiedOnDateTime_dt`` is older than the cutoff. + + Server-side `ModifiedOnDateTime_dt > N` combined with any other + predicate is rejected on this layer (returns "Unable to perform + query"), so the recency floor lives client-side. Three crafted + features: one modified now (kept), one 25d ago (kept under the + 30d cutoff), one 60d ago (dropped). Expect 2 events yielded. + """ + from central.adapters import wfigs_incidents as mod + from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter + + now_ms = int(time.time()) * 1000 + cutoff_s = mod._RECENCY_CUTOFF_S + # 25 days back -- inside the 30-day window + modified_25d_ago_ms = now_ms - 25 * 86400 * 1000 + # 60 days back -- outside the 30-day window + modified_60d_ago_ms = now_ms - 60 * 86400 * 1000 + # Sanity: the cutoff IS 30 days (catches accidental constant change). + assert cutoff_s == 30 * 86400, ( + f"_RECENCY_CUTOFF_S changed to {cutoff_s}s; update this test " + "or revisit the deploy plan's volume estimate" + ) + + def _feat(eid: str, lon: float, lat: float, modified_ms: int) -> dict: + return { + "type": "Feature", + "geometry": {"type": "Point", "coordinates": [lon, lat]}, + "properties": { + "IrwinID": eid, + "IncidentName": eid, + "IncidentTypeCategory": "WF", + "DailyAcres": 10, + "PercentContained": 0, + "FireDiscoveryDateTime": now_ms - 86400 * 1000, + "ModifiedOnDateTime_dt": modified_ms, + "POOState": "US-ID", + "POOCounty": "Owyhee", + }, + } + + crafted_response = { + "type": "FeatureCollection", + "features": [ + _feat("FRESH-NOW", -116.5, 43.5, now_ms), + _feat("FRESH-25D", -116.4, 43.4, modified_25d_ago_ms), + _feat("STALE-60D", -116.3, 43.3, modified_60d_ago_ms), + ], + } + + adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path) + await adapter.startup() + + resp = AsyncMock() + resp.raise_for_status = MagicMock() + resp.json = AsyncMock(return_value=crafted_response) + with patch.object( + adapter._session, "get", + return_value=AsyncMock(__aenter__=AsyncMock(return_value=resp), __aexit__=AsyncMock()), + ): + events = [e async for e in adapter.poll()] + await adapter.shutdown() + + yielded_ids = sorted(e.id for e in events) + assert yielded_ids == ["FRESH-25D", "FRESH-NOW"], ( + f"recency filter mis-fired; yielded={yielded_ids!r} " + "(expected the 2 within the 30-day cutoff, dropping the 60-day one)" + ) + @pytest.mark.asyncio async def test_fall_off_emits_removal( self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path