From 80460c83a8e941e86b1dde0d86d1af73fe35a27e Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Wed, 27 May 2026 05:57:57 +0000 Subject: [PATCH] WZDx: poll-time state allowlist with Idaho-region default (v0.9.17) Narrow which WZDx registry feeds are fetched at poll time, so out-of-state feeds are never requested (no bandwidth/quota cost), rather than fetching nationally and dropping out-of-bounds events at the archive INSERT (v0.9.12). - states is now list[StateCode] (Literal of 56 canonical 2-letter codes), default ["ID","WA","OR","NV","UT","WY","MT"]. Semantic flip: empty/unset/null now means the Idaho-region default, NOT "every eligible feed". Poll nationally by selecting all states in the GUI. - list[Literal] renders as the existing checkboxes multi-select widget (firms.py precedent); pydantic rejects malformed codes at save time. - Add informational quota_estimate (cap=None: never warns/blocks) so the edit page shows the fetch-volume reduction. - adapters_edit.html: standalone quota panel for flat-config adapters, guarded by not has_model_list so model_list adapters don't double-render; reuses the identical quota.* keys + CSS so the v0.9.15 client recompute works for them too. - 7 new tests (default/null fallback, _discover filtering, malformed rejection, quota math, offline Jinja panel render). Event data shape is unchanged (poll-behavior-only), so no published_ids reset is needed. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/central/adapters/wzdx.py | 85 +++++++++++++++--- src/central/gui/templates/adapters_edit.html | 17 ++++ tests/test_wzdx.py | 95 +++++++++++++++++++- 3 files changed, 182 insertions(+), 15 deletions(-) diff --git a/src/central/adapters/wzdx.py b/src/central/adapters/wzdx.py index 36b9e37..31d42e0 100644 --- a/src/central/adapters/wzdx.py +++ b/src/central/adapters/wzdx.py @@ -5,6 +5,13 @@ First adapter to use the v0.9.0 category/subject split: category="work_zone.wzdx NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject state comes from the registry row (reliable, pre-enrichment); the geocoder state is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db. + +State allowlist (v0.9.17): `states` narrows which registry feeds are fetched, at +poll time — out-of-allowlist state feeds are never requested, so they cost no +bandwidth or upstream quota (vs. the v0.9.12 archive bbox filter, which only drops +them *after* the fetch). Semantics flipped in v0.9.17: an empty/unset `states` +now means the Idaho-region default (ID + neighbors), NOT "every eligible feed". +To poll nationally, explicitly select every state in the GUI. """ import asyncio @@ -13,7 +20,7 @@ import sqlite3 from collections.abc import AsyncIterator from datetime import datetime, timezone from pathlib import Path -from typing import Any +from typing import Any, Literal import aiohttp from pydantic import BaseModel @@ -43,6 +50,25 @@ _DEFAULT_SEVERITY = 1 _FEED_CONCURRENCY = 6 _FEED_TIMEOUT_S = 60 +# 30-day month, for the informational quota_estimate (matches tomtom_incidents). +_SECONDS_PER_MONTH = 30 * 24 * 3600 + +# Selectable 2-letter codes (mirrors inciweb.STATE_NAME_TO_CODE's value set, so any +# registry `state` that maps to a code is also pickable). list[StateCode] renders as +# the existing "checkboxes" multi-select widget (firms.py precedent) — no new widget. +StateCode = Literal[ + "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", + "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", + "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", + "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY", "DC", "PR", + "GU", "VI", "AS", "MP", +] + +# Idaho-region default: ID + its 6 neighbors. WA/UT (and ID) have eligible feeds +# in the registry today; OR/NV/WY/MT are aspirational — listed so they are picked +# up automatically if/when those state DOTs publish a WZDx feed. +_DEFAULT_STATES: list[str] = ["ID", "WA", "OR", "NV", "UT", "WY", "MT"] + _DEDUP_DDL = ( "CREATE TABLE IF NOT EXISTS published_ids (" "adapter TEXT NOT NULL, event_id TEXT NOT NULL, " @@ -103,9 +129,11 @@ def _flatten_geometry( class WZDxSettings(BaseModel): - """states: allowlist of 2-letter codes to poll; None = every eligible feed.""" + """states: 2-letter codes whose registry feeds to poll. Empty/unset -> the + Idaho-region default (`_DEFAULT_STATES`), NOT every feed (v0.9.17 flip). Codes + are validated against `StateCode`; malformed codes are rejected at save time.""" - states: list[str] | None = None + states: list[StateCode] = list(_DEFAULT_STATES) class WZDxAdapter(SourceAdapter): @@ -138,14 +166,17 @@ class WZDxAdapter(SourceAdapter): self._cursor_db_path = cursor_db_path self._session: aiohttp.ClientSession | None = None self._db: sqlite3.Connection | None = None - self._states: set[str] | None = self._read_states(config) + self._states: set[str] = self._read_states(config) @staticmethod - def _read_states(config: AdapterConfig) -> set[str] | None: + def _read_states(config: AdapterConfig) -> set[str]: + """Configured allowlist, or the Idaho-region default when empty/unset/null + (v0.9.17 flip — empty no longer means "poll every eligible feed").""" raw = config.settings.get("states") if not raw: - return None - return {s.strip().upper() for s in raw if s and s.strip()} or None + return set(_DEFAULT_STATES) + codes = {s.strip().upper() for s in raw if s and s.strip()} + return codes or set(_DEFAULT_STATES) async def startup(self) -> None: self._session = aiohttp.ClientSession( @@ -156,7 +187,7 @@ class WZDxAdapter(SourceAdapter): self._db.execute(_DEDUP_DDL) self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") self._db.commit() - logger.info("WZDx adapter started", extra={"states": sorted(self._states) if self._states else None}) + logger.info("WZDx adapter started", extra={"states": sorted(self._states)}) async def shutdown(self) -> None: if self._session: @@ -168,7 +199,7 @@ class WZDxAdapter(SourceAdapter): async def apply_config(self, new_config: AdapterConfig) -> None: self._states = self._read_states(new_config) - logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None}) + logger.info("WZDx config updated", extra={"states": sorted(self._states)}) @retry( stop=stop_after_attempt(3), @@ -200,15 +231,15 @@ class WZDxAdapter(SourceAdapter): return doc["features"] def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]: - """Eligible rows, optionally narrowed to the operator's state allowlist.""" + """Eligible rows narrowed to the operator's state allowlist (always set; + defaults to the Idaho region — see `_read_states`).""" feeds: list[dict[str, Any]] = [] for row in registry_rows: if not _eligible(row): continue - if self._states is not None: - code = _state_code(row.get("state")) - if code is None or code not in self._states: - continue + code = _state_code(row.get("state")) + if code is None or code not in self._states: + continue feeds.append(row) return feeds @@ -295,3 +326,29 @@ class WZDxAdapter(SourceAdapter): enr = (event.data.get("_enriched") or {}).get("geocoder") or {} code = _state_code(enr.get("state")) return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}" + + @classmethod + def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None: + """Informational only — WZDx is free/unauthenticated and has no upstream cap + (cap=None -> never warns, never blocks a save). Surfaces the per-month + upstream GET volume (1 registry fetch + up to one feed per allowlisted state) + so operators see how narrowing `states` cuts fetch volume.""" + states = getattr(settings, "states", None) or _DEFAULT_STATES + n_states = len({s.upper() for s in states}) + per_poll = 1 + n_states # 1 registry + at most one feed per in-scope state + cadence = max(cadence_s or cls.default_cadence_s, 1) + calls_per_month = round(per_poll * _SECONDS_PER_MONTH / cadence) + return { + "calls_per_month": calls_per_month, + "cap": None, + "seconds_per_month": _SECONDS_PER_MONTH, + "default_cadence_s": cls.default_cadence_s, + "percent": 0.0, + "warn": False, + "blocked": False, + "detail": ( + f"~{calls_per_month:,} upstream GET(s)/month: 1 registry + up to " + f"{n_states} state feed(s) at {cadence}s cadence. WZDx is free/uncapped — " + f"fewer states means fewer fetches." + ), + } diff --git a/src/central/gui/templates/adapters_edit.html b/src/central/gui/templates/adapters_edit.html index 3aed250..977d069 100644 --- a/src/central/gui/templates/adapters_edit.html +++ b/src/central/gui/templates/adapters_edit.html @@ -162,6 +162,23 @@ {% endif %} + {# Standalone API-quota panel for flat-config adapters (e.g. WZDx). model_list + adapters render their own copy inside _partials/model_list.html, so skip it + here when one is present to avoid a double panel. #} + {% set has_model_list = namespace(value=false) %} + {% for field in fields %} + {% if field.widget == "model_list" %}{% set has_model_list.value = true %}{% endif %} + {% endfor %} + {% if quota and not has_model_list.value %} +
+ API quota: {{ quota.detail }} + {% if quota.blocked %}
⛔ Over free-tier cap — reduce calls before saving.{% elif quota.warn %}
⚠️ Approaching free-tier cap.{% endif %}
+
+ {% endif %} + {% set has_region = namespace(value=false) %} {% for field in fields %} {% if field.widget == "region" %} diff --git a/tests/test_wzdx.py b/tests/test_wzdx.py index dfa5e0b..eaee5c2 100644 --- a/tests/test_wzdx.py +++ b/tests/test_wzdx.py @@ -16,14 +16,19 @@ adapter-owned cache to redirect (unlike nwis's NWIS_CACHE_DB_PATH). import json from datetime import datetime, timezone from pathlib import Path +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest +from pydantic import ValidationError + from central.adapters.wzdx import ( _DEFAULT_SEVERITY, + _DEFAULT_STATES, _VEHICLE_IMPACT_SEVERITY, WZDxAdapter, + WZDxSettings, _eligible, _flatten_geometry, ) @@ -124,7 +129,10 @@ def test_event_type_split(adapter): @pytest.mark.asyncio -async def test_poll_yields_events(adapter): +async def test_poll_yields_events(tmp_path): + # Explicit allowlist (UT+IA): Iowa is outside the v0.9.17 default set, so this + # test sets states directly to exercise multi-feed fan-out + the json-skip. + adapter = WZDxAdapter(_cfg({"states": ["UT", "IA"]}), MagicMock(), tmp_path / "cursors.db") await adapter.startup() registry = [ {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}}, @@ -158,3 +166,88 @@ def test_summary_omits_unknown_direction(): flat = {"road_names": ["I-80"], "direction": "unknown"} row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} assert _derive_subject(row) == "Work zone on I-80" + + +# --- v0.9.17: poll-time state allowlist --------------------------------------- + +def _registry_row(state): + return {"format": "geojson", "active": True, "needapikey": False, + "version": "4", "feedname": state[:3], "state": state, "url": {"url": state}} + + +def test_default_states_when_unset(): + # Settings default + adapter both fall back to the Idaho-region 7-set, NOT "all". + assert WZDxSettings().states == _DEFAULT_STATES + assert set(_DEFAULT_STATES) == {"ID", "WA", "OR", "NV", "UT", "WY", "MT"} + a = WZDxAdapter(_cfg({}), MagicMock(), Path("/tmp/unused.db")) + assert a._states == set(_DEFAULT_STATES) + + +def test_null_states_uses_default(): + # The live DB row stores {"states": null} pre-deploy-DML; must not mean "all". + a = WZDxAdapter(_cfg({"states": None}), MagicMock(), Path("/tmp/unused.db")) + assert a._states == set(_DEFAULT_STATES) + + +def test_discover_filters_to_default_states(adapter): + # adapter fixture has empty settings -> default 7-set. ID/UT in; IA/OH out. + registry = [_registry_row(s) for s in ("idaho", "utah", "iowa", "ohio")] + kept = {r["state"] for r in adapter._discover(registry)} + assert kept == {"idaho", "utah"} + + +def test_discover_honours_explicit_allowlist(tmp_path): + a = WZDxAdapter(_cfg({"states": ["id"]}), MagicMock(), tmp_path / "c.db") + assert a._states == {"ID"} # lower-case input is normalized + registry = [_registry_row(s) for s in ("idaho", "utah", "iowa")] + kept = {r["state"] for r in a._discover(registry)} + assert kept == {"idaho"} + + +def test_malformed_state_code_rejected(): + with pytest.raises(ValidationError): + WZDxSettings(states=["ID", "ZZ"]) + + +def test_quota_estimate_informational(): + q = WZDxAdapter.quota_estimate(WZDxSettings(), 600) + # 7 states -> 1 registry + 7 feeds = 8 GET/poll; 2_592_000/600 = 4320 polls. + assert q["calls_per_month"] == 8 * (2_592_000 // 600) + assert q["cap"] is None + assert q["warn"] is False and q["blocked"] is False + # Narrowing states lowers the estimate (the 82% delta the panel surfaces). + assert WZDxAdapter.quota_estimate( + WZDxSettings(states=["ID", "WA", "UT"]), 600 + )["calls_per_month"] < q["calls_per_month"] + + +def test_edit_page_renders_standalone_quota_panel(): + # v0.9.17: WZDx is a flat-config (checkboxes) adapter, so its quota panel + # renders from the standalone block in adapters_edit.html (not the model_list + # partial). Informational only -> "flash" with no warn/error class, cap=None. + from starlette.requests import Request + + from central.gui import templates as gui_templates + from central.gui.form_descriptors import describe_fields + + s = {"states": _DEFAULT_STATES} + fields = describe_fields(WZDxAdapter.settings_schema, s) + quota = WZDxAdapter.quota_estimate(WZDxSettings(**s), 600) + ctx = { + "operator": SimpleNamespace(username="admin"), "csrf_token": "x", + "adapter": {"name": "wzdx", "display_name": "WZDx", "description": "", + "enabled": True, "cadence_s": 600, "settings": s, "paused_at": None, + "updated_at": None, "last_error": None}, + "fields": fields, "api_keys": [], "errors": None, "form_data": None, + "tile_url": "https://t/{z}/{x}/{y}.png", "tile_attribution": "OSM", + "api_key_missing": False, "requires_api_key_alias": None, + "preview_rows": None, "preview_error": None, "quota": quota, + } + req = Request({"type": "http", "method": "GET", "path": "/", "headers": [], "query_string": b""}) + out = gui_templates.TemplateResponse(request=req, name="adapters_edit.html", context=ctx).body.decode() + assert 'type="checkbox" name="states" value="ID"' in out # checkboxes, default checked + assert "model-list" not in out + assert 'class="quota-panel flash "' in out # no flash-warn/flash-error + assert "upstream GET" in out and "free/uncapped" in out + assert 'data-quota-cap="None"' in out + assert "flash-warn" not in out and "flash-error" not in out