From 7eab5fc1b12ad6f9c66a4ceab4eac919e4aa0d27 Mon Sep 17 00:00:00 2001 From: Matt Johnson Date: Mon, 25 May 2026 20:35:08 +0000 Subject: [PATCH] feat(wzdx): WZDx adapter + CENTRAL_TRAFFIC family bootstrap (v0.9.0) Opens Phase 4 transportation aggregation (Design B, Central-direct). New registry-driven wzdx adapter polls the FHWA WZDx Feed Registry, fetches each eligible v4.x GeoJSON feed concurrently, and emits work_zone events into the new CENTRAL_TRAFFIC stream. Production code; central-supervisor AND central-gui restart (new adapter class + stream + ADAPTER_GROUPS). Ships disabled. First adapter to use the category/subject split: category="work_zone.wzdx" (GUI event_type "work_zone" via split_part) while the NATS subject is central.traffic.work_zone.{state}. Subject state from the registry row, geocoder state as fallback. Severity from vehicle_impact (all-lanes-closed=3, some-lanes-closed=2, all-lanes-open=1, unknown/missing=1). Feed filter geojson + active + needapikey=false + version 4.x (21 of 39 feeds). 600s cadence. Dedup composite {data_source_id}:{feature_id} in the shared cursors.db; stateless discovery (no conftest isolation entry). enrichment_locations uses the canonical ("latitude","longitude") paths. Full suite: 739 passed, 1 skipped (central and unprivileged zvx, 3x each). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/CONSUMER-INTEGRATION.md | 43 +++ docs/PRODUCER-INTEGRATION.md | 3 +- ...25_add_wzdx_adapter_and_traffic_stream.sql | 23 ++ src/central/adapters/wzdx.py | 326 ++++++++++++++++++ src/central/gui/routes.py | 1 + .../gui/templates/_event_rows/wzdx.html | 12 + .../gui/templates/_event_summaries/wzdx.html | 7 + src/central/streams.py | 1 + tests/fixtures/wzdx_iowa_sample.json | 1 + tests/fixtures/wzdx_utah_sample.json | 1 + tests/test_events_feed_frontend.py | 2 + tests/test_wzdx.py | 152 ++++++++ 12 files changed, 571 insertions(+), 1 deletion(-) create mode 100644 sql/migrations/025_add_wzdx_adapter_and_traffic_stream.sql create mode 100644 src/central/adapters/wzdx.py create mode 100644 src/central/gui/templates/_event_rows/wzdx.html create mode 100644 src/central/gui/templates/_event_summaries/wzdx.html create mode 100644 tests/fixtures/wzdx_iowa_sample.json create mode 100644 tests/fixtures/wzdx_utah_sample.json create mode 100644 tests/test_wzdx.py diff --git a/docs/CONSUMER-INTEGRATION.md b/docs/CONSUMER-INTEGRATION.md index 009741a..bea5c75 100644 --- a/docs/CONSUMER-INTEGRATION.md +++ b/docs/CONSUMER-INTEGRATION.md @@ -131,6 +131,7 @@ Central's archive. | `CENTRAL_SPACE` | `central.space.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_DISASTER` | `central.disaster.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_HYDRO` | `central.hydro.>` | 7 | 1 GiB | ✓ | ✓ | +| `CENTRAL_TRAFFIC` | `central.traffic.>` | 7 | 1 GiB | ✓ | ✓ | | `CENTRAL_META` | `central.meta.>` | 1 | 1 GiB | — | ✓ | Retention and storage caps are migration-seeded defaults visible in `config.streams`; @@ -1475,6 +1476,48 @@ already running can disable those overlap categories via `EONETSettings.category } ``` +### wzdx — FHWA Work Zone Data Exchange (state-DOT work zones) + +Active road work zones discovered from the federal WZDx Feed Registry and each +eligible state-DOT GeoJSON feed. One event per WZDx RoadEventFeature. 
+ +- **Stream:** `CENTRAL_TRAFFIC` +- **Subject pattern:** `central.traffic.work_zone.{state}` + - `{state}` is the lowercased 2-letter code from the registry row (geocoder + state as fallback), else `unknown` +- **GUI event_type:** `work_zone` — from `category = "work_zone.wzdx"`; the GUI + derives event_type as the first dotted segment of the category +- **Cadence default:** 600s (10 min) +- **Feed filter:** registry rows with `format=geojson`, `active=true`, + `needapikey=false`, `version` 4.x (~21 feeds at author time) +- **Dedup key shape:** composite `{data_source_id}:{feature_id}` + (e.g. `UDOT-Construction:2365_eastbound`); reused as the inner `Event.id` +- **Event.data fields:** + + | key | type | nullable | description | + |---|---|---|---| + | `road_names` | list[str] | no | Affected road name(s); may be empty | + | `direction` | str | yes | Travel direction of the work zone | + | `description` | str | yes | Operator-readable narrative | + | `vehicle_impact` | str | yes | `all-lanes-open` / `some-lanes-closed` / `all-lanes-closed` / `unknown`; drives severity | + | `event_status` | str | yes | e.g. `active` (Utah carries it; Iowa omits it) | + | `start_date` | str (ISO 8601) | yes | Work-zone start | + | `end_date` | str (ISO 8601) | yes | Work-zone end; also sets `Event.expires` | + | `data_source_id` | str | no | WZDx `core_details.data_source_id` | + | `feed_name` | str | yes | Registry feed identifier | + | `feed_state` | str | yes | Registry state name | + | `feed_state_code` | str | yes | 2-letter code used for the subject | + | `latitude` | float | yes | First geometry coordinate (enrichment input) | + | `longitude` | float | yes | First geometry coordinate (enrichment input) | + +- **Severity:** derived from `vehicle_impact` (`all-lanes-closed`=3, + `some-lanes-closed`=2, `all-lanes-open`=1, `unknown`/missing=1) — WZDx has no + normalcy class. +- **Decipherable as-is:** mostly. Road + direction + impact + description are + user-ready; city/county/state come from geocoder enrichment. 
+- **Removal semantics:** none in v1. Work zones age out of upstream feeds; the + 14-day dedup sweep expires stale ids. Watch `end_date` / `Event.expires`. + ### nwis — USGS NWIS streamflow / gage height / water-temperature gauges Real-time water-data observations via the USGS NWIS OGC API v0 `latest-continuous` diff --git a/docs/PRODUCER-INTEGRATION.md b/docs/PRODUCER-INTEGRATION.md index 4cd1768..71c8dfe 100644 --- a/docs/PRODUCER-INTEGRATION.md +++ b/docs/PRODUCER-INTEGRATION.md @@ -349,7 +349,7 @@ central..[....] ``` - `` is one of `wx`, `fire`, `quake`, `space`, `disaster`, `hydro`, - `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding + `traffic`, `meta` (the current set — see [§8](#8-the-streamentry-registry) for adding one). Operators MUST be able to subscribe to all of one domain with `central..>`. - `` is adapter-driven and identifies the event category within the @@ -537,6 +537,7 @@ STREAMS: list[StreamEntry] = [ StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), + StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), ] ``` diff --git a/sql/migrations/025_add_wzdx_adapter_and_traffic_stream.sql b/sql/migrations/025_add_wzdx_adapter_and_traffic_stream.sql new file mode 100644 index 0000000..6f3389d --- /dev/null +++ b/sql/migrations/025_add_wzdx_adapter_and_traffic_stream.sql @@ -0,0 +1,23 @@ +-- Migration: 025_add_wzdx_adapter_and_traffic_stream +-- Adds the CENTRAL_TRAFFIC JetStream stream row AND the WZDx adapter row. +-- Folded into one migration because the adapter publishes onto +-- central.traffic.> -- both rows ship together (mirrors 023 nwis/hydro). +-- +-- Stream retention mirrors CENTRAL_DISASTER / CENTRAL_HYDRO (7 days, 1 GiB). +-- Adapter ships disabled; operator enables via GUI. 
settings {"states": null}
+-- = poll every eligible feed; an allowlist of 2-letter codes narrows it.
+--
+-- Additive-only: both inserts are idempotent via ON CONFLICT DO NOTHING.
+
+INSERT INTO config.streams (name, max_age_s, max_bytes)
+VALUES ('CENTRAL_TRAFFIC', 604800, 1073741824)
+ON CONFLICT (name) DO NOTHING;
+
+INSERT INTO config.adapters (name, enabled, cadence_s, settings)
+VALUES (
+    'wzdx',
+    false,
+    600,
+    '{"states": null}'::jsonb
+)
+ON CONFLICT (name) DO NOTHING;
diff --git a/src/central/adapters/wzdx.py b/src/central/adapters/wzdx.py
new file mode 100644
index 0000000..260875f
--- /dev/null
+++ b/src/central/adapters/wzdx.py
@@ -0,0 +1,339 @@
+"""WZDx adapter — FHWA Work Zone Data Exchange registry → work_zone events.
+
+First adapter to use the v0.9.0 category/subject split: category="work_zone.wzdx"
+(so the GUI's split_part(category,'.',1) surfaces event_type "work_zone") while the
+NATS subject is "central.traffic.work_zone.{state}" on CENTRAL_TRAFFIC. Subject
+state comes from the registry row (reliable, pre-enrichment); the geocoder state
+is a fallback. Discovery is stateless per poll; dedup uses the shared cursors.db.
+"""
+
+import asyncio
+import logging
+import sqlite3
+from collections.abc import AsyncIterator
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import aiohttp
+from pydantic import BaseModel
+from tenacity import (
+    retry,
+    retry_if_exception_type,
+    stop_after_attempt,
+    wait_exponential_jitter,
+)
+
+from central.adapter import SourceAdapter
+from central.adapters.inciweb import STATE_NAME_TO_CODE
+from central.config_models import AdapterConfig
+from central.config_store import ConfigStore
+from central.models import Event, Geo
+
+logger = logging.getLogger(__name__)
+
+# FHWA Work Zone Data Exchange Feed Registry (Socrata, public-unauth).
+WZDX_REGISTRY_URL = "https://datahub.transportation.gov/resource/69qe-yiui.json?$limit=200"
+
+# vehicle_impact -> severity. Locked: unknown/missing = 1 (real active zones).
+_VEHICLE_IMPACT_SEVERITY = {"all-lanes-closed": 3, "some-lanes-closed": 2, "all-lanes-open": 1}
+_DEFAULT_SEVERITY = 1
+
+# Bounded per-poll fan-out (~21 feeds pass the filter; Iowa alone is ~1.4 MB).
+_FEED_CONCURRENCY = 6
+_FEED_TIMEOUT_S = 60
+
+_DEDUP_DDL = (
+    "CREATE TABLE IF NOT EXISTS published_ids ("
+    "adapter TEXT NOT NULL, event_id TEXT NOT NULL, "
+    "first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+    "last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+    "PRIMARY KEY (adapter, event_id))"
+)
+
+
+def _eligible(row: dict[str, Any]) -> bool:
+    """Registry-row filter (v0.9.0 locked): geojson + active + no api key + v4.x."""
+    return (
+        row.get("format") == "geojson"
+        and row.get("active") is True
+        and row.get("needapikey") is not True
+        and str(row.get("version") or "").startswith("4")
+    )
+
+
+def _state_code(state_name: str | None) -> str | None:
+    """Full state name (registry/geocoder) -> 2-letter UPPER code, or None."""
+    if not state_name:
+        return None
+    return STATE_NAME_TO_CODE.get(state_name.strip().lower())
+
+
+def _parse_dt(value: str | None) -> datetime | None:
+    """ISO 8601 text (trailing "Z" accepted) -> datetime; None if missing, non-string, or invalid."""
+    if not value or not isinstance(value, str):
+        return None
+    try:
+        return datetime.fromisoformat(value.replace("Z", "+00:00"))
+    except (ValueError, TypeError):
+        return None
+
+
+def _flatten_geometry(
+    geometry: dict[str, Any] | None,
+) -> tuple[float | None, float | None]:
+    """First (lat, lon) from a WZDx geometry (coords are [lon, lat]).
+
+    LineString/MultiPoint -> coordinates[0]; Point -> coordinates. Anything else
+    (Polygon, empty, missing) -> (None, None) so the event still publishes.
+    """
+    if not geometry:
+        return (None, None)
+    coords = geometry.get("coordinates")
+    gtype = geometry.get("type")
+    try:
+        if gtype == "Point":
+            lon, lat = coords[0], coords[1]
+        elif gtype in ("LineString", "MultiPoint"):
+            lon, lat = coords[0][0], coords[0][1]
+        else:
+            return (None, None)
+        return (float(lat), float(lon))
+    except (TypeError, IndexError, ValueError):
+        return (None, None)
+
+
+class WZDxSettings(BaseModel):
+    """states: allowlist of 2-letter codes to poll; None = every eligible feed."""
+
+    states: list[str] | None = None
+
+
+class WZDxAdapter(SourceAdapter):
+    """FHWA Work Zone Data Exchange registry-driven adapter."""
+
+    name = "wzdx"
+    display_name = "WZDx — Work Zone Data Exchange"
+    description = (
+        "Federal FHWA Work Zone Data Exchange. Discovers active state-DOT GeoJSON "
+        "feeds from the WZDx Feed Registry and emits work_zone events."
+    )
+    settings_schema = WZDxSettings
+    requires_api_key = None
+    api_key_field = None
+    wizard_order = None  # Ships disabled
+    default_cadence_s = 600
+    data_class = "event"
+    # Canonical point-adapter paths (FIRMS/inciweb convention, enforced by
+    # tests/test_enrichment_locations_coverage); the supervisor reverse-geocodes
+    # them into data["_enriched"]["geocoder"] (city/county/state).
+    enrichment_locations = [("latitude", "longitude")]
+
+    def __init__(
+        self,
+        config: AdapterConfig,
+        config_store: ConfigStore,
+        cursor_db_path: Path,
+    ) -> None:
+        """Store the config store and cursor DB path; session and DB are opened in startup()."""
+        self._config_store = config_store
+        self._cursor_db_path = cursor_db_path
+        self._session: aiohttp.ClientSession | None = None
+        self._db: sqlite3.Connection | None = None
+        self._states: set[str] | None = self._read_states(config)
+
+    @staticmethod
+    def _read_states(config: AdapterConfig) -> set[str] | None:
+        """Normalize settings["states"] to a set of upper-cased codes; None when empty or unset."""
+        raw = config.settings.get("states")
+        if not raw:
+            return None
+        return {s.strip().upper() for s in raw if s and s.strip()} or None
+
+    async def startup(self) -> None:
+        """Open the HTTP session and ensure the dedup table/index exist in the cursor DB."""
+        self._session = aiohttp.ClientSession(
+            timeout=aiohttp.ClientTimeout(total=_FEED_TIMEOUT_S),
+            headers={"User-Agent": "Central/0.9 (+wzdx)"},
+        )
+        self._db = sqlite3.connect(self._cursor_db_path)
+        self._db.execute(_DEDUP_DDL)
+        self._db.execute("CREATE INDEX IF NOT EXISTS published_ids_last_seen ON published_ids (last_seen)")
+        self._db.commit()
+        logger.info("WZDx adapter started", extra={"states": sorted(self._states) if self._states else None})
+
+    async def shutdown(self) -> None:
+        """Close the HTTP session and the dedup database connection, if open."""
+        if self._session:
+            await self._session.close()
+            self._session = None
+        if self._db:
+            self._db.close()
+            self._db = None
+
+    async def apply_config(self, new_config: AdapterConfig) -> None:
+        """Re-read the state allowlist from the new config."""
+        self._states = self._read_states(new_config)
+        logger.info("WZDx config updated", extra={"states": sorted(self._states) if self._states else None})
+
+    def is_published(self, event_id: str) -> bool:
+        """Return True if event_id is already recorded for this adapter."""
+        if not self._db:
+            return False
+        cur = self._db.execute(
+            "SELECT 1 FROM published_ids WHERE adapter = ? AND event_id = ?",
+            (self.name, event_id),
+        )
+        return cur.fetchone() is not None
+
+    def mark_published(self, event_id: str) -> None:
+        """Record event_id for this adapter, refreshing last_seen if it already exists."""
+        if not self._db:
+            return
+        self._db.execute(
+            "INSERT INTO published_ids (adapter, event_id) VALUES (?, ?) "
+            "ON CONFLICT (adapter, event_id) DO UPDATE SET last_seen = CURRENT_TIMESTAMP",
+            (self.name, event_id),
+        )
+        self._db.commit()
+
+    def sweep_old_ids(self) -> int:
+        """Delete this adapter's ids whose last_seen is older than 14 days; return the row count."""
+        if not self._db:
+            return 0
+        cur = self._db.execute(
+            "DELETE FROM published_ids WHERE adapter = ? AND last_seen < datetime('now', '-14 days')",
+            (self.name,),
+        )
+        self._db.commit()
+        return cur.rowcount
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential_jitter(initial=1, max=30),
+        retry=retry_if_exception_type((aiohttp.ClientError, TimeoutError, asyncio.TimeoutError)),
+        # Re-raise the last underlying error once attempts are exhausted; without
+        # this tenacity raises RetryError, which poll()'s except clause would miss.
+        reraise=True,
+    )
+    async def _fetch_registry(self) -> list[dict[str, Any]]:
+        """Fetch the registry rows (retried on HTTP/timeout errors); a non-list payload yields []."""
+        assert self._session is not None
+        async with self._session.get(WZDX_REGISTRY_URL) as resp:
+            resp.raise_for_status()
+            rows = await resp.json(content_type=None)
+        return rows if isinstance(rows, list) else []
+
+    async def _fetch_feed(self, row: dict[str, Any]) -> list[dict[str, Any]]:
+        """Fetch + parse one publisher feed; [] on any failure (never raises)."""
+        assert self._session is not None
+        url = (row.get("url") or {}).get("url")
+        if not url:
+            return []
+        try:
+            async with self._session.get(url) as resp:
+                resp.raise_for_status()
+                doc = await resp.json(content_type=None)
+        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
+            logger.warning("WZDx feed failed", extra={"feed": row.get("feedname"), "error": str(exc)})
+            return []
+        if not isinstance(doc, dict) or not isinstance(doc.get("features"), list):
+            return []
+        return doc["features"]
+
+    def _discover(self, registry_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
+        """Eligible rows, optionally narrowed to the operator's state allowlist."""
+        feeds: list[dict[str, Any]] = []
+        for row in registry_rows:
+            if not _eligible(row):
+                continue
+            if self._states is not None:
+                code = _state_code(row.get("state"))
+                if code is None or code not in self._states:
+                    continue
+            feeds.append(row)
+        return feeds
+
+    def _build_event(self, feature: dict[str, Any], row: dict[str, Any]) -> Event | None:
+        """Map one WZDx RoadEventFeature to a Central Event (None to skip)."""
+        props = feature.get("properties") or {}
+        core = props.get("core_details") or {}
+        if core.get("event_type") != "work-zone":
+            return None  # only work-zone this PR; detour/restriction map later
+        feature_id = feature.get("id")
+        if feature_id is None:
+            return None
+        data_source_id = core.get("data_source_id") or row.get("feedname") or "wzdx"
+        lat, lon = _flatten_geometry(feature.get("geometry"))
+        code = _state_code(row.get("state"))
+        return Event(
+            id=f"{data_source_id}:{feature_id}",
+            adapter=self.name,
+            category="work_zone.wzdx",
+            time=(_parse_dt(core.get("update_date")) or _parse_dt(props.get("start_date")) or datetime.now(timezone.utc)),
+            expires=_parse_dt(props.get("end_date")),
+            severity=_VEHICLE_IMPACT_SEVERITY.get(props.get("vehicle_impact"), _DEFAULT_SEVERITY),
+            geo=Geo(
+                centroid=(lon, lat) if lat is not None and lon is not None else None,
+                regions=[f"US-{code}"] if code else [],
+                primary_region=f"US-{code}" if code else None,
+            ),
+            data={
+                "road_names": core.get("road_names") or [],
+                "direction": core.get("direction"),
+                "description": core.get("description"),
+                "vehicle_impact": props.get("vehicle_impact"),
+                "event_status": props.get("event_status"),
+                "start_date": props.get("start_date"),
+                "end_date": props.get("end_date"),
+                "data_source_id": data_source_id,
+                "feed_name": row.get("feedname"),
+                "feed_state": row.get("state"),
+                "feed_state_code": code,  # subject routing, fixed at poll time
+                "latitude": lat,  # enrichment_locations pair (canonical paths)
+                "longitude": lon,
+            },
+        )
+
+    async def poll(self) -> AsyncIterator[Event]:
+        """Discover eligible feeds, fetch concurrently, yield work_zone events."""
+        if not self._session:
+            raise RuntimeError("Session not initialized")
+        try:
+            registry_rows = await self._fetch_registry()
+        except (aiohttp.ClientError, TimeoutError, asyncio.TimeoutError, ValueError) as exc:
+            logger.warning("WZDx registry fetch failed; skipping cycle", extra={"error": str(exc)})
+            return
+
+        feeds = self._discover(registry_rows)
+        logger.info("WZDx discovered feeds", extra={"eligible": len(feeds)})
+        sem = asyncio.Semaphore(_FEED_CONCURRENCY)
+
+        async def _guarded(row: dict[str, Any]) -> tuple[dict[str, Any], list[dict[str, Any]]]:
+            async with sem:
+                return row, await self._fetch_feed(row)
+
+        results = await asyncio.gather(*[_guarded(r) for r in feeds])
+        yielded = 0
+        for row, features in results:
+            for feature in features:
+                try:
+                    event = self._build_event(feature, row)
+                except Exception:  # one bad feature never sinks a poll
+                    logger.exception("WZDx feature parse failed", extra={"feed": row.get("feedname")})
+                    continue
+                if event is None:
+                    continue
+                yield event
+                yielded += 1
+
+        self.sweep_old_ids()
+        logger.info("WZDx poll completed", extra={"events_yielded": yielded})
+
+    def subject_for(self, event: Event) -> str:
+        """central.traffic.work_zone.{state}; registry code first, geocoder fallback."""
+        code = event.data.get("feed_state_code")
+        if not code:
+            enr = (event.data.get("_enriched") or {}).get("geocoder") or {}
+            code = _state_code(enr.get("state"))
+        return f"central.traffic.work_zone.{code.lower() if code else 'unknown'}"
diff --git a/src/central/gui/routes.py b/src/central/gui/routes.py
index cb56fa5..d9e2b36 100644
--- a/src/central/gui/routes.py
+++ b/src/central/gui/routes.py
@@ -2651,6 +2651,7 @@ ADAPTER_GROUPS = {
     "Space": ["swpc_alerts", "swpc_kindex", "swpc_protons"],
     "Geophysical": ["usgs_quake", "nwis"],
     "Earth Observation": ["eonet"],
+    "Transportation": ["wzdx"],
 }
 # Same palette the map legend uses, indexed by sorted-adapter position.
 EVENTS_PALETTE = [
diff --git a/src/central/gui/templates/_event_rows/wzdx.html b/src/central/gui/templates/_event_rows/wzdx.html
new file mode 100644
index 0000000..f0fb7a8
--- /dev/null
+++ b/src/central/gui/templates/_event_rows/wzdx.html
@@ -0,0 +1,12 @@
+{# WZDx work-zone detail rows.
Fields from payload->data->data; every block is + guarded so the Iowa shape (lanes/types_of_work, no event_status) and the Utah + shape (event_status, no lanes) both render without error. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{% set roads = d.get('road_names') or [] %} +{% if roads %}
Road
{{ roads | join(', ') }}{% if d.get('direction') %} ({{ d.direction }}){% endif %}
{% endif %} +{% if d.get('vehicle_impact') is not none %}
Vehicle impact
{{ d.vehicle_impact }}
{% endif %} +{% if d.get('event_status') is not none %}
Status
{{ d.event_status }}
{% endif %} +{% if d.get('start_date') is not none %}
Starts
{{ d.start_date }}
{% endif %} +{% if d.get('end_date') is not none %}
Ends
{{ d.end_date }}
{% endif %} +{% if d.get('description') is not none %}
Description
{{ d.description | truncate(200) }}
{% endif %} +{% if d.get('feed_name') is not none %}
Source feed
{{ d.feed_name }}{% if d.get('feed_state') %} ({{ d.feed_state }}){% endif %}
{% endif %} diff --git a/src/central/gui/templates/_event_summaries/wzdx.html b/src/central/gui/templates/_event_summaries/wzdx.html new file mode 100644 index 0000000..35bd6b8 --- /dev/null +++ b/src/central/gui/templates/_event_summaries/wzdx.html @@ -0,0 +1,7 @@ +{# WZDx work-zone one-line subject (v0.9.0). "Work zone on "; the + Location column (geocoder city/county/state) renders separately. Falls back to + a bare "Work zone" so we never regress to "—". Fields from payload->data->data. #} +{% set d = (event.data.get('data') or {}).get('data') or {} %} +{%- set roads = d.get('road_names') or [] -%} +{%- set road = roads[0] if roads else None -%} +Work zone{% if road %} on {{ road }}{% endif %}{% if d.get('direction') %} {{ d.direction }}{% endif %} diff --git a/src/central/streams.py b/src/central/streams.py index e0408f5..cc940d8 100644 --- a/src/central/streams.py +++ b/src/central/streams.py @@ -29,5 +29,6 @@ STREAMS: list[StreamEntry] = [ StreamEntry("CENTRAL_SPACE", "central.space.>"), StreamEntry("CENTRAL_DISASTER", "central.disaster.>"), StreamEntry("CENTRAL_HYDRO", "central.hydro.>"), + StreamEntry("CENTRAL_TRAFFIC", "central.traffic.>"), StreamEntry("CENTRAL_META", "central.meta.>", event_bearing=False), ] diff --git a/tests/fixtures/wzdx_iowa_sample.json b/tests/fixtures/wzdx_iowa_sample.json new file mode 100644 index 0000000..9d1c7b6 --- /dev/null +++ b/tests/fixtures/wzdx_iowa_sample.json @@ -0,0 +1 @@ +{"road_event_feed_info":{"publisher":"Iowa DOT","version":"4.0","data_sources":[{"data_source_id":"IowaDOT-WZDx","organization_name":"Iowa DOT"}],"update_date":"2026-05-25T19:34:45Z"},"type":"FeatureCollection","features":[{"id":"OpenTMS-Event22920571864-1","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"IowaDOT-WZDx","road_names":["US-65"],"direction":"northbound","description":"Between IA 2 (6 miles south of the Humeston area) and County Road H50 (2 miles north of the Humeston area). Road construction. 
Intermittent lane closure. Pilot car in operation. Look out for flaggers. From 7:00AM CDT to 5:00PM CDT on weekdays. Starting June 1, 2026 at 7:00AM CDT until June 30, 2026 at about 5:00PM CDT. Comment: Chariton RCE (800-881-5778) - Wayne County","update_date":"2026-05-22T18:17:42Z"},"start_date":"2026-06-01T12:00:00Z","end_date":"2026-06-01T22:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","beginning_cross_street":"65N-13.1","ending_cross_street":"65N-22.4","beginning_milepost":13.1,"ending_milepost":22.4,"vehicle_impact":"all-lanes-open","types_of_work":[{"type_name":"surface-work"}],"lanes":[{"order":1,"type":"shoulder","status":"open"},{"order":2,"type":"general","status":"open"},{"order":3,"type":"shoulder","status":"open"}],"location_method":"unknown"},"geometry":{"type":"MultiPoint","coordinates":[[-93.499196,40.764081],[-93.493947,40.897518]]}},{"id":"OpenTMS-Event22735489722","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"IowaDOT-WZDx","road_names":["I-80"],"direction":"eastbound","description":"Between I-35 (Clive) and Exit 127: IA 141 (Urbandale). Road closed due to night time construction work. Detour in operation. Follow the Iowa DOT-recommended detour around the closure. See map for detour(s). Starting May 26, 2026 at 10:00PM CDT until May 27, 2026 at about 4:00AM CDT. 
Full schedule below: \u2022 May 26, 10:00PM - May 27, 4:00AM Comment: Grimes RCE (800-251-2707) - Polk County","update_date":"2026-05-07T16:52:11Z"},"start_date":"2026-05-27T03:00:00Z","end_date":"2026-05-27T09:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","beginning_cross_street":"80E-124.3","ending_cross_street":"80E-127","beginning_milepost":124.3,"ending_milepost":127.0,"vehicle_impact":"some-lanes-closed","types_of_work":[{"type_name":"surface-work"}],"lanes":[{"order":1,"type":"shoulder","status":"closed"},{"order":2,"type":"general","status":"closed"},{"order":3,"type":"general","status":"closed"},{"order":4,"type":"general","status":"closed"},{"order":5,"type":"general","status":"closed"},{"order":6,"type":"shoulder","status":"open"}],"location_method":"unknown"},"geometry":{"type":"MultiPoint","coordinates":[[-93.776735,41.603576],[-93.776897,41.642469]]}}]} \ No newline at end of file diff --git a/tests/fixtures/wzdx_utah_sample.json b/tests/fixtures/wzdx_utah_sample.json new file mode 100644 index 0000000..6795300 --- /dev/null +++ b/tests/fixtures/wzdx_utah_sample.json @@ -0,0 +1 @@ +{"road_event_feed_info":{"publisher":"UDOT","version":"4.0","license":"https://creativecommons.org/publicdomain/zero/1.0/","data_sources":[{"data_source_id":"UDOT-construction","organization_name":"UDOT-TOC","update_date":"2023-03-19T07:03:52.1411634-06:00","update_frequency":900,"contact_name":"Chuck Felice","contact_email":"cfelice@utah.gov"}],"update_date":"2023-03-19T07:04:04.8614897-06:00","update_frequency":900,"contact_name":"Chuck Felice","contact_email":"cfelice@utah.gov"},"type":"FeatureCollection","features":[{"id":"2365_eastbound","type":"Feature","properties":{"core_details":{"event_type":"work-zone","data_source_id":"UDOT-Construction","road_names":["I-80"],"direction":"eastbound","description":"The Utah Department of Transportation (UDOT) will improve I-80 between 1300 
East and 2300 East. The pavement will be replaced with new concrete throughout, and a new lane will be added to eastbound I-80 between 1300 East and 2300 East. Expect lane shifts and lane closures between 1300 East and 2300 East and minor delays in the area during the project.","creation_date":"2022-01-10T18:53:49.643Z","update_date":"2022-01-10T18:53:49.643Z"},"start_date":"2021-12-01T07:00:00Z","end_date":"2022-12-31T07:00:00Z","start_date_accuracy":"estimated","end_date_accuracy":"estimated","beginning_accuracy":"estimated","ending_accuracy":"estimated","location_method":"unknown","vehicle_impact":"unknown","beginning_cross_street":"700 E / Salt Lake City","ending_cross_street":"2300 E / Holladay","beginning_milepost":125,"ending_milepost":127,"event_status":"active"},"geometry":{"type":"LineString","coordinates":[[-111.855022,40.719556],[-111.836362,40.717367],[-111.834606,40.716772],[-111.833043,40.716002],[-111.831355,40.715327],[-111.829568,40.714835],[-111.827774,40.714344],[-111.825987,40.713849],[-111.824192,40.713376],[-111.822413,40.712896],[-111.820576,40.712489],[-111.818724,40.712591]]}}]} \ No newline at end of file diff --git a/tests/test_events_feed_frontend.py b/tests/test_events_feed_frontend.py index 6c8bd37..ac45bb4 100644 --- a/tests/test_events_feed_frontend.py +++ b/tests/test_events_feed_frontend.py @@ -1142,6 +1142,7 @@ _SAMPLE_INNER = { "usgs_quake": {"magnitude": 1.009682538298, "place": "17 km W of Searles Valley, CA"}, "wfigs_incidents": {"county": "Montezuma", "state": "CO"}, "wfigs_perimeters": {"county": "Carbon", "state": "MT"}, + "wzdx": {"road_names": ["I-80"], "direction": "eastbound"}, } # Exact expected subjects for the deterministic adapters. 
swpc_alerts is omitted @@ -1160,6 +1161,7 @@ _EXPECTED_SUBJECT = { "usgs_quake": "Magnitude 1.0 — 17 km W of Searles Valley, CA", "wfigs_incidents": "Wildfire incident — Montezuma, CO", "wfigs_perimeters": "Wildfire perimeter — Carbon, MT", + "wzdx": "Work zone on I-80 eastbound", } diff --git a/tests/test_wzdx.py b/tests/test_wzdx.py new file mode 100644 index 0000000..2c045ed --- /dev/null +++ b/tests/test_wzdx.py @@ -0,0 +1,152 @@ +"""Tests for the WZDx adapter. + +Fixtures are real captures trimmed to representative features: + wzdx_utah_sample.json -- curl https://udottraffic.utah.gov/wzdx/udot/v40/data + | jq '{road_event_feed_info, type, features: .features[0:1]}' + (LineString, vehicle_impact "unknown", has event_status, no lanes) + wzdx_iowa_sample.json -- curl https://iowa-atms.cloud-q-free.com/api/rest/dataprism/wzdx/wzdxfeed + | jq '{... , features: [, ]}' + (MultiPoint, lanes + types_of_work, no event_status) + +No tests/conftest isolation entry is added: WZDx dedup uses the supervisor- +injected cursors.db and registry discovery is stateless, so there is no +adapter-owned cache to redirect (unlike nwis's NWIS_CACHE_DB_PATH). 
+""" + +import json +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from central.adapters.wzdx import ( + _DEFAULT_SEVERITY, + _VEHICLE_IMPACT_SEVERITY, + WZDxAdapter, + _eligible, + _flatten_geometry, +) +from central.config_models import AdapterConfig + +FIX = Path(__file__).parent / "fixtures" +UTAH = json.loads((FIX / "wzdx_utah_sample.json").read_text()) +IOWA = json.loads((FIX / "wzdx_iowa_sample.json").read_text()) + + +def _cfg(settings=None): + return AdapterConfig( + name="wzdx", enabled=True, cadence_s=600, + settings=settings or {}, updated_at=datetime.now(timezone.utc), + ) + + +@pytest.fixture +def adapter(tmp_path): + return WZDxAdapter(_cfg(), MagicMock(), tmp_path / "cursors.db") + + +@pytest.mark.parametrize("row,keep", [ + ({"format": "geojson", "active": True, "needapikey": False, "version": "4.1"}, True), + ({"format": "geojson", "active": True, "needapikey": False, "version": "4"}, True), + ({"format": "json", "active": True, "needapikey": False, "version": "4.1"}, False), + ({"format": "geojson", "active": False, "needapikey": False, "version": "4.1"}, False), + ({"format": "geojson", "active": True, "needapikey": True, "version": "4.1"}, False), + ({"format": "geojson", "active": True, "needapikey": False, "version": "3.1"}, False), + ({"format": "geojson", "active": True, "needapikey": False, "version": "CWZ 1.0"}, False), +]) +def test_eligible_filter(row, keep): + assert _eligible(row) is keep + + +def test_dedup_key(adapter): + eu = adapter._build_event(UTAH["features"][0], {"feedname": "udot", "state": "utah"}) + ei = adapter._build_event(IOWA["features"][0], {"feedname": "idot", "state": "iowa"}) + assert eu.id == "UDOT-Construction:2365_eastbound" + assert ei.id == "IowaDOT-WZDx:OpenTMS-Event22920571864-1" + + +@pytest.mark.parametrize("vi,sev", [ + ("all-lanes-closed", 3), ("some-lanes-closed", 2), ("all-lanes-open", 1), + ("unknown", 1), (None, 
1), +]) +def test_severity(vi, sev): + assert _VEHICLE_IMPACT_SEVERITY.get(vi, _DEFAULT_SEVERITY) == sev + + +@pytest.mark.parametrize("geom,expect", [ + ({"type": "LineString", "coordinates": [[-111.8, 40.7], [-111.6, 40.6]]}, (40.7, -111.8)), + ({"type": "MultiPoint", "coordinates": [[-93.5, 40.7]]}, (40.7, -93.5)), + ({"type": "Point", "coordinates": [-93.5, 40.7]}, (40.7, -93.5)), + (None, (None, None)), + ({"type": "Polygon", "coordinates": []}, (None, None)), +]) +def test_flatten_geometry(geom, expect): + assert _flatten_geometry(geom) == expect + + +def test_build_utah_shape(adapter): + e = adapter._build_event(UTAH["features"][0], {"feedname": "udot", "state": "utah"}) + assert e.category == "work_zone.wzdx" + assert e.severity == 1 # vehicle_impact "unknown" + assert e.data["latitude"] is not None + assert e.data["event_status"] == "active" # Utah carries it + + +def test_build_iowa_shape(adapter): + e0 = adapter._build_event(IOWA["features"][0], {"feedname": "idot", "state": "iowa"}) + e1 = adapter._build_event(IOWA["features"][1], {"feedname": "idot", "state": "iowa"}) + assert e0.severity == 1 # all-lanes-open + assert e1.severity == 2 # some-lanes-closed + assert e0.data["event_status"] is None # Iowa lacks it -> no raise + + +def test_subject_from_registry(adapter): + e = adapter._build_event(UTAH["features"][0], {"feedname": "udot", "state": "utah"}) + assert adapter.subject_for(e) == "central.traffic.work_zone.ut" + + +def test_subject_unknown(adapter): + e = adapter._build_event(UTAH["features"][0], {"feedname": "x", "state": "n/a"}) + assert adapter.subject_for(e) == "central.traffic.work_zone.unknown" + + +def test_subject_geocoder_fallback(adapter): + e = adapter._build_event(UTAH["features"][0], {"feedname": "x", "state": "n/a"}) + e.data["_enriched"] = {"geocoder": {"state": "Idaho"}} + assert adapter.subject_for(e) == "central.traffic.work_zone.id" + + +def test_event_type_split(adapter): + # Mirrors routes.py split_part(category, '.', 1) -> 
GUI event_type. + e = adapter._build_event(UTAH["features"][0], {"feedname": "udot", "state": "utah"}) + assert e.category.split(".")[0] == "work_zone" + + +@pytest.mark.asyncio +async def test_poll_yields_events(adapter): + await adapter.startup() + registry = [ + {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "udot", "state": "utah", "url": {"url": "u"}}, + {"format": "geojson", "active": True, "needapikey": False, "version": "4", "feedname": "idot", "state": "iowa", "url": {"url": "i"}}, + {"format": "json", "active": True, "needapikey": False, "version": "4", "feedname": "skip", "state": "ohio", "url": {"url": "s"}}, + ] + adapter._fetch_registry = AsyncMock(return_value=registry) + + async def fake_feed(row): + return {"udot": UTAH, "idot": IOWA}.get(row["feedname"], {"features": []})["features"] + + adapter._fetch_feed = fake_feed + events = [e async for e in adapter.poll()] + await adapter.shutdown() + # Utah 1 + Iowa 2 = 3; the json feed is dropped by _discover. + assert len(events) == 3 + assert {e.adapter for e in events} == {"wzdx"} + + +def test_summary_partial_renders_subject(): + # End-to-end through the real _event_summaries/wzdx.html partial selection. + from central.gui.routes import _derive_subject + flat = {"road_names": ["I-80"], "direction": "eastbound"} + row = {"adapter": "wzdx", "data": {"data": {"data": flat}}} + assert _derive_subject(row) == "Work zone on I-80 eastbound"