Merge pull request #78 from zvx-echo6/v0_9_17_wzdx_state_filter

WZDx: poll-time state allowlist with Idaho-region default (v0.9.17)
This commit is contained in:
malice 2026-05-27 00:00:51 -06:00 committed by GitHub
commit d41c418276
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 182 additions and 15 deletions

View file

@ -5,6 +5,13 @@ First adapter to use the v0.9.0 category/subject split: category="work_zone.wzdx
NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject
state comes from the registry row (reliable, pre-enrichment); the geocoder state state comes from the registry row (reliable, pre-enrichment); the geocoder state
is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db. is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db.
State allowlist (v0.9.17): `states` narrows which registry feeds are fetched, at
poll time out-of-allowlist state feeds are never requested, so they cost no
bandwidth or upstream quota (vs. the v0.9.12 archive bbox filter, which only drops
them *after* the fetch). Semantics flipped in v0.9.17: an empty/unset `states`
now means the Idaho-region default (ID + neighbors), NOT "every eligible feed".
To poll nationally, explicitly select every state in the GUI.
""" """
import asyncio import asyncio
@ -13,7 +20,7 @@ import sqlite3
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any, Literal
import aiohttp import aiohttp
from pydantic import BaseModel from pydantic import BaseModel
@ -43,6 +50,25 @@ _DEFAULT_SEVERITY = 1
_FEED_CONCURRENCY = 6 _FEED_CONCURRENCY = 6
_FEED_TIMEOUT_S = 60 _FEED_TIMEOUT_S = 60
# 30-day month, for the informational quota_estimate (matches tomtom_incidents).
_SECONDS_PER_MONTH = 30 * 24 * 3600
# Selectable 2-letter codes (mirrors inciweb.STATE_NAME_TO_CODE's value set, so any
# registry `state` that maps to a code is also pickable). list[StateCode] renders as
# the existing "checkboxes" multi-select widget (firms.py precedent) — no new widget.
StateCode = Literal[
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL",
"IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT",
"NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI",
"SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY", "DC", "PR",
"GU", "VI", "AS", "MP",
]
# Idaho-region default: ID + its 6 neighbors. WA/UT (and ID) have eligible feeds
# in the registry today; OR/NV/WY/MT are aspirational — listed so they are picked
# up automatically if/when those state DOTs publish a WZDx feed.
_DEFAULT_STATES: list[str] = ["ID", "WA", "OR", "NV", "UT", "WY", "MT"]
_DEDUP_DDL = ( _DEDUP_DDL = (
"CREATE TABLE IF NOT EXISTS published_ids (" "CREATE TABLE IF NOT EXISTS published_ids ("
"adapter TEXT NOT NULL, event_id TEXT NOT NULL, " "adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
@ -103,9 +129,11 @@ def _flatten_geometry(
class WZDxSettings(BaseModel): class WZDxSettings(BaseModel):
"""states: allowlist of 2-letter codes to poll; None = every eligible feed.""" """states: 2-letter codes whose registry feeds to poll. Empty/unset -> the
Idaho-region default (`_DEFAULT_STATES`), NOT every feed (v0.9.17 flip). Codes
are validated against `StateCode`; malformed codes are rejected at save time."""
states: list[str] | None = None states: list[StateCode] = list(_DEFAULT_STATES)
class WZDxAdapter(SourceAdapter): class WZDxAdapter(SourceAdapter):
@ -138,14 +166,17 @@ class WZDxAdapter(SourceAdapter):
self._cursor_db_path = cursor_db_path self._cursor_db_path = cursor_db_path
self._session: aiohttp.ClientSession | None = None self._session: aiohttp.ClientSession | None = None
self._db: sqlite3.Connection | None = None self._db: sqlite3.Connection | None = None
self._states: set[str] | None = self._read_states(config) self._states: set[str] = self._read_states(config)
@staticmethod @staticmethod
def _read_states(config: AdapterConfig) -> set[str] | None: def _read_states(config: AdapterConfig) -> set[str]:
"""Configured allowlist, or the Idaho-region default when empty/unset/null
(v0.9.17 flip empty no longer means "poll every eligible feed")."""
raw = config.settings.get("states") raw = config.settings.get("states")
if not raw: if not raw:
return None return set(_DEFAULT_STATES)
return {s.strip().upper() for s in raw if s and s.strip()} or None codes = {s.strip().upper() for s in raw if s and s.strip()}
return codes or set(_DEFAULT_STATES)
async def startup(self) -> None: async def startup(self) -> None:
self._session = aiohttp.ClientSession( self._session = aiohttp.ClientSession(
@ -156,7 +187,7 @@ class WZDxAdapter(SourceAdapter):
self._db.execute(_DEDUP_DDL) self._db.execute(_DEDUP_DDL)
self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)") self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)")
self._db.commit() self._db.commit()
logger.info("WZDx adapter started", extra={"states": sorted(self._states) if self._states else None}) logger.info("WZDx adapter started", extra={"states": sorted(self._states)})
async def shutdown(self) -> None: async def shutdown(self) -> None:
if self._session: if self._session:
@ -168,7 +199,7 @@ class WZDxAdapter(SourceAdapter):
async def apply_config(self, new_config: AdapterConfig) -> None: async def apply_config(self, new_config: AdapterConfig) -> None:
self._states = self._read_states(new_config) self._states = self._read_states(new_config)
logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None}) logger.info("WZDx config updated", extra={"states": sorted(self._states)})
@retry( @retry(
stop=stop_after_attempt(3), stop=stop_after_attempt(3),
@ -200,15 +231,15 @@ class WZDxAdapter(SourceAdapter):
return doc["features"] return doc["features"]
def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]: def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Eligible rows, optionally narrowed to the operator's state allowlist.""" """Eligible rows narrowed to the operator's state allowlist (always set;
defaults to the Idaho region see `_read_states`)."""
feeds: list[dict[str, Any]] = [] feeds: list[dict[str, Any]] = []
for row in registry_rows: for row in registry_rows:
if not _eligible(row): if not _eligible(row):
continue continue
if self._states is not None: code = _state_code(row.get("state"))
code = _state_code(row.get("state")) if code is None or code not in self._states:
if code is None or code not in self._states: continue
continue
feeds.append(row) feeds.append(row)
return feeds return feeds
@ -295,3 +326,29 @@ class WZDxAdapter(SourceAdapter):
enr = (event.data.get("_enriched") or {}).get("geocoder") or {} enr = (event.data.get("_enriched") or {}).get("geocoder") or {}
code = _state_code(enr.get("state")) code = _state_code(enr.get("state"))
return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}" return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}"
@classmethod
def quota_estimate(cls, settings: BaseModel, cadence_s: int) -> dict | None:
"""Informational only — WZDx is free/unauthenticated and has no upstream cap
(cap=None -> never warns, never blocks a save). Surfaces the per-month
upstream GET volume (1 registry fetch + up to one feed per allowlisted state)
so operators see how narrowing `states` cuts fetch volume."""
states = getattr(settings, "states", None) or _DEFAULT_STATES
n_states = len({s.upper() for s in states})
per_poll = 1 + n_states # 1 registry + at most one feed per in-scope state
cadence = max(cadence_s or cls.default_cadence_s, 1)
calls_per_month = round(per_poll * _SECONDS_PER_MONTH / cadence)
return {
"calls_per_month": calls_per_month,
"cap": None,
"seconds_per_month": _SECONDS_PER_MONTH,
"default_cadence_s": cls.default_cadence_s,
"percent": 0.0,
"warn": False,
"blocked": False,
"detail": (
f"~{calls_per_month:,} upstream GET(s)/month: 1 registry + up to "
f"{n_states} state feed(s) at {cadence}s cadence. WZDx is free/uncapped — "
f"fewer states means fewer fetches."
),
}

View file

@ -162,6 +162,23 @@
</fieldset> </fieldset>
{% endif %} {% endif %}
{# Standalone API-quota panel for flat-config adapters (e.g. WZDx). model_list
adapters render their own copy inside _partials/model_list.html, so skip it
here when one is present to avoid a double panel. #}
{% set has_model_list = namespace(value=false) %}
{% for field in fields %}
{% if field.widget == "model_list" %}{% set has_model_list.value = true %}{% endif %}
{% endfor %}
{% if quota and not has_model_list.value %}
<div class="quota-panel flash {% if quota.blocked %}flash-error{% elif quota.warn %}flash-warn{% endif %}"
data-quota-cap="{{ quota.cap }}"
data-quota-spm="{{ quota.seconds_per_month }}"
data-quota-default="{{ quota.default_cadence_s }}">
<strong>API quota:</strong> <span class="quota-detail">{{ quota.detail }}</span>
<span class="quota-msg">{% if quota.blocked %}<br>⛔ Over free-tier cap — reduce calls before saving.{% elif quota.warn %}<br>⚠️ Approaching free-tier cap.{% endif %}</span>
</div>
{% endif %}
{% set has_region = namespace(value=false) %} {% set has_region = namespace(value=false) %}
{% for field in fields %} {% for field in fields %}
{% if field.widget == "region" %} {% if field.widget == "region" %}

View file

@ -16,14 +16,19 @@ adapter-owned cache to redirect (unlike nwis's NWIS_CACHE_DB_PATH).
import json import json
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from pydantic import ValidationError
from central.adapters.wzdx import ( from central.adapters.wzdx import (
_DEFAULT_SEVERITY, _DEFAULT_SEVERITY,
_DEFAULT_STATES,
_VEHICLE_IMPACT_SEVERITY, _VEHICLE_IMPACT_SEVERITY,
WZDxAdapter, WZDxAdapter,
WZDxSettings,
_eligible, _eligible,
_flatten_geometry, _flatten_geometry,
) )
@ -124,7 +129,10 @@ def test_event_type_split(adapter):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_poll_yields_events(adapter): async def test_poll_yields_events(tmp_path):
# Explicit allowlist (UT+IA): Iowa is outside the v0.9.17 default set, so this
# test sets states directly to exercise multi-feed fan-out + the json-skip.
adapter = WZDxAdapter(_cfg({"states": ["UT", "IA"]}), MagicMock(), tmp_path / "cursors.db")
await adapter.startup() await adapter.startup()
registry = [ registry = [
{"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}}, {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}},
@ -158,3 +166,88 @@ def test_summary_omits_unknown_direction():
flat = {"road_names": ["I-80"], "direction": "unknown"} flat = {"road_names": ["I-80"], "direction": "unknown"}
row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} row = {"adapter": "wzdx", "data": {"data": {"data": flat}}}
assert _derive_subject(row) == "Work zone on I-80" assert _derive_subject(row) == "Work zone on I-80"
# --- v0.9.17: poll-time state allowlist ---------------------------------------
def _registry_row(state):
return {"format": "geojson", "active": True, "needapikey": False,
"version": "4", "feedname": state[:3], "state": state, "url": {"url": state}}
def test_default_states_when_unset():
# Settings default + adapter both fall back to the Idaho-region 7-set, NOT "all".
assert WZDxSettings().states == _DEFAULT_STATES
assert set(_DEFAULT_STATES) == {"ID", "WA", "OR", "NV", "UT", "WY", "MT"}
a = WZDxAdapter(_cfg({}), MagicMock(), Path("/tmp/unused.db"))
assert a._states == set(_DEFAULT_STATES)
def test_null_states_uses_default():
# The live DB row stores {"states": null} pre-deploy-DML; must not mean "all".
a = WZDxAdapter(_cfg({"states": None}), MagicMock(), Path("/tmp/unused.db"))
assert a._states == set(_DEFAULT_STATES)
def test_discover_filters_to_default_states(adapter):
# adapter fixture has empty settings -> default 7-set. ID/UT in; IA/OH out.
registry = [_registry_row(s) for s in ("idaho", "utah", "iowa", "ohio")]
kept = {r["state"] for r in adapter._discover(registry)}
assert kept == {"idaho", "utah"}
def test_discover_honours_explicit_allowlist(tmp_path):
a = WZDxAdapter(_cfg({"states": ["id"]}), MagicMock(), tmp_path / "c.db")
assert a._states == {"ID"} # lower-case input is normalized
registry = [_registry_row(s) for s in ("idaho", "utah", "iowa")]
kept = {r["state"] for r in a._discover(registry)}
assert kept == {"idaho"}
def test_malformed_state_code_rejected():
with pytest.raises(ValidationError):
WZDxSettings(states=["ID", "ZZ"])
def test_quota_estimate_informational():
q = WZDxAdapter.quota_estimate(WZDxSettings(), 600)
# 7 states -> 1 registry + 7 feeds = 8 GET/poll; 2_592_000/600 = 4320 polls.
assert q["calls_per_month"] == 8 * (2_592_000 // 600)
assert q["cap"] is None
assert q["warn"] is False and q["blocked"] is False
# Narrowing states lowers the estimate (the 82% delta the panel surfaces).
assert WZDxAdapter.quota_estimate(
WZDxSettings(states=["ID", "WA", "UT"]), 600
)["calls_per_month"] < q["calls_per_month"]
def test_edit_page_renders_standalone_quota_panel():
# v0.9.17: WZDx is a flat-config (checkboxes) adapter, so its quota panel
# renders from the standalone block in adapters_edit.html (not the model_list
# partial). Informational only -> "flash" with no warn/error class, cap=None.
from starlette.requests import Request
from central.gui import templates as gui_templates
from central.gui.form_descriptors import describe_fields
s = {"states": _DEFAULT_STATES}
fields = describe_fields(WZDxAdapter.settings_schema, s)
quota = WZDxAdapter.quota_estimate(WZDxSettings(**s), 600)
ctx = {
"operator": SimpleNamespace(username="admin"), "csrf_token": "x",
"adapter": {"name": "wzdx", "display_name": "WZDx", "description": "",
"enabled": True, "cadence_s": 600, "settings": s, "paused_at": None,
"updated_at": None, "last_error": None},
"fields": fields, "api_keys": [], "errors": None, "form_data": None,
"tile_url": "https://t/{z}/{x}/{y}.png", "tile_attribution": "OSM",
"api_key_missing": False, "requires_api_key_alias": None,
"preview_rows": None, "preview_error": None, "quota": quota,
}
req = Request({"type": "http", "method": "GET", "path": "/", "headers": [], "query_string": b""})
out = gui_templates.TemplateResponse(request=req, name="adapters_edit.html", context=ctx).body.decode()
assert 'type="checkbox" name="states" value="ID"' in out # checkboxes, default checked
assert "model-list" not in out
assert 'class="quota-panel flash "' in out # no flash-warn/flash-error
assert "upstream GET" in out and "free/uncapped" in out
assert 'data-quota-cap="None"' in out
assert "flash-warn" not in out and "flash-error" not in out