Merge pull request #60 from zvx-echo6/feat/wzdx-traffic-bootstrap

feat(wzdx): WZDx adapter + CENTRAL_TRAFFIC family bootstrap (v0.9.0)
This commit is contained in:
malice 2026-05-25 14:42:42 -06:00 committed by GitHub
commit c6c5367ccf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 571 additions and 1 deletions

View file

@ -131,6 +131,7 @@ Central's archive.
| `CENTRAL_SPACE` | `central.space.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_SPACE` | `central.space.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_DISASTER` | `central.disaster.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_DISASTER` | `central.disaster.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_HYDRO` | `central.hydro.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_HYDRO` | `central.hydro.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_TRAFFIC` | `central.traffic.>` | 7 | 1 GiB | ✓ | ✓ |
| `CENTRAL_META` | `central.meta.>` | 1 | 1 GiB | — | ✓ | | `CENTRAL_META` | `central.meta.>` | 1 | 1 GiB | — | ✓ |
Retention and storage caps are migration-seeded defaults visible in `config.streams`; Retention and storage caps are migration-seeded defaults visible in `config.streams`;
@ -1475,6 +1476,48 @@ already running can disable those overlap categories via `EONETSettings.category
} }
``` ```
### wzdx — FHWA Work Zone Data Exchange (state-DOT work zones)
Active road work zones discovered from the federal WZDx Feed Registry and each
eligible state-DOT GeoJSON feed. One event per WZDx RoadEventFeature.
- **Stream:** `CENTRAL_TRAFFIC`
- **Subject pattern:** `central.traffic.work_zone.<state>`
- `<state>` is the lowercased 2-letter code from the registry row (geocoder
state as fallback), else `unknown`
- **GUI event_type:** `work_zone` — from `category = "work_zone.wzdx"`; the GUI
derives event_type as the first dotted segment of the category
- **Cadence default:** 600s (10 min)
- **Feed filter:** registry rows with `format=geojson`, `active=true`,
`needapikey` not `true` (false or absent), `version` starting with `4` (~21 feeds at the time of writing)
- **Dedup key shape:** composite `<data_source_id>:<feature_id>`
(e.g. `UDOT-Construction:2365_eastbound`); reused as the inner `Event.id`
- **Event.data fields:**
| key | type | nullable | description |
|---|---|---|---|
| `road_names` | list[str] | no | Affected road name(s); may be empty |
| `direction` | str | yes | Travel direction of the work zone |
| `description` | str | yes | Operator-readable narrative |
| `vehicle_impact` | str | yes | `all-lanes-open` / `some-lanes-closed` / `all-lanes-closed` / `unknown`; drives severity |
| `event_status` | str | yes | e.g. `active` (Utah carries it; Iowa omits it) |
| `start_date` | str (ISO 8601) | yes | Work-zone start |
| `end_date` | str (ISO 8601) | yes | Work-zone end; also sets `Event.expires` |
| `data_source_id` | str | no | WZDx `core_details.data_source_id` |
| `feed_name` | str | yes | Registry feed identifier |
| `feed_state` | str | yes | Registry state name |
| `feed_state_code` | str | yes | 2-letter code used for the subject |
| `latitude` | float | yes | First geometry coordinate (enrichment input) |
| `longitude` | float | yes | First geometry coordinate (enrichment input) |
- **Severity:** derived from `vehicle_impact` (`all-lanes-closed`=3,
`some-lanes-closed`=2, `all-lanes-open`=1, `unknown`/missing=1) — WZDx has no
normalcy class.
- **Decipherable as-is:** mostly. Road + direction + impact + description are
user-ready; city/county/state come from geocoder enrichment.
- **Removal semantics:** none in v1. Work zones age out of upstream feeds; the
14-day dedup sweep expires stale ids. Watch `end_date` / `Event.expires`.
### nwis — USGS NWIS streamflow / gage height / water-temperature gauges ### nwis — USGS NWIS streamflow / gage height / water-temperature gauges
Real-time water-data observations via the USGS NWIS OGC API v0 `latest-continuous` Real-time water-data observations via the USGS NWIS OGC API v0 `latest-continuous`

View file

@ -349,7 +349,7 @@ central.<domain>.<subtype>[.<dimensions>...]
``` ```
- `<domain>` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`, - `<domain>` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`,
`meta` (the current set — see [§8](#8-the-streamentry-registry) for adding `traffic`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding
one). Operators MUST be able to subscribe to all of one domain with one). Operators MUST be able to subscribe to all of one domain with
`central.<domain>.>`. `central.<domain>.>`.
- `<subtype>` is adapter-driven and identifies the event category within the - `<subtype>` is adapter-driven and identifies the event category within the
@ -537,6 +537,7 @@ STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
] ]
``` ```

View file

@ -0,0 +1,23 @@
-- Migration: 025_add_wzdx_adapter_and_traffic_stream
--
-- Seeds two configuration rows that ship together, because the WZDx adapter
-- publishes onto central.traffic.> (same pairing approach as 023 nwis/hydro):
--   1. config.streams  -- the CENTRAL_TRAFFIC JetStream stream row
--   2. config.adapters -- the wzdx adapter row
--
-- Additive-only and idempotent: each INSERT is a no-op when a row with the
-- same name already exists (ON CONFLICT DO NOTHING).

-- Stream retention mirrors CENTRAL_DISASTER / CENTRAL_HYDRO.
INSERT INTO config.streams (
    name,
    max_age_s,
    max_bytes
)
VALUES (
    'CENTRAL_TRAFFIC',
    604800,      -- 7 days, in seconds
    1073741824   -- 1 GiB, in bytes
)
ON CONFLICT (name) DO NOTHING;

-- Adapter ships disabled; the operator enables it via the GUI.
-- settings {"states": null} = poll every eligible feed; an allowlist of
-- 2-letter state codes narrows it.
INSERT INTO config.adapters (
    name,
    enabled,
    cadence_s,
    settings
)
VALUES (
    'wzdx',
    FALSE,
    600,         -- poll cadence, in seconds (10 minutes)
    '{"states": null}'::jsonb
)
ON CONFLICT (name) DO NOTHING;

View file

@ -0,0 +1,326 @@
"""WZDx adapter — FHWA Work Zone Data Exchange registry → work_zone events.
First adapter to use the v0.9.0 category/subject split: category="work_zone.wzdx"
(so the GUI's split_part(category,'.',1) surfaces event_type "work_zone") while the
NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject
state comes from the registry row (reliable, pre-enrichment); the geocoder state
is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db.
"""
import asyncio
import logging
import sqlite3
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiohttp
from pydantic import BaseModel
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
)
from central.adapter import SourceAdapter
from central.adapters.inciweb import STATE_NAME_TO_CODE
from central.config_models import AdapterConfig
from central.config_store import ConfigStore
from central.models import Event, Geo
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# FHWA Work Zone Data Exchange Feed Registry (Socrata, public-unauth).
# The Socrata `$limit=200` query parameter caps how many registry rows come back.
WZDX_REGISTRY_URL = "https://datahub.transportation.gov/resource/69qe-yiui.json?$limit=200"
# vehicle_impact -> severity. Locked: unknown/missing = 1 (real active zones).
# Any vehicle_impact value absent from this map falls back to _DEFAULT_SEVERITY.
_VEHICLE_IMPACT_SEVERITY = {"all-lanes-closed": 3, "some-lanes-closed": 2, "all-lanes-open": 1}
_DEFAULT_SEVERITY = 1
# Bounded per-poll fan-out (~21 feeds pass the filter; Iowa alone is ~1.4 MB).
# _FEED_CONCURRENCY is the semaphore size: max feed fetches in flight at once.
_FEED_CONCURRENCY = 6
# Total timeout, in seconds, applied to every request made by the HTTP session.
_FEED_TIMEOUT_S = 60
# DDL for the dedup table: one row per (adapter, event_id) carrying first-seen
# and last-seen timestamps. Created on startup if it does not already exist.
_DEDUP_DDL = (
"CREATE TABLE IF NOT EXISTS published_ids ("
"adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
"first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
"last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
"PRIMARY KEY (adapter, event_id))"
)
def _eligible(row: dict[str, Any]) -> bool:
"""Registry-row filter (v0.9.0 locked): geojson + active + no api key + v4.x."""
return (
row.get("format") == "geojson"
and row.get("active") is True
and row.get("needapikey") is not True
and str(row.get("version") or "").startswith("4")
)
def _state_code(state_name: str | None) -> str | None:
"""Full state name (registry/geocoder) -> 2-letter UPPER code, or None."""
if not state_name:
return None
return STATE_NAME_TO_CODE.get(state_name.strip().lower())
def _parse_dt(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except (ValueError, TypeError):
return None
def _flatten_geometry(
geometry: dict[str, Any] | None,
) -> tuple[float | None, float | None]:
"""First (lat, lon) from a WZDx geometry (coords are [lon, lat]).
LineString/MultiPoint -> coordinates[0]; Point -> coordinates. Anything else
(Polygon, empty, missing) -> (None, None) so the event still publishes.
"""
if not geometry:
return (None, None)
coords = geometry.get("coordinates")
gtype = geometry.get("type")
try:
if gtype == "Point":
lon, lat = coords[0], coords[1]
elif gtype in ("LineString", "MultiPoint"):
lon, lat = coords[0][0], coords[0][1]
else:
return (None, None)
return (float(lat), float(lon))
except (TypeError, IndexError, ValueError):
return (None, None)
class WZDxSettings(BaseModel):
    """Operator-editable settings for the WZDx adapter.

    states: allowlist of 2-letter codes to poll; None = every eligible feed.
    """
    # Optional allowlist; defaults to None (no narrowing).
    states: list[str] | None = None
class WZDxAdapter(SourceAdapter):
    """FHWA Work Zone Data Exchange registry-driven adapter.

    Each poll fetches the WZDx Feed Registry, keeps the rows accepted by
    ``_eligible`` (optionally narrowed to the configured state allowlist),
    fetches those feeds with bounded concurrency and yields one ``Event`` per
    feature whose ``core_details.event_type`` is ``"work-zone"``.
    """

    name = "wzdx"
    display_name = "WZDx — Work Zone Data Exchange"
    description = (
        "Federal FHWA Work Zone Data Exchange. Discovers active state-DOT GeoJSON "
        "feeds from the WZDx Feed Registry and emits work_zone events."
    )
    settings_schema = WZDxSettings
    requires_api_key = None
    api_key_field = None
    wizard_order = None  # Ships disabled
    default_cadence_s = 600
    data_class = "event"
    # Canonical point-adapter paths (FIRMS/inciweb convention, enforced by
    # tests/test_enrichment_locations_coverage); the supervisor reverse-geocodes
    # them into data["_enriched"]["geocoder"] (city/county/state).
    enrichment_locations = [("latitude", "longitude")]

    def __init__(
        self,
        config: AdapterConfig,
        config_store: ConfigStore,
        cursor_db_path: Path,
    ) -> None:
        """Store collaborators; no I/O happens until ``startup``.

        Args:
            config: Adapter configuration; only ``settings["states"]`` is read.
            config_store: Kept for later use; not touched here.
            cursor_db_path: Path of the SQLite file holding the dedup table.
        """
        self._config_store = config_store
        self._cursor_db_path = cursor_db_path
        self._session: aiohttp.ClientSession | None = None
        self._db: sqlite3.Connection | None = None
        self._states: set[str] | None = self._read_states(config)

    @staticmethod
    def _read_states(config: AdapterConfig) -> set[str] | None:
        """Normalise ``settings["states"]`` into an upper-cased set, or ``None``.

        ``None`` means "no allowlist" and is returned for a missing/empty
        setting or when nothing usable remains after cleaning.
        """
        raw = config.settings.get("states")
        if not raw:
            return None
        # A bare string ("UT") would otherwise be iterated character by
        # character and become {"U", "T"}; treat it as a one-item allowlist.
        if isinstance(raw, str):
            raw = [raw]
        # Skip non-string entries rather than failing on .strip().
        return {s.strip().upper() for s in raw if isinstance(s, str) and s.strip()} or None

    async def startup(self) -> None:
        """Open the HTTP session and the dedup database (creating its schema).

        Raises:
            Exception: whatever the SQLite setup raised; the HTTP session is
                closed first so a failed startup does not leak it.
        """
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=_FEED_TIMEOUT_S),
            headers={"User-Agent": "Central/0.9 (+wzdx)"},
        )
        try:
            self._db = sqlite3.connect(self._cursor_db_path)
            self._db.execute(_DEDUP_DDL)
            self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)")
            self._db.commit()
        except Exception:
            # Release the session (and any half-open connection) before re-raising.
            await self.shutdown()
            raise
        logger.info("WZDx adapter started", extra={"states": sorted(self._states) if self._states else None})

    async def shutdown(self) -> None:
        """Close the HTTP session and the database; safe to call repeatedly."""
        if self._session:
            await self._session.close()
            self._session = None
        if self._db:
            self._db.close()
            self._db = None

    async def apply_config(self, new_config: AdapterConfig) -> None:
        """Re-read the state allowlist from a changed configuration."""
        self._states = self._read_states(new_config)
        logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None})

    def is_published(self, event_id: str) -> bool:
        """Return True if ``event_id`` is recorded for this adapter.

        Returns False when the database is not open.
        """
        if self._db is None:
            return False
        cur = self._db.execute(
            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
            (self.name, event_id),
        )
        return cur.fetchone() is not None

    def mark_published(self, event_id: str) -> None:
        """Record ``event_id`` as published, or refresh its ``last_seen``.

        Does nothing when the database is not open.
        """
        if self._db is None:
            return
        self._db.execute(
            "INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) "
            "ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP",
            (self.name, event_id),
        )
        self._db.commit()

    def sweep_old_ids(self) -> int:
        """Delete this adapter's dedup rows not seen for 14 days.

        Returns:
            Number of rows deleted (0 when the database is not open).
        """
        if self._db is None:
            return 0
        cur = self._db.execute(
            "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
            (self.name,),
        )
        self._db.commit()
        return cur.rowcount

    # reraise=True matters: without it tenacity raises RetryError once the
    # attempts are exhausted, which poll()'s except clause would not catch.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential_jitter(initial=1, max=30),
        retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError, asyncio.TimeoutError)),
        reraise=True,
    )
    async def _fetch_registry(self) -> list[dict[str, Any]]:
        """Fetch the registry rows, retrying transient HTTP/timeout failures.

        Returns:
            The decoded JSON list, or ``[]`` if the payload is not a list.

        Raises:
            aiohttp.ClientError, TimeoutError, asyncio.TimeoutError: the last
                failure after three attempts.
            ValueError: the body was not valid JSON (not retried).
        """
        assert self._session is not None
        async with self._session.get(WZDX_REGISTRY_URL) as resp:
            resp.raise_for_status()
            rows = await resp.json(content_type=None)
        return rows if isinstance(rows, list) else []

    async def _fetch_feed(self, row: dict[str, Any]) -> list[dict[str, Any]]:
        """Fetch + parse one publisher feed; [] on any failure (never raises)."""
        assert self._session is not None
        # The registry "url" column is normally an object like {"url": "..."};
        # tolerate a plain string and reject anything else instead of raising.
        url_field = row.get("url")
        url = url_field.get("url") if isinstance(url_field, dict) else url_field
        if not url or not isinstance(url, str):
            return []
        try:
            async with self._session.get(url) as resp:
                resp.raise_for_status()
                doc = await resp.json(content_type=None)
        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
            logger.warning("WZDx feed failed", extra={"feed": row.get("feedname"), "error": str(exc)})
            return []
        if not isinstance(doc, dict) or not isinstance(doc.get("features"), list):
            return []
        return doc["features"]

    def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Eligible rows, optionally narrowed to the operator's state allowlist."""
        feeds: list[dict[str, Any]] = []
        for row in registry_rows:
            if not _eligible(row):
                continue
            if self._states is not None:
                code = _state_code(row.get("state"))
                # With an allowlist active, rows whose state cannot be
                # resolved to a code are excluded as well.
                if code is None or code not in self._states:
                    continue
            feeds.append(row)
        return feeds

    def _build_event(self, feature: dict[str, Any], row: dict[str, Any]) -> Event | None:
        """Map one WZDx RoadEventFeature to a Central Event (None to skip).

        Args:
            feature: A GeoJSON feature from a publisher feed.
            row: The registry row that feed came from.

        Returns:
            The ``Event``, or ``None`` when the feature is not a work zone or
            has no ``id``.
        """
        props = feature.get("properties") or {}
        core = props.get("core_details") or {}
        if core.get("event_type") != "work-zone":
            return None  # only work-zone this PR; detour/restriction map later
        feature_id = feature.get("id")
        if feature_id is None:
            return None
        data_source_id = core.get("data_source_id") or row.get("feedname") or "wzdx"
        lat, lon = _flatten_geometry(feature.get("geometry"))
        code = _state_code(row.get("state"))
        return Event(
            # Composite id keeps feature ids from different publishers distinct.
            id=f"{data_source_id}:{feature_id}",
            adapter=self.name,
            category="work_zone.wzdx",
            # Prefer the publisher's update time, then the start date, then now.
            time=(_parse_dt(core.get("update_date")) or _parse_dt(props.get("start_date")) or datetime.now(timezone.utc)),
            expires=_parse_dt(props.get("end_date")),
            severity=_VEHICLE_IMPACT_SEVERITY.get(props.get("vehicle_impact"), _DEFAULT_SEVERITY),
            geo=Geo(
                centroid=(lon, lat) if lat is not None and lon is not None else None,
                regions=[f"US-{code}"] if code else [],
                primary_region=f"US-{code}" if code else None,
            ),
            data={
                "road_names": core.get("road_names") or [],
                "direction": core.get("direction"),
                "description": core.get("description"),
                "vehicle_impact": props.get("vehicle_impact"),
                "event_status": props.get("event_status"),
                "start_date": props.get("start_date"),
                "end_date": props.get("end_date"),
                "data_source_id": data_source_id,
                "feed_name": row.get("feedname"),
                "feed_state": row.get("state"),
                "feed_state_code": code,  # subject routing, fixed at poll time
                "latitude": lat,  # enrichment_locations pair (canonical paths)
                "longitude": lon,
            },
        )

    async def poll(self) -> AsyncIterator[Event]:
        """Discover eligible feeds, fetch concurrently, yield work_zone events.

        A registry failure skips the whole cycle (logged, nothing yielded); a
        single bad feed or feature is logged and skipped.

        Raises:
            RuntimeError: if ``startup`` has not opened the HTTP session.
        """
        if not self._session:
            raise RuntimeError("Session not initialized")
        try:
            registry_rows = await self._fetch_registry()
        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
            # Same failure set _fetch_feed tolerates; ValueError = malformed JSON.
            logger.warning("WZDx registry fetch failed; skipping cycle", extra={"error": str(exc)})
            return
        feeds = self._discover(registry_rows)
        logger.info("WZDx discovered feeds", extra={"eligible": len(feeds)})
        sem = asyncio.Semaphore(_FEED_CONCURRENCY)

        async def _guarded(row: dict[str, Any]) -> tuple[dict[str, Any], list[dict[str, Any]]]:
            # The semaphore caps how many feed downloads run at the same time.
            async with sem:
                return row, await self._fetch_feed(row)

        results = await asyncio.gather(*[_guarded(r) for r in feeds])
        yielded = 0
        for row, features in results:
            for feature in features:
                try:
                    event = self._build_event(feature, row)
                except Exception:  # one bad feature never sinks a poll
                    logger.exception("WZDx feature parse failed", extra={"feed": row.get("feedname")})
                    continue
                if event is None:
                    continue
                yield event
                yielded += 1
        # Reached only when the consumer drains the generator completely.
        self.sweep_old_ids()
        logger.info("WZDx poll completed", extra={"events_yielded": yielded})

    def subject_for(self, event: Event) -> str:
        """central.traffic.work_zone.{state}; registry code first, geocoder fallback."""
        code = event.data.get("feed_state_code")
        if not code:
            enr = (event.data.get("_enriched") or {}).get("geocoder") or {}
            code = _state_code(enr.get("state"))
        return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}"

View file

@ -2651,6 +2651,7 @@ ADAPTER_GROUPS = {
"Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"], "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"],
"Geophysical": ["usgs_quake", "nwis"], "Geophysical": ["usgs_quake", "nwis"],
"Earth Observation": ["eonet"], "Earth Observation": ["eonet"],
"Transportation": ["wzdx"],
} }
# Same palette the map legend uses, indexed by sorted-adapter position. # Same palette the map legend uses, indexed by sorted-adapter position.
EVENTS_PALETTE = [ EVENTS_PALETTE = [

View file

@ -0,0 +1,12 @@
{# WZDx work-zone detail rows, emitted as <dt>/<dd> pairs.
   Fields are read from event.data['data']['data']; a missing level falls back
   to an empty mapping. Every row is guarded so the Iowa shape
   (lanes/types_of_work, no event_status) and the Utah shape (event_status, no
   lanes) both render without error. The description is truncated to 200
   characters. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %}
{% set roads = d.get('road_names') or [] %}
{% if roads %}<dt>Road</dt><dd>{{ roads | join(', ') }}{% if d.get('direction') %} ({{ d.direction }}){% endif %}</dd>{% endif %}
{% if d.get('vehicle_impact') is not none %}<dt>Vehicle impact</dt><dd>{{ d.vehicle_impact }}</dd>{% endif %}
{% if d.get('event_status') is not none %}<dt>Status</dt><dd>{{ d.event_status }}</dd>{% endif %}
{% if d.get('start_date') is not none %}<dt>Starts</dt><dd>{{ d.start_date }}</dd>{% endif %}
{% if d.get('end_date') is not none %}<dt>Ends</dt><dd>{{ d.end_date }}</dd>{% endif %}
{% if d.get('description') is not none %}<dt>Description</dt><dd>{{ d.description | truncate(200) }}</dd>{% endif %}
{% if d.get('feed_name') is not none %}<dt>Source feed</dt><dd>{{ d.feed_name }}{% if d.get('feed_state') %} ({{ d.feed_state }}){% endif %}</dd>{% endif %}

View file

@ -0,0 +1,7 @@
{# WZDx work-zone one-line subject (v0.9.0): "Work zone on <road> <dir>".
   Only the first entry of road_names is shown. The Location column (geocoder
   city/county/state) renders separately. Falls back to a bare "Work zone" when
   no road is available, so we never regress to "—". Fields are read from
   event.data['data']['data']. #}
{% set d = (event.data.get('data') or {}).get('data') or {} %}
{%- set roads = d.get('road_names') or [] -%}
{%- set road = roads[0] if roads else None -%}
Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') %} {{ d.direction }}{% endif %}

View file

@ -29,5 +29,6 @@ STREAMS: list[StreamEntry] = [
StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_SPACE", "central.space.>"),
StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"),
StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"),
StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"),
StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False),
] ]

1
tests/fixtures/wzdx_iowa_sample.json vendored Normal file
View file

@ -0,0 +1 @@
{"road_event_feed_info":{"publisher":"Iowa DOT","version":"4.0","data_sources":[{"data_source_id":"IowaDOT-WZDx","organization_name":"Iowa DOT"}],"update_date":"2026-05-25T19:34:45Z"},"type":"FeatureCollection","features":[{"id":"OpenTMS-Event22920571864-1","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"IowaDOT-WZDx","road_names":["US-65"],"direction":"northbound","description":"Between IA 2 (6 miles south of the Humeston area) and County Road H50 (2 miles north of the Humeston area). Road construction. Intermittent lane closure. Pilot car in operation. Look out for flaggers. From 7:00AM CDT to 5:00PM CDT on weekdays. Starting June 1, 2026 at 7:00AM CDT until June 30, 2026 at about 5:00PM CDT. Comment: Chariton RCE (800-881-5778) - Wayne County","update_date":"2026-05-22T18:17:42Z"},"start_date":"2026-06-01T12:00:00Z","end_date":"2026-06-01T22:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","beginning_cross_street":"65N-13.1","ending_cross_street":"65N-22.4","beginning_milepost":13.1,"ending_milepost":22.4,"vehicle_impact":"all-lanes-open","types_of_work":[{"type_name":"surface-work"}],"lanes":[{"order":1,"type":"shoulder","status":"open"},{"order":2,"type":"general","status":"open"},{"order":3,"type":"shoulder","status":"open"}],"location_method":"unknown"},"geometry":{"type":"MultiPoint","coordinates":[[-93.499196,40.764081],[-93.493947,40.897518]]}},{"id":"OpenTMS-Event22735489722","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"IowaDOT-WZDx","road_names":["I-80"],"direction":"eastbound","description":"Between I-35 (Clive) and Exit 127: IA 141 (Urbandale). Road closed due to night time construction work. Detour in operation. Follow the Iowa DOT-recommended detour around the closure. See map for detour(s). Starting May 26, 2026 at 10:00PM CDT until May 27, 2026 at about 4:00AM CDT. 
Full schedule below: \u2022 May 26, 10:00PM - May 27, 4:00AM Comment: Grimes RCE (800-251-2707) - Polk County","update_date":"2026-05-07T16:52:11Z"},"start_date":"2026-05-27T03:00:00Z","end_date":"2026-05-27T09:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","beginning_cross_street":"80E-124.3","ending_cross_street":"80E-127","beginning_milepost":124.3,"ending_milepost":127.0,"vehicle_impact":"some-lanes-closed","types_of_work":[{"type_name":"surface-work"}],"lanes":[{"order":1,"type":"shoulder","status":"closed"},{"order":2,"type":"general","status":"closed"},{"order":3,"type":"general","status":"closed"},{"order":4,"type":"general","status":"closed"},{"order":5,"type":"general","status":"closed"},{"order":6,"type":"shoulder","status":"open"}],"location_method":"unknown"},"geometry":{"type":"MultiPoint","coordinates":[[-93.776735,41.603576],[-93.776897,41.642469]]}}]}

1
tests/fixtures/wzdx_utah_sample.json vendored Normal file
View file

@ -0,0 +1 @@
{"road_event_feed_info":{"publisher":"UDOT","version":"4.0","license":"https://creativecommons.org/publicdomain/zero/1.0/","data_sources":[{"data_source_id":"UDOT-construction","organization_name":"UDOT-TOC","update_date":"2023-03-19T07:03:52.1411634-06:00","update_frequency":900,"contact_name":"Chuck Felice","contact_email":"cfelice@utah.gov"}],"update_date":"2023-03-19T07:04:04.8614897-06:00","update_frequency":900,"contact_name":"Chuck Felice","contact_email":"cfelice@utah.gov"},"type":"FeatureCollection","features":[{"id":"2365_eastbound","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"UDOT-Construction","road_names":["I-80"],"direction":"eastbound","description":"The Utah Department of Transportation (UDOT) will improve I-80 between 1300 East and 2300 East. The pavement will be replaced with new concrete throughout, and a new lane will be added to eastbound I-80 between 1300 East and 2300 East. Expect lane shifts and lane closures between 1300 East and 2300 East and minor delays in the area during the project.","creation_date":"2022-01-10T18:53:49.643Z","update_date":"2022-01-10T18:53:49.643Z"},"start_date":"2021-12-01T07:00:00Z","end_date":"2022-12-31T07:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","location_method":"unknown","vehicle_impact":"unknown","beginning_cross_street":"700 E / Salt Lake City","ending_cross_street":"2300 E / Holladay","beginning_milepost":125,"ending_milepost":127,"event_status":"active"},"geometry":{"type":"LineString","coordinates":[[-111.855022,40.719556],[-111.836362,40.717367],[-111.834606,40.716772],[-111.833043,40.716002],[-111.831355,40.715327],[-111.829568,40.714835],[-111.827774,40.714344],[-111.825987,40.713849],[-111.824192,40.713376],[-111.822413,40.712896],[-111.820576,40.712489],[-111.818724,40.712591]]}}]}

View file

@ -1142,6 +1142,7 @@ _SAMPLE_INNER = {
"usgs_quake": {"magnitude": 1.009682538298, "place": "17 km W of Searles Valley, CA"}, "usgs_quake": {"magnitude": 1.009682538298, "place": "17 km W of Searles Valley, CA"},
"wfigs_incidents": {"county": "Montezuma", "state": "CO"}, "wfigs_incidents": {"county": "Montezuma", "state": "CO"},
"wfigs_perimeters": {"county": "Carbon", "state": "MT"}, "wfigs_perimeters": {"county": "Carbon", "state": "MT"},
"wzdx": {"road_names": ["I-80"], "direction": "eastbound"},
} }
# Exact expected subjects for the deterministic adapters. swpc_alerts is omitted # Exact expected subjects for the deterministic adapters. swpc_alerts is omitted
@ -1160,6 +1161,7 @@ _EXPECTED_SUBJECT = {
"usgs_quake": "Magnitude 1.0 — 17 km W of Searles Valley, CA", "usgs_quake": "Magnitude 1.0 — 17 km W of Searles Valley, CA",
"wfigs_incidents": "Wildfire incident — Montezuma, CO", "wfigs_incidents": "Wildfire incident — Montezuma, CO",
"wfigs_perimeters": "Wildfire perimeter — Carbon, MT", "wfigs_perimeters": "Wildfire perimeter — Carbon, MT",
"wzdx": "Work zone on I-80 eastbound",
} }

152
tests/test_wzdx.py Normal file
View file

@ -0,0 +1,152 @@
"""Tests for the WZDx adapter.
Fixtures are real captures trimmed to representative features:
wzdx_utah_sample.json -- curl https://udottraffic.utah.gov/wzdx/udot/v40/data
| jq '{road_event_feed_info, type, features: .features[0:1]}'
(LineString, vehicle_impact "unknown", has event_status, no lanes)
wzdx_iowa_sample.json -- curl https://iowa-atms.cloud-q-free.com/api/rest/dataprism/wzdx/wzdxfeed
| jq '{... , features: [<all-lanes-open feature>, <some-lanes-closed feature>]}'
(MultiPoint, lanes + types_of_work, no event_status)
No tests/conftest isolation entry is added: WZDx dedup uses the supervisor-
injected cursors.db and registry discovery is stateless, so there is no
adapter-owned cache to redirect (unlike nwis's NWIS_CACHE_DB_PATH).
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from central.adapters.wzdx import (
_DEFAULT_SEVERITY,
_VEHICLE_IMPACT_SEVERITY,
WZDxAdapter,
_eligible,
_flatten_geometry,
)
from central.config_models import AdapterConfig
# Directory holding the captured WZDx fixture documents (next to this file).
FIX = Path(__file__).parent / "fixtures"
# Parsed fixture feeds, loaded once at import time and shared by all tests.
UTAH = json.loads((FIX / "wzdx_utah_sample.json").read_text())
IOWA = json.loads((FIX / "wzdx_iowa_sample.json").read_text())
def _cfg(settings=None):
    """Build an enabled ``wzdx`` AdapterConfig with a 600 s cadence.

    ``settings`` defaults to an empty dict; ``updated_at`` is the current UTC time.
    """
    effective_settings = settings or {}
    stamp = datetime.now(timezone.utc)
    return AdapterConfig(
        name="wzdx",
        enabled=True,
        cadence_s=600,
        settings=effective_settings,
        updated_at=stamp,
    )
@pytest.fixture
def adapter(tmp_path):
    """A WZDxAdapter with default settings, a mock config store and a temp cursors.db path."""
    cursor_db = tmp_path / "cursors.db"
    store = MagicMock()
    return WZDxAdapter(_cfg(), store, cursor_db)
@pytest.mark.parametrize(
    "row,keep",
    [
        ({"format": "geojson", "active": True, "needapikey": False, "version": "4.1"}, True),
        ({"format": "geojson", "active": True, "needapikey": False, "version": "4"}, True),
        ({"format": "json", "active": True, "needapikey": False, "version": "4.1"}, False),
        ({"format": "geojson", "active": False, "needapikey": False, "version": "4.1"}, False),
        ({"format": "geojson", "active": True, "needapikey": True, "version": "4.1"}, False),
        ({"format": "geojson", "active": True, "needapikey": False, "version": "3.1"}, False),
        ({"format": "geojson", "active": True, "needapikey": False, "version": "CWZ 1.0"}, False),
    ],
)
def test_eligible_filter(row, keep):
    """Registry rows pass only when geojson + active + no API key + version 4.x."""
    verdict = _eligible(row)
    assert verdict is keep
def test_dedup_key(adapter):
    """Event ids are the composite ``<data_source_id>:<feature_id>``."""
    cases = [
        (UTAH, {"feedname": "udot", "state": "utah"}, "UDOT-Construction:2365_eastbound"),
        (IOWA, {"feedname": "idot", "state": "iowa"}, "IowaDOT-WZDx:OpenTMS-Event22920571864-1"),
    ]
    # Build every event first, then check the ids, as the two-step original did.
    built = [(adapter._build_event(doc["features"][0], row), want) for doc, row, want in cases]
    for event, want in built:
        assert event.id == want
@pytest.mark.parametrize(
    "vi,sev",
    [
        ("all-lanes-closed", 3),
        ("some-lanes-closed", 2),
        ("all-lanes-open", 1),
        ("unknown", 1),
        (None, 1),
    ],
)
def test_severity(vi, sev):
    """vehicle_impact maps to severity; unknown/missing falls back to the default."""
    resolved = _VEHICLE_IMPACT_SEVERITY.get(vi, _DEFAULT_SEVERITY)
    assert resolved == sev
@pytest.mark.parametrize(
    "geom,expect",
    [
        ({"type": "LineString", "coordinates": [[-111.8, 40.7], [-111.6, 40.6]]}, (40.7, -111.8)),
        ({"type": "MultiPoint", "coordinates": [[-93.5, 40.7]]}, (40.7, -93.5)),
        ({"type": "Point", "coordinates": [-93.5, 40.7]}, (40.7, -93.5)),
        (None, (None, None)),
        ({"type": "Polygon", "coordinates": []}, (None, None)),
    ],
)
def test_flatten_geometry(geom, expect):
    """The first coordinate comes back as (lat, lon); unsupported shapes give (None, None)."""
    flattened = _flatten_geometry(geom)
    assert flattened == expect
def test_build_utah_shape(adapter):
    """Utah feature: LineString geometry, vehicle_impact "unknown", has event_status."""
    registry_row = {"feedname": "udot", "state": "utah"}
    event = adapter._build_event(UTAH["features"][0], registry_row)
    assert event.category == "work_zone.wzdx"
    assert event.severity == 1  # vehicle_impact "unknown"
    assert event.data["latitude"] is not None
    assert event.data["event_status"] == "active"  # Utah carries it
def test_build_iowa_shape(adapter):
    """Iowa features: severity follows vehicle_impact; absent event_status is None."""
    features = IOWA["features"]
    open_lanes, closed_lanes = [
        adapter._build_event(features[idx], {"feedname": "idot", "state": "iowa"})
        for idx in (0, 1)
    ]
    assert open_lanes.severity == 1  # all-lanes-open
    assert closed_lanes.severity == 2  # some-lanes-closed
    assert open_lanes.data["event_status"] is None  # Iowa lacks it -> no raise
def test_subject_from_registry(adapter):
    """The registry state drives the final subject token."""
    registry_row = {"feedname": "udot", "state": "utah"}
    event = adapter._build_event(UTAH["features"][0], registry_row)
    subject = adapter.subject_for(event)
    assert subject == "central.traffic.work_zone.ut"
def test_subject_unknown(adapter):
    """An unresolvable registry state and no enrichment yields the `unknown` token."""
    registry_row = {"feedname": "x", "state": "n/a"}
    event = adapter._build_event(UTAH["features"][0], registry_row)
    subject = adapter.subject_for(event)
    assert subject == "central.traffic.work_zone.unknown"
def test_subject_geocoder_fallback(adapter):
    """When the registry state is unresolvable, the geocoder state is used."""
    registry_row = {"feedname": "x", "state": "n/a"}
    event = adapter._build_event(UTAH["features"][0], registry_row)
    # Simulate the enrichment payload attached after the event is built.
    event.data["_enriched"] = {"geocoder": {"state": "Idaho"}}
    subject = adapter.subject_for(event)
    assert subject == "central.traffic.work_zone.id"
def test_event_type_split(adapter):
    """The first dotted segment of the category is the GUI event_type."""
    # Mirrors routes.py split_part(category, '.', 1) -> GUI event_type.
    registry_row = {"feedname": "udot", "state": "utah"}
    event = adapter._build_event(UTAH["features"][0], registry_row)
    first_segment, _, _ = event.category.partition(".")
    assert first_segment == "work_zone"
@pytest.mark.asyncio
async def test_poll_yields_events(adapter):
    """poll() yields one event per work-zone feature of every eligible feed.

    Network calls are stubbed: the registry fetch returns three rows (two
    eligible, one dropped for ``format != geojson``) and the feed fetch serves
    the Utah / Iowa fixtures.
    """
    await adapter.startup()
    # try/finally so the aiohttp session and sqlite connection opened by
    # startup() are released even when poll() raises.
    try:
        registry = [
            {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}},
            {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "idot", "state": "iowa", "url": {"url": "i"}},
            {"format": "json", "active": True, "needapikey": False, "version": "4", "feedname": "skip", "state": "ohio", "url": {"url": "s"}},
        ]
        adapter._fetch_registry = AsyncMock(return_value=registry)

        async def fake_feed(row):
            return {"udot": UTAH, "idot": IOWA}.get(row["feedname"], {"features": []})["features"]

        adapter._fetch_feed = fake_feed
        events = [e async for e in adapter.poll()]
    finally:
        await adapter.shutdown()
    # Utah 1 + Iowa 2 = 3; the json feed is dropped by _discover.
    assert len(events) == 3
    assert {e.adapter for e in events} == {"wzdx"}
def test_summary_partial_renders_subject():
    """The wzdx summary partial renders "Work zone on <road> <direction>"."""
    # End-to-end through the real _event_summaries/wzdx.html partial selection.
    from central.gui.routes import _derive_subject

    inner = {"road_names": ["I-80"], "direction": "eastbound"}
    stored_row = {"adapter": "wzdx", "data": {"data": {"data": inner}}}
    rendered = _derive_subject(stored_row)
    assert rendered == "Work zone on I-80 eastbound"