mirror of
https://github.com/zvx-echo6/central.git
synced 2026-06-10 20:04:43 +02:00
v0.10.4: switch wfigs_incidents to non-Current endpoint w/ WF active-only filter (resurrects IMT-managed fires like Blue Ridge) (#89)
Closes #89
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
0dd83a340e
commit
2232718509
2 changed files with 216 additions and 38 deletions
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
import logging
|
||||
import sqlite3
|
||||
import time
|
||||
from collections.abc import AsyncIterator
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
|
@ -18,7 +19,6 @@ from tenacity import (
|
|||
|
||||
from central.adapter import SourceAdapter
|
||||
from central.adapters.wfigs_common import (
|
||||
WFIGS_INCIDENTS_URL,
|
||||
build_regions,
|
||||
cleanup_old_observed,
|
||||
delete_observed,
|
||||
|
|
@ -41,11 +41,42 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
LAYER_NAME = "incidents"
|
||||
|
||||
# v0.10.4: switched from the `_Current` view to the parent `WFIGS_Incident_Locations`
|
||||
# endpoint. The Current view excludes IMT-managed BLM fires once they transition
|
||||
# to Type 3 IC / ICS-209 reporting (e.g. Blue Ridge: 14k acres, modified upstream
|
||||
# within the hour, but absent from _Current). The parent endpoint has the
|
||||
# IMT-managed fires; we filter to active wildfires server-side and cap recency
|
||||
# client-side. The wfigs_perimeters adapter stays on `_Current` (perimeters have
|
||||
# a different lifecycle and Blue Ridge isn't in either perimeter layer).
|
||||
WFIGS_INCIDENTS_URL = (
|
||||
"https://services3.arcgis.com/T4QMspbfLg3qTGWY/ArcGIS/rest/services/"
|
||||
"WFIGS_Incident_Locations/FeatureServer/0/query"
|
||||
)
|
||||
|
||||
# Client-side recency cutoff: drop features whose ModifiedOnDateTime_dt is older
|
||||
# than this many seconds. Server-side `ModifiedOnDateTime_dt > N` combined with
|
||||
# any other predicate returns "Unable to perform query" on this layer, so the
|
||||
# floor is enforced after the JSON parse. Computed fresh each poll -- this is
|
||||
# NOT a persisted cursor (avoids the v0.10.2.1 silent-zero failure mode).
|
||||
_RECENCY_CUTOFF_S = 30 * 86400
|
||||
|
||||
# Server-side cap. The endpoint also accepts orderByFields=ModifiedOnDateTime_dt
|
||||
# DESC, so the 300 we get back are the 300 most-recently-touched WF records in
|
||||
# the configured POOState (or globally if state is unset). Idaho currently has
|
||||
# ~30 within the 30-day client-side window.
|
||||
_RESULT_RECORD_COUNT = 300
|
||||
|
||||
|
||||
class WFIGSIncidentsSettings(BaseModel):
    """Settings schema for WFIGS Incidents adapter."""

    # Optional bounding box (north/south/east/west) used to scope incidents.
    # Leaving it as None means no bbox is applied.
    region: RegionConfig | None = None

    # v0.10.4: optional ISO 3166-2 POOState code (e.g. "US-ID") that feeds the
    # server-side POOState predicate. When left as None the predicate is
    # omitted: the adapter then receives WF records from every state, up to
    # the resultRecordCount cap, and only the `region` bbox narrows them.
    # Operators normally set this to the state covered by the bbox so the
    # upstream call is as tight as possible.
    state: str | None = None
|
||||
|
||||
|
||||
class WFIGSIncidentsAdapter(SourceAdapter):
|
||||
|
|
@ -80,6 +111,8 @@ class WFIGSIncidentsAdapter(SourceAdapter):
|
|||
self.region: RegionConfig | None = RegionConfig(**region_dict)
|
||||
else:
|
||||
self.region = None
|
||||
# v0.10.4: POOState code (e.g. "US-ID") for the server-side filter.
|
||||
self.state: str | None = config.settings.get("state")
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Initialize HTTP session and SQLite connection."""
|
||||
|
|
@ -127,6 +160,7 @@ class WFIGSIncidentsAdapter(SourceAdapter):
|
|||
self.region = RegionConfig(**region_dict)
|
||||
else:
|
||||
self.region = None
|
||||
self.state = new_config.settings.get("state")
|
||||
logger.info(
|
||||
"WFIGS incidents config updated",
|
||||
extra={"region": self.region.model_dump() if self.region else None},
|
||||
|
|
@ -152,29 +186,39 @@ class WFIGSIncidentsAdapter(SourceAdapter):
|
|||
retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError)),
|
||||
)
|
||||
async def _fetch_features(self) -> list[dict[str, Any]]:
|
||||
"""Fetch features from WFIGS FeatureServer."""
|
||||
"""Fetch features from WFIGS FeatureServer.
|
||||
|
||||
v0.10.4: switched from the `_Current` view to the parent endpoint with
|
||||
a server-side active-wildfire filter (`IncidentTypeCategory='WF' AND
|
||||
FireOutDateTime IS NULL [AND POOState='<state>']`), capped at the 300
|
||||
most-recently-touched records via `orderByFields=ModifiedOnDateTime_dt
|
||||
DESC + resultRecordCount=300`. A client-side recency cutoff drops
|
||||
anything older than ``_RECENCY_CUTOFF_S``. The cutoff is recomputed
|
||||
fresh each poll -- this is NOT a persisted cursor (see v0.10.2.1).
|
||||
"""
|
||||
if not self._session:
|
||||
raise RuntimeError("Session not initialized")
|
||||
|
||||
# Build query params
|
||||
# Server-side WHERE: active wildfires only, optional POOState scope.
|
||||
# The state code is plumbed from settings (no hardcoded codes); when
|
||||
# unset, the predicate is omitted and the call returns every state's
|
||||
# active WF records up to the result cap.
|
||||
where_parts = ["IncidentTypeCategory='WF'", "FireOutDateTime IS NULL"]
|
||||
if self.state:
|
||||
# POOState literal is a 2-segment ISO-3166-2 code ("US-ID"), no quotes
|
||||
# required inside it; escape any embedded single quotes defensively.
|
||||
safe_state = self.state.replace("'", "''")
|
||||
where_parts.append(f"POOState='{safe_state}'")
|
||||
params: dict[str, str] = {
|
||||
"outFields": "*",
|
||||
"returnGeometry": "true",
|
||||
"f": "geojson",
|
||||
"where": " AND ".join(where_parts),
|
||||
"orderByFields": "ModifiedOnDateTime_dt DESC",
|
||||
"resultRecordCount": str(_RESULT_RECORD_COUNT),
|
||||
}
|
||||
|
||||
# v0.10.2.1: full-page fetch every poll. The previous incremental
|
||||
# `ModifiedOnDateTime > timestamp '<last_poll>'` clause silently
|
||||
# returned 0 features because the upstream layer renamed the column
|
||||
# to `ModifiedOnDateTime_dt` (epoch ms) and our where-clause both
|
||||
# used the old name AND compared against a SQL timestamp literal.
|
||||
# ArcGIS treated the clause as not-matching; the fall-off detector
|
||||
# then tombstoned every previously-observed IRWINID on poll #2.
|
||||
# `wfigs_observed` + `published_ids` already de-duplicate the full
|
||||
# page, so re-fetching every poll is correct and idempotent.
|
||||
params["where"] = "1=1"
|
||||
|
||||
# Bbox filter if region configured
|
||||
# Bbox filter if region configured (defense-in-depth alongside POOState).
|
||||
if self.region:
|
||||
bbox = f"{self.region.west},{self.region.south},{self.region.east},{self.region.north}"
|
||||
params["geometry"] = bbox
|
||||
|
|
@ -187,9 +231,24 @@ class WFIGSIncidentsAdapter(SourceAdapter):
|
|||
data = await resp.json()
|
||||
|
||||
features = data.get("features", [])
|
||||
raw_count = len(features)
|
||||
|
||||
# Client-side recency floor. Server-side `ModifiedOnDateTime_dt > N`
|
||||
# combined with any other predicate is rejected on this layer, so the
|
||||
# 30-day cutoff is enforced here after the JSON parse. Computed fresh
|
||||
# each call -- never persisted.
|
||||
cutoff_ms = (int(time.time()) - _RECENCY_CUTOFF_S) * 1000
|
||||
features = [
|
||||
f for f in features
|
||||
if (f.get("properties", {}).get("ModifiedOnDateTime_dt") or 0) > cutoff_ms
|
||||
]
|
||||
logger.info(
|
||||
"WFIGS incidents fetch completed",
|
||||
extra={"feature_count": len(features)},
|
||||
extra={
|
||||
"feature_count_raw": raw_count,
|
||||
"feature_count_after_recency_filter": len(features),
|
||||
"recency_cutoff_s": _RECENCY_CUTOFF_S,
|
||||
},
|
||||
)
|
||||
return features
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""Tests for WFIGS adapters."""
|
||||
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
|
@ -10,8 +11,13 @@ from central.config_models import AdapterConfig
|
|||
from central.models import Event, Geo
|
||||
|
||||
|
||||
# Sample GeoJSON response with incidents using real WFIGS format
|
||||
# Sample GeoJSON response with incidents using real WFIGS format.
|
||||
# v0.10.4: ModifiedOnDateTime_dt is now the real upstream field name (the
|
||||
# pre-v0.10.2.1 ``ModifiedOnDateTime`` field was renamed by NIFC); set to
|
||||
# "now-ish" via the module-load timestamp so these features pass the
|
||||
# v0.10.4 client-side recency filter on every test run.
|
||||
# Note: POOState comes as ISO 3166-2 ("US-MT"), IncidentTypeCategory as codes ("WF")
|
||||
_FIXTURE_NOW_MS = int(time.time() * 1000)
|
||||
SAMPLE_INCIDENTS_RESPONSE = {
|
||||
"type": "FeatureCollection",
|
||||
"features": [
|
||||
|
|
@ -25,7 +31,7 @@ SAMPLE_INCIDENTS_RESPONSE = {
|
|||
"DailyAcres": 150,
|
||||
"PercentContained": 25,
|
||||
"FireDiscoveryDateTime": 1716000000000,
|
||||
"ModifiedOnDateTime": 1716100000000,
|
||||
"ModifiedOnDateTime_dt": _FIXTURE_NOW_MS,
|
||||
"POOState": "US-MT", # Real format: ISO 3166-2
|
||||
"POOCounty": "Glacier",
|
||||
},
|
||||
|
|
@ -40,7 +46,7 @@ SAMPLE_INCIDENTS_RESPONSE = {
|
|||
"DailyAcres": 5,
|
||||
"PercentContained": 100,
|
||||
"FireDiscoveryDateTime": 1716200000000,
|
||||
"ModifiedOnDateTime": 1716300000000,
|
||||
"ModifiedOnDateTime_dt": _FIXTURE_NOW_MS,
|
||||
"POOState": "US-ID",
|
||||
"POOCounty": "Owyhee",
|
||||
},
|
||||
|
|
@ -55,7 +61,7 @@ SAMPLE_INCIDENTS_RESPONSE = {
|
|||
"DailyAcres": 50,
|
||||
"PercentContained": 0,
|
||||
"FireDiscoveryDateTime": 1716400000000,
|
||||
"ModifiedOnDateTime": 1716500000000,
|
||||
"ModifiedOnDateTime_dt": _FIXTURE_NOW_MS,
|
||||
"POOState": "US-FL",
|
||||
"POOCounty": "Miami-Dade",
|
||||
},
|
||||
|
|
@ -238,7 +244,11 @@ class TestWFIGSIncidentsAdapter:
|
|||
enabled=True,
|
||||
cadence_s=300,
|
||||
settings={
|
||||
"region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0}
|
||||
"region": {"north": 49.0, "south": 31.0, "east": -102.0, "west": -124.0},
|
||||
# v0.10.4: POOState scope for the server-side filter. The test
|
||||
# ships Idaho to match the production config; the assertion in
|
||||
# test_where_clause_filters_active_wf_only expects this value.
|
||||
"state": "US-ID",
|
||||
},
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
|
@ -316,18 +326,21 @@ class TestWFIGSIncidentsAdapter:
|
|||
await adapter.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_where_clause_is_1_eq_1_on_every_poll(
|
||||
async def test_where_clause_filters_active_wf_only(
|
||||
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
|
||||
):
|
||||
"""v0.10.2.1 regression guard: every poll sends ``where=1=1``.
|
||||
"""v0.10.4 regression guard: every poll sends the active-WF filter.
|
||||
|
||||
The pre-v0.10.2.1 adapter sent ``where=ModifiedOnDateTime > timestamp 'X'``
|
||||
on every poll after the first -- a clause that silently returned 0
|
||||
features because the upstream layer renamed the column to
|
||||
``ModifiedOnDateTime_dt``. That made the fall-off detector tombstone
|
||||
every previously-observed IRWINID on poll #2 (the Summit Creek bug).
|
||||
Both the first poll and every poll thereafter must now send the
|
||||
unconditional full-page query.
|
||||
v0.10.2.1 used ``where=1=1`` against the ``_Current`` endpoint. That
|
||||
endpoint excludes IMT-managed BLM fires (Blue Ridge, Sailor Cap, etc.)
|
||||
once they hit Type 3 IC reporting. v0.10.4 switched to the parent
|
||||
``WFIGS_Incident_Locations`` endpoint with a server-side active-WF
|
||||
filter that surfaces those IMT-managed fires. The bug where-clause
|
||||
``ModifiedOnDateTime[_dt] > timestamp 'X'`` from pre-v0.10.2.1 is
|
||||
also banned from the wire (the v0.10.2.1 silent-zero failure mode):
|
||||
``ModifiedOnDateTime_dt`` may only appear in ``orderByFields`` and in
|
||||
the client-side python recency filter, never as a server-side
|
||||
boolean predicate inside ``where``.
|
||||
"""
|
||||
from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter
|
||||
|
||||
|
|
@ -349,16 +362,26 @@ class TestWFIGSIncidentsAdapter:
|
|||
|
||||
await adapter.shutdown()
|
||||
|
||||
expected_where = (
|
||||
"IncidentTypeCategory='WF' AND FireOutDateTime IS NULL "
|
||||
"AND POOState='US-ID'"
|
||||
)
|
||||
assert len(captured) == 2, "expected one HTTP call per poll"
|
||||
for i, params in enumerate(captured):
|
||||
assert params.get("where") == "1=1", (
|
||||
f"poll #{i+1} sent where={params.get('where')!r}; expected '1=1'"
|
||||
assert params.get("where") == expected_where, (
|
||||
f"poll #{i+1} sent where={params.get('where')!r}; "
|
||||
f"expected {expected_where!r}"
|
||||
)
|
||||
assert params.get("orderByFields") == "ModifiedOnDateTime_dt DESC"
|
||||
assert params.get("resultRecordCount") == "300"
|
||||
# The where clause specifically must NOT contain a time-predicate
|
||||
# like `ModifiedOnDateTime[_dt] > N` -- that's the v0.10.2.1
|
||||
# silent-zero failure shape. (`_dt` IS allowed inside orderByFields
|
||||
# and in the post-fetch python filter; just not as a where predicate.)
|
||||
assert "ModifiedOnDateTime" not in params["where"], (
|
||||
f"poll #{i+1} smuggled ModifiedOnDateTime back into where clause: "
|
||||
f"{params['where']!r}"
|
||||
)
|
||||
# Regression guard against ANY incremental time clause sneaking back.
|
||||
for v in params.values():
|
||||
assert "ModifiedOnDateTime" not in str(v), (
|
||||
f"poll #{i+1} param value referenced ModifiedOnDateTime: {v!r}"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_last_poll_time_attribute(
|
||||
|
|
@ -373,6 +396,102 @@ class TestWFIGSIncidentsAdapter:
|
|||
"incremental where-clause; do not re-add"
|
||||
)
|
||||
|
||||
def test_endpoint_url_is_not_current_view(self):
    """v0.10.4 regression guard: the URL constant points to the parent
    ``WFIGS_Incident_Locations`` endpoint, NOT ``WFIGS_Incident_Locations_Current``.

    The ``_Current`` view excludes IMT-managed BLM fires (Type 3 IC and up).
    Blue Ridge -- a 14k-acre Owyhee County WF actively modified within
    the hour -- is absent from ``_Current`` but present in the parent
    endpoint. Reverting this constant would silently lose that class of
    fire from Central's coverage.
    """
    from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter, WFIGS_INCIDENTS_URL

    assert WFIGS_INCIDENTS_URL.endswith(
        "/WFIGS_Incident_Locations/FeatureServer/0/query"
    ), f"unexpected endpoint suffix: {WFIGS_INCIDENTS_URL!r}"
    assert "_Current/" not in WFIGS_INCIDENTS_URL, (
        f"adapter reverted to the _Current view: {WFIGS_INCIDENTS_URL!r}"
    )

    # Sanity: the adapter class really is the one defined next to the
    # constant asserted on above.
    assert WFIGSIncidentsAdapter.__module__.endswith("wfigs_incidents")

    # Catch an accidental override inside the class body: no class-level
    # string attribute may point back at the `_Current` view. Dunder entries
    # (``__doc__``, ``__module__``, ...) are skipped so prose that merely
    # mentions the old view cannot trip the guard.
    class_level_current_urls = {
        name: value
        for name, value in vars(WFIGSIncidentsAdapter).items()
        if not name.startswith("__")
        and isinstance(value, str)
        and "_Current/" in value
    }
    assert not class_level_current_urls, (
        "adapter class body overrides the endpoint with a _Current URL: "
        f"{class_level_current_urls!r}"
    )
|
||||
|
||||
@pytest.mark.asyncio
async def test_client_side_recency_filter_drops_stale_features(
    self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
):
    """v0.10.4 regression guard: the post-fetch recency filter drops
    features whose ``ModifiedOnDateTime_dt`` is older than the cutoff.

    Server-side `ModifiedOnDateTime_dt > N` combined with any other
    predicate is rejected on this layer (returns "Unable to perform
    query"), so the recency floor lives client-side. Three crafted
    features: one modified now (kept), one 25d ago (kept under the
    30d cutoff), one 60d ago (dropped). Expect 2 events yielded.
    """
    from central.adapters import wfigs_incidents as mod
    from central.adapters.wfigs_incidents import WFIGSIncidentsAdapter

    now_ms = int(time.time()) * 1000
    cutoff_s = mod._RECENCY_CUTOFF_S
    # 25 days back -- inside the 30-day window
    modified_25d_ago_ms = now_ms - 25 * 86400 * 1000
    # 60 days back -- outside the 30-day window
    modified_60d_ago_ms = now_ms - 60 * 86400 * 1000
    # Sanity: the cutoff IS 30 days (catches accidental constant change).
    assert cutoff_s == 30 * 86400, (
        f"_RECENCY_CUTOFF_S changed to {cutoff_s}s; update this test "
        "or revisit the deploy plan's volume estimate"
    )

    def _feat(eid: str, lon: float, lat: float, modified_ms: int) -> dict:
        """Build a minimal WF point feature modified at ``modified_ms``."""
        return {
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": {
                "IrwinID": eid,
                "IncidentName": eid,
                "IncidentTypeCategory": "WF",
                "DailyAcres": 10,
                "PercentContained": 0,
                "FireDiscoveryDateTime": now_ms - 86400 * 1000,
                "ModifiedOnDateTime_dt": modified_ms,
                "POOState": "US-ID",
                "POOCounty": "Owyhee",
            },
        }

    crafted_response = {
        "type": "FeatureCollection",
        "features": [
            _feat("FRESH-NOW", -116.5, 43.5, now_ms),
            _feat("FRESH-25D", -116.4, 43.4, modified_25d_ago_ms),
            _feat("STALE-60D", -116.3, 43.3, modified_60d_ago_ms),
        ],
    }

    adapter = WFIGSIncidentsAdapter(mock_config, mock_config_store, cursor_db_path)
    await adapter.startup()

    # Always shut the adapter down, even when poll() raises, so a failing
    # run does not leak whatever startup() opened.
    try:
        resp = AsyncMock()
        resp.raise_for_status = MagicMock()
        resp.json = AsyncMock(return_value=crafted_response)
        with patch.object(
            adapter._session, "get",
            return_value=AsyncMock(__aenter__=AsyncMock(return_value=resp), __aexit__=AsyncMock()),
        ):
            events = [e async for e in adapter.poll()]
    finally:
        await adapter.shutdown()

    yielded_ids = sorted(e.id for e in events)
    assert yielded_ids == ["FRESH-25D", "FRESH-NOW"], (
        f"recency filter mis-fired; yielded={yielded_ids!r} "
        "(expected the 2 within the 30-day cutoff, dropping the 60-day one)"
    )
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fall_off_emits_removal(
|
||||
self, mock_config: AdapterConfig, mock_config_store: MagicMock, cursor_db_path: Path
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue