WZDx: poll-time state allowlist with Idaho-region default (v0.9.17)

Narrow which WZDx registry feeds are fetched at poll time, so out-of-state
feeds are never requested (no bandwidth/quota cost), rather than fetching
nationally and dropping out-of-bounds events at the archive INSERT (v0.9.12).

- states is now list[StateCode] (Literal of 56 canonical 2-letter codes),
  default ["ID","WA","OR","NV","UT","WY","MT"]. Semantic flip: empty/unset/null
  now means the Idaho-region default, NOT "every eligible feed". Poll nationally
  by selecting all states in the GUI.
- list[Literal] renders as the existing checkboxes multi-select widget (firms.py
  precedent); pydantic rejects malformed codes at save time.
- Add informational quota_estimate (cap=None: never warns/blocks) so the edit
  page shows the fetch-volume reduction.
- adapters_edit.html: standalone quota panel for flat-config adapters, guarded
  by not has_model_list so model_list adapters don't double-render; reuses the
  identical quota.* keys + CSS so the v0.9.15 client recompute works for them too.
- 7 new tests (default/null fallback, _discover filtering, malformed rejection,
  quota math, offline Jinja panel render).

Event data shape is unchanged (poll-behavior-only), so no published_ids reset
is needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Johnson 2026-05-27 05:57:57 +00:00
commit 80460c83a8
3 changed files with 182 additions and 15 deletions

View file

@ -5,6 +5,13 @@ First adapter to use the v0.9.0 category/subject split: category="work_zone.wzdx
NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject
state comes from the registry row (reliable, pre-enrichment); the geocoder state state comes from the registry row (reliable, pre-enrichment); the geocoder state
is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db. is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db.
State allowlist (v0.9.17): `states` narrows which registry feeds are fetched, at
poll time out-of-allowlist state feeds are never requested, so they cost no
bandwidth or upstream quota (vs. the v0.9.12 archive bbox filter, which only drops
them *after* the fetch). Semantics flipped in v0.9.17: an empty/unset `states`
now means the Idaho-region default (ID + neighbors), NOT "every eligible feed".
To poll nationally, explicitly select every state in the GUI.
""" """
import asyncio import asyncio
@ -13,7 +20,7 @@ import sqlite3
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any, Literal
import aiohttp import aiohttp
from pydantic import BaseModel from pydantic import BaseModel
@ -43,6 +50,25 @@ _DEFAULT_SEVERITY = 1
_FEED_CONCURRENCY = 6 _FEED_CONCURRENCY = 6
_FEED_TIMEOUT_S = 60 _FEED_TIMEOUT_S = 60
# 30-day month, for the informational quota_estimate (matches tomtom_incidents).
_SECONDS_PER_MONTH = 30 * 24 * 3600
# Selectable 2-letter codes (mirrors inciweb.STATE_NAME_TO_CODE's value set, so any
# registry `state` that maps to a code is also pickable). list[StateCode] renders as
# the existing "checkboxes" multi-select widget (firms.py precedent) — no new widget.
StateCode = Literal[
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL",
"IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT",
"NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI",
"SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY", "DC", "PR",
"GU", "VI", "AS", "MP",
]
# Idaho-region default: ID + its 6 neighbors. WA/UT (and ID) have eligible feeds
# in the registry today; OR/NV/WY/MT are aspirational — listed so they are picked
# up automatically if/when those state DOTs publish a WZDx feed.
_DEFAULT_STATES: list[str] = ["ID", "WA", "OR", "NV", "UT", "WY", "MT"]
_DEDUP_DDL = ( _DEDUP_DDL = (
"CREATE TABLE IF NOT EXISTS published_ids (" "CREATE TABLE IF NOT EXISTS published_ids ("
"adapter TEXT NOT NULL, event_id TEXT NOT NULL, " "adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
@ -103,9 +129,11 @@ def _flatten_geometry(
class WZDxSettings(BaseModel): class WZDxSettings(BaseModel):
"""states: allowlist of 2-letter codes to poll; None = every eligible feed.""" """states: 2-letter codes whose registry feeds to poll. Empty/unset -> the
Idaho-region default (`_DEFAULT_STATES`), NOT every feed (v0.9.17 flip). Codes
are validated against `StateCode`; malformed codes are rejected at save time."""
states: list[str] | None = None states: list[StateCode] = list(_DEFAULT_STATES)
class WZDxAdapter(SourceAdapter): class WZDxAdapter(SourceAdapter):
@ -138,14 +166,17 @@ class WZDxAdapter(SourceAdapter):
self._cursor_db_path = cursor_db_path self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None self._db: sqlite3.Connection | None = None
self._states: set[str] | None = self._read_states(config) self._states: set[str] = self._read_states(config)
@staticmethod @staticmethod
def _read_states(config: AdapterConfig) -> set[str] | None: def _read_states(config: AdapterConfig) -> set[str]:
"""Configured allowlist, or the Idaho-region default when empty/unset/null
(v0.9.17 flip empty no longer means "poll every eligible feed")."""
raw = config.settings.get("states") raw = config.settings.get("states")
if not raw: if not raw:
return None return set(_DEFAULT_STATES)
return {s.strip().upper() for s in raw if s and s.strip()} or None codes = {s.strip().upper() for s in raw if s and s.strip()}
return codes or set(_DEFAULT_STATES)
async def startup(self) -> None: async def startup(self) -> None:
self._session = aiohttp.ClientSession( self._session = aiohttp.ClientSession(
@ -156,7 +187,7 @@ class WZDxAdapter(SourceAdapter):
self._db.execute(_DEDUP_DDL) self._db.execute(_DEDUP_DDL)
self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)")
self._db.commit() self._db.commit()
logger.info("WZDx adapter started", extra={"states": sorted(self._states) if self._states else None}) logger.info("WZDx adapter started", extra={"states": sorted(self._states)})
async def shutdown(self) -> None: async def shutdown(self) -> None:
if self._session: if self._session:
@ -168,7 +199,7 @@ class WZDxAdapter(SourceAdapter):
async def apply_config(self, new_config: AdapterConfig) -> None: async def apply_config(self, new_config: AdapterConfig) -> None:
self._states = self._read_states(new_config) self._states = self._read_states(new_config)
logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None}) logger.info("WZDx config updated", extra={"states": sorted(self._states)})
@retry( @retry(
stop=stop_after_attempt(3), stop=stop_after_attempt(3),
@ -200,12 +231,12 @@ class WZDxAdapter(SourceAdapter):
return doc["features"] return doc["features"]
def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]: def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Eligible rows, optionally narrowed to the operator's state allowlist.""" """Eligible rows narrowed to the operator's state allowlist (always set;
defaults to the Idaho region see `_read_states`)."""
feeds: list[dict[str, Any]] = [] feeds: list[dict[str, Any]] = []
for row in registry_rows: for row in registry_rows:
if not _eligible(row): if not _eligible(row):
continue continue
if self._states is not None:
code = _state_code(row.get("state")) code = _state_code(row.get("state"))
if code is None or code not in self._states: if code is None or code not in self._states:
continue continue
@ -295,3 +326,29 @@ class WZDxAdapter(SourceAdapter):
enr = (event.data.get("_enriched") or {}).get("geocoder") or {} enr = (event.data.get("_enriched") or {}).get("geocoder") or {}
code = _state_code(enr.get("state")) code = _state_code(enr.get("state"))
return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}" return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}"
@classmethod
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
"""Informational only — WZDx is free/unauthenticated and has no upstream cap
(cap=None -> never warns, never blocks a save). Surfaces the per-month
upstream GET volume (1 registry fetch + up to one feed per allowlisted state)
so operators see how narrowing `states` cuts fetch volume."""
states = getattr(settings, "states", None) or _DEFAULT_STATES
n_states = len({s.upper() for s in states})
per_poll = 1 + n_states # 1 registry + at most one feed per in-scope state
cadence = max(cadence_s or cls.default_cadence_s, 1)
calls_per_month = round(per_poll * _SECONDS_PER_MONTH / cadence)
return {
"calls_per_month": calls_per_month,
"cap": None,
"seconds_per_month": _SECONDS_PER_MONTH,
"default_cadence_s": cls.default_cadence_s,
"percent": 0.0,
"warn": False,
"blocked": False,
"detail": (
f"~{calls_per_month:,} upstream GET(s)/month: 1 registry + up to "
f"{n_states} state feed(s) at {cadence}s cadence. WZDx is free/uncapped — "
f"fewer states means fewer fetches."
),
}

View file

@ -162,6 +162,23 @@
</fieldset> </fieldset>
{% endif %} {% endif %}
{# Standalone API-quota panel for flat-config adapters (e.g. WZDx). model_list
adapters render their own copy inside _partials/model_list.html, so skip it
here when one is present to avoid a double panel. #}
{% set has_model_list = namespace(value=false) %}
{% for field in fields %}
{% if field.widget == "model_list" %}{% set has_model_list.value = true %}{% endif %}
{% endfor %}
{% if quota and not has_model_list.value %}
<div class="quota-panel flash {% if quota.blocked %}flash-error{% elif quota.warn %}flash-warn{% endif %}"
data-quota-cap="{{ quota.cap }}"
data-quota-spm="{{ quota.seconds_per_month }}"
data-quota-default="{{ quota.default_cadence_s }}">
<strong>API quota:</strong> <span class="quota-detail">{{ quota.detail }}</span>
<span class="quota-msg">{% if quota.blocked %}<br>⛔ Over free-tier cap — reduce calls before saving.{% elif quota.warn %}<br>⚠️ Approaching free-tier cap.{% endif %}</span>
</div>
{% endif %}
{% set has_region = namespace(value=false) %} {% set has_region = namespace(value=false) %}
{% for field in fields %} {% for field in fields %}
{% if field.widget == "region" %} {% if field.widget == "region" %}

View file

@ -16,14 +16,19 @@ adapter-owned cache to redirect (unlike nwis's NWIS_CACHE_DB_PATH).
import json import json
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from pydantic import ValidationError
from central.adapters.wzdx import ( from central.adapters.wzdx import (
_DEFAULT_SEVERITY, _DEFAULT_SEVERITY,
_DEFAULT_STATES,
_VEHICLE_IMPACT_SEVERITY, _VEHICLE_IMPACT_SEVERITY,
WZDxAdapter, WZDxAdapter,
WZDxSettings,
_eligible, _eligible,
_flatten_geometry, _flatten_geometry,
) )
@ -124,7 +129,10 @@ def test_event_type_split(adapter):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_poll_yields_events(adapter): async def test_poll_yields_events(tmp_path):
# Explicit allowlist (UT+IA): Iowa is outside the v0.9.17 default set, so this
# test sets states directly to exercise multi-feed fan-out + the json-skip.
adapter = WZDxAdapter(_cfg({"states": ["UT", "IA"]}), MagicMock(), tmp_path / "cursors.db")
await adapter.startup() await adapter.startup()
registry = [ registry = [
{"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}}, {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}},
@ -158,3 +166,88 @@ def test_summary_omits_unknown_direction():
flat = {"road_names": ["I-80"], "direction": "unknown"} flat = {"road_names": ["I-80"], "direction": "unknown"}
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
assert _derive_subject(row) == "Work zone on I-80" assert _derive_subject(row) == "Work zone on I-80"
# --- v0.9.17: poll-time state allowlist ---------------------------------------
def _registry_row(state):
return {"format": "geojson", "active": True, "needapikey": False,
"version": "4", "feedname": state[:3], "state": state, "url": {"url": state}}
def test_default_states_when_unset():
# Settings default + adapter both fall back to the Idaho-region 7-set, NOT "all".
assert WZDxSettings().states == _DEFAULT_STATES
assert set(_DEFAULT_STATES) == {"ID", "WA", "OR", "NV", "UT", "WY", "MT"}
a = WZDxAdapter(_cfg({}), MagicMock(), Path("/tmp/unused.db"))
assert a._states == set(_DEFAULT_STATES)
def test_null_states_uses_default():
# The live DB row stores {"states": null} pre-deploy-DML; must not mean "all".
a = WZDxAdapter(_cfg({"states": None}), MagicMock(), Path("/tmp/unused.db"))
assert a._states == set(_DEFAULT_STATES)
def test_discover_filters_to_default_states(adapter):
# adapter fixture has empty settings -> default 7-set. ID/UT in; IA/OH out.
registry = [_registry_row(s) for s in ("idaho", "utah", "iowa", "ohio")]
kept = {r["state"] for r in adapter._discover(registry)}
assert kept == {"idaho", "utah"}
def test_discover_honours_explicit_allowlist(tmp_path):
a = WZDxAdapter(_cfg({"states": ["id"]}), MagicMock(), tmp_path / "c.db")
assert a._states == {"ID"} # lower-case input is normalized
registry = [_registry_row(s) for s in ("idaho", "utah", "iowa")]
kept = {r["state"] for r in a._discover(registry)}
assert kept == {"idaho"}
def test_malformed_state_code_rejected():
with pytest.raises(ValidationError):
WZDxSettings(states=["ID", "ZZ"])
def test_quota_estimate_informational():
q = WZDxAdapter.quota_estimate(WZDxSettings(), 600)
# 7 states -> 1 registry + 7 feeds = 8 GET/poll; 2_592_000/600 = 4320 polls.
assert q["calls_per_month"] == 8 * (2_592_000 // 600)
assert q["cap"] is None
assert q["warn"] is False and q["blocked"] is False
# Narrowing states lowers the estimate (the 82% delta the panel surfaces).
assert WZDxAdapter.quota_estimate(
WZDxSettings(states=["ID", "WA", "UT"]), 600
)["calls_per_month"] < q["calls_per_month"]
def test_edit_page_renders_standalone_quota_panel():
# v0.9.17: WZDx is a flat-config (checkboxes) adapter, so its quota panel
# renders from the standalone block in adapters_edit.html (not the model_list
# partial). Informational only -> "flash" with no warn/error class, cap=None.
from starlette.requests import Request
from central.gui import templates as gui_templates
from central.gui.form_descriptors import describe_fields
s = {"states": _DEFAULT_STATES}
fields = describe_fields(WZDxAdapter.settings_schema, s)
quota = WZDxAdapter.quota_estimate(WZDxSettings(**s), 600)
ctx = {
"operator": SimpleNamespace(username="admin"), "csrf_token": "x",
"adapter": {"name": "wzdx", "display_name": "WZDx", "description": "",
"enabled": True, "cadence_s": 600, "settings": s, "paused_at": None,
"updated_at": None, "last_error": None},
"fields": fields, "api_keys": [], "errors": None, "form_data": None,
"tile_url": "https://t/{z}/{x}/{y}.png", "tile_attribution": "OSM",
"api_key_missing": False, "requires_api_key_alias": None,
"preview_rows": None, "preview_error": None, "quota": quota,
}
req = Request({"type": "http", "method": "GET", "path": "/", "headers": [], "query_string": b""})
out = gui_templates.TemplateResponse(request=req, name="adapters_edit.html", context=ctx).body.decode()
assert 'type="checkbox" name="states" value="ID"' in out # checkboxes, default checked
assert "model-list" not in out
assert 'class="quota-panel flash "' in out # no flash-warn/flash-error
assert "upstream GET" in out and "free/uncapped" in out
assert 'data-quota-cap="None"' in out
assert "flash-warn" not in out and "flash-error" not in out